kube-scheduler源码解析(2):深入核心处理流程揭秘?
- 内容介绍
- 文章标签
- 相关推荐
本文共计3292个文字,预计阅读时间需要14分钟。
kube-scheduler是Kubernetes的核心组件之一,主要负责调度Pod资源。具体来说,kube-scheduler根据调度算法(包括预选算法和优选算法)将未调度的Pod分配到最合适的节点上。
kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。 kube-scheduler源码分析(2)-核心处理逻辑分析 kube-scheduler简介kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。
kube-scheduler架构图kube-scheduler的大致组成和处理流程如下图,kube-scheduler对pod、node等对象进行了list/watch,根据informer将未调度的pod放入待调度pod队列,并根据informer构建调度器cache(用于快速获取需要的node等对象),然后sched.scheduleOne方法为kube-scheduler组件调度pod的核心处理逻辑所在,从未调度pod队列中取出一个pod,经过预选与优选算法,最终选出一个最优node,然后更新cache并异步执行bind操作,也就是更新pod的nodeName字段,至此一个pod的调度工作完成。
kube-scheduler组件的分析将分为两大块进行,分别是:
(1)kube-scheduler初始化与启动分析;
(2)kube-scheduler核心处理逻辑分析。
上一篇进行了kube-scheduler组件的初始化与启动分析,本篇进行核心处理逻辑分析。
2.kube-scheduler核心处理逻辑分析 基于tag v1.17.4github.com/kubernetes/kubernetes/releases/tag/v1.17.4
直接看到kube-scheduler核心处理方法sched.Run。
sched.Runsched.Run主要逻辑:
(1)判断informer中的对象cache是否同步完成;
(2)循环调用sched.scheduleOne调度pod。
// pkg/scheduler/scheduler.go
func (sched *Scheduler) Run(ctx context.Context) {
// 1.判断informer中的对象cache是否同步完成
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
// 2.循环调用sched.scheduleOne调度pod
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}
sched.scheduleOne
sched.scheduleOne方法作用是调度一个pod到合适的node节点,主要逻辑如下:
(1)从scheduler的待调度pod队列中取出一个pod,如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作;
(2)调用sched.Algorithm.Schedule执行调度算法,返回通过预算及优选算法算出的nodo节点;
(3)当执行调度算法失败时,上报调度失败event,更新pod的status;
(4)当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑;
(5)调用sched.VolumeBinder.Binder.AssumePodVolumes,更新cache,判断关联pvc是否都已bound;
(6)执行调用 "reserve" plugins(有印象即可,后面会对该类plugins进行讲解);
(7)调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度;
(8)起一个goroutine,异步执行pod的binding操作:
(8.1)执行调用 "permit" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.2)调用sched.bindVolumes,绑定volumes;
(8.3)执行调用 "prebind" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.4)更新pod的nodeName,写入etcd;
(8.5)执行调用 "postbind" plugins(有印象即可,后面会对该类plugins进行讲解),该pod调度结束。
// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk := sched.Framework
// 1.从scheduler的待调度pod队列中取出一个pod
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// 如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作
if pod.DeletionTimestamp != nil {
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
// 2.调用sched.schedule执行调度算法,返回通过预算及优选算法算出的nodo节点
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
if err != nil {
// 3.当执行调度算法失败时,上报调度失败event,更新pod的status
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
// Schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
// 4.当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodScheduleFailures.Inc()
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// 5.更新cache,判断关联pvc是否都已bound
// Assume volumes first before assuming the pod.
//
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
metrics.PodScheduleErrors.Inc()
return
}
// 6.执行调用 "reserve" plugins
// Run "reserve" plugins.
if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
// 7.调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// 8.起一个goroutine,异步执行pod的binding操作
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
return
}
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
}
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
return
}
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2) {
klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
}
metrics.PodScheduleSuccesses.Inc()
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
2.1 sched.Algorithm.Schedule
sched.Algorithm.Schedule主要作用是执行预选算法和优选算法,给pod算出一个合适的node,其主要逻辑为:
(1)对pod使用到的pvc进行检查,检查其是否处于删除状态;
(2)调用g.snapshot,获取当前的所有node节点信息快照,用于本轮调度,包括下面执行的预算算法与优选算法都将使用该node快照;
(3)执行调用 "prefilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(4)调用g.findNodesThatFit,执行预选算法,筛选出符合该pod运行条件的合适node节点;
(5)执行调用 "postfilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(6)判断合适的node节点数,如果为0直接返回失败;
(7)判断合适的node节点数,如果为1则直接返回该node节点,不往下执行优选算法;
(8)调用g.prioritizeNodes,执行优选算法,给合适的所有node节点打分;
(9)调用g.selectHost,从优选算法打分过后的node节点中选择最佳(即得分最高)的node节点并返回。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
// 1.对pod使用到的pvc进行检查,检查其是否处于删除状态
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
trace.Step("Basic checks done")
// 2.调用g.snapshot,获取当前的所有node节点信息快照,用于本轮调度,包括下面执行的预算算法与优选算法都将使用该node快照
if err := g.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshoting scheduler cache and node infos done")
if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
return result, ErrNoNodesAvailable
}
// Run "prefilter" plugins.
preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
if !preFilterStatus.IsSuccess() {
return result, preFilterStatus.AsError()
}
trace.Step("Running prefilter plugins done")
// 4.调用g.findNodesThatFit,执行预选算法,筛选出符合该pod运行条件的合适node节点
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
// Run "postfilter" plugins.
postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses)
if !postfilterStatus.IsSuccess() {
return result, postfilterStatus.AsError()
}
// 6.判断合适的node节点数,如果为0直接返回失败
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList),
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
}
trace.Step("Running postfilter plugins done")
metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
// 7.判断合适的node节点数,如果为1则直接返回该node节点,不往下执行优选算法
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// 8.调用g.prioritizeNodes,执行优选算法,给合适的所有node节点打分
metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot)
priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
if err != nil {
return result, err
}
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
// 9.调用g.selectHost,从合适的node节点中选择最佳(即得分最高)的node节点并返回
host, err := g.selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
FeasibleNodes: len(filteredNodes),
}, err
}
2.1.1 g.snapshot
g.snapshot方法主要是获取当前的所有node节点信息快照,用于本轮调度,包括后面执行的预算算法与优选算法都将使用该node快照。当node节点信息没有变化时(根据node节点的generation大小判断),该方法直接返回现有node节点信息快照,无需更新,当node节点信息有变化时才更新快照并返回最新快照。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}
// pkg/scheduler/internal/cache/cache.go
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.Generation
// 增加和更新node节点快照
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
}
// 删除多余的node节点快照
if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
for name := range nodeSnapshot.NodeInfoMap {
if _, ok := cache.nodes[name]; !ok {
delete(nodeSnapshot.NodeInfoMap, name)
}
}
}
// Take a snapshot of the nodes order in the tree
nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next()
if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
if len(n.PodsWithAffinity()) > 0 {
nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
return nil
}
2.1.2 g.findNodesThatFit 执行预选算法
g.findNodesThatFit方法是执行预选算法的地方。主要逻辑如下:
(1)调用g.numFeasibleNodesToFind,根据一定的算法,计算并返回预选算法要筛选的node节点数量;
(2)定义checkNode函数,用于筛选合适的node节点;
(3)起16个goroutine,并行的对所有node执行checkNode函数,返回合适的node节点列表,列表长度小于等于g.numFeasibleNodesToFind方法返回值;
(4)遍历scheduler-extender(kube-scheduler的一种webhook扩展机制),对已经过滤过的node再来执行extender的Filter,即执行liqiang.io/post/kubernetes-scheduler-extender-dd6516a6
前面代码分析中提到过的执行的filter,都是kube-scheduler的可扩展机制scheduler framework提供的,该机制在调度器生命周期的各个关键点上,向用户暴露可以进行扩展和实现的接口,从而赋予用户自定义调度器的能力。基于篇幅原因,这里不展开介绍kube-scheduler的可扩展机制,可自行进行了解。
目前kube-scheduler的很多内置预选算法和优选算法都是基于scheduler framework机制实现的。
参考:cloudnative.to/blog/202003-k8s-scheduling-framework/
总结 kube-scheduler简介kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。
kube-scheduler架构图kube-scheduler的大致组成和处理流程如下图,kube-scheduler对pod、node等对象进行了list/watch,根据informer将未调度的pod放入待调度pod队列,并根据informer构建调度器cache(用于快速获取需要的node等对象),然后sched.scheduleOne方法为kube-scheduler组件调度pod的核心处理逻辑所在,从未调度pod队列中取出一个pod,经过预选与优选算法,最终选出一个最优node,然后更新cache并异步执行bind操作,也就是更新pod的nodeName字段,至此一个pod的调度工作完成。
下方处理流程图展示了sched.scheduleOne方法的核心处理步骤,其中kube-scheduler扩展机制相关的步骤未画出。
本文共计3292个文字,预计阅读时间需要14分钟。
kube-scheduler是Kubernetes的核心组件之一,主要负责调度Pod资源。具体来说,kube-scheduler根据调度算法(包括预选算法和优选算法)将未调度的Pod分配到最合适的节点上。
kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。 kube-scheduler源码分析(2)-核心处理逻辑分析 kube-scheduler简介kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。
kube-scheduler架构图kube-scheduler的大致组成和处理流程如下图,kube-scheduler对pod、node等对象进行了list/watch,根据informer将未调度的pod放入待调度pod队列,并根据informer构建调度器cache(用于快速获取需要的node等对象),然后sched.scheduleOne方法为kube-scheduler组件调度pod的核心处理逻辑所在,从未调度pod队列中取出一个pod,经过预选与优选算法,最终选出一个最优node,然后更新cache并异步执行bind操作,也就是更新pod的nodeName字段,至此一个pod的调度工作完成。
kube-scheduler组件的分析将分为两大块进行,分别是:
(1)kube-scheduler初始化与启动分析;
(2)kube-scheduler核心处理逻辑分析。
上一篇进行了kube-scheduler组件的初始化与启动分析,本篇进行核心处理逻辑分析。
2.kube-scheduler核心处理逻辑分析 基于tag v1.17.4github.com/kubernetes/kubernetes/releases/tag/v1.17.4
直接看到kube-scheduler核心处理方法sched.Run。
sched.Runsched.Run主要逻辑:
(1)判断informer中的对象cache是否同步完成;
(2)循环调用sched.scheduleOne调度pod。
// pkg/scheduler/scheduler.go
func (sched *Scheduler) Run(ctx context.Context) {
// 1.判断informer中的对象cache是否同步完成
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
// 2.循环调用sched.scheduleOne调度pod
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}
sched.scheduleOne
sched.scheduleOne方法作用是调度一个pod到合适的node节点,主要逻辑如下:
(1)从scheduler的待调度pod队列中取出一个pod,如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作;
(2)调用sched.Algorithm.Schedule执行调度算法,返回通过预算及优选算法算出的nodo节点;
(3)当执行调度算法失败时,上报调度失败event,更新pod的status;
(4)当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑;
(5)调用sched.VolumeBinder.Binder.AssumePodVolumes,更新cache,判断关联pvc是否都已bound;
(6)执行调用 "reserve" plugins(有印象即可,后面会对该类plugins进行讲解);
(7)调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度;
(8)起一个goroutine,异步执行pod的binding操作:
(8.1)执行调用 "permit" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.2)调用sched.bindVolumes,绑定volumes;
(8.3)执行调用 "prebind" plugins(有印象即可,后面会对该类plugins进行讲解);
(8.4)更新pod的nodeName,写入etcd;
(8.5)执行调用 "postbind" plugins(有印象即可,后面会对该类plugins进行讲解),该pod调度结束。
// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk := sched.Framework
// 1.从scheduler的待调度pod队列中取出一个pod
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// 如果该pod的deletetimestamp不为空(代表处于删除状态)则跳过该pod的调度工作
if pod.DeletionTimestamp != nil {
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
// 2.调用sched.schedule执行调度算法,返回通过预算及优选算法算出的nodo节点
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
if err != nil {
// 3.当执行调度算法失败时,上报调度失败event,更新pod的status
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
// Schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
// 4.当没有找到合适的节点时,判断scheduler是否开启了抢占调度机制,是则调用sched.preempt执行抢占逻辑
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodScheduleFailures.Inc()
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedSchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// 5.更新cache,判断关联pvc是否都已bound
// Assume volumes first before assuming the pod.
//
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
metrics.PodScheduleErrors.Inc()
return
}
// 6.执行调用 "reserve" plugins
// Run "reserve" plugins.
if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
// 7.调用sched.assume,在scheduler的cache中记录这个pod已经调度了,因为更新pod的nodeName是异步操作,防止pod被重复调度
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// 8.起一个goroutine,异步执行pod的binding操作
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
return
}
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
}
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
return
}
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2) {
klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
}
metrics.PodScheduleSuccesses.Inc()
metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
2.1 sched.Algorithm.Schedule
sched.Algorithm.Schedule主要作用是执行预选算法和优选算法,给pod算出一个合适的node,其主要逻辑为:
(1)对pod使用到的pvc进行检查,检查其是否处于删除状态;
(2)调用g.snapshot,获取当前的所有node节点信息快照,用于本轮调度,包括下面执行的预算算法与优选算法都将使用该node快照;
(3)执行调用 "prefilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(4)调用g.findNodesThatFit,执行预选算法,筛选出符合该pod运行条件的合适node节点;
(5)执行调用 "postfilter" plugins(有印象即可,后面会对该类plugins进行讲解);
(6)判断合适的node节点数,如果为0直接返回失败;
(7)判断合适的node节点数,如果为1则直接返回该node节点,不往下执行优选算法;
(8)调用g.prioritizeNodes,执行优选算法,给合适的所有node节点打分;
(9)调用g.selectHost,从优选算法打分过后的node节点中选择最佳(即得分最高)的node节点并返回。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
// 1.对pod使用到的pvc进行检查,检查其是否处于删除状态
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
trace.Step("Basic checks done")
// 2.调用g.snapshot,获取当前的所有node节点信息快照,用于本轮调度,包括下面执行的预算算法与优选算法都将使用该node快照
if err := g.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshoting scheduler cache and node infos done")
if len(g.nodeInfoSnapshot.NodeInfoList) == 0 {
return result, ErrNoNodesAvailable
}
// Run "prefilter" plugins.
preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
if !preFilterStatus.IsSuccess() {
return result, preFilterStatus.AsError()
}
trace.Step("Running prefilter plugins done")
// 4.调用g.findNodesThatFit,执行预选算法,筛选出符合该pod运行条件的合适node节点
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(ctx, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
// Run "postfilter" plugins.
postfilterStatus := g.framework.RunPostFilterPlugins(ctx, state, pod, filteredNodes, filteredNodesStatuses)
if !postfilterStatus.IsSuccess() {
return result, postfilterStatus.AsError()
}
// 6.判断合适的node节点数,如果为0直接返回失败
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: len(g.nodeInfoSnapshot.NodeInfoList),
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
}
trace.Step("Running postfilter plugins done")
metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
// 7.判断合适的node节点数,如果为1则直接返回该node节点,不往下执行优选算法
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
EvaluatedNodes: 1 + len(failedPredicateMap) + len(filteredNodesStatuses),
FeasibleNodes: 1,
}, nil
}
// 8.调用g.prioritizeNodes,执行优选算法,给合适的所有node节点打分
metaPrioritiesInterface := g.priorityMetaProducer(pod, filteredNodes, g.nodeInfoSnapshot)
priorityList, err := g.prioritizeNodes(ctx, state, pod, metaPrioritiesInterface, filteredNodes)
if err != nil {
return result, err
}
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
// 9.调用g.selectHost,从合适的node节点中选择最佳(即得分最高)的node节点并返回
host, err := g.selectHost(priorityList)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap) + len(filteredNodesStatuses),
FeasibleNodes: len(filteredNodes),
}, err
}
2.1.1 g.snapshot
g.snapshot方法主要是获取当前的所有node节点信息快照,用于本轮调度,包括后面执行的预算算法与优选算法都将使用该node快照。当node节点信息没有变化时(根据node节点的generation大小判断),该方法直接返回现有node节点信息快照,无需更新,当node节点信息有变化时才更新快照并返回最新快照。
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}
// pkg/scheduler/internal/cache/cache.go
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.Generation
// 增加和更新node节点快照
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
}
// 删除多余的node节点快照
if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
for name := range nodeSnapshot.NodeInfoMap {
if _, ok := cache.nodes[name]; !ok {
delete(nodeSnapshot.NodeInfoMap, name)
}
}
}
// Take a snapshot of the nodes order in the tree
nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next()
if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
if len(n.PodsWithAffinity()) > 0 {
nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
return nil
}
2.1.2 g.findNodesThatFit 执行预选算法
g.findNodesThatFit方法是执行预选算法的地方。主要逻辑如下:
(1)调用g.numFeasibleNodesToFind,根据一定的算法,计算并返回预选算法要筛选的node节点数量;
(2)定义checkNode函数,用于筛选合适的node节点;
(3)起16个goroutine,并行的对所有node执行checkNode函数,返回合适的node节点列表,列表长度小于等于g.numFeasibleNodesToFind方法返回值;
(4)遍历scheduler-extender(kube-scheduler的一种webhook扩展机制),对已经过滤过的node再来执行extender的Filter,即执行liqiang.io/post/kubernetes-scheduler-extender-dd6516a6
前面代码分析中提到过的执行的filter,都是kube-scheduler的可扩展机制scheduler framework提供的,该机制在调度器生命周期的各个关键点上,向用户暴露可以进行扩展和实现的接口,从而赋予用户自定义调度器的能力。基于篇幅原因,这里不展开介绍kube-scheduler的可扩展机制,可自行进行了解。
目前kube-scheduler的很多内置预选算法和优选算法都是基于scheduler framework机制实现的。
参考:cloudnative.to/blog/202003-k8s-scheduling-framework/
总结 kube-scheduler简介kube-scheduler组件是kubernetes中的核心组件之一,主要负责pod资源对象的调度工作,具体来说,kube-scheduler组件负责根据调度算法(包括预选算法和优选算法)将未调度的pod调度到合适的最优的node节点上。
kube-scheduler架构图kube-scheduler的大致组成和处理流程如下图,kube-scheduler对pod、node等对象进行了list/watch,根据informer将未调度的pod放入待调度pod队列,并根据informer构建调度器cache(用于快速获取需要的node等对象),然后sched.scheduleOne方法为kube-scheduler组件调度pod的核心处理逻辑所在,从未调度pod队列中取出一个pod,经过预选与优选算法,最终选出一个最优node,然后更新cache并异步执行bind操作,也就是更新pod的nodeName字段,至此一个pod的调度工作完成。
下方处理流程图展示了sched.scheduleOne方法的核心处理步骤,其中kube-scheduler扩展机制相关的步骤未画出。

