9a56162c3dbecf8180a1deb54216ccd02679ce0d
\345\237\272\344\272\216k8s\345\256\236\347\216\260\345\205\203\346\225\260\346\215\256\347\232\204\345\220\214\346\255\245.md
| ... | ... | @@ -0,0 +1,254 @@ |
| 1 | +### 需求分析 |
|
| 2 | + |
|
| 3 | +iidp 平台有时候需要将元数据同步到所有 pod 的内存中,比如v2.8.2版本中需要在各个pod内存中维护model名称和app的关系, |
|
| 4 | +这样发起请求call的时候就可以直接通过model名称找到app,并进一步通过app找到pod ip完成远程通信。 |
|
| 5 | +类似地,app 和 pod ip的关系也是基于k8s watch机制实现的,但有一点不一样的是app的路由信息是直接使用k8s内置的资源service来实现的, |
|
| 6 | +而对于model名称和app的关系,或者说任何其他的元数据的存储需要用户自己定义CRD,详细可参考k8s官方文档:https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/ |
|
| 7 | + |
|
| 8 | +### 设计与实现 |
|
| 9 | + |
|
| 10 | +#### 1. CRD定义 |
|
| 11 | + |
|
| 12 | +CRD定义的yaml文件如下: |
|
| 13 | +```yaml |
|
| 14 | +apiVersion: apiextensions.k8s.io/v1 |
|
| 15 | +kind: CustomResourceDefinition |
|
| 16 | +metadata: |
|
| 17 | + name: appmodels.iidp.com |
|
| 18 | +spec: |
|
| 19 | + group: iidp.com |
|
| 20 | + names: |
|
| 21 | + kind: AppModel |
|
| 22 | + listKind: AppModelList |
|
| 23 | + plural: appmodels |
|
| 24 | + shortNames: |
|
| 25 | + - am |
|
| 26 | + singular: appmodel |
|
| 27 | + scope: Namespaced |
|
| 28 | + versions: |
|
| 29 | + - name: v1 |
|
| 30 | + schema: |
|
| 31 | + openAPIV3Schema: |
|
| 32 | + properties: |
|
| 33 | + spec: |
|
| 34 | + properties: |
|
| 35 | + models: |
|
| 36 | + items: |
|
| 37 | + type: string |
|
| 38 | + type: array |
|
| 39 | + type: |
|
| 40 | + type: string |
|
| 41 | + version: |
|
| 42 | + type: string |
|
| 43 | + type: object |
|
| 44 | + type: object |
|
| 45 | + served: true |
|
| 46 | + storage: true |
|
| 47 | +``` |
|
| 48 | +该crd文件定义了一个类型为AppModel的 appmodel k8s资源,它的spec包括三个字段,分别是两个string类型的type和version字段和一个string数组类型的models字段。models字段表示一个app可以有多个model名称,比如一个名为 iiot 的app包含有 iiot_dataservice、iiot_edge_project、iiot_data_query等元模型。 |
|
| 49 | + |
|
| 50 | +#### 2. CRD的创建 |
|
| 51 | +通过kubectl 或者 在代码中自动创建crd都可以,创建完以后我们查看一下具体的crd: |
|
| 52 | +```shell |
|
| 53 | +PS C:\Users\lzb\Desktop> k25 get crd appmodels.iidp.com -oyaml |
|
| 54 | +apiVersion: apiextensions.k8s.io/v1 |
|
| 55 | +kind: CustomResourceDefinition |
|
| 56 | +metadata: |
|
| 57 | + creationTimestamp: "2025-04-24T03:39:09Z" |
|
| 58 | + generation: 1 |
|
| 59 | + name: appmodels.iidp.com |
|
| 60 | + resourceVersion: "1060306843" |
|
| 61 | + selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com |
|
| 62 | + uid: 49d3eba2-f2ed-4d10-b885-6e6154bb7baf |
|
| 63 | +spec: |
|
| 64 | + conversion: |
|
| 65 | + strategy: None |
|
| 66 | + group: iidp.com |
|
| 67 | + names: |
|
| 68 | + kind: AppModel |
|
| 69 | + listKind: AppModelList |
|
| 70 | + plural: appmodels |
|
| 71 | + shortNames: |
|
| 72 | + - am |
|
| 73 | + singular: appmodel |
|
| 74 | + scope: Namespaced |
|
| 75 | + versions: |
|
| 76 | + - name: v1 |
|
| 77 | + schema: |
|
| 78 | + openAPIV3Schema: |
|
| 79 | + properties: |
|
| 80 | + spec: |
|
| 81 | + properties: |
|
| 82 | + models: |
|
| 83 | + items: |
|
| 84 | + type: string |
|
| 85 | + type: array |
|
| 86 | + type: |
|
| 87 | + type: string |
|
| 88 | + version: |
|
| 89 | + type: string |
|
| 90 | + type: object |
|
| 91 | + type: object |
|
| 92 | + served: true |
|
| 93 | + storage: true |
|
| 94 | +status: |
|
| 95 | + acceptedNames: |
|
| 96 | + kind: AppModel |
|
| 97 | + listKind: AppModelList |
|
| 98 | + plural: appmodels |
|
| 99 | + shortNames: |
|
| 100 | + - am |
|
| 101 | + singular: appmodel |
|
| 102 | + conditions: |
|
| 103 | + - lastTransitionTime: "2025-04-24T03:39:09Z" |
|
| 104 | + message: no conflicts found |
|
| 105 | + reason: NoConflicts |
|
| 106 | + status: "True" |
|
| 107 | + type: NamesAccepted |
|
| 108 | + - lastTransitionTime: "2025-04-24T03:39:09Z" |
|
| 109 | + message: the initial names have been accepted |
|
| 110 | + reason: InitialNamesAccepted |
|
| 111 | + status: "True" |
|
| 112 | + type: Established |
|
| 113 | + storedVersions: |
|
| 114 | + - v1 |
|
| 115 | +``` |
|
| 116 | + |
|
| 117 | +#### 3. CRD的使用 |
|
| 118 | +有了CRD以后,我们就相当于mysql中建好了一张表,那么就可以往这张表中插入具体的元数据了。这里使用golang进行示例。 |
|
| 119 | +```go |
|
| 120 | + // 创建动态客户端 |
|
| 121 | + dynamicClient, err := dynamic.NewForConfig(config) |
|
| 122 | + if err != nil { |
|
| 123 | + log.Fatalf("创建 Kubernetes 动态客户端失败: %v", err) |
|
| 124 | + } |
|
| 125 | + |
|
| 126 | + // 定义自定义资源对象 |
|
| 127 | + database := &unstructured.Unstructured{ |
|
| 128 | + Object: map[string]interface{}{ |
|
| 129 | + "apiVersion": "iidp.com/v1", |
|
| 130 | + "kind": "AppModel", |
|
| 131 | + "metadata": map[string]interface{}{ |
|
| 132 | + "name": "base", |
|
| 133 | + "labels": map[string]string{ |
|
| 134 | + "iidp": "iidp", |
|
| 135 | + }, |
|
| 136 | + }, |
|
| 137 | + "spec": map[string]interface{}{ |
|
| 138 | + "type": "model", |
|
| 139 | + "version": "1.7", |
|
| 140 | + "models": []string{"model1", "model2", "model3"}, |
|
| 141 | + }, |
|
| 142 | + }, |
|
| 143 | + } |
|
| 144 | + |
|
| 145 | + // 定义 CRD 的 GroupVersionResource |
|
| 146 | + var appModelGVR = schema.GroupVersionResource{ |
|
| 147 | + Group: "iidp.com", |
|
| 148 | + Version: "v1", |
|
| 149 | + Resource: "appmodels", // CRD 的 plural 名称 |
|
| 150 | + } |
|
| 151 | + |
|
| 152 | + // 创建自定义资源 |
|
| 153 | + namespace := "meta-test" |
|
| 154 | + // get cr |
|
| 155 | + _, err = dynamicClient.Resource(appModelGVR).Namespace(namespace).Get(context.TODO(), "base", metav1.GetOptions{}) |
|
| 156 | + |
|
| 157 | + createdDatabase, err := dynamicClient.Resource(appModelGVR).Namespace(namespace). |
|
| 158 | + Apply(context.TODO(), "base", database, metav1.ApplyOptions{ |
|
| 159 | + FieldManager: "appmodels-manager", |
|
| 160 | + Force: true, |
|
| 161 | + }) |
|
| 162 | + if err != nil { |
|
| 163 | + log.Fatalf("apply 自定义资源失败: %v", err) |
|
| 164 | + } |
|
| 165 | + |
|
| 166 | + fmt.Printf("自定义资源创建成功: %s\n", createdDatabase.GetName()) |
|
| 167 | + |
|
| 168 | +``` |
|
| 169 | + |
|
| 170 | +创建完自定义的cr资源后,我们可以查看一下: |
|
| 171 | +```shell |
|
| 172 | +PS C:\Users\lzb\Desktop> k25 get appmodels/base -o yaml -n meta-test |
|
| 173 | +apiVersion: iidp.com/v1 |
|
| 174 | +kind: AppModel |
|
| 175 | +metadata: |
|
| 176 | + creationTimestamp: "2025-04-24T03:40:36Z" |
|
| 177 | + generation: 5 |
|
| 178 | + labels: |
|
| 179 | + iidp: iidp |
|
| 180 | + name: base |
|
| 181 | + namespace: meta-test |
|
| 182 | + resourceVersion: "1060428463" |
|
| 183 | + selfLink: /apis/iidp.com/v1/namespaces/meta-test/appmodels/base |
|
| 184 | + uid: 941f0a54-0f2f-4642-8684-3945e3272f9f |
|
| 185 | +spec: |
|
| 186 | + models: |
|
| 187 | + - model1 |
|
| 188 | + - model2 |
|
| 189 | + - model3 |
|
| 190 | + type: model |
|
| 191 | + version: "1.7" |
|
| 192 | +``` |
|
| 193 | +可以发现spec中的models字段已经成功的存储了我们需要的元数据。 |
|
| 194 | + |
|
| 195 | +#### 4. CRD的watch |
|
| 196 | + |
|
| 197 | +接下来我们需要实现对crd的watch功能,k8s的watch机制是基于etcd的watch实现的,具体可以参考k8s官方文档:https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#watching-changes-to-objects |
|
| 198 | +这里我们使用golang的client-go库来实现对crd的watch,代码如下: |
|
| 199 | + |
|
| 200 | +```go |
|
| 201 | + |
|
| 202 | + labelSelector := "iidp=iidp" // 选择器,筛选出带有特定标签的资源 |
|
| 203 | + labelsTweakListOptionsFunc := func(options *metav1.ListOptions) { |
|
| 204 | + if options.LabelSelector != "" { |
|
| 205 | + options.LabelSelector += "," + labelSelector |
|
| 206 | + } else { |
|
| 207 | + options.LabelSelector = labelSelector |
|
| 208 | + } |
|
| 209 | + } |
|
| 210 | + // 创建动态 Shared Informer 工厂 |
|
| 211 | + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( |
|
| 212 | + dynamicClient, |
|
| 213 | + 300*time.Second, // Resync 周期 |
|
| 214 | + "meta-test", // 命名空间(如果监听所有命名空间,设置为 "") |
|
| 215 | + labelsTweakListOptionsFunc, // 过滤选项 |
|
| 216 | + ) |
|
| 217 | + |
|
| 218 | + // 获取 Informer |
|
| 219 | + informer := factory.ForResource(appModelGVR).Informer() |
|
| 220 | + |
|
| 221 | + // 设置事件处理函数 |
|
| 222 | + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ |
|
| 223 | + AddFunc: func(obj interface{}) { |
|
| 224 | + log.Println("新增资源:") |
|
| 225 | + printObject(obj) |
|
| 226 | + }, |
|
| 227 | + UpdateFunc: func(oldObj, newObj interface{}) { |
|
| 228 | + log.Println("更新资源:") |
|
| 229 | + printObject(newObj) |
|
| 230 | + }, |
|
| 231 | + DeleteFunc: func(obj interface{}) { |
|
| 232 | + log.Println("删除资源:") |
|
| 233 | + printObject(obj) |
|
| 234 | + }, |
|
| 235 | + }) |
|
| 236 | + |
|
| 237 | + // 启动 Informer |
|
| 238 | + stopCh := make(chan struct{}) |
|
| 239 | + defer close(stopCh) |
|
| 240 | + |
|
| 241 | + go func() { |
|
| 242 | + log.Println("启动 Informer...") |
|
| 243 | + informer.Run(stopCh) |
|
| 244 | + }() |
|
| 245 | + |
|
| 246 | + // 等待缓存同步完成 |
|
| 247 | + if !cache.WaitForCacheSync(stopCh, informer.HasSynced) { |
|
| 248 | + log.Fatalf("等待缓存同步失败") |
|
| 249 | + } |
|
| 250 | + |
|
| 251 | + select {} |
|
| 252 | +``` |
|
| 253 | +通过上面的代码,我们可以实现对crd的watch功能,然后在事件处理方法中将app和models的关系存储到内存中,供后续的请求使用。 |
|
| 254 | +到此我们完成了基于k8s的元数据同步功能。 |
|
| ... | ... | \ No newline at end of file |