Kubernetes Kubelet 机制概述

距离上一篇文章,已经过去了将近9个月的时间,2021年第一篇文章,竟然是到8月份了,真没想到这个kubelet竟然拖了我这么长时间。研究api以及scheduler的日夜还历历在目,不知不觉就过了这么长时间,现在突然写起来,恍如隔世的感觉,这一方面说明kubelet相比其他组件确实要更复杂一些,另一方面说明最近这一段时间我有些懈怠了,感觉有50%的时间在忙其他事情,25%的时间在研究kubelet,然后25%的时间在懈怠。不过还好,经过这么长时间断断续续的研究,记了很多笔记,梳理清楚了其大致脉络,对kubelet有了一个比较全面的认知,尤其是跟框架有关的,比如CRI,CNI,CSI等各种Plugin机制,知道了这些框架的原理,不论是做插件开发还是运维,都能够按图索骥,快速找到问题所在,然后再深入到具体的细节中。

其实Kubernetes跟OpenStack在资源管理这个层面上非常类似,都需要涉及到最基础的计算、网络、存储以及各种外设这些资源的管理,在计算上,OpenStack是各种虚拟机,而Kubernetes是各种容器,而这两种计算形态的不同,从本质上决定了OpenStack和Kubernetes的不同,由于容器的易封装、轻量级的特点,逐渐演化出了云原生、微服务等新形式的业务形态,而虚拟机主要还是面向传统的业务形态,OpenStack中的Nova项目通过插件机制可以支持各种虚拟化方案,比如Qemu/KVM, Xen, HyperV, 甚至还有VMWare,当然最常用的还是KVM虚拟化方案,而Kubernetes则通过CRI协议对接各种容器方案,比如最常用的docker, cri-o,还有rkt, kata container等等;至于网络,Kubernetes本身并没有实现什么具体的网络方案,而是仅仅要求Pod之间网络是可以连通的,因此Kubernetes就依赖于第三方提供的网络方案,而第三方的网络方案通过CNI协议跟Container Runtime进行交互,这其实跟OpenStack也很类似,OpenStack的Neutron项目就抽象了二层三层的网络概念供虚拟机使用,而具体的实现则依赖于底层的SDN方案,通常一个成熟的SDN方案,既有面向IaaS的,也有面向PaaS的,他们都有对应的协议标准,所以可以在某一个网络方案上同时对这两者提供服务;至于存储,更是如此,一般存储都以三种形式体现:块存储,文件存储,以及对象存储,每一种形式的存储都有很多协议去实现,比如块存储就有FC、ISCSI、RBD等协议,文件存储有NFS、CephFS等,对象存储则主要是是S3或者是Swift,有的存储系统会同时提供这三种形式的存储,有的则专门只提供一种存储,OpenStack通过Cinder, Manila等项目对接多种存储后端提供不同的存储类型,而Kubernetes则依赖于CSI协议跟第三方存储进行交互。来看看Kubernetes通过各种接口协议跟外部资源整合的图:

所以从资源管理的角度来说,Kubernetes和OpenStack是存在某些功能上的重叠的,存在一定的竞争关系,Kubernetes完全可以在没有IaaS的环境下使用,直接部署在物理机上,但是两者的定位不同,Kubernetes更偏向于应用侧,侧重于怎么使用资源,而OpenStack等IaaS平台则更侧重于对底层各种硬件资源的统一管理,这在资源隔离上差别很明显,Kubernetes在网络和计算隔离上明显不如OpenStack等IaaS平台彻底,所以更通常的做法是将Kubernetes部署在IaaS平台上,甚至是跨多个云平台部署,充分利用IaaS平台的隔离性和弹性,这样Kubernetes作为IaaS平台的资源消费者而存在,不用去管底层硬件的复杂性和多样性,并且将IaaS平台的使用者由多变的人切换到固定的程序,这对IaaS平台来说会更具确定性和稳定性,所以这两者应该是合作共生的关系,而不是取代的关系,各自在各自的领域里做自己擅长的事情。

Kubernetes对各种资源的使用,则主要依赖于抽象出来的三种接口协议,即CRI, CNI和CSI,在Kubernetes经典的Controller-Loop模型中,kubelet是最终的动作执行者,它部署在每个worker节点,负责当前节点Pod相关的资源生命周期管理,通过这三个接口协议跟远端的资源服务提供者进行交互。通过CRI,向远端的计算资源提供者(容器运行时,Container Runtime)申请对应的容器资源,但是在创建容器之前,先要准备容器所在网络环境,即SandBox,所谓SandBox,其实就是网络命名空间,比如是一个network namespace或者是一个虚拟机,以及在其中的网络设备和相关的网络信息,而这些网络信息则是容器运行时(Container Runtime)通过CNI接口向远端的网络资源提供者申请的,包括IP地址,路由以及DNS等信息,将这些信息配置到网络命名空间中,SandBox就准备好了,然后就可以在其中创建容器了,在同一个SandBox中可以创建多个容器,它们共享同一个网络命名空间,这些就组成了所谓的Pod;Kubelet再通过CSI接口,向远端的存储资源提供者申请对应的存储资源,根据存储类型,可能需要挂载或者格式化成文件系统供Pod使用;这里面有点特殊的就是CNI,kubelet没有直接通过CNI跟网络资源提供者交互,而是由Container Runtime来做这件事,kubelet只需要通过CRI向Container Runtime发送请求,即可获得相关的网络信息。他们之间的关系如下图:

CRI和CSI这两者都是使用gRPC进行的远程过程调用,gRPC是一个高性能、开源、通用的RPC框架,由Google推出,基于HTTP2协议标准设计开发,默认采用Protocol Buffers数据序列化协议,支持多种开发语言,在gRPC客户端可以直接调用不同服务器上的远程程序,使用姿势看起来就像调用本地程序一样,很容易去构建分布式应用和服务。CRI和CSI都对应的提供了一些lib库,在这些库中定义好了客户端和服务端的接口,并且实现了客户端的相关代码逻辑,以及服务端的部分逻辑,作为客户端在使用CRI和CSI时,可以直接引用这些库,向对应的服务资源提供者发送rpc请求,作为服务端,可以引用这些库,更标准和快速的实现服务端的相关逻辑。至于CNI,它就不是通过gRPC的方式了,而是由很多二进制可执行文件组成的网络插件,被Container Runtime调用执行,每个网络插件对应的实现相关的网络功能,CNI也有对应的lib库,针对它的协议,封装了一些公共代码,可以用来方便构建自己的网络插件。

Kubelet实现对Pod以及各种外部资源的管理,主要依赖两个机制:一个是SyncLoop,一个是各种各样的Manager。在SyncLoop中,kubelet会从几个特定的事件来源处,获取到关于Pod的事件,比如通过informer机制从apiserver处获取到的Pod的增删改事件,这些事件触发kubelet根据Pod的期望状态对本节点的Pod做出相应操作,比如新建一个Pod,或者给Pod添加一个新的存储等等,除了apiserver的事件,还有每隔1秒获取到的定期执行sync的事件,周期性的sync事件确保Pod的实际状态跟期望状态是一致的,在Kubelet的实现中,每一个Pod都对应的建了一个worker线程,在该线程中处理对该Pod的更新操作,同一个Pod不能并发进行更新,但是不同Pod是可以并发进行操作的;而各种各样的Manager则负责各种对象以及资源的管理,它们互相配合,形成一个有机的整体,是kubelet各种功能的实现者,比如secretManager/configMapManager等,它们负责从apiserver处通过reflector机制将本节点Pod绑定的secret和configmap缓存到本地,containerManager负责管理container所需要使用到的资源,比如qos, cpu, memory, device等,statusManager负责Pod状态的持续维护,会周期性的将缓存中的pod status通过apiserver更新到数据库中,volumePluginManager负责管理内置(intree)和动态发现的(flexvolume dynamic)的存储插件,csi就是作为intree的一个plugin的形式存在的,volumeManager则是负责管理本节点上的pod/volume/node的attach和mount操作的,等等这些Manager就好比人体的各种器官一样,每个器官负责一个或多个功能,各种器官协调组成一个健康的个体。整体上看,kubelet的架构图如下:

SyncLoop负责Pod的增删改等操作,通过不断轮询,维护Pod这个主体跟期望状态一致,而各种Manager其实是一个个小的Loop,实现了跟Pod相关的某方面的功能,比如维护Pod在本地的缓存,以及Pod的状态的维护,Pod使用计算资源的维护,Pod使用存储资源的维护等等,这些相互配合,共同完成了kubelet完整的功能,所以,未来可能随着需求的变化,会不断有新的Manager被引入,旧的Manager被淘汰,但是总体的架构方式应该不会发生什么太大的变化。

下面梳理了下当前master分支,也就是1.21版本,SyncLoop的大致脉络,以及kubelet中各种Manager的作用简介:

SyncLoop脉络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
NewKubeletCommand()
* Run()
* run()
* PreInitRuntimeService()
* runDockershim()
* // DockerService继承自CRIService,包含RuntimeServiceServer 和 ImageServiceServer 两个Interface
* ds := dockershim.NewDockerService()
* // 创建了一个grpc.Server,并将DockerService注册到grpc.Server中
* dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
* dockerServer.Start()
* // 实现了grpc.Server的客户端
* kubeDeps.RemoteRuntimeService = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, ...)
* kubeDeps.RemoteImageService = remote.NewRemoteImageService(remoteImageEndpoint, ...)
* RunKubelet()
* k := createAndInitKubelet()
* NewMainKubelet()
* klet := &Kubelet{}
* runtime, err := kuberuntime.NewKubeGenericRuntimeManager()
* klet.containerRuntime = runtime
* klet.streamingRuntime = runtime
* klet.runner = runtime
* startKubelet(k)
* go k.Run(podCfg.Updates())
* go kl.cloudResourceSyncManager.Run(wait.NeverStop)
* go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
* go vm.volumePluginMgr.Run(stopCh) // start informer for CSIDriver
* go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
* go vm.reconciler.Run(stopCh)
* go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
* kl.registerWithAPIServer() // 向apiserver创建Node对象
* kl.updateNodeStatus()
* kl.tryUpdateNodeStatus(i)
* kl.setNodeStatus(node)
* defaultNodeStatusFuncs() // 更新node status的各个方面
* NodeAddress()
* MachineInfo()
* Images()
* .......
* updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
* go kl.fastStatusUpdateOnce()
* for{
* kl.updateRuntimeUp()
* kl.syncNodeStatus()
* }
* go kl.nodeLeaseController.Run(wait.NeverStop)
* go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
* 检查networkReady和runtimeReady,并且将状态设置到runtimeState中
* kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
* kl.cadvisor.Start()
* kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService)
* cm.cpuManager.Start()
* cm.memoryManager.Start()
* cm.setupNode(activePods) // container_manager_linux.go
* if CgroupsPerQOS
* cm.createNodeAllocatableCgroups() // node_container_manager_linux.go
* cm.qosContainerManager.Start(cm.getNodeAllocatableAbsolute, activePods) // qos_container_manager_linux.go
* go m.UpdateCgroups()
* cm.enforceNodeAllocatableCgroups() // node_container_manager_linux.go
* cm.deviceManager.Start()
* kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
* kl.containerLogManager.Start()
* kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
* kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
* go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
* err = kl.shutdownManager.Start()
* go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
* kl.statusManager.Start()
* kl.probeManager.Start()
* kl.runtimeClassManager.Start(wait.NeverStop)
* kl.pleg.Start()
* kl.syncLoop(updates, kl)
* for{
* kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
* HandlePodAdditions()
* HandlePodUpdates()
* HandlePodRemoves()
* HandlePodSyncs()
* // 1. Compute sandbox and container changes.
* // 2. Kill pod sandbox if necessary.
* // 3. Kill any containers that should not be running.
* // 4. Create sandbox if necessary.
* // 5. Create ephemeral containers.
* // 6. Create init containers.
* // 7. Create normal containers
* syncPod(o syncPodOptions) // kubelet.go, 在podWorkers中执行这个syncPod
* pcm := kl.containerManager.NewPodContainerManager()
* if CgroupsPerQOS
* return podContainerManagerImpl
* return podContainerManagerNoop
* kl.containerManager.UpdateQOSCgroups()
* cm.qosContainerManager.UpdateCgroups()
* pcm.EnsureExists(pod)
* kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
* // 1. Compute sandbox and container changes.
* podContainerChanges := m.computePodActions(pod, podStatus)
* // 2. Kill pod sandbox if necessary.
* m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
* m.runtimeService.StopPodSandbox(podSandbox.ID.ID)
* // 3. Kill any containers that should not be running.
* m.killContainer(pod,...)
* // 4. Create sandbox if necessary.
* // pod的网络是在建sandbox时建立的,sandbox可以理解成linux network namespace或者vm,即准备一个隔离环境
* m.createPodSandbox(pod, podContainerChanges.Attempt)
* m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
* // 5. Create ephemeral containers.
* doBackOff()
* startContainer() // kuberuntime_container.go
* // * pull the image
* m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
* // * create the container
* m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
* // * start the container
* m.runtimeService.StartContainer(containerID)
* // * run the post start lifecycle hooks (if applicable)
* m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
* // 6. Create init containers.
* doBackOff()
* startContainer() // kuberuntime_container.go
* // 7. Create normal containers.
* doBackOff()
* startContainer() // kuberuntime_container.go
* }
* go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints)
* go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
* go k.ListenAndServePodResources()

各种Manager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
* cloudResourceSyncManager
* pkg/kubelet/cloudresource/cloud_request_manager.go
* 从cloud provider周期性的同步instnace列表到本地
* secretManager
* pkg/kubelet/secret/secret_manager.go
* NewWatchingSecretManager
* 当一个pod在本节点注册时,会将该pod绑定的secret通过reflector机制缓存到本地
* configMapManager
* pkg/kubelet/configmap/configmap_manager.go
* NewWatchingConfigMapManager
* 当一个Pod在本节点注册时,会将该pod绑定的configmap通过reflector机制缓存到本地
* livenessManager
* pkg/kubelet/prober/results/results_manager.go
* Manager
* set/get某个container在本地缓存中的的liveness状态
* startupManager
* pkg/kubelet/prober/results/results_manager.go
* Manager
* set/get某个container在本地缓存中的startup缓存状态
* podCache
* pkg/kubelet/container/cache.go
* cache
* 各个pods的PodStatus缓存
* podManager
* pkg/kubelet/pod/pod_manager.go
* basicManager
* Kubelet relies on the pod manager as the source of truth for the desired state.
* 管理pod在本地的映射,从podUID或者是podFullName到pod的映射关系
* mirrorPod是static pod在apiserver中的代表对象,static pod就是从file, http等source创建的pod,不是从apiserver中直接创建的,这种pod会对应的在apiserver中创建一个mirror pod,跟static pod对应
* statusManager
* pkg/kubelet/status/status_manager.go
* manager
* pod status状态的管理,同时维护了一份本地的缓存,并且有一个周期性的sync任务,将缓存中的pod status跟通过apiserver跟数据库中的pod status进行同步
* 实现了GetPodStatus()、SetPodStatus()、SetContainerReadiness()等方法
* 这里有一个很经典的chan的用法,异步的实现缓存和数据库的同步
* resourceAnalyzer
* pkg/kubelet/server/stats/resource_analyzer.go
* resourceAnalyzer{fsResourceAnalyzer, SummaryProvider}
* SummaryProvider提供该node节点的cpu/内存/磁盘/网络/pods等信息,而fsResourceAnalyzer,则提供每个pod的volume信息,并且通过每个pod的周期性循环任务,将该节点上所有pod的volume信息缓存到本地statCache中
* dockerLegacyService
* DockerService
* runtimeService
* remoteRuntimeService,实现了k8s.io/cri-api项目中定义的RuntimeService接口,而它又是调用了k8s.io/cri-api中定义的runtimeServiceClient,它客户端实现了grpc的调用。
* grpc.Server的客户端,用来跟remote runtime service发送请求
* containerLogManager
* containerManager
* 路径:kubernetes/pkg/kubelet/cm/*
* 该manager并不是针对container本身的管理,而更多的是管理container所需要使用的资源的管理,比如qos, cpu, memory, device等
* cgroupManager
* 跟cgroup交互的manager,通过它来对cgroup进行更新,被qosContainerManager引用
* qosContainerManager
* 设置/kubepods.slice/kubepods-burstable.slice 和 /kubepods.slice/kubepods-besteffort.slice级别的cgroup
* besteffort级别的cpu.shares设置为固定值2,burstable级别的cpu.shares设置为该节点的所有pods的request cpu之和
* 此外还有hugepage, memory的设置,这里设置的是总的qos,并不是单个pod的
* 有一个周期性循环的任务在不断更新这两个cgroup
* podContainerManager
* 设置pod级别的cgroup,包括cpu, memory, hugepage, pids
* container级别的cgroup规则是由cri runtime下发的,kubelet没有直接下发
* cpuManager
* 可以为containder设置静态cpuset,即进行CPU绑定
* The static policy allows containers in Guaranteed pods with integer CPU requests access to exclusive CPUs on the node.
* https://kubernetes.io/docs/tasks/administer-cluster/cpu-management-policies/
* memoryManager
* 跟topologyManager一起配合使用
* deviceManager
* topologyManager
* 能够让container感知各种外设(gpu/sr-iov nic)和CPU,与NUMA node的关系,可以将设备绑定到同一个NUMA node上,以提高性能。
* https://kubernetes.io/blog/2020/04/01/kubernetes-1-18-feature-topoloy-manager-beta/
* containerRuntime
* kubeGenericRuntimeManager,实现了container/runtime.go中定义的Runtime接口
* Kubelet的Runtime Manager,通过runtimeService来向remote runtime进行交互,各个manager要跟runtime交互,都是通过该接口
* streamingRuntime
* runner
* runtimeCache
* runtimeCache {pods []*Pod}
* pod的本地缓存,从runtime里面拿到pods列表,更新到本地缓存中
* StatsProvider
* pleg(Pod Lifecycle Event Generator)
* GenericPLEG
* https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md
* https://developers.redhat.com/blog/2019/11/13/pod-lifecycle-event-generator-understanding-the-pleg-is-not-healthy-issue-in-kubernetes/
* 它的作用是周期性从container runtime中获取到pods/containers列表,即relist,跟之前的状态进行比较,如果发生变化,则生成一个对应的event
* runtimeState
* 检查container runtime的storage/network/runtime的状态
* containerGC
* 清理不正常的container
* containerDeletor
* podContainerDeletor
* 通过channel的方式异步删除container,即外部调用deleteContainersInPod()方法,将待删除的container id放入到channel中,然后调用runtime中的DeleteContainer()方法,向runtime发送删除请求
* imageManager
* realImageGCManager
* 根据镜像占用的文件系统的百分比,删除没有用的镜像
* serverCertificateManager
* 动态rotate kubelet server certificate
* probeManager
* 针对每一个Pod中的每一个container的startup/readiness/liveness这三种probe,分别建一个周期性循环的worker任务,不断通过其定义的probe条件进行检查,将检查的结果更新到startupManager/readinessManager/livenessManager中,这三个Manager是同一个类型的Manager,通过其Set()方法,将probe的结果放到updates channel中,probeManager周期性的从readniessManager和startupManager的Updates channel中读取result,然后通过statusManager的SetContainerReadiness()和SetContainerStartup()方法将结果同步到statusManager中,然后statusManager再将结果异步同步到apiserver
* 以上的逻辑涉及到probeManager,statusManager, startupManager, readniessManager, livenessManager这几个manager的相互协作。
* tokenManager
* 获取ServiceAccountToken,缓存到本地,通过GetServiceAccountToken()先从cache中找token,如果过期了,则从TokenRequest API中重新获取新的token
* volumePluginMgr
* VolumePluginMgr
* plugins map[string]VolumePlugin
* probedPlugins map[string]VolumePlugin
* 管理intree和flexvolume dynamic的VolumePlugin,csi也是作为intree的一个plugin的形式存在的,所谓管理就是自动发现,注册,查找VolumePlugin。
* 在volumeManager中,会根据各种条件查找注册到volumePluginMgr中的VolumePlugin
* flexvolume动态发现插件的默认目录:/usr/libexec/kubernetes/kubelet-plugins/volume/exec/,由配置项VolumePluginDir进行配置
* pluginManager
* 主要是来注册CSIPlugin和DevicePlugin
* 这里面主要有两个loop: desiredStateOfWorldPopulator 和 reconciler
* 前者是通过fsnotify watch机制从插件目录发现csi的socket文件,默认路径在/var/lib/kubelet/plugins_registry/,然后将其信息添加到desiredStateOfWorld结构中;
* 后者会去对比actualStateOfWorld 和 desiredStateOfWorld中记录的插件注册的情况,desiredStateOfWorld是全部期望注册的插件,而actualStateOfWorld则是全部已经注册的插件,如果没注册的,则会调用operationExecutor去注册,如果需要插件已经没删除,则调用operationExecutor去删除注册;
* operationExecutor是用来执行注册方法的执行器,本质上就是通过goroutine去执行注册方法,而operationGenerator则是注册方法生成器,在该注册方法中,首先通过该socket建立了一个grpc的客户端,通过向该socket发送grpc请求,即client.GetInfo(),获取到该CSI插件的信息,根据该插件的种类(CSIPlugin或者是DevicePlugin),来调用相应的handler,来进一步进行注册,首先要handler.ValidatePlugin(),然后handler.RegisterPlugin(),handler是在服务启动时,添加到pluginManager中的。
* 如果是CSIPlugin的话,其注册流程大致如下:
* 主要代码逻辑在 kubernetes/pkg/volume/csi/ 路径下
* 首先根据插件的socket文件,初始化一个csi的grpc client,用来跟csi server进行交互
* csi rpc client又引用了container-storage-interface项目中定义的csi protobuffer协议的接口
* 发送csi.NodeGetInfo() rpc请求,获取到本节点的相关信息 //NodeGetInfo()即是CSI规范定义的接口
* 接下来,通过nim,即nodeInfoManager(这个是在volumePluginMgr在进行插件初始化的时候实例化的),继续进行注册,主要分为两步:
* 更新本节点的Node对象,添加csi相关的annotation和labels
* 创建或者更新本节点对应的CSINode对象,里面包含了该node的CSI插件信息,主要是包含插件的名字
* volumeManager
* 是用来管理本node上的pod/volume/node的attach 和 mount 操作的
* DesiredStateOfWorldPopulator 周期性的从podManager中获取本node的Pod列表,然后遍历pod列表,获取到每个pod的Volumes,遍历每个volume,获取到详细的信息,然后添加到desiredStateOfWorld中,desiredStateOfWorld用以下的数据结构记录本节点的所有pod的所有volume信息,包括该volume是否可挂载,可mount,以及所属的pod,而且某个volume可能属于多个pod
* desiredStateOfWorld
* volumesToMount map[v1.UniqueVolumeName]volumeToMount
* volumePluginMgr *volume.VolumePluginMgr
* volumeToMount
* volumeName v1.UniqueVolumeName
* podsToMount map[types.UniquePodName]podToMount
* pluginIsAttachable bool
* pluginIsDeviceMountable bool
* volumeGidValue string
* podToMount
* podName types.UniquePodName
* pod *v1.Pod
* volumeSpec *volume.Spec
* OperatorGenerator是从volume对应的VolumePlugin中获取到对应的AttachVolume/MountVolume等具体实现方法
* OperatorExecutor会在goroutine中调用OperatorGenerator中的方法去执行具体的动作
* reconciler会周期性的从desiredStateOfWorld中获取到需要进行Attach或者Mount的Volume,然后调用OperatorExecutor来执行具体的Attach/Mount操作
* rc.unmountVolumes()
* // Filesystem volume case
* volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
* volumeUnmounter, newUnmounterErr :=volumePlugin.NewUnmounter()
* unmountErr := volumeUnmounter.TearDown()
* // Block volume case
* blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName)
* blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper()
* customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper)
* unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
* rc.mountAttachVolumes()
* // if volume is not attached
* attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
* volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
* devicePath, attachErr := volumeAttacher.Attach()
* // if volume is not mounted
* // Filesystem volume case
* volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
* volumeMounter, newMounterErr := volumePlugin.NewMounter()
* attachableVolumePlugin, _ := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
* volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
* deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
* volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
* devicePath, err = volumeAttacher.WaitForAttach()
* err = volumeDeviceMounter.MountDevice()
* mountErr := volumeMounter.SetUp()
* // Block volume case
* blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
* blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper()
* attachableVolumePlugin, _ := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
* volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
* devicePath, err = volumeAttacher.WaitForAttach()
* customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper);
* stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice()
* pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice()
* rc.unmountDetachDevices()
* 所以,具体的Attach/Mount逻辑是在对应的VolumePlugin中实现的
* podWorkers
* klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
* podUpdates map[types.UID]chan UpdatePodOptions // 每个pod对应了一个chan,里面保存了针对该pod的更新选项
* 每一个pod有一个loop,当有对该pod的更新操作时,该loop就会被触发执行syncPod()方法,每个pod同时只能有一个syncPod()动作在执行
* podKiller
* 删除pod
* evictionManager
* Eviction,就是当节点的内存,磁盘,pid这几个资源压力大时,会选择性的将一些Pod kill掉,以保持资源足够维持系统稳定
* admitHandlers
* 在kubelet创建pod时,会依次调用admit handler去检查该pod是否符合创建条件,如果不符合的话,则会返回相应的错误信息
* softAdmitHandlers
* softAdmithandlers are applied to the pod after it is admitted by the Kubelet, but before it is run.
* A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely.
* nodeLeaseController
* node的心跳机制,每个node都在apiserver中创建了一个Lease对象,每隔10秒,kubelet就会更新它对应的Lease对象,类似于租约续期
* 除了Lease这种心跳机制,还有NodeStatus,kubelet也会周期性的更新NodeStatus,不过这个间稍长,默认是5分钟
* shutdownManager
作者

hackerain

发布于

2021-08-15

更新于

2023-03-11

许可协议