需求分析
iidp 平台有时候需要将元数据同步到所有 pod 的内存中,比如v2.8.2版本中需要在各个pod内存中维护model名称和app的关系, 这样发起请求call的时候就可以直接通过model名称找到app,并进一步通过app找到pod ip完成远程通信。 类似地,app 和 pod ip的关系也是基于k8s watch机制实现的,但有一点不一样的是app的路由信息是直接使用k8s内置的资源service来实现的, 而对于model名称和app的关系,或者说任何其他的元数据的存储需要用户自己定义CRD,详细可参考k8s官方文档:https://kubernetes.io/zh-cn/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/
设计与实现
1. CRD定义
CRD定义的yaml文件如下:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
creationTimestamp: "2025-05-01T07:21:38Z"
generation: 1
name: appmodels.iidp.com
resourceVersion: "1067883926"
selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com
uid: 16777c57-b6ac-4441-b770-abb8373bb52f
spec:
conversion:
strategy: None
group: iidp.com
names:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
app:
description: App name of the model
type: string
models:
description: Model list of the iidp-model
items:
properties:
name:
description: Model identifier
type: string
version:
description: version of model
type: string
required:
- name
- version
type: object
minItems: 1
type: array
version:
description: Version of app
type: string
type: object
type: object
served: true
storage: true
该crd文件定义了一个类型为AppModel的 appmodel k8s资源,它的spec包括三个字段,分别是两个string类型的app和version字段和一个Model数组类型的models字段。models字段表示一个app可以有多个model名称,比如一个名为 iiot 的app包含有 iiot_dataservice、iiot_edge_project、iiot_data_query等元模型。
注意: 不管是app还是model,它们都包含各自的version字段用于描述自身的版本信息,比如可用于判断版本是否改变等。
2. CRD的创建
通过kubectl 或者 在代码中自动创建crd都可以,创建完以后我们查看一下具体的crd:
PS C:\Users\lzb\Desktop> k25 get crd appmodels.iidp.com -oyaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
creationTimestamp: "2025-05-01T07:21:38Z"
generation: 1
name: appmodels.iidp.com
resourceVersion: "1067883926"
selfLink: /apis/apiextensions.k8s.io/v1/customresourcedefinitions/appmodels.iidp.com
uid: 16777c57-b6ac-4441-b770-abb8373bb52f
spec:
conversion:
strategy: None
group: iidp.com
names:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
app:
description: App name of the model
type: string
models:
description: Model list of the iidp-model
items:
properties:
name:
description: Model identifier
type: string
version:
description: version of model
type: string
required:
- name
- version
type: object
minItems: 1
type: array
version:
description: Version of app
type: string
type: object
type: object
served: true
storage: true
status:
acceptedNames:
kind: AppModel
listKind: AppModelList
plural: appmodels
shortNames:
- am
singular: appmodel
conditions:
- lastTransitionTime: "2025-05-01T07:21:38Z"
message: no conflicts found
reason: NoConflicts
status: "True"
type: NamesAccepted
- lastTransitionTime: "2025-05-01T07:21:38Z"
message: the initial names have been accepted
reason: InitialNamesAccepted
status: "True"
type: Established
storedVersions:
- v1
3. CRD的使用
有了CRD以后,我们就相当于mysql中建好了一张表,那么就可以往这张表中插入具体的元数据了。这里使用golang进行示例。
// 创建动态客户端
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Fatalf("创建 Kubernetes 动态客户端失败: %v", err)
}
// 定义自定义资源对象
database := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "iidp.com/v1",
"kind": "AppModel",
"metadata": map[string]interface{}{
"name": "base",
"labels": map[string]string{
"iidp": "iidp",
},
},
"spec": map[string]interface{}{
"type": "model",
"version": "1.7",
"models": []string{"model1", "model2", "model3"},
},
},
}
// 定义 CRD 的 GroupVersionResource
var appModelGVR = schema.GroupVersionResource{
Group: "iidp.com",
Version: "v1",
Resource: "appmodels", // CRD 的 plural 名称
}
// 创建自定义资源
namespace := "meta-test"
// get cr
_, err = dynamicClient.Resource(appModelGVR).Namespace(namespace).Get(context.TODO(), "base", metav1.GetOptions{})
createdDatabase, err := dynamicClient.Resource(appModelGVR).Namespace(namespace).
Apply(context.TODO(), "base", database, metav1.ApplyOptions{
FieldManager: "appmodels-manager",
Force: true,
})
if err != nil {
log.Fatalf("apply 自定义资源失败: %v", err)
}
fmt.Printf("自定义资源创建成功: %s\n", createdDatabase.GetName())
创建完自定义的cr资源后,我们可以查看一下:
apiVersion: iidp.com/v1
kind: AppModel
metadata:
creationTimestamp: "2025-05-06T08:05:37Z"
generation: 1
labels:
iidp: iidp
name: newsdkapp
namespace: meta-test
resourceVersion: "1073098820"
selfLink: /apis/iidp.com/v1/namespaces/meta-test/appmodels/newsdkapp
uid: 879f9de3-2c8e-4168-87f7-376be768b221
spec:
app: newSdkApp
models:
- name: model1
version: v-1
- name: model2
version: v-2
- name: model3
version: v-3
version: v1
可以发现spec中的已经成功存储了app和models相关元数据信息。
4. CRD的watch
接下来我们需要实现对crd的watch功能,k8s的watch机制是基于etcd的watch实现的,具体可以参考k8s官方文档:https://kubernetes.io/zh-cn/docs/reference/using-api/api-concepts/#watching-changes-to-objects 这里我们使用golang的client-go库来实现对crd的watch,代码如下:
labelSelector := "iidp=iidp" // 选择器,筛选出带有特定标签的资源
labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
if options.LabelSelector != "" {
options.LabelSelector += "," + labelSelector
} else {
options.LabelSelector = labelSelector
}
}
// 创建动态 Shared Informer 工厂
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamicClient,
300*time.Second, // Resync 周期
"meta-test", // 命名空间(如果监听所有命名空间,设置为 "")
labelsTweakListOptionsFunc, // 过滤选项
)
// 获取 Informer
informer := factory.ForResource(appModelGVR).Informer()
// 设置事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Println("新增资源:")
printObject(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
log.Println("更新资源:")
printObject(newObj)
},
DeleteFunc: func(obj interface{}) {
log.Println("删除资源:")
printObject(obj)
},
})
// 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)
go func() {
log.Println("启动 Informer...")
informer.Run(stopCh)
}()
// 等待缓存同步完成
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
log.Fatalf("等待缓存同步失败")
}
select {}
通过上面的代码,我们可以实现对crd的watch功能,然后在事件处理方法中将app和models的关系存储到内存中,供后续的请求使用。
到此我们完成了基于k8s的元数据同步功能。
基于crd进行元模型内存同步
在上面的代码中,我们实现了对crd的watch功能,可知我们可以实时感知到crd中数据的变化,基于此特性,我们可以实现元模型数据在多个pod中的内存同步,下面使用java伪码示例:
public interface AppModelEventHandler {
void onAdd(AppModel appModel);
void onUpdate(AppModel appModel);
void onDelete(AppModel appModel);
}
class AppModelEventHandlerImpl implements AppModelEventHandler {
/*
定义一个保存所有app模型元数据的ConcurrentHashMap
key为app名称,value为另一个ConcurrentHashMap,key为model名称,值为模型元数据
*/
private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ModelMeta>> appModelMetaMap = new ConcurrentHashMap<>();
@Override
public void onAdd(AppModel appModel) {
// Handle the addition of an AppModel
System.out.println("AppModel added: " + appModel);
// 将app模型元数据添加到ConcurrentHashMap中
ConcurrentHashMap<String, ModelMeta> modelMetaMap = new ConcurrentHashMap<>();
for (AppModel.Model model : appModel.models) {
String key = appModel.appName + "_" + model.name;
// 从redis 获取模型元数据,并构建ModelMeta对象
ModelMeta modelMeta = new ModelMeta();
modelMetaMap.put(model.name, modelMeta);
}
}
@Override
public void onUpdate(AppModel appModel) {
// Handle the update of an AppModel
System.out.println("AppModel updated: " + appModel);
// 更新app模型元数据
String appName = appModel.appName;
String appVersion = appModel.version;
// 判断 app名称是否存在,如果存在,则进一步判断app version是否与当前版本一致
// 如果一致,则说明没有更新,直接返回。
// 如果app version 不一致,则说明更新了版本号,需要更新模型元数据
// 遍历模型元数据,更新模型元数据
for (AppModel.Model model : appModel.models) {
String key = appName + "_" + model.name;
// 判断模型名称是否存在,如果存在,则继续判断模型版本号是否一致
// 如果一致,则说明没有更新,直接返回。
// 如果模型版本号不一致,则说明更新了版本号,需要更新模型元数据
// 从redis 获取模型元数据,并构建ModelMeta对象
ModelMeta modelMeta = new ModelMeta();
// 更新模型元数据
}
}
@Override
public void onDelete(AppModel appModel) {
// Handle the deletion of an AppModel
System.out.println("AppModel deleted: " + appModel);
}
}
在上面的代码中,我们定义了一个AppModelEventHandler接口,用于处理crd的增删改事件,然后在实现类中实现了对app模型元数据的增删改操作。一旦crd中数据发生变化,就会触发对应的事件处理方法,从而实现了元模型数据在多个pod中的内存同步。
需要注意的是,app和model 都是保存有自己的版本号,所以在更新的时候需要判断版本号是否一致,如果一致,则说明没有更新,直接返回即可,如果版本不一致则需要重新从redis加载最新版本的value。为什么要引入版本号version呢?主要是为了性能,因为k8s自带resync机制(为了解决事件消费失败问题),会将已有的数据以update事件定期重新同步到内存中,如果不判断版本号,则会导致每次都更新内存中的数据,且每次都需要从redis中获取value数据,造成很多无谓的性能浪费。