分布式系统架构技术方案

目前已经实现,代码地址:gitlab地址

1. 现状分析

1.1 引擎到 Sidecar 到 Dapr 的多次转发

当前架构中(引擎 –> sidecar(修改url中的域名) –> dapr –> app服务),引擎通过 Sidecar 进行URL 域名修改后,再转发到 Dapr。这种设计导致了两次不必要的网络转发,增加了延迟和复杂性,同时这些转发对应用并不透明,比如对stream流的支持。

  public Object invoke(JsonRpcRequest request, HttpHeaders headers, String appsvc) throws IOException {
      String requestURL = String.valueOf(request.getParams().getMap().get("requestURL"));

      InetAddress ip = InetAddress.getByName(appsvc);
      log.info(appsvc, System.currentTimeMillis());
      
      // 将引擎请求的url替换成目标的新的域名
      requestURL = httpUtils.replaceDomainAndPort(ip.getHostAddress(), configUtils.SNEST_PORT, requestURL);
      log.info(requestURL, System.currentTimeMillis());

      Object result;
      
      // 拿到新的 url 再发起http请求
      try (Response okResponse = MetaHttpClient.post(requestURL, headers.toSingleValueMap(), request)){
          InputStream responseBodyStream = okResponse.body().byteStream();
          Headers headersResponse = okResponse.headers();
          byte[] responseBodyBytes = IOUtils.toByteArray(responseBodyStream);
          result = ResponseEntity.status(HttpStatus.OK).headers(convertToSpringHeaders(headersResponse)).body(responseBodyBytes);
      }
      return result;
  }

1.2 引擎转发逻辑问题

引擎的转发逻辑是:如果目标app在本地容器中,则直接处理;如果不在本地容器中,则转发请求。 然而,如果访问一个不存在的应用,则会导致死循环(业务中反馈的出现 loop call 问题),因为每个节点都觉得不在本容器内就转发,没有停止的条件。一般来说,路由应该是确定性的,如果app存在则访问,否则直接报错不存在。


  public static Tuple<Boolean, Object> transmitSideCar(Map<String, Object> paramMap, HttpHeaders headers, JsonRpcRequest request) {
      WebMode curRunMode = WebMode.of(ConfigUtils.get(MetaConstant.ENGINE_RUN_MODE));
      if (WebMode.DISTRIBUTED.equals(curRunMode) || WebMode.HIGHAVAILABLE.equals(curRunMode)) {
          String tag = Objects.toString(paramMap.get("tag"), "master");
          String appName = Objects.toString(paramMap.get("app"), null);

          // 判断访问的目标app是否本进程中,
          if (!appInCurrentProcess(appName, Meta.getMetaContainer(String.format("%s.%s", appName, tag)))) {
              Object result;

              if (StringUtils.isEmpty(request.getMethod())) {
                  request.setMethod("service");
              }
              Map<String, Object> context = (Map<String, Object>) paramMap.getOrDefault("context", new HashMap<>());
              request.getParams().getMap().put("context", context);

              headers.remove("content-length");

              // 注入trace
              Map<String, String> traceHeaders = TraceUtil.injectHeaders();
              if (traceHeaders != null) traceHeaders.forEach(headers::add);

              // 如果app 不在本进程则转发给sidecar
              result = post(headers, request);

              return new Tuple<>(true, result);
          }
      }
      
      // 如果app存在本地,则直接处理
      return new Tuple<>(false, null);
  }

1.3 转发的路由问题

已有的实现是将app和对应的路由关系保存在redis中,且不说redis可能丢失数据的问题,现有实现也没有一种类似路由存活的机制,比如某个业务app下线了,但路由信息还在,依然会将请求转发到这个容器中,造成不可访问。

1.4 应用安装的复杂性

在当前架构中,应用安装过程由引擎完成一份部分, Sidecar 完成另一部分,需要相互协调和等待回调。这种设计增加了复杂性和不确定性。理想情况下,应用的安装和卸载应该由一个组件独立完成。 比如回调逻辑可能会因为异常出现死循环.

已有的实现边车和引擎没有相关关联,如果引擎由于oom导致容器重启(不是pod重启),但是对于边车来说是不知道引擎重启的,那么业务app就无法安装。
另外,如果是边车容器重启,它依然会尝试请求引擎安装接口,导致引擎重复安装app,导致引擎数据错误,多个类加载器等。

引擎端实现,判断请求是否exeMod为local来判断:


public boolean installApps(List<AppDataInfo> installApps) {
       ....

        try {
            //判断应用是否已经安装
            Meta meta = BaseContextHandler.getMeta();
            String appRunMode = Objects.toString(meta.getContext("exeMod"), null);


            // 判断appappRunMode是否是 local,如果是则说明是引擎本身的安装,如果不是则是sidecar回调的安装。
            if (!StringUtils.equals(appRunMode, "local")) {
                // 校验应用是否安装
                boolean isSuccessful = postMasterToInstallApp(meta, installApps, kind, replicas);
                return isSuccessful;
            }


            // 处理sidecar 回调的安装请求
            
            
            AppGroupContainer bussinessAppGroupContainer = EngineContainer.getBussinessAppGroupContainer();

            ...
            

sidecar 回调,如果不成功会一直尝试:


  @Override
  public void run(ApplicationArguments args0) throws Exception {
      log.info("-----------Start------- ");
      if (StringUtils.equals(configUtils.APP_NAME, configUtils.APP_MASTER_NAME)) return;

      //String install_opt = System.getenv("INSTALL_OPT");
      Thread.sleep(10000);
      Mono<State<HashMap>> retrievedMessageMono = daprClient.getState(configUtils.STATE_STORE_NAME, ConfigUtils.APP_KEY_NAME, HashMap.class);
      HashMap<String, Map> appHashMap = retrievedMessageMono.block().getValue();
      if (appHashMap == null) {
          log.error("安装参数为空");
          return;
      }
      String install_opt = JSONUtil.toJsonStr(appHashMap.get(configUtils.APP_NAME));
      log.info("启动调用安装参数" + install_opt);
      Boolean flag = true;
      if (!StringUtils.isBlank(install_opt)) {
          //Thread.sleep(5000);
          // 如果不成功,则一直尝试
          while (flag) {
              Thread.sleep(3000);
              flag = !deployService.installInvoke(install_opt, ConfigUtils.hostAddress);
          }
      }
  }

1.5 内存同步遇到的问题

  • 一致性

    基于hazelcast的事件通知来更新本地内存存储是异步的,且没有顺序保证,不可重复获取事件,可能导致并发情况下的一致性问题。

  • 数据丢失

    hazelcast分布式存储是基于分片的,备份也是基于分片的,目前配置的配置分片数是2,那意味着最多能宕掉2台机器,如果在一些情况下,比如较大规模的服务重启更新,可能会造成数据丢失。 同时由于事件通知是异步和不可重复的,可能在异常处理情况下,没有正确处理事件,导致数据丢失。

  • 复杂性

    引入hazelcast来组网并作数据同步,本身具有复杂性,且也带来更多的内存消耗(hazelcast本身的存储,备份数据等)

以上问题当确定网关也就是master节点的地位以后,就变得非常简单了。简单来说master节点永远是主节点,其他业务节点只是从节点,而且内存同步的发起方永远是master,那么master节点只需要把需要操作的内存信息以http请求方式广播给所有的从节点就可以了,http本身是同步且待ack确认的。

2. 新的实现方案

2.1 使用网关进行统一管理

网关服务可以通过固定的 service 名称(如 master)来实现统一管理,确保先启动并完成所有内置应用的表结构初始化、种子数据初始化等只需要一个容器一次性初始化任务。这些任务只需要一个容器执行,无需在所有容器中重复执行。

网关高可用:

  • 网关可以部署多个实例,实现无状态和高可用。
  • 通过选主机制(Leader Election),确保高可用性。

    可以参考以下示例代码实现选主机制:

      public class LeaderElectionExample {
        public static void main(String[] args) throws Exception {
          ApiClient client = Config.defaultClient();
          Configuration.setDefaultApiClient(client);
        
          // New
          String appNamespace = "default";
          String appName = "leader-election-foobar";
          String lockHolderIdentityName = UUID.randomUUID().toString(); // Anything unique
          EndpointsLock lock = new EndpointsLock(appNamespace, appName, lockHolderIdentityName);
        
          LeaderElectionConfig leaderElectionConfig =
              new LeaderElectionConfig(
                  lock, Duration.ofMillis(10000), Duration.ofMillis(8000), Duration.ofMillis(2000));
          try (LeaderElector leaderElector = new LeaderElector(leaderElectionConfig)) {
            leaderElector.run(
                () -> {
                  System.out.println("Do something when getting leadership.");
                },
                () -> {
                  System.out.println("Do something when losing leadership.");
                });
          }
        }
      }
    

架构流程图:

基于okHttp进行转发



public class ReverseProxyServlet extends HttpServlet {

    private final List<String> targets;
    private final Random random = new Random();
    private final OkHttpClient client = new OkHttpClient();

    public ReverseProxyServlet(List<String> targets) {
        this.targets = targets;
    }

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String target = targets.get(random.nextInt(targets.size()));
        String targetUrl = target + req.getRequestURI();

        Request proxyRequest = new Request.Builder()
                .url(targetUrl)
                .build();

        try (Response proxyResponse = client.newCall(proxyRequest).execute()) {
            resp.setStatus(proxyResponse.code());
            if (proxyResponse.body() != null) {
                resp.getOutputStream().write(proxyResponse.body().bytes());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> targets = new ArrayList<>();
        targets.add("http://localhost:9091");
        targets.add("http://localhost:9092");

        Server server = new Server(9090);
        ServletContextHandler context = new ServletContextHandler();
        context.setContextPath("/");
        server.setHandler(context);

        context.addServlet(new ServletHolder(new ReverseProxyServlet(targets)), "/*");
        server.start();
        server.join();
    }
}
自定义服务注册与发现功能
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ServiceRegistry {
    private final List<String> services = new ArrayList<>();
    private final Random random = new Random();

    public void register(String serviceUrl) {
        services.add(serviceUrl);
    }

    public void deregister(String serviceUrl) {
        services.remove(serviceUrl);
    }

    public String getRandomService() {
        if (services.isEmpty()) {
            throw new IllegalStateException("No registered services");
        }
        return services.get(random.nextInt(services.size()));
    }

    public List<String> getAllServices() {
        return new ArrayList<>(services);
    }
}

2.2 在service的yaml文件中维护app和容器的关系

在service的yaml文件中维护app和容器的关系。每个 Pod 中可在进程内缓存这些关系,并通过k8s提供的watch api实时监听和更新本地路由缓存。

用途:

  • 在访问时,直接读取app与容器的路由信息,进行访问,没有网络转发。
  • 在安装和重启时,可以直接从 service yaml 文件中 获取完整的需要安装的app信息,没有各做一半的情况,无需协调和回调,独立可完成全部的安装和重启。
  • 使用pod ip直接通信,实现透明通信,支持 stream 模式,体验类似于单机版。

此外,本地缓存可通过 watch 功能主动更新,防止无意义、尝试性的操作。可以参考以下示例代码实现 watch 功能:


  public static void watchServicesInNamespace(BiConsumer<AppRoute.EventType, Map<String, String>> updateRouteCache) {
      SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient);

      SharedIndexInformer<V1Service> serviceInformer = informerFactory.sharedIndexInformerFor(
              (callGeneratorParams) -> coreApi.listNamespacedServiceCall(
                      NAMESPACE,
                      null,
                      null,
                      null,
                      null,
                      K8sConstants.LABEL_SELECTOR_IIDP,
                      null,
                      "",
                      null,
                      callGeneratorParams.timeoutSeconds,
                      callGeneratorParams.watch,
                      null),
              V1Service.class,
              V1ServiceList.class);
      serviceLister = new Lister<>(serviceInformer.getIndexer(), NAMESPACE);

      SharedIndexInformer<V1Endpoints> endpointsInformer = informerFactory.sharedIndexInformerFor(
              (callGeneratorParams) -> coreApi.listNamespacedEndpointsCall(
                      NAMESPACE,
                      null,
                      null,
                      null,
                      null,
                      K8sConstants.LABEL_SELECTOR_IIDP,
                      null,
                      "",
                      null,
                      callGeneratorParams.timeoutSeconds,
                      callGeneratorParams.watch,
                      null),
              V1Endpoints.class,
              V1EndpointsList.class);
      endpointsLister = new Lister<>(endpointsInformer.getIndexer(), NAMESPACE);

      serviceInformer.addEventHandler(
              new ResourceEventHandler<V1Service>() {
                  @Override
                  public void onAdd(V1Service obj) {
                      V1ObjectMeta metadata = obj.getMetadata();
                      if (metadata == null) {
                          return;
                      }
                      logger.info("\n\n ==================== Service added: {} \n\n", metadata.getName());
                      // 更新路由缓存
                      updateRouteCache.accept(AppRoute.EventType.ADDED, getRouteMap(metadata));
                  }

                  @Override
                  public void onUpdate(V1Service oldObj, V1Service newObj) {
                      logger.info("\n\n ==================== Service updated: {}\n\n", newObj.getMetadata().getName());
                      // 有新的app加入则会更新service,需要同步到路由缓存
                      updateRouteCache.accept(AppRoute.EventType.UPDATED, getRouteMap(newObj.getMetadata()));
                  }

                  @Override
                  public void onDelete(V1Service obj, boolean deletedFinalStateUnknown) {
                      V1ObjectMeta metadata = obj.getMetadata();
                      if (metadata == null) {
                          return;
                      }

                      logger.info("\n\n ===================== Service deleted: {} \n\n", metadata.getName());

                      updateRouteCache.accept(AppRoute.EventType.DELETED, getRouteMap(metadata));
                  }

                  private Map<String, String> getRouteMap(V1ObjectMeta metadata) {
                      Map<String, String> annotations = metadata.getAnnotations();
                      if (annotations == null) {
                          return null;
                      }
                      String installedApps = annotations.get(K8sConstants.ANNOTATION_INSTALLED_APPS);
                      if (installedApps == null) {
                          return null;
                      }

                      Map<String, String> routeMap = new HashMap<>();
                      String serviceName = metadata.getName();
                      // 按逗号切割,并去掉两边的空格
                      String[] apps = installedApps.split(",");
                      for (String app : apps) {
                          routeMap.put(app.trim(), serviceName);
                      }

                      String dependencies = annotations.get(K8sConstants.ANNOTATION_DEPENDENCIES);
                      if (dependencies != null && !dependencies.isEmpty()) {
                          String[] deps = dependencies.split(",");
                          for (String dep : deps) {
                              routeMap.put(dep.trim(), metadata.getName());
                          }
                      }

                      return routeMap;
                  }
              });

      informerFactory.startAllRegisteredInformers();

      // 使用 Wait.poll 方法等待 Informer 同步完成
      boolean synced = Wait.poll(Duration.ofMillis(100), Duration.ofSeconds(10), serviceInformer::hasSynced);
      if (synced) {
          logger.info("Cache fully loaded (total {} services), discovery client is now available", serviceLister.list().size());
      } else {
          logger.error("Failed to sync informer within the timeout period.");
          throw new RuntimeException("Failed to sync informer within the timeout period.");
      }
  }
通过 watch 服务变化,及时主动更新内存中的 service 和app对应关系,确保数据的一致性和实时性。

2.3 安装和卸载应用的一致性

安装和卸载功能都由master节点进行管理和创建容器。为了确保一致性,可以采用以下策略:

  1. 先写入 MySQL,如果出现错误则直接报错给用户,提示重试。
  2. 成功写入 MySQL 后,再创建容器。
  3. 如果创建容器失败,可以考虑轮询重试,或者最佳实践是使用 Operator 模式。

可以在安装卸载应用中实现 Operator 的功能,确保操作的自动化和一致性。

操作k8s接口

创建一个deployment和对应的service:


public class GenericClientExample {

  public static void main(String[] args) throws Exception {
    // create a service and deployment.
    // 创建 Deployment
    V1Deployment deployment = new V1Deployment()
            .metadata(new V1ObjectMeta().name("foo").namespace("default"))
            .spec(new V1DeploymentSpec()
                    .replicas(1)
                    .selector(new V1LabelSelector().putMatchLabelsItem("app", "foo"))
                    .template(new V1PodTemplateSpec()
                            .metadata(new V1ObjectMeta().putLabelsItem("app", "foo").putLabelsItem("version", "v1"))
                            .spec(new V1PodSpec()
                                    .containers(Arrays.asList(
                                            new V1Container()
                                                    .name("foo")
                                                    .image("busybox")
                                                    .command(Arrays.asList("sleep", "3600")))))));

    ApiClient apiClient = ClientBuilder.standard().build();
    GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient =
            new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class,
                    "apps", "v1", "deployments", apiClient);

    V1Deployment createdDeployment = deploymentClient.create(deployment, new CreateOptions()).throwsApiException().getObject();
    System.out.println("Deployment created: " + createdDeployment.getMetadata().getName());

    // 创建 Service
    V1Service service = new V1Service()
            .metadata(new V1ObjectMeta().name("foo2").namespace("default"))
            .spec(new V1ServiceSpec()
                    .selector(new HashMap<String, String>() {{ put("app", "foo"); }})
                    .ports(Arrays.asList(new V1ServicePort().port(80).targetPort(new IntOrString(80)))));

    GenericKubernetesApi<V1Service, V1ServiceList> serviceClient =
            new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, "", "v1", "services", apiClient);

    V1Service createdService = serviceClient.create(service, new CreateOptions()).throwsApiException().getObject();
    System.out.println("Service created: " + createdService.getMetadata().getName());
  }

}

删除一个deployment和service,则较为简单,指定namesapce和name即可:


public class GenericClientExample {

  public static void main(String[] args) throws Exception {
    // delete a service and deployment
    // delete Deployment
    ApiClient apiClient = ClientBuilder.standard().build();
    GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient =
            new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class,
                    "apps"/*deployment等内置控制器属于apps分组*/, "v1", "deployments", apiClient);

    V1Deployment deletedDeployment = deploymentClient.delete("default", "foo", new DeleteOptions()).throwsApiException().getObject();
    System.out.println("Deployment deleted: " + deletedDeployment.getStatus());

    // delete Service
    GenericKubernetesApi<V1Service, V1ServiceList> serviceClient =
            new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, ""/* service pod 等内置资源属于core组,不需要指定分组*/, "v1", "services", apiClient);
    serviceClient.delete("default", "foo2", new DeleteOptions()).throwsApiException();
    System.out.println("Service deleted");
  }
    
}

可能有人会感觉容器的管理和维护,应该是属于运维层面的事情,不应该放到业务中来处理,确实是的,其实最佳做法还是按照业界的做法,编写operator来完成所有的容器操作,而且这个操作是一致的,尤其是再涉及到多个资源的情况,比如有service、deployment、configmap、pvc、secret等,如何保证创建、删除和修改这些资源的一致性呢?
k8s已经给出了答案,通过operaotr即可做到 https://kubernetes.io/zh-cn/docs/concepts/extend-kubernetes/operator/ 以及如何管理依赖和引用:https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/

3. 总结

通过上述改进方案,可以解决当前架构中的多个问题:

  • 减少不必要的网络转发,直接使用pod ip进行通信,提升系统性能。
  • 实现确定性的路由,避免死循环。
  • 简化应用安装和卸载过程,减少协调和回调,提高效率。
  • 使用service yaml文件维护应用和容器服务关系,确保数据一致性,只要容器在就能保证需要安装的app。
  • 不需要依赖hazelcast组网来进行内存同步,简化同步流程,确保元模型数据一致性和正确性。
  • 引入 Operator 模式,实现自动化管理和一致性操作k8s资源。

这些改进将显著提升系统的可靠性、可维护性和性能,为业务发展提供坚实的技术保障。

以上实现代码,本人已全部验证过。

参考:https://github.com/kubernetes-client/java