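An Introduction to Kubernetes controller-runtime

When developing a CRD, writing the CRD definition is only part of the job. The more important part is implementing the Controller behind it, because that is what makes the CRD actually do something. Whatever the CRD is, the logical skeleton of its Controller is largely the same: watch for change events on the CRD's resources, then trigger the Reconcile logic to take whatever action is needed so that the actual state matches the state declared in the resource. There are also some other common features, such as monitoring, leader election, and Webhooks. This development pattern is now known as the Operator pattern. In a cloud-native era where anything can be a CRD, these common needs have long since been factored out into a standalone third-party library, controller-runtime. operator-sdk is itself a wrapper around controller-runtime that makes operator development more convenient, which shows how important the library is. This article introduces the concepts, principles, and core logic of controller-runtime.

Let's use the controller-runtime architecture diagram from the kubebuilder documentation as our guide: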

Manager

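As the name suggests, the Manager plays a managing role and pulls everything together. It is responsible for high availability (HA, that is, leader election in an active/standby setup), monitoring, the Webhooks used for admission, the Controllers for API resources (i.e. CRDs), the Client and Cache used to talk to the Kubernetes cluster, starting and stopping services, and so on. Here is its definition: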

// pkg/manager/internal.go

type controllerManager struct {
sync.Mutex
started bool

stopProcedureEngaged *int64
errChan chan error
runnables *runnables

// cluster holds a variety of methods to interact with a cluster. Required.
cluster cluster.Cluster
......
}

The Manager has many fields. Here we focus on two of them: runnables and cluster.

runnables

The Manager abstracts every long-running task into a concept called Runnable. Its interface is defined as follows:

// pkg/manager/manager.go

type Runnable interface {
// Start starts running the component. The component will stop running
// when the context is closed. Start blocks until the context is closed or
// an error occurs.
Start(context.Context) error
}

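It is very simple: any struct that implements Start() is a Runnable. The method is expected to block until the service stops, which is why a Runnable represents a long-running task. The Webhook Server, the Controller, and others all implement Start(), and this layer of abstraction lets the Manager manage them uniformly. The Manager's runnables field sorts these Runnables into several categories and manages them as groups: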

// pkg/manager/runnable_group.go

type runnables struct {
Webhooks *runnableGroup
Caches *runnableGroup
LeaderElection *runnableGroup
Others *runnableGroup
}

type runnableGroup struct {}
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {}
func (r *runnableGroup) Start(ctx context.Context) error {}

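As you can see, there are four groups: Webhooks, Caches, LeaderElection, and Others. runnableGroup provides an Add() method for adding a Runnable to the group and a Start() method for starting every Runnable in it. The Manager also provides an Add() method that adds a runnable to runnables: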

// pkg/manager/internal.go

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.Lock()
defer cm.Unlock()
return cm.add(r)
}

func (cm *controllerManager) add(r Runnable) error {
// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
return err
}
return cm.runnables.Add(r)
}

Finally, when the Manager starts, it starts the runnable groups in runnables one after another:

// pkg/manager/internal.go

func (cm *controllerManager) Start(ctx context.Context) (err error) {
......

if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}

// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}

// Start the non-leaderelection Runnables after the cache has synced.
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}

......
}
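
To make the grouping idea concrete, here is a minimal sketch, using only the standard library, of starting groups of Runnables in order and stopping them all through a shared context. The group type, runGroups, and demo are hypothetical names made up for this illustration. The real runnableGroup also deals with readiness checks and error propagation, which this sketch leaves out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Runnable has the same shape as the interface quoted earlier.
type Runnable interface {
	Start(ctx context.Context) error
}

// RunnableFunc lets a plain function act as a Runnable.
type RunnableFunc func(ctx context.Context) error

func (f RunnableFunc) Start(ctx context.Context) error { return f(ctx) }

// group is a hypothetical, much simplified stand-in for runnableGroup.
type group struct {
	name      string
	runnables []Runnable
}

// runGroups starts the groups one after another (each runnable in its own
// goroutine), blocks until ctx is done, then waits for every runnable to
// return. It returns the group names in the order they were started.
func runGroups(ctx context.Context, groups ...*group) []string {
	var wg sync.WaitGroup
	started := make([]string, 0, len(groups))
	for _, g := range groups {
		for _, r := range g.runnables {
			wg.Add(1)
			go func(r Runnable) {
				defer wg.Done()
				_ = r.Start(ctx) // a real manager would propagate this error
			}(r)
		}
		started = append(started, g.name)
	}
	<-ctx.Done()
	wg.Wait()
	return started
}

// demo runs two groups briefly and reports the start order and how many
// runnables observed the shutdown.
func demo() ([]string, int) {
	var mu sync.Mutex
	stopped := 0
	blockUntilDone := RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done() // long-running work would happen here
		mu.Lock()
		stopped++
		mu.Unlock()
		return nil
	})
	caches := &group{name: "Caches", runnables: []Runnable{blockUntilDone}}
	others := &group{name: "Others", runnables: []Runnable{blockUntilDone, blockUntilDone}}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	order := runGroups(ctx, caches, others)
	return order, stopped
}

func main() {
	order, stopped := demo()
	fmt.Println(order, stopped)
}
```

The point of the sketch is that every Runnable blocks inside Start() and relies on the context to learn when to stop, which is what lets one component start and stop very different kinds of tasks in the same way.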

cluster

The Manager introduces another abstraction called cluster, which encapsulates the logic for interacting with a Kubernetes cluster. Here is its definition:

// pkg/cluster/internal.go

type cluster struct {
// config is the rest.config used to talk to the apiserver. Required.
config *rest.Config

// scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults
// to scheme.scheme.
scheme *runtime.Scheme

cache cache.Cache

// client is the client injected into Controllers (and EventHandlers, Sources and Predicates).
client client.Client

// apiReader is the reader that will make requests to the api server and not the cache.
apiReader client.Reader
......
}

cluster does not do much by itself. It mainly relies on two fields, client and cache, and is just a thin wrapper around them.

cache

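The cache is a caching layer for API resources built on the Informer mechanism. Using List & Watch, it keeps the API resources we care about in a local cache, so that when a Controller calls List or Get on an object, the data can be served straight from the cache, which improves performance. Here is the definition of this cache: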

// pkg/cache/informer_cache.go
type informerCache struct {
*internal.InformersMap
}

// pkg/cache/internal/deleg_map.go
type InformersMap struct {
structured *specificInformersMap
unstructured *specificInformersMap
metadata *specificInformersMap

Scheme *runtime.Scheme
}

// pkg/cache/internal/informers_map.go
type specificInformersMap struct {
......

// informersByGVK is the cache of informers keyed by groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry

......
}

type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer

// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
}

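As you can see, the cache is essentially a few maps of Informers, split into three kinds: structured, unstructured, and metadata (the last one caches only object metadata). Each map value is a MapEntry that holds a SharedIndexInformer from client-go. A quick word on structured and unstructured: they are also known as typed and untyped. Typed refers to known resource types that have been registered in the scheme, such as Pod. Untyped/unstructured refers to an unknown, or generic, type that can represent any resource. apimachinery has a dedicated Unstructured type for this and provides a rich set of methods for working with it.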

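informerCache provides Get and List methods for reading objects from the cache. Under the hood they go through the Indexer of the corresponding Informer and read from that Informer's local store: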

// pkg/cache/informer_cache.go

func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {}
func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error {}

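Note that the cache for a given resource type (GVK), that is, its Informer, is built lazily: when an object is requested, the Informer is created only if the map does not already contain one. This avoids creating informers that are never used and wasting resources.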
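
The lazy construction can be pictured with a small standard-library sketch. The informer and lazyInformers types below are hypothetical placeholders, not the library's types, and a plain string stands in for a GroupVersionKind. The only thing the sketch claims to show is the "create on first lookup, reuse afterwards" behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// informer is a hypothetical placeholder for a SharedIndexInformer.
type informer struct {
	kind string
}

// lazyInformers creates the informer for a kind the first time it is requested.
type lazyInformers struct {
	mu      sync.Mutex
	byKind  map[string]*informer
	created int // number of informers constructed so far
}

func newLazyInformers() *lazyInformers {
	return &lazyInformers{byKind: make(map[string]*informer)}
}

// Get returns the informer for kind, constructing it only on first use.
func (l *lazyInformers) Get(kind string) *informer {
	l.mu.Lock()
	defer l.mu.Unlock()
	if inf, ok := l.byKind[kind]; ok {
		return inf // already built, reuse it
	}
	inf := &informer{kind: kind}
	l.byKind[kind] = inf
	l.created++
	return inf
}

func main() {
	cache := newLazyInformers()
	a := cache.Get("apps/v1, Kind=Deployment")
	b := cache.Get("apps/v1, Kind=Deployment")
	cache.Get("/v1, Kind=Pod")
	// Two lookups of the same kind share one informer; kinds that are never
	// requested never get an informer at all.
	fmt.Println(a == b, cache.created)
}
```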

client

The client here is created by the cluster via options.NewClient(), so the way the client is created is configurable. The default is the following function:

// pkg/cluster/cluster.go

// DefaultNewClient creates the default caching client, that will never cache Unstructured.
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
return ClientBuilderWithOptions(ClientOptions{})(cache, config, options, uncachedObjects...)
}

// ClientBuilderWithOptions returns a Client constructor that will build a client
// honoring the options argument
func ClientBuilderWithOptions(options ClientOptions) NewClientFunc {
return func(cache cache.Cache, config *rest.Config, clientOpts client.Options, uncachedObjects ...client.Object) (client.Client, error) {
options.UncachedObjects = append(options.UncachedObjects, uncachedObjects...)

c, err := client.New(config, clientOpts)
if err != nil {
return nil, err
}

return client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
UncachedObjects: options.UncachedObjects,
CacheUnstructured: options.CacheUnstructured,
})
}
}

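Here client.New() first creates a client that talks directly to the apiserver. That client and the cache from the previous section are then assigned to the Client and CacheReader fields respectively, producing a delegatingClient, which is a client with caching capability. Calls that need to go straight to the apiserver and bypass the cache use the Client field, while reads that can be served from the cache use the CacheReader field. You can also configure which objects should not be cached and whether Unstructured objects are cached.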
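
The delegation idea can be sketched roughly as follows. This is a simplified model under stated assumptions, not the library's delegatingClient: the Reader interface, mapReader, and delegatingReader are hypothetical, objects are plain strings, and only the read path is modelled. Reads go to the cache unless the kind has been marked as uncached, in which case they go to the direct reader.

```go
package main

import (
	"errors"
	"fmt"
)

// Reader is a hypothetical, minimal read interface.
type Reader interface {
	Get(kind, name string) (string, error)
}

// mapReader serves objects from an in-memory map and counts reads.
type mapReader struct {
	objects map[string]string
	reads   int
}

func (m *mapReader) Get(kind, name string) (string, error) {
	m.reads++
	v, ok := m.objects[kind+"/"+name]
	if !ok {
		return "", errors.New("not found: " + kind + "/" + name)
	}
	return v, nil
}

// delegatingReader sends reads to the cache unless the kind is marked uncached.
type delegatingReader struct {
	cache    Reader
	direct   Reader
	uncached map[string]bool
}

func (d *delegatingReader) Get(kind, name string) (string, error) {
	if d.uncached[kind] {
		return d.direct.Get(kind, name) // bypass the cache
	}
	return d.cache.Get(kind, name)
}

// newDemo wires a delegating reader in which Secrets are never cached.
func newDemo() (*delegatingReader, *mapReader, *mapReader) {
	objects := map[string]string{"Pod/web": "pod spec", "Secret/token": "secret data"}
	cache := &mapReader{objects: objects}
	api := &mapReader{objects: objects}
	d := &delegatingReader{cache: cache, direct: api, uncached: map[string]bool{"Secret": true}}
	return d, cache, api
}

func main() {
	d, cache, api := newDemo()
	_, _ = d.Get("Pod", "web")
	_, _ = d.Get("Secret", "token")
	fmt.Println("cache reads:", cache.reads, "direct reads:", api.reads)
}
```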

cluster also implements the Start() method of the Runnable interface, so it is a runnable too. When the Manager starts, it adds the cluster to its runnables. All the cluster's Start() does is start the informers in its cache:

// pkg/manager/internal.go

func (cm *controllerManager) Start(ctx context.Context) (err error) {
......

// Add the cluster runnable.
if err := cm.add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}

......
}

// pkg/cluster/internal.go

func (c *cluster) Start(ctx context.Context) error {
defer c.recorderProvider.Stop(ctx)
return c.cache.Start(ctx)
}

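Those are the core concepts in the Manager: runnable, cluster, client, and cache. The most important are the cache, which is built on the informer mechanism, and the client, which is built on the cache.

The cache, client, and other objects that the Manager constructs are meant to be used by runnables, so when a runnable is added to the Manager they are injected into it. The Manager provides the SetFields() method for this: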

// pkg/manager/internal.go

func (cm *controllerManager) add(r Runnable) error {
// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
return err
}
return cm.runnables.Add(r)
}

// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
func (cm *controllerManager) SetFields(i interface{}) error {
if err := cm.cluster.SetFields(i); err != nil {
return err
}
......
return nil
}

// pkg/cluster/internal.go

func (c *cluster) SetFields(i interface{}) error {
if _, err := inject.ConfigInto(c.config, i); err != nil {
return err
}
if _, err := inject.ClientInto(c.client, i); err != nil {
return err
}
if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
return err
}
if _, err := inject.SchemeInto(c.scheme, i); err != nil {
return err
}
if _, err := inject.CacheInto(c.cache, i); err != nil {
return err
}
if _, err := inject.MapperInto(c.mapper, i); err != nil {
return err
}
return nil
}
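
One way to picture this kind of injection is an optional interface plus a type assertion: a value receives the dependency only if it declares that it wants one. The sketch below is an illustration under that assumption. The names client, clientInjector, and clientInto are hypothetical and are not the inject package's API.

```go
package main

import "fmt"

// client is a hypothetical placeholder for the shared client.
type client struct {
	name string
}

// clientInjector is implemented by anything that wants a client injected.
type clientInjector interface {
	InjectClient(c *client) error
}

// clientInto injects c into target if target implements clientInjector.
// It reports whether an injection happened.
func clientInto(c *client, target interface{}) (bool, error) {
	if inj, ok := target.(clientInjector); ok {
		return true, inj.InjectClient(c)
	}
	return false, nil
}

// wantsClient opts in to injection by implementing InjectClient.
type wantsClient struct {
	c *client
}

func (w *wantsClient) InjectClient(c *client) error {
	w.c = c
	return nil
}

// plain does not implement the interface, so it is left untouched.
type plain struct{}

func main() {
	shared := &client{name: "cached-client"}
	w := &wantsClient{}
	injected, _ := clientInto(shared, w)
	skipped, _ := clientInto(shared, &plain{})
	fmt.Println(injected, skipped, w.c.name)
}
```

With this shape, SetFields() can be called on any runnable without knowing its concrete type, and values that need nothing are simply skipped.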

Controller

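The Controller is what we usually mean by a controller. At its core it still relies on the List & Watch capability of the Informer mechanism to learn promptly about change events on API objects, which then trigger the event handler callbacks registered with the informer to take the corresponding action. On top of this, the Controller introduces three more concepts, event, predicate, and reconciler, which appear in the architecture diagram mentioned earlier. Here is its definition: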

// pkg/internal/controller/controller.go

type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string

// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int

// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler

// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func() workqueue.RateLimitingInterface

// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue.RateLimitingInterface

// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
// Deprecated: the caller should handle injected fields itself.
SetFields func(i interface{}) error

......

// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription
......
}

// watchDescription contains all the information necessary to start a watch.
type watchDescription struct {
src source.Source
handler handler.EventHandler
predicates []predicate.Predicate
}

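Let's first look at the watchDescription struct. It contains a Source, an EventHandler, and a list of Predicates. The Source is where events come from: either in-cluster events, that is, create, update, and delete events on API objects, or events from outside the cluster, such as a GitHub Webhook callback. The EventHandler is the callback that runs when an event occurs. A Predicate is an event filter used to keep only the events that matter.

Next is the Do field, which is the reconciler, the component that actually performs the work. The Queue field is a rate-limited queue that holds the events received from the Informer. startWatches is a list of watchDescription, which means one controller can handle several sources at once, although they all share a single reconciler.

They fit together like this. First, the handler callback is registered with the corresponding informer. The informer then receives create, update, and delete events for a kind of resource through list & watch, and when an event occurs the handler is invoked. Some events may be useless or of no interest to the controller, and these can be filtered out by defining predicates. Events that pass the filters are put on Queue, which is rate limited. The controller runs worker goroutines that consume the queue, take items off it, and hand them to the reconciler, which performs the final action.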
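
The flow from event to reconciler can be sketched in a few lines of standard-library Go. This is a toy model: event, predicate, handle, and run are hypothetical names, a buffered channel stands in for the rate-limited work queue, and a single loop plays the role of the worker.

```go
package main

import "fmt"

// event is a hypothetical, simplified event carrying an object key.
type event struct {
	key        string // namespace/name
	generation int
}

// predicate decides whether an event should be processed.
type predicate func(e event) bool

// handle applies the predicates and enqueues the key of any event that
// passes all of them.
func handle(e event, preds []predicate, queue chan<- string) {
	for _, p := range preds {
		if !p(e) {
			return // filtered out, never reaches the queue
		}
	}
	queue <- e.key
}

// run feeds events through the pipeline and returns the keys that the
// reconciler saw, in order.
func run(events []event, preds []predicate) []string {
	queue := make(chan string, len(events)) // buffered stand-in for the work queue
	for _, e := range events {
		handle(e, preds, queue)
	}
	close(queue)

	var reconciled []string
	reconcile := func(key string) { reconciled = append(reconciled, key) }
	for key := range queue { // a single worker draining the queue
		reconcile(key)
	}
	return reconciled
}

func main() {
	onlyChanged := func(e event) bool { return e.generation > 1 }
	events := []event{{"default/a", 1}, {"default/b", 2}, {"default/c", 3}}
	fmt.Println(run(events, []predicate{onlyChanged}))
}
```

Note that only the object key travels through the queue, not the event itself, which matches the way the reconciler is handed a name and namespace rather than a description of what changed.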

Source

Let's look at Source. It is an interface, defined as follows:

type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

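It has a single Start() method. As the comment says, its job is to register an EventHandler with the Informer, that is, to register event callbacks. Besides the event handler, the method also takes a workqueue and predicates. The workqueue is the rate-limited Queue mentioned above.

Two structs implement this interface: Kind and Channel. Kind handles in-cluster events, such as the creation, update, and deletion of Pods or Deployments, while Channel handles events from outside the cluster, such as a GitHub Webhook callback. We will focus on the definition of Kind and its Start() method: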

// pkg/source/source.go

type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type client.Object

// cache used to watch APIs
cache cache.Cache

// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}

func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
......
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
var (
i cache.Informer
lastErr error
)

// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
......
return true, nil
});
......
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
......
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()

return nil
}

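Kind has two fields, Type and cache. Type is the resource type we want to watch, and cache is the cache described earlier, which holds the map of Informers for API resources. Its Start() logic is just what we described: get the Informer for the type from the cache, then register the EventHandler with it.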

EventHandler

Now let's look at this EventHandler:

// pkg/source/internal/eventsource.go

type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}

func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}

// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}

for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}

// Invoke create handler
e.EventHandler.Create(c, e.Queue)
}
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {}
func (e EventHandler) OnDelete(obj interface{}) {}

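This EventHandler has three methods, one for each of the create, update, and delete events. For example, when a create event for the object is observed, OnAdd() is triggered with the object obj as its argument. The event then passes through the predicates, and finally the Create() method of the EventHandler passed into Kind.Start() is called for further processing. The EventHandler field inside this EventHandler struct is defined as follows: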

// pkg/handler/eventhandler.go

type EventHandler interface {
// Create is called in response to an create event - e.g. Pod Creation.
Create(event.CreateEvent, workqueue.RateLimitingInterface)

// Update is called in response to an update event - e.g. Pod Updated.
Update(event.UpdateEvent, workqueue.RateLimitingInterface)

// Delete is called in response to a delete event - e.g. Pod Deleted.
Delete(event.DeleteEvent, workqueue.RateLimitingInterface)

// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}

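It defines a handler method for each kind of event. This callback is something users define themselves. controller-runtime also ships a built-in handler that converts an event into a Request object and adds it to the queue: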

// pkg/handler/enqueue.go

type EnqueueRequestForObject struct{}

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
......

You can also define your own EventHandler, or wrap EnqueueRequestForObject to extend it, for example by logging in addition to enqueueing.
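
Wrapping a handler to add logging is a plain decorator. The sketch below models the idea with standard-library types only. createEvent, queue, eventHandler, enqueueForObject, and loggingHandler are hypothetical simplifications, and only the Create path is shown.

```go
package main

import (
	"fmt"
	"strings"
)

// createEvent is a hypothetical, simplified create event.
type createEvent struct {
	namespace, name string
}

// queue is a hypothetical minimal queue interface.
type queue interface {
	Add(key string)
}

// sliceQueue records added keys in order.
type sliceQueue struct {
	keys []string
}

func (q *sliceQueue) Add(key string) { q.keys = append(q.keys, key) }

// eventHandler models just the Create method of a handler.
type eventHandler interface {
	Create(e createEvent, q queue)
}

// enqueueForObject mirrors the idea of the built-in handler: turn the event
// into a namespace/name key and enqueue it.
type enqueueForObject struct{}

func (enqueueForObject) Create(e createEvent, q queue) {
	q.Add(e.namespace + "/" + e.name)
}

// loggingHandler wraps another handler and writes a log line before delegating.
type loggingHandler struct {
	next eventHandler
	log  *strings.Builder
}

func (h loggingHandler) Create(e createEvent, q queue) {
	fmt.Fprintf(h.log, "create %s/%s\n", e.namespace, e.name)
	h.next.Create(e, q)
}

// demo sends one event through the wrapped handler and returns the queued
// keys together with the captured log output.
func demo() ([]string, string) {
	q := &sliceQueue{}
	log := &strings.Builder{}
	h := loggingHandler{next: enqueueForObject{}, log: log}
	h.Create(createEvent{namespace: "default", name: "web"}, q)
	return q.keys, log.String()
}

func main() {
	keys, log := demo()
	fmt.Print(log)
	fmt.Println(keys)
}
```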

Predicate

Next is Predicate, defined as follows:

// pkg/predicate/predicate.go

type Predicate interface {
// Create returns true if the Create event should be processed
Create(event.CreateEvent) bool

// Delete returns true if the Delete event should be processed
Delete(event.DeleteEvent) bool

// Update returns true if the Update event should be processed
Update(event.UpdateEvent) bool

// Generic returns true if the Generic event should be processed
Generic(event.GenericEvent) bool
}

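It defines filter methods for the create, update, delete, and generic events. Note that predicates can be combined with logical operators such as or, and, and not before being passed in the predicates list, as in this example: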

predicates := []ctrlpredicate.Predicate{
ctrlpredicate.Or(ctrlpredicate.GenerationChangedPredicate{}, libpredicate.NoGenerationPredicate{}),
}

Here the two predicates are combined with "or".
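
How such combinators compose can be shown with function values. The following is a minimal sketch, not the library's implementation: updateEvent is a hypothetical stripped-down event, predicates are plain functions over update events only, and noGeneration is a rough model that treats a generation of 0 as "no generation tracked".

```go
package main

import "fmt"

// updateEvent is a hypothetical, stripped-down update event.
type updateEvent struct {
	oldGeneration int64
	newGeneration int64
}

// predicate reports whether an update event should be processed.
type predicate func(e updateEvent) bool

// or passes an event if at least one predicate passes it.
func or(ps ...predicate) predicate {
	return func(e updateEvent) bool {
		for _, p := range ps {
			if p(e) {
				return true
			}
		}
		return false
	}
}

// and passes an event only if every predicate passes it.
func and(ps ...predicate) predicate {
	return func(e updateEvent) bool {
		for _, p := range ps {
			if !p(e) {
				return false
			}
		}
		return true
	}
}

// not inverts a predicate.
func not(p predicate) predicate {
	return func(e updateEvent) bool { return !p(e) }
}

// generationChanged passes updates that changed the generation.
func generationChanged(e updateEvent) bool { return e.oldGeneration != e.newGeneration }

// noGeneration passes objects that do not track a generation (modelled as 0).
func noGeneration(e updateEvent) bool { return e.newGeneration == 0 }

func main() {
	combined := or(generationChanged, noGeneration)
	fmt.Println(combined(updateEvent{1, 2})) // generation changed
	fmt.Println(combined(updateEvent{0, 0})) // no generation tracked
	fmt.Println(combined(updateEvent{3, 3})) // nothing relevant changed
}
```

Because each combinator returns another predicate, the results nest freely, for instance and(generationChanged, not(noGeneration)).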

The Controller then exposes a Watch() method for registering a Source, an EventHandler, and Predicates with the Controller:

// pkg/internal/controller/controller.go

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()

// Inject Cache into arguments
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}

// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}

c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

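If the Controller has not started yet, the source, event handler, and predicates are bundled into a watchDescription and appended to the startWatches list. Later, when the Controller starts, it iterates over startWatches and starts each source. If the Controller is already running, the source is started immediately. Starting a source, as described in the Source section above, means registering the EventHandler with the Informer.

Finally, let's look at the Controller's startup logic: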

// pkg/internal/controller/controller.go

func (c *Controller) Start(ctx context.Context) error {
......
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))

if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
......
}

Reconciler

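When it starts, the Controller launches a number of worker goroutines (MaxConcurrentReconciles of them) that consume the work queue and trigger the Reconcile logic: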

// pkg/internal/controller/controller.go

func (c *Controller) Start(ctx context.Context) error {
......
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}()
}

c.Started = true
......
}

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}

// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)

ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

c.reconcileHandler(ctx, obj)
return true
}

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
......
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced.
result, err := c.Reconcile(ctx, req)
......
}

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
......
return c.Do.Reconcile(ctx, req)
}

The Reconciler interface is defined as follows:

// pkg/reconcile/reconcile.go

type Reconciler interface {
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
Reconcile(context.Context, Request) (Result, error)
}

The main job of a CRD developer is to write this Reconcile() logic. Once the Reconciler is defined, it is passed in when the Controller is created and ends up stored in the Controller.Do field.
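
The requeue rule stated in the interface comment (process the request again if the error is non-nil or Result.Requeue is true) can be sketched as a small loop. This is a simplified model under stated assumptions: Request, Result, and Reconciler are cut-down local types without a context, the queue is a slice, flaky is a hypothetical reconciler, and maxAttempts exists only to bound the demo. The real queue applies rate limiting and back-off, which is omitted here.

```go
package main

import (
	"errors"
	"fmt"
)

// Request and Result are cut-down local versions of the library types.
type Request struct {
	Name string
}

type Result struct {
	Requeue bool
}

// Reconciler is a simplified interface without a context parameter.
type Reconciler interface {
	Reconcile(req Request) (Result, error)
}

// flaky fails a fixed number of times before succeeding.
type flaky struct {
	failuresLeft int
	calls        int
}

func (f *flaky) Reconcile(req Request) (Result, error) {
	f.calls++
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return Result{}, errors.New("transient failure")
	}
	return Result{}, nil
}

// process drains the queue and puts a request back whenever Reconcile returns
// an error or asks for a requeue. It returns the number of attempts made.
func process(r Reconciler, queue []Request, maxAttempts int) int {
	attempts := 0
	for len(queue) > 0 && attempts < maxAttempts {
		req := queue[0]
		queue = queue[1:]
		attempts++
		res, err := r.Reconcile(req)
		if err != nil || res.Requeue {
			queue = append(queue, req) // a real queue would back off first
		}
	}
	return attempts
}

func main() {
	r := &flaky{failuresLeft: 2}
	attempts := process(r, []Request{{Name: "web"}}, 10)
	fmt.Println("attempts:", attempts)
}
```

This is why Reconcile() is usually written to be idempotent: the same request may be delivered several times before it finally succeeds.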

Builder

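To make creating Controllers easier, controller-runtime also provides a Builder struct and related methods for conveniently constructing a Controller and registering the objects to watch (Source) and the event callbacks (EventHandler). It is defined as follows: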

// pkg/builder/controller.go

type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}

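forInput records which CRD to watch. ownsInput records the objects that this CRD owns, such as Deployments and Services, whose events should also be watched. watchesInput records sources and event handlers to watch directly. It is a low-level way to register for events, and the first two are usually preferred. The Builder provides For(), Owns(), and Watches() methods to set these three fields. For example, For():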

// pkg/builder/controller.go

func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
input := ForInput{object: object}
for _, opt := range opts {
opt.ApplyToFor(&input)
}

blder.forInput = input
return blder
}

Next comes creating the Controller and calling Watch on the event sources above:

// pkg/builder/controller.go
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
......
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}

// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}

return blder.ctrl, nil
}


func (blder *Builder) doController(r reconcile.Reconciler) error {
globalOpts := blder.mgr.GetControllerOptions()

ctrlOptions := blder.ctrlOptions
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
}

......

blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
}

func (blder *Builder) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}

......
}

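This way, a developer only needs to create a Builder and specify the objects to watch. The Builder creates the Controller, Source, EventHandler, Predicates, and so on, and calls Watch on them, so the developer can concentrate on writing the Reconcile logic.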

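Summary

That was a rough tour of the concepts and logic around the Controller. To summarize: the Manager plays the managing role and provides the runnables running inside it with the Client and Cache for talking to the Kubernetes cluster. The Controller relies on the Informer mechanism to watch event sources, registers event handler callbacks, and triggers the Reconcile logic to do the actual work. The Controller exposes a Watch() method for registering event callbacks. What developers mainly have to write is the Reconcile logic, and almost everything else is taken care of by controller-runtime. Let's close with a simple example of a typical controller-runtime use case.

First, create the manager: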

mgr, err := manager.New(cfg, options)

Next, define the Reconciler:

// AnsibleOperatorReconciler - object to reconcile runner requests
type AnsibleOperatorReconciler struct {
GVK schema.GroupVersionKind
Runner runner.Runner
Client client.Client
APIReader client.Reader
EventHandlers []events.EventHandler
ReconcilePeriod time.Duration
ManageStatus bool
AnsibleDebugLogs bool
WatchAnnotationsChanges bool
}

// Reconcile - handle the event.
func (r *AnsibleOperatorReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { }//nolint:gocyclo

aor := &AnsibleOperatorReconciler{
Client: mgr.GetClient(),
GVK: options.GVK,
Runner: options.Runner,
EventHandlers: eventHandlers,
ReconcilePeriod: options.ReconcilePeriod,
ManageStatus: options.ManageStatus,
AnsibleDebugLogs: options.AnsibleDebugLogs,
APIReader: mgr.GetAPIReader(),
WatchAnnotationsChanges: options.WatchAnnotationsChanges,
}

Then, create the Controller:

c, err := controller.New(fmt.Sprintf("%v-controller", strings.ToLower(options.GVK.Kind)), mgr,
controller.Options{
Reconciler: aor,
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
})

Then define the predicates and call Watch() to register the event callbacks:

predicates := []ctrlpredicate.Predicate{
ctrlpredicate.Or(ctrlpredicate.GenerationChangedPredicate{}, libpredicate.NoGenerationPredicate{}),
}

err = c.Watch(&source.Kind{Type: u}, &handler.LoggingEnqueueRequestForObject{}, predicates...)

Finally, start the Manager:

mgr.Start(signals.SetupSignalHandler())

Author: hackerain

Published: 2023-11-18

Updated: 2024-02-23