K8s client-go 源码中 informer 的 Indexer 部分是如何实现的?

2026-04-28 10:041阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2907个文字,预计阅读时间需要12分钟。

K8s client-go 源码中 informer 的 Indexer 部分是如何实现的?

%E2%80%9CInformers%E5%AE%9E%E7%8E%B0%E4%BA%86%E6%8C%81%E7%BB%AD%E8%8E%B7%E5%8F%96%E9%9B%86%E7%BE%A4%E8%B5%84%E6%BA%90%E7%9A%84%E5%8A%9F%E8%83%BD%EF%BC%8C%E5%B9%B6%E5%9C%A8%E6%9C%AC%E5%9C%B0%E7%BB%B4%E6%8A%A5%E4%BA%86%E8%B5%84%E6%BA%90%E5%AF%B9%E8%B1%A1%E7%9A%84%E5%86%85%E5%AD%98%E7%BC%93%E5%AD%98%EF%BC%8C%E4%BB%A5%E5%87%8F%E5%B0%91%E8%AF%B7%E6%B1%82%E5%8E%8B%E5%8A%9B%E3%80%82%E5%9C%A8%E5%90%AF%E5%8A%A8%E6%97%B6%E5%80%99%E9%A6%96%E5%85%88%E6%89%A7%E8%A1%8C%E2%80%9D

informers实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压力。Informers在启动的时候会首先在客户端调用List接口来获取全量的对象集合,然后通过Watch接口来获取增量的对象,然后更新本地缓存。 client-go之Indexer源码分析 1.Indexer概述

Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { items map[string]interface{} indexers Indexers indices Indices ... }

informer所维护的缓存依赖于threadSafeMap结构体中的items属性,其本质上是一个用map构建的键值对,资源对象都存在items这个map中,key为资源对象的namespace/name组成,value为资源对象本身,这些构成了informer的本地缓存。

Indexer除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用到索引,可以实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。

先通过一张informer概要架构图看一下Indexer所处位置与其概要功能。

2.Indexer的结构定义分析 2.1 Indexer interface

Indexer接口继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能)。

// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error } 2.2 Store interface

Store接口本身,定义了Add、Update、Delete、List、Get等一些对象增删改查的方法声明,用于操作informer的本地缓存。

// staging/src/k8s.io/client-go/tools/cache/store.go type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error } 2.3 cache struct

结合代码,可以看到cache struct是Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc。

cache struct会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象。

// staging/src/k8s.io/client-go/tools/cache/store.go type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc } 2.4 ThreadSafeStore interface

ThreadSafeStore接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口的类似,最大区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出object key。

K8s client-go 源码中 informer 的 Indexer 部分是如何实现的?

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error } 2.5 threadSafeMap struct

threadSafeMap struct是ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } 2.6 Indexer结构定义小结

下面对上面介绍的Indexer的相关struct与interface做个小结:
(1)Store interface: 定义了Add、Update、Delete、List、Get等一些对象增删改查的方法声明,用于操作informer的本地缓存;
(2)Indexer interface: 继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能);
(3)cache struct: Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc;
(4)ThreadSafeStore interface: 包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口的类似,最大区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出object key;
(5)threadSafeMap struct: ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关;

3.Indexer的索引功能

在threadSafeMap struct中,与索引功能有关的是indexers与indices属性;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } type Indexers map[string]IndexFunc type IndexFunc func(obj interface{}) ([]string, error) type Indices map[string]Index type Index map[string]sets.String 3.1 type Indexers map[string]IndexFunc / type IndexFunc func(obj interface{}) ([]string, error)

Indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法;

Indexers: { "索引器1": 索引函数1, "索引器2": 索引函数2, }

数据示例:

Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, }

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil } func NodeNameIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{""}, fmt.Errorf("object is not a pod) } return []string{pod.Spec.NodeName}, nil } 3.2 type Indices map[string]Index / type Index map[string]sets.String

Indices包含了所有索引器(索引分类)及其所有的索引数据Index;而Index则包含了索引键以及索引键下的所有对象键的列表;

Indices: { "索引器1": { "索引键1": ["对象键1", "对象键2"], "索引键2": ["对象键3"], }, "索引器2": { "索引键3": ["对象键1"], "索引键4": ["对象键2", "对象键3"], } }

数据示例:

pod1 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-1", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node1", } } pod2 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-2", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node2", } } pod3 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-3", Namespace: "kube-system", }, Spec: v1.PodSpec{ NodeName: "node2", } }

Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } } 3.3 索引结构小结

Indexers: { "索引器1": 索引函数1, "索引器2": 索引函数2, } Indices: { "索引器1": { "索引键1": ["对象键1", "对象键2"], "索引键2": ["对象键3"], }, "索引器2": { "索引键3": ["对象键1"], "索引键4": ["对象键2", "对象键3"], } } 3.4 索引功能方法分析

看到Indexer interface,除了继承的Store外,其他的几个方法声明均与索引功能相关,下面对几个常用方法进行介绍。

// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }

下面的方法介绍基于以下数据:

Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, }

Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } } 3.4.1 ByIndex(indexName, indexedValue string) ([]interface{}, error)

调用ByIndex方法,传入索引器名称indexName,以及索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { return c.cacheStorage.ByIndex(indexName, indexKey) }

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) } return list, nil }

使用示例:

pods, err := index.ByIndex("namespace", "default") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) } fmt.Println("=====") pods, err := index.ByIndex("nodename", "node1") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) }

输出:

pod-1 pod-2 ===== pod-1 3.4.2 IndexKeys(indexName, indexedValue string) ([]string, error)

IndexKeys方法与ByIndex方法类似,只不过只返回对象键列表,不会根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) { return c.cacheStorage.IndexKeys(indexName, indexKey) }

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] return set.List(), nil } 4.Indexer本地缓存

从前面的分析可以知道,informer中的本地缓存实际上指的是Indexer中的threadSafeMap,具体到属性,则是threadSafeMap中的items属性;

threadSafeMap struct

threadSafeMap struct中的items属性即为informer的本地缓存;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }

接下来分析下threadSafeMap的几个核心方法,主要都是操作items属性的;

前面对informer-Controller的分析中(代码如下),提到的s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get等方法其实最终就是调用的threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get等;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } 4.1 threadSafeMap.Add

调用链:s.indexer.Add --> cache.Add --> threadSafeMap.Add

threadSafeMap.Add方法将key:object存入items中,并调用updateIndices方法更新索引(updateIndices方法这里不展开分析,可以自行查看源码);

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }

也可以看到对threadSafeMap进行操作的方法,基本都会先获取锁,然后方法执行完毕释放锁,所以是并发安全的。

4.2 threadSafeMap.Update

调用链:s.indexer.Update --> cache.Update --> threadSafeMap.Update

threadSafeMap.Update方法逻辑与threadSafeMap.Add方法相同;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Update(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) } 4.3 threadSafeMap.Delete

调用链:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete

threadSafeMap.Delete方法中,先判断本地缓存items中是否存在该key,存在则调用deleteFromIndices删除相关索引,然后删除items中的key及其对应object;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } } 4.4 threadSafeMap.Get

调用链:s.indexer.Get --> cache.Get --> threadSafeMap.Get

threadSafeMap.Get方法逻辑相对简单,没有索引的相关操作,而是直接从items中通过key获取对应的object并返回;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { c.lock.RLock() defer c.lock.RUnlock() item, exists = c.items[key] return item, exists } 总结

Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力。

informer所维护的缓存依赖于threadSafeMap结构体中的items属性,其本质上是一个用map构建的键值对,资源对象都存在items这个map中,key为资源对象的namespace/name组成,value为资源对象本身,这些构成了informer的本地缓存。

Indexer除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用到索引,可以实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。

最后以一张图来回顾总结一下Indexer在informer中所处位置与其概要功能。

本文共计2907个文字,预计阅读时间需要12分钟。

K8s client-go 源码中 informer 的 Indexer 部分是如何实现的?

%E2%80%9CInformers%E5%AE%9E%E7%8E%B0%E4%BA%86%E6%8C%81%E7%BB%AD%E8%8E%B7%E5%8F%96%E9%9B%86%E7%BE%A4%E8%B5%84%E6%BA%90%E7%9A%84%E5%8A%9F%E8%83%BD%EF%BC%8C%E5%B9%B6%E5%9C%A8%E6%9C%AC%E5%9C%B0%E7%BB%B4%E6%8A%A5%E4%BA%86%E8%B5%84%E6%BA%90%E5%AF%B9%E8%B1%A1%E7%9A%84%E5%86%85%E5%AD%98%E7%BC%93%E5%AD%98%EF%BC%8C%E4%BB%A5%E5%87%8F%E5%B0%91%E8%AF%B7%E6%B1%82%E5%8E%8B%E5%8A%9B%E3%80%82%E5%9C%A8%E5%90%AF%E5%8A%A8%E6%97%B6%E5%80%99%E9%A6%96%E5%85%88%E6%89%A7%E8%A1%8C%E2%80%9D

informers实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压力。Informers在启动的时候会首先在客户端调用List接口来获取全量的对象集合,然后通过Watch接口来获取增量的对象,然后更新本地缓存。 client-go之Indexer源码分析 1.Indexer概述

Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { items map[string]interface{} indexers Indexers indices Indices ... }

informer所维护的缓存依赖于threadSafeMap结构体中的items属性,其本质上是一个用map构建的键值对,资源对象都存在items这个map中,key为资源对象的namespace/name组成,value为资源对象本身,这些构成了informer的本地缓存。

Indexer除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用到索引,可以实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。

先通过一张informer概要架构图看一下Indexer所处位置与其概要功能。

2.Indexer的结构定义分析 2.1 Indexer interface

Indexer接口继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能)。

// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error } 2.2 Store interface

Store接口本身,定义了Add、Update、Delete、List、Get等一些对象增删改查的方法声明,用于操作informer的本地缓存。

// staging/src/k8s.io/client-go/tools/cache/store.go type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error } 2.3 cache struct

结合代码,可以看到cache struct是Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc。

cache struct会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象。

// staging/src/k8s.io/client-go/tools/cache/store.go type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc } 2.4 ThreadSafeStore interface

ThreadSafeStore接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口的类似,最大区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出object key。

K8s client-go 源码中 informer 的 Indexer 部分是如何实现的?

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error } 2.5 threadSafeMap struct

threadSafeMap struct是ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } 2.6 Indexer结构定义小结

下面对上面介绍的Indexer的相关struct与interface做个小结:
(1)Store interface: 定义了Add、Update、Delete、List、Get等一些对象增删改查的方法声明,用于操作informer的本地缓存;
(2)Indexer interface: 继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能);
(3)cache struct: Indexer接口的一个实现,所以自然也是Store接口的一个实现,cache struct包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc;
(4)ThreadSafeStore interface: 包含了操作本地缓存的增删改查方法以及索引功能的相关方法,其方法名称与Indexer接口的类似,最大区别是ThreadSafeStore接口的增删改查方法入参基本都有key,由cache struct中的KeyFunc函数计算得出object key;
(5)threadSafeMap struct: ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关;

3.Indexer的索引功能

在threadSafeMap struct中,与索引功能有关的是indexers与indices属性;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } type Indexers map[string]IndexFunc type IndexFunc func(obj interface{}) ([]string, error) type Indices map[string]Index type Index map[string]sets.String 3.1 type Indexers map[string]IndexFunc / type IndexFunc func(obj interface{}) ([]string, error)

Indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法;

Indexers: { "索引器1": 索引函数1, "索引器2": 索引函数2, }

数据示例:

Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, }

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil } func NodeNameIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{""}, fmt.Errorf("object is not a pod) } return []string{pod.Spec.NodeName}, nil } 3.2 type Indices map[string]Index / type Index map[string]sets.String

Indices包含了所有索引器(索引分类)及其所有的索引数据Index;而Index则包含了索引键以及索引键下的所有对象键的列表;

Indices: { "索引器1": { "索引键1": ["对象键1", "对象键2"], "索引键2": ["对象键3"], }, "索引器2": { "索引键3": ["对象键1"], "索引键4": ["对象键2", "对象键3"], } }

数据示例:

pod1 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-1", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node1", } } pod2 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-2", Namespace: "default", }, Spec: v1.PodSpec{ NodeName: "node2", } } pod3 := &v1.Pod { ObjectMeta: metav1.ObjectMeta { Name: "pod-3", Namespace: "kube-system", }, Spec: v1.PodSpec{ NodeName: "node2", } }

Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } } 3.3 索引结构小结

Indexers: { "索引器1": 索引函数1, "索引器2": 索引函数2, } Indices: { "索引器1": { "索引键1": ["对象键1", "对象键2"], "索引键2": ["对象键3"], }, "索引器2": { "索引键3": ["对象键1"], "索引键4": ["对象键2", "对象键3"], } } 3.4 索引功能方法分析

看到Indexer interface,除了继承的Store外,其他的几个方法声明均与索引功能相关,下面对几个常用方法进行介绍。

// staging/src/k8s.io/client-go/tools/cache/index.go type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(indexName string) []string ByIndex(indexName, indexedValue string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }

下面的方法介绍基于以下数据:

Indexers: { "namespace": MetaNamespaceIndexFunc, "nodeName": NodeNameIndexFunc, }

Indices: { "namespace": { "default": ["pod-1", "pod-2"], "kube-system": ["pod-3"], }, "nodeName": { "node1": ["pod-1"], "node2": ["pod-2", "pod-3"], } } 3.4.1 ByIndex(indexName, indexedValue string) ([]interface{}, error)

调用ByIndex方法,传入索引器名称indexName,以及索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { return c.cacheStorage.ByIndex(indexName, indexKey) }

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] list := make([]interface{}, 0, set.Len()) for key := range set { list = append(list, c.items[key]) } return list, nil }

使用示例:

pods, err := index.ByIndex("namespace", "default") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) } fmt.Println("=====") pods, err := index.ByIndex("nodename", "node1") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) }

输出:

pod-1 pod-2 ===== pod-1 3.4.2 IndexKeys(indexName, indexedValue string) ([]string, error)

IndexKeys方法与ByIndex方法类似,只不过只返回对象键列表,不会根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) { return c.cacheStorage.IndexKeys(indexName, indexKey) }

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] return set.List(), nil } 4.Indexer本地缓存

从前面的分析可以知道,informer中的本地缓存实际上指的是Indexer中的threadSafeMap,具体到属性,则是threadSafeMap中的items属性;

threadSafeMap struct

threadSafeMap struct中的items属性即为informer的本地缓存;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }

接下来分析下threadSafeMap的几个核心方法,主要都是操作items属性的;

前面对informer-Controller的分析中(代码如下),提到的s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get等方法其实最终就是调用的threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get等;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } 4.1 threadSafeMap.Add

调用链:s.indexer.Add --> cache.Add --> threadSafeMap.Add

threadSafeMap.Add方法将key:object存入items中,并调用updateIndices方法更新索引(updateIndices方法这里不展开分析,可以自行查看源码);

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) }

也可以看到对threadSafeMap进行操作的方法,基本都会先获取锁,然后方法执行完毕释放锁,所以是并发安全的。

4.2 threadSafeMap.Update

调用链:s.indexer.Update --> cache.Update --> threadSafeMap.Update

threadSafeMap.Update方法逻辑与threadSafeMap.Add方法相同;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Update(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) } 4.3 threadSafeMap.Delete

调用链:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete

threadSafeMap.Delete方法中,先判断本地缓存items中是否存在该key,存在则调用deleteFromIndices删除相关索引,然后删除items中的key及其对应object;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } } 4.4 threadSafeMap.Get

调用链:s.indexer.Get --> cache.Get --> threadSafeMap.Get

threadSafeMap.Get方法逻辑相对简单,没有索引的相关操作,而是直接从items中通过key获取对应的object并返回;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { c.lock.RLock() defer c.lock.RUnlock() item, exists = c.items[key] return item, exists } 总结

Indexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力。

informer所维护的缓存依赖于threadSafeMap结构体中的items属性,其本质上是一个用map构建的键值对,资源对象都存在items这个map中,key为资源对象的namespace/name组成,value为资源对象本身,这些构成了informer的本地缓存。

Indexer除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用到索引,可以实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性。

最后以一张图来回顾总结一下Indexer在informer中所处位置与其概要功能。