文章目录
- Pod调度
- 实现原理
- 调度队列
- 优先队列
- 底层数据
- 调度缓存
- 调度框架
Pod调度
Pod调度: 通过污点、容忍度和亲和性影响Pod的调度
- 调度器实现, 其基于配置器构造(其配置来源于配置API)
- 调度过程中任何插件返回拒绝, 都会导致Pod可能再次返回调度队列
如: Pod调度简略流程
调度执行流程:
- 通过
SharedIndedInformer
过滤未调度的Pod放入调度队列 - 通过
SharedIndexInformer
过滤已调度的Pod更新调度缓存 - 从调度队列中取出个Pod,通过其
SchedulerName
执行特定调度框架 - 调度框架触发调度算法利用调度缓存为Pod选择最优的Node进行异步绑定
实现原理
调度器(Scheduler): Pod调度决策
- 调度过程的数据依赖于瞬间的缓存
- Pod调度完成后需等待调度插件的批准才可执行绑定
- 基于模块: 调度队列、调度缓存、调度框架、调度算法
以下源码分析均都基于v1.28.1
版本的Kubernetes源码
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/scheduler.go#64// Scheduler 监视未调度的Pod并尝试找到适合的Node, 并将绑定信息写回到API Server
type Scheduler struct {// Cache 调度缓存, 缓存所有的Node状态// 每次调度前都需更新快照, 以便调度算法使用Cache internalcache.Cache// Extenders 调度插件Extenders []framework.Extender// NextPod 获取下个要调度的Pod, 没有则阻塞goroutine// 不能直接从调度队列中获取调度的Pod, 其不能日志记录调度PodNextPod func() (*framework.QueuedPodInfo, error)// FailureHandler 调度时的回调错误函数FailureHandler FailureHandlerFn// SchedulePod 调度算法给出Pod可调度的NodeSchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)// StopEverything 关闭Scheduler的信号StopEverything <-chan struct{}// SchedulingQueue 缓存等待调度的PodSchedulingQueue internalqueue.SchedulingQueue// Profiles 调度框架的配置(Profile和Frameword属于1:1)Profiles profile.Mapclient clientset.InterfacenodeInfoSnapshot *internalcache.SnapshotpercentageOfNodesToScore int32nextStartNodeIndex intlogger klog.LoggerregisteredHandlers []cache.ResourceEventHandlerRegistration
}// New 调度器构造函数
func New(ctx context.Context,client clientset.Interface,informerFactory informers.SharedInformerFactory,dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,recorderFactory profile.RecorderFactory,opts ...Option) (*Scheduler, error) {// Kubernetes内部记录器, 并获取终止信号logger := klog.FromContext(ctx)stopEverything := ctx.Done()// 在默认的schedulerOptions基础上应用所有的optsoptions := defaultSchedulerOptionsfor _, opt := range opts {opt(&options)}// 是否应用默认调度框架配置if options.applyDefaultProfile {var versionedCfg configv1.KubeSchedulerConfigurationscheme.Scheme.Default(&versionedCfg)cfg := schedulerapi.KubeSchedulerConfiguration{}if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {return nil, err}options.profiles = cfg.Profiles}// 创建InTree插件Factory注册表, 并与OutTree插件Factory注册表合并以形成插件Factory注册表// 数据类型为(插件Factory就是插件的构造函数): map[插件名称]插件Factoryregistry := frameworkplugins.NewInTreeRegistry()if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {return nil, err}// 注册调度器的度量标准metrics.Register()// 调度器的扩展程序extenders, err := buildExtenders(logger, options.extenders, options.profiles)if err != nil {return nil, fmt.Errorf("couldn't build extenders: %w", err)}// 获取Pod和Node数据// 以便做成快照数据, 提供调度依据podLister := informerFactory.Core().V1().Pods().Lister()nodeLister := informerFactory.Core().V1().Nodes().Lister()// 初始化快照数据和度量指标记录器(异步)snapshot := internalcache.NewEmptySnapshot()metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)// 初始化调度器的配置文件profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),frameworkruntime.WithClientSet(client),frameworkruntime.WithKubeConfig(options.kubeConfig),frameworkruntime.WithInformerFactory(informerFactory),frameworkruntime.WithSnapshotSharedLister(snapshot),frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),frameworkruntime.WithParallelism(int(options.parallelism)),frameworkruntime.WithExtenders(extenders),frameworkruntime.WithMetricsRecorder(metricsRecorder),)if err != nil {return nil, fmt.Errorf("initializing profiles: %v", err)}if len(profiles) == 0 {return nil, errors.New("at least one profile is required")}// 配置中添加PreEnqueue和Queueing Hint// PreEnqueue用于在Pod加入调度队列的前置操作// Queueing Hint过滤事件, 防止调度Pod时的无用重试 preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)for profileName, profile := range profiles {preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())}// 创建调度队列podQueue := internalqueue.NewSchedulingQueue(profiles[options.profiles[0].SchedulerName].QueueSortFunc(),informerFactory,internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),internalqueue.WithPodLister(podLister),internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),internalqueue.WithMetricsRecorder(*metricsRecorder),)// 假定调度指向该调度器的调度队列for _, fwk := range profiles {fwk.SetPodNominator(podQueue)}// 调度缓存// durationToExpireAssumedPod 代表绑定的TTL// 若该时间内未完成绑定, 则从调度缓存中移除该假定调度PodschedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)// 调度器的Debuggerdebugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)debugger.ListenForSignal(ctx)// 创建调度器, 并从调度队列中弹出个Pod开始调度// 并指定调度器默认的调度流程和调度失败的回调函数sched := &Scheduler{Cache: schedulerCache,client: client,nodeInfoSnapshot: snapshot,percentageOfNodesToScore: options.percentageOfNodesToScore,Extenders: extenders,StopEverything: stopEverything,SchedulingQueue: podQueue,Profiles: profiles,logger: logger,}sched.NextPod = podQueue.Popsched.applyDefaultHandlers()// 注册事件处理函数// 新建Pod先放入调度队列, 绑定成功后由该函数更新调度缓存以确认// 本质: 通过SharedIndexInformer监控Pod和NService等调度依赖的资源, 并根据事件执行对应操作if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {return nil, fmt.Errorf("adding event handlers: %w", err)}return sched, nil
}
调度器执行流程如下(主要通过scheduleOne()
方法实现):
- 调度器从调度队列中获取个Pod
- 先通过
NextPod()
方法从调度队列中获取Pod(可记录日志) - 调度器会循环从调度队列中获取Pod, 没有待调度的Pod就直接返回
- 先通过
- 对获取的Pod做调度前的预处理
- 获取Pod指定的调度框架(
spec.SchedulerName
) - 配置Pod调度的环境(计时、
CycleState
和schedulingCycleCtx
等)
- 获取Pod指定的调度框架(
- 判断是否忽略Pod的调度
- Pod已被删除, 则直接忽略
- Pod被更新, 但已调度/假定调度(根据Pod的更新决定是否重新调度)
- 通过
Score
匹配最优NodeScore
会基于多种因素计算出最适合Pod的Node- 若没有Node能满足Pod的资源需求, 则Pod通过
PostFilter
进行抢占式调度 - 若Pod是抢占式调度的, 则Pod当前依然是不可调度的(需等待被抢占的Pod优雅退出)
- 记录所有调度失败的原因(
FailureHandler()
)- 记录导致Pod调度失败的事件(
kubectl describe pod
命令时的信息) - 从
SharedIndexInformer
缓存中获取Pod最新状态, 决定是否将调度失败的Pod再次放回调度队列
- 记录导致Pod调度失败的事件(
- 假定调度Pod(调度器无需等待可立刻调度下个Pod)
- 调度缓存中更新Pod已绑定匹配的Node
- 若TTL内未绑定成功, 则判定假定调度失败并从调度缓存中删除绑定
- 为Pod预留全局资源
- 若预留资源失败, 则删除已预留的资源和调度缓存中假定调度信息并记录失败信息
- 判定Pod是否可进入绑定周期
- 需等待所有插件批准才可执行绑定
- 若未批准会执行: 删除预留资源、删除假定调度Pod、记录失败信息
- Pod绑定Node, 异步执行
- 绑定预处理(按顺序执行调度框架的各个插件)
- 执行绑定, 向API Server写入信息(先
Extender
后Bind
, 因部分资源只有前者可管理) - 若绑定成功, 则通知调度缓存并记录绑定成功事件(绑定失败也会记录事件)
- 若绑定失败会执行: 删除预留资源、删除假定调度Pod、记录失败信息
如: 调度整体流程
调度队列
调度队列(SchedulingQueue): Pod调度过程中的Pod获取顺序
- 调度队列具有幂等性(每次操作前均判断是否已存在)
- 调度队列的实现是个优先队列(由传入的函数决定其优先级)
优先队列
优先队列(PriorityQueue): 基于map以优先级方式实现调度队列
- 优先队列由三个子队列构成: 就绪队列、不可调度队列、退避队列
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L150
// 实现SchedulingQueue接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L90// PriorityQueue 优先级方式实现的调度队列
//
// activeQ(就绪队列): 存储等待被调度的Pod(从该队列中Pop出的Pod), 默认存储新添加的Pod
// backoffQ(退避队列): 存储等待特定时间后可调度的Pod(再次放入activeQ), 等待时间根据尝试次数进行指数级增长(默认上限10s)
// unschedulablePods(不可调度队列): 存储由各种原因导致无法调度的Pod, 经过特定周期后会再次加入activeQ(默认60s)
type PriorityQueue struct {*nominatorstop chan struct{}clock clock.Clock// podInitialBackoffDuration Pod的初始退避时间, 默认1s// Pod后续每次调度失败, 该时间就以二次方增加podInitialBackoffDuration time.Duration// podMaxBackoffDuration Pod的最大退避时间, 默认10spodMaxBackoffDuration time.Duration// podMaxInUnschedulablePodsDuration Pod可处于unschedulablePods的最大时间podMaxInUnschedulablePodsDuration time.Durationcond sync.Cond// inFlightPods 返回所有当前正在处理的PodinFlightPods map[types.UID]inFlightPod// receivedEvents 返回调度队列收到的所有事件receivedEvents *list.List// activeQ 存储待调度的Pod// 头部Pod是具有最高优先级的Pod(最先调度)activeQ *heap.Heap// podBackoffQ 按照退避时间到期排序// 完成退避的Pod将在调度器查看activeQ之前从其中弹出podBackoffQ *heap.Heap// unschedulablePods 返回已尝试并确定无法调度的PodunschedulablePods *UnschedulablePods// schedulingCycle 返回调度周期schedulingCycle int64// moveRequestCycle 缓存移动请求时的调度周期// 当接受到移动请求并正在调度不可调度Pod时, 则将其放回activeQmoveRequestCycle int64// preEnqueuePluginMap PreEnqueue插件的配置(K为配置文件名)preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin// queueingHintMap Queueing Hint插件的配置(K为配置文件名)queueingHintMap QueueingHintMapPerProfile// closed 队列是否关闭closed boolnsLister listersv1.NamespaceListermetricsRecorder metrics.MetricAsyncRecorderpluginMetricsSamplePercent intisSchedulingQueueHintEnabled bool
}// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L1261// UnschedulablePods 不可调度Pod的队列, 对Map的再次封装
type UnschedulablePods struct {// podInfoMap 存储Pod的map, K为Pod的名称podInfoMap map[string]*framework.QueuedPodInfo// keyFunc 获取对象K的函数keyFunc func(*v1.Pod) string// unschedulableRecorder 监控数据unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
如: 优先队列的流程
底层数据
底层数据: 封装用于调度队列存储对象的底层数据结构
- Heap: 对map的再次封装, 具有slice的顺序性和map的高效检索能力
- QueuedPodInfo: 对Pod的再次封装, 具有调度队列存储相关的信息
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/framework/types.go#167// QueuedPodInfo 在Pod基础上封装关于调度队列的信息
type QueuedPodInfo struct {*PodInfo// Timestamp Pod添加到调度队列的时间// Pod可能会频繁从取出再放入, 该时间便于处理PodTimestamp time.Time// Attempts Pod重试调度的次数Attempts int// InitialAttemptTimestamp Pod首次添加到调度队列的时间// 初始化后不再更新, 用于计算调度完成所需时间InitialAttemptTimestamp *time.Time// UnschedulablePlugins Pod调度周期中导致失败的插件名称// 仅对PreFilter, Filter, Reserve, Permit(WaitOnPermit)插件有效UnschedulablePlugins sets.Set[string]// Gated 是否由 PreEnqueuePlugin 调度Gated bool
}
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/heap/heap.go#L127// Heap 实现堆数据结构的生产者/消费者队列, 可用于优先级队列类似的数据结构
type Heap struct {// data 存储数据对象data *data// metricRecorder 监控数据metricRecorder metrics.MetricRecorder
}// data 实现标准库的 Heap 接口
type data struct {// items 通过map管理所有对象items map[string]*heapItem// queue 通过slice管理所有对象的K(namespace + pod name)queue []string// keyFunc 获取对象K的函数// 用于操作queue, 应保证该函数的确定性keyFunc KeyFunc// lessFunc 比较两个对象的函数(用于排序)lessFunc lessFunc
}
调度缓存
调度缓存(SchedulerCache): 获取Etcd中Pod和Node的绑定等调度相关所需的信息
- Node信息中已包含所有运行在该Node上的Pod信息
- 调度缓存会维护段时间已删除的Node, 直到Node没有Pod
- 调度缓存中的Node有虚实之分, 虚Node实现Node增加/删除时的正常调度(
nodeTree
仅存储实Node)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/cache.go#L57
// 实现Cache接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/interface.go#L60// cacheImpl 实现调度缓存接口
type cacheImpl struct {// top 调度缓存的停止Chanstop <-chan struct{}// ttl 假定调度绑定的超时时间, 默认30sttl time.Duration// period 定期清除假定调度绑定超时的Pod, 默认60speriod time.Duration// mu 读写锁保证并发安全mu sync.RWMutex// assumedPods 假定调度Pod集合assumedPods sets.Set[string]// podStates 所有Pod信息podStates map[string]*podState// nodes 所有Node信息nodes map[string]*nodeInfoListItem// headNode Node双向链表中首个Node// 基于特定规则排序, 链表的排序效率高于sliceheadNode *nodeInfoListItem// nodeTree 节点按照zone组成成的树状数据结构nodeTree *nodeTree// imagesStates 镜像状态imageStates map[string]*imageState
}
快照(Snapshot): 调度缓存某瞬间下的副本
- 作用:通过增量更新和只读, 避免频繁获取和读写锁损失性能
- 调度器在执行每个调度周期前, 都会获取个快照作为调度的数据依据
- 每次调度都会根据调度缓存更新快照中的Node信息以保证状态一致(仅更新部分信息)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/snapshot.go#L29// Snapshot 缓存 NodeInfo 和 NodeTree 快照
type Snapshot struct {// nodeInfoMap Node信息(map[namespace + name]NodeInfo)nodeInfoMap map[string]*framework.NodeInfo// nodeInfoList 按照NodeTree排序的Node全集列表(不包含已删除的Node)nodeInfoList []*framework.NodeInfo// havePodsWithAffinityNodeInfoList 处理具有亲和性的PodhavePodsWithAffinityNodeInfoList []*framework.NodeInfo// havePodsWithRequiredAntiAffinityNodeInfoList 处理具有反亲和性的PodhavePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo// usedPVCSet 调度Pod使用的PVCusedPVCSet sets.Set[string]// generation Node的配置纪元// 所有NodeInfo.Generation中的最大值(其均源于全局Generation变量)generation int64
}
假定调度Pod(Assume): 调度结果写入Etcd
- 异步绑定假定结果, 调度器继续调度其他Pod以保证性能
- 假定调度绑定时会预先占用资源防止再次分配, 但真正绑定失败会释放占用资源
- 假定调度Pod具有绑定限定时间, 超时未真正绑定会释放占用资源和清除假定调度Pod
- 当真正绑定时会删除假定调度Pod, 并对假定调度Pod占用资源进行转移
如: 调度缓存流程
调度框架
调度框架: Kubernetes调度器的插件架构(调度插件的集合)
- 扩展点: 调度插件注册后执行位置(提供信息或调度决策)
- Pod的调度流程分为两个周期: 调度周期(串行)、绑定周期(并行)
- 句柄(Handler): 为插件提供服务(提供额外功能, 协助插件完成功能)
- 配置后的调度框架就是个调度器, 配置后的调度插件就是个调度算法
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/runtime/framework.go#L49
// 实现Framework接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/interface.go#L512// frameworkImpl
type frameworkImpl struct {// registry 调度插件注册表, 通过其创建配置的插件registry Registry// snapshotSharedLister 基于快照的ListersnapshotSharedLister framework.SharedLister// waitingPods 存储等待批准的PodwaitingPods *waitingPodsMap// scorePluginWeight 插件的权重映射(K为插件名称)scorePluginWeight map[string]int// 所有扩展点的插件, 用于实现Framework接口的各个方法// 每个扩展点都会遍历执行插件, 且均在构造函数中通过SchedulingProfile生成preEnqueuePlugins []framework.PreEnqueuePluginenqueueExtensions []framework.EnqueueExtensionsqueueSortPlugins []framework.QueueSortPluginpreFilterPlugins []framework.PreFilterPluginfilterPlugins []framework.FilterPluginpostFilterPlugins []framework.PostFilterPluginpreScorePlugins []framework.PreScorePluginscorePlugins []framework.ScorePluginreservePlugins []framework.ReservePluginpreBindPlugins []framework.PreBindPluginbindPlugins []framework.BindPluginpostBindPlugins []framework.PostBindPluginpermitPlugins []framework.PermitPluginclientSet clientset.InterfacekubeConfig *restclient.ConfigeventRecorder events.EventRecorderinformerFactory informers.SharedInformerFactorylogger klog.LoggermetricsRecorder *metrics.MetricAsyncRecorderprofileName stringpercentageOfNodesToScore *int32extenders []framework.Extenderframework.PodNominatorparallelizer parallelize.Parallelizer
}
如: 调度周期和绑定周期执行流程(调度上下文)
- Kubernetes集群中可存在多个调度框架
- 扩展点是插件的设计接口, 调度需12个插件
- 带有Pre前缀的插件都是提供信息的, 其他均是做决策的
- 调度框架就是个调度器, 配置好的调度插件就是调度算法
调度插件: 影响Pod调度的各组件
- 调度插件分为多种, 而每个调度插件可有多种实现
- 调度插件都虚静态编译到注册中, 且通过唯一性的名称区分
- 插件均是无状态的(插件存储状态需依赖外部实现), 且插件间通信依赖于
CycleState
- 每种插件类型可实现多种类型的插件接口(该插件可在插件框架中的多个扩展位置作用)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/cycle_state.go#L48// CycleState 基于共享变量实现插件之间数据传输
// 仅作为单词调度周期中, 各个插件之间通信(调度上下文)
//
// 未提供任何数据保护, 对所有插件都认为是可信的
type CycleState struct {// storage 存储数据storage sync.Map// recordPluginMetrics 是否监控recordPluginMetrics bool// SkipFilterPlugins 将在Filter扩展点忽略的插件SkipFilterPlugins sets.Set[string]// SkipScorePlugins 将在Score扩展点忽略的插件SkipScorePlugins sets.Set[string]
}
调度框架中各调度插件的说明:
插件名称 | 说明 |
---|---|
Sort | 排序等待调度的Pod (默认按照优先级) |
PreFilter | 处理Pod相关信息为过滤Node做准备 (过滤前的处理) |
Filter | 过滤无法运行该Pod的Node (对多节点并发应用多个Filter插件) |
PostFilter | 抢占调度 (仅在Filter过滤不出Node时执行) |
PreScore | 处理Pod相关信息为Node评分做准备 (主要处理亲和性、拓扑分步和容忍度) |
Score | 对所有过滤的Node评分并排序 (首个Node则为Pod的最优选择) |
NormalizeScore | 修改已排序的Node评分 (提高Node评分的扩展性) |
Reserve | 维护全局调度状态 (防止下次调度与本次绑定完成前发生竞争) |
Premit | 标注Pod状态以防止或延迟Pod绑定 (Pod状态可为: 批准、等待、延迟) |
Prebind | 处理Pod绑定需完成的操作 (常用于完成PV和PVC) |
Bind | Pod与Node绑定 (仅作用单个Bind插件) |
PostBind | 清理绑定期间使用的资源 (没有默认实现, 用于自定义扩展) |
- Pre前缀的插件是预处理并提供信息, 同时避免部分真正重量操作重复执行