\345\210\206\345\270\203\345\274\217\347\263\273\347\273\237\346\236\266\346\236\204\346\226\271\346\241\210\350\256\276\350\256\241\345\210\235\347\250\277.md
... ...
@@ -0,0 +1,355 @@
1
+### 分布式系统架构技术方案
2
+
3
+#### 1. 现状分析
4
+
5
+##### 1.1 引擎到 Sidecar 到 Dapr 的多次转发
6
+
7
+当前架构中(引擎 --> sidecar(修改url中的域名) --> dapr --> app服务),引擎通过 Sidecar 进行URL 域名修改后,再转发到 Dapr。这种设计导致了两次不必要的网络转发,增加了延迟和复杂性,同时这些转发对应用并不透明,比如对stream流的支持。
8
+```java
9
+ public Object invoke(JsonRpcRequest request, HttpHeaders headers, String appsvc) throws IOException {
10
+ String requestURL = String.valueOf(request.getParams().getMap().get("requestURL"));
11
+
12
+ InetAddress ip = InetAddress.getByName(appsvc);
13
+ log.info(appsvc, System.currentTimeMillis());
14
+
15
+ // 将引擎请求的url替换成目标的新的域名
16
+ requestURL = httpUtils.replaceDomainAndPort(ip.getHostAddress(), configUtils.SNEST_PORT, requestURL);
17
+ log.info(requestURL, System.currentTimeMillis());
18
+
19
+ Object result;
20
+
21
+ // 拿到新的 url 再发起http请求
22
+ try (Response okResponse = MetaHttpClient.post(requestURL, headers.toSingleValueMap(), request)){
23
+ InputStream responseBodyStream = okResponse.body().byteStream();
24
+ Headers headersResponse = okResponse.headers();
25
+ byte[] responseBodyBytes = IOUtils.toByteArray(responseBodyStream);
26
+ result = ResponseEntity.status(HttpStatus.OK).headers(convertToSpringHeaders(headersResponse)).body(responseBodyBytes);
27
+ }
28
+ return result;
29
+ }
30
+```
31
+
32
+##### 1.2 引擎转发逻辑问题
33
+
34
+引擎的转发逻辑是:如果目标app在本地容器中,则直接处理;如果不在本地容器中,则转发请求。
35
+然而,如果访问一个不存在的应用,则会导致死循环(业务中反馈的出现 loop call 问题),因为每个节点都觉得不在本容器内就转发,没有停止的条件。一般来说,路由应该是确定性的,如果app存在则访问,否则直接报错不存在。
36
+
37
+```java
38
+
39
+ public static Tuple<Boolean, Object> transmitSideCar(Map<String, Object> paramMap, HttpHeaders headers, JsonRpcRequest request) {
40
+ WebMode curRunMode = WebMode.of(ConfigUtils.get(MetaConstant.ENGINE_RUN_MODE));
41
+ if (WebMode.DISTRIBUTED.equals(curRunMode) || WebMode.HIGHAVAILABLE.equals(curRunMode)) {
42
+ String tag = Objects.toString(paramMap.get("tag"), "master");
43
+ String appName = Objects.toString(paramMap.get("app"), null);
44
+
45
+ // 判断访问的目标app是否本进程中,
46
+ if (!appInCurrentProcess(appName, Meta.getMetaContainer(String.format("%s.%s", appName, tag)))) {
47
+ Object result;
48
+
49
+ if (StringUtils.isEmpty(request.getMethod())) {
50
+ request.setMethod("service");
51
+ }
52
+ Map<String, Object> context = (Map<String, Object>) paramMap.getOrDefault("context", new HashMap<>());
53
+ request.getParams().getMap().put("context", context);
54
+
55
+ headers.remove("content-length");
56
+
57
+ // 注入trace
58
+ Map<String, String> traceHeaders = TraceUtil.injectHeaders();
59
+ if (traceHeaders != null) traceHeaders.forEach(headers::add);
60
+
61
+ // 如果app 不在本进程则转发给sidecar
62
+ result = post(headers, request);
63
+
64
+ return new Tuple<>(true, result);
65
+ }
66
+ }
67
+
68
+ // 如果app存在本地,则直接处理
69
+ return new Tuple<>(false, null);
70
+ }
71
+```
72
+
73
+##### 1.3 应用安装的复杂性
74
+
75
+在当前架构中,应用安装过程由引擎完成一份部分, Sidecar 完成另一部分,需要相互协调和等待回调。这种设计增加了复杂性和不确定性。理想情况下,应用的安装和卸载应该由一个组件独立完成。
76
+比如回调逻辑可能会因为异常出现死循环:
77
+
78
+
79
+引擎端实现,判断请求是否exeMod为local来判断:
80
+
81
+```java
82
+
83
+public boolean installApps(List<AppDataInfo> installApps) {
84
+ ....
85
+
86
+ try {
87
+ //判断应用是否已经安装
88
+ Meta meta = BaseContextHandler.getMeta();
89
+ String appRunMode = Objects.toString(meta.getContext("exeMod"), null);
90
+
91
+
92
+ // 判断appappRunMode是否是 local,如果是则说明是引擎本身的安装,如果不是则是sidecar回调的安装。
93
+ if (!StringUtils.equals(appRunMode, "local")) {
94
+ // 校验应用是否安装
95
+ boolean isSuccessful = postMasterToInstallApp(meta, installApps, kind, replicas);
96
+ return isSuccessful;
97
+ }
98
+
99
+
100
+ // 处理sidecar 回调的安装请求
101
+
102
+
103
+ AppGroupContainer bussinessAppGroupContainer = EngineContainer.getBussinessAppGroupContainer();
104
+
105
+ ...
106
+
107
+
108
+
109
+```
110
+
111
+sidecar 回调,如果不成功会一直尝试:
112
+```java
113
+
114
+ @Override
115
+ public void run(ApplicationArguments args0) throws Exception {
116
+ log.info("-----------Start------- ");
117
+ if (StringUtils.equals(configUtils.APP_NAME, configUtils.APP_MASTER_NAME)) return;
118
+
119
+ //String install_opt = System.getenv("INSTALL_OPT");
120
+ Thread.sleep(10000);
121
+ Mono<State<HashMap>> retrievedMessageMono = daprClient.getState(configUtils.STATE_STORE_NAME, ConfigUtils.APP_KEY_NAME, HashMap.class);
122
+ HashMap<String, Map> appHashMap = retrievedMessageMono.block().getValue();
123
+ if (appHashMap == null) {
124
+ log.error("安装参数为空");
125
+ return;
126
+ }
127
+ String install_opt = JSONUtil.toJsonStr(appHashMap.get(configUtils.APP_NAME));
128
+ log.info("启动调用安装参数" + install_opt);
129
+ Boolean flag = true;
130
+ if (!StringUtils.isBlank(install_opt)) {
131
+ //Thread.sleep(5000);
132
+ // 如果不成功,则一直尝试
133
+ while (flag) {
134
+ Thread.sleep(3000);
135
+ flag = !deployService.installInvoke(install_opt, ConfigUtils.hostAddress);
136
+ }
137
+ }
138
+ }
139
+```
140
+
141
+1.4 内存同步遇到的问题
142
+
143
+- 一致性
144
+
145
+ 基于hazelcast的事件通知来更新本地内存存储是异步的,且没有顺序保证,不可重复获取事件,可能导致并发情况下的一致性问题。
146
+
147
+- 数据丢失
148
+
149
+ hazelcast分布式存储是基于分片的,备份也是基于分片的,目前配置的配置分片数是2,那意味着最多能宕掉2台机器,如果在一些情况下,比如较大规模的服务重启更新,可能会造成数据丢失。
150
+ 同时由于事件通知是异步和不可重复的,可能在异常处理情况下,没有正确处理事件,导致数据丢失。
151
+
152
+
153
+- 复杂性
154
+
155
+ 引入hazelcast来组网并作数据同步,本身具有复杂性,且也带来更多的内存消耗(hazelcast本身的存储,备份数据等)
156
+
157
+以上问题当确定网关也就是master节点的地位以后,就变得非常简单了。简单来说master节点永远是主节点,其他业务节点只是从节点,而且内存同步的发起方永远是master,那么master节点只需要把需要操作的内存信息以http请求方式广播给所有的从节点就可以了,http本身是同步且待ack确认的。
158
+
159
+
160
+#### 2. 新的实现方案
161
+
162
+##### 2.1 使用网关进行统一管理
163
+
164
+网关服务可以通过固定的 service 名称(如 `master`)来实现统一管理,确保先启动并完成所有内置应用的表结构初始化、种子数据初始化等只需要一个容器一次性初始化任务。这些任务只需要一个容器执行,无需在所有容器中重复执行。
165
+
166
+网关高可用:
167
+- 网关可以部署多个实例,实现无状态和高可用。
168
+- 通过选主机制(Leader Election),确保高可用性。
169
+
170
+ 可以参考以下示例代码实现选主机制:
171
+ ```java
172
+ public class LeaderElectionExample {
173
+ public static void main(String[] args) throws Exception {
174
+ ApiClient client = Config.defaultClient();
175
+ Configuration.setDefaultApiClient(client);
176
+
177
+ // New
178
+ String appNamespace = "default";
179
+ String appName = "leader-election-foobar";
180
+ String lockHolderIdentityName = UUID.randomUUID().toString(); // Anything unique
181
+ EndpointsLock lock = new EndpointsLock(appNamespace, appName, lockHolderIdentityName);
182
+
183
+ LeaderElectionConfig leaderElectionConfig =
184
+ new LeaderElectionConfig(
185
+ lock, Duration.ofMillis(10000), Duration.ofMillis(8000), Duration.ofMillis(2000));
186
+ try (LeaderElector leaderElector = new LeaderElector(leaderElectionConfig)) {
187
+ leaderElector.run(
188
+ () -> {
189
+ System.out.println("Do something when getting leadership.");
190
+ },
191
+ () -> {
192
+ System.out.println("Do something when losing leadership.");
193
+ });
194
+ }
195
+ }
196
+ }
197
+ ```
198
+
199
+
200
+这里应该插入架构图的,但是不知道咋插入 ₍ᐢ..ᐢ₎ todo
201
+
202
+##### 2.2 MySQL 维护应用和容器服务的关系
203
+
204
+在 MySQL 中维护app和其所在容器服务的关系。每个 Pod 中可在进程内缓存这些关系,并设置有效期(如 5 分钟)。如果缓存未过期但访问失败,则强制从 MySQL 读取,以确保数据的正确性。
205
+
206
+用途:
207
+- 在访问时,直接读取app与容器的路由信息,进行访问,没有网络转发。
208
+- 在安装和重启时,可以直接从 MySQL 获取完整的需要安装的app信息,没有各做一半的情况,无需协调和回调,独立可完成全部的安装和重启。
209
+- 使用 CoreDNS 实现透明通信,支持 stream 模式,体验类似于单机版。
210
+
211
+此外,本地缓存可通过 watch 功能主动更新,防止无意义、尝试性的操作。可以参考以下示例代码实现 watch 功能:
212
+```java
213
+public class WatchExample {
214
+ public static void main(String[] args) throws IOException, ApiException {
215
+ ApiClient client = Config.defaultClient();
216
+ // infinite timeout
217
+ OkHttpClient httpClient =
218
+ client.getHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).build();
219
+ client.setHttpClient(httpClient);
220
+ Configuration.setDefaultApiClient(client);
221
+
222
+ CoreV1Api api = new CoreV1Api();
223
+
224
+ Watch<V1Namespace> watch =
225
+ Watch.createWatch(
226
+ client,
227
+// api.listNamespaceCall(
228
+// null, null, null, null, null, 5, null, null, null, Boolean.TRUE, null),
229
+// new TypeToken<Watch.Response<V1Namespace>>() {}.getType());
230
+
231
+ api.listServiceForAllNamespacesCall(
232
+ null, null, null,
233
+ null, null, null, null,
234
+ null, null, Boolean.TRUE, null),
235
+ new TypeToken<Watch.Response<V1Namespace>>() {}.getType());
236
+
237
+ try {
238
+ for (Watch.Response<V1Namespace> item : watch) {
239
+ // 打印service的操作类型、名称
240
+ System.out.printf("%s : %s%n", item.type, item.object.getMetadata().getName());
241
+ }
242
+ } finally {
243
+ watch.close();
244
+ }
245
+ }
246
+}
247
+```
248
+通过 watch 服务变化,及时主动更新内存中的 service 和应用对应关系,确保数据的一致性和实时性。
249
+
250
+##### 2.3 安装和卸载应用的一致性
251
+
252
+
253
+安装和卸载功能可以由一个独立的内置app管理和创建容器。为了确保一致性,可以采用以下策略:
254
+
255
+1. 先写入 MySQL,如果出现错误则直接报错给用户,提示重试。
256
+2. 成功写入 MySQL 后,再创建容器。
257
+3. 如果创建容器失败,可以考虑轮询重试,或者最佳实践是使用 Operator 模式。
258
+
259
+可以在安装卸载应用中实现 Operator 的功能,确保操作的自动化和一致性。
260
+
261
+
262
+创建一个deployment和对应的service:
263
+
264
+```java
265
+
266
+public class GenericClientExample {
267
+
268
+ public static void main(String[] args) throws Exception {
269
+ // create a service and deployment.
270
+ // 创建 Deployment
271
+ V1Deployment deployment = new V1Deployment()
272
+ .metadata(new V1ObjectMeta().name("foo").namespace("default"))
273
+ .spec(new V1DeploymentSpec()
274
+ .replicas(1)
275
+ .selector(new V1LabelSelector().putMatchLabelsItem("app", "foo"))
276
+ .template(new V1PodTemplateSpec()
277
+ .metadata(new V1ObjectMeta().putLabelsItem("app", "foo").putLabelsItem("version", "v1"))
278
+ .spec(new V1PodSpec()
279
+ .containers(Arrays.asList(
280
+ new V1Container()
281
+ .name("foo")
282
+ .image("busybox")
283
+ .command(Arrays.asList("sleep", "3600")))))));
284
+
285
+ ApiClient apiClient = ClientBuilder.standard().build();
286
+ GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient =
287
+ new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class,
288
+ "apps", "v1", "deployments", apiClient);
289
+
290
+ V1Deployment createdDeployment = deploymentClient.create(deployment, new CreateOptions()).throwsApiException().getObject();
291
+ System.out.println("Deployment created: " + createdDeployment.getMetadata().getName());
292
+
293
+ // 创建 Service
294
+ V1Service service = new V1Service()
295
+ .metadata(new V1ObjectMeta().name("foo2").namespace("default"))
296
+ .spec(new V1ServiceSpec()
297
+ .selector(new HashMap<String, String>() {{ put("app", "foo"); }})
298
+ .ports(Arrays.asList(new V1ServicePort().port(80).targetPort(new IntOrString(80)))));
299
+
300
+ GenericKubernetesApi<V1Service, V1ServiceList> serviceClient =
301
+ new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, "", "v1", "services", apiClient);
302
+
303
+ V1Service createdService = serviceClient.create(service, new CreateOptions()).throwsApiException().getObject();
304
+ System.out.println("Service created: " + createdService.getMetadata().getName());
305
+ }
306
+
307
+}
308
+
309
+```
310
+
311
+删除一个deployment和service,则较为简单,指定namesapce和name即可:
312
+```java
313
+
314
+public class GenericClientExample {
315
+
316
+ public static void main(String[] args) throws Exception {
317
+ // delete a service and deployment
318
+ // delete Deployment
319
+ ApiClient apiClient = ClientBuilder.standard().build();
320
+ GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient =
321
+ new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class,
322
+ "apps"/*deployment等内置控制器属于apps分组*/, "v1", "deployments", apiClient);
323
+
324
+ V1Deployment deletedDeployment = deploymentClient.delete("default", "foo", new DeleteOptions()).throwsApiException().getObject();
325
+ System.out.println("Deployment deleted: " + deletedDeployment.getStatus());
326
+
327
+ // delete Service
328
+ GenericKubernetesApi<V1Service, V1ServiceList> serviceClient =
329
+ new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, ""/* service pod 等内置资源属于core组,不需要指定分组*/, "v1", "services", apiClient);
330
+ serviceClient.delete("default", "foo2", new DeleteOptions()).throwsApiException();
331
+ System.out.println("Service deleted");
332
+ }
333
+
334
+}
335
+```
336
+
337
+可能有人会感觉容器的管理和维护,应该是属于运维层面的事情,不应该放到业务中来处理,确实是的,其实最佳做法还是按照业界的做法,编写operator来完成所有的容器操作,而且这个操作是一致的,尤其是再涉及到多个资源的情况,比如有service、deployment、configmap、pvc、secret等,如何保证创建、删除和修改这些资源的一致性呢?
338
+k8s已经给出了答案,通过operaotr即可做到 https://kubernetes.io/zh-cn/docs/concepts/extend-kubernetes/operator/ 以及如何管理依赖和引用:https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/
339
+
340
+#### 3. 总结
341
+
342
+通过上述改进方案,可以解决当前架构中的多个问题:
343
+
344
+- 减少不必要的网络转发,提升系统性能。
345
+- 实现确定性的路由,避免死循环。
346
+- 简化应用安装和卸载过程,减少协调和回调,提高效率。
347
+- 使用网关和 MySQL 维护应用和容器服务关系,确保数据一致性和高可用性。
348
+- 引入 Operator 模式,实现自动化管理和一致性操作。
349
+
350
+这些改进将显著提升系统的可靠性、可维护性和性能,为业务发展提供坚实的技术保障。
351
+
352
+
353
+以上实现代码,本人已全部验证过。
354
+
355
+参考:https://github.com/kubernetes-client/java