☰
Current Page
Main Menu
Home
Home
Editing
基于k8s实现元数据的同步
Edit
Preview
h1
h2
h3
default
Set your preferred keybinding
default
vim
emacs
markdown
Set this page's format to
AsciiDoc
Creole
Markdown
MediaWiki
Org-mode
Plain Text
RDoc
Textile
Rendering unavailable for
BibTeX
Pod
reStructuredText
Help 1
Help 1
Help 1
Help 2
Help 3
Help 4
Help 5
Help 6
Help 7
Help 8
Autosaved text is available. Click the button to restore it.
Restore Text
### 需求分析 iidp 平台有时候需要将元数据同步到所有 pod 的内存中,比如v2.8.2版本中需要在各个pod内存中维护model名称和app的关系, 这样发起请求call的时候就可以直接通过model名称找到app,并进一步通过app找到pod ip完成远程通信。 类似地,app 和 pod ip的关系也是基于k8s watch机制实现的,但有一点不一样的是app的路由信息是直接使用k8s内置的资源service来实现的, 而对于model名称和app的关系,或者说任何其他的元数据的存储需要用户自己定义CRD,详细可参考k8s官方文档:[[https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/]] ### 设计与实现 #### 1. CRD定义 CRD定义的yaml文件如下: ```yaml apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: creationTimestamp: "2025-05-01T07:21:38Z" generation: 1 name: appmodels.iidp.com resourceVersion: "1067883926" selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com uid: 16777c57-b6ac-4441-b770-abb8373bb52f spec: conversion: strategy: None group: iidp.com names: kind: AppModel listKind: AppModelList plural: appmodels shortNames: - am singular: appmodel scope: Namespaced versions: - name: v1 schema: openAPIV3Schema: properties: spec: properties: app: description: App name of the model type: string models: description: Model list of the iidp-model items: properties: name: description: Model identifier type: string version: description: version of model type: string required: - name - version type: object minItems: 1 type: array version: description: Version of app type: string type: object type: object served: true storage: true ``` 该crd文件定义了一个类型为AppModel的 appmodel k8s资源,它的spec包括三个字段,分别是两个string类型的app和version字段和一个Model数组类型的models字段。models字段表示一个app可以有多个model名称,比如一个名为 iiot 的app包含有 iiot_dataservice、iiot_edge_project、iiot_data_query等元模型。 <br> <font color=red>注意:</font> 不管是app还是model,它们都包含各自的version字段用于描述自身的版本信息,比如可用于判断版本是否改变等。 #### 2. CRD的创建 通过kubectl 或者 在代码中自动创建crd都可以,创建完以后我们查看一下具体的crd: ```shell PS C:\Users\lzb\Desktop> k25 get crd appmodels.iidp.com -oyaml apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: creationTimestamp: "2025-05-01T07:21:38Z" generation: 1 name: appmodels.iidp.com resourceVersion: "1067883926" selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com uid: 16777c57-b6ac-4441-b770-abb8373bb52f spec: conversion: strategy: None group: iidp.com names: kind: AppModel listKind: AppModelList plural: appmodels shortNames: - am singular: appmodel scope: Namespaced versions: - name: v1 schema: openAPIV3Schema: properties: spec: properties: app: description: App name of the model type: string models: description: Model list of the iidp-model items: properties: name: description: Model identifier type: string version: description: version of model type: string required: - name - version type: object minItems: 1 type: array version: description: Version of app type: string type: object type: object served: true storage: true status: acceptedNames: kind: AppModel listKind: AppModelList plural: appmodels shortNames: - am singular: appmodel conditions: - lastTransitionTime: "2025-05-01T07:21:38Z" message: no conflicts found reason: NoConflicts status: "True" type: NamesAccepted - lastTransitionTime: "2025-05-01T07:21:38Z" message: the initial names have been accepted reason: InitialNamesAccepted status: "True" type: Established storedVersions: - v1 ``` #### 3. CRD的使用 有了CRD以后,我们就相当于mysql中建好了一张表,那么就可以往这张表中插入具体的元数据了。这里使用golang进行示例。 ```go // 创建动态客户端 dynamicClient, err := dynamic.NewForConfig(config) if err != nil { log.Fatalf("创建 Kubernetes 动态客户端失败: %v", err) } // 定义自定义资源对象 database := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "iidp.com/v1", "kind": "AppModel", "metadata": map[string]interface{}{ "name": "base", "labels": map[string]string{ "iidp": "iidp", }, }, "spec": map[string]interface{}{ "type": "model", "version": "1.7", "models": []string{"model1", "model2", "model3"}, }, }, } // 定义 CRD 的 GroupVersionResource var appModelGVR = schema.GroupVersionResource{ Group: "iidp.com", Version: "v1", Resource: "appmodels", // CRD 的 plural 名称 } // 创建自定义资源 namespace := "meta-test" // get cr _, err = dynamicClient.Resource(appModelGVR).Namespace(namespace).Get(context.TODO(), "base", metav1.GetOptions{}) createdDatabase, err := dynamicClient.Resource(appModelGVR).Namespace(namespace). Apply(context.TODO(), "base", database, metav1.ApplyOptions{ FieldManager: "appmodels-manager", Force: true, }) if err != nil { log.Fatalf("apply 自定义资源失败: %v", err) } fmt.Printf("自定义资源创建成功: %s\n", createdDatabase.GetName()) ``` 创建完自定义的cr资源后,我们可以查看一下: ```yaml apiVersion: iidp.com/v1 kind: AppModel metadata: creationTimestamp: "2025-05-06T08:05:37Z" generation: 1 labels: iidp: iidp name: newsdkapp namespace: meta-test resourceVersion: "1073098820" selfLink: /apis/iidp.com/v1/namespaces/meta-test/appmodels/newsdkapp uid: 879f9de3-2c8e-4168-87f7-376be768b221 spec: app: newSdkApp models: - name: model1 version: v-1 - name: model2 version: v-2 - name: model3 version: v-3 version: v1 ``` 可以发现spec中的已经成功存储了app和models相关元数据信息。由于只包含name和version这种元信息,所以整体数据量不会很大,假设1个app有100个元模型,每个元模型name和version共50个字符串,那么单个cr资源有100 * 50 = 5000,约为5k大小,对于1M的上限来说绰绰有余。 #### 4. CRD的watch 接下来我们需要实现对crd的watch功能,k8s的watch机制是基于etcd的watch实现的,具体可以参考k8s官方文档:[[https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#watching-changes-to-objects]] 这里我们使用golang的client-go库来实现对crd的watch,代码如下: ```go labelSelector := "iidp=iidp" // 选择器,筛选出带有特定标签的资源 labelsTweakListOptionsFunc := func(options *metav1.ListOptions) { if options.LabelSelector != "" { options.LabelSelector += "," + labelSelector } else { options.LabelSelector = labelSelector } } // 创建动态 Shared Informer 工厂 factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( dynamicClient, 300*time.Second, // Resync 周期 "meta-test", // 命名空间(如果监听所有命名空间,设置为 "") labelsTweakListOptionsFunc, // 过滤选项 ) // 获取 Informer informer := factory.ForResource(appModelGVR).Informer() // 设置事件处理函数 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { log.Println("新增资源:") printObject(obj) }, UpdateFunc: func(oldObj, newObj interface{}) { log.Println("更新资源:") printObject(newObj) }, DeleteFunc: func(obj interface{}) { log.Println("删除资源:") printObject(obj) }, }) // 启动 Informer stopCh := make(chan struct{}) defer close(stopCh) go func() { log.Println("启动 Informer...") informer.Run(stopCh) }() // 等待缓存同步完成 if !cache.WaitForCacheSync(stopCh, informer.HasSynced) { log.Fatalf("等待缓存同步失败") } select {} ``` 通过上面的代码,我们可以实现对crd的watch功能,然后在事件处理方法中将app和models的关系存储到内存中,供后续的请求使用。 到此我们完成了基于k8s的元数据同步功能。 ### 基于crd进行元模型内存同步 在上面的代码中,我们实现了对crd的watch功能,可知我们可以实时感知到crd中数据的变化,基于此特性,我们可以实现元模型数据在多个pod中的内存同步,下面使用java伪码示例: ```java public interface AppModelEventHandler { void onAdd(AppModel appModel); void onUpdate(AppModel appModel); void onDelete(AppModel appModel); } class AppModelEventHandlerImpl implements AppModelEventHandler { /* 定义一个保存所有app模型元数据的ConcurrentHashMap key为app名称,value为另一个ConcurrentHashMap,key为model名称,值为模型元数据 */ private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ModelMeta>> appModelMetaMap = new ConcurrentHashMap<>(); @Override public void onAdd(AppModel appModel) { // Handle the addition of an AppModel System.out.println("AppModel added: " + appModel); // 将app模型元数据添加到ConcurrentHashMap中 ConcurrentHashMap<String, ModelMeta> modelMetaMap = new ConcurrentHashMap<>(); for (AppModel.Model model : appModel.models) { String key = appModel.appName + "_" + model.name; // 从redis 获取模型元数据,并构建ModelMeta对象 ModelMeta modelMeta = new ModelMeta(); modelMetaMap.put(model.name, modelMeta); } } @Override public void onUpdate(AppModel appModel) { // Handle the update of an AppModel System.out.println("AppModel updated: " + appModel); // 更新app模型元数据 String appName = appModel.appName; String appVersion = appModel.version; // 判断 app名称是否存在,如果存在,则进一步判断app version是否与当前版本一致 // 如果一致,则说明没有更新,直接返回。 // 如果app version 不一致,则说明更新了版本号,需要更新模型元数据 // 遍历模型元数据,更新模型元数据 for (AppModel.Model model : appModel.models) { String key = appName + "_" + model.name; // 判断模型名称是否存在,如果存在,则继续判断模型版本号是否一致 // 如果一致,则说明没有更新,直接返回。 // 如果模型版本号不一致,则说明更新了版本号,需要更新模型元数据 // 从redis 获取模型元数据,并构建ModelMeta对象 ModelMeta modelMeta = new ModelMeta(); // 更新模型元数据 } } @Override public void onDelete(AppModel appModel) { // Handle the deletion of an AppModel System.out.println("AppModel deleted: " + appModel); } } ``` 在上面的代码和注释中,我们定义了一个AppModelEventHandler接口,用于处理crd的增删改事件,然后在实现类中实现了对app模型元数据的增删改操作。一旦crd中数据发生变化,就会触发对应的事件处理方法,从而实现了元模型数据在多个pod中的内存同步。 需要注意的是,app和model 都是保存有自己的版本号,所以在更新的时候需要判断版本号是否一致,如果一致,则说明没有更新,直接返回即可,如果版本不一致则需要重新从redis加载最新版本的value。在大部分情况下一般version都是一致,但是如果消费事件异常可能会导致不一致,这个时候k8s也提供了resync机制来重新同步一次数据(注意不是同步事件,而是数据本身)。 为什么要引入版本号version呢?主要是为了性能,因为k8s自带resync机制(为了解决事件消费失败问题),会将已有的数据以update事件定期重新同步到内存中,如果不判断版本号,则会导致每次都更新内存中的数据,且每次都需要从redis中获取value数据,造成很多无谓的性能浪费。 参考:[Informer 中为什么需要引入 Resync 机制?](https://github.com/cloudnativeto/sig-kubernetes/issues/11)
Uploading file...
Sidebar
[[_TOC_]]
Edit message:
Cancel