<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(5)-Controller&Processor源碼分析

    client-go之Controller&Processor源碼分析

    1.controller與Processor概述

    Controller

    Controller從DeltaFIFO中pop Deltas出來處理,根據對象的變化更新Indexer本地緩存,并通知Processor相關對象有變化事件發生。

    Processor

    Processor根據Controller的通知,即根據對象的變化事件類型,調用相應的ResourceEventHandler來處理對象的變化。

    先通過一張informer概要架構圖看一下Controller&Processor所處位置與概要功能。

    2.Controller初始化與啟動分析

    2.1 Cotroller初始化-New

    New用于初始化Controller,方法比較簡單。

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func New(c *Config) Controller {
    	ctlr := &controller{
    		config: *c,
    		clock:  &clock.RealClock{},
    	}
    	return ctlr
    }
    

    2.2 Controller啟動-controller.Run

    controller.Run為controller的啟動方法,這里主要看到幾個點:
    (1)調用NewReflector,初始化Reflector;
    (2)調用r.Run,實際上是調用了Reflector的啟動方法來啟動Reflector(Reflector相關的分析前面的博客已經分析過了,這里不再重復);
    (3)調用c.processLoop,開始controller的核心處理;

    // staging/src/k8s.io/client-go/tools/cache/controller.go
    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
    
    	wait.Until(c.processLoop, time.Second, stopCh)
    }
    

    3.controller核心處理方法分析

    controller.processLoop即為controller的核心處理方法。

    controller.processLoop

    controller的核心處理方法processLoop中,最重要的邏輯是循環調用c.config.Queue.Pop將DeltaFIFO中的隊頭元素給pop出來(實際上pop出來的是Deltas,是Delta的切片類型),然后調用c.config.Process方法來做處理,當處理出錯時,再調用c.config.Queue.AddIfNotPresent將對象重新加入到DeltaFIFO中去。

    func (c *controller) processLoop() {
    	for {
    		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    		if err != nil {
    			if err == ErrFIFOClosed {
    				return
    			}
    			if c.config.RetryOnError {
    				// This is the safe way to re-enqueue.
    				c.config.Queue.AddIfNotPresent(obj)
    			}
    		}
    	}
    }
    

    根據前面sharedIndexInformer的初始化與啟動分析(sharedIndexInformer.Run)可以得知,c.config.Process即為s.HandleDeltas方法,所以接下來看到s.HandleDeltas方法的分析。

    c.config.Process/s.HandleDeltas

    根據前面分析知道HandleDeltas要處理的是Deltas,是Delta的切片類型。

    再來看到HandleDeltas方法的主要邏輯:
    (1)循環遍歷Deltas,拿到單個Delta;
    (2)判斷Delta的類型;
    (3)如果是Added、Updated、Sync類型,則從indexer中獲取該對象,存在則調用s.indexer.Update來更新indexer中的該對象,隨后構造updateNotification struct,并調用s.processor.distribute方法;如果indexer中不存在該對象,則調用s.indexer.Add來往indexer中添加該對象,隨后構造addNotification struct,并調用s.processor.distribute方法;
    (4)如果是Deleted類型,則調用s.indexer.Delete來將indexer中的該對象刪除,隨后構造deleteNotification struct,并調用s.processor.distribute方法;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    
    type updateNotification struct {
    	oldObj interface{}
    	newObj interface{}
    }
    
    type addNotification struct {
    	newObj interface{}
    }
    
    type deleteNotification struct {
    	oldObj interface{}
    }
    

    至此,Controller的分析就結束了,用一張圖來回憶一下Controller的功能與架構。

    4.processor核心處理方法分析

    sharedIndexInformer.processor.distribute

    接下來分析一下前面提到的s.processor.distribute方法。

    可以看到distribute方法最終是將構造好的addNotification、updateNotification、deleteNotification對象寫入到p.addCh中。

    sync類型的對象寫入到p.syncingListeners中,但informer中貌似沒有啟動p.syncingListeners或對p.syncingListeners做處理,所以sync類型的對象變化(也即list操作得到的對象所生成的對象變化)會被忽略?有待驗證。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    
    	if sync {
    		for _, listener := range p.syncingListeners {
    			listener.add(obj)
    		}
    	} else {
    		for _, listener := range p.listeners {
    			listener.add(obj)
    		}
    	}
    }
    
    func (p *processorListener) add(notification interface{}) {
    	p.addCh <- notification
    }
    

    sharedIndexInformer.processor.run

    s.processor.run啟動了processor,其中注意到listener.run與listener.pop兩個核心方法。

    這里可以看到processor的run方法中只啟動了p.listeners,沒有啟動p.syncingListeners。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    	func() {
    		p.listenersLock.RLock()
    		defer p.listenersLock.RUnlock()
    		for _, listener := range p.listeners {
    			p.wg.Start(listener.run)
    			p.wg.Start(listener.pop)
    		}
    		p.listenersStarted = true
    	}()
    	<-stopCh
    	p.listenersLock.RLock()
    	defer p.listenersLock.RUnlock()
    	for _, listener := range p.listeners {
    		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    	}
    	p.wg.Wait() // Wait for all .pop() and .run() to stop
    }
    

    processorListener.pop

    分析processorListener的pop方法可以得知,其邏輯實際上就是將p.addCh中的對象給拿出來,然后丟進了p.nextCh中。那么誰來處理p.nextCh呢?繼續往下看。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) pop() {
    	defer utilruntime.HandleCrash()
    	defer close(p.nextCh) // Tell .run() to stop
    
    	var nextCh chan<- interface{}
    	var notification interface{}
    	for {
    		select {
    		case nextCh <- notification:
    			// Notification dispatched
    			var ok bool
    			notification, ok = p.pendingNotifications.ReadOne()
    			if !ok { // Nothing to pop
    				nextCh = nil // Disable this select case
    			}
    		case notificationToAdd, ok := <-p.addCh:
    			if !ok {
    				return
    			}
    			if notification == nil { // No notification to pop (and pendingNotifications is empty)
    				// Optimize the case - skip adding to pendingNotifications
    				notification = notificationToAdd
    				nextCh = p.nextCh
    			} else { // There is already a notification waiting to be dispatched
    				p.pendingNotifications.WriteOne(notificationToAdd)
    			}
    		}
    	}
    }
    

    processorListener.run

    在processorListener的run方法中,將循環讀取p.nextCh,判斷對象類型,是updateNotification則調用p.handler.OnUpdate方法,是addNotification則調用p.handler.OnAdd方法,是deleteNotification則調用p.handler.OnDelete方法做處理。

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (p *processorListener) run() {
    	// this call blocks until the channel is closed.  When a panic happens during the notification
    	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    	// the next notification will be attempted.  This is usually better than the alternative of never
    	// delivering again.
    	stopCh := make(chan struct{})
    	wait.Until(func() {
    		// this gives us a few quick retries before a long pause and then a few more quick retries
    		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
    			for next := range p.nextCh {
    				switch notification := next.(type) {
    				case updateNotification:
    					p.handler.OnUpdate(notification.oldObj, notification.newObj)
    				case addNotification:
    					p.handler.OnAdd(notification.newObj)
    				case deleteNotification:
    					p.handler.OnDelete(notification.oldObj)
    				default:
    					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
    				}
    			}
    			// the only way to get here is if the p.nextCh is empty and closed
    			return true, nil
    		})
    
    		// the only way to get here is if the p.nextCh is empty and closed
    		if err == nil {
    			close(stopCh)
    		}
    	}, 1*time.Minute, stopCh)
    }
    
    

    而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法實際上就是自定義的的ResourceEventHandlerFuncs了。

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
      })
    
    // staging/src/k8s.io/client-go/tools/cache/controller.go
    type ResourceEventHandlerFuncs struct {
    	AddFunc    func(obj interface{})
    	UpdateFunc func(oldObj, newObj interface{})
    	DeleteFunc func(obj interface{})
    }
    
    func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    	if r.AddFunc != nil {
    		r.AddFunc(obj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    	if r.UpdateFunc != nil {
    		r.UpdateFunc(oldObj, newObj)
    	}
    }
    
    func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    	if r.DeleteFunc != nil {
    		r.DeleteFunc(obj)
    	}
    }
    

    至此,Processor的分析也結束了,用一張圖來回憶一下Processor的功能與架構。

    總結

    Controller

    Controller從DeltaFIFO中pop Deltas出來處理,根據對象的變化更新Indexer本地緩存,并通知Processor相關對象有變化事件發生:
    (1)如果是Added、Updated、Sync類型,則從indexer中獲取該對象,存在則調用s.indexer.Update來更新indexer中的該對象,隨后構造updateNotification struct,并通知Processor;如果indexer中不存在該對象,則調用s.indexer.Add來往indexer中添加該對象,隨后構造addNotification struct,并通知Processor;
    (2)如果是Deleted類型,則調用s.indexer.Delete來將indexer中的該對象刪除,隨后構造deleteNotification struct,并通知Processor;

    Processor

    Processor根據Controller的通知,即根據對象的變化事件類型(addNotification、updateNotification、deleteNotification),調用相應的ResourceEventHandler(addFunc、updateFunc、deleteFunc)來處理對象的變化。

    informer架構中的Controller&Processor

    在對informer中的Controller與Processor分析完之后,接下來將分析informer中的Indexer。

    posted @ 2022-06-05 10:08  良凱爾  閱讀(221)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看