需求分析

iidp 平台有时候需要将元数据同步到所有 pod 的内存中,比如v2.8.2版本中需要在各个pod内存中维护model名称和app的关系, 这样发起请求call的时候就可以直接通过model名称找到app,并进一步通过app找到pod ip完成远程通信。 类似地,app 和 pod ip的关系也是基于k8s watch机制实现的,但有一点不一样的是app的路由信息是直接使用k8s内置的资源service来实现的, 而对于model名称和app的关系,或者说任何其他的元数据的存储需要用户自己定义CRD,详细可参考k8s官方文档:https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/

设计与实现

1. CRD定义

CRD定义的yaml文件如下:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: appmodels.iidp.com
spec:
  group: iidp.com
  names:
    kind: AppModel
    listKind: AppModelList
    plural: appmodels
    shortNames:
    - am
    singular: appmodel
  scope: Namespaced
  versions:
  - name: v1
    schema:
      openAPIV3Schema:
        properties:
          spec:
            properties:
              models:
                items:
                  type: string
                type: array
              type:
                type: string
              version:
                type: string
            type: object
        type: object
    served: true
    storage: true
该crd文件定义了一个类型为AppModel的 appmodel k8s资源,它的spec包括三个字段,分别是两个string类型的type和version字段和一个string数组类型的models字段。models字段表示一个app可以有多个model名称,比如一个名为 iiot 的app包含有 iiot_dataservice、iiot_edge_project、iiot_data_query等元模型。

2. CRD的创建

通过kubectl 或者 在代码中自动创建crd都可以,创建完以后我们查看一下具体的crd:

PS C:\Users\lzb\Desktop> k25 get crd appmodels.iidp.com -oyaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  creationTimestamp: "2025-04-24T03:39:09Z"
  generation: 1
  name: appmodels.iidp.com
  resourceVersion: "1060306843"
  selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com
  uid: 49d3eba2-f2ed-4d10-b885-6e6154bb7baf
spec:
  conversion:
    strategy: None
  group: iidp.com
  names:
    kind: AppModel
    listKind: AppModelList
    plural: appmodels
    shortNames:
    - am
    singular: appmodel
  scope: Namespaced
  versions:
  - name: v1
    schema:
      openAPIV3Schema:
        properties:
          spec:
            properties:
              models:
                items:
                  type: string
                type: array
              type:
                type: string
              version:
                type: string
            type: object
        type: object
    served: true
    storage: true
status:
  acceptedNames:
    kind: AppModel
    listKind: AppModelList
    plural: appmodels
    shortNames:
    - am
    singular: appmodel
  conditions:
  - lastTransitionTime: "2025-04-24T03:39:09Z"
    message: no conflicts found
    reason: NoConflicts
    status: "True"
    type: NamesAccepted
  - lastTransitionTime: "2025-04-24T03:39:09Z"
    message: the initial names have been accepted
    reason: InitialNamesAccepted
    status: "True"
    type: Established
  storedVersions:
  - v1

3. CRD的使用

有了CRD以后,我们就相当于mysql中建好了一张表,那么就可以往这张表中插入具体的元数据了。这里使用golang进行示例。

// 创建动态客户端
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
	log.Fatalf("创建 Kubernetes 动态客户端失败: %v", err)
}

// 定义自定义资源对象
database := &unstructured.Unstructured{
	Object: map[string]interface{}{
		"apiVersion": "iidp.com/v1",
		"kind":       "AppModel",
		"metadata": map[string]interface{}{
			"name": "base",
			"labels": map[string]string{
				"iidp": "iidp",
			},
		},
		"spec": map[string]interface{}{
			"type":    "model",
			"version": "1.7",
			"models":  []string{"model1", "model2", "model3"},
		},
	},
}

// 定义 CRD 的 GroupVersionResource
var appModelGVR = schema.GroupVersionResource{
	Group:    "iidp.com",
	Version:  "v1",
	Resource: "appmodels", // CRD 的 plural 名称
}

// 创建自定义资源
namespace := "meta-test"
// get cr
_, err = dynamicClient.Resource(appModelGVR).Namespace(namespace).Get(context.TODO(), "base", metav1.GetOptions{})

createdDatabase, err := dynamicClient.Resource(appModelGVR).Namespace(namespace).
	Apply(context.TODO(), "base", database, metav1.ApplyOptions{
		FieldManager: "appmodels-manager",
		Force:        true,
	})
if err != nil {
	log.Fatalf("apply 自定义资源失败: %v", err)
}

fmt.Printf("自定义资源创建成功: %s\n", createdDatabase.GetName())

创建完自定义的cr资源后,我们可以查看一下:

PS C:\Users\lzb\Desktop> k25 get appmodels/base -o yaml -n meta-test
apiVersion: iidp.com/v1
kind: AppModel
metadata:
  creationTimestamp: "2025-04-24T03:40:36Z"
  generation: 5
  labels:
    iidp: iidp
  name: base
  namespace: meta-test
  resourceVersion: "1060428463"
  selfLink: /apis/iidp.com/v1/namespaces/meta-test/appmodels/base
  uid: 941f0a54-0f2f-4642-8684-3945e3272f9f
spec:
  models:
  - model1
  - model2
  - model3
  type: model
  version: "1.7"
可以发现spec中的models字段已经成功的存储了我们需要的元数据。

4. CRD的watch

接下来我们需要实现对crd的watch功能,k8s的watch机制是基于etcd的watch实现的,具体可以参考k8s官方文档:https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#watching-changes-to-objects 这里我们使用golang的client-go库来实现对crd的watch,代码如下:


labelSelector := "iidp=iidp" // 选择器,筛选出带有特定标签的资源
labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
	if options.LabelSelector != "" {
		options.LabelSelector += "," + labelSelector
	} else {
		options.LabelSelector = labelSelector
	}
}
// 创建动态 Shared Informer 工厂
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
	dynamicClient,
	300*time.Second,            // Resync 周期
	"meta-test",                // 命名空间(如果监听所有命名空间,设置为 "")
	labelsTweakListOptionsFunc, // 过滤选项
)

// 获取 Informer
informer := factory.ForResource(appModelGVR).Informer()

// 设置事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		log.Println("新增资源:")
		printObject(obj)
	},
	UpdateFunc: func(oldObj, newObj interface{}) {
		log.Println("更新资源:")
		printObject(newObj)
	},
	DeleteFunc: func(obj interface{}) {
		log.Println("删除资源:")
		printObject(obj)
	},
})

// 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)

go func() {
	log.Println("启动 Informer...")
	informer.Run(stopCh)
}()

// 等待缓存同步完成
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
	log.Fatalf("等待缓存同步失败")
}

select {}
通过上面的代码,我们可以实现对crd的watch功能,然后在事件处理方法中将app和models的关系存储到内存中,供后续的请求使用。 到此我们完成了基于k8s的元数据同步功能。