☰
Current Page
Main Menu
Home
Home
Editing
分布式系统架构方案设计初稿
Edit
Preview
h1
h2
h3
default
Set your preferred keybinding
default
vim
emacs
markdown
Set this page's format to
AsciiDoc
Creole
Markdown
MediaWiki
Org-mode
Plain Text
RDoc
Textile
Rendering unavailable for
BibTeX
Pod
reStructuredText
Help 1
Help 1
Help 1
Help 2
Help 3
Help 4
Help 5
Help 6
Help 7
Help 8
Autosaved text is available. Click the button to restore it.
Restore Text
## 分布式系统架构技术方案 目前已经实现,代码地址:[gitlab地址](http://192.168.175.55:9888/ipaas/sie-snest/tree/feat-k8s-api): ### 1. 现状分析 #### 1.1 引擎到 Sidecar 到 Dapr 的多次转发 当前架构中(引擎 --> sidecar(获取全量redis路由信息,查找出app对应的服务名,修改url中的域名) --> dapr(发送) --> dapr(接收) --> app服务),引擎通过 Sidecar 进行URL 域名修改后,再转发到 Dapr。这种设计引入了多次不必要的网络转发,增加了延迟和复杂性,同时这些转发对应用并不透明,比如对stream流的支持。 ```java public Object invoke(JsonRpcRequest request, HttpHeaders headers, String appsvc) throws IOException { String requestURL = String.valueOf(request.getParams().getMap().get("requestURL")); InetAddress ip = InetAddress.getByName(appsvc); log.info(appsvc, System.currentTimeMillis()); // 将引擎请求的url替换成目标的新的域名 requestURL = httpUtils.replaceDomainAndPort(ip.getHostAddress(), configUtils.SNEST_PORT, requestURL); log.info(requestURL, System.currentTimeMillis()); Object result; // 拿到新的 url 再发起http请求 try (Response okResponse = MetaHttpClient.post(requestURL, headers.toSingleValueMap(), request)){ InputStream responseBodyStream = okResponse.body().byteStream(); Headers headersResponse = okResponse.headers(); byte[] responseBodyBytes = IOUtils.toByteArray(responseBodyStream); result = ResponseEntity.status(HttpStatus.OK).headers(convertToSpringHeaders(headersResponse)).body(responseBodyBytes); } return result; } ``` [[http://iidp.chinasie.com:9999/iidpminio/sie-snest-gw/p1.png]] #### 1.2 引擎转发逻辑问题 引擎的转发逻辑是:如果目标app在本地容器中,则直接处理;如果不在本地容器中,则转发请求。 然而,如果访问一个不存在的应用,则会导致死循环(业务中反馈的出现 loop call 问题),因为每个节点都觉得不在本容器内就转发,没有停止的条件。一般来说,路由应该是确定性的,如果app存在则访问,否则直接报错不存在。 ```java public static Tuple<Boolean, Object> transmitSideCar(Map<String, Object> paramMap, HttpHeaders headers, JsonRpcRequest request) { WebMode curRunMode = WebMode.of(ConfigUtils.get(MetaConstant.ENGINE_RUN_MODE)); if (WebMode.DISTRIBUTED.equals(curRunMode) || WebMode.HIGHAVAILABLE.equals(curRunMode)) { String tag = Objects.toString(paramMap.get("tag"), "master"); String appName = Objects.toString(paramMap.get("app"), null); // 判断访问的目标app是否本进程中, if (!appInCurrentProcess(appName, Meta.getMetaContainer(String.format("%s.%s", appName, tag)))) { Object result; if (StringUtils.isEmpty(request.getMethod())) { request.setMethod("service"); } Map<String, Object> context = (Map<String, Object>) paramMap.getOrDefault("context", new HashMap<>()); request.getParams().getMap().put("context", context); headers.remove("content-length"); // 注入trace Map<String, String> traceHeaders = TraceUtil.injectHeaders(); if (traceHeaders != null) traceHeaders.forEach(headers::add); // 如果app 不在本进程则转发给sidecar result = post(headers, request); return new Tuple<>(true, result); } } // 如果app存在本地,则直接处理 return new Tuple<>(false, null); } ``` 就绪探针和存活探针存在的问题 [[http://iidp.chinasie.com:9999/iidpminio/sie-snest-gw/p2.png]] [[http://iidp.chinasie.com:9999/iidpminio/sie-snest-gw/p3.png]] #### 1.3 转发的路由问题 已有的实现是将app和对应的路由关系保存在redis中,且不说redis可能丢失数据的问题,现有实现也没有一种类似路由存活的机制,比如某个业务app下线了,但路由信息还在,依然会将请求转发到这个容器中,造成不可访问。 #### 1.4 应用安装的复杂性 在当前架构中,应用安装过程由引擎完成一份部分, Sidecar 完成另一部分,需要相互协调和等待回调。这种设计增加了复杂性和不确定性。理想情况下,应用的安装和卸载应该由一个组件独立完成。 比如回调逻辑可能会因为异常出现死循环. 已有的实现边车和引擎没有相关关联,如果引擎由于oom导致容器重启(不是pod重启),但是对于边车来说是不知道引擎重启的,那么业务app就无法安装。 另外,如果是边车容器重启,它依然会尝试请求引擎安装接口,导致引擎重复安装app,导致引擎数据错误,多个类加载器等。 引擎端实现,判断请求是否exeMod为local来判断: ```java public boolean installApps(List<AppDataInfo> installApps) { .... try { //判断应用是否已经安装 Meta meta = BaseContextHandler.getMeta(); String appRunMode = Objects.toString(meta.getContext("exeMod"), null); // 判断appappRunMode是否是 local,如果是则说明是引擎本身的安装,如果不是则是sidecar回调的安装。 if (!StringUtils.equals(appRunMode, "local")) { // 校验应用是否安装 boolean isSuccessful = postMasterToInstallApp(meta, installApps, kind, replicas); return isSuccessful; } // 处理sidecar 回调的安装请求 AppGroupContainer bussinessAppGroupContainer = EngineContainer.getBussinessAppGroupContainer(); ... ``` sidecar 回调,如果不成功会一直尝试: ```java @Override public void run(ApplicationArguments args0) throws Exception { log.info("-----------Start------- "); if (StringUtils.equals(configUtils.APP_NAME, configUtils.APP_MASTER_NAME)) return; //String install_opt = System.getenv("INSTALL_OPT"); Thread.sleep(10000); Mono<State<HashMap>> retrievedMessageMono = daprClient.getState(configUtils.STATE_STORE_NAME, ConfigUtils.APP_KEY_NAME, HashMap.class); HashMap<String, Map> appHashMap = retrievedMessageMono.block().getValue(); if (appHashMap == null) { log.error("安装参数为空"); return; } String install_opt = JSONUtil.toJsonStr(appHashMap.get(configUtils.APP_NAME)); log.info("启动调用安装参数" + install_opt); Boolean flag = true; if (!StringUtils.isBlank(install_opt)) { //Thread.sleep(5000); // 如果不成功,则一直尝试 while (flag) { Thread.sleep(3000); flag = !deployService.installInvoke(install_opt, ConfigUtils.hostAddress); } } } ``` #### 1.5 内存同步遇到的问题 - 一致性 基于hazelcast的事件通知来更新本地内存存储是异步的,且没有顺序保证,不可重复获取事件,可能导致并发情况下的一致性问题。 - 数据丢失 hazelcast分布式存储是基于分片的,备份也是基于分片的,目前配置的配置分片数是2,那意味着最多能宕掉2台机器,如果在一些情况下,比如较大规模的服务重启更新,可能会造成数据丢失。 同时由于事件通知是异步和不可重复的,可能在异常处理情况下,没有正确处理事件,导致数据丢失。 - 复杂性 引入hazelcast来组网并作数据同步,本身具有复杂性,且也带来更多的内存消耗(hazelcast本身的存储,备份数据等) 以上问题当确定网关也就是master节点的地位以后,就变得非常简单了。简单来说master节点永远是主节点,其他业务节点只是从节点,而且内存同步的发起方永远是master,那么master节点只需要把需要操作的内存信息以http请求方式广播给所有的从节点就可以了,http本身是同步且待ack确认的。 ### 2. 新的实现方案 #### 2.1 使用网关进行统一管理和路由分发 网关服务可以通过固定的 service 名称(如 `master`)来实现统一管理,确保先启动并完成所有内置应用的表结构初始化、种子数据初始化等只需要一个容器一次性初始化任务。这些任务只需要一个容器执行,无需在所有容器中重复执行。 [[http://iidp.chinasie.com:9999/iidpminio/sie-snest-gw/p5.jpg]] 网关高可用: - 网关可以部署多个实例,实现无状态和高可用。 - 通过选主机制(Leader Election),确保高可用性。 可以参考以下示例代码实现选主机制: ```java public class LeaderElectionExample { public static void main(String[] args) throws Exception { ApiClient client = Config.defaultClient(); Configuration.setDefaultApiClient(client); // New String appNamespace = "default"; String appName = "leader-election-foobar"; String lockHolderIdentityName = UUID.randomUUID().toString(); // Anything unique EndpointsLock lock = new EndpointsLock(appNamespace, appName, lockHolderIdentityName); LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig( lock, Duration.ofMillis(10000), Duration.ofMillis(8000), Duration.ofMillis(2000)); try (LeaderElector leaderElector = new LeaderElector(leaderElectionConfig)) { leaderElector.run( () -> { System.out.println("Do something when getting leadership."); }, () -> { System.out.println("Do something when losing leadership."); }); } } } ``` 架构流程图: [[http://iidp.chinasie.com:9999/iidpminio/design/gateway.png]] 基于okHttp进行转发 ```java public class ReverseProxyServlet extends HttpServlet { private final List<String> targets; private final Random random = new Random(); private final OkHttpClient client = new OkHttpClient(); public ReverseProxyServlet(List<String> targets) { this.targets = targets; } @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String target = targets.get(random.nextInt(targets.size())); String targetUrl = target + req.getRequestURI(); Request proxyRequest = new Request.Builder() .url(targetUrl) .build(); try (Response proxyResponse = client.newCall(proxyRequest).execute()) { resp.setStatus(proxyResponse.code()); if (proxyResponse.body() != null) { resp.getOutputStream().write(proxyResponse.body().bytes()); } } } public static void main(String[] args) throws Exception { List<String> targets = new ArrayList<>(); targets.add("http://localhost:9091"); targets.add("http://localhost:9092"); Server server = new Server(9090); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); server.setHandler(context); context.addServlet(new ServletHolder(new ReverseProxyServlet(targets)), "/*"); server.start(); server.join(); } } ``` 自定义服务注册与发现功能 ```java import java.util.ArrayList; import java.util.List; import java.util.Random; public class ServiceRegistry { private final List<String> services = new ArrayList<>(); private final Random random = new Random(); public void register(String serviceUrl) { services.add(serviceUrl); } public void deregister(String serviceUrl) { services.remove(serviceUrl); } public String getRandomService() { if (services.isEmpty()) { throw new IllegalStateException("No registered services"); } return services.get(random.nextInt(services.size())); } public List<String> getAllServices() { return new ArrayList<>(services); } } ``` #### 2.2 在service的yaml文件中维护app和容器的关系 在k8s已有的设计中,一个service是如何找到与它关联的pod的呢?是基于yaml文件中的 selector label,那么同样的原理,我们需要通过app来找到与这个app关联的服务的呢?也一样基于yaml中的selector annotation。 在service的yaml文件中维护app和它所在容器ip的关系。每个 Pod 中可在进程内缓存这些关系,并通过k8s提供的watch api实时监听和更新本地路由缓存。 用途: - 在访问时,直接读取app与容器的路由信息,进行访问,没有网络转发。 - 在安装和重启时,可以直接从 service yaml 文件中 获取完整的需要安装的app信息,没有各做一半的情况,无需协调和回调,独立可完成全部的安装和重启。 - 使用pod ip直接通信,实现透明通信,支持 stream 模式,体验类似于单机版。 [[http://iidp.chinasie.com:9999/iidpminio/sie-snest-gw/p4.png]] 此外,本地缓存可通过 watch 功能主动更新,防止无意义、尝试性的操作。可以参考以下示例代码实现 watch 功能: ```java public static void watchServicesInNamespace(BiConsumer<AppRoute.EventType, Map<String, String>> updateRouteCache) { SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient); SharedIndexInformer<V1Service> serviceInformer = informerFactory.sharedIndexInformerFor( (callGeneratorParams) -> coreApi.listNamespacedServiceCall( NAMESPACE, null, null, null, null, K8sConstants.LABEL_SELECTOR_IIDP, null, "", null, callGeneratorParams.timeoutSeconds, callGeneratorParams.watch, null), V1Service.class, V1ServiceList.class); serviceLister = new Lister<>(serviceInformer.getIndexer(), NAMESPACE); SharedIndexInformer<V1Endpoints> endpointsInformer = informerFactory.sharedIndexInformerFor( (callGeneratorParams) -> coreApi.listNamespacedEndpointsCall( NAMESPACE, null, null, null, null, K8sConstants.LABEL_SELECTOR_IIDP, null, "", null, callGeneratorParams.timeoutSeconds, callGeneratorParams.watch, null), V1Endpoints.class, V1EndpointsList.class); endpointsLister = new Lister<>(endpointsInformer.getIndexer(), NAMESPACE); serviceInformer.addEventHandler( new ResourceEventHandler<V1Service>() { @Override public void onAdd(V1Service obj) { V1ObjectMeta metadata = obj.getMetadata(); if (metadata == null) { return; } logger.info("\n\n ==================== Service added: {} \n\n", metadata.getName()); // 更新路由缓存 updateRouteCache.accept(AppRoute.EventType.ADDED, getRouteMap(metadata)); } @Override public void onUpdate(V1Service oldObj, V1Service newObj) { logger.info("\n\n ==================== Service updated: {}\n\n", newObj.getMetadata().getName()); // 有新的app加入则会更新service,需要同步到路由缓存 updateRouteCache.accept(AppRoute.EventType.UPDATED, getRouteMap(newObj.getMetadata())); } @Override public void onDelete(V1Service obj, boolean deletedFinalStateUnknown) { V1ObjectMeta metadata = obj.getMetadata(); if (metadata == null) { return; } logger.info("\n\n ===================== Service deleted: {} \n\n", metadata.getName()); updateRouteCache.accept(AppRoute.EventType.DELETED, getRouteMap(metadata)); } private Map<String, String> getRouteMap(V1ObjectMeta metadata) { Map<String, String> annotations = metadata.getAnnotations(); if (annotations == null) { return null; } String installedApps = annotations.get(K8sConstants.ANNOTATION_INSTALLED_APPS); if (installedApps == null) { return null; } Map<String, String> routeMap = new HashMap<>(); String serviceName = metadata.getName(); // 按逗号切割,并去掉两边的空格 String[] apps = installedApps.split(","); for (String app : apps) { routeMap.put(app.trim(), serviceName); } String dependencies = annotations.get(K8sConstants.ANNOTATION_DEPENDENCIES); if (dependencies != null && !dependencies.isEmpty()) { String[] deps = dependencies.split(","); for (String dep : deps) { routeMap.put(dep.trim(), metadata.getName()); } } return routeMap; } }); informerFactory.startAllRegisteredInformers(); // 使用 Wait.poll 方法等待 Informer 同步完成 boolean synced = Wait.poll(Duration.ofMillis(100), Duration.ofSeconds(10), serviceInformer::hasSynced); if (synced) { logger.info("Cache fully loaded (total {} services), discovery client is now available", serviceLister.list().size()); } else { logger.error("Failed to sync informer within the timeout period."); throw new RuntimeException("Failed to sync informer within the timeout period."); } } ``` 通过 watch 服务变化,及时主动更新内存中的 service 和app对应关系,确保数据的一致性和实时性。 #### 2.3 安装和卸载应用的一致性 安装和卸载功能都由master节点进行管理和创建容器。为了确保一致性,可以采用以下策略: 1. 先写入 MySQL,如果出现错误则直接报错给用户,提示重试。 2. 成功写入 MySQL 后,再创建容器。 3. 如果创建容器失败,可以考虑轮询重试,或者最佳实践是使用 Operator 模式。 可以在安装卸载应用中实现 Operator 的功能,确保操作的自动化和一致性。 [[http://iidp.chinasie.com:9999/iidpminio/design/route.png]] 操作k8s接口 创建一个deployment和对应的service: ```java public class GenericClientExample { public static void main(String[] args) throws Exception { // create a service and deployment. // 创建 Deployment V1Deployment deployment = new V1Deployment() .metadata(new V1ObjectMeta().name("foo").namespace("default")) .spec(new V1DeploymentSpec() .replicas(1) .selector(new V1LabelSelector().putMatchLabelsItem("app", "foo")) .template(new V1PodTemplateSpec() .metadata(new V1ObjectMeta().putLabelsItem("app", "foo").putLabelsItem("version", "v1")) .spec(new V1PodSpec() .containers(Arrays.asList( new V1Container() .name("foo") .image("busybox") .command(Arrays.asList("sleep", "3600"))))))); ApiClient apiClient = ClientBuilder.standard().build(); GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient = new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class, "apps", "v1", "deployments", apiClient); V1Deployment createdDeployment = deploymentClient.create(deployment, new CreateOptions()).throwsApiException().getObject(); System.out.println("Deployment created: " + createdDeployment.getMetadata().getName()); // 创建 Service V1Service service = new V1Service() .metadata(new V1ObjectMeta().name("foo2").namespace("default")) .spec(new V1ServiceSpec() .selector(new HashMap<String, String>() {{ put("app", "foo"); }}) .ports(Arrays.asList(new V1ServicePort().port(80).targetPort(new IntOrString(80))))); GenericKubernetesApi<V1Service, V1ServiceList> serviceClient = new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, "", "v1", "services", apiClient); V1Service createdService = serviceClient.create(service, new CreateOptions()).throwsApiException().getObject(); System.out.println("Service created: " + createdService.getMetadata().getName()); } } ``` 删除一个deployment和service,则较为简单,指定namesapce和name即可: ```java public class GenericClientExample { public static void main(String[] args) throws Exception { // delete a service and deployment // delete Deployment ApiClient apiClient = ClientBuilder.standard().build(); GenericKubernetesApi<V1Deployment, V1DeploymentList> deploymentClient = new GenericKubernetesApi<>(V1Deployment.class, V1DeploymentList.class, "apps"/*deployment等内置控制器属于apps分组*/, "v1", "deployments", apiClient); V1Deployment deletedDeployment = deploymentClient.delete("default", "foo", new DeleteOptions()).throwsApiException().getObject(); System.out.println("Deployment deleted: " + deletedDeployment.getStatus()); // delete Service GenericKubernetesApi<V1Service, V1ServiceList> serviceClient = new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, ""/* service pod 等内置资源属于core组,不需要指定分组*/, "v1", "services", apiClient); serviceClient.delete("default", "foo2", new DeleteOptions()).throwsApiException(); System.out.println("Service deleted"); } } ``` 可能有人会感觉容器的管理和维护,应该是属于运维层面的事情,不应该放到业务中来处理,确实是的,其实最佳做法还是按照业界的做法,编写operator来完成所有的容器操作,而且这个操作是一致的,尤其是再涉及到多个资源的情况,比如有service、deployment、configmap、pvc、secret等,如何保证创建、删除和修改这些资源的一致性呢? k8s已经给出了答案,通过operaotr即可做到 [[https://kubernetes.io/zh-cn/docs/concepts/extend-kubernetes/operator/]] 以及如何管理依赖和引用:[[https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/]] ### 3. 总结 通过上述改进方案,可以解决当前架构中的多个问题: - 减少不必要的网络转发,直接使用pod ip进行通信,提升系统性能。 - 实现确定性的路由,避免死循环。 - 简化应用安装和卸载过程,减少协调和回调,提高效率。 - 使用service yaml文件维护应用和容器服务关系,确保数据一致性,只要容器在就能保证需要安装的app。 - 不需要依赖hazelcast组网来进行内存同步,简化同步流程,确保元模型数据一致性和正确性。 - 引入 Operator 模式,实现自动化管理和一致性操作k8s资源。 这些改进将显著提升系统的可靠性、可维护性和性能,为业务发展提供坚实的技术保障。 以上实现代码,本人已全部验证过。 参考:[[https://github.com/kubernetes-client/java]]
Uploading file...
Sidebar
[[_TOC_]]
Edit message:
Cancel