<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(4)-DeltaFIFO源碼分析

    client-go之DeltaFIFO源碼分析

    1.DeltaFIFO概述

    先從名字上來看,DeltaFIFO,首先它是一個FIFO,也就是一個先進先出的隊列,而Delta代表變化的資源對象,其包含資源對象數據本身及其變化類型。

    Delta的組成:

    type Delta struct {
        Type   DeltaType
        Object interface{}
    }
    

    DeltaFIFO的組成:

    type DeltaFIFO struct {
        ...
        items map[string]Deltas
    	queue []string
        ...
    }
    
    type Deltas []Delta
    

    具體來說,DeltaFIFO存儲著map[object key]Deltas以及object key的queue,Delta裝有對象數據及對象的變化類型。輸入輸出方面,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出。

    一個對象能算出一個唯一的object key,其對應著一個Deltas,所以一個對象對應著一個Deltas。

    而目前Delta有4種Type,分別是: Added、Updated、Deleted、Sync。針對同一個對象,可能有多個不同Type的Delta元素在Deltas中,表示對該對象做了不同的操作,另外,也可能有多個相同Type的Delta元素在Deltas中(除Deleted外,Delted類型會被去重),比如短時間內,多次對某一個對象進行了更新操作,那么就會有多個Updated類型的Delta放入Deltas中。

    2.DeltaFIFO的定義與初始化分析

    2.1 DeltaFIFO struct

    DeltaFIFO struct定義了DeltaFIFO的一些屬性,下面挑幾個重要的分析一下。

    (1)lock:讀寫鎖,操作DeltaFIFO中的items與queue之前都要先加鎖;
    (2)items:是個map,key根據對象算出,value為Deltas類型;
    (3)queue:存儲對象key的隊列;
    (4)keyFunc:計算對象key的函數;

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    type DeltaFIFO struct {
    	// lock/cond protects access to 'items' and 'queue'.
    	lock sync.RWMutex
    	cond sync.Cond
    
    	// We depend on the property that items in the set are in
    	// the queue and vice versa, and that all Deltas in this
    	// map have at least one Delta.
    	items map[string]Deltas
    	queue []string
    
    	// populated is true if the first batch of items inserted by Replace() has been populated
    	// or Delete/Add/Update was called first.
    	populated bool
    	// initialPopulationCount is the number of items inserted by the first call of Replace()
    	initialPopulationCount int
    
    	// keyFunc is used to make the key used for queued item
    	// insertion and retrieval, and should be deterministic.
    	keyFunc KeyFunc
    
    	// knownObjects list keys that are "known", for the
    	// purpose of figuring out which items have been deleted
    	// when Replace() or Delete() is called.
    	knownObjects KeyListerGetter
    
    	// Indication the queue is closed.
    	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    	// Currently, not used to gate any of CRED operations.
    	closed     bool
    	closedLock sync.Mutex
    
    
    type Deltas

    再來看一下Deltas類型,是Delta的切片類型。

    type Deltas []Delta
    
    type Delta

    繼續看到Delta類型,其包含兩個屬性:
    (1)Type:代表的是Delta的類型,有Added、Updated、Deleted、Sync四個類型;
    (2)Object:存儲的資源對象,如pod等資源對象;

    type Delta struct {
    	Type   DeltaType
    	Object interface{}
    }
    
    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    type DeltaType string
    
    // Change type definition
    const (
    	Added   DeltaType = "Added"
    	Updated DeltaType = "Updated"
    	Deleted DeltaType = "Deleted"
    	// The other types are obvious. You'll get Sync deltas when:
    	//  * A watch expires/errors out and a new list/watch cycle is started.
    	//  * You've turned on periodic syncs.
    	// (Anything that trigger's DeltaFIFO's Replace() method.)
    	Sync DeltaType = "Sync"
    )
    

    2.2 DeltaFIFO初始化-NewDeltaFIFO

    NewDeltaFIFO初始化了一個items和queue都為空的DeltaFIFO并返回。

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    	f := &DeltaFIFO{
    		items:        map[string]Deltas{},
    		queue:        []string{},
    		keyFunc:      keyFunc,
    		knownObjects: knownObjects,
    	}
    	f.cond.L = &f.lock
    	return f
    }
    

    3.DeltaFIFO核心處理方法分析

    在前面分析Reflector時,Reflector的核心處理方法里有調用過幾個方法,分別是r.store.Replace、r.store.Add、r.store.Update、r.store.Delete,結合前面文章的k8s informer的初始化與啟動分析,或者簡要的看一下下面的代碼調用,就可以知道Reflector里的r.store其實就是DeltaFIFO,而那幾個方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

    sharedIndexInformer.Run方法中調用NewDeltaFIFO初始化了DeltaFIFO,隨后將DeltaFIFO作為參數傳入初始化Config;

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
        
        cfg := &Config{
    		Queue:            fifo,
    		...
    	}
    	
    	func() {
    		...
    		s.controller = New(cfg)
    		...
    	}()
    	...
    	s.controller.Run(stopCh)
    

    在controller的Run方法中,調用NewReflector初始化Reflector時,將之前的DeltaFIFO傳入,賦值給Reflector的store屬性,所以Reflector里的r.store其實就是DeltaFIFO,而調用的r.store.Replace、r.store.Add、r.store.Update、r.store.Delete方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

    func (c *controller) Run(stopCh <-chan struct{}) {
    	...
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	...
    }
    
    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
    }
    
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    	r := &Reflector{
    		...
    		store:         store,
    		...
    	}
    	...
    	return r
    }
    

    所以這里對DeltaFIFO核心處理方法進行分析,主要是分析DeltaFIFO的Replace、Add、Update、Delete方法。

    3.1 DeltaFIFO.Add

    DeltaFIFO的Add操作,主要邏輯:
    (1)加鎖;
    (2)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Added類型的新Delta追加到相應的Deltas中;
    (3)釋放鎖。

    func (f *DeltaFIFO) Add(obj interface{}) error {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    	f.populated = true
    	return f.queueActionLocked(Added, obj)
    }
    

    可以看到基本上DeltaFIFO所有的操作都有加鎖操作,所以都是并發安全的。

    3.1.1 DeltaFIFO.queueActionLocked

    queueActionLocked負責操作DeltaFIFO中的queue與Deltas,根據對象key構造新的Delta追加到對應的Deltas中,主要邏輯:
    (1)計算出對象的key;
    (2)構造新的Delta,將新的Delta追加到Deltas末尾;
    (3)調用dedupDeltas將Delta去重(目前只將Deltas最末尾的兩個delete類型的Delta去重);
    (4)判斷對象的key是否在queue中,不在則添加入queue中;
    (5)根據對象key更新items中的Deltas;
    (6)通知所有的消費者解除阻塞;

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
        //(1)計算出對象的key
    	id, err := f.KeyOf(obj)
    	if err != nil {
    		return KeyError{obj, err}
    	}
        //(2)構造新的Delta,將新的Delta追加到Deltas末尾
    	newDeltas := append(f.items[id], Delta{actionType, obj})
    	//(3)調用dedupDeltas將Delta去重(目前只將Deltas最末尾的兩個delete類型的Delta去重)
    	newDeltas = dedupDeltas(newDeltas)
    
    	if len(newDeltas) > 0 {
    	    //(4)判斷對象的key是否在queue中,不在則添加入queue中
    		if _, exists := f.items[id]; !exists {
    			f.queue = append(f.queue, id)
    		}
    		//(5)根據對象key更新items中的Deltas
    		f.items[id] = newDeltas
    		//(6)通知所有的消費者解除阻塞
    		f.cond.Broadcast()
    	} else {
    		// We need to remove this from our map (extra items in the queue are
    		// ignored if they are not in the map).
    		delete(f.items, id)
    	}
    	return nil
    }
    

    3.2 DeltaFIFO.Update

    DeltaFIFO的Update操作,主要邏輯:
    (1)加鎖;
    (2)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Updated類型的新Delta追加到相應的Deltas中;
    (3)釋放鎖。

    func (f *DeltaFIFO) Update(obj interface{}) error {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    	f.populated = true
    	return f.queueActionLocked(Updated, obj)
    }
    

    3.3 DeltaFIFO.Delete

    DeltaFIFO的Delete操作,主要邏輯:
    (1)計算出對象的key;
    (2)加鎖;
    (3)items中不存在對象key,則直接return,跳過處理;
    (4)調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中;
    (5)釋放鎖。

    func (f *DeltaFIFO) Delete(obj interface{}) error {
    	id, err := f.KeyOf(obj)
    	if err != nil {
    		return KeyError{obj, err}
    	}
    	f.lock.Lock()
    	defer f.lock.Unlock()
    	f.populated = true
    	// informer的用法中,f.knownObjects不為nil
    	if f.knownObjects == nil {
    		if _, exists := f.items[id]; !exists {
    			// Presumably, this was deleted when a relist happened.
    			// Don't provide a second report of the same deletion.
    			return nil
    		}
    	} else {
    		// We only want to skip the "deletion" action if the object doesn't
    		// exist in knownObjects and it doesn't have corresponding item in items.
    		// Note that even if there is a "deletion" action in items, we can ignore it,
    		// because it will be deduped automatically in "queueActionLocked"
    		_, exists, err := f.knownObjects.GetByKey(id)
    		_, itemsExist := f.items[id]
    		if err == nil && !exists && !itemsExist {
    			// Presumably, this was deleted when a relist happened.
    			// Don't provide a second report of the same deletion.
    			return nil
    		}
    	}
    
    	return f.queueActionLocked(Deleted, obj)
    }
    

    3.4 DeltaFIFO.Replace

    DeltaFIFO的Replace操作,主要邏輯:
    (1)加鎖;
    (2)遍歷list,計算對象的key,循環調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Sync類型的新Delta追加到相應的Deltas中;
    (3)對比DeltaFIFO中的items與Replace方法的list,如果DeltaFIFO中的items有,但傳進來Replace方法的list中沒有某個key,則調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中(避免重復,使用DeletedFinalStateUnknown包裝對象);
    (4)釋放鎖;

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
        //(1)加鎖
    	f.lock.Lock()
    	//(4)釋放鎖
    	defer f.lock.Unlock()
    	keys := make(sets.String, len(list))
    
        //(2)遍歷list,計算對象的key,循環調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Sync類型的新Delta追加到相應的Deltas中
    	for _, item := range list {
    		key, err := f.KeyOf(item)
    		if err != nil {
    			return KeyError{item, err}
    		}
    		keys.Insert(key)
    		if err := f.queueActionLocked(Sync, item); err != nil {
    			return fmt.Errorf("couldn't enqueue object: %v", err)
    		}
    	}
        // informer的用法中,f.knownObjects不為nil
    	if f.knownObjects == nil {
    		// Do deletion detection against our own list.
    		queuedDeletions := 0
    		for k, oldItem := range f.items {
    			if keys.Has(k) {
    				continue
    			}
    			var deletedObj interface{}
    			if n := oldItem.Newest(); n != nil {
    				deletedObj = n.Object
    			}
    			queuedDeletions++
    			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
    				return err
    			}
    		}
            
    		if !f.populated {
    			f.populated = true
    			// While there shouldn't be any queued deletions in the initial
    			// population of the queue, it's better to be on the safe side.
    			f.initialPopulationCount = len(list) + queuedDeletions
    		}
    
    		return nil
    	}
        
        //(3)找出DeltaFIFO中的items有,但傳進來Replace方法的list中沒有的key,調用f.queueActionLocked,操作DeltaFIFO中的queue與Deltas,根據對象key構造Deleted類型的新Delta追加到相應的Deltas中(避免重復,使用DeletedFinalStateUnknown包裝對象)
    	// Detect deletions not already in the queue.
    	knownKeys := f.knownObjects.ListKeys()
    	queuedDeletions := 0
    	for _, k := range knownKeys {
    		if keys.Has(k) {
    			continue
    		}
    
    		deletedObj, exists, err := f.knownObjects.GetByKey(k)
    		if err != nil {
    			deletedObj = nil
    			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
    		} else if !exists {
    			deletedObj = nil
    			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
    		}
    		queuedDeletions++
    		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
    			return err
    		}
    	}
        
        // 第一次調用Replace方法后,populated值為true
    	if !f.populated {
    		f.populated = true
    		// initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的items數量
    		f.initialPopulationCount = len(list) + queuedDeletions
    	}
    
    	return nil
    }
    

    3.5 DeltaFIFO.Pop

    DeltaFIFO的Pop操作,queue為空時會阻塞,直至非空,主要邏輯:
    (1)加鎖;
    (2)循環判斷queue的長度是否為0,為0則阻塞住,調用f.cond.Wait(),等待通知(與queueActionLocked方法中的f.cond.Broadcast()相對應,即queue中有對象key則發起通知);
    (3)取出queue的隊頭對象key;
    (4)更新queue,把queue中所有的對象key前移,相當于把第一個對象key給pop出去;
    (5)initialPopulationCount變量減1,當減到0時則說明initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的對象key已經被pop完成;
    (6)根據對象key從items中獲取Deltas;
    (7)把Deltas從items中刪除;
    (8)調用PopProcessFunc處理獲取到的Deltas;
    (9)釋放鎖。

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        //(1)加鎖
    	f.lock.Lock()
    	//(9)釋放鎖
    	defer f.lock.Unlock()
    	//(2)循環判斷queue的長度是否為0,為0則阻塞住,調用f.cond.Wait(),等待通知(與queueActionLocked方法中的f.cond.Broadcast()相對應,即queue中有對象key則發起通知)
    	for {
    		for len(f.queue) == 0 {
    			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
    			// When Close() is called, the f.closed is set and the condition is broadcasted.
    			// Which causes this loop to continue and return from the Pop().
    			if f.IsClosed() {
    				return nil, ErrFIFOClosed
    			}
    
    			f.cond.Wait()
    		}
    		//(3)取出queue的隊頭對象key
    		id := f.queue[0]
    		//(4)更新queue,把queue中所有的對象key前移,相當于把第一個對象key給pop出去
    		f.queue = f.queue[1:]
    		//(5)initialPopulationCount變量減1,當減到0時則說明initialPopulationCount代表第一次調用Replace方法加入DeltaFIFO中的對象key已經被pop完成
    		if f.initialPopulationCount > 0 {
    			f.initialPopulationCount--
    		}
    		//(6)根據對象key從items中獲取對象
    		item, ok := f.items[id]
    		if !ok {
    			// Item may have been deleted subsequently.
    			continue
    		}
    		//(7)把對象從items中刪除
    		delete(f.items, id)
    		//(8)調用PopProcessFunc處理pop出來的對象
    		err := process(item)
    		if e, ok := err.(ErrRequeue); ok {
    			f.addIfNotPresent(id, item)
    			err = e.Err
    		}
    		// Don't need to copyDeltas here, because we're transferring
    		// ownership to the caller.
    		return item, err
    	}
    }
    

    3.6 DeltaFIFO.HasSynced

    HasSynced從字面意思上看代表是否同步完成,是否同步完成其實是指第一次從kube-apiserver中獲取到的全量的對象是否全部從DeltaFIFO中pop完成,全部pop完成,說明list回來的對象已經全部同步到了Indexer緩存中去了。

    方法是否返回true是根據populated和initialPopulationCount兩個變量來判斷的,當且僅當populated為true且initialPopulationCount 為0的時候方法返回true,否則返回false。

    populated屬性值在第一次調用DeltaFIFO的Replace方法中就已經將其值設置為true。

    而initialPopulationCount的值在第一次調用DeltaFIFO的Replace方法中設置值為加入到items中的Deltas的數量,然后每pop一個Deltas,則initialPopulationCount的值減1,pop完成時值則為0。

    // staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
    func (f *DeltaFIFO) HasSynced() bool {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    	return f.populated && f.initialPopulationCount == 0
    }
    

    在前面做informer的初始化與啟動分析時也提到過,DeltaFIFO.HasSynced方法的調用鏈如下:

    sharedIndexInformer.WaitForCacheSync --> cache.WaitForCacheSync --> sharedIndexInformer.controller.HasSynced --> controller.config.Queue.HasSynced --> DeltaFIFO.HasSynced

    至此DeltaFIFO的分析就結束了,最后來總結一下。

    總結

    DeltaFIFO核心處理方法

    Reflector調用的r.store.Replacer.store.Addr.store.Updater.store.Delete方法其實就是DeltaFIFO的Replace、Add、Update、Delete方法。

    (1)DeltaFIFO.Replace:構造Sync類型的Delta加入DeltaFIFO中,此外還會對比DeltaFIFO中的items與Replace方法的list,如果DeltaFIFO中的items有,但傳進來Replace方法的list中沒有某個key,則構造Deleted類型的Delta加入DeltaFIFO中;
    (2)DeltaFIFO.Add:構建Added類型的Delta加入DeltaFIFO中;
    (3)DeltaFIFO.Update:構建Updated類型的Delta加入DeltaFIFO中;
    (4)DeltaFIFO.Delete:構建Deleted類型的Delta加入DeltaFIFO中;
    (5)DeltaFIFO.Pop:從DeltaFIFO的queue中pop出隊頭key,從map中取出key對應的Deltas返回,并把該key:Deltas從map中移除;
    (6)DeltaFIFO.HasSynced:返回true代表同步完成,是否同步完成指第一次從kube-apiserver中獲取到的全量的對象是否全部從DeltaFIFO中pop完成,全部pop完成,說明list回來的對象已經全部同步到了Indexer緩存中去了;

    informer架構中的DeltaFIFO

    在對informer中的DeltaFIFO分析完之后,接下來將分析informer中的Controller與Processor。

    posted @ 2022-05-22 10:33  良凱爾  閱讀(94)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看