go语言K8S 的 informer机制浅析

目录
  • 正文
  • 使用方法
    • 创建Informer工厂
    • 创建对象Informer结构体
    • 注册事件方法
    • 启动Informer
  • 机制解析
    • Reflector
    • Controller
    • Processer & Listener
    • Indexer
  • 总结

正文

Kubernetes的控制器模式是其非常重要的一个设计模式,整个Kubernetes定义的资源对象以及其状态都保存在etcd数据库中,通过apiserver对其进行增删查改,而各种各样的控制器需要从apiserver及时获取这些对象,然后将其应用到实际中,即将这些对象的实际状态调整为期望状态,让他们保持匹配。

不过这因为这样,各种控制器需要和apiserver进行频繁交互,需要能够及时获取对象状态的变化,而如果简单的通过暴力轮询的话,会给apiserver造成很大的压力,且效率很低,因此,Kubernetes设计了Informer这个机制,用来作为控制器跟apiserver交互的桥梁,它主要有两方面的作用:

依赖Etcd的List&Watch机制,在本地维护了一份目标对象的缓存。

Etcd的Watch机制能够使客户端及时获知这些对象的状态变化,然后通过List机制,更新本地缓存,这样就在客户端为这些API对象维护了一份和Etcd数据库中几乎一致的数据,然后控制器等客户端就可以直接访问缓存获取对象的信息,而不用去直接访问apiserver,这一方面显著提高了性能,另一方面则大大降低了对apiserver的访问压力;

依赖Etcd的Watch机制,触发控制器等客户端注册到Informer中的事件方法。

客户端可能会对某些对象的某些事件感兴趣,当这些事件发生时,希望能够执行某些操作,比如通过apiserver新建了一个pod,那么kube-scheduler中的控制器收到了这个事件,然后将这个pod加入到其队列中,等待进行调度。

Kubernetes的各个组件本身就内置了非常多的控制器,而自定义的控制器也需要通过Informer跟apiserver进行交互,因此,Informer在Kubernetes中应用非常广泛,本篇文章就重点分析下Informer的机制原理,以加深对其的理解。

使用方法

先来看看Informer是怎么用的,以Endpoint为例,来看下其使用Informer的相关代码:

创建Informer工厂

# client-go/informers/factory.go
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

首先创建了一个SharedInformerFactory,这个结构主要有两个作用:

  • 一个是用来作为创建Informer的工厂,典型的工厂模式,在Kubernetes中这种设计模式也很常用;
  • 一个是共享Informer,所谓共享,就是多个Controller可以共用同一个Informer,因为不同的Controller可能对同一种API对象感兴趣,这样相同的API对象,缓存就只有一份,通知机制也只有一套,大大提高了效率,减少了资源浪费。

创建对象Informer结构体

# client-go/informers/core/v1/endpoints.go
type EndpointsInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.EndpointsLister
}
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()

使用InformerFactory创建出对应版本的对象的Informer结构体,如Endpoints对象对应的就是EndpointsInformer结构体,该结构体实现了两个方法:Informer()和Lister()

  • 前者用来构建出最终的Informer,即我们本篇文章的重点:SharedIndexInformer,
  • 后者用来获取创建出来的Informer的缓存接口:Indexer,该接口可以用来查询缓存的数据,我准备下一篇文章单独介绍其底层如何实现缓存的。

注册事件方法

# Client-go/tools/cache/shared_informer.go
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    onAdd,
    UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
    DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
  })
func onAdd(obj interface{}) {
  node := obj.(*corev1.Endpoint)
  fmt.Println("add a endpoint:", endpoint.Name)
}

这里,首先调用Infomer()创建出来SharedIndexInformer,然后向其中注册事件方法,这样当有对应的事件发生时,就会触发这里注册的方法去做相应的事情。其次调用Lister()获取到缓存接口,就可以通过它来查询Informer中缓存的数据了,而且Informer中缓存的数据,是可以有索引的,这样可以加快查询的速度。

启动Informer

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
controllerContext.InformerFactory.Start(controllerContext.Stop)

这里InformerFactory的启动,会遍历Factory中创建的所有Informer,依次将其启动。

机制解析

Informer的实现都是在client-go这个库中,通过上述的工厂方法,其实最终创建出来的是一个叫做SharedIndexInformer的结构体:

# k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    processor             *sharedProcessor
    cacheMutationDetector MutationDetector
    listerWatcher ListerWatcher
    ......
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

可以看到,在创建SharedIndexInformer时,就创建出了processor, indexer等结构,而在Informer启动时,还创建出了controller, fifo queue, reflector等结构。

Reflector

Reflector的作用,就是通过List&Watch的方式,从apiserver获取到感兴趣的对象以及其状态,然后将其放到一个称为”Delta”的先进先出队列中。

所谓的Delta FIFO Queue,就是队列中的元素除了对象本身外,还有针对该对象的事件类型:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

目前有5种Type: Added, Updated, Deleted, Replaced, Resync,所以,针对同一个对象,可能有多个Delta元素在队列中,表示对该对象做了不同的操作,比如短时间内,多次对某一个对象进行了更新操作,那么就会有多个Updated类型的Delta放入到队列中。后续队列的消费者,可以根据这些Delta的类型,来回调注册到Informer中的事件方法。

而所谓的List&Watch,就是

  • 先调用该API对象的List接口,获取到对象列表,将它们添加到队列中,Delta元素类型为Replaced,
  • 然后再调用Watch接口,持续监听该API对象的状态变化事件,将这些事件按照不同的事件类型,组成对应的Delta类型,添加到队列中,Delta元素类型有Added, Updated, Deleted三种。

此外,Informer还会周期性的发送Resync类型的Delta元素到队列中,目的是为了周期性的触发注册到Informer中的事件方法UpdateFunc,保证对象的期望状态和实际状态一致,该周期是由一个叫做resyncPeriod的参数决定的,在向Informer中添加EventHandler时,可以指定该参数,若为0的话,则关闭该功能。需要注意的是,Resync类型的Delta元素中的对象,是通过Indexer从缓存中获取到的,而不是直接从apiserver中拿的,即这里resync的,其实是”缓存”的对象的期望状态和实际状态的一致性。

根据以上Reflector的机制,依赖Etcd的Watch机制,通过事件来获知对象变化状态,建立本地缓存。即使在Informer中,也没有周期性的调用对象的List接口,正常情况下,List&Watch只会执行一次,即先执行List把数据拉过来,放入队列中,后续就进入Watch阶段。

那什么时候才会再执行List呢?其实就是异常的时候,在List或者Watch的过程中,如果有异常,比如apiserver重启了,那么Reflector就开始周期性的执行List&Watch,直到再次正常进入Watch阶段。为了在异常时段,不给apiserver造成压力,这个周期是一个称为backoff的可变的时间间隔,默认是一个指数型的间隔,即越往后重试的间隔越长,到一定时间又会重置回一开始的频率。而且,为了让不同的apiserver能够均匀负载这些Watch请求,客户端会主动断开跟apiserver的连接,这个超时时间为60秒,然后重新发起Watch请求。此外,在控制器重启过程中,也会再次执行List,所以会观察到之前已经创建好的API对象,又重新触发了一遍AddFunc方法。

从以上这些点,可以看出来,Kubernetes在性能和稳定性的提升上,还是下了很多功夫的。

Controller

这里Controller的作用是通过轮询不断从队列中取出Delta元素,根据元素的类型,一方面通过Indexer更新本地的缓存,一方面调用Processor来触发注册到Informer的事件方法:

# k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    }
}

这里的c.config.Process是定义在shared_informer.go中的HandleDeltas()方法:

# k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                isSync := false
                switch {
                case d.Type == Sync:
                    // Sync events are only propagated to listeners that requested resync
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            // Replaced events that didn't change resourceVersion are treated as resync events
                            // and only propagated to listeners that requested resync
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

Processer & Listener

Processer和Listener则是触发事件方法的机制,在创建Informer时,会创建一个Processer,而在向Informer中通过调用AddEventHandler()注册事件方法时,会为每一个Handler生成一个Listener,然后将该Lisener中添加到Processer中,每一个Listener中有两个channel:addCh和nextCh。Listener通过select监听在这两个channel上,当Controller从队列中取出新的元素时,会调用processer来给它的listener发送“通知”,这个“通知”就是向addCh中添加一个元素,即add(),然后一个goroutine就会将这个元素从addCh转移到nextCh,即pop(),从而触发另一个goroutine执行注册的事件方法,即run()。

# k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}

Indexer

Indexer是对缓存进行增删查改的接口,缓存本质上就是用map构建的key:value键值对,都存在items这个map中,key为/:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

而为了加速查询,还可以选择性的给这些缓存添加索引,索引存储在indecies中,所谓索引,就是在向缓存中添加记录时,就将其key添加到索引结构中,在查找时,可以根据索引条件,快速查找到指定的key记录,比如默认有个索引是按照namespace进行索引,可以根据快速找出属于某个namespace的某种对象,而不用去遍历所有的缓存。

Indexer对外提供了Replace(), Resync(), Add(), Update(), Delete(), List(), Get(), GetByKey(), ByIndex()等接口。

总结

本篇对Kubernetes Informer的使用方法和实现原理,进行了深入分析,整体上看,Informer的设计是相当不错的,基于事件机制,一方面构建本地缓存,一方面触发事件方法,使得控制器能够快速响应和快速获取数据,此外,还有诸如共享Informer, resync, index, watch timeout等机制,使得Informer更加高效和稳定,有了Informer,控制器模式可以说是如虎添翼。

以上就是go语言K8S 的 informer机制浅析的详细内容,更多关于go K8S informer机制浅析的资料请关注我们其它相关文章!

(0)

相关推荐

  • 使用k8s部署Django项目的方法步骤

    接触了一下docker和k8s,感觉是非常不错的东西.能够方便的部署线上环境,而且还能够更好的利用机器的资源,感觉是以后的大趋势.最近刚好有一个基于django的项目,所以就把这个项目打包到docker里面,放到k8是里面运行,顺便学习下k8s和docker的使用. docker 为什么使用docker? 我觉得docker最大的好处是部署的时候比较方便,一个预先打包好的docker镜像,可以在任何安装有docker的机器上面直接运行,不用再安装其他任何的依赖环境.不管是在开发.测试.还是发布阶

  • k8s在go语言中的使用及client 初始化简介

    作为k8s官方维护的客户端,k8s go-client对于go语言中使用k8s可以说是唯一选项.但是官方的使用示例我个人觉得并不是很清晰,尤其是对于对于k8s并不熟悉的用户.这里我总结一下使用过程中碰到的坑,也希望能给有需要的人一些参考. 首先从官方示例说起:这里先解释一下k8s连接问题.集群的节点上会有一个.kube目录(这个目录一般在root用户home目录下)目录中会存在一个config文件,文件中记录了连接k8s集群所需的所有信息,如apiserver地址,用户认证token等.一般来说

  • Go语言k8s kubernetes使用leader election实现选举

    目录 一.背景 二.官网代码示例 三.锁的实现 一.背景 在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,在发生故障的时候需要自动切换.因此,需要利用leader election的机制多副本部署,单实例运行的模式.应用程序可以使用外部的组件比如ZooKeeper或Etcd等中间件进行leader eleaction, ZooKeeper的实现是采用临时节点的方案,临时节

  • K8s部署发布Golang应用程序的实现方法

    目录 创建dockerfile 打包并且推送 创建namespace 创建deployment 创建service 创建ingress 创建hpa alertGo程序可以参考上篇文章,主要用于alertmanager实现钉钉报警 创建dockerfile FROM golang:1.14-alpine ENV GOPROXY=https://goproxy.cn WORKDIR /build COPY . . EXPOSE 8088 RUN mkdir /app RUN go mod tidy

  • go语言K8S 的 informer机制浅析

    目录 正文 使用方法 创建Informer工厂 创建对象Informer结构体 注册事件方法 启动Informer 机制解析 Reflector Controller Processer & Listener Indexer 总结 正文 Kubernetes的控制器模式是其非常重要的一个设计模式,整个Kubernetes定义的资源对象以及其状态都保存在etcd数据库中,通过apiserver对其进行增删查改,而各种各样的控制器需要从apiserver及时获取这些对象,然后将其应用到实际中,即将这

  • Kotlin中的Checked Exception机制浅析

    前言 现在使用Kotlin的Android开发者已经越来越多了. 这门语言从一开始的无人问津,到后来成为Android开发的一级语言,再到后来Google官宣的Kotlin First.Kotlin正在被越来越多的开发者接受和认可. 许多学习Kotlin的开发者之前都是学习过Java的,并且本身Kotlin就是一款基于JVM语言,因此不可避免地需要经常和Java进行比较. Kotlin的诸多特性,在熟悉Java的开发者看来,有些人很喜欢,有些人不喜欢.但即使是不喜欢的那些人,一旦用熟了Kotli

  • Python多线程与同步机制浅析

    目录 线程实现 Thread类 函数方式 继承方式 同步机制 同步锁Lock 条件变量Condition 信号量Semaphore 事件Event 屏障Barrier GIL全局解释器锁 线程实现 Python中线程有两种方式:函数或者用类来包装线程对象.threading模块中包含了丰富的多线程支持功能: threading.currentThread(): 返回当前线程: threading.enumerate(): 返回包含正在运行的线程列表: threading.activeCount(

  • Java不可变类机制浅析

    不可变类(Immutable Class):所谓的不可变类是指这个类的实例一旦创建完成后,就不能改变其成员变量值.如JDK内部自带的很多不可变类:Interger.Long和String等. 可变类(Mutable Class):相对于不可变类,可变类创建实例后可以改变其成员变量值,开发中创建的大部分类都属于可变类. 不可变类的特性对JAVA来说带来怎样的好处? 1)线程安全:不可变对象是线程安全的,在线程之间可以相互共享,不需要利用特殊机制来保证同步问题,因为对象的值无法改变.可以降低并发错误

  • golang 语言中错误处理机制

    与其他主流语言如 Javascript.Java 和 Python 相比,Golang 的错误处理方式可能和这些你熟悉的语言有所不同.所以才有了这个想法根大家聊一聊 golang 的错误处理方式,以及实际开发中应该如何对错误进行处理.因为分享面对 Golang有一个基本的了解 developers, 所以一些简单地方就不做赘述了. 如何定义错误 在 golang 语言中,无论是在类型检查还是编译过程中,都是将错误看做值来对待,和 string 或者 integer 这些类型值并不差别.声明一个

  • python语言开发垃圾回收机制原理教程

    目录 一.什么是垃圾回收机制 二.为什么要有垃圾回收机制 三.垃圾回收机制的原理 1.引用计数 直接引用 间接引用 2.栈区 / 堆区 3.总结 四.标记清除 1.循环引用问题(也叫交叉引用) 2.循环引用导致的结果 3.解决方法 : 清除-标记 五.分代回收 1.效率问题 2.解决方法 : 分代回收 分代 回收 总结 一.什么是垃圾回收机制 垃圾回收机制(简称GC), 解释器自带的一种机制 它是一种动态存储管理技术,自动释放不再被程序引用的对象所占用的内存空间 二.为什么要有垃圾回收机制 程序

  • MySQL中表锁和行锁机制浅析(源码篇)

    目录 前言 行锁 MySQL 事务属性 事务常见问题 事务的隔离级别 间隙锁 排他锁 共享锁 分析行锁定 行锁优化 表锁 共享读锁 独占写锁 查看加锁情况 分析表锁定 什么场景下用表锁 页锁 补充:行级锁与死锁 总结 前言 众所周知,MySQL的存储引擎有MyISAM和InnoDB,锁粒度分别是表锁和行锁. 后者的出现从某种程度上是弥补前者的不足,比如:MyISAM不支持事务,InnoDB支持事务.表锁虽然开销小,锁表快,但高并发下性能低.行锁虽然开销大,锁表慢,但高并发下相比之下性能更高.事务

  • SpringCloud @RefreshScope刷新机制浅析

    目录 一.前言 二.@Scope 三.RefreshScope 的实现原理 四.总结 一.前言 用过Spring Cloud的同学都知道在使用动态配置刷新的我们要配置一个@RefreshScope 在类上才可以实现对象属性的的动态更新,本着知其所以然的态度,晚上没事儿又把这个点回顾了一下,下面就来简单的说下自己的理解. 总览下,实现@RefreshScope 动态刷新的就需要以下几个: @ Scope @RefreshScope RefreshScope GenericScope Scope C

  • C++11中std::declval的实现机制浅析

    本文主要给大家介绍了关于C++11中std::declval实现机制的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍: 在vs2013中,declval定义如下 template <_Ty> typenamea dd_rvalue_reference<_Ty>::type declval() _noexcept; 其中,add_rvalue_reference为一个traits,定义为 template <_Ty> struct add_rvalue_ref

  • 深入理解Swift语言中的闭包机制

    在 Swift 中的闭包类似于结构块,并可以在任何地方调用,它就像 C 和 Objective C 语言内置的函数. 函数内部定义的常数和变量引用可被捕获并存储在闭包.函数被视为封闭的特殊情况,它有 3 种形式. 在 Swift 语言闭合表达式,如下优化,重量轻语法风格,其中包括: 推导参数并从上下文菜单返回值的类型 从单封表达的隐性返回 简略参数名称 尾部闭包语法 语法 下面是一个通用的语法定义用于闭包,它接受参数并返回数据的类型: 复制代码 代码如下: {(parameters) -> re

随机推荐