<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

    k8s client-go源碼分析 informer源碼分析(3)-Reflector源碼分析

    1.Reflector概述

    Reflector從kube-apiserver中list&watch資源對象,然后將對象的變化包裝成Delta并將其丟到DeltaFIFO中。簡單點來說,就是將Etcd 的對象及其變化反射到DeltaFIFO中。

    Reflector首先通過List操作獲取全量的資源對象數據,調用DeltaFIFO的Replace方法全量插入DeltaFIFO,然后后續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,將對象及其變化插入到DeltaFIFO中。

    Reflector的健壯性處理機制

    Reflector有健壯性處理機制,用于處理與apiserver斷連后重新進行List&Watch的場景。也是因為有這樣的健壯性處理機制,所以我們一般不去直接使用客戶端的Watch 方法來處理自己的業務邏輯,而是使用informers

    Reflector核心操作

    Reflector的兩個核心操作:
    (1)List&Watch;
    (2)將對象的變化包裝成Delta然后扔進DeltaFIFO。

    informer概要架構圖

    通過下面這個informer的概要架構圖,可以大概看到Reflector在整個informer中所處的位置及其作用。

    2.Reflector初始化與啟動分析

    2.1 Reflector結構體

    先來看到Reflector結構體,這里重點看到以下屬性:
    (1)expectedType:放到Store中(即DeltaFIFO中)的對象類型;
    (2)store:store會賦值為DeltaFIFO,具體可以看之前的informer初始化與啟動分析即可得知,這里不再展開分析;
    (3)listerWatcher:存放list方法和watch方法的ListerWatcher interface實現;

    // k8s.io/client-go/tools/cache/reflector.go
    type Reflector struct {
    	// name identifies this reflector. By default it will be a file:line if possible.
    	name string
    
    	// The name of the type we expect to place in the store. The name
    	// will be the stringification of expectedGVK if provided, and the
    	// stringification of expectedType otherwise. It is for display
    	// only, and should not be used for parsing or comparison.
    	expectedTypeName string
    	// The type of object we expect to place in the store.
    	expectedType reflect.Type
    	// The GVK of the object we expect to place in the store if unstructured.
    	expectedGVK *schema.GroupVersionKind
    	// The destination to sync up with the watch source
    	store Store
    	// listerWatcher is used to perform lists and watches.
    	listerWatcher ListerWatcher
    	// period controls timing between one watch ending and
    	// the beginning of the next one.
    	period       time.Duration
    	resyncPeriod time.Duration
    	ShouldResync func() bool
    	// clock allows tests to manipulate time
    	clock clock.Clock
    	// lastSyncResourceVersion is the resource version token last
    	// observed when doing a sync with the underlying store
    	// it is thread safe, but not synchronized with the underlying store
    	lastSyncResourceVersion string
    	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    	lastSyncResourceVersionMutex sync.RWMutex
    	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
    	// Defaults to pager.PageSize.
    	WatchListPageSize int64
    }
    

    2.2 Reflector初始化-NewReflector

    NewReflector為Reflector的初始化方法,返回一個Reflector結構體,這里主要看到初始化Reflector的時候,需要傳入ListerWatcher interface的實現。

    // k8s.io/client-go/tools/cache/reflector.go
    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
    }
    
    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    	r := &Reflector{
    		name:          name,
    		listerWatcher: lw,
    		store:         store,
    		period:        time.Second,
    		resyncPeriod:  resyncPeriod,
    		clock:         &clock.RealClock{},
    	}
    	r.setExpectedType(expectedType)
    	return r
    }
    

    2.3 ListerWatcher interface

    ListerWatcher interface定義了Reflector應該擁有的最核心的兩個方法,即ListWatch,用于全量獲取資源對象以及監控資源對象的變化。關于ListWatch什么時候會被調用,怎么被調用,在后續分析Reflector核心處理方法的時候會詳細做分析。

    // k8s.io/client-go/tools/cache/listwatch.go
    type Lister interface {
    	// List should return a list type object; the Items field will be extracted, and the
    	// ResourceVersion field will be used to start the watch in the right place.
    	List(options metav1.ListOptions) (runtime.Object, error)
    }
    
    type Watcher interface {
    	// Watch should begin a watch at the specified version.
    	Watch(options metav1.ListOptions) (watch.Interface, error)
    }
    
    type ListerWatcher interface {
    	Lister
    	Watcher
    }
    

    2.4 ListWatch struct

    繼續看到ListWatch struct,其實現了ListerWatcher interface

    // k8s.io/client-go/tools/cache/listwatch.go
    type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
    
    type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
    
    type ListWatch struct {
    	ListFunc  ListFunc
    	WatchFunc WatchFunc
    	// DisableChunking requests no chunking for this list watcher.
    	DisableChunking bool
    }
    
    ListWatch的初始化

    再來看到ListWatch struct初始化的一個例子。在NewDeploymentInformer初始化Deployment對象的informer中,會初始化ListWatch struct并定義其ListFuncWatchFunc,可以看到ListFuncWatchFunc即為其資源對象客戶端的ListWatch方法。

    // staging/src/k8s.io/client-go/informers/apps/v1beta1/deployment.go
    func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    	return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
    }
    
    func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    	return cache.NewSharedIndexInformer(
    		&cache.ListWatch{
    			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.AppsV1beta1().Deployments(namespace).List(options)
    			},
    			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
    				if tweakListOptions != nil {
    					tweakListOptions(&options)
    				}
    				return client.AppsV1beta1().Deployments(namespace).Watch(options)
    			},
    		},
    		&appsv1beta1.Deployment{},
    		resyncPeriod,
    		indexers,
    	)
    }
    

    2.5 Reflector啟動入口-Run

    最后來看到Reflector的啟動入口Run方法,其主要是循環調用r.ListAndWatch,該方法是Reflector的核心處理方法,后面會詳細進行分析。另外,也可以看到Reflector有健壯性處理機制,即循環調用r.ListAndWatch方法,用于處理與apiserver斷連后重新進行List&Watch的場景。也是因為有這樣的健壯性處理機制,所以我們一般不去直接使用客戶端的Watch 方法來處理自己的業務邏輯,而是使用informers

    // k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) Run(stopCh <-chan struct{}) {
    	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    	wait.Until(func() {
    		if err := r.ListAndWatch(stopCh); err != nil {
    			utilruntime.HandleError(err)
    		}
    	}, r.period, stopCh)
    }
    

    3.Reflector核心處理方法分析

    分析完了初始化與啟動后,現在來看到Reflector的核心處理方法ListAndWatch

    ListAndWatch

    ListAndWatch的主要邏輯分為三大塊:

    A.List操作(只執行一次):
    (1)設置ListOptions,將ResourceVersion設置為“0”;
    (2)調用r.listerWatcher.List方法,執行list操作,即獲取全量的資源對象;
    (3)根據list回來的資源對象,獲取最新的resourceVersion;
    (4)資源轉換,將list操作獲取回來的結果轉換為[]runtime.Object結構;
    (5)調用r.syncWith,根據list回來轉換后的結果去替換store里的items;
    (6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;

    B.Resync操作(異步循環執行);
    (1)判斷是否需要執行Resync操作,即重新同步;
    (2)需要則調用r.store.Resync操作后端store做處理;

    C.Watch操作(循環執行):
    (1)stopCh處理,判斷是否需要退出循環;
    (2)設置ListOptions,設置resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作;
    (3)調用r.listerWatcher.Watch,開始監聽操作;
    (4)watch監聽操作的錯誤返回處理;
    (5)調用r.watchHandler,處理watch操作返回來的結果,操作后端store,新增、更新或刪除items;

    // k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    	var resourceVersion string
        
        // A.List操作(只執行一次)
        // (1)設置ListOptions,將ResourceVersion設置為“0”
    	// Explicitly set "0" as resource version - it's fine for the List()
    	// to be served from cache and potentially be delayed relative to
    	// etcd contents. Reflector framework will catch up via Watch() eventually.
    	options := metav1.ListOptions{ResourceVersion: "0"}
    
    	if err := func() error {
    		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
    		defer initTrace.LogIfLong(10 * time.Second)
    		var list runtime.Object
    		var err error
    		listCh := make(chan struct{}, 1)
    		panicCh := make(chan interface{}, 1)
    		//(2)調用r.listerWatcher.List方法,執行list操作,即獲取全量的資源對象
    		go func() {
    			defer func() {
    				if r := recover(); r != nil {
    					panicCh <- r
    				}
    			}()
    			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
    			// list request will return the full response.
    			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
    				return r.listerWatcher.List(opts)
    			}))
    			if r.WatchListPageSize != 0 {
    				pager.PageSize = r.WatchListPageSize
    			}
    			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
    			list, err = pager.List(context.Background(), options)
    			close(listCh)
    		}()
    		select {
    		case <-stopCh:
    			return nil
    		case r := <-panicCh:
    			panic(r)
    		case <-listCh:
    		}
    		if err != nil {
    			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
    		}
    		initTrace.Step("Objects listed")
    		listMetaInterface, err := meta.ListAccessor(list)
    		if err != nil {
    			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    		}
    		//(3)根據list回來的資源對象,獲取最新的resourceVersion
    		resourceVersion = listMetaInterface.GetResourceVersion()
    		initTrace.Step("Resource version extracted")
    		//(4)資源轉換,將list操作獲取回來的結果轉換為```[]runtime.Object```結構
    		items, err := meta.ExtractList(list)
    		if err != nil {
    			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    		}
    		initTrace.Step("Objects extracted")
    		//(5)調用r.syncWith,根據list回來轉換后的結果去替換store里的items
    		if err := r.syncWith(items, resourceVersion); err != nil {
    			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    		}
    		initTrace.Step("SyncWith done")
    		//(6)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
    		r.setLastSyncResourceVersion(resourceVersion)
    		initTrace.Step("Resource version updated")
    		return nil
    	}(); err != nil {
    		return err
    	}
    
        // B.Resync操作(異步循環執行)
    	resyncerrc := make(chan error, 1)
    	cancelCh := make(chan struct{})
    	defer close(cancelCh)
    	go func() {
    		resyncCh, cleanup := r.resyncChan()
    		defer func() {
    			cleanup() // Call the last one written into cleanup
    		}()
    		for {
    			select {
    			case <-resyncCh:
    			case <-stopCh:
    				return
    			case <-cancelCh:
    				return
    			}
    			//(1)判斷是否需要執行Resync操作,即重新同步
    			if r.ShouldResync == nil || r.ShouldResync() {
    				klog.V(4).Infof("%s: forcing resync", r.name)
    				//(2)需要則調用r.store.Resync操作后端store做處理
    				if err := r.store.Resync(); err != nil {
    					resyncerrc <- err
    					return
    				}
    			}
    			cleanup()
    			resyncCh, cleanup = r.resyncChan()
    		}
    	}()
        
        // C.Watch操作(循環執行)
    	for {
    	    //(1)stopCh處理,判斷是否需要退出循環
    		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
    		select {
    		case <-stopCh:
    			return nil
    		default:
    		}
            
            //(2)設置ListOptions,設置resourceVersion為最新的resourceVersion,即從list回來的最新resourceVersion開始執行watch操作
    		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    		options = metav1.ListOptions{
    			ResourceVersion: resourceVersion,
    			// We want to avoid situations of hanging watchers. Stop any wachers that do not
    			// receive any events within the timeout window.
    			TimeoutSeconds: &timeoutSeconds,
    			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
    			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
    			// watch bookmarks, it will ignore this field).
    			AllowWatchBookmarks: true,
    		}
            
            //(3)調用r.listerWatcher.Watch,開始監聽操作
    		w, err := r.listerWatcher.Watch(options)
    		//(4)watch監聽操作的錯誤返回處理
    		if err != nil {
    			switch err {
    			case io.EOF:
    				// watch closed normally
    			case io.ErrUnexpectedEOF:
    				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
    			default:
    				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
    			}
    			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
    			// It doesn't make sense to re-list all objects because most likely we will be able to restart
    			// watch where we ended.
    			// If that's the case wait and resend watch request.
    			if utilnet.IsConnectionRefused(err) {
    				time.Sleep(time.Second)
    				continue
    			}
    			return nil
    		}
            
            //(5)調用r.watchHandler,處理watch操作返回來的結果,操作后端store,新增、更新或刪除items
    		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
    			if err != errorStopRequested {
    				switch {
    				case apierrs.IsResourceExpired(err):
    					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
    				default:
    					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
    				}
    			}
    			return nil
    		}
    	}
    }
    
    關于List操作時設置的ListOptions

    這里主要講一下ListOptions中的ResourceVersion屬性的作用。

    上述講到的Reflector中,list操作時將 resourceVersion 設置了為“0”,此時返回的數據是apiserver cache中的,并非直接讀取 etcd 而來,而apiserver cache中的數據可能會因網絡或其他原因導致與etcd中的數據不同。

    list操作時,resourceVersion 有三種設置方法:
    (1)第一種:不設置,此時會從直接從etcd中讀取,此時數據是最新的;
    (2)第二種:設置為“0”,此時從apiserver cache中獲取;
    (3)第三種:設置為指定的resourceVersion,獲取resourceVersion大于指定版本的所有資源對象。

    詳細參考:https://kubernetes.io/zh/docs/reference/using-api/api-concepts/#resource-versions

    3.1 r.syncWith

    r.syncWith方法主要是調用r.store.Replace方法,即根據list的結果去替換store里的items,具體關于r.store.Replace方法的分析,在后續對DeltaFIFO進行分析時再做具體的分析。

    // k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    	found := make([]interface{}, 0, len(items))
    	for _, item := range items {
    		found = append(found, item)
    	}
    	return r.store.Replace(found, resourceVersion)
    }
    

    3.2 r.setLastSyncResourceVersion

    lastSyncResourceVersion屬性為Reflector struct的一個屬性,用于存儲已被Reflector處理的最新資源對象的ResourceVersion,r.setLastSyncResourceVersion方法用于更新該值。

    // k8s.io/client-go/tools/cache/reflector.go
    func (r *Reflector) setLastSyncResourceVersion(v string) {
    	r.lastSyncResourceVersionMutex.Lock()
    	defer r.lastSyncResourceVersionMutex.Unlock()
    	r.lastSyncResourceVersion = v
    }
    
    type Reflector struct {
        ...
        lastSyncResourceVersion string
        ...
    }
    

    3.3 r.watchHandler

    r.watchHandler主要是處理watch操作返回來的結果,其主要邏輯為循環做以下操作,直至event事件處理完畢:
    (1)從watch操作返回來的結果中獲取event事件;
    (2)event事件相關錯誤處理;
    (3)獲得當前watch到資源的ResourceVersion;
    (4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分別調用r.store.Add、r.store.Update、r.store.Delete做處理,具體關于r.store.xxx的方法分析,在后續對DeltaFIFO進行分析時再做具體的分析;
    (5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值;

    // k8s.io/client-go/tools/cache/reflector.go
    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    	start := r.clock.Now()
    	eventCount := 0
    
    	// Stopping the watcher should be idempotent and if we return from this function there's no way
    	// we're coming back in with the same watch interface.
    	defer w.Stop()
    
    loop:
    	for {
    		select {
    		case <-stopCh:
    			return errorStopRequested
    		case err := <-errc:
    			return err
    		// (1)從watch操作返回來的結果中獲取event事件
    		case event, ok := <-w.ResultChan():
    		    // (2)event事件相關錯誤處理
    			if !ok {
    				break loop
    			}
    			if event.Type == watch.Error {
    				return apierrs.FromObject(event.Object)
    			}
    			if r.expectedType != nil {
    				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
    					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
    					continue
    				}
    			}
    			if r.expectedGVK != nil {
    				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
    					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
    					continue
    				}
    			}
    			// (3)獲得當前watch到資源的ResourceVersion
    			meta, err := meta.Accessor(event.Object)
    			if err != nil {
    				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    				continue
    			}
    			newResourceVersion := meta.GetResourceVersion()
    			// (4)區分watch.Added、watch.Modified、watch.Deleted三種類型的event事件,分別調用r.store.Add、r.store.Update、r.store.Delete做處理
    			switch event.Type {
    			case watch.Added:
    				err := r.store.Add(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
    				}
    			case watch.Modified:
    				err := r.store.Update(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
    				}
    			case watch.Deleted:
    				// TODO: Will any consumers need access to the "last known
    				// state", which is passed in event.Object? If so, may need
    				// to change this.
    				err := r.store.Delete(event.Object)
    				if err != nil {
    					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
    				}
    			case watch.Bookmark:
    				// A `Bookmark` means watch has synced here, just update the resourceVersion
    			default:
    				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    			}
    			// (5)調用r.setLastSyncResourceVersion,為Reflector更新已被處理的最新資源對象的resourceVersion值
    			*resourceVersion = newResourceVersion
    			r.setLastSyncResourceVersion(newResourceVersion)
    			eventCount++
    		}
    	}
    
    	watchDuration := r.clock.Since(start)
    	if watchDuration < 1*time.Second && eventCount == 0 {
    		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    	}
    	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    	return nil
    }
    

    至此Reflector的分析就結束了,最后來總結一下。

    總結

    Reflector核心處理邏輯

    先來用一幅圖來總結一下Reflector核心處理邏輯。

    informer架構中的Reflector

    下面這個架構圖相比文章開頭的informer的概要架構圖,將Refletor部分詳細分解了,也順帶回憶一下Reflector在informer架構中的主要作用:
    (1)Reflector首先通過List操作獲取全量的資源對象數據,調用DeltaFIFO的Replace方法全量插入DeltaFIFO;
    (2)然后后續通過Watch操作根據資源對象的變化類型相應的調用DeltaFIFO的Add、Update、Delete方法,將對象及其變化插入到DeltaFIFO中。

    在對informer中的Reflector分析完之后,接下來將分析informer中的DeltaFIFO。

    posted @ 2022-05-15 10:15  良凱爾  閱讀(111)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看