如何深入探讨Kubernetes的客户端实现原理?

2026-05-29 05:391阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计4631个文字,预计阅读时间需要19分钟。

如何深入探讨Kubernetes的客户端实现原理?

从2016年8月起,Kubernetes官方提取了与Kubernetes相关的核心源代码,形成了一个独立的项目,即client-go,作为官方提供的Go客户端端。Kubernetes的部分代码也是基于这个项目开发的。

Prepare Introduction

从2016年8月起,Kubernetes官方提取了与Kubernetes相关的核心源代码,形成了一个独立的项目,即client-go,作为官方提供的go客户端。Kubernetes的部分代码也是基于这个项目的。

client-go 是kubernetes中广义的客户端基础库,在Kubernetes各个组件中或多或少都有使用其功能。。也就是说,client-go可以在kubernetes集群中添加、删除和查询资源对象(包括deployment、service、pod、ns等)。

在了解client-go前,还需要掌握一些概念

  • 在客户端验证 API
  • 使用证书和使用令牌,来验证客户端
  • kubernetes集群的访问模式
使用证书和令牌来验证客户端

在访问apiserver时,会对访问者进行鉴权,因为是10.0.0.4:6443/version \{ "major": "1", "minor": "18+", "gitVersion": "v1.18.20-dirty", "gitCommit": "1f3e19b7beb1cc0110255668c4238ed63dadb7ad", "gitTreeState": "dirty", "buildDate": "2022-05-17T12:45:14Z", "goVersion": "go1.16.15", "compiler": "gc", "platform": "linux/amd64" } $ curl -k 10.0.0.4:6443/api/v1/namespace/default/pods/netbox { "kind": "Status", "apiVersion": "v1", "metadata": { }, "status": "Failure", "message": "namespace \"default\" is forbidden: User \"system:anonymous\" cannot get resource \"namespace/pods\" in API group \"\" at the cluster scope", "reason": "Forbidden", "details": { "name": "default", "kind": "namespace" }, "code": 403 }

从错误中可以看出,该请求已通过身份验证,用户是 system:anonymous,但该用户未授权列出对应的资源。而上述请求只是忽略 curl 的10.0.0.4:6443/api/v1/namespaces/default/pods/netbox 使用serviceaccount验证客户端身份

使用一个serviceaccount JWT,获取一个SA的方式如下

kubectl get secrets \ $(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \ | base64 --decode JWT=$(kubectl get secrets \ $(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \ | base64 --decode)

使用secret来访问API

curl --cacert /etc/kubernetes/pki/ca.crt \ --header "Authorization: Bearer $JWT" \ 10.0.0.4:6443/apis/apps/v1/namespaces/default/deployments Pod内部调用Kubernetes API

kubernete会将Kubernetes API地址通过环境变量提供给 Pod,可以通过命令看到

$ env|grep -i kuber KUBERNETES_SERVICE_PORT=443 KUBERNETES_PORT=tcp://192.168.0.1:443 KUBERNETES_PORT_443_TCP_ADDR=192.168.0.1 KUBERNETES_PORT_443_TCP_PORT=443 KUBERNETES_PORT_443_TCP_PROTO=tcp KUBERNETES_PORT_443_TCP=tcp://192.168.0.1:443 KUBERNETES_SERVICE_PORT_HTTPS=443 KUBERNETES_SERVICE_HOST=192.168.0.1

并且还会在将 Kubernetes CA和SA等信息放置在目录 /var/run/secrets/kubernetes.io/serviceaccount/,通过这些就可以从Pod内部访问API

cd /var/run/secrets/kubernetes.io/serviceaccount/ curl --cacert ca.crt --header "Authorization: Bearer $(cat token)" $KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/default/pods/netbox

Reference

Kubernetes API Reference Docs

client-go 关于client-go的模块 k8s.io/api

与Pods、ConfigMaps、Secrets和其他Kubernetes 对象所对应的数据结构都在,k8s.io/api,此包几乎没有算法,仅仅是数据机构,该模块有多达上千个用于描述Kubernetes中资源API的结构;通常被client,server,controller等其他的组件使用。

k8s.io/apimachinery

根据该库的描述文件可知,这个库是Server和Client中使用的Kubernetes API共享依赖库,也是kubernetes中更低一级的通用的数据结构。在我们构建自定义资源时,不需要为自定义结构创建属性,如 Kind, apiVersionname...,这些都是库 apimachinery 所提供的功能。

如,在包 k8s.io/apimachinery/pkg/apis/meta 定义了两个结构 TypeMetaObjectMeta;将这这两个结构嵌入自定义的结构中,可以以通用的方式兼容对象,如Kubernetes中的资源 Deplyment 也是这么完成的

通过图来了解Kubernetes的资源如何实现的

如在 k8s.io/apimachinery/pkg/runtime/interfaces.go 中定义了 interface,这个类为在schema中注册的API都需要实现这个结构

type Object interface { GetObjectKind() schema.ObjectKind DeepCopyObject() Object }

非结构化数据

非结构化数据 Unstructured 是指在kubernete中允许将没有注册为Kubernetes API的对象,作为Json对象的方式进行操作,如,使用非结构化 Kubernetes 对象

desired := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "v1", "kind": "ConfigMap", "metadata": map[string]interface{}{ "namespace": namespace, "generateName": "crud-dynamic-simple-", }, "data": map[string]interface{}{ "foo": "bar", }, }, } 非结构化数据的转换

k8s.io/apimachinery/pkg/runtime.UnstructuredConverter 中,也提供了将非结构化数据转换为Kubernetes API注册过的结构,参考如何将非结构化对象转换为Kubernetes Object。

Reference

go types

install client-go

如何选择 client-go 的版本

​ 对于不同的kubernetes版本使用标签 v0.x.y 来表示对应的客户端版本。具体对应参考 client-go 。

​ 例如使用的kubernetes版本为 v1.18.20 则使用对应的标签 v0.x.y 来替换符合当前版本的客户端库。例如:

go get k8s.io/client-go@v0.18.10

官网中给出了client-go的兼容性矩阵,可以很明了的看出如何选择适用于自己kubernetes版本的对应的client-go

  • 表示 该版本的 client-go 与对应的 kubernetes版本功能完全一致
  • + client-go 具有 kubernetes apiserver中不具备的功能。
  • - Kubernetes apiserver 具有client-go 无法使用的功。

一般情况下,除了对应的版本号完全一致外,其他都存在 功能的+-

client-go 目录介绍

client-go的每一个目录都是一个go package

  • kubernetes 包含与Kubernetes API所通信的客户端集
  • discovery 用于发现kube-apiserver所支持的api
  • dynamic 包含了一个动态客户端,该客户端能够对kube-apiserver任意的API进行操作。
  • transport 提供了用于设置认证和启动链接的功能
  • tools/cache: 一些 low-level controller与一些数据结构如fifo,reflector等
structure of client-go
  • RestClient:是最基础的基础架构,其作用是将是使用了miro.medium.com/max/700/1*iI8uFsPRBY5m_g_WW4huMQ.png

    在代码中的注释可以看到一些信息,根据信息可以总结出

    • Delta FIFO 是一个生产者-消费者的队列,生产者是 Reflector,消费者是 Pop()
    • 与传统的FIFO有两点不同
      • Delta FIFO

    Delta FIFO也是实现了 Queue以及一些其他 interface 的类,

    type DeltaFIFO struct { lock sync.RWMutex // 一个读写锁,保证线程安全 cond sync.Cond items map[string]Deltas // 存放的类型是一个key[string] =》 value[Delta] 类型的数据 queue []string // 用于存储item的key,是一个fifo populated bool // populated 是用来标记首次被加入的数据是否被变动 initialPopulationCount int // 首次调用 replace() 的数量 keyFunc KeyFunc knownObjects KeyListerGetter // 这里为indexer closed bool // 代表已关闭 closedLock sync.Mutex emitDeltaTypeReplaced bool // 表示事件的类型,true为 replace(), false 为 sync() }

    那么delta的类型是,也就是说通常情况下,Delta为一个 string[runtime.object] 的对象

    type Delta struct { Type DeltaType // 这就是一个string Object interface{} // 之前API部分有了解到,API的类型大致为两类,runtime.Object和非结构化数据 }

    apimachinery/pkg/runtime/interfaces.go

    那么此时,已经明白了Delta FIFO的结构,为一个Delta的队列,整个结构如下

    第一步创建一个Delta FIFO

    现在版本中,对创建Delta FIFO是通过函数 NewDeltaFIFOWithOptions()

    如何深入探讨Kubernetes的客户端实现原理?

    func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc // 默认的计算key的方法 } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f } queueActionLocked,Delta FIFO添加操作

    这里说下之前说道的,在追加时的操作 queueActionLocked ,如add update delete实际上走的都是这里

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) // 计算key if err != nil { return KeyError{obj, err} } // 把新数据添加到DeltaFIFO中,Detal就是 动作为key,对象为值 // item是DeltaFIFO中维护的一个 map[string]Deltas newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) // 去重,去重我们前面讨论过了 if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } // 不存在则添加 f.items[id] = newDeltas f.cond.Broadcast() } else { delete(f.items, id) // 这里走不到,因为添加更新等操作用newDelta是1 // 源码中也说要忽略这里 } return nil }

    在FIFO继承的Stroe的方法中,如,Add, Update等都是需要去重的,去重的操作是通过对比最后一个和倒数第二个值

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) ...

    在函数 dedupDeltas() 中实现的这个

    // re-listing and watching can deliver the same update multiple times in any order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) if n < 2 { return deltas } a := &deltas[n-1] // 如 [1,2,3,4] a=4 b := &deltas[n-2] // b=3,这里两个值其实为事件 if out := isDup(a, b); out != nil { d := append(Deltas{}, deltas[:n-2]...) return append(d, *out) } return deltas }

    如果b对象的类型是 DeletedFinalStateUnknown 也会认为是一个旧对象被删除,这里在去重时也只是对删除的操作进行去重。

    // tools/cache/delta_fifo.go func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil } // keep the one with the most information if both are deletions. func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // Do more sophisticated checks, or is this sufficient? if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b }

    为什么需要去重?什么情况下需合并

    代码中开发者给我们留了一个TODO

    TODO: is there anything other than deletions that need deduping?

    • 取决于Detal FIFO 生产-消费延迟
      • 当在一个资源的创建时,其状态会频繁的更新,如 Creating,Runinng等,这个时候会出现大量写入FIFO中的数据,但是在消费端可能之前的并未消费完。
      • 在上面那种情况下,以及Kubernetes 声明式 API 的设计,其实多余的根本不关注,只需要最后一个动作如Running,这种情况下,多个内容可以合并为一个步骤
    • 然而在代码中,去重仅仅是在Delete状态生效,显然这不可用;那么结合这些得到:
      • 在一个工作时间窗口内,如果对于删除操作来说发生多次,与发生一次实际上没什么区别,可以去重
      • 但在更新于新增操作时,实际上在对于声明式 API 的设计个人感觉是完全可以做到去重操作。
        • 同一个时间窗口内多次操作,如更新,实际上Kubernetes应该只关注最终状态而不是命令式?
    Compute Key

    上面大概对一些Detal FIFO的逻辑进行了分析,那么对于Detal FIFO如何去计算,也就是说 MetaNamespaceKeyFunc ,这个是默认的KeyFunc,作用是计算Detals中的唯一key。

    func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { // 显示声明的则为这个值 return string(key), nil } meta, err := meta.Accessor(obj) // 那么使用Accessor,每一个资源都会实现这个Accessor if err != nil { return "", fmt.Errorf("object has no meta: %v", err) } if len(meta.GetNamespace()) > 0 { return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }

    ObjectMetaAccessor 每个Kubernetes资源都会实现这个对象,如Deployment

    // accessor interface type ObjectMetaAccessor interface { GetObjectMeta() Object } // 会被ObjectMeta所实现 func (obj *ObjectMeta) GetObjectMeta() Object { return obj } // 而每一个资源都会继承这个 ObjectMeta,如 ClusterRole type ClusterRole struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"protobuf:"bytes,1,opt,name=metadata"`

    那么这个Deltas的key则为集群类型的是资源本身的名字,namespace范围的则为 meta.GetNamespace() + "/" + meta.GetName(),可以在上面代码中看到,这样就可以给Detal生成了一个唯一的key

    keyof,用于计算对象的key

    func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { if d, ok := obj.(Deltas); ok { if len(d) == 0 { // 长度为0的时候是一个初始的类型 return "", KeyError{obj, ErrZeroLengthDeltasObject} } obj = d.Newest().Object // 用最新的一个对象,如果为空则是nil } if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil // 到了这里,之前提到过,是一个过期的值将会被删除 } return f.keyFunc(obj) // 调用具体的key计算函数 } Indexer

    indexer 在整个 client-go 架构中提供了一个具有线程安全的数据存储的对象存储功能;对于Indexer这里会分析下对应的架构及使用方法。

    client-go/tools/cache/index.go 中可以看到 indexer是一个实现了Store 的一个interface

    type Indexer interface { // 继承了store,拥有store的所有方法 Store // 返回indexname的obj的交集 Index(indexName string, obj interface{}) ([]interface{}, error) // 通过对 indexName,indexedValue与之相匹配的集合 IndexKeys(indexName, indexedValue string) ([]string, error) // 给定一个indexName 返回所有的indexed ListIndexFuncValues(indexName string) []string // 通过indexname,返回与indexedvalue相关的 obj ByIndex(indexName, indexedValue string) ([]interface{}, error) // 返回所有的indexer GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }

    实际上对他的实现是一个 cache,cache是一个KeyFunc与ThreadSafeStore实现的indexer,有名称可知具有线程安全的功能

    type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }

    既然index继承了Store那么,也就是 ThreadSafeStore 必然实现了Store,这是一个基础保证

    type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error // Resync is a no-op and is deprecated } // KeyFunc是一个生成key的函数,给一个对象,返回一个key值 type KeyFunc func(obj interface{}) (string, error)

    那么这个indexer structure可以通过图来很直观的看出来

    cache的结构

    cache中会出现三种数据结构,也可以成为三种名词,为 Index , Indexers , Indices

    type Index map[string]sets.String type Indexers map[string]IndexFunc type Indices map[string]Index

    可以看出:

    • Index 映射到对象,sets.String 也是在API中定义的数据类型 [string]Struct{}
    • Indexers 是这个 IndexIndexFunc , 是一个如何计算Index的keyname的函数
    • Indices 通过Index 名词拿到对应的对象

    这个名词的概念如下,通过图来了解会更加清晰

    从创建开始

    创建一个cache有两种方式,一种是指定indexer,一种是默认indexer

    // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } // NewIndexer returns an Indexer implemented simply with a map and a lock. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } } 更新操作

    在indexer中的更新操作(诸如 add , update ),实际上操作的是 updateIndices, 通过在代码可以看出

    tools/cache/thread_safe_store.go 的 77行起,那么就来看下 updateIndices() 具体做了什么

    func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // 在操作时,如果有旧对象,需要先删除 if oldObj != nil { c.deleteFromIndices(oldObj, key) } // 先对整个indexer遍历,拿到index name与 index function for name, indexFunc := range c.indexers { // 通过index function,计算出对象的indexed name indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } // 接下来通过遍历的index name 拿到这个index的对象 index := c.indices[name] if index == nil { // 确认这个index是否存在, index = Index{} // 如果不存在将一个Index{}初始化 c.indices[name] = index } // 通过计算出的indexed name来拿到对应的 set of object for _, indexValue := range indexValues { set := index[indexValue] if set == nil { // 如果这个set不存在,则初始化这个set set = sets.String{} index[indexValue] = set } set.Insert(key) // 然后将key插入set中 } } }

    那么通过上面可以了解到了 updateIndices 的逻辑,那么通过对更新函数分析来看看他具体做了什么?这里是add函数,通过一段代码模拟操作来熟悉结构

    testIndexer := "testIndexer" testIndex := "testIndex" indexers := cache.Indexers{ testIndexer: func(obj interface{}) (strings []string, e error) { indexes := []string{testIndex} // index的名词 return indexes, nil }, } indices := cache.Indices{} store := cache.NewThreadSafeStore(indexers, indices) fmt.Printf("%#v\n", store.GetIndexers()) store.Add("retain", "pod--1") store.Add("delete", "pod--2") store.Update("retain", "pod-3") //lists := store.Update("retain", "pod-3") lists := store.List() for _, item := range lists { fmt.Println(item) }

    这里是对add操作以及对updateIndices() 进行操作

    // threadSafe.go func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] // 这个item就是存储object的地方, 为空 c.items[key] = obj // 这里已经添加了新的值 c.updateIndices(oldObject, obj, key) // 转至updateIndices } // updateIndices func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // 就当是新创建的,这里是空的忽略 if oldObj != nil { c.deleteFromIndices(oldObj, key) } // 这个时候拿到的就是 name=testKey function=testIndexer for name, indexFunc := range c.indexers { // 通过testIndexer对testKey计算出的结果是 []string{testIndexer} indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name] if index == nil { index = Index{} // 因为假设为空了,故到这里c.indices[testIndexer]= Index{} c.indices[name] = index } for _, indexValue := range indexValues { // indexValue=testIndexer // set := c.index[name] = c.indices[testIndexer]Index{} set := index[indexValue] if set == nil { set = sets.String{} index[indexValue] = set } set.Insert(key) // 到这里就为set=indices[testIndexer]Index{} } } }

    总结一下,到这里,可以很明显的看出来,indexer中的三个概念是什么了,前面如果没有看明白话

    • Index:通过indexer计算出key的名称,值为对应obj的一个集合,可以理解为索引的数据结构
      • 比如说 Pod:{"nginx-pod1": v1.Pod{Name:Nginx}}
    • Indexers :这个很简单,就是,对于Index中如何计算每个key的名称;可以理解为分词器,索引的过程
    • Indices 通过Index 名词拿到对应的对象,是Index的集合;是将原始数据Item做了一个索引,可以理解为做索引的具体字段
      • 比如说 Indices["Pod"]{"nginx-pod1": v1.Pod{Name:Nginx}, "nginx-pod2": v1.Pod{Name:Nginx}}
    • Items:实际上存储的在Indices中的set.String{key:value} ,中的 key=value
      • 例如:Item:{"nginx-pod1": v1.Pod{Name:Nginx}, "coredns-depoyment": App.Deployment{Name:coredns}}
    删除操作

    对于删除操作,在最新版本中是使用了 updateIndices 就是 add update delete全都是相同的方法操作,对于旧版包含1.19- 是单独的一个操作

    // v1.2+ func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.updateIndices(obj, nil, key) delete(c.items, key) } } // v1.19- func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } } indexer使用

    上面了解了indexer概念,可以通过写代码来尝试使用一些indexer

    package main import ( "fmt" appsV1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) func main() { indexers := cache.Indexers{ "getDeplyment": func(obj interface{}) (strings []string, e error) { d, ok := obj.(*appsV1.Deployment) if !ok { return []string{}, nil } return []string{d.Name}, nil }, "getDaemonset": func(obj interface{}) (strings []string, e error) { d, ok := obj.(*appsV1.DaemonSet) if !ok { return []string{}, nil } return []string{d.Name}, nil }, } // 第一个参数是计算set内的key的名称 就是map[string]sets.String的这个strings的名称/namespace/resorcename // 第二个参数是计算index即外部的key的名称 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers) deployment := &appsV1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "nginx-deplyment", Namespace: "test", }, } daemonset := &appsV1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "firewall-daemonset", Namespace: "test", }, } daemonset2 := &appsV1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "etcd-daemonset", Namespace: "default", }, } indexer.Add(deployment) indexer.Add(daemonset) indexer.Add(daemonset2) // 第一个参数是索引器 // 第二个参数是所引起做索引的字段 lists, _ := indexer.ByIndex("getDaemonset", "etcd-daemonset") for _, item := range lists { switch item.(type) { case *appsV1.Deployment: fmt.Println(item.(*appsV1.Deployment).Name) case *appsV1.DaemonSet: fmt.Println(item.(*appsV1.DaemonSet).Name) } } }

本文共计4631个文字,预计阅读时间需要19分钟。

如何深入探讨Kubernetes的客户端实现原理?

从2016年8月起,Kubernetes官方提取了与Kubernetes相关的核心源代码,形成了一个独立的项目,即client-go,作为官方提供的Go客户端端。Kubernetes的部分代码也是基于这个项目开发的。

Prepare Introduction

从2016年8月起,Kubernetes官方提取了与Kubernetes相关的核心源代码,形成了一个独立的项目,即client-go,作为官方提供的go客户端。Kubernetes的部分代码也是基于这个项目的。

client-go 是kubernetes中广义的客户端基础库,在Kubernetes各个组件中或多或少都有使用其功能。。也就是说,client-go可以在kubernetes集群中添加、删除和查询资源对象(包括deployment、service、pod、ns等)。

在了解client-go前,还需要掌握一些概念

  • 在客户端验证 API
  • 使用证书和使用令牌,来验证客户端
  • kubernetes集群的访问模式
使用证书和令牌来验证客户端

在访问apiserver时,会对访问者进行鉴权,因为是10.0.0.4:6443/version \{ "major": "1", "minor": "18+", "gitVersion": "v1.18.20-dirty", "gitCommit": "1f3e19b7beb1cc0110255668c4238ed63dadb7ad", "gitTreeState": "dirty", "buildDate": "2022-05-17T12:45:14Z", "goVersion": "go1.16.15", "compiler": "gc", "platform": "linux/amd64" } $ curl -k 10.0.0.4:6443/api/v1/namespace/default/pods/netbox { "kind": "Status", "apiVersion": "v1", "metadata": { }, "status": "Failure", "message": "namespace \"default\" is forbidden: User \"system:anonymous\" cannot get resource \"namespace/pods\" in API group \"\" at the cluster scope", "reason": "Forbidden", "details": { "name": "default", "kind": "namespace" }, "code": 403 }

从错误中可以看出,该请求已通过身份验证,用户是 system:anonymous,但该用户未授权列出对应的资源。而上述请求只是忽略 curl 的10.0.0.4:6443/api/v1/namespaces/default/pods/netbox 使用serviceaccount验证客户端身份

使用一个serviceaccount JWT,获取一个SA的方式如下

kubectl get secrets \ $(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \ | base64 --decode JWT=$(kubectl get secrets \ $(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' \ | base64 --decode)

使用secret来访问API

curl --cacert /etc/kubernetes/pki/ca.crt \ --header "Authorization: Bearer $JWT" \ 10.0.0.4:6443/apis/apps/v1/namespaces/default/deployments Pod内部调用Kubernetes API

kubernete会将Kubernetes API地址通过环境变量提供给 Pod,可以通过命令看到

$ env|grep -i kuber KUBERNETES_SERVICE_PORT=443 KUBERNETES_PORT=tcp://192.168.0.1:443 KUBERNETES_PORT_443_TCP_ADDR=192.168.0.1 KUBERNETES_PORT_443_TCP_PORT=443 KUBERNETES_PORT_443_TCP_PROTO=tcp KUBERNETES_PORT_443_TCP=tcp://192.168.0.1:443 KUBERNETES_SERVICE_PORT_HTTPS=443 KUBERNETES_SERVICE_HOST=192.168.0.1

并且还会在将 Kubernetes CA和SA等信息放置在目录 /var/run/secrets/kubernetes.io/serviceaccount/,通过这些就可以从Pod内部访问API

cd /var/run/secrets/kubernetes.io/serviceaccount/ curl --cacert ca.crt --header "Authorization: Bearer $(cat token)" $KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/default/pods/netbox

Reference

Kubernetes API Reference Docs

client-go 关于client-go的模块 k8s.io/api

与Pods、ConfigMaps、Secrets和其他Kubernetes 对象所对应的数据结构都在,k8s.io/api,此包几乎没有算法,仅仅是数据机构,该模块有多达上千个用于描述Kubernetes中资源API的结构;通常被client,server,controller等其他的组件使用。

k8s.io/apimachinery

根据该库的描述文件可知,这个库是Server和Client中使用的Kubernetes API共享依赖库,也是kubernetes中更低一级的通用的数据结构。在我们构建自定义资源时,不需要为自定义结构创建属性,如 Kind, apiVersionname...,这些都是库 apimachinery 所提供的功能。

如,在包 k8s.io/apimachinery/pkg/apis/meta 定义了两个结构 TypeMetaObjectMeta;将这这两个结构嵌入自定义的结构中,可以以通用的方式兼容对象,如Kubernetes中的资源 Deplyment 也是这么完成的

通过图来了解Kubernetes的资源如何实现的

如在 k8s.io/apimachinery/pkg/runtime/interfaces.go 中定义了 interface,这个类为在schema中注册的API都需要实现这个结构

type Object interface { GetObjectKind() schema.ObjectKind DeepCopyObject() Object }

非结构化数据

非结构化数据 Unstructured 是指在kubernete中允许将没有注册为Kubernetes API的对象,作为Json对象的方式进行操作,如,使用非结构化 Kubernetes 对象

desired := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "v1", "kind": "ConfigMap", "metadata": map[string]interface{}{ "namespace": namespace, "generateName": "crud-dynamic-simple-", }, "data": map[string]interface{}{ "foo": "bar", }, }, } 非结构化数据的转换

k8s.io/apimachinery/pkg/runtime.UnstructuredConverter 中,也提供了将非结构化数据转换为Kubernetes API注册过的结构,参考如何将非结构化对象转换为Kubernetes Object。

Reference

go types

install client-go

如何选择 client-go 的版本

​ 对于不同的kubernetes版本使用标签 v0.x.y 来表示对应的客户端版本。具体对应参考 client-go 。

​ 例如使用的kubernetes版本为 v1.18.20 则使用对应的标签 v0.x.y 来替换符合当前版本的客户端库。例如:

go get k8s.io/client-go@v0.18.10

官网中给出了client-go的兼容性矩阵,可以很明了的看出如何选择适用于自己kubernetes版本的对应的client-go

  • 表示 该版本的 client-go 与对应的 kubernetes版本功能完全一致
  • + client-go 具有 kubernetes apiserver中不具备的功能。
  • - Kubernetes apiserver 具有client-go 无法使用的功。

一般情况下,除了对应的版本号完全一致外,其他都存在 功能的+-

client-go 目录介绍

client-go的每一个目录都是一个go package

  • kubernetes 包含与Kubernetes API所通信的客户端集
  • discovery 用于发现kube-apiserver所支持的api
  • dynamic 包含了一个动态客户端,该客户端能够对kube-apiserver任意的API进行操作。
  • transport 提供了用于设置认证和启动链接的功能
  • tools/cache: 一些 low-level controller与一些数据结构如fifo,reflector等
structure of client-go
  • RestClient:是最基础的基础架构,其作用是将是使用了miro.medium.com/max/700/1*iI8uFsPRBY5m_g_WW4huMQ.png

    在代码中的注释可以看到一些信息,根据信息可以总结出

    • Delta FIFO 是一个生产者-消费者的队列,生产者是 Reflector,消费者是 Pop()
    • 与传统的FIFO有两点不同
      • Delta FIFO

    Delta FIFO也是实现了 Queue以及一些其他 interface 的类,

    type DeltaFIFO struct { lock sync.RWMutex // 一个读写锁,保证线程安全 cond sync.Cond items map[string]Deltas // 存放的类型是一个key[string] =》 value[Delta] 类型的数据 queue []string // 用于存储item的key,是一个fifo populated bool // populated 是用来标记首次被加入的数据是否被变动 initialPopulationCount int // 首次调用 replace() 的数量 keyFunc KeyFunc knownObjects KeyListerGetter // 这里为indexer closed bool // 代表已关闭 closedLock sync.Mutex emitDeltaTypeReplaced bool // 表示事件的类型,true为 replace(), false 为 sync() }

    那么delta的类型是,也就是说通常情况下,Delta为一个 string[runtime.object] 的对象

    type Delta struct { Type DeltaType // 这就是一个string Object interface{} // 之前API部分有了解到,API的类型大致为两类,runtime.Object和非结构化数据 }

    apimachinery/pkg/runtime/interfaces.go

    那么此时,已经明白了Delta FIFO的结构,为一个Delta的队列,整个结构如下

    第一步创建一个Delta FIFO

    现在版本中,对创建Delta FIFO是通过函数 NewDeltaFIFOWithOptions()

    如何深入探讨Kubernetes的客户端实现原理?

    func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc // 默认的计算key的方法 } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f } queueActionLocked,Delta FIFO添加操作

    这里说下之前说道的,在追加时的操作 queueActionLocked ,如add update delete实际上走的都是这里

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) // 计算key if err != nil { return KeyError{obj, err} } // 把新数据添加到DeltaFIFO中,Detal就是 动作为key,对象为值 // item是DeltaFIFO中维护的一个 map[string]Deltas newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) // 去重,去重我们前面讨论过了 if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } // 不存在则添加 f.items[id] = newDeltas f.cond.Broadcast() } else { delete(f.items, id) // 这里走不到,因为添加更新等操作用newDelta是1 // 源码中也说要忽略这里 } return nil }

    在FIFO继承的Stroe的方法中,如,Add, Update等都是需要去重的,去重的操作是通过对比最后一个和倒数第二个值

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) ...

    在函数 dedupDeltas() 中实现的这个

    // re-listing and watching can deliver the same update multiple times in any order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) if n < 2 { return deltas } a := &deltas[n-1] // 如 [1,2,3,4] a=4 b := &deltas[n-2] // b=3,这里两个值其实为事件 if out := isDup(a, b); out != nil { d := append(Deltas{}, deltas[:n-2]...) return append(d, *out) } return deltas }

    如果b对象的类型是 DeletedFinalStateUnknown 也会认为是一个旧对象被删除,这里在去重时也只是对删除的操作进行去重。

    // tools/cache/delta_fifo.go func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil } // keep the one with the most information if both are deletions. func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // Do more sophisticated checks, or is this sufficient? if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b }

    为什么需要去重?什么情况下需合并

    代码中开发者给我们留了一个TODO

    TODO: is there anything other than deletions that need deduping?

    • 取决于Detal FIFO 生产-消费延迟
      • 当在一个资源的创建时,其状态会频繁的更新,如 Creating,Runinng等,这个时候会出现大量写入FIFO中的数据,但是在消费端可能之前的并未消费完。
      • 在上面那种情况下,以及Kubernetes 声明式 API 的设计,其实多余的根本不关注,只需要最后一个动作如Running,这种情况下,多个内容可以合并为一个步骤
    • 然而在代码中,去重仅仅是在Delete状态生效,显然这不可用;那么结合这些得到:
      • 在一个工作时间窗口内,如果对于删除操作来说发生多次,与发生一次实际上没什么区别,可以去重
      • 但在更新于新增操作时,实际上在对于声明式 API 的设计个人感觉是完全可以做到去重操作。
        • 同一个时间窗口内多次操作,如更新,实际上Kubernetes应该只关注最终状态而不是命令式?
    Compute Key

    上面大概对一些Detal FIFO的逻辑进行了分析,那么对于Detal FIFO如何去计算,也就是说 MetaNamespaceKeyFunc ,这个是默认的KeyFunc,作用是计算Detals中的唯一key。

    func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { // 显示声明的则为这个值 return string(key), nil } meta, err := meta.Accessor(obj) // 那么使用Accessor,每一个资源都会实现这个Accessor if err != nil { return "", fmt.Errorf("object has no meta: %v", err) } if len(meta.GetNamespace()) > 0 { return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }

    ObjectMetaAccessor 每个Kubernetes资源都会实现这个对象,如Deployment

    // accessor interface type ObjectMetaAccessor interface { GetObjectMeta() Object } // 会被ObjectMeta所实现 func (obj *ObjectMeta) GetObjectMeta() Object { return obj } // 而每一个资源都会继承这个 ObjectMeta,如 ClusterRole type ClusterRole struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"protobuf:"bytes,1,opt,name=metadata"`

    那么这个Deltas的key则为集群类型的是资源本身的名字,namespace范围的则为 meta.GetNamespace() + "/" + meta.GetName(),可以在上面代码中看到,这样就可以给Detal生成了一个唯一的key

    keyof,用于计算对象的key

    func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { if d, ok := obj.(Deltas); ok { if len(d) == 0 { // 长度为0的时候是一个初始的类型 return "", KeyError{obj, ErrZeroLengthDeltasObject} } obj = d.Newest().Object // 用最新的一个对象,如果为空则是nil } if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil // 到了这里,之前提到过,是一个过期的值将会被删除 } return f.keyFunc(obj) // 调用具体的key计算函数 } Indexer

    indexer 在整个 client-go 架构中提供了一个具有线程安全的数据存储的对象存储功能;对于Indexer这里会分析下对应的架构及使用方法。

    client-go/tools/cache/index.go 中可以看到 indexer是一个实现了Store 的一个interface

    type Indexer interface { // 继承了store,拥有store的所有方法 Store // 返回indexname的obj的交集 Index(indexName string, obj interface{}) ([]interface{}, error) // 通过对 indexName,indexedValue与之相匹配的集合 IndexKeys(indexName, indexedValue string) ([]string, error) // 给定一个indexName 返回所有的indexed ListIndexFuncValues(indexName string) []string // 通过indexname,返回与indexedvalue相关的 obj ByIndex(indexName, indexedValue string) ([]interface{}, error) // 返回所有的indexer GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }

    实际上对他的实现是一个 cache,cache是一个KeyFunc与ThreadSafeStore实现的indexer,有名称可知具有线程安全的功能

    type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }

    既然index继承了Store那么,也就是 ThreadSafeStore 必然实现了Store,这是一个基础保证

    type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error // Resync is a no-op and is deprecated } // KeyFunc是一个生成key的函数,给一个对象,返回一个key值 type KeyFunc func(obj interface{}) (string, error)

    那么这个indexer structure可以通过图来很直观的看出来

    cache的结构

    cache中会出现三种数据结构,也可以成为三种名词,为 Index , Indexers , Indices

    type Index map[string]sets.String type Indexers map[string]IndexFunc type Indices map[string]Index

    可以看出:

    • Index 映射到对象,sets.String 也是在API中定义的数据类型 [string]Struct{}
    • Indexers 是这个 IndexIndexFunc , 是一个如何计算Index的keyname的函数
    • Indices 通过Index 名词拿到对应的对象

    这个名词的概念如下,通过图来了解会更加清晰

    从创建开始

    创建一个cache有两种方式,一种是指定indexer,一种是默认indexer

    // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } // NewIndexer returns an Indexer implemented simply with a map and a lock. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } } 更新操作

    在indexer中的更新操作(诸如 add , update ),实际上操作的是 updateIndices, 通过在代码可以看出

    tools/cache/thread_safe_store.go 的 77行起,那么就来看下 updateIndices() 具体做了什么

    func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // 在操作时,如果有旧对象,需要先删除 if oldObj != nil { c.deleteFromIndices(oldObj, key) } // 先对整个indexer遍历,拿到index name与 index function for name, indexFunc := range c.indexers { // 通过index function,计算出对象的indexed name indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } // 接下来通过遍历的index name 拿到这个index的对象 index := c.indices[name] if index == nil { // 确认这个index是否存在, index = Index{} // 如果不存在将一个Index{}初始化 c.indices[name] = index } // 通过计算出的indexed name来拿到对应的 set of object for _, indexValue := range indexValues { set := index[indexValue] if set == nil { // 如果这个set不存在,则初始化这个set set = sets.String{} index[indexValue] = set } set.Insert(key) // 然后将key插入set中 } } }

    那么通过上面可以了解到了 updateIndices 的逻辑,那么通过对更新函数分析来看看他具体做了什么?这里是add函数,通过一段代码模拟操作来熟悉结构

    testIndexer := "testIndexer" testIndex := "testIndex" indexers := cache.Indexers{ testIndexer: func(obj interface{}) (strings []string, e error) { indexes := []string{testIndex} // index的名词 return indexes, nil }, } indices := cache.Indices{} store := cache.NewThreadSafeStore(indexers, indices) fmt.Printf("%#v\n", store.GetIndexers()) store.Add("retain", "pod--1") store.Add("delete", "pod--2") store.Update("retain", "pod-3") //lists := store.Update("retain", "pod-3") lists := store.List() for _, item := range lists { fmt.Println(item) }

    这里是对add操作以及对updateIndices() 进行操作

    // threadSafe.go func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] // 这个item就是存储object的地方, 为空 c.items[key] = obj // 这里已经添加了新的值 c.updateIndices(oldObject, obj, key) // 转至updateIndices } // updateIndices func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // 就当是新创建的,这里是空的忽略 if oldObj != nil { c.deleteFromIndices(oldObj, key) } // 这个时候拿到的就是 name=testKey function=testIndexer for name, indexFunc := range c.indexers { // 通过testIndexer对testKey计算出的结果是 []string{testIndexer} indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name] if index == nil { index = Index{} // 因为假设为空了,故到这里c.indices[testIndexer]= Index{} c.indices[name] = index } for _, indexValue := range indexValues { // indexValue=testIndexer // set := c.index[name] = c.indices[testIndexer]Index{} set := index[indexValue] if set == nil { set = sets.String{} index[indexValue] = set } set.Insert(key) // 到这里就为set=indices[testIndexer]Index{} } } }

    总结一下,到这里,可以很明显的看出来,indexer中的三个概念是什么了,前面如果没有看明白话

    • Index:通过indexer计算出key的名称,值为对应obj的一个集合,可以理解为索引的数据结构
      • 比如说 Pod:{"nginx-pod1": v1.Pod{Name:Nginx}}
    • Indexers :这个很简单,就是,对于Index中如何计算每个key的名称;可以理解为分词器,索引的过程
    • Indices 通过Index 名词拿到对应的对象,是Index的集合;是将原始数据Item做了一个索引,可以理解为做索引的具体字段
      • 比如说 Indices["Pod"]{"nginx-pod1": v1.Pod{Name:Nginx}, "nginx-pod2": v1.Pod{Name:Nginx}}
    • Items:实际上存储的在Indices中的set.String{key:value} ,中的 key=value
      • 例如:Item:{"nginx-pod1": v1.Pod{Name:Nginx}, "coredns-depoyment": App.Deployment{Name:coredns}}
    删除操作

    对于删除操作,在最新版本中是使用了 updateIndices 就是 add update delete全都是相同的方法操作,对于旧版包含1.19- 是单独的一个操作

    // v1.2+ func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.updateIndices(obj, nil, key) delete(c.items, key) } } // v1.19- func (c *threadSafeMap) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() if obj, exists := c.items[key]; exists { c.deleteFromIndices(obj, key) delete(c.items, key) } } indexer使用

    上面了解了indexer概念,可以通过写代码来尝试使用一些indexer

    package main import ( "fmt" appsV1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) func main() { indexers := cache.Indexers{ "getDeplyment": func(obj interface{}) (strings []string, e error) { d, ok := obj.(*appsV1.Deployment) if !ok { return []string{}, nil } return []string{d.Name}, nil }, "getDaemonset": func(obj interface{}) (strings []string, e error) { d, ok := obj.(*appsV1.DaemonSet) if !ok { return []string{}, nil } return []string{d.Name}, nil }, } // 第一个参数是计算set内的key的名称 就是map[string]sets.String的这个strings的名称/namespace/resorcename // 第二个参数是计算index即外部的key的名称 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers) deployment := &appsV1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: "nginx-deplyment", Namespace: "test", }, } daemonset := &appsV1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "firewall-daemonset", Namespace: "test", }, } daemonset2 := &appsV1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "etcd-daemonset", Namespace: "default", }, } indexer.Add(deployment) indexer.Add(daemonset) indexer.Add(daemonset2) // 第一个参数是索引器 // 第二个参数是所引起做索引的字段 lists, _ := indexer.ByIndex("getDaemonset", "etcd-daemonset") for _, item := range lists { switch item.(type) { case *appsV1.Deployment: fmt.Println(item.(*appsV1.Deployment).Name) case *appsV1.DaemonSet: fmt.Println(item.(*appsV1.DaemonSet).Name) } } }