Kubernetes APIServer GenericAPIServer

Kubernetes APIServer 机制概述中我们介绍到了APIServer的本质其实是一个实现了RESTful API的WebServer,它使用golang的net/http的Server构建,并且Handler是其中非常重要的概念,此外,又简单介绍了APIServer的扩展机制,即Aggregator, APIExtensions以及KubeAPIServer这三者之间通过Delegation的方式实现了扩展。

Kubernetes APIServer Storage 框架解析中,我们介绍了APIServer相关的存储框架,每个API对象,都有对应的REST store以及etcd store

而本篇文章介绍到的GenericAPIServer跟上面两个内容紧密相关,它是APIServer的基础,Aggregator, APIExtensions以及KubeAPIServer每个都包含一个GenericAPIServer,他们各自的API对象都以Group的形式,注册进GenericAPIServer中,并且组织成最终的Handler,然后结合net/http Server,将APIServer运行起来,因此,掌握GenericAPIServer有助于理解APIServer的扩展机制以及运行原理,本篇文章重点介绍GenericAPIServer的以下四方面内容:

  • Handler的构建
  • API对象的注册
  • Handler的处理
  • PostStartHook

其相关的代码在apiserver库中的apiserver/pkg/server/目录下。

基础知识

APIGroupInfo

在API对象进行注册时,都被组织成APIGroupInfo的结构体形式,这里面包含了所有注册需要的信息,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# apiserver/pkg/server/genericapiserver.go

type APIGroupInfo struct {
PrioritizedVersions []schema.GroupVersion

// Info about the resources in this group. It's a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage

OptionsExternalVersion *schema.GroupVersion
MetaGroupVersion *schema.GroupVersion

Scheme *runtime.Scheme
NegotiatedSerializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec

StaticOpenAPISpec *spec.Swagger
}

这里面最关键的信息就是VersionedResourcesStorageMap这个属性,如注释所说,它是一个从version映射到resource,然后再从resource映射到rest storage的两层映射,比如batch组中的cronjobs资源,有v1beta1v2alpha1两个版本,则在该map中,则分别有两个映射,v1beta1 -> cronjobs -> cronjobs rest storage,以及v2alpha1 -> cronjobs -> cronjobs rest storage,这里说的rest storage就是之前介绍到的rest store,每一个版本的API对象,都会有自己的一个rest store,从这里就可以看到,Kubernetes对多版本是如何管理的,本质上,它把不同版本的API对象,其实当成不同的对象,有自己独立的存储。

go-restful

Kubernetes使用go-restful这个第三方库对核心的RESTful API进行了实现,其基础知识以及在Kubernetes中的用法,可阅读这篇文章:go-restful简析,这里不再介绍。

NonGoRestfulMux

NonGoRestfulMux是Kubernetes APIServer中非核心API使用的RESTful框架,因为没有使用go-restful实现,因此称之为NonGoRestfulMux,其详细介绍,见Kubernetes APIServer NonGoRestfulMux

Handler的构建

这里说的Handler指的是最终net/http Server要运行的Handler,它在GenericAPIServer中被构建出来,首先我们来看下GenericAPIServer的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# apiserver/pkg/server/genericapiserver.go

type GenericAPIServer struct {
// SecureServingInfo holds configuration of the TLS server.
SecureServingInfo *SecureServingInfo

// "Outputs"
// Handler holds the handlers being used by this API server
Handler *APIServerHandler

// delegationTarget is the next delegate in the chain. This is never nil.
delegationTarget DelegationTarget

......
}

一个GenericAPIServer包含的信息非常的多,上面结构体并没有列出全部属性,在这里我们只关注几个重点信息就行:

  • SecureServingInfo *SecureServingInfo: 这里面包含的是运行APIServer需要的TLS相关的信息
  • Handler *APIServerHandler: 这个就是要运行APIServer需要使用到的Handler,各个API对象向APIServer中注册,说的就是向Handler注册,它是最重要的信息
  • delegationTarget DelegationTarget: 这个是扩展机制中用到的,指定该GenericAPIServer的delegation是谁

再来看下Handler *APIServerHandler的结构体信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# apiserver/pkg/server/handler.go

type APIServerHandler struct {
// FullHandlerChain is the one that is eventually served with. It should include the full filter
// chain and then call the Director.
FullHandlerChain http.Handler
// The registered APIs. InstallAPIs uses this. Other servers probably shouldn't access this directly.
GoRestfulContainer *restful.Container
// NonGoRestfulMux is the final HTTP handler in the chain.
// It comes after all filters and the API handling
// This is where other servers can attach handler to various parts of the chain.
NonGoRestfulMux *mux.PathRecorderMux

// Director is here so that we can properly handle fall through and proxy cases.
// This looks a bit bonkers, but here's what's happening. We need to have /apis handling registered in gorestful in order to have
// swagger generated for compatibility. Doing that with `/apis` as a webservice, means that it forcibly 404s (no defaulting allowed)
// all requests which are not /apis or /apis/. We need those calls to fall through behind goresful for proper delegation. Trying to
// register for a pattern which includes everything behind it doesn't work because gorestful negotiates for verbs and content encoding
// and all those things go crazy when gorestful really just needs to pass through. In addition, openapi enforces unique verb constraints
// which we don't fit into and it still muddies up swagger. Trying to switch the webservices into a route doesn't work because the
// containing webservice faces all the same problems listed above.
// This leads to the crazy thing done here. Our mux does what we need, so we'll place it in front of gorestful. It will introspect to
// decide if the route is likely to be handled by goresful and route there if needed. Otherwise, it goes to PostGoRestful mux in
// order to handle "normal" paths and delegation. Hopefully no API consumers will ever have to deal with this level of detail. I think
// we should consider completely removing gorestful.
// Other servers should only use this opaquely to delegate to an API server.
Director http.Handler
}

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}

gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})

director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}

return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}

可以看到,APIServerHandler中包含一个go-restful构建出来的Container,GoRestfulContainer,以及一个PathRecorderMux构建出来的NonGoRestfulMux,注意,他们都是指针类型的,此外还有一个FullHandlerChain以及Director,都是对一个director结构体的引用,来看看这个结构体:

1
2
3
4
5
6
7
8
9
10
11
# apiserver/pkg/server/handler.go

type director struct {
name string
goRestfulContainer *restful.Container
nonGoRestfulMux *mux.PathRecorderMux
}

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
......
}

它里面又包含了goRestfulContainernonGoRestfulMux,但是注意他们也是以指针的形式作为成员变量的,并且该director还实现了ServeHTTP()方法,即director还是一个Handler。上面的APIServerHandler中也包含了GoRestfulContainer, NonGoRestfulMux的指针类型的成员变量,他们指针指向的其实是同一个实体,即在NewAPIServerHandler()方法中New出来的实体。为什么在APIServerHandler中已经有这两个变量了,还要再单独生成一个director结构体来引用这两个变量,其实这跟他们的用法有关,下面会讲到。

现在先来说下FullHandlerChainDirector的区别,他们两个都是对director的引用,区别是FullHandlerChain在director外面还包围了一层Chain,我们来看看这个Chain是什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# apiserver/pkg/server/config.go

handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
if c.FlowControl != nil {
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}

上面的BuildHandlerChainFunc默认为DefaultBuildHandlerChain(),看到该方法中传入一个Handler,然后在该Handler外面,像包洋葱一样,包了一层又一层的filter,这些filter的作用其实就是在请求到来时,在Handler真正处理之前,先要经过的一系列认证,授权,审计等等检查,如果通过了,才会由最终的Handler来处理该请求,没通过,则会报相应的错误,可见,认证授权等操作,就是在这个阶段生效的,经过一系列filter的包装,最终构建出来的Handler,就是FullHandlerChain,而director就是这个被层层包装的Handler。而Director这个成员变量,没有被filter包装,这样通过Director就可以绕过认证授权这些filter,直接由Handler进行处理。那么问题来了,难道还有请求不需要认证授权的?这个Director存在的意义是什么?的确是有请求不需要认证授权,这就涉及到APIServer的扩展机制了,后面会介绍到。

小结一下,APIServerHandler中包含4个成员变量,FullHandlerChain和Director其实是两个Handler,一个是带认证授权这些filter的,一个是不带的,都是对director的引用,而GoRestfulContainer和NonGoRestfulMux则分别是指针类型的引用,指向真正的goRestfulContainer和nonGoRestfulMux实体,同时这两个实体,又被director所引用。

从这里就大概可以看出GoRestfulContainer和NonGoRestfulMux这两个变量在这里的作用了,在上层向goRestfulContainer和nonGoRestfulMux实体中注册API对象时,就是通过调用这两个变量来对真正的实体进行操作的,如下面的示例:

1
apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)

因为是指针,都指向同一个实体,这样director作为Handler也就能用到注册进来的API对象了。

API对象的注册

所谓API对象的注册,其实就是向GenericAPIServer的Handler中添加各个API对象的WebService和Route,GenericAPIServer提供了InstallLegacyAPIGroup(), InstallAPIGroups(), InstallAPIGroup()这三个方法,供外部调用,向其中注册APIGroupInfo,APIGroupInfo在上面基础知识中介绍过,里面存储了这个APIGroup的version, resource以及对应的REST storage实体,上面的三个方法,最后都会调用到同一个内部函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# apiserver/pkg/server/genericapiserver.go

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
continue
}

apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
if apiGroupInfo.OptionsExternalVersion != nil {
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
}
apiGroupVersion.OpenAPIModels = openAPIModels
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes

if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
}

return nil
}

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) (*genericapi.APIGroupVersion, error) {
storage := make(map[string]rest.Storage)
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
if strings.ToLower(k) != k {
return nil, fmt.Errorf("resource names must be lowercase only, not %q", k)
}
storage[k] = v
}
version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
version.Root = apiPrefix
version.Storage = storage
return version, nil
}

func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion) *genericapi.APIGroupVersion {

allServedVersionsByResource := map[string][]string{}
for version, resourcesInVersion := range apiGroupInfo.VersionedResourcesStorageMap {
for resource := range resourcesInVersion {
if len(groupVersion.Group) == 0 {
allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], version)
} else {
allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], fmt.Sprintf("%s/%s", groupVersion.Group, version))
}
}
}

return &genericapi.APIGroupVersion{
GroupVersion: groupVersion,
AllServedVersionsByResource: allServedVersionsByResource,
MetaGroupVersion: apiGroupInfo.MetaGroupVersion,

ParameterCodec: apiGroupInfo.ParameterCodec,
Serializer: apiGroupInfo.NegotiatedSerializer,
Creater: apiGroupInfo.Scheme,
Convertor: apiGroupInfo.Scheme,
ConvertabilityChecker: apiGroupInfo.Scheme,
UnsafeConvertor: runtime.UnsafeObjectConvertor(apiGroupInfo.Scheme),
Defaulter: apiGroupInfo.Scheme,
Typer: apiGroupInfo.Scheme,
Namer: runtime.Namer(meta.NewAccessor()),

EquivalentResourceRegistry: s.EquivalentResourceRegistry,

Admit: s.admissionControl,
MinRequestTimeout: s.minRequestTimeout,
Authorizer: s.Authorizer,
}
}

这个函数的逻辑,就是遍历APIGroupInfo中的version,按照version的维度来进行安装API对象,即构建出来一个APIGroupVersion,将该版本的resource和REST storage存储到该结构体中,然后执行安装操作: apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer);,可以看到这里传的就是上面说到的APIServerHandler中的GoRestfulContainer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# apiserver/pkg/endpoints/groupversion.go

func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}

apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}

在该方法中,又构造了一个APIInstaller结构体,将APIGroupVersion的指针传给它,由它去执行安装操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# apiserver/pkg/endpoints/installer.go

func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var errors []error
ws := a.newWebService()

// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if err != nil {
errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
}
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
}
return apiResources, ws, errors
}

在这里面New了一个WebService,然后遍历group中的REST storage,将storage中的path取出来,进行排序,然后再遍历这个path数组,针对每一个path: storage向WebService中执行注册操作,即registerResourceHandlers(),这就来到了最关键的地方,这是一个非常长的函数,这里面,对Storage进行类型转换,因为Storage实现了rest store的各种接口,所以首先将其转换成getter, creater, lister, updater等类型,分别对应该API对象在数据库层面的增删查改等操作,然后构造对应的Handler,然后再创建WebService的Route,最后将其添加到WebService中,我们以getter为例,简单看下这个过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# apiserver/pkg/endpoints/installer.go

(a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) {
......
getter, isGetter := storage.(rest.Getter)
......
handler = restfulGetResource(getter, exporter, reqScope)
......
route := ws.GET(action.Path).To(handler)
......
for _, route := range routes {
ws.Route(route)
}
......
}

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.GetResource(r, e, &scope)(res.ResponseWriter, req.Request) // 直接调用了返回的方法
}
}

# apiserver/pkg/endpoints/handlers/get.go

func GetResource(r rest.Getter, e rest.Exporter, scope *RequestScope) http.HandlerFunc {
return getResourceHandler(scope,
func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
......
return r.Get(ctx, name, &options)
})
}

func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
namespace, name, err := scope.Namer.Name(req)
result, err := getter(ctx, name, req, trace)
......
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result) //对从数据库返回的结果进行转换以及序列化,然后返回
}
}

可以看到,在Handler方法中,去调用rest.Getter的Get方法,调用rest storage去数据库中获取对应的name的资源进行返回,然后对从数据库返回的结果进行转换以及序列化,最终返回给用户,需要注意GetResource()返回的是一个方法,而在 restfulGetResource() 方法中,则通过 handlers.GetResource()() 的方式,直接调用了该方法。 在这里,我们就看到了在Kubernetes APIServer Storage 框架解析介绍的REST store是如何应用的。

此外,还需要注意一点就是 apiserver/pkg/endpoints/handlers/ 这个目录下的都是高度抽象化后的各种handler,除了上例中处理Get请求的handler之外,还有Create, Delete, Watch等操作相关的handler,它们接收各种资源的REST store,调用对应的接口,对数据库相应的进行增删查改,然后执行一些序列化操作,返回给客户端,这些handler可以说是对每个API对象操作的入口。

Handler的处理

Handler构建出来,并且向其中注册了API对象,最后我们来看下,Handler是如何处理请求的,核心的逻辑,其实在上面已经介绍过,即在director的ServeHTTP()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# apiserver/pkg/server/handler.go

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path

// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
// if we are exactly /apis or /apis/, then we need special handling in loop.
// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
if path == "/apis" || path == "/apis/" {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
d.goRestfulContainer.Dispatch(w, req)
return
}

case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}

// if we didn't find a match, then we just skip gorestful altogether
klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
d.nonGoRestfulMux.ServeHTTP(w, req)
}

func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.FullHandlerChain.ServeHTTP(w, r)
}

首先是APIServerHandler的ServeHTTP()方法,调用了FullHandlerChain的ServeHTTP()方法,经过了层层的filter,最终到了director的ServeHTTP()方法,在该方法中,首先遍历goRestfulContainer中注册的WebService,看path跟哪个WebService中的路径匹配,如果匹配,则调用goRestfulContainer.Dispatch()处理该请求,如果都没有匹配上,则最终调用nonGoRestfulMux来处理该请求。

PostStartHook

在GenericAPIServer中还有一个重要的机制,就是这个PostStartHook,它是在APIServer启动之后,执行的一些Hook函数,这些Hook函数是在APIServer创建的过程中,注册进去的,在APIServer启动之后,做一些初始化或者周期性循环的任务,来看下相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# apiserver/pkg/server/hooks.go

func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error {
if len(name) == 0 {
return fmt.Errorf("missing name")
}
if hook == nil {
return fmt.Errorf("hook func may not be nil: %q", name)
}
if s.disabledPostStartHooks.Has(name) {
klog.V(1).Infof("skipping %q because it was explicitly disabled", name)
return nil
}

s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()

if s.postStartHooksCalled {
return fmt.Errorf("unable to add %q because PostStartHooks have already been called", name)
}
if postStartHook, exists := s.postStartHooks[name]; exists {
// this is programmer error, but it can be hard to debug
return fmt.Errorf("unable to add %q because it was already registered by: %s", name, postStartHook.originatingStack)
}

// done is closed when the poststarthook is finished. This is used by the health check to be able to indicate
// that the poststarthook is finished
done := make(chan struct{})
if err := s.AddBootSequenceHealthChecks(postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil {
return err
}
s.postStartHooks[name] = postStartHookEntry{hook: hook, originatingStack: string(debug.Stack()), done: done}

return nil
}

func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true

context := PostStartHookContext{
LoopbackClientConfig: s.LoopbackClientConfig,
StopCh: stopCh,
}

for hookName, hookEntry := range s.postStartHooks {
go runPostStartHook(hookName, hookEntry, context)
}
}

通过AddPostStartHook()方法向GenericAPIServer中添加Hook,然后在APIServer启动时,调用RunPostStartHooks(),遍历postStartHooks列表,使用goroutine运行每一个hook,来看一个添加PostStartHook的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# kubernetes/cmd/kube-apiserver/app/aggregator.go

err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})

上面就是一个周期循环的Hook,用来将crd对象,不断轮询,转换成aggregator中的apiservices对象。

总结

通过上面的分析,可以看到,本质上,GenericAPIServer最核心的功能,就是对net/http Handler的构造,为了理解其过程,介绍了go-restful, NonGoRestfulMux, APIGroupInfo等基础知识,Handler构建,API对象注册的过程,以及PostStartHook,还涉及到一些以前介绍过的知识,比如REST store,在这里我们看到了其是如何应用的,除了这些核心内容,还有一些将net/http Server如何Run起来的一些内容,逻辑比较简单,前面也介绍过,这里就不再介绍了。

作者

hackerain

发布于

2020-10-05

更新于

2023-10-28

许可协议