<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(1)-概要分析

    k8s informer概述

    我們都知道可以使用k8s的Clientset來獲取所有的原生資源對象,那么怎么能持續的獲取集群的所有資源對象,或監聽集群的資源對象數據的變化呢?這里不需要輪詢去不斷執行List操作,而是調用Watch接口,即可監聽資源對象的變化,當資源對象發生變化,客戶端即可通過Watch接口收到資源對象的變化。

    Watch接口雖然可以直接使用,但一般情況下很少直接使用,因為往往由于集群中的資源較多,我們需要自己在客戶端去維護一套緩存,而這個維護成本比較大。

    也是因為如此,client-go提供了自己的實現機制,Informers應運而生。informers實現了持續獲取集群的所有資源對象、監聽集群的資源對象變化功能,并在本地維護了全量資源對象的內存緩存,以減少對apiserver、對etcd的請求壓力。Informers在啟動的時候會首先在客戶端調用List接口來獲取全量的對象集合,然后通過Watch接口來獲取增量的對象,然后更新本地緩存。

    此外informers也有很強的健壯性,當長期運行的watch連接中斷時,informers會嘗試拉起一個新的watch請求來恢復連接,在不丟失任何事件的情況下恢復事件流。另外,informers還可以配置一個重新同步的周期參數,每間隔該周期,informers就會重新List全量數據。

    在informers的使用上,通常每個GroupVersionResource(GVR)只實例化一個informers,但有時候我們在一個應用中往往會在多個地方對同一種資源對象都有informer的需求,所以就有了共享informer,即SharedInformerFactory。所以可以通過使用SharedInformerFactory來實例化informers,這樣本地內存緩存就只有一份,通知機制也只有一套,大大提高了效率,減少了資源浪費。

    k8s informer架構

    k8s client-go informer主要包括以下部件:
    (1)Reflector:Reflector從kube-apiserver中list&watch資源對象,然后調用DeltaFIFO的Add/Update/Delete/Replace方法將資源對象及其變化包裝成Delta并將其丟到DeltaFIFO中;
    (2)DeltaFIFO:DeltaFIFO中存儲著一個map和一個queue,即map[object key]Deltas以及object key的queue,Deltas為Delta的切片類型,Delta裝有對象及對象的變化類型(Added/Updated/Deleted/Sync) ,Reflector負責DeltaFIFO的輸入,Controller負責處理DeltaFIFO的輸出;
    (3)Controller:Controller從DeltaFIFO的queue中pop一個object key出來,并獲取其關聯的 Deltas出來進行處理,遍歷Deltas,根據對象的變化更新Indexer中的本地內存緩存,并通知Processor,相關對象有變化事件發生;
    (4)Processor:Processor根據對象的變化事件類型,調用相應的ResourceEventHandler來處理對象的變化;
    (5)Indexer:Indexer中有informer維護的指定資源對象的相對于etcd數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對apiserver、對etcd的請求壓力;
    (6)ResourceEventHandler:用戶根據自身處理邏輯需要,注冊自定義的的ResourceEventHandler,當對象發生變化時,將觸發調用對應類型的ResourceEventHandler來做處理。

    根據informer架構,對k8s informer的分析將分為以下幾部分進行,本篇為概要分析:
    (1)informer概要分析;
    (2)informer之初始化與啟動分析;
    (3)informer之Reflector分析;
    (4)informer之DeltaFIFO分析;
    (5)informer之Controller&Processor分析;
    (6)informer之Indexer分析;

    informer使用示例代碼

    使用大致過程如下:
    (1)構建與kube-apiserver通信的config配置;
    (2)初始化與apiserver通信的clientset;
    (3)利用clientset初始化shared informer factory以及pod informer;
    (4)注冊informer的自定義ResourceEventHandler;
    (5)啟動shared informer factory,開始informer的list & watch操作;
    (6)等待informer從kube-apiserver同步資源完成,即informer的list操作獲取的對象都存入到informer中的indexer本地緩存中;
    (7)創建lister,可以從informer中的indexer本地緩存中獲取對象;

    func main() {
        // 自定義與kube-apiserver通信的config配置
        master := "192.168.1.10" // apiserver url
        kubeconfig := "/.kube/config"
        config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
        if err != nil {
    		klog.Fatalf("Failed to create config: %v", err)
    	}
    	// 或使用k8s serviceAccount機制與kube-apiserver通信
    	// config, err = rest.InClusterConfig()
        
        // 初始化與apiserver通信的clientset
        clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		klog.Fatalf("Failed to create client: %v", err)
    	}
    	
    	// 初始化shared informer factory以及pod informer
    	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    	podInformer := factory.Core().V1().Pods()
    	informer := podInformer.Informer()
    	
    	// 注冊informer的自定義ResourceEventHandler
    	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc:    xxx,
    		UpdateFunc: xxx,
    		DeleteFunc: xxx,
    	})
    	
    	// 啟動shared informer factory,開始informer的list & watch操作
    	stopper := make(chan struct{})
    	go factory.Start(stopper)
    	
    	// 等待informer從kube-apiserver同步資源完成,即informer的list操作獲取的對象都存入到informer中的indexer本地緩存中 
    	// 或者調用factory.WaitForCacheSync(stopper)
    	if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
    		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    		return
    	}
    	
    	// 創建lister
    	podLister := podInformer.Lister()
    	// 從informer中的indexer本地緩存中獲取對象
    	podList, err := podLister.List(labels.Everything())
    	if err != nil {
    		fmt.Println(err)
    	}
    	
    }
    

    總結

    以上只是對K8s informer做了簡單的介紹,以及簡單的寫了一下如何使用informer的示例代碼,后面將開始對informer的各個部件做進一步的源碼分析,敬請期待。

    最后以一張k8s informer的架構圖作為結尾總結,大家回憶一下k8s informer的架構組成以及各個部件的作用。

    posted @ 2022-04-23 09:40  良凱爾  閱讀(269)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看