<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • k8s client-go源碼分析 informer源碼分析(6)-Indexer源碼分析

    client-go之Indexer源碼分析

    1.Indexer概述

    Indexer中有informer維護的指定資源對象的相對于etcd數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對apiserver、對etcd的請求壓力。

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	items map[string]interface{}
    	indexers Indexers
    	indices Indices
    	...
    }
    

    informer所維護的緩存依賴于threadSafeMap結構體中的items屬性,其本質上是一個用map構建的鍵值對,資源對象都存在items這個map中,key為資源對象的namespace/name組成,value為資源對象本身,這些構成了informer的本地緩存。

    Indexer除了維護了一份本地內存緩存外,還有一個很重要的功能,便是索引功能了。索引的目的就是為了快速查找,比如我們需要查找某個node節點上的所有pod、查找某個命名空間下的所有pod等,利用到索引,可以實現快速查找。關于索引功能,則依賴于threadSafeMap結構體中的indexers與indices屬性。

    先通過一張informer概要架構圖看一下Indexer所處位置與其概要功能。

    2.Indexer的結構定義分析

    2.1 Indexer interface

    Indexer接口繼承了一個Store接口(實現本地緩存),以及包含幾個index索引相關的方法聲明(實現索引功能)。

    // staging/src/k8s.io/client-go/tools/cache/index.go
    type Indexer interface {
    	Store
    	
    	Index(indexName string, obj interface{}) ([]interface{}, error)
    	
    	IndexKeys(indexName, indexedValue string) ([]string, error)
    	
    	ListIndexFuncValues(indexName string) []string
    	
    	ByIndex(indexName, indexedValue string) ([]interface{}, error)
    	
    	GetIndexers() Indexers
    
    	AddIndexers(newIndexers Indexers) error
    }
    

    2.2 Store interface

    Store接口本身,定義了Add、Update、Delete、List、Get等一些對象增刪改查的方法聲明,用于操作informer的本地緩存。

    // staging/src/k8s.io/client-go/tools/cache/store.go
    type Store interface {
    	Add(obj interface{}) error
    	Update(obj interface{}) error
    	Delete(obj interface{}) error
    	List() []interface{}
    	ListKeys() []string
    	Get(obj interface{}) (item interface{}, exists bool, err error)
    	GetByKey(key string) (item interface{}, exists bool, err error)
    
    	Replace([]interface{}, string) error
    	Resync() error
    }
    

    2.3 cache struct

    結合代碼,可以看到cache struct是Indexer接口的一個實現,所以自然也是Store接口的一個實現,cache struct包含一個ThreadSafeStore接口的實現,以及一個計算object key的函數KeyFunc。

    cache struct會根據keyFunc生成某個obj對象對應的一個唯一key, 然后調用ThreadSafeStore接口中的方法來操作本地緩存中的對象。

    // staging/src/k8s.io/client-go/tools/cache/store.go
    type cache struct {
    	cacheStorage ThreadSafeStore
    	keyFunc KeyFunc
    }
    

    2.4 ThreadSafeStore interface

    ThreadSafeStore接口包含了操作本地緩存的增刪改查方法以及索引功能的相關方法,其方法名稱與Indexer接口的類似,最大區別是ThreadSafeStore接口的增刪改查方法入參基本都有key,由cache struct中的KeyFunc函數計算得出object key。

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type ThreadSafeStore interface {
    	Add(key string, obj interface{})
    	Update(key string, obj interface{})
    	Delete(key string)
    	Get(key string) (item interface{}, exists bool)
    	List() []interface{}
    	ListKeys() []string
    	Replace(map[string]interface{}, string)
    	
    	Index(indexName string, obj interface{}) ([]interface{}, error)
    	IndexKeys(indexName, indexKey string) ([]string, error)
    	ListIndexFuncValues(name string) []string
    	ByIndex(indexName, indexKey string) ([]interface{}, error)
    	GetIndexers() Indexers
    
    	AddIndexers(newIndexers Indexers) error
    	Resync() error
    }
    

    2.5 threadSafeMap struct

    threadSafeMap struct是ThreadSafeStore接口的一個實現,其最重要的一個屬性便是items了,items是用map構建的鍵值對,資源對象都存在items這個map中,key根據資源對象來算出,value為資源對象本身,這里的items即為informer的本地緩存了,而indexers與indices屬性則與索引功能有關。

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	lock  sync.RWMutex
    	items map[string]interface{}
    
    	// indexers maps a name to an IndexFunc
    	indexers Indexers
    	// indices maps a name to an Index
    	indices Indices
    }
    

    2.6 Indexer結構定義小結

    下面對上面介紹的Indexer的相關struct與interface做個小結:
    (1)Store interface: 定義了Add、Update、Delete、List、Get等一些對象增刪改查的方法聲明,用于操作informer的本地緩存;
    (2)Indexer interface: 繼承了一個Store接口(實現本地緩存),以及包含幾個index索引相關的方法聲明(實現索引功能);
    (3)cache struct: Indexer接口的一個實現,所以自然也是Store接口的一個實現,cache struct包含一個ThreadSafeStore接口的實現,以及一個計算object key的函數KeyFunc;
    (4)ThreadSafeStore interface: 包含了操作本地緩存的增刪改查方法以及索引功能的相關方法,其方法名稱與Indexer接口的類似,最大區別是ThreadSafeStore接口的增刪改查方法入參基本都有key,由cache struct中的KeyFunc函數計算得出object key;
    (5)threadSafeMap struct: ThreadSafeStore接口的一個實現,其最重要的一個屬性便是items了,items是用map構建的鍵值對,資源對象都存在items這個map中,key根據資源對象來算出,value為資源對象本身,這里的items即為informer的本地緩存了,而indexers與indices屬性則與索引功能有關;

    3.Indexer的索引功能

    在threadSafeMap struct中,與索引功能有關的是indexers與indices屬性;

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	lock  sync.RWMutex
    	items map[string]interface{}
    
    	// indexers maps a name to an IndexFunc
    	indexers Indexers
    	// indices maps a name to an Index
    	indices Indices
    }
    
    type Indexers map[string]IndexFunc
    
    type IndexFunc func(obj interface{}) ([]string, error)
    
    type Indices map[string]Index
    
    type Index map[string]sets.String
    

    3.1 type Indexers map[string]IndexFunc / type IndexFunc func(obj interface{}) ([]string, error)

    Indexers包含了所有索引器(索引分類)及其索引器函數IndexFunc,IndexFunc為計算某個索引鍵下的所有對象鍵列表的方法;

    Indexers: {  
      "索引器1": 索引函數1,
      "索引器2": 索引函數2,
    }
    

    數據示例:

    Indexers: {  
      "namespace": MetaNamespaceIndexFunc,
      "nodeName": NodeNameIndexFunc,
    }
    
    func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
    	meta, err := meta.Accessor(obj)
    	if err != nil {
    		return []string{""}, fmt.Errorf("object has no meta: %v", err)
    	}
    	return []string{meta.GetNamespace()}, nil
    }
    
    func NodeNameIndexFunc(obj interface{}) ([]string, error) {
    	pod, ok := obj.(*v1.Pod)
    	if !ok {
    		return []string{""}, fmt.Errorf("object is not a pod)
    	}
    	return []string{pod.Spec.NodeName}, nil
    }
    

    3.2 type Indices map[string]Index / type Index map[string]sets.String

    Indices包含了所有索引器(索引分類)及其所有的索引數據Index;而Index則包含了索引鍵以及索引鍵下的所有對象鍵的列表;

    Indices: {
     "索引器1": {  
      "索引鍵1": ["對象鍵1", "對象鍵2"],  
      "索引鍵2": ["對象鍵3"],   
     },
     "索引器2": {  
      "索引鍵3": ["對象鍵1"],  
      "索引鍵4": ["對象鍵2", "對象鍵3"],  
     }
    }
    

    數據示例:

    pod1 := &v1.Pod {
        ObjectMeta: metav1.ObjectMeta {
            Name: "pod-1",
            Namespace: "default",
        },
        Spec: v1.PodSpec{
            NodeName: "node1",
        }
    }
    
    pod2 := &v1.Pod {
        ObjectMeta: metav1.ObjectMeta {
            Name: "pod-2",
            Namespace: "default",
        },
        Spec: v1.PodSpec{
            NodeName: "node2",
        }
    }
    
    pod3 := &v1.Pod {
        ObjectMeta: metav1.ObjectMeta {
            Name: "pod-3",
            Namespace: "kube-system",
        },
        Spec: v1.PodSpec{
            NodeName: "node2",
        }
    }
    
    Indices: {
     "namespace": {  
      "default": ["pod-1", "pod-2"],  
      "kube-system": ["pod-3"],   
     },
     "nodeName": {  
      "node1": ["pod-1"],  
      "node2": ["pod-2", "pod-3"],  
     }
    }
    

    3.3 索引結構小結

    Indexers: {  
      "索引器1": 索引函數1,
      "索引器2": 索引函數2,
    }
    
    Indices: {
     "索引器1": {  
      "索引鍵1": ["對象鍵1", "對象鍵2"],  
      "索引鍵2": ["對象鍵3"],   
     },
     "索引器2": {  
      "索引鍵3": ["對象鍵1"],  
      "索引鍵4": ["對象鍵2", "對象鍵3"],  
     }
    }
    

    3.4 索引功能方法分析

    看到Indexer interface,除了繼承的Store外,其他的幾個方法聲明均與索引功能相關,下面對幾個常用方法進行介紹。

    // staging/src/k8s.io/client-go/tools/cache/index.go
    type Indexer interface {
    	Store
    	
    	Index(indexName string, obj interface{}) ([]interface{}, error)
    	
    	IndexKeys(indexName, indexedValue string) ([]string, error)
    	
    	ListIndexFuncValues(indexName string) []string
    	
    	ByIndex(indexName, indexedValue string) ([]interface{}, error)
    	
    	GetIndexers() Indexers
    
    	AddIndexers(newIndexers Indexers) error
    }
    

    下面的方法介紹基于以下數據:

    Indexers: {  
      "namespace": MetaNamespaceIndexFunc,
      "nodeName": NodeNameIndexFunc,
    }
    
    Indices: {
     "namespace": {  
      "default": ["pod-1", "pod-2"],  
      "kube-system": ["pod-3"],   
     },
     "nodeName": {  
      "node1": ["pod-1"],  
      "node2": ["pod-2", "pod-3"],  
     }
    }
    
    3.4.1 ByIndex(indexName, indexedValue string) ([]interface{}, error)

    調用ByIndex方法,傳入索引器名稱indexName,以及索引鍵名稱indexedValue,方法尋找該索引器下,索引鍵對應的對象鍵列表,然后根據對象鍵列表,到Indexer緩存(即threadSafeMap中的items屬性)中獲取出相應的對象列表。

    // staging/src/k8s.io/client-go/tools/cache/store.go
    func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    	return c.cacheStorage.ByIndex(indexName, indexKey)
    }
    
    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    	c.lock.RLock()
    	defer c.lock.RUnlock()
    
    	indexFunc := c.indexers[indexName]
    	if indexFunc == nil {
    		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    	}
    
    	index := c.indices[indexName]
    
    	set := index[indexKey]
    	list := make([]interface{}, 0, set.Len())
    	for key := range set {
    		list = append(list, c.items[key])
    	}
    
    	return list, nil
    }
    

    使用示例:

    pods, err := index.ByIndex("namespace", "default")
    if err != nil {
        panic(err)
    }
    for _, pod := range pods {
        fmt.Println(pod.(*v1.Pod).Name)
    }
    
    fmt.Println("=====")
    
    pods, err := index.ByIndex("nodename", "node1")
    if err != nil {
        panic(err)
    }
    for _, pod := range pods {
        fmt.Println(pod.(*v1.Pod).Name)
    }
    

    輸出:

    pod-1
    pod-2
    =====
    pod-1
    
    3.4.2 IndexKeys(indexName, indexedValue string) ([]string, error)

    IndexKeys方法與ByIndex方法類似,只不過只返回對象鍵列表,不會根據對象鍵列表,到Indexer緩存(即threadSafeMap中的items屬性)中獲取出相應的對象列表。

    // staging/src/k8s.io/client-go/tools/cache/store.go
    func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
    	return c.cacheStorage.IndexKeys(indexName, indexKey)
    }
    
    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
    	c.lock.RLock()
    	defer c.lock.RUnlock()
    
    	indexFunc := c.indexers[indexName]
    	if indexFunc == nil {
    		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    	}
    
    	index := c.indices[indexName]
    
    	set := index[indexKey]
    	return set.List(), nil
    }
    

    4.Indexer本地緩存

    從前面的分析可以知道,informer中的本地緩存實際上指的是Indexer中的threadSafeMap,具體到屬性,則是threadSafeMap中的items屬性;

    threadSafeMap struct

    threadSafeMap struct中的items屬性即為informer的本地緩存;

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    type threadSafeMap struct {
    	lock  sync.RWMutex
    	items map[string]interface{}
    
    	// indexers maps a name to an IndexFunc
    	indexers Indexers
    	// indices maps a name to an Index
    	indices Indices
    }
    

    接下來分析下threadSafeMap的幾個核心方法,主要都是操作items屬性的;

    前面對informer-Controller的分析中(代碼如下),提到的s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get等方法其實最終就是調用的threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get等;

    // staging/src/k8s.io/client-go/tools/cache/shared_informer.go
    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    	s.blockDeltas.Lock()
    	defer s.blockDeltas.Unlock()
    
    	// from oldest to newest
    	for _, d := range obj.(Deltas) {
    		switch d.Type {
    		case Sync, Added, Updated:
    			isSync := d.Type == Sync
    			s.cacheMutationDetector.AddObject(d.Object)
    			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
    				if err := s.indexer.Update(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
    			} else {
    				if err := s.indexer.Add(d.Object); err != nil {
    					return err
    				}
    				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
    			}
    		case Deleted:
    			if err := s.indexer.Delete(d.Object); err != nil {
    				return err
    			}
    			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    		}
    	}
    	return nil
    }
    

    4.1 threadSafeMap.Add

    調用鏈:s.indexer.Add --> cache.Add --> threadSafeMap.Add

    threadSafeMap.Add方法將key:object存入items中,并調用updateIndices方法更新索引(updateIndices方法這里不展開分析,可以自行查看源碼);

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Add(key string, obj interface{}) {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	oldObject := c.items[key]
    	c.items[key] = obj
    	c.updateIndices(oldObject, obj, key)
    }
    

    也可以看到對threadSafeMap進行操作的方法,基本都會先獲取鎖,然后方法執行完畢釋放鎖,所以是并發安全的。

    4.2 threadSafeMap.Update

    調用鏈:s.indexer.Update --> cache.Update --> threadSafeMap.Update

    threadSafeMap.Update方法邏輯與threadSafeMap.Add方法相同;

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Update(key string, obj interface{}) {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	oldObject := c.items[key]
    	c.items[key] = obj
    	c.updateIndices(oldObject, obj, key)
    }
    

    4.3 threadSafeMap.Delete

    調用鏈:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete

    threadSafeMap.Delete方法中,先判斷本地緩存items中是否存在該key,存在則調用deleteFromIndices刪除相關索引,然后刪除items中的key及其對應object;

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Delete(key string) {
    	c.lock.Lock()
    	defer c.lock.Unlock()
    	if obj, exists := c.items[key]; exists {
    		c.deleteFromIndices(obj, key)
    		delete(c.items, key)
    	}
    }
    

    4.4 threadSafeMap.Get

    調用鏈:s.indexer.Get --> cache.Get --> threadSafeMap.Get

    threadSafeMap.Get方法邏輯相對簡單,沒有索引的相關操作,而是直接從items中通過key獲取對應的object并返回;

    // staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
    func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    	c.lock.RLock()
    	defer c.lock.RUnlock()
    	item, exists = c.items[key]
    	return item, exists
    }
    

    總結

    Indexer中有informer維護的指定資源對象的相對于etcd數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對apiserver、對etcd的請求壓力。

    informer所維護的緩存依賴于threadSafeMap結構體中的items屬性,其本質上是一個用map構建的鍵值對,資源對象都存在items這個map中,key為資源對象的namespace/name組成,value為資源對象本身,這些構成了informer的本地緩存。

    Indexer除了維護了一份本地內存緩存外,還有一個很重要的功能,便是索引功能了。索引的目的就是為了快速查找,比如我們需要查找某個node節點上的所有pod、查找某個命名空間下的所有pod等,利用到索引,可以實現快速查找。關于索引功能,則依賴于threadSafeMap結構體中的indexers與indices屬性。

    最后以一張圖來回顧總結一下Indexer在informer中所處位置與其概要功能。

    posted @ 2022-06-19 10:31  良凱爾  閱讀(82)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看