How the Kubernetes APIServer Extension Mechanism Works

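At last we come to this post on the APIServer's extension mechanism. The previous posts were all, in effect, paving the way for this one, so let's recap first: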

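  • In "Kubernetes APIServer Mechanism Overview" we saw that the APIServer is essentially a web server implementing a RESTful API. It is built on golang's net/http Server, and the Handler is one of its central concepts. We also briefly introduced the extension mechanism: Aggregator, APIExtensions and KubeAPIServer are chained together through delegation.
  • In "Kubernetes APIServer Storage Framework Analysis" we covered the APIServer's storage framework: every API object has a corresponding REST store and etcd store, and we looked at how objects are persisted to the database.
  • In "Kubernetes APIServer GenericAPIServer" we covered what the GenericAPIServer does, how its Handler is built, how API objects are registered into that Handler in the form of APIGroupInfo, and how the PostStartHook mechanism works.
  • In "Kubernetes APIServer API Resource Installation" we showed how the API resources in KubeAPIServer, Aggregator and APIExtensions are built into REST stores, organized into APIGroupInfo and registered into the GenericAPIServer, and then took inventory of the API resources in the current Kubernetes version.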

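Building on that groundwork, let's analyze how the APIServer's extension mechanism is implemented. It is worth the effort because the mechanism is used so widely: the CRD + Operator pattern in particular has become the standard way for a lot of software to run in the cloud-native era. When we use these mechanisms to extend Kubernetes, understanding how they work underneath lets us proceed like Cook Ding carving the ox, knowing exactly where every joint is.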

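Kubernetes is steadily becoming the de facto standard of the cloud-native movement. In the future, applications may well be developed and run on Kubernetes, and delivering software may mean shipping not only the runtime but also its operational capabilities: installation, deployment, backup, updates, upgrades and so on, as if a robot were quietly doing the ops work to keep the software healthy. As an ops engineer, I find that prospect exciting. The ideal is still ahead of reality, though, and there is a long, down-to-earth road of development and gradual adoption before we get there. Abroad, Red Hat's OpenShift, with its strong product engineering and design, is at the forefront of the movement: the newly released OpenShift 4.0 uses CRD + Operator extensively and builds the pattern into its application marketplace, encouraging developers to publish their applications this way. In China, Alibaba Cloud leads the way: in July this year it published a cloud-native architecture white paper and took up the banner of the cloud-native movement in China. With its deep bench of experts and roughly half of the domestic cloud market, it is very well placed to succeed.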

Delegation

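As mentioned several times before, the APIServer's extension mechanism is implemented by Aggregator, KubeAPIServer and APIExtensions through delegation. All three are essentially APIServers. KubeAPIServer hosts Kubernetes' built-in API objects, while Aggregator and APIExtensions are the APIServers behind the two Kubernetes API extension mechanisms (see the official documentation for an introduction to both). APIExtensions is the implementation of CRDs, while Aggregator is a more advanced extension that lets the Kubernetes APIServer work together with external APIServers. Each of the three contains a GenericAPIServer, and it is really these three GenericAPIServers that delegate to one another. In this section we first sort out the delegation relationship among the three.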

As usual, let's start with the three corresponding structs:

```go
// kubernetes/pkg/controlplane/instance.go

// KubeAPIServer
type Instance struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
}

// kube-aggregator/pkg/apiserver/apiserver.go

// Aggregator
type APIAggregator struct {
	GenericAPIServer *genericapiserver.GenericAPIServer
	delegateHandler  http.Handler
	// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
	proxyHandlers map[string]*proxyHandler
	// ......
}

// apiextensions-apiserver/pkg/apiserver/apiserver.go

// APIExtensions
type CustomResourceDefinitions struct {
	GenericAPIServer *genericapiserver.GenericAPIServer
	// provided for easier embedding
	Informers externalinformers.SharedInformerFactory
}
```

Each of them holds a member of type *GenericAPIServer, which is created in the New() step of the Config -> Complete -> New pattern:

```go
// kubernetes/pkg/controlplane/instance.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	// ......
	m := &Instance{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}
	// ......
}

// kube-aggregator/pkg/apiserver/apiserver.go

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)

	s := &APIAggregator{
		GenericAPIServer:         genericServer,
		delegateHandler:          delegationTarget.UnprotectedHandler(),
		proxyClientCert:          c.ExtraConfig.ProxyClientCert,
		proxyClientKey:           c.ExtraConfig.ProxyClientKey,
		proxyTransport:           c.ExtraConfig.ProxyTransport,
		proxyHandlers:            map[string]*proxyHandler{},
		handledGroups:            sets.String{},
		lister:                   informerFactory.Apiregistration().V1().APIServices().Lister(),
		APIRegistrationInformers: informerFactory,
		serviceResolver:          c.ExtraConfig.ServiceResolver,
		openAPIConfig:            openAPIConfig,
		egressSelector:           c.GenericConfig.EgressSelector,
	}
}

// apiextensions-apiserver/pkg/apiserver/apiserver.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
	s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}
}
```

In each New(), a GenericAPIServer is created through the GenericAPIServer config's New() method, which takes two arguments: the GenericAPIServer's name, and a DelegationTarget, i.e. the server this GenericAPIServer delegates to. Let's look at that New() method:

```go
// apiserver/pkg/server/config.go

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	s := &GenericAPIServer{
		// ......
		delegationTarget: delegationTarget,
		// ......
	}
	return s, nil
}
```

The delegationTarget passed in as an argument is assigned to the GenericAPIServer's delegationTarget field.

Next, how are these three APIServers created? During CreateServerChain(), the New() methods above are called in turn to create them:

```go
// kubernetes/cmd/kube-apiserver/app/server.go

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}
```

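Each APIServer is created by its own config's New() method. Pay attention to the creation order and to how the delegationTarget argument is passed. APIExtensionsServer is created first; its delegationTarget is an empty delegate that only wraps a notFoundHandler, so it serves nothing itself. Then APIExtensionsServer's GenericAPIServer is passed as the delegationTarget to config.ControlPlane.New(), producing KubeAPIServer. Finally, kubeAPIServer's GenericAPIServer is passed as the delegationTarget to createAggregatorServer(), producing aggregatorServer. Note that CreateServerChain() returns only aggregatorServer, so the delegation relationship is Aggregator -> KubeAPIServer -> APIExtensions, as shown in the figure below: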

[Figure: k8s-apiserver-delegation]
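To make the construction order concrete, here is a minimal Go sketch of the chain built by CreateServerChain(). It is not the real apiserver code: the DelegationTarget interface below is a hypothetical two-method stand-in (the real one has more methods, such as UnprotectedHandler(), and no Name()), and genericServer, newGenericServer and chainNames are invented names. It only shows that each server stores the one it was given, and that walking from the returned head visits the aggregator first.

```go
package main

import "fmt"

// DelegationTarget is a hypothetical, heavily trimmed stand-in for
// genericapiserver.DelegationTarget. Name() is invented for this sketch.
type DelegationTarget interface {
	Name() string
	NextDelegate() DelegationTarget
}

// emptyDelegate marks the end of the chain, like the empty delegate
// handed to the APIExtensions server.
type emptyDelegate struct{}

func (emptyDelegate) Name() string                   { return "empty" }
func (emptyDelegate) NextDelegate() DelegationTarget { return nil }

// genericServer only remembers its name and whom it delegates to.
type genericServer struct {
	name             string
	delegationTarget DelegationTarget
}

func (s *genericServer) Name() string                   { return s.name }
func (s *genericServer) NextDelegate() DelegationTarget { return s.delegationTarget }

// newGenericServer mimics completedConfig.New(name, delegationTarget).
func newGenericServer(name string, delegate DelegationTarget) *genericServer {
	return &genericServer{name: name, delegationTarget: delegate}
}

// createServerChain builds the servers in the same order as CreateServerChain:
// APIExtensions first, the aggregator last, and returns only the head.
func createServerChain() DelegationTarget {
	apiExtensions := newGenericServer("apiextensions-apiserver", emptyDelegate{})
	kubeAPIServer := newGenericServer("kube-apiserver", apiExtensions)
	return newGenericServer("kube-aggregator", kubeAPIServer)
}

// chainNames walks NextDelegate() from the head and records each name.
func chainNames(head DelegationTarget) []string {
	var names []string
	for t := head; t != nil; t = t.NextDelegate() {
		names = append(names, t.Name())
	}
	return names
}

func main() {
	fmt.Println(chainNames(createServerChain()))
	// [kube-aggregator kube-apiserver apiextensions-apiserver empty]
}
```

Creating the tail first is what makes the order Aggregator -> KubeAPIServer -> APIExtensions: a server can only point at a delegate that already exists.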

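This shows how important the Aggregator is: CreateServerChain() ultimately returns an Aggregator APIServer, and the Aggregator sits at the head of the delegation chain. Indeed, the Aggregator has a built-in API resource called apiservices, which represents an external API service. When the AggregatorServer is created, the resource groups (GroupVersions) of KubeAPIServer and APIExtensions are converted into Aggregator APIService objects and registered with the Aggregator, and the entry point of the whole APIServer is in fact the Aggregator's GenericAPIServer. Let's see how all of this is implemented.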

kube-apiserver-autoregistration

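When the aggregator starts, i.e. in createAggregatorServer(), the resource groups (GroupVersions) defined in the CRDs and in the apiserver are converted into APIServices by the kube-apiserver-autoregistration poststarthook and registered with the aggregator. For example, GroupVersion{Group: "apps", Version: "v1"} becomes APIService{Spec: v1.APIServiceSpec{Group: "apps", Version: "v1"}}, which is then persisted to the database. The apiserver's resources are built into Kubernetes and fixed, so they only need to be registered once at startup. CRD resources, however, are user-defined and may be added, changed or deleted at any time, so they must be synchronized continuously. The relevant code is: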

```go
// kubernetes/cmd/kube-apiserver/app/aggregator.go

func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
	aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	// create controllers for auto-registration
	apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
	if err != nil {
		return nil, err
	}

	// autoRegistrationController holds a queue and a map: the map stores the APIService objects themselves, the queue only their names
	autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)

	// Convert the apiserver's GroupVersions into APIServices and add them to autoRegistrationController's queue and map; this runs only once
	apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
	crdRegistrationController := crdregistration.NewCRDRegistrationController(
		apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
		autoRegistrationController)

	// Imbue all builtin group-priorities onto the aggregated discovery
	if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
		for gv, entry := range apiVersionPriorities {
			aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))
		}
	}

	err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
		// Continuously convert CRD GroupVersions into APIServices and add them to autoRegistrationController's queue and map
		go crdRegistrationController.Run(5, context.StopCh)
		go func() {
			// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
			// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
			// we only need to do this if CRDs are enabled on this server.  We can't use discovery because we are the source for discovery.
			if crdAPIEnabled {
				klog.Infof("waiting for initial CRD sync...")
				crdRegistrationController.WaitForInitialSync()
				klog.Infof("initial CRD sync complete...")
			} else {
				klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
			}
			// Continuously take APIServices off the queue and create or update them in etcd through the apiservice API, persisting them
			autoRegistrationController.Run(5, context.StopCh)
		}()
		return nil
	})

	// ......

	return aggregatorServer, nil
}
```

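The conversion is carried out mainly by two Controllers: crdRegistrationController and autoRegistrationController. This reflects a core Kubernetes design pattern, the Controller-Loop (control loop) pattern: continuously read object definitions from the API and act on them so that the actual state matches what the API objects declare. An API used this way is called a declarative API.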
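The control-loop idea can be reduced to a diff between desired and actual state. The following is a generic sketch under that simplification, not code from the autoregister controller: both states are plain maps from an APIService name to an opaque spec string, and reconcile is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the desired state with the actual state and returns the
// names that must be created, updated or deleted to make them converge.
// A differing value for the same name counts as an update.
func reconcile(desired, actual map[string]string) (create, update, del []string) {
	for name, spec := range desired {
		cur, ok := actual[name]
		switch {
		case !ok:
			create = append(create, name)
		case cur != spec:
			update = append(update, name)
		}
	}
	for name := range actual {
		if _, ok := desired[name]; !ok {
			del = append(del, name)
		}
	}
	// map iteration order is random; sort for deterministic output
	sort.Strings(create)
	sort.Strings(update)
	sort.Strings(del)
	return create, update, del
}

func main() {
	desired := map[string]string{"v1.apps": "apps/v1", "v1.example.com": "example.com/v1"}
	actual := map[string]string{"v1.apps": "apps/v1", "v1beta1.old.io": "old.io/v1beta1"}
	c, u, d := reconcile(desired, actual)
	fmt.Println(c, u, d) // [v1.example.com] [] [v1beta1.old.io]
}
```

A real controller runs this comparison repeatedly, triggered by watch events, so the system keeps converging even if individual writes fail.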

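autoRegistrationController keeps a queue of the APIService objects added to it. These APIServices may have been converted from KubeAPIServer or the APIExtensions APIServer, or added directly through the APIService API. The kube-apiserver-autoregistration PostStartHook starts this Controller, which keeps draining the queue, takes each APIService off it and calls the apiservice API to create or update it in etcd, persisting it.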

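crdRegistrationController converts the CRD objects defined in the APIExtensions APIServer into APIServices and adds them to autoRegistrationController's queue. It is also started in the kube-apiserver-autoregistration PostStartHook, and it keeps watching the CRD API through an informer, converting each CRD's GroupVersion into an APIService and adding it to autoRegistrationController's queue. The other source is KubeAPIServer: its resources are built in and do not change dynamically, so apiServicesToRegister() converts them only once and registers them with the autoRegistrationController queue.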
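The queue-plus-map structure described above can be sketched in a few lines. This is a simplified, hypothetical model: the method names only loosely echo the real controller's, and there is no rate limiting, no informer and no concurrency; a plain map stands in for the APIService API and etcd. It shows why keeping names in the queue and objects in the map works: a CRD that appears and disappears before the worker runs is simply reconciled to "absent".

```go
package main

import "fmt"

// APIService is a trimmed-down, hypothetical version of the real type.
type APIService struct {
	Name    string
	Group   string
	Version string
}

// autoRegister models the "map + queue" structure:
// desired holds the APIService objects, queue holds only their names.
type autoRegister struct {
	desired map[string]*APIService
	queue   []string
	queued  map[string]bool // names currently waiting, to avoid duplicates
}

func newAutoRegister() *autoRegister {
	return &autoRegister{desired: map[string]*APIService{}, queued: map[string]bool{}}
}

func (a *autoRegister) enqueue(name string) {
	if !a.queued[name] {
		a.queued[name] = true
		a.queue = append(a.queue, name)
	}
}

// AddAPIServiceToSync records the desired object and enqueues its name.
func (a *autoRegister) AddAPIServiceToSync(s *APIService) {
	a.desired[s.Name] = s
	a.enqueue(s.Name)
}

// RemoveAPIServiceToSync forgets the object; the worker will then delete it.
func (a *autoRegister) RemoveAPIServiceToSync(name string) {
	delete(a.desired, name)
	a.enqueue(name)
}

// processNext pops one name and writes the desired state into storage,
// a plain map standing in for the APIService API and etcd.
func (a *autoRegister) processNext(storage map[string]*APIService) bool {
	if len(a.queue) == 0 {
		return false
	}
	name := a.queue[0]
	a.queue = a.queue[1:]
	delete(a.queued, name)
	if s, ok := a.desired[name]; ok {
		storage[name] = s
	} else {
		delete(storage, name)
	}
	return true
}

func main() {
	ar := newAutoRegister()
	storage := map[string]*APIService{}

	// built-in group versions are added once, like apiServicesToRegister()
	ar.AddAPIServiceToSync(&APIService{Name: "v1.apps", Group: "apps", Version: "v1"})
	// a CRD group version appears and disappears before the worker runs
	ar.AddAPIServiceToSync(&APIService{Name: "v1.example.com", Group: "example.com", Version: "v1"})
	ar.RemoveAPIServiceToSync("v1.example.com")

	for ar.processNext(storage) {
	}
	fmt.Println(len(storage)) // 1
}
```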

Note, however, that the APIServices converted from APIExtensions and KubeAPIServer are special: they are called local APIServices. Here is the definition of APIService:

```go
// kube-aggregator/pkg/apis/apiregistration/v1/types.go

type APIServiceSpec struct {
	// Service is a reference to the service for this API server. It must communicate
	// on port 443.
	// If the Service is nil, that means the handling for the API groupversion is handled locally on this server.
	// The call will simply delegate to the normal handler chain to be fulfilled.
	// +optional
	Service *ServiceReference `json:"service,omitempty" protobuf:"bytes,1,opt,name=service"`
	// Group is the API group name this server hosts
	Group string `json:"group,omitempty" protobuf:"bytes,2,opt,name=group"`
	// Version is the API version this server hosts. For example, "v1"
	Version string `json:"version,omitempty" protobuf:"bytes,3,opt,name=version"`
	// ......
}
```

As the comment on the Service field says, if Service is nil the APIService is a local APIService, and requests for it are delegated to the corresponding handler. Here is how the GroupVersions of APIExtensions and KubeAPIServer are turned into local APIServices:

```go
// kubernetes/cmd/kube-apiserver/app/aggregator.go

func makeAPIService(gv schema.GroupVersion) *v1.APIService {
	apiServicePriority, ok := apiVersionPriorities[gv]
	if !ok {
		// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
		// being permanently stuck in the APIServices list.
		klog.Infof("Skipping APIService creation for %v", gv)
		return nil
	}
	return &v1.APIService{
		ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
		Spec: v1.APIServiceSpec{
			Group:                gv.Group,
			Version:              gv.Version,
			GroupPriorityMinimum: apiServicePriority.group,
			VersionPriority:      apiServicePriority.version,
		},
	}
}
```

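When the APIServiceSpec is built, the Service field is not set, so it defaults to nil. Such an APIService is handled by a local APIServer, i.e. KubeAPIServer or the APIExtensions APIServer, instead of being proxied elsewhere by the Aggregator; this will become clear when we look at the Aggregator's proxy strategy later.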
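A trimmed-down sketch of makeAPIService() together with the "local" rule. The types are simplified, hypothetical copies of the apiregistration types, the priority numbers are placeholders rather than Kubernetes' actual values, and isLocal is an invented helper that restates the rule from the Service field comment.

```go
package main

import "fmt"

// Trimmed-down, hypothetical versions of the real apiregistration types.
type GroupVersion struct {
	Group   string
	Version string
}

type ServiceReference struct {
	Namespace string
	Name      string
	Port      int32
}

type APIServiceSpec struct {
	Service              *ServiceReference // nil means "handled locally"
	Group                string
	Version              string
	GroupPriorityMinimum int32
	VersionPriority      int32
}

type APIService struct {
	Name string
	Spec APIServiceSpec
}

type priority struct{ group, version int32 }

// Placeholder priority table; the real apiVersionPriorities lists every
// built-in group version with its own numbers.
var apiVersionPriorities = map[GroupVersion]priority{
	{Group: "", Version: "v1"}:     {group: 18000, version: 1},
	{Group: "apps", Version: "v1"}: {group: 17800, version: 15},
}

// makeAPIService follows the shape of the function above: unknown group
// versions are skipped, and Spec.Service is never set.
func makeAPIService(gv GroupVersion) *APIService {
	p, ok := apiVersionPriorities[gv]
	if !ok {
		return nil
	}
	return &APIService{
		Name: gv.Version + "." + gv.Group,
		Spec: APIServiceSpec{
			Group:                gv.Group,
			Version:              gv.Version,
			GroupPriorityMinimum: p.group,
			VersionPriority:      p.version,
		},
	}
}

// isLocal reports whether the aggregator should delegate instead of proxy.
func isLocal(s *APIService) bool { return s.Spec.Service == nil }

func main() {
	apps := makeAPIService(GroupVersion{Group: "apps", Version: "v1"})
	fmt.Println(apps.Name, isLocal(apps)) // v1.apps true

	core := makeAPIService(GroupVersion{Group: "", Version: "v1"})
	fmt.Printf("%q\n", core.Name) // "v1."

	fmt.Println(makeAPIService(GroupVersion{Group: "example.com", Version: "v1"}) == nil) // true
}
```

Note that the core group has an empty group name, so its APIService is named "v1.", which is the legacy name that AddAPIService() later special-cases.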

apiservice-registration-controller

This Controller watches the APIService objects in the database, builds a Handler for each APIService, and registers it with the Aggregator's GenericAPIServer. Here is its definition:

```go
// kube-aggregator/pkg/apiserver/apiservice_controller.go

type APIHandlerManager interface {
	AddAPIService(apiService *v1.APIService) error
	RemoveAPIService(apiServiceName string)
}

type APIServiceRegistrationController struct {
	apiHandlerManager APIHandlerManager

	apiServiceLister listers.APIServiceLister
	apiServiceSynced cache.InformerSynced

	// To allow injection for testing.
	syncFn func(key string) error

	queue workqueue.RateLimitingInterface
}

func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
	c := &APIServiceRegistrationController{
		apiHandlerManager: apiHandlerManager,
		apiServiceLister:  apiServiceInformer.Lister(),
		apiServiceSynced:  apiServiceInformer.Informer().HasSynced,
		queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
	}

	apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addAPIService,
		UpdateFunc: c.updateAPIService,
		DeleteFunc: c.deleteAPIService,
	})

	c.syncFn = c.sync

	return c
}
```

This Controller is created in APIAggregator's New() method (NewWithDelegate) and added as a poststarthook:

```go
// kube-aggregator/pkg/apiserver/apiserver.go

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	s := &APIAggregator{
		GenericAPIServer: genericServer,
		delegateHandler:  delegationTarget.UnprotectedHandler(),
		// ......
	}
	// ......
	apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
	// ......
	s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
		handlerSyncedCh := make(chan struct{})
		go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh)
		select {
		case <-context.StopCh:
		case <-handlerSyncedCh:
		}
		return nil
	})
	// ......
}
```

The apiHandlerManager member of APIServiceRegistrationController is in fact the APIAggregator itself, passed in when the Controller is built with NewAPIServiceRegistrationController(). APIAggregator implements the logic that builds a Handler for an APIService and registers it with the GenericAPIServer:

```go
// kube-aggregator/pkg/apiserver/apiserver.go

func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
	// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
	// since they are wired against listers because they require multiple resources to respond
	if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
		proxyHandler.updateAPIService(apiService)
		if s.openAPIAggregationController != nil {
			s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
		}
		return nil
	}

	proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
	// v1. is a special case for the legacy API. It proxies to a wider set of endpoints.
	if apiService.Name == legacyAPIServiceName {
		proxyPath = "/api"
	}

	// register the proxy handler
	proxyHandler := &proxyHandler{
		localDelegate:   s.delegateHandler,
		proxyClientCert: s.proxyClientCert,
		proxyClientKey:  s.proxyClientKey,
		proxyTransport:  s.proxyTransport,
		serviceResolver: s.serviceResolver,
		egressSelector:  s.egressSelector,
	}
	proxyHandler.updateAPIService(apiService)
	if s.openAPIAggregationController != nil {
		s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
	}
	s.proxyHandlers[apiService.Name] = proxyHandler
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

	// ......
}
```

AddAPIService() builds a path from the APIService's Group and Version, proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version (or "/api" for the legacy "v1." APIService), creates a proxyHandler, and registers path -> proxyHandler in the NonGoRestfulMux of the Aggregator's GenericAPIServer, completing the Handler registration. Its counterpart, RemoveAPIService(), removes an APIService's Handler from the Aggregator.

Now let's look at what the Controller does once it starts:

```go
// kube-aggregator/pkg/apiserver/apiservice_controller.go

func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}, handlerSyncedCh chan<- struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting APIServiceRegistrationController")
	defer klog.Infof("Shutting down APIServiceRegistrationController")

	if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced) {
		return
	}

	/// initially sync all APIServices to make sure the proxy handler is complete
	if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
		services, err := c.apiServiceLister.List(labels.Everything())
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err))
			return false, nil
		}
		for _, s := range services {
			if err := c.apiHandlerManager.AddAPIService(s); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err))
				return false, nil
			}
		}
		return true, nil
	}, stopCh); err == wait.ErrWaitTimeout {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize"))
		return
	} else if err != nil {
		panic(fmt.Errorf("unexpected error: %v", err))
	}
	close(handlerSyncedCh)

	// only start one worker thread since its a slow moving API and the aggregation server adding bits
	// aren't threadsafe
	go wait.Until(c.runWorker, time.Second, stopCh)

	<-stopCh
}

func (c *APIServiceRegistrationController) sync(key string) error {
	apiService, err := c.apiServiceLister.Get(key)
	if apierrors.IsNotFound(err) {
		c.apiHandlerManager.RemoveAPIService(key)
		return nil
	}
	if err != nil {
		return err
	}

	return c.apiHandlerManager.AddAPIService(apiService)
}
```

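The PostStartHook starts the Controller by calling Run(). Run() first lists all current APIService objects through the APIService lister and calls the Aggregator's AddAPIService() on each one, registering every existing APIService with the GenericAPIServer. After that it relies on the Informer mechanism to watch the APIService API: whenever an APIService is added, updated or deleted, sync() is called, which looks up that APIService and adds its Handler to, or removes it from, the GenericAPIServer.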
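The add/update/delete handling in sync() boils down to one rule: if the lister no longer has the key, remove the handler; otherwise (re)add it. The sketch below shows that rule with a map-backed fake lister and a fake APIHandlerManager; fakeLister, fakeManager, errNotFound and syncKey are hypothetical stand-ins for the informer lister, the APIAggregator, apierrors.IsNotFound and the real sync().

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

type APIService struct{ Name string }

// fakeLister stands in for the informer-backed APIServiceLister.
type fakeLister map[string]*APIService

func (l fakeLister) Get(name string) (*APIService, error) {
	if s, ok := l[name]; ok {
		return s, nil
	}
	return nil, errNotFound
}

// fakeManager plays the APIHandlerManager role (the APIAggregator in the real code).
type fakeManager struct{ handlers map[string]bool }

func (m *fakeManager) AddAPIService(s *APIService) error {
	m.handlers[s.Name] = true
	return nil
}

func (m *fakeManager) RemoveAPIService(name string) {
	delete(m.handlers, name)
}

// syncKey mirrors the shape of APIServiceRegistrationController.sync.
func syncKey(key string, lister fakeLister, m *fakeManager) error {
	s, err := lister.Get(key)
	if errors.Is(err, errNotFound) {
		m.RemoveAPIService(key)
		return nil
	}
	if err != nil {
		return err
	}
	return m.AddAPIService(s)
}

func main() {
	lister := fakeLister{"v1.apps": {Name: "v1.apps"}}
	m := &fakeManager{handlers: map[string]bool{"v1beta1.gone.io": true}}

	_ = syncKey("v1.apps", lister, m)         // informer reported an add/update
	_ = syncKey("v1beta1.gone.io", lister, m) // object no longer in the lister
	fmt.Println(m.handlers)                   // map[v1.apps:true]
}
```

Because the event type is never passed in, only the key, the same function handles adds, updates and deletes, which is why a single worker on the queue is enough.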

That covers the Controllers in these two poststarthooks. Let's summarize with a diagram:

[Figure: kubernetes_aggregator_autoregistration]

Aggregator proxyHandler

Now let's focus on the proxy logic of the Aggregator's proxyHandler, starting with how a proxyHandler is built:

```go
// kube-aggregator/pkg/apiserver/apiserver.go

func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
	// ......
	proxyHandler := &proxyHandler{
		localDelegate:   s.delegateHandler,
		proxyClientCert: s.proxyClientCert,
		proxyClientKey:  s.proxyClientKey,
		proxyTransport:  s.proxyTransport,
		serviceResolver: s.serviceResolver,
		egressSelector:  s.egressSelector,
	}
	proxyHandler.updateAPIService(apiService)
	// ......
}

// kube-aggregator/pkg/apiserver/handler_proxy.go

type proxyHandler struct {
	// localDelegate is used to satisfy local APIServices
	localDelegate http.Handler

	// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
	// this to confirm the proxy's identity
	proxyClientCert []byte
	proxyClientKey  []byte
	proxyTransport  *http.Transport

	// Endpoints based routing to map from cluster IP to routable IP
	serviceResolver ServiceResolver

	handlingInfo atomic.Value

	// egressSelector selects the proper egress dialer to communicate with the custom apiserver
	// overwrites proxyTransport dialer if not nil
	egressSelector *egressselector.EgressSelector
}

type proxyHandlingInfo struct {
	// local indicates that this APIService is locally satisfied
	local bool

	// name is the name of the APIService
	name string
	// restConfig holds the information for building a roundtripper
	restConfig *restclient.Config
	// transportBuildingError is an error produced while building the transport. If this
	// is non-nil, it will be reported to clients.
	transportBuildingError error
	// proxyRoundTripper is the re-useable portion of the transport. It does not vary with any request.
	proxyRoundTripper http.RoundTripper
	// serviceName is the name of the service this handler proxies to
	serviceName string
	// namespace is the namespace the service lives in
	serviceNamespace string
	// serviceAvailable indicates this APIService is available or not
	serviceAvailable bool
	// servicePort is the port of the service this handler proxies to
	servicePort int32
}

func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
	if apiService.Spec.Service == nil {
		r.handlingInfo.Store(proxyHandlingInfo{local: true})
		return
	}

	newInfo := proxyHandlingInfo{
		name: apiService.Name,
		restConfig: &restclient.Config{
			TLSClientConfig: restclient.TLSClientConfig{
				Insecure:   apiService.Spec.InsecureSkipTLSVerify,
				ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
				CertData:   r.proxyClientCert,
				KeyData:    r.proxyClientKey,
				CAData:     apiService.Spec.CABundle,
			},
		},
		serviceName:      apiService.Spec.Service.Name,
		serviceNamespace: apiService.Spec.Service.Namespace,
		servicePort:      *apiService.Spec.Service.Port,
		serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
	}
	// ......
	newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
	r.handlingInfo.Store(newInfo)
}
```

proxyHandler has a localDelegate member, an http.Handler assigned from APIAggregator's delegateHandler field, which in turn is set in APIAggregator's New() method:

```go
s := &APIAggregator{
	GenericAPIServer: genericServer,
	delegateHandler:  delegationTarget.UnprotectedHandler(),
	// ......
}
```

It is obtained by calling UnprotectedHandler() on APIAggregator's delegationTarget. So what is UnprotectedHandler()? Here is the method:

```go
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
	// when we delegate, we need the server we're delegating to choose whether or not to use gorestful
	return s.Handler.Director
}
```

It is simply the Director member of the GenericAPIServer's Handler. That Handler has another member, FullHandlerChain:

```go
APIServerHandler{
	FullHandlerChain:   handlerChainBuilder(director),
	GoRestfulContainer: gorestfulContainer,
	NonGoRestfulMux:    nonGoRestfulMux,
	Director:           director,
}
```

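We covered this in "Kubernetes APIServer GenericAPIServer": FullHandlerChain wraps the director in many filters that perform authentication, authorization and so on, whereas Director is the bare director without those filters. The director is itself a Handler, which is why this one is called the UnprotectedHandler: requests reach it without going through authentication and authorization. As described in the Delegation section, the Aggregator's delegationTarget is KubeAPIServer, so this Director is really the Director in KubeAPIServer's APIServerHandler. The intent is clear: when a request is delegated from the Aggregator to KubeAPIServer, it does not need to be authenticated again, because it was already authenticated when it reached the Aggregator's Handler.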

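Besides localDelegate, the other member worth attention is handlingInfo, an atomic value that is written and read with Store/Load. updateAPIService() converts an APIService into a proxyHandlingInfo struct and stores it in handlingInfo. Note the first statement of that method: if the APIServiceSpec's Service is nil, the local field of proxyHandlingInfo is set to true, marking it as a local APIService. As explained earlier, the APIServices converted from KubeAPIServer and APIExtensions are all local APIServices.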

With that in mind, let's look at how proxyHandler handles requests:

```go
// kube-aggregator/pkg/apiserver/handler_proxy.go

func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	value := r.handlingInfo.Load()
	if value == nil {
		r.localDelegate.ServeHTTP(w, req)
		return
	}
	handlingInfo := value.(proxyHandlingInfo)
	if handlingInfo.local {
		if r.localDelegate == nil {
			http.Error(w, "", http.StatusNotFound)
			return
		}
		r.localDelegate.ServeHTTP(w, req)
		return
	}

	if !handlingInfo.serviceAvailable {
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}

	if handlingInfo.transportBuildingError != nil {
		proxyError(w, req, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
		return
	}

	user, ok := genericapirequest.UserFrom(req.Context())
	if !ok {
		proxyError(w, req, "missing user", http.StatusInternalServerError)
		return
	}

	// write a new location based on the existing request pointed at the target service
	location := &url.URL{}
	location.Scheme = "https"
	rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
	if err != nil {
		klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}
	location.Host = rloc.Host
	location.Path = req.URL.Path
	location.RawQuery = req.URL.Query().Encode()

	newReq, cancelFn := newRequestForProxy(location, req)
	defer cancelFn()

	if handlingInfo.proxyRoundTripper == nil {
		proxyError(w, req, "", http.StatusNotFound)
		return
	}

	// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
	proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
	if err != nil {
		proxyError(w, req, err.Error(), http.StatusInternalServerError)
		return
	}
	proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)

	// if we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
	// NOT use the roundtripper.  Its a direct call that bypasses the round tripper.  This means that we have to
	// attach the "correct" user headers to the request ahead of time.  After the initial upgrade, we'll be back
	// at the roundtripper flow, so we only have to muck with this request, but we do have to do it.
	if upgrade {
		transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra())
	}

	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
	handler.ServeHTTP(w, newReq)
}
```

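The overall logic is fairly simple. The method first checks whether handlingInfo is local. If it is, localDelegate, i.e. KubeAPIServer's Handler, handles the request, which is how the request gets delegated. Otherwise the request is meant for an external APIServer: if that service is unavailable the proxy answers 503, and if not, the APIService information in handlingInfo is used to build a proxy handler that forwards the request. The details of that proxy handler are beyond the scope of this post.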

Now let's trace the delegation from the Aggregator to KubeAPIServer. As described in the Delegation section, CreateServerChain() builds an aggregatorServer whose delegationTarget is KubeAPIServer, and KubeAPIServer's delegationTarget is in turn the APIExtensions APIServer. When the Kubernetes APIServer runs, what actually runs is the startup logic of the aggregatorServer's GenericAPIServer, and the Handler registered with the net/http Server is the Handler of the Aggregator's GenericAPIServer, so the entry point of the entire API is the Aggregator's Handler. Thanks to the two poststarthooks above, the API resources of KubeAPIServer and the APIExtensions APIServer have been turned into Aggregator APIServices, each with its own proxyHandler registered in NonGoRestfulMux. So every request to the APIServer first passes through the filters wrapped around the Aggregator's Handler (authentication, authorization and so on), then enters the NonGoRestfulMux of the Aggregator's GenericAPIServer, which routes it by path to the matching proxyHandler, i.e. the ServeHTTP() method above. If the APIService is local, the request is delegated to the delegationTarget, KubeAPIServer's GenericAPIServer, and from there KubeAPIServer's Handler takes over.

KubeAPIServer Handler

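After the analysis above, the request has been delegated by the Aggregator to the Handler of KubeAPIServer's GenericAPIServer. Note that this is the UnprotectedHandler, i.e. the Director, so delegated requests do not go through authentication and authorization again. From here things are easier to follow: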

```go
// apiserver/pkg/server/handler.go

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}

	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}
```

KubeAPIServer's API resources are all registered in goRestfulContainer, so requests for these API objects are routed by goRestfulContainer to the matching WebService, via d.goRestfulContainer.Dispatch() above. If goRestfulContainer does not match the path, nonGoRestfulMux handles the request. So how does KubeAPIServer delegate to the APIExtensions APIServer? Through nonGoRestfulMux's NotFoundHandler.
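The go-restful claim rule in director.ServeHTTP() is a prefix match that must end on a path boundary, with the "/apis" root only claiming the discovery paths "/apis" and "/apis/". claims and route below are hypothetical helpers that restate that rule so it can be tried on a few paths; the registered roots in main are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// claims reports whether a go-restful WebService rooted at rootPath would
// take path, following the rule in director.ServeHTTP.
func claims(rootPath, path string) bool {
	if rootPath == "/apis" {
		// "/apis" is special: prefix matching would swallow every group
		return path == "/apis" || path == "/apis/"
	}
	if !strings.HasPrefix(path, rootPath) {
		return false
	}
	// exact match, or the next character starts a new path segment
	return len(path) == len(rootPath) || path[len(rootPath)] == '/'
}

// route returns which side of the director would serve path.
func route(roots []string, path string) string {
	for _, root := range roots {
		if claims(root, path) {
			return "gorestful"
		}
	}
	return "nonGoRestful"
}

func main() {
	roots := []string{"/api/v1", "/apis", "/apis/apps/v1"}
	for _, p := range []string{"/api/v1/pods", "/apis/", "/apis/apps/v1beta1", "/apis/example.com/v1/widgets"} {
		fmt.Println(p, "->", route(roots, p))
	}
}
```

The boundary check is what stops "/apis/apps/v1beta1" from being captured by the "/apis/apps/v1" WebService; like a CRD path, it falls through to nonGoRestfulMux.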

```go
// apiserver/pkg/server/config.go

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	// ......
	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
	// ......
}

// apiserver/pkg/server/handler.go

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}
	// ......
}
```

GenericAPIServer's New() method passes delegationTarget.UnprotectedHandler(), i.e. the delegate's Director, to NewAPIServerHandler() as the notFoundHandler parameter, which registers it through nonGoRestfulMux's NotFoundHandler() method. This notFoundHandler handles any request whose path matches nothing registered in nonGoRestfulMux, as covered in "Kubernetes APIServer NonGoRestfulMux". For KubeAPIServer, the notFoundHandler is the Director of the Handler of the APIExtensions APIServer's GenericAPIServer.

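Putting this together with the director logic above: if goRestfulContainer matches no path, the request goes to nonGoRestfulMux; if nonGoRestfulMux matches no path either, it goes to the notFoundHandler, which is where the APIExtensions APIServer's handler logic begins.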
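The resulting chain, from Aggregator to KubeAPIServer to APIExtensions, can be illustrated with a toy version of this NotFoundHandler wiring. It is only a sketch under simplifying assumptions: `toyMux`, `newToyMux`, `buildChain`, `label` and `route` are hypothetical; each server is reduced to a single path mux (the real KubeAPIServer serves built-in resources from goRestfulContainer before nonGoRestfulMux is consulted); the aggregator's APIService handling is reduced to a single prefix; and the example groups (`stable.example.com`, `metrics.k8s.io`) are only illustrative. The point is that each server's notFound handler is the next server's handler, so an unmatched request falls through until a server claims it or the final 404 handler answers.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// toyMux is a hypothetical, much-simplified PathRecorderMux: exact paths,
// prefix paths (longest prefix wins) and a notFound fallback.
type toyMux struct {
	exact    map[string]http.Handler
	prefixes map[string]http.Handler
	notFound http.Handler
}

// newToyMux plays the role of NewAPIServerHandler: the delegate's handler
// becomes notFound; at the end of the chain a plain 404 handler is used.
func newToyMux(notFound http.Handler) *toyMux {
	if notFound == nil {
		notFound = http.NotFoundHandler()
	}
	return &toyMux{exact: map[string]http.Handler{}, prefixes: map[string]http.Handler{}, notFound: notFound}
}

func (m *toyMux) Handle(path string, h http.Handler)         { m.exact[path] = h }
func (m *toyMux) HandlePrefix(prefix string, h http.Handler) { m.prefixes[prefix] = h }

func (m *toyMux) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path
	if h, ok := m.exact[path]; ok {
		h.ServeHTTP(w, req)
		return
	}
	best := ""
	for p := range m.prefixes {
		if strings.HasPrefix(path, p) && len(p) > len(best) {
			best = p
		}
	}
	if best != "" {
		m.prefixes[best].ServeHTTP(w, req)
		return
	}
	// Nothing registered on this server matched: delegate to the next one.
	m.notFound.ServeHTTP(w, req)
}

func label(s string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, s) })
}

// route issues a GET for path and returns the status code and body.
func route(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

// buildChain wires aggregator -> kube-apiserver -> apiextensions -> 404.
func buildChain() http.Handler {
	apiextensions := newToyMux(nil)
	apiextensions.Handle("/apis", label("apiextensions crdHandler"))
	apiextensions.HandlePrefix("/apis/", label("apiextensions crdHandler"))

	kube := newToyMux(apiextensions)
	kube.HandlePrefix("/api/", label("kube-apiserver"))
	kube.HandlePrefix("/apis/apps/", label("kube-apiserver"))

	aggregator := newToyMux(kube)
	aggregator.HandlePrefix("/apis/metrics.k8s.io/", label("aggregator remote proxy"))
	return aggregator
}

func main() {
	h := buildChain()
	for _, p := range []string{
		"/api/v1/pods",
		"/apis/apps/v1/deployments",
		"/apis/stable.example.com/v1/crontabs",
		"/apis/metrics.k8s.io/v1beta1/nodes",
		"/no/such/path",
	} {
		code, body := route(h, p)
		fmt.Printf("%s -> %d %s\n", p, code, strings.TrimSpace(body))
	}
}
```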

APIExtensions crdHandler

Finally we reach the Handler of the APIExtensions GenericAPIServer. Let's first look at how it is built, in the New() method of CustomResourceDefinitions:

# apiextensions-apiserver/pkg/apiserver/apiserver.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }
    s := &CustomResourceDefinitions{
        GenericAPIServer: genericServer,
    }
    ......

    delegateHandler := delegationTarget.UnprotectedHandler()
    if delegateHandler == nil {
        delegateHandler = http.NotFoundHandler()
    }

    crdHandler, err := NewCustomResourceDefinitionHandler(
        versionDiscoveryHandler,
        groupDiscoveryHandler,
        s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
        delegateHandler,
        c.ExtraConfig.CRDRESTOptionsGetter,
        c.GenericConfig.AdmissionControl,
        establishingController,
        c.ExtraConfig.ServiceResolver,
        c.ExtraConfig.AuthResolverWrapper,
        c.ExtraConfig.MasterCount,
        s.GenericAPIServer.Authorizer,
        c.GenericConfig.RequestTimeout,
        time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
        apiGroupInfo.StaticOpenAPISpec,
        c.GenericConfig.MaxRequestBodyBytes,
    )
    if err != nil {
        return nil, err
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
    ......
}

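It builds a crdHandler and registers it in CustomResourceDefinitions.GenericAPIServer.Handler.NonGoRestfulMux, for the path "/apis" and for every path beginning with "/apis/". So when a custom resource is requested, the request is handled by the crdHandler registered in the GenericAPIServer's NonGoRestfulMux. The crdHandler's ServeHTTP() method is itself quite complex, and many controllers are also defined and added as poststarthooks to process CRD resources; those are not covered here.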
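The internals of crdHandler are out of scope here, but the role of the `delegateHandler` argument can be sketched roughly: serve the request if it names a known custom resource, otherwise pass it on. This is a hypothetical simplification, not the real `crdHandler`: `toyCRDHandler`, `resourceKey` and the `<plural>.<group>` lookup set stand in for the real request parsing and CRD lister, discovery requests for `/apis` are ignored, and `crontabs.stable.example.com` is only an illustrative CRD name. Passing a nil delegate mirrors the `http.NotFoundHandler()` fallback in New() above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// toyCRDHandler is a hypothetical simplification of crdHandler: a set of
// "<plural>.<group>" names stands in for the CRD lister.
type toyCRDHandler struct {
	crds     map[string]bool
	delegate http.Handler
}

func newToyCRDHandler(crdNames []string, delegate http.Handler) *toyCRDHandler {
	// Same fallback as in New() above: no delegate means a plain 404 handler.
	if delegate == nil {
		delegate = http.NotFoundHandler()
	}
	set := make(map[string]bool, len(crdNames))
	for _, n := range crdNames {
		set[n] = true
	}
	return &toyCRDHandler{crds: set, delegate: delegate}
}

// resourceKey turns /apis/<group>/<version>/[namespaces/<ns>/]<plural>[/...]
// into "<plural>.<group>"; any other shape yields "".
func resourceKey(path string) string {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) < 4 || parts[0] != "apis" {
		return ""
	}
	plural := parts[3]
	if plural == "namespaces" && len(parts) >= 6 {
		plural = parts[5]
	}
	return plural + "." + parts[1]
}

func (h *toyCRDHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if key := resourceKey(req.URL.Path); key != "" && h.crds[key] {
		fmt.Fprintf(w, "custom resource %s", key)
		return
	}
	// Not a known custom resource: let the delegate answer.
	h.delegate.ServeHTTP(w, req)
}

// route issues a GET for path and returns the status code and body.
func route(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := newToyCRDHandler([]string{"crontabs.stable.example.com"}, nil)
	for _, p := range []string{
		"/apis/stable.example.com/v1/namespaces/default/crontabs/my-cron",
		"/apis/stable.example.com/v1/widgets",
	} {
		code, body := route(h, p)
		fmt.Printf("%s -> %d %s\n", p, code, strings.TrimSpace(body))
	}
}
```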

PostStartHook Delegation

Finally, one more special point about PostStartHooks: once KubeAPIServer, the APIExtensions APIServer and the Aggregator are chained by delegation, the poststarthooks registered in them are delegated as well. GenericAPIServer's New() method contains the following code:

# apiserver/pkg/server/config.go

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
    ......
    // first add poststarthooks from delegated targets
    for k, v := range delegationTarget.PostStartHooks() {
        s.postStartHooks[k] = v
    }

    for k, v := range delegationTarget.PreShutdownHooks() {
        s.preShutdownHooks[k] = v
    }
    ......
}

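That is, each GenericAPIServer copies the PostStartHooks of its delegationTarget into its own postStartHooks: KubeAPIServer includes the PostStartHooks of APIExtensions, and the Aggregator in turn includes those of KubeAPIServer. As a result, the PostStartHooks of all three servers end up registered in the Aggregator. Since it is the Aggregator's GenericAPIServer that runs when the service starts, all poststarthooks are started through it rather than by each of the three APIServers separately, so every poststarthook is started only once and none runs twice. Below is my inventory of the PostStartHooks in the current version of Kubernetes: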

* APIAggregator
  * GenericAPIServer // kube-aggregator
    * delegationTarget // Master.GenericAPIServer
    * Handler // APIServerHandler
      * NonGoRestfulMux
        * NotFoundHandler // Master.GenericAPIServer.Handler.Director
      * GoRestfulContainer
      * FullHandlerChain
      * Director
    * postStartHooks
      * // CustomResourceDefinitions
        * start-apiextensions-informers
        * start-apiextensions-controllers
        * crd-informer-synced
      * // Config
        * start-kube-apiserver-admission-initializer
      * // Generic
        * generic-apiserver-start-informers
        * priority-and-fairness-config-consumer
      * // Master
        * bootstrap-controller
        * scheduling/bootstrap-system-priority-classes
        * priority-and-fairness-config-producer
        * rbac/bootstrap-roles
        * start-cluster-authentication-info-controller
      * // Aggregator
        * start-kube-aggregator-informers
        * apiservice-registration-controller // converts APIServices into proxyHandlers and registers them in NonGoRestfulMux
          * AddAPIService()
        * apiservice-status-available-controller
        * kube-apiserver-autoregistration // registers the group versions of apiserver + crd as APIServices through the API
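The hook-merging loop in New() shown earlier can be reproduced in a few lines to check the "copied toward the top, run once at the top" behavior. This is a sketch with hypothetical names (`toyServer`, `newToyServer`, `buildChain`); the hook signature is simplified to `func() error` (the real hook receives a PostStartHookContext), and hooks run sequentially in name order, whereas the real server starts them in goroutines. One consequence worth noting: the copy happens when the next server is constructed, so a hook reaches the aggregator only if it was registered on its delegate before that point.

```go
package main

import (
	"fmt"
	"sort"
)

// PostStartHookFunc is simplified here; the real hook takes a context argument.
type PostStartHookFunc func() error

// toyServer is a hypothetical stand-in for GenericAPIServer that only tracks hooks.
type toyServer struct {
	name           string
	postStartHooks map[string]PostStartHookFunc
}

// newToyServer mirrors the loop in New(): copy the delegate's hooks first.
// The copy happens once, at construction time.
func newToyServer(name string, delegate *toyServer) *toyServer {
	s := &toyServer{name: name, postStartHooks: map[string]PostStartHookFunc{}}
	if delegate != nil {
		for k, v := range delegate.postStartHooks {
			s.postStartHooks[k] = v
		}
	}
	return s
}

func (s *toyServer) AddPostStartHook(name string, hook PostStartHookFunc) error {
	if _, exists := s.postStartHooks[name]; exists {
		return fmt.Errorf("%s: hook %q already registered", s.name, name)
	}
	s.postStartHooks[name] = hook
	return nil
}

// RunPostStartHooks runs each hook once, sequentially and in name order.
func (s *toyServer) RunPostStartHooks() ([]string, error) {
	names := make([]string, 0, len(s.postStartHooks))
	for n := range s.postStartHooks {
		names = append(names, n)
	}
	sort.Strings(names)
	for _, n := range names {
		if err := s.postStartHooks[n](); err != nil {
			return nil, fmt.Errorf("%s: hook %q failed: %w", s.name, n, err)
		}
	}
	return names, nil
}

// buildChain creates apiextensions -> kube-apiserver -> aggregator, adding each
// server's own hook before the next server copies them; calls counts runs.
func buildChain() (*toyServer, map[string]int) {
	calls := map[string]int{}
	add := func(s *toyServer, name string) {
		hook := func() error { calls[name]++; return nil }
		if err := s.AddPostStartHook(name, hook); err != nil {
			panic(err)
		}
	}
	apiext := newToyServer("apiextensions-apiserver", nil)
	add(apiext, "start-apiextensions-informers")
	kube := newToyServer("kube-apiserver", apiext)
	add(kube, "bootstrap-controller")
	agg := newToyServer("kube-aggregator", kube)
	add(agg, "start-kube-aggregator-informers")
	return agg, calls
}

func main() {
	agg, calls := buildChain()
	names, err := agg.RunPostStartHooks()
	if err != nil {
		panic(err)
	}
	fmt.Println(names)
	fmt.Println(calls)
}
```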

Summary

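Building on the previous articles, this one has walked through the APIServer extension mechanism. It only traces the overall flow and omits many unrelated details, so it is probably easiest to follow alongside the code and the earlier articles. To wrap up: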

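  • When the aggregator starts, i.e. in createAggregatorServer(), the resource groups (GroupVersion) defined in crd and apiserver are converted into APIServices by the kube-apiserver-autoregistration poststarthook and registered in the aggregator; for example, GroupVersion{group: "apps", version: "v1"} is converted into an APIService and stored in the database. The resources in apiserver are built into Kubernetes and fixed, so they only need to be registered once at startup; the resources in CRDs can change, so they must be continuously synchronized.
  • The resources of crd + apiserver + aggregator are all converted into APIServices, and the apiservice-registration-controller poststarthook then registers them, as path: proxyHandler pairs, in the NonGoRestfulMux of the aggregator's GenericAPIServer. However, the APIServices produced from crd + apiserver resources are converted in form only: they carry just the APIService name and Kind, and apiService.Spec.Service is nil, which marks them as local. The aggregator does not proxy such resources to an external server; it lets delegateHandler handle them instead. The aggregator's delegateTarget is the apiserver's Instance.GenericAPIServer, so the request is passed to kube-apiserver. The remaining, genuine APIServices are proxied out and handled by external APIServers.
  • The entry point of the whole apiserver is the aggregator: it is the aggregator's GenericAPIServer that runs. A request first reaches the Handler of the aggregator's GenericAPIServer, passes through the handler chain, and finally reaches the director's handler, which dispatches it by path. Because the resources of apiserver + crd are all registered as APIServices in the NonGoRestfulMux of the aggregator's GenericAPIServer, the request is first dispatched by path to the matching proxyHandler in the aggregator; that handler sees the APIService is local and hands the request to delegateHandler. This enters the processing logic of kube-apiserver's Instance.GenericAPIServer.Handler.Director, where all built-in standard Kubernetes resources are handled. If no path matches in Instance's GenericAPIServer, the request goes to the NotFoundHandler of nonGoRestfulMux and thereby enters CustomResourceDefinitions.GenericAPIServer.Handler.Director, where all CRD resources are handled.
  • Judging from this mechanism, the design looks somewhat over-engineered. Considerable effort goes into converting the resources of KubeAPIServer and CRDs into APIServices, yet this achieves little: the aggregator merely checks whether the APIService is local and then delegates the request. Moreover, turning built-in resources such as Pod into APIServices does not fit the definition of an APIService, which is supposed to describe an API service; if anything, it would be more reasonable to convert APIServer-level components such as KubeAPIServer and APIExtensions into APIServices. So this implementation is a bit odd and rather puzzling.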
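The local-versus-remote decision described in the second and third points can be sketched as follows. The types are hypothetical, trimmed-down stand-ins for the apiregistration types (only a `Service` pointer is kept from the spec), `toyProxyHandler` collapses the aggregator's proxy handler down to its local check, and the `metrics-server` Service in `kube-system` is only an illustrative remote APIService. An APIService with a nil `Service` goes straight to the delegate (kube-apiserver in the real chain); any other is "proxied", which this sketch merely reports instead of forwarding.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ServiceReference and APIService are hypothetical, trimmed-down stand-ins.
type ServiceReference struct {
	Namespace, Name string
}

type APIService struct {
	Name    string // conventionally "<version>.<group>", e.g. "v1.apps"
	Group   string
	Version string
	Service *ServiceReference // nil: local, served by the delegate
}

func apiServiceName(group, version string) string { return version + "." + group }

// localAPIService is roughly what autoregistration produces for a built-in
// or CRD group/version: a name, but no Service.
func localAPIService(group, version string) APIService {
	return APIService{Name: apiServiceName(group, version), Group: group, Version: version}
}

// toyProxyHandler keeps only the local check of the aggregator's proxy handler.
type toyProxyHandler struct {
	svc      APIService
	delegate http.Handler // kube-apiserver's handler in the real chain
}

func (p toyProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if p.svc.Service == nil {
		p.delegate.ServeHTTP(w, req) // local: no proxying at all
		return
	}
	// A real proxy forwards to the Service; this sketch only reports the target.
	fmt.Fprintf(w, "proxy %s to %s/%s", req.URL.Path, p.svc.Service.Namespace, p.svc.Service.Name)
}

// buildAggregator registers one handler per APIService under /apis/<group>/<version>.
func buildAggregator(services []APIService, delegate http.Handler) http.Handler {
	mux := http.NewServeMux()
	for _, svc := range services {
		h := toyProxyHandler{svc: svc, delegate: delegate}
		base := "/apis/" + svc.Group + "/" + svc.Version
		mux.Handle(base, h)
		mux.Handle(base+"/", h)
	}
	return mux
}

func label(s string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, s) })
}

// route issues a GET for path and returns the response body.
func route(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	services := []APIService{
		localAPIService("apps", "v1"),
		{
			Name:    apiServiceName("metrics.k8s.io", "v1beta1"),
			Group:   "metrics.k8s.io",
			Version: "v1beta1",
			Service: &ServiceReference{Namespace: "kube-system", Name: "metrics-server"},
		},
	}
	agg := buildAggregator(services, label("kube-apiserver"))
	for _, p := range []string{"/apis/apps/v1/deployments", "/apis/metrics.k8s.io/v1beta1/nodes"} {
		fmt.Printf("%s -> %s\n", p, route(agg, p))
	}
}
```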

Author

hackerain

Published

2020-10-08

Updated

2023-10-28

License