需求分析
iidp 平台有时候需要将元数据同步到所有 pod 的内存中,比如v2.8.2版本中需要在各个pod内存中维护model名称和app的关系, 这样发起请求call的时候就可以直接通过model名称找到app,并进一步通过app找到pod ip完成远程通信。 类似地,app 和 pod ip的关系也是基于k8s watch机制实现的,但有一点不一样的是app的路由信息是直接使用k8s内置的资源service来实现的, 而对于model名称和app的关系,或者说任何其他的元数据的存储需要用户自己定义CRD,详细可参考k8s官方文档:https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/
设计与实现
1. CRD定义
CRD定义的yaml文件如下:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: appmodels.iidp.com
spec:
group: iidp.com
names:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
models:
items:
type: string
type: array
type:
type: string
version:
type: string
type: object
type: object
served: true
storage: true
该crd文件定义了一个类型为AppModel的 appmodel k8s资源,它的spec包括三个字段,分别是两个string类型的type和version字段和一个string数组类型的models字段。models字段表示一个app可以有多个model名称,比如一个名为 iiot 的app包含有 iiot_dataservice、iiot_edge_project、iiot_data_query等元模型。
2. CRD的创建
通过kubectl 或者 在代码中自动创建crd都可以,创建完以后我们查看一下具体的crd:
PS C:\Users\lzb\Desktop> k25 get crd appmodels.iidp.com -oyaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
creationTimestamp: "2025-04-24T03:39:09Z"
generation: 1
name: appmodels.iidp.com
resourceVersion: "1060306843"
selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com
uid: 49d3eba2-f2ed-4d10-b885-6e6154bb7baf
spec:
conversion:
strategy: None
group: iidp.com
names:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
models:
items:
type: string
type: array
type:
type: string
version:
type: string
type: object
type: object
served: true
storage: true
status:
acceptedNames:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
conditions:
- lastTransitionTime: "2025-04-24T03:39:09Z"
message: no conflicts found
reason: NoConflicts
status: "True"
type: NamesAccepted
- lastTransitionTime: "2025-04-24T03:39:09Z"
message: the initial names have been accepted
reason: InitialNamesAccepted
status: "True"
type: Established
storedVersions:
- v1
3. CRD的使用
有了CRD以后,我们就相当于mysql中建好了一张表,那么就可以往这张表中插入具体的元数据了。这里使用golang进行示例。
// 创建动态客户端
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("创建 Kubernetes 动态客户端失败: %v", err)
}
// 定义自定义资源对象
database := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "iidp.com/v1",
"kind": "AppModel",
"metadata": map[string]interface{}{
"name": "base",
"labels": map[string]string{
"iidp": "iidp",
},
},
"spec": map[string]interface{}{
"type": "model",
"version": "1.7",
"models": []string{"model1", "model2", "model3"},
},
},
}
// 定义 CRD 的 GroupVersionResource
var appModelGVR = schema.GroupVersionResource{
Group: "iidp.com",
Version: "v1",
Resource: "appmodels", // CRD 的 plural 名称
}
// 创建自定义资源
namespace := "meta-test"
// get cr
_, err = dynamicClient.Resource(appModelGVR).Namespace(namespace).Get(context.TODO(), "base", metav1.GetOptions{})
createdDatabase, err := dynamicClient.Resource(appModelGVR).Namespace(namespace).
Apply(context.TODO(), "base", database, metav1.ApplyOptions{
FieldManager: "appmodels-manager",
Force: true,
})
if err != nil {
log.Fatalf("apply 自定义资源失败: %v", err)
}
fmt.Printf("自定义资源创建成功: %s\n", createdDatabase.GetName())
创建完自定义的cr资源后,我们可以查看一下:
PS C:\Users\lzb\Desktop> k25 get appmodels/base -o yaml -n meta-test
apiVersion: iidp.com/v1
kind: AppModel
metadata:
creationTimestamp: "2025-04-24T03:40:36Z"
generation: 5
labels:
iidp: iidp
name: base
namespace: meta-test
resourceVersion: "1060428463"
selfLink: /apis/iidp.com/v1/namespaces/meta-test/appmodels/base
uid: 941f0a54-0f2f-4642-8684-3945e3272f9f
spec:
models:
- model1
- model2
- model3
type: model
version: "1.7"
可以发现spec中的models字段已经成功的存储了我们需要的元数据。
4. CRD的watch
接下来我们需要实现对crd的watch功能,k8s的watch机制是基于etcd的watch实现的,具体可以参考k8s官方文档:https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#watching-changes-to-objects 这里我们使用golang的client-go库来实现对crd的watch,代码如下:
labelSelector := "iidp=iidp" // 选择器,筛选出带有特定标签的资源
labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
if options.LabelSelector != "" {
options.LabelSelector += "," + labelSelector
} else {
options.LabelSelector = labelSelector
}
}
// 创建动态 Shared Informer 工厂
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamicClient,
300*time.Second, // Resync 周期
"meta-test", // 命名空间(如果监听所有命名空间,设置为 "")
labelsTweakListOptionsFunc, // 过滤选项
)
// 获取 Informer
informer := factory.ForResource(appModelGVR).Informer()
// 设置事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Println("新增资源:")
printObject(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
log.Println("更新资源:")
printObject(newObj)
},
DeleteFunc: func(obj interface{}) {
log.Println("删除资源:")
printObject(obj)
},
})
// 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)
go func() {
log.Println("启动 Informer...")
informer.Run(stopCh)
}()
// 等待缓存同步完成
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
log.Fatalf("等待缓存同步失败")
}
select {}
通过上面的代码,我们可以实现对crd的watch功能,然后在事件处理方法中将app和models的关系存储到内存中,供后续的请求使用。
到此我们完成了基于k8s的元数据同步功能。