// Aggregator type APIAggregator struct { GenericAPIServer *genericapiserver.GenericAPIServer delegateHandler http.Handler // proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name proxyHandlers map[string]*proxyHandler ...... }
// aggregator comes last in the chain aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines returnnil, err }
// Imbue all builtin group-priorities onto the aggregated discovery if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { for gv, entry := range apiVersionPriorities { aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version)) } }
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext)error { // 通过不断循环,将crd中的GroupVersion,转换成APIService,添加到autoRegistrationController的queue和map中 go crdRegistrationController.Run(5, context.StopCh) gofunc() { // let the CRD controller process the initial set of CRDs before starting the autoregistration controller. // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist. // we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery. if crdAPIEnabled { klog.Infof("waiting for initial CRD sync...") crdRegistrationController.WaitForInitialSync() klog.Infof("initial CRD sync complete...") } else { klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync") } // 通过不断的轮询,将queue中的APIService取出,通过apiservice的API,添加或者更新到etcd数据库中,固化下来。 autoRegistrationController.Run(5, context.StopCh) }() returnnil }) ......
type APIServiceSpec struct { // Service is a reference to the service for this API server. It must communicate // on port 443. // If the Service is nil, that means the handling for the API groupversion is handled locally on this server. // The call will simply delegate to the normal handler chain to be fulfilled. // +optional Service *ServiceReference `json:"service,omitempty" protobuf:"bytes,1,opt,name=service"` // Group is the API group name this server hosts Group string`json:"group,omitempty" protobuf:"bytes,2,opt,name=group"` // Version is the API version this server hosts. For example, "v1" Version string`json:"version,omitempty" protobuf:"bytes,3,opt,name=version"` ...... }
funcmakeAPIService(gv schema.GroupVersion) *v1.APIService { apiServicePriority, ok := apiVersionPriorities[gv] if !ok { // if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version // being permanently stuck in the APIServices list. klog.Infof("Skipping APIService creation for %v", gv) returnnil } return &v1.APIService{ ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group}, Spec: v1.APIServiceSpec{ Group: gv.Group, Version: gv.Version, GroupPriorityMinimum: apiServicePriority.group, VersionPriority: apiServicePriority.version, }, } }
func(s *APIAggregator) AddAPIService(apiService *v1.APIService) error { // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not // since they are wired against listers because they require multiple resources to respond if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists { proxyHandler.updateAPIService(apiService) if s.openAPIAggregationController != nil { s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService) } returnnil }
proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version // v1. is a special case for the legacy API. It proxies to a wider set of endpoints. if apiService.Name == legacyAPIServiceName { proxyPath = "/api" }
klog.Infof("Starting APIServiceRegistrationController") defer klog.Infof("Shutting down APIServiceRegistrationController")
if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced) { return }
/// initially sync all APIServices to make sure the proxy handler is complete if err := wait.PollImmediateUntil(time.Second, func() (bool, error) { services, err := c.apiServiceLister.List(labels.Everything()) if err != nil { utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err)) returnfalse, nil } for _, s := range services { if err := c.apiHandlerManager.AddAPIService(s); err != nil { utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err)) returnfalse, nil } } returntrue, nil }, stopCh); err == wait.ErrWaitTimeout { utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize")) return } elseif err != nil { panic(fmt.Errorf("unexpected error: %v", err)) } close(handlerSyncedCh)
// only start one worker thread since its a slow moving API and the aggregation server adding bits // aren't threadsafe go wait.Until(c.runWorker, time.Second, stopCh)
type proxyHandler struct { // localDelegate is used to satisfy local APIServices localDelegate http.Handler
// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use // this to confirm the proxy's identity proxyClientCert []byte proxyClientKey []byte proxyTransport *http.Transport
// Endpoints based routing to map from cluster IP to routable IP serviceResolver ServiceResolver
handlingInfo atomic.Value
// egressSelector selects the proper egress dialer to communicate with the custom apiserver // overwrites proxyTransport dialer if not nil egressSelector *egressselector.EgressSelector }
type proxyHandlingInfo struct { // local indicates that this APIService is locally satisfied local bool
// name is the name of the APIService name string // restConfig holds the information for building a roundtripper restConfig *restclient.Config // transportBuildingError is an error produced while building the transport. If this // is non-nil, it will be reported to clients. transportBuildingError error // proxyRoundTripper is the re-useable portion of the transport. It does not vary with any request. proxyRoundTripper http.RoundTripper // serviceName is the name of the service this handler proxies to serviceName string // namespace is the namespace the service lives in serviceNamespace string // serviceAvailable indicates this APIService is available or not serviceAvailable bool // servicePort is the port of the service this handler proxies to servicePort int32 }
func(s *GenericAPIServer) UnprotectedHandler() http.Handler { // when we delegate, we need the server we're delegating to choose whether or not to use gorestful return s.Handler.Director }
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req) if err != nil { proxyError(w, req, err.Error(), http.StatusInternalServerError) return } proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
// if we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does // NOT use the roundtripper. Its a direct call that bypasses the round tripper. This means that we have to // attach the "correct" user headers to the request ahead of time. After the initial upgrade, we'll be back // at the roundtripper flow, so we only have to muck with this request, but we do have to do it. if upgrade { transport.SetAuthProxyHeaders(newReq, user.GetName(), user.GetGroups(), user.GetExtra()) }
// check to see if our webservices want to claim this path for _, ws := range d.goRestfulContainer.RegisteredWebServices() { switch { case ws.RootPath() == "/apis": // if we are exactly /apis or /apis/, then we need special handling in loop. // normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly. // We can't rely on a prefix match since /apis matches everything (see the big comment on Director above) if path == "/apis" || path == "/apis/" { klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath()) // don't use servemux here because gorestful servemuxes get messed up when removing webservices // TODO fix gorestful, remove TPRs, or stop using gorestful d.goRestfulContainer.Dispatch(w, req) return }
case strings.HasPrefix(path, ws.RootPath()): // ensure an exact match or a path boundary match iflen(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' { klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath()) // don't use servemux here because gorestful servemuxes get messed up when removing webservices // TODO fix gorestful, remove TPRs, or stop using gorestful d.goRestfulContainer.Dispatch(w, req) return } } }
// if we didn't find a match, then we just skip gorestful altogether klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path) d.nonGoRestfulMux.ServeHTTP(w, req) }
func(c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) { // first add poststarthooks from delegated targets for k, v := range delegationTarget.PostStartHooks() { s.postStartHooks[k] = v }
for k, v := range delegationTarget.PreShutdownHooks() { s.preShutdownHooks[k] = v } }