\345\210\206\345\270\203\345\274\217\347\263\273\347\273\237\346\236\266\346\236\204\346\226\271\346\241\210\350\256\276\350\256\241\345\210\235\347\250\277.md
... ...
@@ -286,61 +286,141 @@ public class ServiceRegistry {
286 286
287 287
```
288 288
289
-#### 2.2 MySQL 维护app和容器服务的关系
289
+#### 2.2 在service的yaml文件中维护app和容器的关系
290 290
291
-在 MySQL 中维护app和其所在容器服务的关系。每个 Pod 中可在进程内缓存这些关系,并设置有效期(如 5 分钟)。如果缓存未过期但访问失败,则强制从 MySQL 读取,以确保数据的正确性。
291
+在service的yaml文件中维护app和容器的关系。每个 Pod 中可在进程内缓存这些关系,并通过k8s提供的watch api实时监听和更新本地路由缓存。
292 292
293 293
用途:
294 294
- 在访问时,直接读取app与容器的路由信息,进行访问,没有网络转发。
295
-- 在安装和重启时,可以直接从 MySQL 获取完整的需要安装的app信息,没有各做一半的情况,无需协调和回调,独立可完成全部的安装和重启。
296
-- 使用k8s CoreDNS 实现透明通信,支持 stream 模式,体验类似于单机版。
295
+- 在安装和重启时,可以直接从 service yaml 文件中 获取完整的需要安装的app信息,没有各做一半的情况,无需协调和回调,独立可完成全部的安装和重启。
296
+- 使用pod ip直接通信,实现透明通信,支持 stream 模式,体验类似于单机版。
297 297
298 298
299 299
此外,本地缓存可通过 watch 功能主动更新,防止无意义、尝试性的操作。可以参考以下示例代码实现 watch 功能:
300 300
```java
301
-public class WatchExample {
302
- public static void main(String[] args) throws IOException, ApiException {
303
- ApiClient client = Config.defaultClient();
304
- // infinite timeout
305
- OkHttpClient httpClient =
306
- client.getHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).build();
307
- client.setHttpClient(httpClient);
308
- Configuration.setDefaultApiClient(client);
309
-
310
- CoreV1Api api = new CoreV1Api();
311
-
312
- Watch<V1Namespace> watch =
313
- Watch.createWatch(
314
- client,
315
-// api.listNamespaceCall(
316
-// null, null, null, null, null, 5, null, null, null, Boolean.TRUE, null),
317
-// new TypeToken<Watch.Response<V1Namespace>>() {}.getType());
318
-
319
- api.listServiceForAllNamespacesCall(
320
- null, null, null,
321
- null, null, null, null,
322
- null, null, Boolean.TRUE, null),
323
- new TypeToken<Watch.Response<V1Namespace>>() {}.getType());
324
-
325
- try {
326
- for (Watch.Response<V1Namespace> item : watch) {
327
- // 打印service的操作类型、名称
328
- System.out.printf("%s : %s%n", item.type, item.object.getMetadata().getName());
329
- }
330
- } finally {
331
- watch.close();
301
+
302
+ public static void watchServicesInNamespace(BiConsumer<AppRoute.EventType, Map<String, String>> updateRouteCache) {
303
+ SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient);
304
+
305
+ SharedIndexInformer<V1Service> serviceInformer = informerFactory.sharedIndexInformerFor(
306
+ (callGeneratorParams) -> coreApi.listNamespacedServiceCall(
307
+ NAMESPACE,
308
+ null,
309
+ null,
310
+ null,
311
+ null,
312
+ K8sConstants.LABEL_SELECTOR_IIDP,
313
+ null,
314
+ "",
315
+ null,
316
+ callGeneratorParams.timeoutSeconds,
317
+ callGeneratorParams.watch,
318
+ null),
319
+ V1Service.class,
320
+ V1ServiceList.class);
321
+ serviceLister = new Lister<>(serviceInformer.getIndexer(), NAMESPACE);
322
+
323
+ SharedIndexInformer<V1Endpoints> endpointsInformer = informerFactory.sharedIndexInformerFor(
324
+ (callGeneratorParams) -> coreApi.listNamespacedEndpointsCall(
325
+ NAMESPACE,
326
+ null,
327
+ null,
328
+ null,
329
+ null,
330
+ K8sConstants.LABEL_SELECTOR_IIDP,
331
+ null,
332
+ "",
333
+ null,
334
+ callGeneratorParams.timeoutSeconds,
335
+ callGeneratorParams.watch,
336
+ null),
337
+ V1Endpoints.class,
338
+ V1EndpointsList.class);
339
+ endpointsLister = new Lister<>(endpointsInformer.getIndexer(), NAMESPACE);
340
+
341
+ serviceInformer.addEventHandler(
342
+ new ResourceEventHandler<V1Service>() {
343
+ @Override
344
+ public void onAdd(V1Service obj) {
345
+ V1ObjectMeta metadata = obj.getMetadata();
346
+ if (metadata == null) {
347
+ return;
348
+ }
349
+ logger.info("\n\n ==================== Service added: {} \n\n", metadata.getName());
350
+ // 更新路由缓存
351
+ updateRouteCache.accept(AppRoute.EventType.ADDED, getRouteMap(metadata));
352
+ }
353
+
354
+ @Override
355
+ public void onUpdate(V1Service oldObj, V1Service newObj) {
356
+ logger.info("\n\n ==================== Service updated: {}\n\n", newObj.getMetadata().getName());
357
+ // 有新的app加入则会更新service,需要同步到路由缓存
358
+ updateRouteCache.accept(AppRoute.EventType.UPDATED, getRouteMap(newObj.getMetadata()));
359
+ }
360
+
361
+ @Override
362
+ public void onDelete(V1Service obj, boolean deletedFinalStateUnknown) {
363
+ V1ObjectMeta metadata = obj.getMetadata();
364
+ if (metadata == null) {
365
+ return;
366
+ }
367
+
368
+ logger.info("\n\n ===================== Service deleted: {} \n\n", metadata.getName());
369
+
370
+ updateRouteCache.accept(AppRoute.EventType.DELETED, getRouteMap(metadata));
371
+ }
372
+
373
+ private Map<String, String> getRouteMap(V1ObjectMeta metadata) {
374
+ Map<String, String> annotations = metadata.getAnnotations();
375
+ if (annotations == null) {
376
+ return null;
377
+ }
378
+ String installedApps = annotations.get(K8sConstants.ANNOTATION_INSTALLED_APPS);
379
+ if (installedApps == null) {
380
+ return null;
381
+ }
382
+
383
+ Map<String, String> routeMap = new HashMap<>();
384
+ String serviceName = metadata.getName();
385
+ // 按逗号切割,并去掉两边的空格
386
+ String[] apps = installedApps.split(",");
387
+ for (String app : apps) {
388
+ routeMap.put(app.trim(), serviceName);
389
+ }
390
+
391
+ String dependencies = annotations.get(K8sConstants.ANNOTATION_DEPENDENCIES);
392
+ if (dependencies != null && !dependencies.isEmpty()) {
393
+ String[] deps = dependencies.split(",");
394
+ for (String dep : deps) {
395
+ routeMap.put(dep.trim(), metadata.getName());
396
+ }
397
+ }
398
+
399
+ return routeMap;
400
+ }
401
+ });
402
+
403
+ informerFactory.startAllRegisteredInformers();
404
+
405
+ // 使用 Wait.poll 方法等待 Informer 同步完成
406
+ boolean synced = Wait.poll(Duration.ofMillis(100), Duration.ofSeconds(10), serviceInformer::hasSynced);
407
+ if (synced) {
408
+ logger.info("Cache fully loaded (total {} services), discovery client is now available", serviceLister.list().size());
409
+ } else {
410
+ logger.error("Failed to sync informer within the timeout period.");
411
+ throw new RuntimeException("Failed to sync informer within the timeout period.");
412
+ }
332 413
}
333
- }
334
-}
414
+
335 415
```
336
-通过 watch 服务变化,及时主动更新内存中的 service 和应用对应关系,确保数据的一致性和实时性。
416
+通过 watch 服务变化,及时主动更新内存中的 service 和app对应关系,确保数据的一致性和实时性。
337 417
338 418
339 419
340 420
#### 2.3 安装和卸载应用的一致性
341 421
342 422
343
-安装和卸载功能可以由一个独立的内置app管理和创建容器。为了确保一致性,可以采用以下策略:
423
+安装和卸载功能都由master节点进行管理和创建容器。为了确保一致性,可以采用以下策略:
344 424
345 425
1. 先写入 MySQL,如果出现错误则直接报错给用户,提示重试。
346 426
2. 成功写入 MySQL 后,再创建容器。
... ...
@@ -434,10 +514,10 @@ k8s已经给出了答案,通过operaotr即可做到 [[https://kubernetes.io/zh
434 514
435 515
通过上述改进方案,可以解决当前架构中的多个问题:
436 516
437
-- 减少不必要的网络转发,提升系统性能。
517
+- 减少不必要的网络转发,直接使用pod ip进行通信,提升系统性能。
438 518
- 实现确定性的路由,避免死循环。
439 519
- 简化应用安装和卸载过程,减少协调和回调,提高效率。
440
-- 使用网关和 MySQL 维护应用和容器服务关系,确保数据一致性和高可用性。
520
+- 使用service yaml文件维护应用和容器服务关系,确保数据一致性,只要容器在就能保证需要安装的app。
441 521
- 不需要依赖hazelcast组网来进行内存同步,简化同步流程,确保元模型数据一致性和正确性。
442 522
- 引入 Operator 模式,实现自动化管理和一致性操作k8s资源。
443 523