<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(2)-初始化與啟動分析

    k8s client-go源碼分析 informer源碼分析(2)-初始化與啟動分析

    前面一篇文章對k8s informer做了概要分析,本篇文章將對informer的初始化與啟動進行分析。

    informer架構

    先來回憶一下informer的架構。

    k8s client-go informer主要包括以下部件:
    (1)Reflector:Reflector從kube-apiserver中list&watch資源對象,然后調用DeltaFIFO的Add/Update/Delete/Replace方法將資源對象及其變化包裝成Delta并將其丟到DeltaFIFO中;
    (2)DeltaFIFO:DeltaFIFO中存儲著一個map和一個queue,即map[object key]Deltas以及object key的queue,Deltas為Delta的切片類型,Delta裝有對象及對象的變化類型(Added/Updated/Deleted/Sync) ,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出;
    (3)Controller:Controller從DeltaFIFO的queue中pop一個object key出來,并獲取其關聯的 Deltas出來進行處理,遍歷Deltas,根據對象的變化更新Indexer中的本地內存緩存,并通知Processor,相關對象有變化事件發生;
    (4)Processor:Processor根據對象的變化事件類型,調用相應的ResourceEventHandler來處理對象的變化;
    (5)Indexer:Indexer中有informer維護的指定資源對象的相對于etcd數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對apiserver、對etcd的請求壓力;
    (6)ResourceEventHandler:用戶根據自身處理邏輯需要,注冊自定義的的ResourceEventHandler,當對象發生變化時,將觸發調用對應類型的ResourceEventHandler來做處理。

    概述

        ...
    	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    	...
    	go factory.Start(stopper)
    	...
    	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    		return
    	}
    	...
    

    上一節有列舉了informer的使用代碼,注意看到示例代碼中的下面這段代碼,做了informer初始化與啟動,其中包括:
    (1)informers.NewSharedInformerFactory:初始化informer factory;
    (2)podInformer.Informer:初始化pod informer;
    (3)factory.Start:啟動informer factory;
    (4)cache.WaitForCacheSync:等待list操作獲取到的對象都同步到informer本地緩存Indexer中;

    下面也將根據這四部分進行informer的初始化與啟動分析。

    基于k8s v1.17.4版本依賴的client-go

    1.SharedInformerFactory的初始化

    1.1 sharedInformerFactory結構體

    先來看下sharedInformerFactory結構體,看下里面有哪些屬性。

    看到幾個比較重要的屬性:
    (1)client:連接k8s的clientSet;
    (2)informers:是個map,可以裝各個對象的informer;
    (3)startedInformers:記錄已經啟動的informer;

    // staging/src/k8s.io/client-go/informers/factory.go
    type sharedInformerFactory struct {
    	client           kubernetes.Interface
    	namespace        string
    	tweakListOptions internalinterfaces.TweakListOptionsFunc
    	lock             sync.Mutex
    	defaultResync    time.Duration
    	customResync     map[reflect.Type]time.Duration
    
    	informers map[reflect.Type]cache.SharedIndexInformer
    	// startedInformers is used for tracking which informers have been started.
    	// This allows Start() to be called multiple times safely.
    	startedInformers map[reflect.Type]bool
    }
    

    1.2 NewSharedInformerFactory

    NewSharedInformerFactory方法用于初始化informer factory,主要是初始化并返回sharedInformerFactory結構體。

    // staging/src/k8s.io/client-go/informers/factory.go
    func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
    	return NewSharedInformerFactoryWithOptions(client, defaultResync)
    }
    
    func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
    	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
    }
    
    func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    	factory := &sharedInformerFactory{
    		client:           client,
    		namespace:        v1.NamespaceAll,
    		defaultResync:    defaultResync,
    		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
    		startedInformers: make(map[reflect.Type]bool),
    		customResync:     make(map[reflect.Type]time.Duration),
    	}
    
    	// Apply all options
    	for _, opt := range options {
    		factory = opt(factory)
    	}
    
    	return factory
    }
    

    2.對象informer的初始化

    上一節有列舉了informer的使用代碼,注意看到示例代碼中的下面這段代碼,這里利用了工廠方法設計模式,podInformer.Informer()即初始化了sharedInformerFactory中的pod的informer,具體調用關系可自行看如下代碼,比較簡單,這里不再展開分析。

        // 初始化informer factory以及pod informer
    	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    

    2.1 podInformer.Informer

    Informer方法中調用了f.factory.InformerFor方法來做pod informer的初始化。

    // k8s.io/client-go/informers/core/v1/pod.go
    func (f *podInformer) Informer() cache.SharedIndexInformer {
    	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
    }
    

    2.2 f.factory.InformerFor

    Informer方法中調用了f.factory.InformerFor方法來做pod informer的初始化,并傳入f.defaultInformer作為newFunc,而在f.factory.InformerFor方法中,調用newFunc來初始化informer。

    這里也可以看到,其實informer初始化后會存儲進map f.informers[informerType]中,即存儲進sharedInformerFactory結構體的informers屬性中,方便共享使用。

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	informerType := reflect.TypeOf(obj)
    	informer, exists := f.informers[informerType]
    	if exists {
    		return informer
    	}
    
    	resyncPeriod, exists := f.customResync[informerType]
    	if !exists {
    		resyncPeriod = f.defaultResync
    	}
    
    	informer = newFunc(f.client, resyncPeriod)
    	f.informers[informerType] = informer
    
    	return informer
    }
    

    2.3 newFunc/f.defaultInformer

    defaultInformer方法中,調用了NewFilteredPodInformer方法來初始化pod informer,最終初始化并返回sharedIndexInformer結構體。

    // k8s.io/client-go/informers/core/v1/pod.go
    func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    }
    
    func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    	return cache.NewSharedIndexInformer(
    		&cache.ListWatch{
    			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.CoreV1().Pods(namespace).List(options)
    			},
    			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.CoreV1().Pods(namespace).Watch(options)
    			},
    		},
    		&corev1.Pod{},
    		resyncPeriod,
    		indexers,
    	)
    }
    
    func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    	realClock := &clock.RealClock{}
    	sharedIndexInformer := &sharedIndexInformer{
    		processor:                       &sharedProcessor{clock: realClock},
    		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    		listerWatcher:                   lw,
    		objectType:                      objType,
    		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
    		clock: realClock,
    	}
    	return sharedIndexInformer
    }
    

    2.4 sharedIndexInformer結構體

    sharedIndexInformer結構體中重點看到以下幾個屬性:
    (1)indexer:對應著informer中的部件Indexer,Indexer中有informer維護的指定資源對象的相對于etcd數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對apiserver、對etcd的請求壓力;
    (2)controller:對應著informer中的部件Controller,Controller從DeltaFIFO中pop Deltas出來處理,根據對象的變化更新Indexer中的本地內存緩存,并通知Processor,相關對象有變化事件發生;
    (3)processor:對應著informer中的部件Processor,Processor根據對象的變化事件類型,調用相應的ResourceEventHandler來處理對象的變化;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    type sharedIndexInformer struct {
    	indexer    Indexer
    	controller Controller
    
    	processor             *sharedProcessor
    	cacheMutationDetector CacheMutationDetector
    
    	// This block is tracked to handle late initialization of the controller
    	listerWatcher ListerWatcher
    	objectType    runtime.Object
    
    	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    	// shouldResync to check if any of our listeners need a resync.
    	resyncCheckPeriod time.Duration
    	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    	// value).
    	defaultEventHandlerResyncPeriod time.Duration
    	// clock allows for testability
    	clock clock.Clock
    
    	started, stopped bool
    	startedLock      sync.Mutex
    
    	// blockDeltas gives a way to stop all event distribution so that a late event handler
    	// can safely join the shared informer.
    	blockDeltas sync.Mutex
    }
    
    Indexer接口與cache結構體

    cache結構體為Indexer接口的實現;

    // staging/src/k8s.io/client-go/tools/cache/store.go
    type cache struct {
    	cacheStorage ThreadSafeStore
    	keyFunc KeyFunc
    }
    

    threadSafeMap struct是ThreadSafeStore接口的一個實現,其最重要的一個屬性便是items了,items是用map構建的鍵值對,資源對象都存在items這個map中,key根據資源對象來算出,value為資源對象本身,這里的items即為informer的本地緩存了,而indexers與indices屬性則與索引功能有關。

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	lock  sync.RWMutex
    	items map[string]interface{}
    
    	// indexers maps a name to an IndexFunc
    	indexers Indexers
    	// indices maps a name to an Index
    	indices Indices
    }
    

    關于Indexer的詳細分析會在后續有專門的文章做分析,這里不展開分析;

    controller結構體

    而controller結構體則包含了informer中的主要部件Reflector以及DeltaFIFO;
    (1)Reflector:Reflector從kube-apiserver中list&watch資源對象,然后將對象的變化包裝成Delta并將其丟到DeltaFIFO中;
    (2)DeltaFIFO:DeltaFIFO存儲著map[object key]Deltas以及object key的queue,Delta裝有對象及對象的變化類型 ,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出;

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type controller struct {
    	config         Config
    	reflector      *Reflector
    	reflectorMutex sync.RWMutex
    	clock          clock.Clock
    }
    
    type Config struct {
    	// The queue for your objects; either a FIFO or
    	// a DeltaFIFO. Your Process() function should accept
    	// the output of this Queue's Pop() method.
    	Queue
    	...
    }
    

    3.啟動sharedInformerFactory

    sharedInformerFactory.Start為informer factory的啟動方法,其主要邏輯為循環遍歷informers,然后跑goroutine調用informer.Run來啟動sharedInformerFactory中存儲的各個informer。

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	for informerType, informer := range f.informers {
    		if !f.startedInformers[informerType] {
    			go informer.Run(stopCh)
    			f.startedInformers[informerType] = true
    		}
    	}
    }
    

    sharedIndexInformer.Run

    sharedIndexInformer.Run用于啟動informer,主要邏輯為:
    (1)調用NewDeltaFIFO,初始化DeltaFIFO;
    (2)構建Config結構體,這里留意下Process屬性,賦值了s.HandleDeltas,后面會分析到該方法;
    (3)調用New,利用Config結構體來初始化controller;
    (4)調用s.processor.run,啟動processor;
    (5)調用s.controller.Run,啟動controller;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
        
        // 初始化DeltaFIFO
    	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
        
        // 構建Config結構體
    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    s.listerWatcher,
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
    
    		Process: s.HandleDeltas,
    	}
    
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
            // 初始化controller
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()
    
    	// Separate stop channel because Processor should be stopped strictly after controller
    	processorStopCh := make(chan struct{})
    	var wg wait.Group
    	defer wg.Wait()              // Wait for Processor to stop
    	defer close(processorStopCh) // Tell Processor to stop
    	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    	// 啟動processor
    	wg.StartWithChannel(processorStopCh, s.processor.run)
    
    	defer func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		s.stopped = true // Don't want any new listeners
    	}()
    	// 啟動controller
    	s.controller.Run(stopCh)
    }
    

    3.1 New

    New函數初始化了controller并return。

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func New(c *Config) Controller {
    	ctlr := &controller{
    		config: *c,
    		clock:  &clock.RealClock{},
    	}
    	return ctlr
    }
    

    3.2 s.processor.run

    s.processor.run啟動了processor,其中注意到listener.run與listener.pop兩個核心方法即可,暫時沒有用到,等下面用到他們的時候再做分析。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    	func() {
    		p.listenersLock.RLock()
    		defer p.listenersLock.RUnlock()
    		for _, listener := range p.listeners {
    			p.wg.Start(listener.run)
    			p.wg.Start(listener.pop)
    		}
    		p.listenersStarted = true
    	}()
    	<-stopCh
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    	for _, listener := range p.listeners {
    		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    	}
    	p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    

    3.3 controller.Run

    controller.Run為controller的啟動方法,這里主要看到幾個點:
    (1)調用NewReflector,初始化Reflector;
    (2)調用r.Run,實際上是調用了Reflector的啟動方法來啟動Reflector;
    (3)調用c.processLoop,開始controller的核心處理;

    // k8s.io/client-go/tools/cache/controller.go
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
    
    	wait.Until(c.processLoop, time.Second, stopCh)
    }
    
    3.3.1 Reflector結構體

    先來看到Reflector結構體,這里重點看到以下屬性:
    (1)expectedType:放到Store中(即DeltaFIFO中)的對象類型;
    (2)store:store會賦值為DeltaFIFO,具體可以看之前的informer初始化與啟動分析即可得知,這里不再展開分析;
    (3)listerWatcher:存放list方法和watch方法的ListerWatcher interface實現;

    // k8s.io/client-go/tools/cache/reflector.go
    type Reflector struct {
        ...
        expectedType reflect.Type
        store Store
        listerWatcher ListerWatcher
        ...
    }
    
    3.3.2 r.Run/Reflector.Run

    Reflector.Run方法中啟動了Reflector,而Reflector的核心處理邏輯為從kube-apiserver處做list&watch操作,然后將得到的對象封裝存儲進DeltaFIFO中。

    // staging/src/k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) Run(stopCh <-chan struct{}) {
    	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    	wait.Until(func() {
    		if err := r.ListAndWatch(stopCh); err != nil {
    			utilruntime.HandleError(err)
    		}
    	}, r.period, stopCh)
    }
    
    3.3.3 controller.processLoop

    controller的核心處理方法processLoop中,最重要的邏輯是循環調用c.config.Queue.Pop將DeltaFIFO中的隊頭元素給pop出來,然后調用c.config.Process方法來做處理,當處理出錯時,再調用c.config.Queue.AddIfNotPresent將對象重新加入到DeltaFIFO中去。

    // k8s.io/client-go/tools/cache/controller.go
    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    
    3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas

    根據前面sharedIndexInformer.Run方法的分析中可以得知,c.config.Process其實就是sharedIndexInformer.HandleDeltas。

    HandleDeltas方法中,將從DeltaFIFO中pop出來的對象以及類型,相應的在indexer中做添加、更新、刪除操作,并調用s.processor.distribute通知自定義的ResourceEventHandler。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    

    怎么通知到自定義的ResourceEventHandler呢?繼續往下看。

    3.3.5 sharedIndexInformer.processor.distribute

    可以看到distribute方法最終是將構造好的addNotification、updateNotification、deleteNotification對象寫入到p.addCh中。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    
    	if sync {
    		for _, listener := range p.syncingListeners {
    			listener.add(obj)
    		}
    	} else {
    		for _, listener := range p.listeners {
    			listener.add(obj)
    		}
    	}
    }
    
    func (p *processorListener) add(notification interface{}) {
    	p.addCh <- notification
    }
    

    到這里,processor中的listener.pop以及listener.run方法終于派上了用場,繼續往下看。

    3.3.6 listener.pop

    分析processorListener的pop方法可以得知,其邏輯實際上就是將p.addCh中的對象給拿出來,然后丟進了p.nextCh中。那么誰來處理p.nextCh呢?繼續往下看。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) // Tell .run() to stop
    
    	var nextCh chan<- interface{}
    	var notification interface{}
    	for {
    		select {
    		case nextCh <- notification:
    			// Notification dispatched
    			var ok bool
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
    		case notificationToAdd, ok := <-p.addCh:
    			if !ok {
    				return
    			}
    			if notification == nil { // No notification to pop (and pendingNotifications is empty)
    				// Optimize the case - skip adding to pendingNotifications
    				notification = notificationToAdd
    				nextCh = p.nextCh
    			} else { // There is already a notification waiting to be dispatched
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }
    
    3.3.7 listener.run

    在processorListener的run方法中,將循環讀取p.nextCh,判斷對象類型,是updateNotification則調用p.handler.OnUpdate方法,是addNotification則調用p.handler.OnAdd方法,是deleteNotification則調用p.handler.OnDelete方法做處理。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) run() {
    	// this call blocks until the channel is closed.  When a panic happens during the notification
    	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    	// the next notification will be attempted.  This is usually better than the alternative of never
    	// delivering again.
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		// this gives us a few quick retries before a long pause and then a few more quick retries
    		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    			for next := range p.nextCh {
    				switch notification := next.(type) {
    				case updateNotification:
    					p.handler.OnUpdate(notification.oldObj, notification.newObj)
    				case addNotification:
    					p.handler.OnAdd(notification.newObj)
    				case deleteNotification:
    					p.handler.OnDelete(notification.oldObj)
    				default:
    					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
    				}
    			}
    			// the only way to get here is if the p.nextCh is empty and closed
    			return true, nil
    		})
    
    		// the only way to get here is if the p.nextCh is empty and closed
    		if err == nil {
    			close(stopCh)
    		}
    	}, 1*time.Minute, stopCh)
    }
    
    

    而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法實際上就是自定義的的ResourceEventHandlerFuncs了。

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
      })
    
    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type ResourceEventHandlerFuncs struct {
    	AddFunc    func(obj interface{})
    	UpdateFunc func(oldObj, newObj interface{})
    	DeleteFunc func(obj interface{})
    }
    
    func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    	if r.AddFunc != nil {
    		r.AddFunc(obj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    	if r.UpdateFunc != nil {
    		r.UpdateFunc(oldObj, newObj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    	if r.DeleteFunc != nil {
    		r.DeleteFunc(obj)
    	}
    }
    

    4.cache.WaitForCacheSync(stopper, informer.HasSynced)

    可以看出在cache.WaitForCacheSync方法中,實際上是調用方法入參cacheSyncs ...InformerSynced來判斷cache是否同步完成(即調用informer.HasSynced方法),而這里說的cache同步完成,意思是等待informer從kube-apiserver同步資源完成,即informer的list操作獲取的對象都存入到informer中的indexer本地緩存中;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    	err := wait.PollImmediateUntil(syncedPollPeriod,
    		func() (bool, error) {
    			for _, syncFunc := range cacheSyncs {
    				if !syncFunc() {
    					return false, nil
    				}
    			}
    			return true, nil
    		},
    		stopCh)
    	if err != nil {
    		klog.V(2).Infof("stop requested")
    		return false
    	}
    
    	klog.V(4).Infof("caches populated")
    	return true
    }
    

    4.1 informer.HasSynced

    HasSynced方法實際上是調用了sharedIndexInformer.controller.HasSynced方法;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HasSynced() bool {
    	s.startedLock.Lock()
    	defer s.startedLock.Unlock()
    
    	if s.controller == nil {
    		return false
    	}
    	return s.controller.HasSynced()
    }
    
    s.controller.HasSynced

    這里的c.config.Queue.HasSynced()方法,實際上是指DeltaFIFO的HasSynced方法,會在DeltaFIFO的分析中再詳細分析,這里只需要知道當informer的list操作獲取的對象都存入到informer中的indexer本地緩存中則返回true即可;

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func (c *controller) HasSynced() bool {
    	return c.config.Queue.HasSynced()
    }
    

    4.2 sharedInformerFactory.WaitForCacheSync

    可以順帶看下sharedInformerFactory.WaitForCacheSync方法,其實際上是遍歷factory中的所有informer,調用cache.WaitForCacheSync,然后傳入每個informer的HasSynced方法作為入參;

    // staging/src/k8s.io/client-go/informers/factory.go
    func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    	informers := func() map[reflect.Type]cache.SharedIndexInformer {
    		f.lock.Lock()
    		defer f.lock.Unlock()
    
    		informers := map[reflect.Type]cache.SharedIndexInformer{}
    		for informerType, informer := range f.informers {
    			if f.startedInformers[informerType] {
    				informers[informerType] = informer
    			}
    		}
    		return informers
    	}()
    
    	res := map[reflect.Type]bool{}
    	for informType, informer := range informers {
    		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    	}
    	return res
    }
    

    至此,整個informer的初始化與啟動的分析就結束了,后面會對informer中的各個核心部件進行詳細分析,敬請期待。

    總結

    下面用兩張圖片總結一下informer的初始化與啟動;

    informer初始化

    informer啟動

    posted @ 2022-05-08 09:48  良凱爾  閱讀(201)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看