概述
kube-scheduler是k8s的控制面核心组件,负责将Pod调度到合适的节点。通过监听集群中未分配Node的Pod,之后通过一系列算法挑选出最合适的Node,更新到Pod的NodeName字段完成调度。
调度过程要考虑诸多因素:
公平调度
资源的高效利用
QoS
亲和与反亲和(affinity和anti-affinity)
数据本地化(data locality)
内部负载的干扰(inter-workload interface)
deadlines
调度过程
整个调度过程大体分为两个阶段:
Predicate(预选):过滤掉不符合条件的节点
Priority(优选):对剩余节点进行打分,选出得分最高的节点
对于规模较大的集群,如果对每个集群都要进行打分过程,则调度过程会耗时很长。kube-scheduler的默认策略为对于超过100个节点的集群,需要计算50%的Node,对于超过5000的超大集群,则取10%,最低为5%。且kube-scheduler还提供了percentageOfNodesToScore配置参数,用于手工指定需要打分的节点比例。
apiVersion: kube-scheduler.config.k8s.io/v1beta2
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: default-scheduler
plugins:
score:
enabled:
- name: NodeResourcesBalancedAllocation
percentageOfNodesToScore: 50
代码结构
kube-scheduler的代码主要在kubernetes项目下的cmd/kube-scheduler/
和pkg/scheduler/
。
启动过程
入口
kube-scheduler通过cobra实现的命令行应用。入口为cmd/kube-scheduler/scheduler.go
:
func main() {
command := app.NewSchedulerCommand()
code := cli.Run(command)
os.Exit(code)
}
创建cobra.Command对象,然后运行Command。实际上cli.Run方法最终会调用cmd.Execute()方法,执行Command对象的PersistentPreRunE,进行配置信息和特性门禁等的初始化后,最终执行RunE指定的runCommand方法进入主流程:
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
PersistentPreRunE: func(*cobra.Command, []string) error {
// makes sure feature gates are set before RunE.
return opts.ComponentGlobalsRegistry.Set()
},
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
在runCommand中,可以看到前面进行了参数和特性门禁的校验,然后注册了信号监听的协程,接收到syscall.SIGINT和syscall.SIGTERM后调用根Context的cancel方法,通知其他协程退出:
/ runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
verflag.PrintAndExitIfRequested()
fg := opts.ComponentGlobalsRegistry.FeatureGateFor(utilversion.DefaultKubeComponent)
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(opts.Logs, fg); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
cliflag.PrintFlags(cmd.Flags())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
stopCh := server.SetupSignalHandler()
<-stopCh
cancel()
}()
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
// add feature enablement metrics
fg.(featuregate.MutableFeatureGate).AddMetrics()
return Run(ctx, cc, sched)
}
看到runCommand方法主要流程的三步,分别是Setup方法,调用featureGate对象的AddMetrics方法,和主流程Run方法。
AddMetrics是根据featureGate添加监控指标,先不用看。
Setup方法根据注释来看是使用命令行args和之前的options共同创建出scheduler运行需要的完整的配置信息:
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if cfg, err := latest.Default(); err != nil {
return nil, nil, err
} else {
opts.ComponentConfig = cfg
}
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
c, err := opts.Config(ctx)
if err != nil {
return nil, nil, err
}
// Get the completed config
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// Create the scheduler.
sched, err := scheduler.New(ctx,
cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
completedProfiles = append(completedProfiles, profile)
}),
)
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}
return &cc, sched, nil
}
Setup方法执行过程
生成完整配置
校验参数
首先调用latest.Default()方法生成了cfg这个KubeSchedulerConfiguration对象,即最新版本的组件默认配置项,然后调用opts.Validate()方法对opts中的必要参数进行了校验,点进内部可以看到有很多校验,包括对全局组件的版本和门禁的校验、对KubeSchedulerConfiguration的校验(如客户端连接、leader选举、管理Pod并发度的parallelism参数、之前提到的percentageOfNodesToScore等等)、对服务器、认证鉴权、监控项的校验,等等。
补齐scheduler运行需要的相关参数对象
然后除了前面的一些静态参数以外,对于scheduler运行必要的属性对象也需要进行创建,比如创建好连接apiserver的clientset、eventClient、InformerFactory、dynamicClient等等。创建好c这个config.Config对象(代码位于/cmd/kube-scheduler/app/config/config.go
)。再通过c.Complete()方法检验配置本地的loopback client(用于通过loopback地址访问apiserver,提高性能)是否可以正常访问apiserver。最终得到config.CompletedConfig对象(就是config.Config对象),可完整用于sheduler的运行。
创建出Scheduler对象
该步骤调用scheduler.New方法创建出最终的调度器Scheduler对象。
创建Scheduler过程
Scheduler对象是调度器核心对象,负责监听没有调度的Pod,并找出Pod合适调度的节点,并更新到kube-apiserver。代码位于pkg/scheduler/scheduler.go
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
// It is expected that changes made via Cache will be observed
// by NodeLister and Algorithm.
Cache internalcache.Cache
Extenders []framework.Extender
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
// logger *must* be initialized when creating a Scheduler,
// otherwise logging functions will access a nil sink and
// panic.
logger klog.Logger
// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
registeredHandlers []cache.ResourceEventHandlerRegistration
}
主要字段含义:
Cache:缓存调度过程中需要的数据,如Node、Pod信息,用于快速查询。
Extenders:scheduler支持外部的extender对调度过程的干预,extender就是在外部配置的某个server,可以在调度的不同阶段,由scheduler根据配置进行访问,以此影响调度结果,后面我们会看到。
NextPod:是个方法,用于从SchedulingQueue中取出下一个等待调度的Pod。
FailureHandler:调度失败时的处理函数。
SchedulePod:是个方法,尝试将Pod调度到某个节点,返回调度结果ScheduleResult对象,调度成功里面会包含Node信息。
StopEverything:channel,来自于全局的Context.Done,用于通知关闭调度器。
SchedulingQueue:存储待调度Pod的优先队列。
Profiles:调度配置文件的集合,定义调度器的配置和默认使用的调度插件。
client:用于和kube-apiserver交互的Clientset。
nodeInfoSnapshot:某一时刻集群中节点信息的快照,在每一轮调度开始时会生成。
接下来看一下Scheduler时如何被创建的:
// New returns a Scheduler
func New(ctx context.Context,
client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
opts ...Option) (*Scheduler, error) {
logger := klog.FromContext(ctx)
stopEverything := ctx.Done()
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
if options.applyDefaultProfile {
var versionedCfg configv1.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
metrics.Register()
extenders, err := buildExtenders(logger, options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
waitingPods := frameworkruntime.NewWaitingPodsMap()
var resourceClaimCache *assumecache.AssumeCache
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
frameworkruntime.WithWaitingPods(waitingPods),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
var returnErr error
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName], err = buildQueueingHintMap(ctx, profile.EnqueueExtensions())
if err != nil {
returnErr = errors.Join(returnErr, err)
}
}
if returnErr != nil {
return nil, returnErr
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(ctx)
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
}
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}
return sched, nil
}
首先使用defaultSchedulerOptions创建了options配置对象,然后使用传入的众多的Option方法,对options对象做修改。然后根据参数options.applyDefaultProfile判断是否要配置默认配置项,该参数默认为true。scheme.Scheme.Default()方法实际上调用了文件/pkg/scheduler/apis/config/v1/defaults.go
文件的SetDefaults_KubeSchedulerConfiguration方法对比如SchedulerName、PercentageOfNodesToScore、leaderElection.ResourceNamespace这些配置项进行了默认配置。
然后将内部提供的调度插件注册表InTreeRegistry和外部配置的插件注册表OutOfTreeRegistry做了合并。默认的注册的插件在文件/pkg/scheduler/apis/config/v1/default_plugins.go文件中可以看到,如下:
// getDefaultPlugins returns the default set of plugins.
func getDefaultPlugins() *v1.Plugins {
plugins := &v1.Plugins{
MultiPoint: v1.PluginSet{
Enabled: []v1.Plugin{
{Name: names.SchedulingGates},
{Name: names.PrioritySort},
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration, Weight: ptr.To[int32](3)},
{Name: names.NodeAffinity, Weight: ptr.To[int32](2)},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit, Weight: ptr.To[int32](1)},
{Name: names.VolumeRestrictions},
{Name: names.NodeVolumeLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: ptr.To[int32](2)},
{Name: names.InterPodAffinity, Weight: ptr.To[int32](2)},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: ptr.To[int32](1)},
{Name: names.ImageLocality, Weight: ptr.To[int32](1)},
{Name: names.DefaultBinder},
},
},
}
applyFeatureGates(plugins)
return plugins
}
接下来注册了监控指标metrics。
然后使用buildExtenders()方法,该方法用于构造出完整的扩展器列表。
接下来创建好需要用到的podLister和nodeLister。
然后使用已有的KubeSchedulerProfile对象生成了profiles对象,profiles实际上是一个Map,key为调度器的名字,默认的调度器是default-scheduler,value则为对应的framework.Framework对象,framework.Framework是一个接口,它的默认实现为frameworkImpl结构体,实际上Scheduler运行调度的整个过程就是逐个调用framework.Framework的方法(如果要自己实现一个新的调度器,就需要实现这个接口)。代码位于/pkg/scheduler/framework/runtime/framework.go
,后续我们会讨论。
接下来初始化了两个容器,preEnqueuePluginMap和queueingHintsPerProfile。queueingHintsPerProfile里面保存了每种调度器的每个插件所对应的framework.QueueingHintFn,它是个方法,作用是在插件运行时用来判断该Pod当前阶段可不可以继续被调度,防止Pod不可调度时队列出现阻塞导致其他Pod无法被处理。首先,每个插件都会实现EnqueueExtensions接口,该接口只有一个方法就是EventsToRegister,可以注册插件感兴趣的事件,那么当事件到达时,就可以调用framework.QueueingHintFn方法判断该事件会不会引发当前Pod不可调度,该方法的返回值为framework.QueueingHint和error,framework.QueueingHint只有两个值:0和1,分别代表Pod不可调度和允许调度。
接下来初始化了调度队列podQueue,类型为SchedulingQueue,实际上是一个优先队列,存储等待调度的Pod。给每个frame.Framework(即每个调度器),绑定好调度队列podQueue。
最后调用addAllEventHandlers方法,注册PodInformer的Add、Update和Delete到回调,比如创建Pod时执行PreEnqueuePlugin插件并将Pod添加到activeQ。
启动Scheduler
在完成Profile和Scheduler的创建后,进入kube-scheduler的主逻辑app.Run方法。前面创建了configz对象,注册事件广播器,启动http服务器,启动所有的Informer和Cache并等待同步,启动leader选举等等,最后执行sched.Run方法启动调度器
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
logger := klog.FromContext(ctx)
// To help debugging, immediately log version
logger.Info("Starting Kubernetes Scheduler", "version", version.Get())
logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
var checks, readyzChecks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
readyzChecks = append(readyzChecks, cc.LeaderElection.WatchDog)
}
readyzChecks = append(readyzChecks, healthz.NewShutdownHealthz(ctx.Done()))
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
handlerSyncReadyCh := make(chan struct{})
handlerSyncCheck := healthz.NamedCheck("sched-handler-sync", func(_ *http.Request) error {
select {
case <-handlerSyncReadyCh:
return nil
default:
}
return fmt.Errorf("waiting for handlers to sync")
})
readyzChecks = append(readyzChecks, handlerSyncCheck)
if cc.LeaderElection != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
binaryVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).BinaryVersion().String())
if err != nil {
return err
}
emulationVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).EmulationVersion().String())
if err != nil {
return err
}
// Start lease candidate controller for coordinated leader election
leaseCandidate, waitForSync, err := leaderelection.NewCandidate(
cc.Client,
metav1.NamespaceSystem,
cc.LeaderElection.Lock.Identity(),
"kube-scheduler",
binaryVersion.FinalizeVersion(),
emulationVersion.FinalizeVersion(),
[]coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion},
)
if err != nil {
return err
}
readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(waitForSync))
go leaseCandidate.Run(ctx)
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
startInformersAndWaitForSync := func(ctx context.Context) {
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
if err := sched.WaitForHandlersSync(ctx); err != nil {
logger.Error(err, "waiting for handlers to sync")
}
close(handlerSyncReadyCh)
logger.V(3).Info("Handlers synced")
}
if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
startInformersAndWaitForSync(ctx)
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) {
cc.LeaderElection.Coordinated = true
}
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
if cc.ComponentConfig.DelayCacheUntilActive {
logger.Info("Starting informers and waiting for sync...")
startInformersAndWaitForSync(ctx)
logger.Info("Sync completed")
}
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
logger.Info("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
调度器的执行
Scheduler.Run方法启动调度器,启动过程较为简单,首先启动了调度队列SchedulingQueue.Run方法,然后通过另一个goroutine周期性执行Scheduler.ScheduleOne方法。阻塞到全局Context处,等待调度器的关闭:
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
logger := klog.FromContext(ctx)
sched.SchedulingQueue.Run(logger)
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
<-ctx.Done()
sched.SchedulingQueue.Close()
// If the plugins satisfy the io.Closer interface, they are closed.
err := sched.Profiles.Close()
if err != nil {
logger.Error(err, "Failed to close plugins")
}
}
SchedulingQueue的实现
我们先看下调度队列的实现,代码位于pkg/scheduler/backend/queue/scheduling_queue.go
:
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues and a additional data structure, namely: activeQ,
// backoffQ and unschedulablePods.
// - activeQ holds pods that are being considered for scheduling.
// - backoffQ holds pods that moved from unschedulablePods and will move to
// activeQ when their backoff periods complete.
// - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable.
type PriorityQueue struct {
*nominator
stop chan struct{}
clock clock.Clock
// lock takes precedence and should be taken first,
// before any other locks in the queue (activeQueue.lock or nominator.nLock).
// Correct locking order is: lock > activeQueue.lock > nominator.nLock.
lock sync.RWMutex
// pod initial backoff duration.
podInitialBackoffDuration time.Duration
// pod maximum backoff duration.
podMaxBackoffDuration time.Duration
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration
activeQ activeQueuer
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *heap.Heap[*framework.QueuedPodInfo]
// unschedulablePods holds pods that have been tried and determined unschedulable.
unschedulablePods *UnschedulablePods
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unschedulable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
// when we received move request.
// TODO: this will be removed after SchedulingQueueHint goes to stable and the feature gate is removed.
moveRequestCycle int64
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// queueingHintMap is keyed with profile name, valued with registered queueing hint functions.
queueingHintMap QueueingHintMapPerProfile
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder
// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent int
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
isSchedulingQueueHintEnabled bool
}
调度队列的本质是一个有优先级的队列,底层通过堆实现。底层是三个核心数据结构:activeQ、podBackoffQ、unschedulablePods。activeQ、podBackoffQ底层都是通过堆实现的优先队列,activeQ、podBackoffQ,unschedulablePods底层是一个Map:
activeQ:用于按照调度优先级存放等待调度的Pod,Scheduler将会从该activeQ中逐个取出Pod进行调度。每次有新Pod加入时它的broadcast()方法会通知调度器执行pop()方法开始调度
podBackoffQ:用于按照Pod的backoff过期时间排序存放Pod,Pod调度失败后,需要不停的回退调度,这些在回退时间内的Pod暂时存放在这里
unschedulablePods:被认为是当前不可调度的Pod,会被保存到这个Map中,等待合适的时机重新调度。
其他核心字段的含义:
lock:并发访问队列时需要加锁
podInitialBackoffDuration:Pod初始的回退时间和每次尝试调度失败后增加的时间,默认1s
podMaxBackoffDuration:Pod最大的回退时间,达到这个时间后回退时间将不再增加,默认10s
podMaxInUnschedulablePodsDuration:Pod在unschedulablePods中等待的最大时间,默认5min
moveRequestCycle:调度周期的序列号,每次调度周期开始时,需要将unschedulablePods中的Pod全部放入activeQ中等待重新调度
preEnqueuePluginMap:存放在入队之前需要执行的插件
queueingHintMap:存放入队前各个插件判断是否有Event会影响Pod不可调度的方法
然后我们来看SchedulingQueue的启动过程:
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run(logger klog.Logger) {
go wait.Until(func() {
p.flushBackoffQCompleted(logger)
}, 1.0*time.Second, p.stop)
go wait.Until(func() {
p.flushUnschedulablePodsLeftover(logger)
}, 30*time.Second, p.stop)
}
可以看到Run方法中开启了两个定时任务的goroutine:
flushBackoffQCompleted方法:每1s一次,检查backoffQ将backoff时间到达的Pod移入activeQ中
flushUnschedulablePodsLeftover方法:每30s一次,从unschedulablePods中取出等待时间超过了podMaxInUnschedulablePodsDuration的Pod,然后判断Pod是否应该在backoff中,如果未到backoff时间就放到backoffQ中,否则放入activeQ中
那么整体的流程就可以梳理出来了:
调度器通过不停的从activeQ中取出Pod进行调度,当Pod调度失败时将会被放入unschedulablePods中,同时SchedulingQueue会有两个定时任务,定期检查将backoffQ中的Pod取出放入activeQ中,和定期检查unschedulablePods中的Pod,根据Pod的backoff时间放入backoffQ或activeQ中。
调度过程
如上所说,调度器会持续地执行ScheduleOne方法,完成Pod的调度,ScheduleOne方法如下:
// ScheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
logger := klog.FromContext(ctx)
podInfo, err := sched.NextPod(logger)
if err != nil {
logger.Error(err, "Error while retrieving next pod from scheduling queue")
return
}
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))
fwk, err := sched.frameworkForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
logger.Error(err, "Error occurred")
sched.SchedulingQueue.Done(pod.UID)
return
}
if sched.skipPodSchedule(ctx, fwk, pod) {
// We don't put this Pod back to the queue, but we have to cleanup the in-flight pods/events.
sched.SchedulingQueue.Done(pod.UID)
return
}
logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
podsToActivate := framework.NewPodsToActivate()
state.Write(framework.PodsToActivateKey, podsToActivate)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
return
}
}()
}
首先它调用Scheduler.NextPod方法,持续地从SchedulingQueue中取出Pod,如果没有Pod将会阻塞在这里。
接下来调用Scheduler.frameworkForPod方法,根据Pod中所指定的调度器,找到对应的framework.Framework对象,如果Pod中没有指定,则使用默认的default-scheduler。
然后调用Scheduler.skipPodSchedule方法,判断是否可以取消调度该Pod,以下两种情况可以取消调度:
该Pod已被删除
该Pod已经在被调度中(scheduler支持并发调度)
接下来构造一个framework.PodsToActivate对象,底层是一个Map,key是Pod的naespace和name组成的Pod唯一标识,value是Pod的yaml,将会在运行插件时被更新。用于最后向kube-apiserver更新Pod。
然后来到ScheduleOne方法的核心,Scheduler.schedulingCycle方法,完成对该Pod的调度流程。
然后最后的goroutine用于在选出合适node以后,异步的对Pod和Node做绑定,让ScheduleOne方法可以立刻返回,到此一个Pod整体的调度过程就结束了。
我们接下来再看一下一个Pod的详细的调度流程,也就是Scheduler.schedulingCycle方法的实现。首先我们可以先看下framework.Framework接口,这里面的方法就是调度过程中运行各种插件的各个节点,也是我们自己实现调度器时,需要实现的接口。
插件执行顺序
首先我们看下framework.Framework接口,从接口中可以看出不同类型插件的执行阶段和大体执行过程。
// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
Handle
// PreEnqueuePlugins returns the registered preEnqueue plugins.
PreEnqueuePlugins() []PreEnqueuePlugin
// EnqueueExtensions returns the registered Enqueue extensions.
EnqueueExtensions() []EnqueueExtensions
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc
// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
// It also returns a PreFilterResult, which may influence what or how many nodes to
// evaluate downstream.
// The third returns value contains PreFilter plugin that rejected some or all Nodes with PreFilterResult.
// But, note that it doesn't contain any plugin when a plugin rejects this Pod with non-success status,
// not with PreFilterResult.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string])
// RunPostFilterPlugins runs the set of configured PostFilter plugins.
// PostFilter plugins can either be informational, in which case should be configured
// to execute first and return Unschedulable status, or ones that try to change the
// cluster state to make the pod potentially schedulable in a future scheduling cycle.
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusReader) (*PostFilterResult, *Status)
// RunPreBindPlugins runs the set of configured PreBind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is
// considered as a scheduling check failure, otherwise, it is considered as an
// internal error. In either case the pod is not going to be bound.
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunPostBindPlugins runs the set of configured PostBind plugins.
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunReservePluginsReserve runs the Reserve method of the set of
// configured Reserve plugins. If any of these calls returns an error, it
// does not continue running the remaining ones and returns the error. In
// such case, pod will not be scheduled.
RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// RunReservePluginsUnreserve runs the Unreserve method of the set of
// configured Reserve plugins.
RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// RunPermitPlugins runs the set of configured Permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will create and add waiting pod
// to a map of currently waiting pods and return status with "Wait" code.
// Pod will remain waiting pod for the minimum duration returned by the Permit plugins.
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// RunBindPlugins runs the set of configured Bind plugins. A Bind plugin may choose
// whether or not to handle the given Pod. If a Bind plugin chooses to skip the
// binding, it should return code=5("skip") status. Otherwise, it should return "Error"
// or "Success". If none of the plugins handled binding, RunBindPlugins returns
// code=5("skip") status.
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// HasFilterPlugins returns true if at least one Filter plugin is defined.
HasFilterPlugins() bool
// HasPostFilterPlugins returns true if at least one PostFilter plugin is defined.
HasPostFilterPlugins() bool
// HasScorePlugins returns true if at least one Score plugin is defined.
HasScorePlugins() bool
// ListPlugins returns a map of extension point name to list of configured Plugins.
ListPlugins() *config.Plugins
// ProfileName returns the profile name associated to a profile.
ProfileName() string
// PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile.
PercentageOfNodesToScore() *int32
// SetPodNominator sets the PodNominator
SetPodNominator(nominator PodNominator)
// Close calls Close method of each plugin.
Close() error
}
同一个插件可以在多个阶段执行,只要实现了对应的接口。插件执行的各个流程节点:
PreEnqueue:在Pod进入activeQ之前执行,schedulingGates插件用于入队之前的门禁,PreEnqueuePlugin需要保证轻量高效
PreFilter:在调度周期的最开始执行,对Node进行过滤,或者做一些对Filter过程的预处理,比如检查Node状态是否为UnSchedulable、查询Pod使用的HostPort等待Filter阶段检查端口冲突等,最终得到一个Node的集合。PreFilter阶段如果过滤掉了Node,这些Node被标记为UnschedulableAndUnresolvable,不会被抢占
PreFilterExtention:这是一个拓展点,用于对PreFilter的结果进行增量调整,它的返回值是个接口,需要实现AddPod和RemovePod方法。由于Scheduler是支持并发调度Pod的,且Pod和Node的绑定是异步于主流程的,所以需要在过滤Node时判断当前已提名的Pod如果调度到该Node上的影响,且在抢占插件中也需要考虑从一个Node上移除某个Pod造成的影响,评估这些信息用于过滤掉某些Node
Filter:PreFilter执行结束后,对每个Node执行,用于判断给定Node是否满足要求,比如处理亲和性反亲和性的NodeAffinity、PodAffinity、PodAntiAffinity插件,检查节点是否有足够资源的NodeResources插件等
PostFilter:在PreFilter插件或Filter插件执行失败时会调用,作用是即使Pod不可调用,也可以通过,比如处理抢占的插件DefaultPreemptiojn(对于高优先级的Pod,即使没有Node可以被调度,也可以抢占低优先级的Pod,驱逐低优先级的Pod,释放资源给高优先级Pod调用)
PreScore:在Filter阶段成功得到了一批Node时执行,同样也是做Score打分阶段的前置准备动作,比如计算出Pod需要的总资源数,或者保存Pod的PreferredNodeAffinity,用于后续Score阶段使用
Score:对每个Node执行,这个阶段就对Filter阶段得到的所有Node逐个进行打分,比如NodeResources插件在该阶段就会计算出该Node上目前已请求的资源量和Pod申请的资源量,然后根据配置的策略是平衡,最少使用,给Node进行打分
ScoreExtentions:对Score的结果进行拓展到规范化处理,因为插件执行Score得出的Node分数可能不规范,需要保证分数在0-100之间
Reserve:给Pod预留资源,确保Pod调度到某个节点时,该节点有可用的资源。有两个方法Reserve()用于更新Scheduler的缓存预留资源给Pod,Unreserve()则是在Reserve执行失败时回退预留的资源。比如VolumeBinding插件会假设Pod使用的PVC被某个PV绑定,确保Pod调度到该Node上后可以正确挂载PV
Permit:在PreBind之前调用,可以自定义控制逻辑,在Pod被绑定到Node之前,延迟或者阻止Pod被绑定,默认提供的插件都没有实现这个接口
PreBind:在Pod绑定到Node之前,确保Pod需要的资源在Node上已准备周全,Reserve阶段只是一个假定预留资源,这一步需要确定Reserve阶段的预留是有效的。比如VolumeBinding插件,在这一步会调一下更新PV和PVC的接口(但是什么都不做),并等待PV控制器完成PVC和PV的绑定,确保Pod可以顺利调度
Bind:这一步会按照顺序调用BindPlugin,任意一个BindPlugin完成了Pod和Node的绑定,剩余的BindPlugin将会跳过执行。如果BindPlugin不接受处理该Pod,会返回Skip,任意一个BindPlugin返回Error,则该Pod将不会和Node绑定。默认调度器采用DefaultBinder插件,这一步仅仅是访问kube-apiserver填写Pod的nodeName字段
PostBind:这一步仅仅用于清理动作,提高效率,也可以不清理,等待GC清理。目前只有最新的dynamicResources插件实现了该接口
调度器会根据上述顺序,在每个阶段调用注册的插件对应的实现方法,如下:
调度执行过程
如上面schedulingOne方法的说明,整个调度执行过程可以分为两部分:
schedulingCycle方法:同步执行,负责找到Pod需要调度到的Node
bindingCycle方法:异步执行,负责将Pod绑定到选中Node
这两个过程,如果返回失败,就会执行注册给Scheduler的handleSchedulingFailure方法,根据错误原因,以及集群中Pod的实际状态做处理,比如如果Pod已被删除,或者已被调度到Node则不用处理,否则就将Pod丢到SchedulingQueue到unschedulablePods中。
schedulingCycle过程
这一过程主要执行插件的接口为:PreFilter、Filter、PostFilter、PreScore、Score、ScoreExtention、Reserve和Permit,代码如下:
// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
logger := klog.FromContext(ctx)
pod := podInfo.Pod
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
defer func() {
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
}()
if err == ErrNoNodesAvailable {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
}
fitError, ok := err.(*framework.FitError)
if !ok {
logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
}
// SchedulePod() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if !fwk.HasPostFilterPlugins() {
logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}
// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatus)
msg := status.Message()
fitError.Diagnosis.PostFilterMsg = msg
if status.Code() == framework.Error {
logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
} else {
logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
}
var nominatingInfo *framework.NominatingInfo
if result != nil {
nominatingInfo = result.NominatingInfo
}
return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// This is most probably result of a BUG in retrying logic.
// We report an error here so that pod scheduling can be retried.
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
}
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
}
if sts.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, sts)
fitErr.Diagnosis.AddPluginStatus(sts)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
}
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
}
if runPermitStatus.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, runPermitStatus)
fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
}
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
}
// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}
return scheduleResult, assumedPodInfo, nil
}
首先这里封装了一层方法,调用Scheduler.schedulePod方法,挑选出最合适的Node。schedulePod方法中会执行所谓的Predicated和Priority过程(即Filter相关插件和Score相关插件),并得到一个最合适的Node。
如果schedulePod返回了错误,且返回的错误是FitError(由于资源、标签、亲和性、污点、优先级等造成的调度失败)时,会尝试执行PostFilter插件,尝试抢占式调度,如果抢占到了Node,则将Node写入调度结果的nominatingInfo中并返回,下一次调度时可以直接使用。
如果正常得到了最合适的Node,则通过assume方法,更新缓存中的Node和调度信息,假定该Pod已经调度到了某个Node,以防止干扰其他Pod到调度。(这里会有一个过期时间,如果Pod假定调度完成后一段时间没得到确认,则会从Node上减去这个Pod,以保证缓存中信息的准确)。如果假定失败(比如Node上资源已不足,则本次调度失败,在方法返回后Pod将被移入unschedulablePods中)。
接下来开始执行所有Reserve插件,比如VolumeBinding插件,这一步会找到合适的PV,并假定PVC和PV已经完成了绑定,更新存储信息到缓存中。这样schedulingCycle就执行结束了。
接下来我们详细看下完整实现了Predicated和Score过程的Scheduler.schedulePod方法(最核心的处理逻辑):
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Node().Name,
EvaluatedNodes: 1 + diagnosis.NodeToStatus.Len(),
FeasibleNodes: 1,
}, nil
}
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
trace.Step("Prioritizing done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + diagnosis.NodeToStatus.Len(),
FeasibleNodes: len(feasibleNodes),
}, err
}
在调度的开始可以看到,首先查询了缓存中的最新Node信息和Pod调度信息,确保调度的准确性。
接下来处理没有Node的情况下直接返回。
然后调用findNodesThatFitPod方法,完成predicated阶段,找出所有可行的Node。
首先执行了PreFilter插件
然后判断该Pod是否在上一轮的调度中抢占到了其他Pod的节点,如果抢占到了,就直接返回抢占到的Node。
执行所有Filter插件,注意这里并没有遍历所有Node,而是使用percentageOfNodesToScore计算出需要的Node个数,通过sched.nextStartNodeIndex判断从哪个Node开始过滤,取到足够的可用Node就返回:
numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
。接下来调用findNodesThatPassExtenders方法,Extender是kube-scheduler的拓展机制,会调用配置的Extender的HTTP接口,访问外部进程,外部的Extender会返回通过的Node列表。同时,如果这一步有过滤掉Node,需要记录诊断信息。
findNodesThatFitPod方法执行完成后,就得到了一份可用的Node列表。这里判断了如果Node列表中只有一个Node,则没得选择,也不需要进行Score阶段了,直接返回该Node。
然后调用prioritizeNodes方法,进行打分阶段。
首先执行PreScore插件
然后执行Score插件
然后同样配置的Extender对打好分的Node列表进行重新打分
最终我们得到了打好分的Node列表,调用selectHost方法,找出得分最高的Node,如果有多个Node分数相同,则随机返回一个Node,Node挑选阶段就结束了。
bindingCycle过程
这一阶段的代码逻辑相对简单,这一过程主要执行插件的接口为:PreBind、Bind、PostBind。
// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *framework.Status {
logger := klog.FromContext(ctx)
assumedPod := assumedPodInfo.Pod
// Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status)
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
if status.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, status)
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}
// Any failures after this point cannot lead to the Pod being considered unschedulable.
// We define the Pod as "unschedulable" only when Pods are rejected at specific extension points, and PreBind is the last one in the scheduling/binding cycle.
//
// We can call Done() here because
// we can free the cluster events stored in the scheduling queue sonner, which is worth for busy clusters memory consumption wise.
sched.SchedulingQueue.Done(assumedPod.UID)
// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
if assumedPodInfo.InitialAttemptTimestamp != nil {
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
}
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}
return nil
}
首先会阻塞等待schedulingCycle阶段的Permit插件的执行完成,由于Permit阶段的Pod会以waitingPod的数据结构被保存到waitingPods这个Map中,waitingPod里面有一个channel,当Permit插件执行完时,Pod会从waitingPods这个Map中移除,并会发送一个信号到channel,这样。
然后调用Framework.RunPreBindPlugins方法,执行所有PreBind插件的preBind方法。对于volumeBinding这个插件来说,这一步最核心的内容是发送更新PV和PVC的请求给kube-apiserver,通知PV控制器完成PVC和PV的绑定,并等待PVC进入bound状态。
接下来就是调用Bind插件,更新Pod的nodeName字段。
最后记录一些监控项和日志后,执行PostBind插件,该Pod就调度过程解析,Scheduler将会继续调度下一个Pod。
至此,kube-scheduler的整体代码逻辑和核心流程梳理完成。整体过程大致如下:
总结
kube-scheduler是kubernetes相对来说最稳定的核心组件,作为容器调度平台,调度器也是kubernetes的核心,通过阅读kube-scheduler的代码,可以学习到很多东西,比如插件化,如何做到既能处理复杂逻辑的同时还能拥有很高的可拓展性,再比如并发处理,调度的过程如何提升性能等。这篇也还只是了解了调度器的整体处理过程和核心代码逻辑,还有很多细节需要后续仔细阅读,接下来打算自己实现一个调度插件和调度器来加深对调度器的理解。
评论区