Kubelet pod 创建工作流程

Kubelet 是 Kubernetes 的四大组件之一,它维护着 Pod 的整个生命周期,是 Kubernetes 创建 Pod 过程中的最后一个环节。本文将介绍 Kubelet 如何创建 Pod。

Pod创建流程

一、基础筑基

1.1 容器管理线程模型

Kubelet pod 创建工作流程

kubelet中的线程模型属于master/wroker模型,通过单master来监听各种事件源,并为每个Pod创建一个goroutine来进行Pod业务逻辑的处理,master和wroker之间通过一个状态管道来进行通信

这里需要知道:
Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client。各角色关系如下图 

Kubelet pod 创建工作流程

1.2 基于事件驱动的状态最终一致性

Kubelet pod 创建工作流程

在通过yaml创建Pod之后,kubernetes会根据当前的事件和当前的Pod状态,来不断进行调整,从而达到最终目标状态的一致性

1.3 组件协作流程

Kubelet pod 创建工作流程

我们按照容器创建这个流程,我们去观察其核心流程,其实主要可以概括为三部分:kubelet、containerRuntime、CRI容器运行时

二、Kubelet创建容器流程

Kubelet pod 创建工作流程

2.1 获取Pod进行准入检查(硬性条件)

kubelet的事件源主要包含两个部分:静态Pod和动态pod:

静态 Pod 是一种在 Kubernetes 集群中运行的 Pod,它不是由 Kubernetes 控制平面的调度器(Scheduler)动态地创建和管理的,而是直接由节点上的 Kubelet 进程静态配置并运行的。

与通常的由调度器决定在哪个节点上运行的 Pod 不同,静态 Pod 是由 Kubelet 直接负责启动和监控的 Pod。这些 Pod 的配置文件通常存储在节点上的特定目录中,而不是由集群的 etcd 存储中心化管理。因此,静态 Pod 不受调度器的管理,也不会被调度到其他节点上。

静态 Pod 可以在节点启动时自动启动,也可以通过 Kubelet 的配置手动指定。这使得静态 Pod 在一些特殊场景下非常有用,例如在节点引导时运行一些系统级别的服务,这些服务不需要 Kubernetes 调度器的参与,或者在无法使用调度器的情况下确保某些 Pod 在节点上运行。

静态 Pod 的配置文件通常位于节点上的 /etc/kubernetes/manifests/ 目录(或类似目录)中,这样 Kubelet 在启动时会自动检测并启动这些 Pod。配置文件的格式与普通的 Pod 配置文件相似,但它们不包含 metadata.namespace 字段,因为静态 Pod 通常没有命名空间的概念。

我们这里只考虑普通的Pod,则会直接将Pod加入到PodManager来进行管理,并且进行准入检查

准入检查主要包含两个关键的控制器:驱逐管理预选检查:驱逐管理主要是根据当前的资源压力,检测对应的Pod是否容忍当前的资源压力(是否满足资源压力限制,是否可以污点容忍);预选检查则是根据当前活跃的容器和当前节点的信息来检查是否满足当前Pod的基础运行环境(亲和性检查),同时如果当前的Pod的优先级特别高或者是静态Pod,则会尝试为其进行资源抢占,会按照QoS等级逐级来进行抢占从而满足其运行环境(QoS)

扩展资源分配、预先抢占资源以及资源压力检测通常是由 Kubernetes 的调度器(Scheduler)和集群中的控制器(如 Horizontal Pod Autoscaler)协同完成的。

  1. 扩展资源分配(Horizontal Pod Autoscaler):
    • 扩展资源分配涉及自动调整 Pod 的副本数量,以适应集群中的工作负载。Horizontal Pod Autoscaler(HPA)是一个控制器,它通过监测工作负载的指标(例如 CPU 使用率或自定义指标)来自动调整相关 Pod 的副本数量。HPA 根据指标的变化,动态地扩展或缩小 Pod 的数量,以满足应用程序的需求。
  2. 预先抢占资源:
    • 预先抢占资源是一种在资源紧张时提前终止低优先级 Pod 以释放资源的策略。这通常是由调度器(Scheduler)执行的。当新的 Pod 准备要被调度到节点上,但是节点的资源不足时,调度器可能会选择终止一些低优先级的 Pod 以腾出足够的资源。这种行为确保高优先级的 Pod 能够得到足够的资源。
  3. 资源压力检测:
    • 资源压力检测涉及监测节点上的资源使用情况,当节点资源接近或达到极限时,系统会采取相应的措施。这可能是由集群中的各种组件来执行的,例如 Kubelet 或 Node Controller。当节点资源紧张时,集群中的控制器可以触发相应的操作,如预先抢占资源、终止一些 Pod 等。

这些功能是集群自我调整和自我管理的一部分,通过协同调度器、控制器等组件,Kubernetes 可以动态地适应工作负载的变化,确保高可用性和资源利用效率。

2.2 kubelet创建事件管道与容器管理主线程同步最新状态

kubelet接收到一个新创建的Pod首先会为其创建一个事件管道,并且启动一个容器管理的主线程消费管道里面的事件,并且会基于最后同步时间来等待当前kubelet中最新发生的事件(从本地的podCache中获取),如果是一个新建的Pod,则主要是通过PLEG中更新时间操作,广播的默认空状态来作为最新的状态。当从本地的podCache中获取到最新的状态信息和从事件源获取的Pod信息后,会结合当前当前statusManager(负责将Pod状态及时更新到Api-Server)和probeManager(主要涉及liveness和readiness的逻辑)里面的Pod里面的容器状态来更新,从而获取当前感知到的最新的Pod状态

2.3 kubelet进行准入控制检查(软件环境)

之前的准入检查是Pod运行的资源硬性限制的检查,而这里的准入检查则是软状态即容器运行时和版本的一些软件运行环境检查,如果这里检查失败,则会将对应的容器状态设置为Blocked

2.4 更新容器状态

在通过准入检查之后,会调用statusManager来进行Pod最新状态的同步,此处可能会同步给apiserver

2.5 Cgroup配置

在更新完成状态之后会启动一个PodContainerManager主要作用则是为对应的Pod根据其QoS等级来进行Cgroup配置的更新

这里需要知道:
kubelet cgroup 层级如下图所示,

Kubelet pod 创建工作流程

kubelet 会在 node 上创建了 4 个 cgroup 层级,从 node 的root cgroup(一般都是/sys/fs/cgroup)往下:

  1. Node 级别:针对 SystemReserved、KubeReserved 和 k8s pods 分别创建的三个Node-level cgroup;
  2. QoS 级别:在kubepodscgroup 里面,又针对三种 pod QoS (guaranteed、brustable、besteffort)分别创建一个 sub-cgroup:
  3. Pod 级别:每个 pod 创建一个 cgroup,用来限制这个 pod 使用的总资源量;
  4. Container 级别:在 pod cgroup 内部,限制单个 container 的资源使用量。

(1)Container 级别 cgroup:在创建 pod 时,可以直接在 container 级别设置 requests/limits,kubelet 在这里做的事情很简单:创建 container 时,将 spec 中指定 requests/limits传给 docker/containerd 等 container runtime。换句话说,底层能力都是 container runtime 提供的,k8s 只是通过接口把 requests/limits 传给了底层。

(2)Pod 级别 cgroup:这种级别的 cgroup 是针对单个 pod 设置资源限额的,因为:

  1. 某些资源是这个 pod 的所有 container 共享的;
  2. 每个 pod 也有自己的一些开销,例如 sandbox container;
  3. Pod 级别还有一些内存等额外开销;

所以pod 的 requests/limits 并不是是对它的 containers 简单累加得到,为了防止一个 pod 的多个容器使用资源超标,k8s 引入了 引入了 pod-level cgroup,每个 pod 都有自己的 cgroup。

(3)QoS 级别 cgroup

实际的业务场景需要我们能根据优先级高低区分几种 pod。例如,

  • 高优先级 pod:无论何时,都应该首先保证这种 pod 的资源使用量;
  • 低优先级 pod:资源充足时允许运行,资源紧张时优先把这种 pod 赶走,释放出的资源分给中高优先级 pod;
  • 中优先级 pod:介于高低优先级之间,看实际的业务场景和需求。

如果设置了 kubelet--cgroups-per-qos=true参数(默认为 true), 就会将所有 pod 分成三种 QoS,优先级从高到低:Guaranteed > Burstable > BestEffort。三种 QoS 是根据requests/limits的大小关系来定义的:

  1. Guaranteed:requests == limits, requests != 0, 即正常需求 == 最大需求,换言之 spec 要求的资源量必须得到保证,少一点都不行;
  2. Burstable:requests < limits, requests != 0, 即正常需求 < 最大需求,资源使用量可以有一定弹性空间;
  3. BestEffort:request == limits == 0, 创建 pod 时不指定 requests/limits就等同于设置为 0,kubelet 对这种 pod 将尽力而为;有好处也有坏处:

(4)Node 级别 cgroup

所有的 k8s pod 都会落入kubepodscgroup;因此所有 k8s pods 占用的资源都已经能够通过 cgroup 来控制,剩下的就是那些 k8s 组件自身和操作系统基础服务所占用的资源了,即KubeReservedSystemReserved。k8s 无法管理这两种服务的资源分配,但能管理它们的限额:有足够权限给它们创建并设置 cgroup 就行了。但是否会这样做需要看 kubelet 配置,

  • --kube-reserved-cgroup=""
  • --system-reserved-cgroup=""

默认为空,表示不创建,也就是系统组件和 pod 之间并没有严格隔离

2.6 Pod基础运行环境准备

接下来kubelet会为Pod的创建准备基础的环境,包括Pod数据目录的创建、镜像秘钥的获取、等待volume挂载完成等操作创建Pod的数据目录主要是创建 Pod运行所需要的Pod、插件、Volume目录,并且会通过Pod配置的镜像拉取秘钥生成秘钥信息,到此kubelet创建容器的工作就已经基本完成

三、ContainerRuntime

Kubelet pod 创建工作流程

创建容器沙箱的责任通常落在 CRI(Container Runtime Interface)上,而不是直接由具体的容器运行时(Container Runtime)负责。CRI 提供了 Kubernetes 与底层容器运行时之间的标准接口,使得 Kubelet 能够与不同的容器运行时进行通信,而不需要关心具体的实现细节。

具体流程先简单理解概念如下,一会儿会详细分析:

  1. Pod 创建请求:
    • 当 Kubernetes 接收到创建 Pod 的请求时,Kubelet 负责执行这一请求。
  2. Kubelet 通过 CRI 请求容器创建:
    • Kubelet 通过 CRI 向底层容器运行时发出请求,请求创建一个包含一个或多个容器的 Pod。
  3. CRI 创建容器沙箱:
    • CRI 负责创建容器沙箱(Container Sandbox),这是一个包含所有与 Pod 相关的资源的隔离环境。这包括网络命名空间、IPC 命名空间、PID 命名空间等。
  4. CRI 调用容器运行时创建容器:
    • 在容器沙箱准备好后,CRI 会调用底层容器运行时的接口,请求在沙箱内创建一个或多个容器实例。
  5. 容器运行时创建容器:
    • 容器运行时接收到 CRI 的请求后,负责在容器沙箱内创建并启动容器实例。

CRI 充当了 Kubernetes 和容器运行时之间的中间层,负责管理容器生命周期的各个阶段,包括创建容器沙箱和在沙箱内创建容器实例。容器运行时只需关注具体的容器创建和管理操作,而不需要直接与 Kubernetes 通信。这种设计使得 Kubernetes 能够支持不同的容器运行时,而不受底层实现的限制。

首先解释下RuntimeService和container runtime

  1. RuntimeService(运行时服务): 运行时服务是一个更通用的术语,通常指的是一个负责管理和执行应用程序或服务的软件组件。在容器化环境中,RuntimeService包含了所有与容器相关的运行时操作,包括创建、启动、停止、销毁等。这可能还包括资源管理、监控、日志收集等功能。RuntimeService的角色是确保应用程序或服务在运行时得到适当的支持和资源。Docker服务(Docker Daemon)、Kubernetes等都可以被视为运行时服务。
  2. Container Runtime(容器运行时): 容器运行时是RuntimeService的一个子集,专门负责直接管理容器的创建、启动、停止和销毁等基本操作。它提供了容器和底层操作系统之间的接口,确保容器能够在操作系统上正确运行。Docker Runtime、containerd、cri-o等都是具体的容器运行时。

因此,容器运行时是RuntimeService的一部分,负责处理容器的核心运行时任务。在实际使用中,容器运行时通常作为RuntimeService的一个关键组件。不同的容器平台和容器编排系统可能选择不同的容器运行时来满足其需求,但RuntimeService的目标是提供整体支持和管理容器化应用程序的服务。

前面我们提到过针对Pod的操作,最终都是基于事件和状态的同步而完成,在ContainerRuntime并不会区分对应的事件是创建还是更新操作,只是根据当前的Pod的信息与目标状态来进行对比,从而构建出对应的操作,达到目标状态

3.1 计算Pod容器变更

计算容器变更主要包括:Pod的sandbox是否变更、短生命周期容器、初始化容器、业务容器是否已经完成,相应的我们会得到一个对应的容器列表:1、需要启动的容器列表 2、需要被kill掉的容器列表 3、如果初始化容器未完成,可以看到这个地方是两个走向:

(1)初始化失败尝试终止:如果之前检测到之前的初始化容器失败,则会检查当前Pod的所有容器和sandbox关联的容器,如果有在运行的容器,会全部进行Kill操作,并且等待操作完成
(2)未知状态容器补偿:当一些Pod的容器已经运行,但是其状态仍然是Unknow的时候,在这个地方会进行统一的处理,全部kill掉,从而为接下来的重新启动做清理操作

3.2 创建容器沙箱

在启动Pod的容器之前,首先会为其创建一个Pod sandbox容器(在 Linux CRI 体系里,Pod Sandbox 其实就是 pause 容器,在Kubernetes中,pause容器作为pod中所有容器的“父容器”。pause容器有两个核心职责。首先,它是pod中Linux Namespace共享的基础(network、PID、IPC、UTS)。其次,启用了PID(进程ID)命名空间共享后,它为每个pod充当PID 1,并接收僵尸进程。),当前Pod的所有容器都和Pod对应的sandbox共享同一个namespace从而共享一个namespace里面的资源,创建Sandbox比较复杂,后续会展开介绍

Kubelet pod 创建工作流程

3.3 启动Pod相关容器

Pod的容器目前分为三大类:短生命周期容器(EphemeralContainer)、初始化容器(initContainer)、业务容器(MainContainer),启动顺序也是从左到右依次进行,如果对于的容器创建失败,则会通过backoff机制来延缓容器的创建,这里我们顺便介绍下containerRuntime启动容器的流程

3.3.1 检查容器镜像是否拉取

镜像的拉取首先会进行对应容器镜像的拼接,然后将之前获取的拉取的秘钥信息和镜像信息,一起交给CRI运行时来进行底层容器镜像的拉取,当然这里也会各种backoff机制,从而避免频繁拉取失败影响kubelet的性能

3.3.2 创建容器配置

创建容器配置主要是为了容器的运行创建对应的配置数据,主要包括:Pod的主机名、域名、挂载的volume、挂载的设备信息、configMap、secret、环境变量、要挂载的目录信息、端口映射信息、日志目录信息、根据环境生成执行的命令等信息

3.3.3 调用runtimeService完成容器的创建

调用runtimeService传递容器的配置信息,调用CRI,并且最终调用容器的创建接口完成容器的状态

3.3.4 调用runtimeService启动容器

通过之前创建容器返回的容器ID,来进行对应的容器的启动,并且会为容器创建对应的日志目录

3.3.5 执行容器的回调钩子

如果容器配置了PostStart钩子,则会在此处进行对应钩子的执行,如果钩子的类型是Exec类则会调用CNI的Exec接口完成在容器内的执行

四. 运行沙箱容器

Kubelet pod 创建工作流程

4.1 拉取sandbox镜像

首先会拉取sandbox镜像

4.2 创建沙箱容器

4.2.1 应用SecurityContext

在创建容器之前会先根据SecurityContext里面的配资信息,来进行容器SecurityContext的配置,主要包括特权等级、只读目录、运行用户等信息(详细可见:https://www.ljh.cool/34009.html),除了应用SecurityContext还会进行断开、OOMScoreAdj、Cgroup驱动等信息的映射

4.3 创建容器

根据上面的各种配置信息来进行容器的创建

4.3 创建checkpoint

checkpoint主要是将当前sandbox的配置信息进行序列化,并且存储其当前的快照信息

4.4 启动sandbox容器

启动sandbox容器则会直接调用StartContainer同时传入之前创建容器返回的ID完成容器的启动,并且此时会重写覆盖容器的dns配置文件

4.5 容器网络设置

容器的网络配置主要是调用CNI插件来完成容器网络的配置,这里就先不展开了

五、Pod容器启动总结

Kubelet pod 创建工作流程

kubelet是容器管理的核心大管家,其负责各种准入控制、状态管理、探测管理、volume管理、QOS管理、CSI对接的统一调度,并且为Runtime运行时准备基础的数据和并反馈Pod当前的最新状态

Kubelet pod 创建工作流程
Kubelet pod 创建工作流程

Runtime层则将kubelet组装的数据,按照CRI运行时的目标配置和kubelet管理的资源配置信息来进行资源的重组,并且根据Pod的容器的状态来决策容器的启停、创建等操作,并完成容器的基础配置环境的构建,并最终调用CRI完成容器的创建,而CRI运行时,则会讲传递过来的各种数据进行进一步的组合,并应用到主机和对应的namespace资源限制,并根据自己的容器服务组织数据,调用容器服务完成容器的最终创建

Kubelet 代码架构

先看一张Kubelet的组件架构图,如下。

Kubelet pod 创建工作流程

可以看出,Kubelet 主要分为三层:API 层、syncLoop 层、CRI 及以下;API层很好理解,就是对外提供接口的部分;syncLoop层是Kubelet的核心工作层,Kubelet的主要工作就是围绕着这个syncLoop,即控制循环,由生产者和消费者运行;CRI 提供容器和镜像服务的接口,容器在运行时可以作为 CRI 插件访问。CRI 为容器和镜像服务提供接口,在容器运行时可以作为 CRI 插件访问。

让我们看一下syncLoop层的一些重要组件。

  • PLEG(pod lifecycle event generator):调用容器运行时接口获取本节点的容器/沙箱信息,与本地维护的pod缓存进行对比,生成对应的PodLifecycleEvent,然后通过eventChannel发送给Kubelet syncLoop,然后通过定时任务最终达到用户期望的状态。
  • CAdvisor:集成在 Kubelet 中的容器监控工具,用于收集本节点和容器的监控信息。
  • PodWorkers:注册了多个 pod handler,用于在不同时间处理 pod,包括创建、更新、删除等。
  • oomWatcher:系统OOM的监听器,将与CAdvisor模块建立SystemOOM,并通过Watch从CAdvisor接收到OOM信号相关的事件。
  • containerGC:负责清理节点上无用的容器,具体的垃圾回收操作由容器运行时实现。
  • imageGC:负责节点节点上的图像回收。当存储镜像的本地磁盘空间达到一定阈值时,会触发镜像回收,并删除未被 pod 使用的镜像。
  • Managers:包含管理与 Pod 相关的各种资源的各种管理器。每个经理都有自己的角色,并在 SyncLoop 中一起工作。

Kubelet 的工作原理

如上所述,Kubelet 主要围绕 SyncLoop 工作。在 go channel 的帮助下,每个组件监听循环以消费事件或产生 pod 相关的事件到其中,整个控制循环运行事件驱动。这可以用下图表示。

Kubelet pod 创建工作流程

例如,在创建pod的过程中,当一个pod被分派到一个节点时,会触发Kubelet在循环控制中注册的一个handler,比如上图中的HandlePods部分。此时,Kubelet 会检查 Kubelet 内存中的 Pod 的状态,确定是需要创建的 Pod,并在 Handler 中触发 ADD 事件对应的逻辑。

同步循环

让我们看看主循环,SyncLoop。

func (kl *kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  klog.Info("Starting kubelet main sync loop.")// The syncTicker wakes up Kubelet to checks if there are any pod workers// that need to be sync'd. A one-second period is sufficient because the// sync interval is defaulted to 10s.
  syncTicker := time.NewTicker(time.Second)
  defer syncTicker.Stop()
  housekeepingTicker := time.NewTicker(housekeepingPeriod)
  defer housekeepingTicker.Stop()
  plegCh := kl.pleg.Watch()const (base   = 100 * time.Millisecond
    max    = 5 * time.Second
    factor = 2)
  duration := base// Responsible for checking limits in resolv.conf// The limits do not have anything to do with individual pods// Since this is called in syncLoop, we don't need to call it anywhere elseif kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
    kl.dnsConfigurer.CheckLimitsForResolvConf()}

  for {if err := kl.runtimeState.runtimeErrors(); err != nil {
      klog.Errorf("skipping pod synchronization - %v", err)// exponential backoff
      time.Sleep(duration)
      duration = time.Duration(math.Min(float64(max), factor*float64(duration)))continue}// reset backoff if we have a success
    duration = base

    kl.syncLoopMonitor.Store(kl.clock.Now())if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break}
    kl.syncLoopMonitor.Store(kl.clock.Now())}
}

SyncLoop 启动一个死循环,其中只调用了 syncLoopIteration 方法。syncLoopIteration 遍历所有传入通道并将任何有消息的管道移交给处理程序。

这些渠道包括:

  • configCh:该通道的生产者由 kubeDeps 对象中的 PodConfig 子模块提供。该模块将侦听来自文件、http 和 apiserver 的 pod 信息的更改,并在更新来自源的 pod 信息时向该通道生成事件。
  • plegCh:该通道的生产者是 pleg 子模块,它会定期向容器运行时查询所有容器的当前状态,如果状态发生变化,就会产生事件到该通道。
  • syncCh:定期同步最新保存的 Pod 状态。
  • livenessManager.Updates():健康检查发现某个 pod 不可用,Kubelet 会根据 pod 的 restartPolicy 自动执行正确的操作。
  • houseKeepingCh:用于管家事件的管道,进行 pod 清理。

syncLoopIteration 的代码。

func (kl *kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
  syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {select {case u, open := <-configCh:// Update from a config source; dispatch it to the right handler// callback.if !open {
      klog.Errorf("Update channel is closed. Exiting the sync loop.")return false}

    switch u.Op {case kubetypes.ADD:
      klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))// After restarting, Kubelet will get all existing pods through// ADD as if they are new pods. These pods will then go through the// admission process and *may* be rejected. This can be resolved// once we have checkpointing.
      handler.HandlePodAdditions(u.Pods)case kubetypes.UPDATE:
      klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
      handler.HandlePodUpdates(u.Pods)case kubetypes.REMOVE:
      klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
      handler.HandlePodRemoves(u.Pods)case kubetypes.RECONCILE:
      klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
      handler.HandlePodReconcile(u.Pods)case kubetypes.DELETE:
      klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))// DELETE is treated as a UPDATE because of graceful deletion.
      handler.HandlePodUpdates(u.Pods)case kubetypes.SET:// TODO: Do we want to support this?
      klog.Errorf("Kubelet does not support snapshot update")default:
      klog.Errorf("Invalid event type received: %d.", u.Op)}

    kl.sourcesReady.AddSource(u.Source) case e := <-plegCh:if e.Type == pleg.ContainerStarted {// record the most recent time we observed a container start for this pod.// this lets us selectively invalidate the runtimeCache when processing a delete for this pod// to make sure we don't miss handling graceful termination for containers we reported as having started.
      kl.lastContainerStartedTime.Add(e.ID, time.Now())}if isSyncPodWorthy(e) {// PLEG event for a pod; sync it.if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
        klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
        handler.HandlePodSyncs([]*v1.Pod{pod})} else {// If the pod no longer exists, ignore the event.
        klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)}}

    if e.Type == pleg.ContainerDied {if containerID, ok := e.Data.(string); ok {
        kl.cleanUpContainersInPod(e.ID, containerID)}}case <-syncCh:// Sync pods waiting for sync
    podsToSync := kl.getPodsToSync()if len(podsToSync) == 0 {break}
    klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
    handler.HandlePodSyncs(podsToSync)case update := <-kl.livenessManager.Updates():if update.Result == proberesults.Failure {// The liveness manager detected a failure; sync the pod.

      // We should not use the pod from livenessManager, because it is never updated after// initialization.
      pod, ok := kl.podManager.GetPodByUID(update.PodUID)if !ok {// If the pod no longer exists, ignore the update.
        klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)break}
      klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
      handler.HandlePodSyncs([]*v1.Pod{pod})}case <-housekeepingCh:if !kl.sourcesReady.AllReady() {// If the sources aren't ready or volume manager has not yet synced the states,// skip housekeeping, as we may accidentally delete pods from unready sources.
      klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")} else {
      klog.V(4).Infof("SyncLoop (housekeeping)")if err := handler.HandlePodCleanups(); err != nil {
        klog.Errorf("Failed cleaning pods: %v", err)}}}return true
}

创建 Pod 的过程

Kubelet pod 创建过程是由 configCh 中的 ADD 事件触发的,所以这里是 Kubelet 收到 ADD 事件后的主要流程。

处理程序

当 configCh 发生 ADD 事件时,循环会触发 SyncHandler 的 HandlePodAdditions 方法。该方法的流程可以用下面的流程图来描述。

Kubelet pod 创建工作流程

首先,handler 会对所有的 pod 安装创建时间进行排序,然后一一处理。

先将pod添加到podManager中,方便后续操作;然后判断是否为mirror pod,如果是则视为mirror pod,否则视为普通pod,这里是mirror pod的解释。

调用api-server创建一个管理pod的控制器类型资源(如: DeploymentStatefulSet)实际调用流程大致为:kubectl -> apiserver -> pod配置清单写入etcd后,调用scheduler获取调度节点,将调度信息写入etcd -> 通知Kubelet进行创建 -> 调用容器运行时创建容器。而静态pod的创建流程非常简单: kubelet -> 调用容器运行时创建容器。kubelet直接管理的Pod为什么还能通过apiserver获取到?因为kubelet会为每个它管理的静态pod,调用api-server创建一个对应的pod镜像。由此以来,静态pod也能通过kubectl等方式进行访问,与其他控制器创建出来的pod看起来没有什么区别。

静态 Pod 直接由特定节点上的kubelet进程来直接管理,不通过 master 节点上的apiserver。无法与我们常用的控制器Deployment或者DaemonSet进行关联,它由kubelet进程自己来监控,当pod崩溃时重启该podkubelet也无法对他们进行健康检查。静态 pod 始终绑定在某一个kubelet,并且始终运行在同一个节点上。 kubelet会自动为每一个静态 pod 在 Kubernetes 的 apiserver 上创建一个镜像 Pod(Mirror Pod),因此我们可以在 apiserver 中查询到该 pod,但是不能通过 apiserver 进行控制(例如不能删除)。

Kubelet pod 创建工作流程

下一步就是判断 pod 是否可以在节点上运行,这在 Kubelet 中也称为 pod 访问控制。访问控制主要包括这几个方面:

  • 节点是否满足 pod 亲和性规则
  • 节点是否有足够的资源分配给 pod
  • 节点是使用HostNetwork还是HostIPC(在PodSecurityPolicy中:1:hostPID:是否容许Pod共享宿主机的进程空间 2:hostIPC:是否容许Pod共享宿主机的IPC命名空间 3:hostNetwork:是否容许Pod共享宿主机网络的命名空间 4:hostPorts:是否容许Pod使用宿主机的端口号,能够经过hostPortRange字段设置容许使用的端口号范围,以[min, max]设置最小端口号和最大端口号 5:Volumes:容许Pod使用的存储卷Volume类型),如果是,是否在节点的白名单中
  • /proc 挂载目录满足要求
  • 是否配置了 pod 以及是否配置了正确的 AppArmor

当所有条件都满足时,最终触发 podWorker 同步 pod。

​HandlePodAdditions ​​对应的代码如下。

func (kl *kubelet) HandlePodAdditions(pods []*v1.Pod) {
  start := kl.clock.Now()
  sort.Sort(sliceutils.PodsByCreationTime(pods))for _, pod := range pods {
    existingPods := kl.podManager.GetPods()// Always add the pod to the pod manager. Kubelet relies on the pod// manager as the source of truth for the desired state. If a pod does// not exist in the pod manager, it means that it has been deleted in// the apiserver and no action (other than cleanup) is required.
    kl.podManager.AddPod(pod)

    if kubetypes.IsMirrorPod(pod) {
      kl.handleMirrorPod(pod, start)continue}

    if !kl.podIsTerminated(pod) {// Only go through the admission process if the pod is not// terminated.

      // We failed pods that we rejected, so activePods include all admitted// pods that are alive.
      activePods := kl.filterOutTerminatedPods(existingPods)

      // Check if we can admit the pod; if not, reject it.if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
        kl.rejectPod(pod, reason, message)continue}}
    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    kl.probeManager.AddPod(pod)}
}

podWorkers 的工作

接下来,让我们看看 podWorker 是做什么的。podWorker 维护一个名为 podUpdates 的 map,以 pod uid 为 key,每个 pod 有一个 channel;当 pod 有事件时,首先从这个 map 中获取对应的 channel,然后启动一个协程监听这个 channel,并执行 managePodLoop。另一方面,podWorker 将需要同步的 pod 传递到该通道中。

managePodLoop 收到事件后,会首先从 pod 缓存中获取 pod 的最新状态,以保证当前正在处理的 pod 是最新的;然后调用syncPod方法将同步结果记录在workQueue中,等待下一个定时同步任务。

整个过程如下图所示。

Kubelet pod 创建工作流程

podWorker 中处理 pod 事件的代码。

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  pod := options.Pod
  uid := pod.UID
  var podUpdates chan UpdatePodOptionsvar exists bool

  p.podLock.Lock()
  defer p.podLock.Unlock()if podUpdates, exists = p.podUpdates[uid]; !exists {
    podUpdates = make(chan UpdatePodOptions, 1)
    p.podUpdates[uid] = podUpdates

    go func() {
      defer runtime.HandleCrash()
      p.managePodLoop(podUpdates)}()}if !p.isWorking[pod.UID] {
    p.isWorking[pod.UID] = true
    podUpdates <- *options
  } else {// if a request to kill a pod is pending, we do not let anything overwrite that request.
    update, found := p.lastUndeliveredWorkUpdate[pod.UID]if !found || update.UpdateType != kubetypes.SyncPodKill {
      p.lastUndeliveredWorkUpdate[pod.UID] = *options   }}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {var lastSyncTime time.Timefor update := range podUpdates {
    err := func() error {
      podUID := update.Pod.UID
      status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)if err != nil {
        p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)return err
      }
      err = p.syncPodFn(syncPodOptions{
        mirrorPod:      update.MirrorPod,
        pod:            update.Pod,
        podStatus:      status,
        killPodOptions: update.KillPodOptions,
        updateType:     update.UpdateType,})
      lastSyncTime = time.Now()return err
    }()// notify the call-back function if the operation succeeded or notif update.OnCompleteFunc != nil {
      update.OnCompleteFunc(err)}if err != nil {// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
      klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)}
    p.wrapUp(update.Pod.UID, err)}
}

同步Pod

上面的podWorker在managePodLoop中调用的syncPod方法实际上就是Kubelet对象的SyncPod方法,在文件pkg/kubelet/kubelet.go中。

此方法是实际与容器运行时层交互的方法。首先判断是否为kill事件,如果是则直接调用runtime的killPod;然后判断是否可以在节点上运行,也就是上面提到的 Kubelet 访问控制;然后判断CNI插件是否准备好,如果没有,只创建并更新。然后判断pod是否为静态pod,如果是,则创建对应的镜像pod;然后创建需要挂载pod的目录;最后调用运行时的syncPod。

整个过程如下图所示。

Kubelet pod 创建工作流程

Kubelet 的 syncPod 代码如下。为了理解主要流程,我去掉了一些优化代码,有兴趣的可以自己查看源码。

func (kl *kubelet) syncPod(o syncPodOptions) error {// pull out the required options
   pod := o.pod
   mirrorPod := o.mirrorPod
   podStatus := o.podStatus
   updateType := o.updateType

   // if we want to kill a pod, do it now!if updateType == kubetypes.SyncPodKill {...if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
         kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)// there was an error killing the pod, so we return that error directly
         utilruntime.HandleError(err)return err
      }return nil}...
   runnable := kl.canRunPod(pod)if !runnable.Admit {...}
   ...// If the network plugin is not ready, only start the pod if it uses the host networkif err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
      kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)}...if !kl.podIsTerminated(pod) {...if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {if !pcm.Exists(pod) {if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
               klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)}...}}}

   // Create Mirror Pod for Static Pod if it doesn't already existif kubetypes.IsStaticPod(pod) {...}if mirrorPod == nil || deleted {
         node, err := kl.GetNode()if err != nil || node.DeletionTimestamp != nil {
            klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)} else {
            klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))if err := kl.podManager.CreateMirrorPod(pod); err != nil {
               klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)}}}}

   // Make data directories for the podif err := kl.makePodDataDirs(pod); err != nil {
      kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
      klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)return err
   }

   // Volume manager will not mount volumes for terminated podsif !kl.podIsTerminated(pod) {// Wait for volumes to attach/mountif err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
         kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
         klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)return err
      }}

   // Fetch the pull secrets for the pod
   pullSecrets := kl.getPullSecretsForPod(pod)

   // Call the container runtime's SyncPod callback
   result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
   kl.reasonCache.Update(pod.UID, result)if err := result.Error(); err != nil {// Do not return error if the only failures were pods in backofffor _, r := range result.SyncResults {if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {// Do not record an event here, as we keep all event logging for sync pod failures// local to container runtime so we get better errorsreturn err
         }}return nil}

   return nil
}

整个创建pod的过程就到了runtime层的syncPod部分,这里就看一下流程

Kubelet pod 创建工作流程

过程很清晰,先计算pod沙箱和容器变化,如果沙箱发生变化,则将pod杀死,然后杀死其相关容器;然后为 pod 创建一个沙箱(无论是需要新建的 pod 还是沙箱已更改并被删除的 pod);后面是启动临时容器、初始化容器和业务容器。

其中,临时容器是 k8s v1.16 的新特性,它临时运行在现有的 Pod 中,以完成用户发起的操作,例如故障排除。

整个代码如下,这里又去掉了一些优化代码来展示主流程。

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {// Step 1: Compute sandbox and container changes.
  podContainerChanges := m.computePodActions(pod, podStatus)
  klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))if podContainerChanges.CreateSandbox {ref, err := ref.GetReference(legacyscheme.Scheme, pod)if err != nil {
      klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)}...}

  // Step 2: Kill the pod if the sandbox has changed.if podContainerChanges.KillPod {
    killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
    result.AddPodSyncResult(killResult)...

  } else {// Step 3: kill any running containers in this pod which are not to keep.for containerID, containerInfo := range podContainerChanges.ContainersToKill {...

      if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
        killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
        klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)return}}}...// Step 4: Create a sandbox for the pod if necessary.
  podSandboxID := podContainerChanges.SandboxIDif podContainerChanges.CreateSandbox {var msg stringvar err error
    ...
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)if err != nil {...}
    klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))...}

  ...

  // Step 5: start ephemeral containersif utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {for _, idx := range podContainerChanges.EphemeralContainersToStart {
      start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))}}

  // Step 6: start the init container.if container := podContainerChanges.NextInitContainerToStart; container != nil {// Start the next init container.if err := start("init container", containerStartSpec(container)); err != nil {return}
        ...}

  // Step 7: start containers in podContainerChanges.ContainersToStart.for _, idx := range podContainerChanges.ContainersToStart {
    start("container", containerStartSpec(&pod.Spec.Containers[idx]))}

  return
}

最后,让我们看看什么是沙盒。在计算机安全领域,沙箱是一种隔离程序以限制不可信进程权限的机制。docker 在容器中使用这种技术,为每个容器创建一个沙箱,定义其 cgroup 和各种命名空间来隔离容器;k8s 中的每个 pod 为 k8s 中的每个 pod 共享一个沙箱,因此同一个 pod 中的所有容器可以互操作并与外界隔离。

让我们看一下在 Kubelet 中为 Pod 创建沙箱的过程。首先定义Pod的DNS配置、HostName、日志路径、沙盒端口,这些都是Pod中的容器共享的;然后定义pod的linux配置,包括父cgroup、IPC/Network/Pid命名空间、sysctls、Linux权限;一切都配置好之后,那么整个流程如下。

Kubelet pod 创建工作流程

源代码

func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
  podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)...

  // Create pod logs directory
  err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)...
  podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)...return podSandBoxID, "", nil
}

func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *v1.Pod, attempt uint32) (*runtimeapi.PodSandboxConfig, error) {
  podUID := string(pod.UID)
  podSandboxConfig := &runtimeapi.PodSandboxConfig{Metadata: &runtimeapi.PodSandboxMetadata{Name:      pod.Name,Namespace: pod.Namespace,Uid:       podUID,Attempt:   attempt,},Labels:      newPodLabels(pod),Annotations: newPodAnnotations(pod),}

  dnsConfig, err := m.runtimeHelper.GetPodDNS(pod)...
  podSandboxConfig.DnsConfig = dnsConfig

  if !kubecontainer.IsHostNetworkPod(pod) {
    podHostname, podDomain, err := m.runtimeHelper.GeneratePodHostNameAndDomain(pod)
    podHostname, err = util.GetNodenameForKernel(podHostname, podDomain, pod.Spec.SetHostnameAsFQDN)
    podSandboxConfig.Hostname = podHostname
  }

  logDir := BuildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)
  podSandboxConfig.LogDirectory = logDir

  portMappings := []*runtimeapi.PortMapping{}for _, c := range pod.Spec.Containers {
    containerPortMappings := kubecontainer.MakePortMappings(&c)...}if len(portMappings) > 0 {
    podSandboxConfig.PortMappings = portMappings
  }

  lc, err := m.generatePodSandboxLinuxConfig(pod)...
  podSandboxConfig.Linux = lc

  return podSandboxConfig, nil
}

// generatePodSandboxLinuxConfig generates LinuxPodSandboxConfig from v1.Pod.
func (m *kubeGenericRuntimeManager) generatePodSandboxLinuxConfig(pod *v1.Pod) (*runtimeapi.LinuxPodSandboxConfig, error) {
  cgroupParent := m.runtimeHelper.GetPodCgroupParent(pod)
  lc := &runtimeapi.LinuxPodSandboxConfig{CgroupParent: cgroupParent,SecurityContext: &runtimeapi.LinuxSandboxSecurityContext{Privileged: kubecontainer.HasPrivilegedContainer(pod),SeccompProfilePath: v1.SeccompProfileRuntimeDefault,},}

  sysctls := make(map[string]string)if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {if pod.Spec.SecurityContext != nil {for _, c := range pod.Spec.SecurityContext.Sysctls {
        sysctls[c.Name] = c.Value}}}

  lc.Sysctls = sysctls

  if pod.Spec.SecurityContext != nil {
    sc := pod.Spec.SecurityContextif sc.RunAsUser != nil {
      lc.SecurityContext.RunAsUser = &runtimeapi.Int64Value{Value: int64(*sc.RunAsUser)}}if sc.RunAsGroup != nil {
      lc.SecurityContext.RunAsGroup = &runtimeapi.Int64Value{Value: int64(*sc.RunAsGroup)}}
    lc.SecurityContext.NamespaceOptions = namespacesForPod(pod)

    if sc.FSGroup != nil {
      lc.SecurityContext.SupplementalGroups = append(lc.SecurityContext.SupplementalGroups, int64(*sc.FSGroup))}if groups := m.runtimeHelper.GetExtraSupplementalGroupsForPod(pod); len(groups) > 0 {
      lc.SecurityContext.SupplementalGroups = append(lc.SecurityContext.SupplementalGroups, groups...)}if sc.SupplementalGroups != nil {for _, sg := range sc.SupplementalGroups {
        lc.SecurityContext.SupplementalGroups = append(lc.SecurityContext.SupplementalGroups, int64(sg))}}if sc.SELinuxOptions != nil {
      lc.SecurityContext.SelinuxOptions = &runtimeapi.SELinuxOption{User:  sc.SELinuxOptions.User,Role:  sc.SELinuxOptions.Role,Type:  sc.SELinuxOptions.Type,Level: sc.SELinuxOptions.Level,}}}

  return lc, nil
}

概括:

Kubelet 的核心工作围绕着控制循环,它以 Go 的通道为基础,生产者和消费者共同工作,使控制循环工作并达到预期的状态。

pod创建流程总结:

之前总结的知识比较杂乱,这里做一个统一的大总结:

一:kubernets核心组件之间的调用

1、用户通过 REST API 提交 Pod`描述文件到 API Server;
API Server 将 Pod 对象的信息存入 Etcd;
Pod 的创建会生成事件,返回给 API Server;

2、Controller-manager 监听到事件;
Pod 如果需要要挂载盘,Controller 会检查是否有满足条件的 PV;
若满足条件的 PV,Controller 会绑定 Pod 和 PV,将绑定关系告知 API Server;
API Server 将绑定信息写入 Etcd;
生成 Pod Update 事件;

3、Scheduler 监听到 Pod Update 事件;
Scheduler 会为 Pod 选择 Node;
如有满足条件的 Node,Scheduler 会绑定 Pod 和 Node,并将绑定关系告知 API Server;
API Server 将绑定信息写入 Etcd;
生成 Pod Update 事件;

Kubelet 监听到 Pod Update 事件,创建 Pod;下面 4.1、4.2、4.3步骤为同时进行

  • 4.1:Kubelet 通过 Volume Manager,调用CSI,将盘挂载到 Node 同时挂载到 Pod;
  • 4.2:kubelet通过cAdvisor监控pod情况
  • 4.3:Kubelet 通过ImageManager和PodManager告知调用CRI ,下面 4.3.1、4.3.2、4.3.3步骤为同时进行
    • 4.3.1:CRI通过 Kubelet ImageManager服务调用,从而继续调用ImageService下载镜像;
    • 4.3.2:CRI 通过Kubelet PodManager服务调用,从而继续调用RuntimeService调用dockerd启动容器;
    • 4.3.3:CRI 通过CNI Client 调用 CNI(容器网络接口) 配置容器网络;

发布者:LJH,转发请注明出处:https://www.ljh.cool/39366.html

(0)
上一篇 2024年1月1日 上午3:15
下一篇 2024年1月5日 下午5:24

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注