<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • 勤奮是步入勝利之門的通行證

    深入解析kubernetes controller-runtime

    Overview

    controller-runtime 是 Kubernetes 社區提供可供快速搭建一套 實現了controller 功能的工具,無需自行實現Controller的功能了;在 KubebuilderOperator SDK 也是使用 controller-runtime 。本文將對 controller-runtime 的工作原理以及在不同場景下的使用方式進行簡要的總結和介紹。

    controller-runtime structure

    controller-runtime 主要組成是需要用戶創建的 ManagerReconciler 以及 Controller Runtime 自己啟動的 CacheController

    • Manager:是用戶在初始化時創建的,用于啟動 Controller Runtime 組件
    • Reconciler:是用戶需要提供來處理自己的業務邏輯的組件(即在通過 code-generator 生成的api-like而實現的controller中的業務處理部分)。
    • Cache:一個緩存,用來建立 InformerApiServer 的連接來監聽資源并將被監聽的對象推送到queue中。
    • Controller: 一方面向 Informer 注冊 eventHandler,另一方面從隊列中獲取數據。controller 將從隊列中獲取數據并執行用戶自定義的 Reconciler 功能。

    image

    圖:controller-runtime structure

    image

    圖:controller-runtime flowchart

    由圖可知,Controller會向 Informer 注冊一些列eventHandler;然后Cache啟動Informer(informer屬于cache包中),與ApiServer建立監聽;當Informer檢測到資源變化時,將對象加入queue,Controller 將元素取出并在用戶端執行 Reconciler。

    Controller引入

    我們從 controller-rumtime項目的 example 進行引入看下,整個架構都是如何實現的。

    可以看到 example 下的實際上實現了一個 reconciler 的結構體,實現了 Reconciler 抽象和 Client 結構體

    type reconciler struct {
    	client.Client
    	scheme *runtime.Scheme
    }
    

    那么來看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,這個是具體處理的邏輯過程

    type Reconciler interface {
    	Reconcile(context.Context, Request) (Result, error)
    }
    

    下面在看下誰來實現了這個 Reconciler 抽象

    type Controller interface {
    	reconcile.Reconciler // 協調的具體步驟,通過ns/name\
        // 通過predicates來評估來源數據,并加入queue中(放入隊列的是reconcile.Requests)
    	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
        // 啟動controller,類似于自定義的Run()
    	Start(ctx context.Context) error
    	GetLogger() logr.Logger
    }
    

    controller structure

    controller-runtime\pkg\internal\controller\controller.go 中實現了這個 Controller

    type Controller struct {
    	Name string // controller的標識
        
    	MaxConcurrentReconciles int // 并發運行Reconciler的數量,默認1
    	// 實現了reconcile.Reconciler的調節器, 默認DefaultReconcileFunc
    	Do reconcile.Reconciler
    	// makeQueue會構建一個對應的隊列,就是返回一個限速隊列
    	MakeQueue func() workqueue.RateLimitingInterface
    	// MakeQueue創造出來的,在出入隊列就是操作的這個
    	Queue workqueue.RateLimitingInterface
    
    	// 用于注入其他內容
        // 已棄用
    	SetFields func(i interface{}) error
    
    	mu sync.Mutex
    	// 標識開始的狀態
    	Started bool
    	// 在啟動時傳遞的上下文,用于停止控制器
    	ctx context.Context
    	// 等待緩存同步的時間 默認2分鐘
    	CacheSyncTimeout time.Duration
    
    	// 維護了eventHandler predicates,在控制器啟動時啟動
    	startWatches []watchDescription
    
    	// 日志構建器,輸出入日志
    	LogConstructor func(request *reconcile.Request) logr.Logger
    
    	// RecoverPanic為是否對reconcile引起的panic恢復
    	RecoverPanic bool
    }
    

    看完了controller的structure,接下來看看controller是如何使用的

    injection

    Controller.Watch 實現了注入的動作,可以看到 watch() 通過參數將 對應的事件函數傳入到內部

    func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    
    	// 使用SetFields來完成注入操作
    	if err := c.SetFields(src); err != nil {
    		return err
    	}
    	if err := c.SetFields(evthdler); err != nil {
    		return err
    	}
    	for _, pr := range prct {
    		if err := c.SetFields(pr); err != nil {
    			return err
    		}
    	}
    
    	// 如果Controller還未啟動,那么將這些動作緩存到本地
    	if !c.Started {
    		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
    		return nil
    	}
    
    	c.LogConstructor(nil).Info("Starting EventSource", "source", src)
    	return src.Start(c.ctx, evthdler, c.Queue, prct...)
    }
    

    啟動操作實際上為informer注入事件函數

    type Source interface {
    	// start 是Controller 調用,用以向 Informer 注冊 EventHandler, 將 reconcile.Requests(一個入隊列的動作) 排入隊列。
    	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
    }
    
    func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    	prct ...predicate.Predicate) error {
    	// Informer should have been specified by the user.
    	if is.Informer == nil {
    		return fmt.Errorf("must specify Informer.Informer")
    	}
    
    	is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
    	return nil
    }
    

    我們知道對于 eventHandler,實際上應該是一個 onAddonUpdate 這種類型的函數,queue則是workqueue,那么 Predicates 是什么呢?

    通過追蹤可以看到定義了 Predicate 抽象,可以看出Predicate 是Watch到的事件時什么類型的,當對于每個類型的事件,對應的函數就為 true,在 eventHandler 中,這些被用作,事件的過濾。

    // Predicate filters events before enqueuing the keys.
    type Predicate interface {
    	// Create returns true if the Create event should be processed
    	Create(event.CreateEvent) bool
    
    	// Delete returns true if the Delete event should be processed
    	Delete(event.DeleteEvent) bool
    
    	// Update returns true if the Update event should be processed
    	Update(event.UpdateEvent) bool
    
    	// Generic returns true if the Generic event should be processed
    	Generic(event.GenericEvent) bool
    }
    

    在對應的動作中,可以看到這里作為過濾操作

    func (e EventHandler) OnAdd(obj interface{}) {
    	c := event.CreateEvent{}
    
    	// Pull Object out of the object
    	if o, ok := obj.(client.Object); ok {
    		c.Object = o
    	} else {
    		log.Error(nil, "OnAdd missing Object",
    			"object", obj, "type", fmt.Sprintf("%T", obj))
    		return
    	}
    
    	for _, p := range e.Predicates {
    		if !p.Create(c) {
    			return
    		}
    	}
    
    	// Invoke create handler
    	e.EventHandler.Create(c, e.Queue)
    }
    

    上面就看到了,對應是 EventHandler.Create 進行添加的,那么這些動作具體是在做什么呢?

    在代碼 pkg/handler ,可以看到這些操作,類似于create,這里將ns/name放入到隊列中。

    func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
    	if evt.Object == nil {
    		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
    		return
    	}
    	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
    		Name:      evt.Object.GetName(),
    		Namespace: evt.Object.GetNamespace(),
    	}})
    }
    

    unqueue

    上面看到了,入隊的動作實際上都是將 ns/name 加入到隊列中,那么出隊列時又做了些什么呢?

    通過 controller.Start() 可以看到controller在啟動后都做了些什么動作

    func (c *Controller) Start(ctx context.Context) error {
    	c.mu.Lock()
    	if c.Started {
    		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
    	}
    
    	c.initMetrics()
    
    	// Set the internal context.
    	c.ctx = ctx
    
    	c.Queue = c.MakeQueue() // 初始化queue
    	go func() { // 退出時,讓queue關閉
    		<-ctx.Done()
    		c.Queue.ShutDown()
    	}()
    
    	wg := &sync.WaitGroup{}
    	err := func() error {
    		defer c.mu.Unlock()
    		defer utilruntime.HandleCrash()
    
    		// 啟動informer前,將之前準備好的 evnetHandle predictates source注冊
    		for _, watch := range c.startWatches {
    			c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
    				// 上面我們看過了,start就是真正的注冊動作
    			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
    				return err
    			}
    		}
    
    		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
    		c.LogConstructor(nil).Info("Starting Controller")
    		 // startWatches上面我們也看到了,是evnetHandle predictates source被緩存到里面,
            // 這里是拿出來將其啟動
    		for _, watch := range c.startWatches {
    			syncingSource, ok := watch.src.(source.SyncingSource)
    			if !ok {
    				continue
    			}
    
    			if err := func() error {
    				// use a context with timeout for launching sources and syncing caches.
    				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
    				defer cancel()
    
    				// WaitForSync waits for a definitive timeout, and returns if there
    				// is an error or a timeout
    				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
    					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
    					c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
    					return err
    				}
    
    				return nil
    			}(); err != nil {
    				return err
    			}
    		}
    
    		// which won't be garbage collected if we hold a reference to it.
    		c.startWatches = nil
    
    		// Launch workers to process resources
    		c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
    		wg.Add(c.MaxConcurrentReconciles)
            // 啟動controller消費端的線程
    		for i := 0; i < c.MaxConcurrentReconciles; i++ {
    			go func() {
    				defer wg.Done()
    				for c.processNextWorkItem(ctx) {
    				}
    			}()
    		}
    
    		c.Started = true
    		return nil
    	}()
    	if err != nil {
    		return err
    	}
    
    	<-ctx.Done() // 阻塞,直到上下文關閉
    	c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
    	wg.Wait() // 等待所有線程都關閉
    	c.LogConstructor(nil).Info("All workers finished")
    	return nil
    }
    

    通過上面的分析,可以看到,每個消費的worker線程,實際上調用的是 processNextWorkItem 下面就來看看他究竟做了些什么?

    func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    	obj, shutdown := c.Queue.Get() // 從隊列中拿取數據
    	if shutdown {
    		return false
    	}
    
    	defer c.Queue.Done(obj)
    	// 下面應該是prometheus指標的一些東西
    	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
    	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
    	// 獲得的對象通過reconcileHandler處理
    	c.reconcileHandler(ctx, obj)
    	return true
    }
    

    那么下面看看 reconcileHandler 做了些什么

    func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
    	// Update metrics after processing each item
    	reconcileStartTS := time.Now()
    	defer func() {
    		c.updateMetrics(time.Since(reconcileStartTS))
    	}()
    
    	// 檢查下取出的數據是否為reconcile.Request,在之前enqueue時了解到是插入的這個類型的值
    	req, ok := obj.(reconcile.Request)
    	if !ok {
    		// 如果錯了就忘記
    		c.Queue.Forget(obj)
    		c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
    		return
    	}
    
    	log := c.LogConstructor(&req)
    
    	log = log.WithValues("reconcileID", uuid.NewUUID())
    	ctx = logf.IntoContext(ctx, log)
    
    	// 這里調用了自己在實現controller實現的Reconcile的動作
    	result, err := c.Reconcile(ctx, req)
    	switch {
    	case err != nil:
    		c.Queue.AddRateLimited(req)
    		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
    		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
    		log.Error(err, "Reconciler error")
    	case result.RequeueAfter > 0:
    		c.Queue.Forget(obj)
    		c.Queue.AddAfter(req, result.RequeueAfter)
    		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
    	case result.Requeue:
    		c.Queue.AddRateLimited(req)
    		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
    	default:
    		c.Queue.Forget(obj)
    		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
    	}
    }
    

    通過對example中的 Reconcile 查找其使用,可以看到,調用他的就是上面我們說道的 reconcileHandler ,到這里我們就知道了,controller 的運行流為 Controller.Start() > Controller.processNextWorkItem > Controller.reconcileHandler > Controller.Reconcile 最終到達了我們自定義的業務邏輯處理 Reconcile

    image

    Manager

    在上面學習 controller-runtime 時了解到,有一個 Manager 的組件,這個組件是做什么呢?我們來分析下。

    Manager 是用來創建與啟動 controller 的(允許多個 controller 與 一個 manager 關聯),Manager會啟動分配給他的所有controller,以及其他可啟動的對象。

    example 看到,會初始化一個 ctrl.NewManager

    func main() {
       ctrl.SetLogger(zap.New())
    
       mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
       if err != nil {
          setupLog.Error(err, "unable to start manager")
          os.Exit(1)
       }
    
       // in a real controller, we'd create a new scheme for this
       err = api.AddToScheme(mgr.GetScheme())
       if err != nil {
          setupLog.Error(err, "unable to add scheme")
          os.Exit(1)
       }
    
       err = ctrl.NewControllerManagedBy(mgr).
          For(&api.ChaosPod{}).
          Owns(&corev1.Pod{}).
          Complete(&reconciler{
             Client: mgr.GetClient(),
             scheme: mgr.GetScheme(),
          })
       if err != nil {
          setupLog.Error(err, "unable to create controller")
          os.Exit(1)
       }
    
       err = ctrl.NewWebhookManagedBy(mgr).
          For(&api.ChaosPod{}).
          Complete()
       if err != nil {
          setupLog.Error(err, "unable to create webhook")
          os.Exit(1)
       }
    
       setupLog.Info("starting manager")
       if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
          setupLog.Error(err, "problem running manager")
          os.Exit(1)
       }
    }
    

    這個 manager 就是 controller-runtime\pkg\manager\manager.go 下的 Manager, Manager 通過初始化 Caches 和 Clients 等共享依賴,并將它們提供給 Runnables。

    type Manager interface {
    	// 提供了與APIServer交互的方式,如incluster,indexer,cache等
    	cluster.Cluster
    
        // Runnable 是任意可允許的cm中的組件,如 webhook,controller,Caches,在new中調用時,
        // 可以看到是傳入的是一個controller,這里可以啟動的是帶有Start()方法的,通過調用Start()
        // 來啟動組件
        Add(Runnable) error
        
        // 實現選舉方法。當elected關閉,則選舉為leader
    	Elected() <-chan struct{}
    
    	// 這為一些列健康檢查和指標的方法,和我們關注的沒有太大關系
    	AddMetricsExtraHandler(path string, handler http.Handler) error
    	AddHealthzCheck(name string, check healthz.Checker) error
    	AddReadyzCheck(name string, check healthz.Checker) error
    
    	// Start將啟動所有注冊進來的控制器,直到ctx取消。如果有任意controller報錯,則立即退出
        // 如果使用了 LeaderElection,則必須在此返回后立即退出二進制文件,
    	Start(ctx context.Context) error
    
    	// GetWebhookServer returns a webhook.Server
    	GetWebhookServer() *webhook.Server
    
    	// GetLogger returns this manager's logger.
    	GetLogger() logr.Logger
    
    	// GetControllerOptions returns controller global configuration options.
    	GetControllerOptions() v1alpha1.ControllerConfigurationSpec
    }
    

    controller-manager

    controllerManager 則實現了這個manager的抽象

    type controllerManager struct {
    	sync.Mutex
    	started bool
    
    	stopProcedureEngaged *int64
    	errChan              chan error
    	runnables            *runnables
    	
    	cluster cluster.Cluster
    
    	// recorderProvider 用于記錄eventhandler source predictate
    	recorderProvider *intrec.Provider
    
    	// resourceLock forms the basis for leader election
    	resourceLock resourcelock.Interface
    
    	// 在退出時是否關閉選舉租約
    	leaderElectionReleaseOnCancel bool
    	// 一些指標性的,暫時不需要關注
    	metricsListener net.Listener
    	metricsExtraHandlers map[string]http.Handler
    	healthProbeListener net.Listener
    	readinessEndpointName string
    	livenessEndpointName string
    	readyzHandler *healthz.Handler
    	healthzHandler *healthz.Handler
    
    	// 有關controller全局參數
    	controllerOptions v1alpha1.ControllerConfigurationSpec
    
    	logger logr.Logger
    
    	// 用于關閉 LeaderElection.Run(...) 的信號
    	leaderElectionStopped chan struct{}
    
        // 取消選舉,在失去選舉后,必須延遲到gracefulShutdown之后os.exit()
    	leaderElectionCancel context.CancelFunc
    
    	// leader取消選舉
    	elected chan struct{}
    
    	port int
    	host string
    	certDir string
    	webhookServer *webhook.Server
    	webhookServerOnce sync.Once
    	// 非leader節點強制leader的等待時間
    	leaseDuration time.Duration
    	// renewDeadline is the duration that the acting controlplane will retry
    	// refreshing leadership before giving up.
    	renewDeadline time.Duration
    	// LeaderElector重新操作的時間
    	retryPeriod time.Duration
    	// gracefulShutdownTimeout 是在manager停止之前讓runnables停止的持續時間。
    	gracefulShutdownTimeout time.Duration
    
    	// onStoppedLeading is callled when the leader election lease is lost.
    	// It can be overridden for tests.
    	onStoppedLeading func()
    
    	shutdownCtx context.Context
    	internalCtx    context.Context
    	internalCancel context.CancelFunc
    	internalProceduresStop chan struct{}
    }
    

    workflow

    了解完ControllerManager之后,我們通過 example 來看看 ControllerManager 的workflow

    func main() {
       ctrl.SetLogger(zap.New())
       // New一個manager
       mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
       if err != nil {
          setupLog.Error(err, "unable to start manager")
          os.Exit(1)
       }
    
       // in a real controller, we'd create a new scheme for this
       err = api.AddToScheme(mgr.GetScheme())
       if err != nil {
          setupLog.Error(err, "unable to add scheme")
          os.Exit(1)
       }
    
       err = ctrl.NewControllerManagedBy(mgr).
          For(&api.ChaosPod{}).
          Owns(&corev1.Pod{}).
          Complete(&reconciler{
             Client: mgr.GetClient(),
             scheme: mgr.GetScheme(),
          })
       if err != nil {
          setupLog.Error(err, "unable to create controller")
          os.Exit(1)
       }
    
       err = ctrl.NewWebhookManagedBy(mgr).
          For(&api.ChaosPod{}).
          Complete()
       if err != nil {
          setupLog.Error(err, "unable to create webhook")
          os.Exit(1)
       }
    
       setupLog.Info("starting manager")
       if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
          setupLog.Error(err, "problem running manager")
          os.Exit(1)
       }
    }
    
    • 通過 manager.New() 初始化一個manager,這里面會初始化一些列的manager的參數
    • 通過 ctrl.NewControllerManagedBy 注冊 controller 到manager中
      • ctrl.NewControllerManagedBy 是 builder的一個別名,構建出一個builder類型的controller
      • builder 中的 ctrl 就是 controller
    • 啟動manager

    builder

    下面看來看下builder在構建時做了什么

    // Builder builds a Controller.
    type Builder struct {
    	forInput         ForInput
    	ownsInput        []OwnsInput
    	watchesInput     []WatchesInput
    	mgr              manager.Manager
    	globalPredicates []predicate.Predicate
    	ctrl             controller.Controller
    	ctrlOptions      controller.Options
    	name             string
    }
    

    我們看到 example 中是調用了 For() 動作,那么這個 For() 是什么呢?

    通過注釋,我們可以看到 For() 提供了 調解對象類型,ControllerManagedBy 通過 reconciling object 來相應對應create/delete/update 事件。調用 For() 相當于調用了 Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})

    func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
    	if blder.forInput.object != nil {
    		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
    		return blder
    	}
    	input := ForInput{object: object}
    	for _, opt := range opts {
    		opt.ApplyToFor(&input) //最終把我們要監聽的對象每個 opts注冊進去
    	}
    
    	blder.forInput = input
    	return blder
    }
    

    接下來是調用的 Owns()Owns() 看起來和 For() 功能是類似的。只是說屬于不同,是通過Owns方法設置的

    func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
    	input := OwnsInput{object: object}
    	for _, opt := range opts {
    		opt.ApplyToOwns(&input)
    	}
    
    	blder.ownsInput = append(blder.ownsInput, input)
    	return blder
    }
    

    最后到了 Complete(),Complete 是完成這個controller的構建

    // Complete builds the Application Controller.
    func (blder *Builder) Complete(r reconcile.Reconciler) error {
    	_, err := blder.Build(r)
    	return err
    }
    
    // Build 創建控制器并返回
    func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
    	if r == nil {
    		return nil, fmt.Errorf("must provide a non-nil Reconciler")
    	}
    	if blder.mgr == nil {
    		return nil, fmt.Errorf("must provide a non-nil Manager")
    	}
    	if blder.forInput.err != nil {
    		return nil, blder.forInput.err
    	}
    	// Checking the reconcile type exist or not
    	if blder.forInput.object == nil {
    		return nil, fmt.Errorf("must provide an object for reconciliation")
    	}
    
    	// Set the ControllerManagedBy
    	if err := blder.doController(r); err != nil {
    		return nil, err
    	}
    
    	// Set the Watch
    	if err := blder.doWatch(); err != nil {
    		return nil, err
    	}
    
    	return blder.ctrl, nil
    }
    

    這里面可以看到,會完成 doController 和 doWatch

    doController會初始化好這個controller并返回

    func (blder *Builder) doController(r reconcile.Reconciler) error {
    	globalOpts := blder.mgr.GetControllerOptions()
    
    	ctrlOptions := blder.ctrlOptions
    	if ctrlOptions.Reconciler == nil {
    		ctrlOptions.Reconciler = r
    	}
    
    	// 通過檢索GVK獲得默認的名稱
    	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
    	if err != nil {
    		return err
    	}
    
    	// 設置并發,如果最大并發為0則找到一個
        // 追蹤下去看似是對于沒有設置時,例如會根據 app group中的 ReplicaSet設定
        // 就是在For()傳遞的一個類型的數量來確定并發的數量
    	if ctrlOptions.MaxConcurrentReconciles == 0 {
    		groupKind := gvk.GroupKind().String()
    
    		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
    			ctrlOptions.MaxConcurrentReconciles = concurrency
    		}
    	}
    
    	// Setup cache sync timeout.
    	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
    		ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
    	}
    	// 給controller一個name,如果沒有初始化傳遞,則使用Kind做名稱
    	controllerName := blder.getControllerName(gvk)
    
    	// Setup the logger.
    	if ctrlOptions.LogConstructor == nil {
    		log := blder.mgr.GetLogger().WithValues(
    			"controller", controllerName,
    			"controllerGroup", gvk.Group,
    			"controllerKind", gvk.Kind,
    		)
    
    		lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]
    
    		ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
    			log := log
    			if req != nil {
    				log = log.WithValues(
    					lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
    					"namespace", req.Namespace, "name", req.Name,
    				)
    			}
    			return log
    		}
    	}
    
    	// 這里就是構建一個新的控制器了,也就是前面說到的  manager.New()
    	blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
    	return err
    }
    

    manager.New()

    start Manager

    接下來是manager的啟動,也就是對應的 start()doWatch()

    通過下述代碼我們可以看出來,對于 doWatch() 就是把 compete() 前的一些資源的事件函數都注入到controller 中

    func (blder *Builder) doWatch() error {
    	// 調解類型,這也也就是對于For的obj來說,我們需要的是什么結構的,如非結構化數據或metadata-only
        // metadata-only就是配置成一個GVK schema.GroupVersionKind
    	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
    	if err != nil {
    		return err
        }&source.Kind{}
        // 一些準備工作,將對象封裝為&source.Kind{}
        // 
    	src := &source.Kind{Type: typeForSrc}
    	hdler := &handler.EnqueueRequestForObject{} // 就是包含obj的一個事件隊列
    	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    	// 這里又到之前說過的controller watch了
        // 將一系列的準備動作注入到cache 如 source eventHandler predicate
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
    		return err
    	}
    
    	// 再重復 ownsInput 動作
    	for _, own := range blder.ownsInput {
    		typeForSrc, err := blder.project(own.object, own.objectProjection)
    		if err != nil {
    			return err
    		}
    		src := &source.Kind{Type: typeForSrc}
    		hdler := &handler.EnqueueRequestForOwner{
    			OwnerType:    blder.forInput.object,
    			IsController: true,
    		}
    		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
    		allPredicates = append(allPredicates, own.predicates...)
    		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
    			return err
    		}
    	}
    
    	// 在對 ownsInput 進行重復的操作
    	for _, w := range blder.watchesInput {
    		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
    		allPredicates = append(allPredicates, w.predicates...)
    
    		// If the source of this watch is of type *source.Kind, project it.
    		if srckind, ok := w.src.(*source.Kind); ok {
    			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
    			if err != nil {
    				return err
    			}
    			srckind.Type = typeForSrc
    		}
    
    		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    

    由于前兩部 builder 的操作將 mgr 指針傳入到 builder中,并且操作了 complete() ,也就是操作了 build() ,這代表了對 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是將controller啟動

    func (cm *controllerManager) Start(ctx context.Context) (err error) {
    	cm.Lock()
    	if cm.started {
    		cm.Unlock()
    		return errors.New("manager already started")
    	}
    	var ready bool
    	defer func() {
    		if !ready {
    			cm.Unlock()
    		}
    	}()
    
    	// Initialize the internal context.
    	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
    
    	// 這個channel代表了controller的停止
    	stopComplete := make(chan struct{})
    	defer close(stopComplete)
    	// This must be deferred after closing stopComplete, otherwise we deadlock.
    	defer func() {
    		stopErr := cm.engageStopProcedure(stopComplete)
    		if stopErr != nil {
    			if err != nil {
    				err = kerrors.NewAggregate([]error{err, stopErr})
    			} else {
    				err = stopErr
    			}
    		}
    	}()
    
    	// Add the cluster runnable.
    	if err := cm.add(cm.cluster); err != nil {
    		return fmt.Errorf("failed to add cluster to runnables: %w", err)
    	}
        // 指標類
    	if cm.metricsListener != nil {
    		cm.serveMetrics()
    	}
    	if cm.healthProbeListener != nil {
    		cm.serveHealthProbes()
    	}
    	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
    		if !errors.Is(err, wait.ErrWaitTimeout) {
    			return err
    		}
    	}
    
    	// 等待informer同步完成
    	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
    		if !errors.Is(err, wait.ErrWaitTimeout) {
    			return err
    		}
    	}
    
    	// 非選舉模式,runnable將在cache同步完成后啟動
    	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
    		if !errors.Is(err, wait.ErrWaitTimeout) {
    			return err
    		}
    	}
    
    	// Start the leader election and all required runnables.
    	{
    		ctx, cancel := context.WithCancel(context.Background())
    		cm.leaderElectionCancel = cancel
    		go func() {
    			if cm.resourceLock != nil {
    				if err := cm.startLeaderElection(ctx); err != nil {
    					cm.errChan <- err
    				}
    			} else {
    				// Treat not having leader election enabled the same as being elected.
    				if err := cm.startLeaderElectionRunnables(); err != nil {
    					cm.errChan <- err
    				}
    				close(cm.elected)
    			}
    		}()
    	}
    
    	ready = true
    	cm.Unlock()
    	select {
    	case <-ctx.Done():
    		// We are done
    		return nil
    	case err := <-cm.errChan:
    		// Error starting or running a runnable
    		return err
    	}
    }
    

    可以看到上面啟動了4種類型的runnable,實際上就是對這runnable進行啟動,例如 controller,cache等。

    回顧一下,我們之前在使用code-generator 生成,并自定義controller時,我們也是通過啟動 informer.Start() ,否則會報錯。

    最后可以通過一張關系圖來表示,client-go與controller-manager之間的關系

    image

    Reference

    diving controller runtime

    posted @ 2022-06-27 22:13  Cylon  閱讀(120)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看