Kubernetes controller manager运行机制源码解析

目录
  • Run
  • StartControllers
  • ReplicaSet
  • ReplicaSetController
  • syncReplicaSet
  • Summary

Run

确立目标

理解 kube-controller-manager 的运行机制

从主函数找到run函数,代码较长,这里精简了一下

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// configz 模块,在kube-scheduler分析中已经了解
	if cfgz, err := configz.New(ConfigzName); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}
	// 健康监测与http服务,跳过
	var checks []healthz.HealthChecker
	var unsecuredMux *mux.PathRecorderMux
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
    // client认证相关
		var clientBuilder controller.ControllerClientBuilder
    // 创建controller的上下文context
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}
    // 这里的 InformerFactory 和我们在kube-scheduler中看的 SharedInformerFactory 基本一致
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)
		select {}
	}
  // 是否进行选举
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}
  // 拼接出一个全局唯一的id
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}
  // 正常情况下都是阻塞在RunOrDie这个函数中,不停地进行选举相关的工作
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
      // 开始成为Leader的时候,调用run函数
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}

StartControllers

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// 关键性的循环,启动每个controllers,key为控制器名字,value为初始化函数
	for controllerName, initFn := range controllers {
    // 是否允许启动
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}
		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
		klog.V(1).Infof("Starting %q", controllerName)
    // 调用init函数进行启动
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
    // 注册对应controller到debug的url中
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}
	return nil
}
// 我们再去传入controller的函数去看看,对应的controller有哪些,这里有我们很多常见的概念,不一一细讲
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	return controllers
}

ReplicaSet

由于我们的示例是创建一个nginx的pod,涉及到kube-controller-manager的内容很少。

但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明:

A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。 因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。

简单来说,ReplicaSet 就是用来生成指定个数的Pod

代码在pkg/controller/replica_set.go

ReplicaSetController

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
  // 用goroutine异步运行,包含了 ReplicaSet和Pod 的两个Informer
  // 这一点很好理解:我们是要控制ReplicaSet声明的数量和运行的Pod数量一致,需要同时观察者两种资源
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}
// 运行函数
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()
	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)
	if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}
	for i := 0; i < workers; i++ {
    // 工作的函数
		go wait.Until(rsc.worker, time.Second, stopCh)
	}
	<-stopCh
}
func (rsc *ReplicaSetController) worker() {
  // 继续查找实现
	for rsc.processNextWorkItem() {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
  // 这里也有个queue的概念,可以类比kube-scheduler中的实现
  // 不同的是,这里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具体实现今天不细看
  // 获取元素
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)
  // 处理对应的元素
	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}
	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
// 再回过头,去查看syncHandler的具体实现
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}

syncReplicaSet

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()
	// 从key中拆分出 namespace 和 name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
  // 根据name,从 Lister 获取对应的 ReplicaSets 信息
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  // 获取 selector (k8s 是根据selector中的label来匹配 ReplicaSets 和 Pod 的)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}
	// 根据namespace和labels获取所有的pod
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
        // 过滤无效的pod
	filteredPods := controller.FilterActivePods(allPods)
	// 根据selector再过滤pod
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
    // 管理 ReplicaSet,下面详细分析
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	// 更新状态
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
// 我们再一起看看,当Pod数量和ReplicaSet中声明的不同时,是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// diff = 当前pod数 - 期望pod数
  diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
  // diff小于0,表示需要扩容,即新增Pod
	if diff < 0 {
    // 具体的实现暂时不细看
  // diff 大于0,即需要缩容
	} else if diff > 0 {
	}
	return nil
}

站在前人的肩膀上,向前辈致敬,Respect!

Summary

  • kube-controller-manager 的核心思想是: 根据期望状态和当前状态,管理Kubernetes中的资源。 以ReplicaSet为例,它对比了定义声明的Pod数和当前集群中满足条件的Pod数,进行相对应的扩缩容。

以上就是Kubernetes controller manager运行机制源码解析的详细内容,更多关于Kubernetes controller manager的资料请关注我们其它相关文章!

(0)

相关推荐

  • Visitor设计模式及发送pod创建请求实现详解

    目录 确立目标 visitor design pattern Visitor Chained VisitorList EagerVisitorList DecoratedVisitor ContinueOnErrorVisitor FlattenListVisitor FilteredVisitor Implements StreamVisitor FileVisitor URLVisitor KustomizeVisitor 发送创建Pod请求的实现细节 send request RESTfu

  • Kubernetes kubectl中Pod创建流程源码解析

    目录 确立目标 先写一个Pod的Yaml 部署Pod 查询Pod kubectl create 的调用逻辑 Main Match Command Create RunCreate Summary 确立目标 从创建pod的全流程入手,了解各组件的工作内容,组件主要包括以下 kubectl kube-apiserver kube-scheduler kube-controller kubelet 理解各个组件之间的相互协作,目前是kubectl 先写一个Pod的Yaml apiVersion: v1

  • Kubernetes应用服务质量管理详解

    目录 服务质量管理 QoS模型 Guaranteed Burstable BestEffort cpuset LimitRange 服务可用性管理 高可用 可用性 PDB 总结 服务质量管理 在Kubernetes中,Pod是最小的调度单元,所以跟资源和调度相关的属性都是Pod对象的字段,而其中最重要的就是CPU和内存. 如下所示: --- apiVersion: v1 kind: Pod metadata: name: pod-demo spec: containers: - name: my

  • Kubernetes Informer数据存储Index与Pod分配流程解析

    目录 确立目标 Process 查看消费的过程 Index 掌握Index数据结构 distribute 信息的分发distribute 理解一个pod的被调度的大致流程 Scheduler SchedulingQueue scheduleOne ScheduleResult 调度计算结果 Assume 初步推算 Bind 实际绑定 Update To Etcd Summary 确立目标 理解Informer的数据存储方式 大致理解Pod的分配流程 理解Informer的数据存储方式 代码在k8

  • Kubernetes ApiServer三大server权限与数据存储解析

    目录 确立目标 Run Three Servers KubeAPIServer GenericConfig Authentication Authorization Admission GenericAPIServer的初始化 New GenericServer NewAPIServerHandler installAPI Apiserver InstallLegacyAPI Create Pod Pod数据的保存 RESTCreateStrategy Storage Storage Imple

  • Kubernetes 权限管理认证鉴权详解

    目录 正文 认证 认证用户 Normal Users Service Accounts 认证策略 客户端证书 不记名令牌 Static Token File Service Account Tokens OpenID Connect Tokens 鉴权 鉴权流程 鉴权模块 RBAC Role 和 ClusterRole RoleBinding 和 ClusterRoleBinding Service Account 最后 正文 Kubernetes 主要通过 API Server 对外提供服务,

  • Kubernetes scheduler启动监控资源变化解析

    目录 确立目标 run Scheduler NodeName Informer Shared Informer PodInformer Reflect Summary 确立目标 理解kube-scheduler启动的流程 了解Informer是如何从kube-apiserver监听资源变化的情况 理解kube-scheduler启动的流程 代码在cmd/kube-scheduler run // kube-scheduler 类似于kube-apiserver,是个常驻进程,查看其对应的Run函

  • Kubernetes controller manager运行机制源码解析

    目录 Run StartControllers ReplicaSet ReplicaSetController syncReplicaSet Summary Run 确立目标 理解 kube-controller-manager 的运行机制 从主函数找到run函数,代码较长,这里精简了一下 func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // configz 模块,在kube-scheduler分析中已经了解

  • 详解Redis 缓存删除机制(源码解析)

    删除的范围 过期的 key 在内存满了的情况下,如果继续执行 set 等命令,且所有 key 都没有过期,那么会按照缓存淘汰策略选中的 key 过期删除 redis 中设置了过期时间的 key 会单独存储一份 typedef struct redisDb { dict *dict; // 所有的键值对 dict *expires; //设置了过期时间的键值对 // ... } redisDb; 设置有效期 Redis 中有 4 个命令可以给 key 设置过期时间,分别是 expire pexpi

  • React事件机制源码解析

    React v17里事件机制有了比较大的改动,想来和v16差别还是比较大的. 本文浅析的React版本为17.0.1,使用ReactDOM.render创建应用,不含优先级相关. 原理简述 React中事件分为委托事件(DelegatedEvent)和不需要委托事件(NonDelegatedEvent),委托事件在fiberRoot创建的时候,就会在root节点的DOM元素上绑定几乎所有事件的处理函数,而不需要委托事件只会将处理函数绑定在DOM元素本身. 同时,React将事件分为3种类型--d

  • Spring AOP实现声明式事务机制源码解析

    目录 一.声明式全局事务 二.源码 三.小结: 一.声明式全局事务 在Seata示例工程中,能看到@GlobalTransactional,如下方法示例: @GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " +

  • jquery事件绑定解绑机制源码解析

    引子 为什么Jquery能实现不传回调函数也能解绑事件?如下: $("p").on("click",function(){ alert("The paragraph was clicked."); }); $("#box1").off("click"); 事件绑定解绑机制 调用on函数的时候,将生成一份事件数据,结构如下: { type: type, origType: origType, data: da

  • Flink作业Task运行源码解析

    目录 引言 概览 调度框架 JobMaster ScheduleNG TaskExecutor Task 计算框架 算子计算处理 总结 引言 上一篇我们分析了Flink部署集群的过程和作业提交的方式,本篇我们来分析下,具体作业是如何被调度和计算的.具体分为2个部分来介绍 作业运行的整体框架,对相关的重要角色有深入了解 计算流程,重点是如何调度具体的operator机制 概览 首先我们来了解下整体的框架 JobMaster: 计算框架的主节点,负责运行单个JobGraph,包括任务的调度,资源申请

  • Tomcat的类加载机制流程及源码解析

    目录 前言 1.Tomcat 的类加载器结构图: 2.Tomcat 的类加载流程说明: 3.源码解析: 4.为什么tomcat要实现自己的类加载机制: 前言 在前面 Java虚拟机:对象创建过程与类加载机制.双亲委派模型 文章中,我们介绍了 JVM 的类加载机制以及双亲委派模型,双亲委派模型的类加载过程主要分为以下几个步骤: (1)初始化 ClassLoader 时需要指定自己的 parent 是谁 (2)先检查类是否已经被加载过,如果类已经被加载了,直接返回 (3)若没有加载则调用父加载器 p

  • Mysql锁内部实现机制之C源码解析

    目录 概述 行锁结构 表锁结构 事务中锁的描述 概述 虽然现在关系型数据库越来越相似,但其背后的实现机制可能大相径庭.实际使用方面,因为SQL语法规范的存在使得我们熟悉多种关系型数据库并非难事,但是有多少种数据库可能就有多少种锁的实现方法. Microsoft Sql Server2005之前只提供页锁,直到2005版本才开始支持乐观并发.悲观并发,乐观模式下允许实现行级别锁,在Sql Server的设计中锁是一种稀缺资源,锁的数量越多,开销就越大,为了避免因为锁的数量快速攀升导致性能断崖式下跌

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • Spring源码解析之事务传播特性

    一.使用方式 可以采用Transactional,配置propagation即可. 打开org.springframework.transaction.annotation.Transactional可见默认传播特性是REQUIRED. /** * The transaction propagation type. * <p>Defaults to {@link Propagation#REQUIRED}. * @see org.springframework.transaction.inte

随机推荐