kube-scheduler Source Code Analysis (2): A Deep Dive into the Core Processing Flow

2026-05-22 18:37

This article is about 3,292 characters long, with an estimated reading time of 14 minutes.

kube-scheduler Source Code Analysis (2): Core Processing Logic

kube-scheduler Introduction

kube-scheduler is one of the core components of Kubernetes and is responsible for scheduling pod objects. Specifically, it uses its scheduling algorithms to place each unscheduled pod onto the most suitable node. These are the predicate (filtering) algorithms and the priority (scoring) algorithms.

kube-scheduler Architecture Diagram

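The figure below shows the rough composition and processing flow of kube-scheduler.

- kube-scheduler lists/watches objects such as pods and nodes.
- Based on the informers, it puts unscheduled pods into the queue of pods awaiting scheduling.
- It also builds the scheduler cache, which is used to quickly look up nodes and other objects.

The sched.scheduleOne method holds kube-scheduler's core pod-scheduling logic:

1. It takes one pod from the unscheduled-pod queue.
2. It runs the predicate and priority algorithms to select the best node.
3. It updates the cache.
4. It performs the bind operation asynchronously, that is, it sets the pod's nodeName field. At that point, scheduling of the pod is complete.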

The analysis of kube-scheduler is split into two parts:
(1) kube-scheduler initialization and startup;
(2) kube-scheduler core processing logic.

The previous article covered kube-scheduler's initialization and startup. This one analyzes its core processing logic.

2. kube-scheduler Core Processing Logic Analysis (based on tag v1.17.4)

github.com/kubernetes/kubernetes/releases/tag/v1.17.4

Let's go straight to kube-scheduler's core processing method, sched.Run.

sched.Run

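The main logic of sched.Run:
(1) wait until the objects cached by the informers have finished syncing, and return if they never do;
(2) call sched.scheduleOne in a loop to schedule pods.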

```go
// pkg/scheduler/scheduler.go
func (sched *Scheduler) Run(ctx context.Context) {
	// 1. Wait for the objects cached by the informers to finish syncing
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	// 2. Call sched.scheduleOne in a loop to schedule pods
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}
```

sched.scheduleOne

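sched.scheduleOne schedules a single pod onto a suitable node. Its main logic is:
(1) take one pod from the scheduler's queue of pods awaiting scheduling; if the pod's DeletionTimestamp is set, meaning the pod is being deleted, skip scheduling it;
(2) call sched.Algorithm.Schedule to run the scheduling algorithm, which returns the node selected by the predicate and priority algorithms;
(3) if the scheduling algorithm fails, emit a scheduling-failure event and update the pod's status;
(4) if no suitable node was found, check whether preemption is enabled in the scheduler; if it is, call sched.preempt to run the preemption logic;
(5) call sched.VolumeBinder.Binder.AssumePodVolumes to update the cache and check whether all associated PVCs are already bound;
(6) run the "reserve" plugins (just keep them in mind for now; this type of plugin is explained later);
(7) call sched.assume to record in the scheduler cache that the pod is already placed on the chosen node. Updating the pod's nodeName happens asynchronously, so this lets scheduling continue without waiting for the bind. It also prevents the pod from being scheduled again in the meantime;
(8) start a goroutine that performs the pod's binding asynchronously:
(8.1) run the "permit" plugins (explained later);
(8.2) call sched.bindVolumes to bind the volumes;
(8.3) run the "prebind" plugins (explained later);
(8.4) call sched.bind to update the pod's nodeName, which is written to etcd;
(8.5) run the "postbind" plugins (explained later); scheduling of the pod is now finished.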

```go
// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	fwk := sched.Framework
	// 1. Take one pod from the scheduler's queue of pods awaiting scheduling
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	// If the pod's DeletionTimestamp is set (the pod is being deleted), skip scheduling it
	if pod.DeletionTimestamp != nil {
		sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return
	}

	klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

	// 2. Call sched.Algorithm.Schedule to run the scheduling algorithm; it returns the node chosen by the predicate and priority algorithms
	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
	if err != nil {
		// 3. If the scheduling algorithm fails, emit a scheduling-failure event and update the pod's status
		sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
		// Schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		if fitError, ok := err.(*core.FitError); ok {
			// 4. No suitable node was found: if preemption is enabled, call sched.preempt to run the preemption logic
			if sched.DisablePreemption {
				klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
					" No preemption is performed.")
			} else {
				preemptionStartTime := time.Now()
				sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
				metrics.PreemptionAttempts.Inc()
				metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
				metrics.DeprecatedSchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
				metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
				metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
			}
			// Pod did not fit anywhere, so it is counted as a failure. If preemption
			// succeeds, the pod should get counted as a success the next time we try to
			// schedule it. (hopefully)
			metrics.PodScheduleFailures.Inc()
		} else {
			klog.Errorf("error selecting node for pod: %v", err)
			metrics.PodScheduleErrors.Inc()
		}
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
	metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod

	// 5. Update the cache and check whether all associated PVCs are already bound
	// Assume volumes first before assuming the pod.
	//
	// If all volumes are completely bound, then allBound is true and binding will be skipped.
	//
	// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
	//
	// This function modifies 'assumedPod' if volume binding is required.
	allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError,
			fmt.Sprintf("AssumePodVolumes failed: %v", err))
		metrics.PodScheduleErrors.Inc()
		return
	}

	// 6. Run the "reserve" plugins
	// Run "reserve" plugins.
	if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
		metrics.PodScheduleErrors.Inc()
		return
	}

	// 7. Call sched.assume to record in the scheduler cache that this pod has been scheduled;
	// updating the pod's nodeName is asynchronous, so this prevents the pod from being scheduled twice
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		// This is most probably result of a BUG in retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
		metrics.PodScheduleErrors.Inc()
		// trigger un-reserve plugins to clean up state associated with the reserved Pod
		fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		return
	}
	// 8. Start a goroutine that performs the pod's binding asynchronously
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()

		// Run "permit" plugins.
		permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !permitStatus.IsSuccess() {
			var reason string
			if permitStatus.IsUnschedulable() {
				metrics.PodScheduleFailures.Inc()
				reason = v1.PodReasonUnschedulable
			} else {
				metrics.PodScheduleErrors.Inc()
				reason = SchedulerError
			}
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
			return
		}

		// Bind volumes first before Pod
		if !allBound {
			err := sched.bindVolumes(assumedPod)
			if err != nil {
				sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
				metrics.PodScheduleErrors.Inc()
				// trigger un-reserve plugins to clean up state associated with the reserved Pod
				fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
				return
			}
		}

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			var reason string
			metrics.PodScheduleErrors.Inc()
			reason = SchedulerError
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
			return
		}

		err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
		metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
		if err != nil {
			metrics.PodScheduleErrors.Inc()
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
		} else {
			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
			if klog.V(2) {
				klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
			}

			metrics.PodScheduleSuccesses.Inc()
			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
			metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

			// Run "postbind" plugins.
			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}
```

2.1 sched.Algorithm.Schedule

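sched.Algorithm.Schedule runs the predicate and priority algorithms to compute a suitable node for the pod. Its main logic is:
(1) check the PVCs used by the pod, verifying whether any of them is being deleted;
(2) call g.snapshot to take a snapshot of the current information of all nodes for this scheduling cycle; the predicate and priority algorithms below both use this node snapshot;
(3) run the "prefilter" plugins (just keep them in mind for now; this type of plugin is explained later);
(4) call g.findNodesThatFit to run the predicate algorithms, filtering out the nodes that satisfy the pod's requirements;
(5) run the "postfilter" plugins (explained later);
(6) if the number of feasible nodes is 0, return a failure immediately;
(7) if the number of feasible nodes is 1, return that node directly without running the priority algorithms;
(8) call g.prioritizeNodes to run the priority algorithms and score all feasible nodes;
(9) call g.selectHost to choose the best (highest-scoring) node among the scored nodes and return it.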

```go
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)
	// 1. Check the PVCs used by the pod, verifying whether any of them is being deleted
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
	trace.Step("Basic checks done")

	// 2. Call g.snapshot to snapshot all current node information for this scheduling cycle;
	// the predicate and priority algorithms below all use this node snapshot
	if err := g.snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshoting scheduler cache and node infos done")

	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
		return result, ErrNoNodesAvailable
	}

	// Run "prefilter" plugins.
	preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
	if !preFilterStatus.IsSuccess() {
		return result, preFilterStatus.AsError()
	}
	trace.Step("Running prefilter plugins done")

	// 4. Call g.findNodesThatFit to run the predicate algorithms and keep the nodes the pod can run on
	startPredicateEvalTime := time.Now()
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	// Run "postfilter" plugins.
	postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses)
	if !postfilterStatus.IsSuccess() {
		return result, postfilterStatus.AsError()
	}

	// 6. If there are 0 feasible nodes, return a failure immediately
	if len(filteredNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           len(g.nodeInfoSnapshot.NodeInfoList),
			FailedPredicates:      failedPredicateMap,
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}
	trace.Step("Running postfilter plugins done")
	metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
	metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

	// 7. If there is exactly 1 feasible node, return it directly without running the priority algorithms
	startPriorityEvalTime := time.Now()
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
		metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}

	// 8. Call g.prioritizeNodes to run the priority algorithms and score every feasible node
	metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot)
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	if err != nil {
		return result, err
	}

	metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
	metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

	// 9. Call g.selectHost to pick the best (highest-scoring) node among the feasible nodes and return it
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
```

2.1.1 g.snapshot

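g.snapshot takes a snapshot of the current information of all nodes for this scheduling cycle. The subsequent predicate and priority algorithms both use this snapshot.

Whether the snapshot is refreshed depends on the node generations:
- When node information has not changed, the method keeps the existing snapshot as is.
- When node information has changed, it updates the snapshot, copying in only the nodes whose generation is newer than the snapshot's.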
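The generation comparison is what makes the snapshot incremental. This simplified sketch assumes the nodes are supplied newest-first. The real cache keeps a doubly linked list starting at cache.headNode, with the most recently updated NodeInfo at the head.

`nodeInfo`, `snapshot`, and `updateSnapshot` are hypothetical simplifications. The ordering of NodeInfoList by the node tree is not modeled.

```go
package main

// nodeInfo is a hypothetical, trimmed-down NodeInfo carrying only what the
// sketch needs: a name and a generation bumped on every change.
type nodeInfo struct {
	name       string
	generation int64
}

// snapshot holds copied node infos plus the highest generation already copied.
type snapshot struct {
	generation int64
	nodes      map[string]nodeInfo
}

// updateSnapshot copies into snap only the nodes changed since the last
// snapshot and returns how many were copied. byRecency must be ordered
// newest-first, so the loop can stop at the first node that is not newer
// than the snapshot. live is the set of node names still in the cache.
func updateSnapshot(snap *snapshot, byRecency []nodeInfo, live map[string]bool) int {
	copied := 0
	for _, n := range byRecency {
		if n.generation <= snap.generation {
			break // everything after this is already in the snapshot
		}
		snap.nodes[n.name] = n // value copy stands in for NodeInfo.Clone()
		copied++
	}
	if len(byRecency) > 0 {
		snap.generation = byRecency[0].generation
	}
	// Drop snapshot entries for nodes that were removed from the cluster.
	if len(snap.nodes) > len(live) {
		for name := range snap.nodes {
			if !live[name] {
				delete(snap.nodes, name)
			}
		}
	}
	return copied
}
```

With thousands of nodes and only a few changing between cycles, this keeps each cycle's snapshot cost proportional to the number of changed nodes rather than the cluster size.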

```go
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) snapshot() error {
	// Used for all fit and priority funcs.
	return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}
```

```go
// pkg/scheduler/internal/cache/cache.go
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.Generation

	// Add and update node snapshots
	// Start from the head of the NodeInfo doubly linked list and update snapshot
	// of NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.GetGeneration() <= snapshotGeneration {
			// all the nodes are updated before the existing snapshot. We are done.
			break
		}
		if balancedVolumesEnabled && node.info.TransientInfo != nil {
			// Transient scheduler info is reset here.
			node.info.TransientInfo.ResetTransientSchedulerInfo()
		}
		if np := node.info.Node(); np != nil {
			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
	}

	// Remove snapshot entries for nodes that no longer exist in the cache
	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
		for name := range nodeSnapshot.NodeInfoMap {
			if _, ok := cache.nodes[name]; !ok {
				delete(nodeSnapshot.NodeInfoMap, name)
			}
		}
	}

	// Take a snapshot of the nodes order in the tree
	nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	for i := 0; i < cache.nodeTree.numNodes; i++ {
		nodeName := cache.nodeTree.next()
		if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
			nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
			if len(n.PodsWithAffinity()) > 0 {
				nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
			}
		} else {
			klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
		}
	}
	return nil
}
```

2.1.2 g.findNodesThatFit: Running the Predicate Algorithms

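g.findNodesThatFit is where the predicate algorithms run. Its main logic is:
(1) call g.numFeasibleNodesToFind, which computes how many feasible nodes the predicate phase should look for, based on the total number of nodes and the percentageOfNodesToScore setting;
(2) define the checkNode function, which decides whether a given node is feasible;
(3) start 16 goroutines that run checkNode over all nodes in parallel and return the list of feasible nodes, whose length is at most the value returned by g.numFeasibleNodesToFind;
(4) iterate over the scheduler extenders and run each extender's Filter on the nodes that have already been filtered. Extenders are a webhook-based extension mechanism of kube-scheduler, so this invokes the extender's filter webhook (for background on extenders, see liqiang.io/post/kubernetes-scheduler-extender-dd6516a6).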
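Steps (1) and (3) can be sketched together.

- `numFeasibleNodesToFind` mirrors my understanding of the upstream logic. Its constants should be verified against v1.17.4: a 100-node minimum, a 50% default that shrinks by 1 percentage point per 125 nodes, and a 5% floor.
- `findFeasible` is a hypothetical stand-in for the parallel checkNode loop. It uses a plain pool of 16 worker goroutines. Upstream appears to use the `workqueue.ParallelizeUntil` helper instead, which is worth confirming in the v1.17.4 source.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
)

// Constants assumed from upstream kube-scheduler (verify against v1.17.4).
const (
	minFeasibleNodesToFind           = 100
	minFeasibleNodesPercentageToFind = 5
	defaultPercentageOfNodesToScore  = 50
)

// numFeasibleNodesToFind: small clusters check every node; larger ones stop
// after an adaptive percentage that shrinks as the cluster grows.
func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptive := percentageOfNodesToScore
	if adaptive <= 0 {
		adaptive = defaultPercentageOfNodesToScore - numAllNodes/125
		if adaptive < minFeasibleNodesPercentageToFind {
			adaptive = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptive / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

// findFeasible runs checkNode over nodes with 16 workers and stops feeding
// work once more than limit feasible nodes have been seen.
func findFeasible(nodes []string, limit int32, checkNode func(string) bool) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	result := make([]string, limit)
	var found int32
	work := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < 16; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range work {
				if !checkNode(nodes[i]) {
					continue
				}
				length := atomic.AddInt32(&found, 1)
				if length > limit {
					cancel() // enough nodes found: stop handing out work
					atomic.AddInt32(&found, -1)
				} else {
					result[length-1] = nodes[i]
				}
			}
		}()
	}
feed:
	for i := range nodes {
		select {
		case <-ctx.Done():
			break feed
		case work <- i:
		}
	}
	close(work)
	wg.Wait()
	return result[:atomic.LoadInt32(&found)]
}
```

Under these assumed constants, a 1,000-node cluster with percentageOfNodesToScore left at 0 stops after 420 feasible nodes (50 - 1000/125 = 42%). Clusters with fewer than 100 nodes always check every node.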

scheduler framework

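The plugins invoked in the code analyzed above (prefilter, postfilter, reserve, permit, prebind, postbind, and so on) are all provided by kube-scheduler's extension mechanism, the scheduler framework. The framework exposes extensible interfaces at the key points of the scheduler's lifecycle, and users can implement them to customize the scheduler. For reasons of space, this article does not cover the extension mechanism in detail; interested readers can look into it on their own.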

Many of kube-scheduler's built-in predicate and priority algorithms are currently implemented on top of the scheduler framework.
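As a rough illustration of the extension-point idea only, the sketch below registers plugins per extension point and runs them in order. Filters stop at the first rejection, and scores are summed.

The interfaces and example plugins are hypothetical and much simpler than the real framework interfaces. Those take a context, a CycleState, and a *v1.Pod, and the framework applies per-plugin score weights and normalization.

```go
package main

import "errors"

// FilterPlugin and ScorePlugin are hypothetical, simplified extension points.
type FilterPlugin interface {
	Name() string
	Filter(pod, node string) error // nil means the node passes
}

type ScorePlugin interface {
	Name() string
	Score(pod, node string) int64
}

// Framework holds the plugins registered at each extension point.
type Framework struct {
	filters []FilterPlugin
	scorers []ScorePlugin
}

// RunFilterPlugins runs filters in order and stops at the first rejection.
func (f *Framework) RunFilterPlugins(pod, node string) error {
	for _, p := range f.filters {
		if err := p.Filter(pod, node); err != nil {
			return errors.New(p.Name() + ": " + err.Error())
		}
	}
	return nil
}

// RunScorePlugins sums all plugin scores (no weights or normalization here).
func (f *Framework) RunScorePlugins(pod, node string) int64 {
	var total int64
	for _, p := range f.scorers {
		total += p.Score(pod, node)
	}
	return total
}

// nodeNameFilter is an example user plugin that rejects one node by name.
type nodeNameFilter struct{ deny string }

func (n nodeNameFilter) Name() string { return "NodeNameFilter" }

func (n nodeNameFilter) Filter(pod, node string) error {
	if node == n.deny {
		return errors.New("node " + node + " is excluded")
	}
	return nil
}

// constScore is an example user plugin that gives every node the same score.
type constScore struct{ v int64 }

func (c constScore) Name() string { return "ConstScore" }

func (c constScore) Score(pod, node string) int64 { return c.v }
```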

Reference: cloudnative.to/blog/202003-k8s-scheduling-framework/

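Summary

kube-scheduler Introduction

kube-scheduler is one of the core components of Kubernetes and is responsible for scheduling pod objects. Specifically, it uses its scheduling algorithms to place each unscheduled pod onto the most suitable node. These are the predicate (filtering) algorithms and the priority (scoring) algorithms.

kube-scheduler Architecture Diagram

The figure below shows the rough composition and processing flow of kube-scheduler.

- kube-scheduler lists/watches objects such as pods and nodes.
- Based on the informers, it puts unscheduled pods into the queue of pods awaiting scheduling.
- It also builds the scheduler cache, which is used to quickly look up nodes and other objects.

The sched.scheduleOne method holds kube-scheduler's core pod-scheduling logic:

1. It takes one pod from the unscheduled-pod queue.
2. It runs the predicate and priority algorithms to select the best node.
3. It updates the cache.
4. It performs the bind operation asynchronously, that is, it sets the pod's nodeName field. At that point, scheduling of the pod is complete.

kube-scheduler Core Processing Flowchart

The flowchart below shows the core processing steps of the sched.scheduleOne method. Steps related to kube-scheduler's extension mechanisms are not drawn.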

本文共计3292个文字,预计阅读时间需要14分钟。

kube-scheduler是Kubernetes的核心组件之一,主要负责调度Pod资源。具体来说,kube-scheduler根据调度算法(包括预选算法和优选算法)将未调度的Pod分配到最合适的节点上。

kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。 kube-scheduler源码分析(2)-核心处理逻辑分析 kube-scheduler简介

kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。

kube-scheduler架构图

kube-scheduler的大致组成和处理流程如下图,kube-scheduler对pod、node等对象进行了list/watch,根据informer将未调度的pod放入待调度pod队列,并根据informer构建调度器cache(用于快速获取需要的node等对象),然后sched.scheduleOne方法为kube-scheduler组件调度pod的核心处理逻辑所在,从未调度pod队列中取出一个pod,经过预选与优选算法,最终选出一个最优node,然后更新cache并异步执行bind操作,也就是更新pod的nodeName字段,至此一个pod的调度工作完成。

kube-scheduler组件的分析将分为两大块进行,分别是:
(1)kube-scheduler初始化与启动分析;
(2)kube-scheduler核心处理逻辑分析。

上一篇进行了kube-scheduler组件的初始化与启动分析,本篇进行核心处理逻辑分析。

2.kube-scheduler核心处理逻辑分析 基于tag v1.17.4

github.com/kubernetes/kubernetes/releases/tag/v1.17.4

直接看到kube-scheduler核心处理方法sched.Run。

sched.Run

sched.Run主要逻辑:
(1)判断informer中的对象cache是否同步完成;
(2)循环调用sched.scheduleOne调度pod。

// pkg/scheduler/scheduler.go func (sched *Scheduler) Run(ctx context.Context) { // 1.判断informer中的对象cache是否同步完成 if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } // 2.循环调用sched.scheduleOne调度pod wait.UntilWithContext(ctx, sched.scheduleOne, 0) } sched.scheduleOne

sched.scheduleOne方法作用是调度一个pod到合适的node节点,主要逻辑如下:
(1)从scheduler的待调度pod队列中取出一个pod,如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作;
(2)调用sched.Algorithm.Schedule执行调度算法,返回通过预算及优选算法算出的nodo节点;
(3)当执行调度算法失败时,上报调度失败event,更新pod的status;
(4)当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑;
(5)调用sched.VolumeBinder.Binder.AssumePodVolumes,更新cache,判断关联pvc是否都已bound;
(6)执行调用 "reserve" plugins(有印象即可,后面会对该类plugins进行讲解);
(7)调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度;
(8)起一个goroutine,异步执行pod的binding操作:
(8.1)执行调用 "permit" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.2)调用sched.bindVolumes,绑定volumes;
(8.3)执行调用 "prebind" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.4)更新pod的nodeName,写入etcd;
(8.5)执行调用 "postbind" plugins(有印象即可,后面会对该类plugins进行讲解),该pod调度结束。

// pkg/scheduler/scheduler.go func (sched *Scheduler) scheduleOne(ctx context.Context) { fwk := sched.Framework // 1.从scheduler的待调度pod队列中取出一个pod podInfo := sched.NextPod() // pod could be nil when schedulerQueue is closed if podInfo == nil || podInfo.Pod == nil { return } pod := podInfo.Pod // 如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作 if pod.DeletionTimestamp != nil { sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) return } klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name) // 2.调用sched.schedule执行调度算法,返回通过预算及优选算法算出的nodo节点 // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod) if err != nil { // 3.当执行调度算法失败时,上报调度失败event,更新pod的status sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) // Schedule() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it // will fit due to the preemption. It is also possible that a different pod will schedule // into the resources that were preempted, but this is harmless. if fitError, ok := err.(*core.FitError); ok { // 4.当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑 if sched.DisablePreemption { klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." 
+ " No preemption is performed.") } else { preemptionStartTime := time.Now() sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime)) metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) } // Pod did not fit anywhere, so it is counted as a failure. If preemption // succeeds, the pod should get counted as a success the next time we try to // schedule it. (hopefully) metrics.PodScheduleFailures.Inc() } else { klog.Errorf("error selecting node for pod: %v", err) metrics.PodScheduleErrors.Inc() } return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start)) // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. // This allows us to keep scheduling without waiting on binding to occur. assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod // 5.更新cache,判断关联pvc是否都已bound // Assume volumes first before assuming the pod. // // If all volumes are completely bound, then allBound is true and binding will be skipped. // // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. // // This function modifies 'assumedPod' if volume binding is required. 
allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost) if err != nil { sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePodVolumes failed: %v", err)) metrics.PodScheduleErrors.Inc() return } // 6.执行调用 "reserve" plugins // Run "reserve" plugins. if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) metrics.PodScheduleErrors.Inc() return } // 7.调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度 // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { // This is most probably result of a BUG in retrying logic. // We report an error here so that pod scheduling can be retried. // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } // 8.起一个goroutine,异步执行pod的binding操作 // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { bindingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() // Run "permit" plugins. 
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !permitStatus.IsSuccess() { var reason string if permitStatus.IsUnschedulable() { metrics.PodScheduleFailures.Inc() reason = v1.PodReasonUnschedulable } else { metrics.PodScheduleErrors.Inc() reason = SchedulerError } if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) return } // Bind volumes first before Pod if !allBound { err := sched.bindVolumes(assumedPod) if err != nil { sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error()) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } } // Run "prebind" plugins. 
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { var reason string metrics.PodScheduleErrors.Inc() reason = SchedulerError if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return } err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state) metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. if klog.V(2) { klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes) } metrics.PodScheduleSuccesses.Inc() metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins. fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() } 2.1 sched.Algorithm.Schedule

sched.Algorithm.Schedule主要作用是执行预选算法和优选算法,给pod算出一个合适的node,其主要逻辑为:
(1)对pod使用到的pvc进行检查,检查其是否处于删除状态;
(2)调用g.snapshot,获取当前的所有node节点信息快照,用于本轮调度,包括下面执行的预算算法与优选算法都将使用该node快照;
(3)执行调用 "prefilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(4)调用g.findNodesThatFit,执行预选算法,筛选出符合该pod运行条件的合适node节点;
(5)执行调用 "postfilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(6)判断合适的node节点数,如果为0直接返回失败;
(7)判断合适的node节点数,如果为1则直接返回该node节点,不往下执行优选算法;
(8)调用g.prioritizeNodes,执行优选算法,给合适的所有node节点打分;
(9)调用g.selectHost,从优选算法打分过后的node节点中选择最佳(即得分最高)的node节点并返回。

// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	// 1. Check the PVCs used by the pod, i.e. whether any of them is being deleted
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
	trace.Step("Basic checks done")

	// 2. Call g.snapshot to take a snapshot of all current node information for this
	//    scheduling cycle; the predicate and priority algorithms below all use this snapshot
	if err := g.snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshoting scheduler cache and node infos done")

	if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
		return result, ErrNoNodesAvailable
	}

	// Run "prefilter" plugins.
	preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
	if !preFilterStatus.IsSuccess() {
		return result, preFilterStatus.AsError()
	}
	trace.Step("Running prefilter plugins done")

	// 4. Call g.findNodesThatFit to run the predicate algorithms and keep only
	//    the nodes on which the pod can run
	startPredicateEvalTime := time.Now()
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	// Run "postfilter" plugins.
	postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses)
	if !postfilterStatus.IsSuccess() {
		return result, postfilterStatus.AsError()
	}

	// 6. If no node is feasible, return a failure immediately
	if len(filteredNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           len(g.nodeInfoSnapshot.NodeInfoList),
			FailedPredicates:      failedPredicateMap,
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}
	trace.Step("Running postfilter plugins done")
	metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
	metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

	// 7. If exactly one node is feasible, return it directly without running the priority algorithms
	startPriorityEvalTime := time.Now()
	// When only one node after predicate, just use it.
	if len(filteredNodes) == 1 {
		metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
		metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}

	// 8. Call g.prioritizeNodes to run the priority algorithms and score every feasible node
	metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot)
	priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
	if err != nil {
		return result, err
	}

	metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
	metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
	metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

	// 9. Call g.selectHost to pick the best (highest-scoring) node among the feasible ones and return it
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
		FeasibleNodes:  len(filteredNodes),
	}, err
}

2.1.1 g.snapshot

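g.snapshot takes a snapshot of the information for all current nodes, for use in this scheduling cycle. The predicate and priority algorithms that run afterwards both work on this node snapshot. Changes are detected by comparing node generations. If no node has a generation newer than the snapshot's, the existing snapshot is returned unchanged. Otherwise, only the changed nodes are copied into the snapshot, and the updated snapshot is returned.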
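The generation check can be illustrated with a simplified model. This is a hypothetical sketch, not the real schedulerCache. `nodeItem`, `cacheModel` and `snapshot` are made-up types. The sketch assumes, as the UpdateNodeInfoSnapshot code that follows suggests, that every node update bumps a global generation and moves the node to the head of a list, so the list stays ordered newest first. Node removal is left out for brevity.

```go
package main

import "fmt"

// nodeItem is a hypothetical stand-in for one entry of the scheduler cache's
// node list; only the name and generation are modelled.
type nodeItem struct {
	name       string
	generation int64
	next       *nodeItem
}

// cacheModel keeps nodes in a list ordered by generation, newest first.
type cacheModel struct {
	head      *nodeItem
	nodes     map[string]*nodeItem
	latestGen int64
}

// snapshot maps node name -> generation of the copied node info.
type snapshot struct {
	generation int64
	nodes      map[string]int64
}

// unlink removes target from the list (linear scan; the real list is doubly linked).
func (c *cacheModel) unlink(target *nodeItem) {
	if c.head == target {
		c.head = target.next
		return
	}
	for n := c.head; n != nil && n.next != nil; n = n.next {
		if n.next == target {
			n.next = target.next
			return
		}
	}
}

// updateNode bumps the global generation and moves the node to the head.
func (c *cacheModel) updateNode(name string) {
	c.latestGen++
	item, ok := c.nodes[name]
	if ok {
		c.unlink(item)
	} else {
		item = &nodeItem{name: name}
		c.nodes[name] = item
	}
	item.generation = c.latestGen
	item.next = c.head
	c.head = item
}

// updateSnapshot copies only nodes newer than the snapshot and returns how
// many nodes were copied.
func (c *cacheModel) updateSnapshot(s *snapshot) int {
	copied := 0
	for n := c.head; n != nil; n = n.next {
		if n.generation <= s.generation {
			break // the rest of the list is already in the snapshot
		}
		s.nodes[n.name] = n.generation
		copied++
	}
	if c.head != nil {
		s.generation = c.head.generation
	}
	return copied
}

func main() {
	c := &cacheModel{nodes: map[string]*nodeItem{}}
	s := &snapshot{nodes: map[string]int64{}}
	c.updateNode("node-a")
	c.updateNode("node-b")
	fmt.Println(c.updateSnapshot(s)) // 2: the first snapshot copies every node
	fmt.Println(c.updateSnapshot(s)) // 0: nothing changed since the last snapshot
	c.updateNode("node-a")
	fmt.Println(c.updateSnapshot(s)) // 1: only node-a is copied again
}
```

Because the list is ordered newest first, the loop can stop at the first node that is not newer than the snapshot. An unchanged cluster therefore costs one comparison rather than a full copy.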

// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) snapshot() error {
	// Used for all fit and priority funcs.
	return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}

// pkg/scheduler/internal/cache/cache.go
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.Generation

	// Add and update node snapshots:
	// Start from the head of the NodeInfo doubly linked list and update snapshot
	// of NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.GetGeneration() <= snapshotGeneration {
			// all the nodes are updated before the existing snapshot. We are done.
			break
		}
		if balancedVolumesEnabled && node.info.TransientInfo != nil {
			// Transient scheduler info is reset here.
			node.info.TransientInfo.ResetTransientSchedulerInfo()
		}
		if np := node.info.Node(); np != nil {
			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
	}

	// Remove snapshots of nodes that no longer exist in the cache
	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
		for name := range nodeSnapshot.NodeInfoMap {
			if _, ok := cache.nodes[name]; !ok {
				delete(nodeSnapshot.NodeInfoMap, name)
			}
		}
	}

	// Take a snapshot of the nodes order in the tree
	nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
	for i := 0; i < cache.nodeTree.numNodes; i++ {
		nodeName := cache.nodeTree.next()
		if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
			nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
			if len(n.PodsWithAffinity()) > 0 {
				nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
			}
		} else {
			klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
		}
	}
	return nil
}

2.1.2 g.findNodesThatFit: running the predicate algorithms

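g.findNodesThatFit is where the predicate algorithms run. Its main logic is as follows:
(1) Call g.numFeasibleNodesToFind, which uses the total number of nodes and the configured percentage of nodes to score to compute how many feasible nodes the predicate phase needs to find;
(2) Define the checkNode function, which checks whether a single node is suitable for the pod;
(3) Start 16 goroutines that run checkNode over all nodes in parallel and return the list of feasible nodes. The length of this list is at most the value returned by g.numFeasibleNodesToFind;
(4) Iterate over the scheduler extenders (a webhook-based extension mechanism of kube-scheduler) and run each extender's Filter on the nodes that have already passed filtering, i.e. execute …
liqiang.io/post/kubernetes-scheduler-extender-dd6516a6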
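Steps (1) to (3) of g.findNodesThatFit can be sketched as follows. The adaptive rule and its constants reflect my reading of numFeasibleNodesToFind in v1.17 and should be checked against the source. The rule is: check at least 100 nodes, start from a base of 50% and subtract 1% per 125 nodes, and never go below 5%. `fits` is a hypothetical stand-in for checkNode's predicate evaluation. Plain goroutines pulling indices from an atomic counter replace the real workqueue.ParallelizeUntil.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	minFeasibleNodesToFind           = 100 // always look for at least this many nodes
	minFeasibleNodesPercentageToFind = 5   // lower bound of the adaptive percentage
)

// numFeasibleNodesToFind: small clusters are checked completely; in larger ones
// the percentage shrinks by 1% per 125 nodes from a base of 50%.
func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptive := percentageOfNodesToScore
	if adaptive <= 0 {
		adaptive = 50 - numAllNodes/125
		if adaptive < minFeasibleNodesPercentageToFind {
			adaptive = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptive / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

// findFeasible runs fits (a stand-in for checkNode) on a fixed number of
// workers and cancels the remaining work once numToFind nodes have passed.
func findFeasible(nodes []string, numToFind int32, workers int, fits func(string) bool) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	result := make([]string, numToFind)
	var found int32     // feasible nodes recorded so far
	var next int32 = -1 // index of the last node handed to a worker
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ctx.Err() == nil {
				i := int(atomic.AddInt32(&next, 1))
				if i >= len(nodes) {
					return
				}
				if !fits(nodes[i]) {
					continue
				}
				n := atomic.AddInt32(&found, 1)
				if n > numToFind {
					cancel()                    // enough nodes: stop the other workers
					atomic.AddInt32(&found, -1) // undo the over-count
					return
				}
				result[n-1] = nodes[i]
			}
		}()
	}
	wg.Wait()
	return result[:int(atomic.LoadInt32(&found))]
}

func main() {
	for _, n := range []int32{50, 200, 1000, 5000} {
		fmt.Println(n, numFeasibleNodesToFind(n, 0))
	}
	nodes := make([]string, 10)
	for i := range nodes {
		nodes[i] = fmt.Sprintf("node-%d", i)
	}
	evenSuffix := func(name string) bool { return (name[len(name)-1]-'0')%2 == 0 }
	fmt.Println(len(findFeasible(nodes, 3, 16, evenSuffix)))
}
```

With the default percentage (0), this rule checks all 50 nodes of a 50-node cluster. It rounds up to 100 nodes for a 200-node cluster and stops at 500 feasible nodes (10%) in a 5000-node cluster. Which three nodes `findFeasible` returns depends on goroutine timing, but the count never exceeds `numToFind`.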

scheduler framework

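The plugins executed in the code analyzed above (such as the prefilter and postfilter plugins) are all provided by the scheduler framework, kube-scheduler's extensibility mechanism. At key points in the scheduler's lifecycle, the framework exposes interfaces that users can extend and implement, which lets them customize the scheduler. For reasons of space, the extension mechanism is not covered in detail here; interested readers can explore it on their own.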

Many of kube-scheduler's built-in predicate and priority algorithms are currently implemented on top of the scheduler framework.

Reference: cloudnative.to/blog/202003-k8s-scheduling-framework/
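To make the idea of extension points described above concrete, here is a toy model of a single one. It is not the real scheduler framework API, which defines its own, richer plugin interfaces. `Pod`, `Node`, `FilterPlugin`, `Framework` and the two plugins are all hypothetical. The sketch shows only the pattern: plugins register for a point in the scheduling cycle, the framework runs all of them there, and any rejection rules the node out.

```go
package main

import "fmt"

// Pod and Node are hypothetical, trimmed-down stand-ins for the API objects.
type Pod struct {
	Name       string
	CPURequest int
}

type Node struct {
	Name    string
	FreeCPU int
	Taints  []string
}

// FilterPlugin is a hypothetical extension point: Filter returns "" when the
// node fits, or a reason when it does not.
type FilterPlugin interface {
	Name() string
	Filter(pod Pod, node Node) string
}

// fitCPU rejects nodes without enough free CPU for the pod.
type fitCPU struct{}

func (fitCPU) Name() string { return "FitCPU" }

func (fitCPU) Filter(pod Pod, node Node) string {
	if pod.CPURequest > node.FreeCPU {
		return "insufficient cpu"
	}
	return ""
}

// noTaints rejects any node that carries a taint.
type noTaints struct{}

func (noTaints) Name() string { return "NoTaints" }

func (noTaints) Filter(_ Pod, node Node) string {
	if len(node.Taints) > 0 {
		return "node has taints"
	}
	return ""
}

// Framework runs every registered filter plugin in order; the first rejection wins.
type Framework struct {
	filters []FilterPlugin
}

func (f *Framework) RunFilterPlugins(pod Pod, node Node) (bool, string) {
	for _, p := range f.filters {
		if reason := p.Filter(pod, node); reason != "" {
			return false, p.Name() + ": " + reason
		}
	}
	return true, ""
}

func main() {
	fw := &Framework{filters: []FilterPlugin{fitCPU{}, noTaints{}}}
	pod := Pod{Name: "web", CPURequest: 2}
	nodes := []Node{
		{Name: "node-a", FreeCPU: 4},
		{Name: "node-b", FreeCPU: 1},
		{Name: "node-c", FreeCPU: 8, Taints: []string{"dedicated"}},
	}
	for _, n := range nodes {
		ok, reason := fw.RunFilterPlugins(pod, n)
		fmt.Println(n.Name, ok, reason)
	}
}
```

Here node-a passes, node-b is rejected by FitCPU and node-c by NoTaints. Adding a new check means registering another plugin; the loop in RunFilterPlugins stays unchanged, which is the point of the extension-point design.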

Summary

kube-scheduler overview

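kube-scheduler is one of the core components of Kubernetes and is responsible for scheduling pod objects. Specifically, it uses scheduling algorithms (predicate algorithms and priority algorithms) to place each unscheduled pod on the most suitable node.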
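The two phases, combined with the shortcuts taken in genericScheduler.Schedule, can be sketched as follows. Those shortcuts are: fail when no node passes filtering, return the only candidate without scoring, and otherwise score and take the best. The single CPU filter and the "leftover CPU" score are hypothetical stand-ins for the real predicate and priority algorithms. Ties here simply go to the first best node, whereas the real g.selectHost has its own tie-breaking among equally scored nodes.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical, trimmed-down node with only its free CPU.
type node struct {
	name    string
	freeCPU int
}

// errNoFit stands in for the FitError returned when no node passes filtering.
var errNoFit = errors.New("no node fits the pod")

// schedule runs a predicate phase, short-circuits on 0 or 1 feasible node,
// and otherwise scores the feasible nodes and returns the best one.
func schedule(cpuRequest int, nodes []node) (string, error) {
	// Predicate phase: keep nodes with enough free CPU.
	var feasible []node
	for _, n := range nodes {
		if n.freeCPU >= cpuRequest {
			feasible = append(feasible, n)
		}
	}
	switch len(feasible) {
	case 0:
		return "", errNoFit
	case 1:
		return feasible[0].name, nil // only one candidate: skip scoring
	}
	// Priority phase: score = CPU left after placing the pod; highest wins,
	// ties go to the first node seen.
	best, bestScore := "", -1
	for _, n := range feasible {
		if score := n.freeCPU - cpuRequest; score > bestScore {
			best, bestScore = n.name, score
		}
	}
	return best, nil
}

func main() {
	nodes := []node{{"node-a", 2}, {"node-b", 8}, {"node-c", 4}}
	for _, req := range []int{3, 6, 16} {
		host, err := schedule(req, nodes)
		fmt.Println(req, host, err)
	}
}
```

With these nodes, a request of 3 goes to node-b (most CPU left over). A request of 6 fits only node-b, so scoring is skipped. A request of 16 fails with errNoFit.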

kube-scheduler architecture diagram

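The overall structure and processing flow of kube-scheduler are shown in the figure below. kube-scheduler lists/watches pods, nodes and other objects. Through informers, it puts unscheduled pods into the pending-pod queue and builds the scheduler cache, which is used to quickly retrieve nodes and other objects it needs. sched.scheduleOne holds kube-scheduler's core logic for scheduling a pod. It takes one pod from the unscheduled-pod queue and runs the predicate and priority algorithms to pick the best node. It then updates the cache and performs the bind asynchronously, i.e. updates the pod's nodeName field. At that point, scheduling of the pod is complete.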

kube-scheduler core processing flowchart

The flowchart below shows the core processing steps of sched.scheduleOne. Steps related to kube-scheduler's extension mechanisms are not shown.