type Runnable interface { // Start starts running the component. The component will stop running // when the context is closed. Start blocks until the context is closed or // an error occurs. Start(context.Context) error }
// Add sets dependencies on i, and adds it to the list of Runnables to start. func(cm *controllerManager) Add(r Runnable) error { cm.Lock() defer cm.Unlock() return cm.add(r) }
func(cm *controllerManager) add(r Runnable) error { // Set dependencies on the object if err := cm.SetFields(r); err != nil { return err } return cm.runnables.Add(r) }
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } }
// Start and wait for caches. if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } }
// Start the non-leaderelection Runnables after the cache has synced. if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } }
func(cm *controllerManager) add(r Runnable) error { // Set dependencies on the object if err := cm.SetFields(r); err != nil { return err } return cm.runnables.Add(r) }
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. func(cm *controllerManager) SetFields(i interface{}) error { if err := cm.cluster.SetFields(i); err != nil { return err } ...... returnnil }
type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. Do reconcile.Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start. // This exists because the standard Kubernetes workqueues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. MakeQueue func() workqueue.RateLimitingInterface
// Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing Queue workqueue.RateLimitingInterface
// SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates // Deprecated: the caller should handle injected fields itself. SetFields func(i interface{})error
......
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription ...... }
// watchDescription contains all the information necessary to start a watch. type watchDescription struct { src source.Source handler handler.EventHandler predicates []predicate.Predicate }
type Source interface { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error }
type Kind struct { // Type is the type of object to watch. e.g. &v1.Pod{} Type client.Object
// cache used to watch APIs cache cache.Cache
// started may contain an error if one was encountered during startup. If its closed and does not // contain an error, startup and syncing finished. started chanerror startCancel func() }
func(ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { ...... // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) ks.started = make(chanerror) gofunc() { var ( i cache.Informer lastErr error )
// Tries to get an informer until it returns true, // an error or the specified context is cancelled or expired. if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue i, lastErr = ks.cache.GetInformer(ctx, ks.Type) ...... returntrue, nil }); ...... _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) ...... if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here ks.started <- errors.New("cache did not sync") } close(ks.started) }()
func(e EventHandler) OnAdd(obj interface{}) { c := event.CreateEvent{}
// Pull Object out of the object if o, ok := obj.(client.Object); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj)) return }
for _, p := range e.Predicates { if !p.Create(c) { return } }
type EventHandler interface { // Create is called in response to an create event - e.g. Pod Creation. Create(event.CreateEvent, workqueue.RateLimitingInterface)
// Update is called in response to an update event - e.g. Pod Updated. Update(event.UpdateEvent, workqueue.RateLimitingInterface)
// Delete is called in response to a delete event - e.g. Pod Deleted. Delete(event.DeleteEvent, workqueue.RateLimitingInterface)
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. Generic(event.GenericEvent, workqueue.RateLimitingInterface) }
// Inject Cache into arguments if err := c.SetFields(src); err != nil { return err } if err := c.SetFields(evthdler); err != nil { return err } for _, pr := range prct { if err := c.SetFields(pr); err != nil { return err } }
// Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). if !c.Started { c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) returnnil }
func(c *Controller) Start(ctx context.Context) error { ...... c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) wg.Add(c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { gofunc() { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } }() }
c.Started = true ...... }
func(c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working returnfalse }
// We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if we // do not want this work item being re-queued. For example, we do // not call Forget if a transient error occurs, instead the item is // put back on the workqueue and attempted again after a back-off // period. defer c.Queue.Done(obj)
func(c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Make sure that the object is a valid request. req, ok := obj.(reconcile.Request) ...... // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the // resource to be synced. result, err := c.Reconcile(ctx, req) ...... }
type Reconciler interface { // Reconcile performs a full reconciliation for the object referred to by the Request. // The Controller will requeue the Request to be processed again if an error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. Reconcile(context.Context, Request) (Result, error) }