Kubernetes Scheduler Scheduling Framework

Introduction

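The Scheduling Framework was already introduced in the earlier overview of the Kubernetes Scheduler mechanism. It restructures the scheduler around Plugins and Extension Points: the scheduler's functionality is extended by plugging Plugins into a set of predefined extension points, and even the core scheduling algorithms now live in Plugins. If the plugins shipped with the default scheduler do not meet your needs, you can write your own, but the code has to be recompiled.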

The official documentation already covers the Scheduling Framework and its configuration in detail in these two articles:

This post focuses on how the Scheduling Framework's plugin mechanism is implemented, and on how to write your own plugin.

How It Works

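The Scheduling Framework involves a few concepts: Profile, Framework, ExtensionPoints, Registry, and Plugin. Framework is the central structure. It defines a number of extension points (ExtensionPoints), each of which contains some Plugins; during scheduling, the Plugins in each of the Framework's ExtensionPoints are executed in turn. A Profile is effectively the Framework's configuration: it specifies which Plugins each ExtensionPoint contains. The Registry is the collection of plugin factory functions, and every plugin's constructor must be registered in it. The relationships between them are shown in the figure below: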

Profile

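A Profile is effectively the configuration of a Framework: it determines which plugins each of the Framework's ExtensionPoints contains, and it is configured by adding plugin names to the Enabled/Disabled lists. The relevant structs are: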

// kubernetes/pkg/scheduler/apis/config/types.go

type KubeSchedulerProfile struct {
	SchedulerName string
	Plugins       *Plugins
	PluginConfig  []PluginConfig
}

type Plugins struct {
	QueueSort *PluginSet
	PreFilter *PluginSet
	Filter    *PluginSet
	// ......
}

type PluginSet struct {
	Enabled  []Plugin
	Disabled []Plugin
}

type Plugin struct {
	Name   string
	Weight int32
}

Profiles are configured through the scheduler's configuration file (passed with --config), as in this example from the official documentation:

apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
profiles:
  - plugins:
      score:
        disabled:
          - name: NodeResourcesLeastAllocated
        enabled:
          - name: MyCustomPluginA
            weight: 2
          - name: MyCustomPluginB
            weight: 1

This configuration disables the default NodeResourcesLeastAllocated plugin at the score ExtensionPoint and enables two custom plugins, with weights 2 and 1.
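The disable-then-enable behaviour can be illustrated with a small Go sketch. It is a simplified model, not the scheduler's actual merge code: the types only mirror Plugin and PluginSet above, and mergePluginSet is a hypothetical function. It follows the rule from the official configuration docs that default plugins not listed under disabled are kept (all of them are dropped when "*" is disabled), and that the profile's enabled plugins run after them.

```go
package main

import "fmt"

// Plugin and PluginSet mirror the config types shown earlier (Name/Weight,
// Enabled/Disabled); they are local stand-ins, not the real k8s packages.
type Plugin struct {
	Name   string
	Weight int32
}

type PluginSet struct {
	Enabled  []Plugin
	Disabled []Plugin
}

// mergePluginSet is a hypothetical, simplified model of combining a profile's
// PluginSet with the default one for a single extension point.
func mergePluginSet(defaults, custom PluginSet) []Plugin {
	disabled := make(map[string]bool)
	for _, p := range custom.Disabled {
		disabled[p.Name] = true
	}
	var merged []Plugin
	// "*" disables every default plugin at this extension point.
	if !disabled["*"] {
		for _, p := range defaults.Enabled {
			if !disabled[p.Name] {
				merged = append(merged, p)
			}
		}
	}
	// Plugins enabled by the profile run after the remaining defaults.
	return append(merged, custom.Enabled...)
}

func main() {
	// Default score list shortened to two plugins for brevity.
	defaultScore := PluginSet{Enabled: []Plugin{
		{Name: "NodeResourcesBalancedAllocation", Weight: 1},
		{Name: "NodeResourcesLeastAllocated", Weight: 1},
	}}
	profileScore := PluginSet{
		Disabled: []Plugin{{Name: "NodeResourcesLeastAllocated"}},
		Enabled: []Plugin{
			{Name: "MyCustomPluginA", Weight: 2},
			{Name: "MyCustomPluginB", Weight: 1},
		},
	}
	for _, p := range mergePluginSet(defaultScore, profileScore) {
		fmt.Println(p.Name, p.Weight)
	}
}
```

Run as-is, this prints NodeResourcesBalancedAllocation with weight 1, then MyCustomPluginA (2) and MyCustomPluginB (1): the disabled default is gone and the custom plugins are appended.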

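Another feature of Profiles is that you can define more than one, and a separate Framework is created for each. In other words, Plugins can be combined freely into several scheduling policies, each with its own name, and a pod can choose one when it is created. The Scheduler has a built-in Profile named "default-scheduler", which is used when a pod does not specify a policy. Let's first look at which plugins the default policy defines: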

// kubernetes/pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		QueueSort: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: queuesort.Name},
			},
		},
		PreFilter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.FitName},
				{Name: nodeports.Name},
				{Name: interpodaffinity.Name},
			},
		},
		Filter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: nodeunschedulable.Name},
				{Name: noderesources.FitName},
				{Name: nodename.Name},
				{Name: nodeports.Name},
				{Name: nodeaffinity.Name},
				{Name: volumerestrictions.Name},
				{Name: tainttoleration.Name},
				{Name: nodevolumelimits.EBSName},
				{Name: nodevolumelimits.GCEPDName},
				{Name: nodevolumelimits.CSIName},
				{Name: nodevolumelimits.AzureDiskName},
				{Name: volumebinding.Name},
				{Name: volumezone.Name},
				{Name: interpodaffinity.Name},
			},
		},
		PreScore: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: interpodaffinity.Name},
				{Name: defaultpodtopologyspread.Name},
				{Name: tainttoleration.Name},
			},
		},
		Score: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.BalancedAllocationName, Weight: 1},
				{Name: imagelocality.Name, Weight: 1},
				{Name: interpodaffinity.Name, Weight: 1},
				{Name: noderesources.LeastAllocatedName, Weight: 1},
				{Name: nodeaffinity.Name, Weight: 1},
				{Name: nodepreferavoidpods.Name, Weight: 10000},
				{Name: defaultpodtopologyspread.Name, Weight: 1},
				{Name: tainttoleration.Name, Weight: 1},
			},
		},
		Reserve: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
		Unreserve: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
		PreBind: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
		Bind: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: defaultbinder.Name},
			},
		},
		PostBind: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: volumebinding.Name},
			},
		},
	}
}

To define multiple Profiles, see this example from the official documentation:

apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
profiles:
  - schedulerName: default-scheduler
  - schedulerName: no-scoring-scheduler
    plugins:
      preScore:
        disabled:
          - name: '*'
      score:
        disabled:
          - name: '*'

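Besides "default-scheduler", this defines a "no-scoring-scheduler" that disables all preScore and score plugins. Note that a custom profile is merged with the default plugin configuration (the same one default-scheduler uses), so apart from having no scoring plugins it is configured exactly like default-scheduler. When creating a pod, set ".spec.schedulerName" to choose which Profile schedules it.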
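Choosing a Profile by .spec.schedulerName can be pictured as a lookup keyed by scheduler name. The sketch below is hypothetical (Profile, profileMap, newProfileMap and forPod are illustrative names, not scheduler APIs). It assumes, as the API server's defaulting does, that an empty schedulerName means "default-scheduler", and that a pod naming a profile this scheduler does not have is simply not handled by it.

```go
package main

import "fmt"

// Profile is a stand-in: one scheduler name plus the score plugins its
// Framework would run.
type Profile struct {
	SchedulerName string
	ScorePlugins  []string
}

// profileMap indexes profiles by scheduler name; each entry plays the role
// of one Framework instance.
type profileMap map[string]Profile

// newProfileMap rejects duplicate scheduler names.
func newProfileMap(profiles ...Profile) (profileMap, error) {
	m := make(profileMap, len(profiles))
	for _, p := range profiles {
		if _, dup := m[p.SchedulerName]; dup {
			return nil, fmt.Errorf("duplicate profile %q", p.SchedulerName)
		}
		m[p.SchedulerName] = p
	}
	return m, nil
}

// forPod returns the profile for a pod's .spec.schedulerName. ok is false
// when no profile matches, i.e. this scheduler would ignore the pod.
func (m profileMap) forPod(schedulerName string) (p Profile, ok bool) {
	if schedulerName == "" {
		schedulerName = "default-scheduler"
	}
	p, ok = m[schedulerName]
	return p, ok
}

func main() {
	profiles, err := newProfileMap(
		Profile{SchedulerName: "default-scheduler", ScorePlugins: []string{"ImageLocality", "TaintToleration"}},
		Profile{SchedulerName: "no-scoring-scheduler"},
	)
	if err != nil {
		panic(err)
	}
	for _, name := range []string{"", "no-scoring-scheduler", "my-other-scheduler"} {
		p, ok := profiles.forPod(name)
		fmt.Printf("schedulerName=%q matched=%v scorePlugins=%v\n", name, ok, p.ScorePlugins)
	}
}
```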

Framework

The Framework uses the Profile to decide which plugins go into each ExtensionPoint. Here is the framework struct:

// kubernetes/pkg/scheduler/framework/v1alpha1/framework.go

type framework struct {
	registry              Registry
	snapshotSharedLister  SharedLister
	waitingPods           *waitingPodsMap
	pluginNameToWeightMap map[string]int
	queueSortPlugins      []QueueSortPlugin
	preFilterPlugins      []PreFilterPlugin
	filterPlugins         []FilterPlugin
	preScorePlugins       []PreScorePlugin
	scorePlugins          []ScorePlugin
	reservePlugins        []ReservePlugin
	preBindPlugins        []PreBindPlugin
	bindPlugins           []BindPlugin
	postBindPlugins       []PostBindPlugin
	unreservePlugins      []UnreservePlugin
	permitPlugins         []PermitPlugin

	clientSet       clientset.Interface
	informerFactory informers.SharedInformerFactory

	metricsRecorder *metricsRecorder

	preemptHandle PreemptHandle

	runAllFilters bool
}

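Member fields such as preFilterPlugins and filterPlugins are the so-called ExtensionPoints. When a Framework is created, each plugin enabled in the Profile is instantiated by calling its factory function from the registry and then appended to the corresponding list. The framework struct, in turn, implements the Framework interface: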

// kubernetes/pkg/scheduler/framework/v1alpha1/interface.go

type Framework interface {
	FrameworkHandle
	QueueSortFunc() LessFunc
	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
	RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
	RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
	RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
	RunPreScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
	RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status)
	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
	RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	HasFilterPlugins() bool
	HasScorePlugins() bool
	ListPlugins() map[string][]config.Plugin
}

type FrameworkHandle interface {
	SnapshotSharedLister() SharedLister
	IterateOverWaitingPods(callback func(WaitingPod))
	GetWaitingPod(uid types.UID) WaitingPod
	RejectWaitingPod(uid types.UID)
	ClientSet() clientset.Interface
	SharedInformerFactory() informers.SharedInformerFactory
}
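The construction step described above (look up each enabled plugin's factory in the Registry, instantiate it, and append it to the matching extension-point slice) can be sketched in Go. Everything here is a simplified stand-in: the interfaces drop the real context, CycleState and Status parameters, and newFramework, Profile and fit are hypothetical. The sketch creates each plugin only once even when several extension points enable it, and uses Go type assertions to check that a plugin actually implements the interface of the extension point it is enabled at.

```go
package main

import "fmt"

// Simplified stand-ins for the framework's plugin interfaces.
type Plugin interface{ Name() string }

type FilterPlugin interface {
	Plugin
	Filter(podCPU, nodeFreeCPU int64) bool
}

type ScorePlugin interface {
	Plugin
	Score(podCPU, nodeFreeCPU int64) int64
}

type PluginFactory func() (Plugin, error)
type Registry map[string]PluginFactory

// Profile lists the enabled plugin names per extension point.
type Profile struct {
	Filter []string
	Score  []string
}

type framework struct {
	filterPlugins []FilterPlugin
	scorePlugins  []ScorePlugin
}

// fit is a toy plugin that implements both FilterPlugin and ScorePlugin.
type fit struct{}

func (fit) Name() string { return "Fit" }
func (fit) Filter(podCPU, free int64) bool { return podCPU <= free }
func (fit) Score(podCPU, free int64) int64 { return free - podCPU }

func newFramework(r Registry, p Profile) (*framework, error) {
	instances := make(map[string]Plugin) // one instance per plugin name
	get := func(name string) (Plugin, error) {
		if pl, ok := instances[name]; ok {
			return pl, nil
		}
		factory, ok := r[name]
		if !ok {
			return nil, fmt.Errorf("plugin %q is not registered", name)
		}
		pl, err := factory()
		if err != nil {
			return nil, err
		}
		instances[name] = pl
		return pl, nil
	}

	f := &framework{}
	for _, name := range p.Filter {
		pl, err := get(name)
		if err != nil {
			return nil, err
		}
		fp, ok := pl.(FilterPlugin)
		if !ok {
			return nil, fmt.Errorf("plugin %q does not implement FilterPlugin", name)
		}
		f.filterPlugins = append(f.filterPlugins, fp)
	}
	for _, name := range p.Score {
		pl, err := get(name)
		if err != nil {
			return nil, err
		}
		sp, ok := pl.(ScorePlugin)
		if !ok {
			return nil, fmt.Errorf("plugin %q does not implement ScorePlugin", name)
		}
		f.scorePlugins = append(f.scorePlugins, sp)
	}
	return f, nil
}

func main() {
	registry := Registry{"Fit": func() (Plugin, error) { return fit{}, nil }}
	f, err := newFramework(registry, Profile{Filter: []string{"Fit"}, Score: []string{"Fit"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(f.filterPlugins), len(f.scorePlugins))
}
```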

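During scheduling, the scheduler calls these RunXXXPlugins() methods on the framework; each of them iterates over the plugins registered at that ExtensionPoint and calls their methods to carry out the actual scheduling logic.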
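The RunXXXPlugins() pattern boils down to loops over the extension-point slices. This is a simplified model, not the framework's code: the interfaces, Node type and plugins are stand-ins. Two details are taken from the framework struct shown earlier: runAllFilters decides whether filtering stops at the first failing plugin, and each score plugin's result is multiplied by its weight from pluginNameToWeightMap (this sketch assumes a missing weight counts as 1). Score normalization is left out, and the per-node totals are summed in the same function for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

type Node struct {
	Name    string
	FreeCPU int64
}

// Simplified stand-ins for FilterPlugin and ScorePlugin.
type FilterPlugin interface {
	Name() string
	Filter(podCPU int64, n Node) error
}

type ScorePlugin interface {
	Name() string
	Score(podCPU int64, n Node) int64
}

type framework struct {
	filterPlugins         []FilterPlugin
	scorePlugins          []ScorePlugin
	pluginNameToWeightMap map[string]int
	runAllFilters         bool
}

// runFilterPlugins calls the filter plugins in order. Unless runAllFilters
// is set, it stops at the first plugin that rejects the node.
func (f *framework) runFilterPlugins(podCPU int64, n Node) []error {
	var failures []error
	for _, pl := range f.filterPlugins {
		if err := pl.Filter(podCPU, n); err != nil {
			failures = append(failures, fmt.Errorf("%s: %w", pl.Name(), err))
			if !f.runAllFilters {
				break
			}
		}
	}
	return failures
}

// runScorePlugins multiplies each plugin's score by its weight and sums the
// weighted scores per node.
func (f *framework) runScorePlugins(podCPU int64, nodes []Node) map[string]int64 {
	totals := make(map[string]int64, len(nodes))
	for _, pl := range f.scorePlugins {
		weight := int64(f.pluginNameToWeightMap[pl.Name()])
		if weight == 0 {
			weight = 1 // assumption: an unset weight counts as 1
		}
		for _, n := range nodes {
			totals[n.Name] += pl.Score(podCPU, n) * weight
		}
	}
	return totals
}

// fitCPU rejects nodes without enough free CPU.
type fitCPU struct{}

func (fitCPU) Name() string { return "FitCPU" }
func (fitCPU) Filter(podCPU int64, n Node) error {
	if podCPU > n.FreeCPU {
		return errors.New("insufficient cpu")
	}
	return nil
}

// mostFree prefers nodes with more CPU left after placing the pod.
type mostFree struct{}

func (mostFree) Name() string                     { return "MostFree" }
func (mostFree) Score(podCPU int64, n Node) int64 { return n.FreeCPU - podCPU }

func main() {
	f := &framework{
		filterPlugins:         []FilterPlugin{fitCPU{}},
		scorePlugins:          []ScorePlugin{mostFree{}},
		pluginNameToWeightMap: map[string]int{"MostFree": 2},
	}
	var feasible []Node
	for _, n := range []Node{{"n1", 4}, {"n2", 8}, {"n3", 16}} {
		if failures := f.runFilterPlugins(6, n); len(failures) == 0 {
			feasible = append(feasible, n)
		} else {
			fmt.Println(n.Name, "filtered:", failures)
		}
	}
	fmt.Println(f.runScorePlugins(6, feasible))
}
```

Here n1 is filtered out, and the remaining nodes are ranked by the weighted score, which is how a large weight such as the 10000 on nodepreferavoidpods in the default profile lets one plugin dominate the total.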

Next, here is how the plugin interfaces are defined:

// kubernetes/pkg/scheduler/framework/v1alpha1/interface.go

type Plugin interface {
	Name() string
}
type QueueSortPlugin interface {
	Plugin
	Less(*QueuedPodInfo, *QueuedPodInfo) bool
}
type PreFilterPlugin interface {
	Plugin
	PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
	PreFilterExtensions() PreFilterExtensions
}
type FilterPlugin interface {
	Plugin
	Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
type PreScorePlugin interface {
	Plugin
	PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}
type ScorePlugin interface {
	Plugin
	Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
	ScoreExtensions() ScoreExtensions
}
type ReservePlugin interface {
	Plugin
	Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
type PreBindPlugin interface {
	Plugin
	PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
type PostBindPlugin interface {
	Plugin
	PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
type UnreservePlugin interface {
	Plugin
	Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
type PermitPlugin interface {
	Plugin
	Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
type BindPlugin interface {
	Plugin
	Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}

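The in-tree plugins all live under kubernetes/pkg/scheduler/framework/plugins/. A single plugin can implement several of the interfaces above and do different work at different stages. A custom plugin only needs to implement the methods of the interfaces it cares about.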
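As an illustration of one plugin serving two extension points, here is a hypothetical CPU-fit plugin: PreFilter computes the pod's total request once per scheduling cycle and stores it in the cycle state, and Filter reuses that value for every candidate node. The in-tree noderesources Fit plugin uses a similar PreFilter/Filter split, but this is not its code: cycleState is a simplified stand-in for the framework's CycleState, and the signatures omit context and Status.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// cycleState is a simplified stand-in for CycleState: data shared by the
// plugins during one scheduling cycle of one pod.
type cycleState struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newCycleState() *cycleState {
	return &cycleState{data: make(map[string]interface{})}
}

func (c *cycleState) Write(key string, v interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = v
}

func (c *cycleState) Read(key string) (interface{}, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	if !ok {
		return nil, errors.New("key not found")
	}
	return v, nil
}

type Pod struct {
	Name        string
	CPURequests []int64 // per-container requests in millicores
}

type Node struct {
	Name    string
	FreeCPU int64
}

const preFilterKey = "PreFilterCPUFit"

// cpuFit is a hypothetical plugin implementing both PreFilter and Filter.
type cpuFit struct{}

func (cpuFit) Name() string { return "CPUFit" }

// PreFilter runs once per pod and caches the summed request.
func (cpuFit) PreFilter(state *cycleState, pod Pod) error {
	var total int64
	for _, r := range pod.CPURequests {
		total += r
	}
	state.Write(preFilterKey, total)
	return nil
}

// Filter runs once per node and reads the cached value.
func (cpuFit) Filter(state *cycleState, pod Pod, node Node) error {
	v, err := state.Read(preFilterKey)
	if err != nil {
		return fmt.Errorf("%s: %w", preFilterKey, err)
	}
	need, ok := v.(int64)
	if !ok {
		return fmt.Errorf("%s: unexpected type %T", preFilterKey, v)
	}
	if need > node.FreeCPU {
		return fmt.Errorf("node %s: pod %s needs %dm CPU, %dm free", node.Name, pod.Name, need, node.FreeCPU)
	}
	return nil
}

func main() {
	pl := cpuFit{}
	state := newCycleState()
	pod := Pod{Name: "web", CPURequests: []int64{500, 700}}
	if err := pl.PreFilter(state, pod); err != nil {
		panic(err)
	}
	for _, n := range []Node{{"small", 1000}, {"large", 2000}} {
		fmt.Println(n.Name, pl.Filter(state, pod, n))
	}
}
```

Doing the per-pod work in PreFilter keeps Filter cheap, which matters because Filter runs once for every node the scheduler considers.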

Registry

Registry holds the factory functions used to create plugin instances. It is defined as:

// kubernetes/pkg/scheduler/framework/v1alpha1/registry.go

type PluginFactory = func(configuration runtime.Object, f FrameworkHandle) (Plugin, error)
type Registry map[string]PluginFactory

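The Registry holds two kinds of plugins: Kubernetes' built-in plugins, called the InTreeRegistry, and externally defined plugins, called the OutOfTreeRegistry. The two are merged into a single Registry when the Scheduler is created: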

// kubernetes/pkg/scheduler/scheduler.go

registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
	return nil, err
}
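The Registry type and the Merge call above can be modeled in a few lines. This is a sketch under assumptions: the factory signature is simplified (the real PluginFactory also receives the plugin's configuration and a FrameworkHandle), and the Register/Merge bodies approximate the behaviour in registry.go, where registering an existing name returns an error. That rule is why an out-of-tree plugin cannot reuse the name of an in-tree one.

```go
package main

import "fmt"

type Plugin interface{ Name() string }

// PluginFactory is simplified; the real one takes config and a FrameworkHandle.
type PluginFactory func() (Plugin, error)

type Registry map[string]PluginFactory

// Register adds a factory, refusing to overwrite an existing name.
func (r Registry) Register(name string, factory PluginFactory) error {
	if _, ok := r[name]; ok {
		return fmt.Errorf("a plugin named %v already exists", name)
	}
	r[name] = factory
	return nil
}

// Merge registers every factory from in, failing on the first name clash.
func (r Registry) Merge(in Registry) error {
	for name, factory := range in {
		if err := r.Register(name, factory); err != nil {
			return err
		}
	}
	return nil
}

// named is a trivial plugin whose Name() is its own value.
type named string

func (n named) Name() string { return string(n) }

func factoryFor(name string) PluginFactory {
	return func() (Plugin, error) { return named(name), nil }
}

func main() {
	inTree := Registry{
		"ImageLocality":   factoryFor("ImageLocality"),
		"TaintToleration": factoryFor("TaintToleration"),
	}
	outOfTree := Registry{"Coscheduling": factoryFor("Coscheduling")}
	if err := inTree.Merge(outOfTree); err != nil {
		panic(err)
	}
	fmt.Println(len(inTree)) // 3

	// Reusing an in-tree name is rejected.
	err := inTree.Merge(Registry{"ImageLocality": factoryFor("ImageLocality")})
	fmt.Println(err)
}
```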

The in-tree registry currently contains the following plugins:

// kubernetes/pkg/scheduler/framework/plugins/registry.go

func NewInTreeRegistry() framework.Registry {
	return framework.Registry{
		defaultpodtopologyspread.Name:              defaultpodtopologyspread.New,
		imagelocality.Name:                         imagelocality.New,
		tainttoleration.Name:                       tainttoleration.New,
		nodename.Name:                              nodename.New,
		nodeports.Name:                             nodeports.New,
		nodepreferavoidpods.Name:                   nodepreferavoidpods.New,
		nodeaffinity.Name:                          nodeaffinity.New,
		podtopologyspread.Name:                     podtopologyspread.New,
		nodeunschedulable.Name:                     nodeunschedulable.New,
		noderesources.FitName:                      noderesources.NewFit,
		noderesources.BalancedAllocationName:       noderesources.NewBalancedAllocation,
		noderesources.MostAllocatedName:            noderesources.NewMostAllocated,
		noderesources.LeastAllocatedName:           noderesources.NewLeastAllocated,
		noderesources.RequestedToCapacityRatioName: noderesources.NewRequestedToCapacityRatio,
		noderesources.ResourceLimitsName:           noderesources.NewResourceLimits,
		volumebinding.Name:                         volumebinding.New,
		volumerestrictions.Name:                    volumerestrictions.New,
		volumezone.Name:                            volumezone.New,
		nodevolumelimits.CSIName:                   nodevolumelimits.NewCSI,
		nodevolumelimits.EBSName:                   nodevolumelimits.NewEBS,
		nodevolumelimits.GCEPDName:                 nodevolumelimits.NewGCEPD,
		nodevolumelimits.AzureDiskName:             nodevolumelimits.NewAzureDisk,
		nodevolumelimits.CinderName:                nodevolumelimits.NewCinder,
		interpodaffinity.Name:                      interpodaffinity.New,
		nodelabel.Name:                             nodelabel.New,
		serviceaffinity.Name:                       serviceaffinity.New,
		queuesort.Name:                             queuesort.New,
		defaultbinder.Name:                         defaultbinder.New,
	}
}

How do out-of-tree plugins get passed in? Through kube-scheduler's main function:

// kubernetes/cmd/kube-scheduler/app/server.go

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// ......
}

// kubernetes/cmd/kube-scheduler/scheduler.go

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewSchedulerCommand()
	// ......
}

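As you can see, however, the main() that ships with Kubernetes does not pass this argument. To use custom plugins you therefore have to modify main() to pass it in, which means recompiling the kube-scheduler component. app/server.go already provides a hook for custom plugins: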

// kubernetes/cmd/kube-scheduler/app/server.go

func WithPlugin(name string, factory framework.PluginFactory) Option {
	return func(registry framework.Registry) error {
		return registry.Register(name, factory)
	}
}

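In a separate project, write your own plugin: mainly, implement the methods of the relevant plugin interfaces plus a factory function for the plugin. Then write your own kube-scheduler main() that calls WithPlugin() to build an Option and passes it to app.NewSchedulerCommand(), and recompile kube-scheduler. Finally, use a Profile to enable the custom plugin at the relevant ExtensionPoint, and you're done.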
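The option plumbing can be reduced to a small model: each Option registers one factory into an out-of-tree registry, which is then merged with the in-tree one. Only the shapes of Option and WithPlugin come from app/server.go above; buildRegistry, named and factoryFor are hypothetical helpers standing in for what NewSchedulerCommand and scheduler creation do with the options, and the factory signature is simplified.

```go
package main

import (
	"fmt"
	"sort"
)

type Plugin interface{ Name() string }

// PluginFactory is simplified; the real one takes config and a FrameworkHandle.
type PluginFactory func() (Plugin, error)

type Registry map[string]PluginFactory

func (r Registry) Register(name string, factory PluginFactory) error {
	if _, ok := r[name]; ok {
		return fmt.Errorf("a plugin named %v already exists", name)
	}
	r[name] = factory
	return nil
}

// Option and WithPlugin have the same shape as in app/server.go.
type Option func(Registry) error

func WithPlugin(name string, factory PluginFactory) Option {
	return func(r Registry) error { return r.Register(name, factory) }
}

// buildRegistry is hypothetical: it applies the options to an empty
// out-of-tree registry, then merges that into a copy of the in-tree one.
func buildRegistry(inTree Registry, opts ...Option) (Registry, error) {
	outOfTree := make(Registry)
	for _, opt := range opts {
		if err := opt(outOfTree); err != nil {
			return nil, err
		}
	}
	merged := make(Registry, len(inTree)+len(outOfTree))
	for name, factory := range inTree {
		merged[name] = factory
	}
	for name, factory := range outOfTree {
		if err := merged.Register(name, factory); err != nil {
			return nil, err
		}
	}
	return merged, nil
}

// named is a trivial plugin whose Name() is its own value.
type named string

func (n named) Name() string { return string(n) }

func factoryFor(name string) PluginFactory {
	return func() (Plugin, error) { return named(name), nil }
}

func main() {
	inTree := Registry{"ImageLocality": factoryFor("ImageLocality")}
	// Plays the role of passing app.WithPlugin(...) to app.NewSchedulerCommand().
	registry, err := buildRegistry(inTree, WithPlugin("MyCustomPluginA", factoryFor("MyCustomPluginA")))
	if err != nil {
		panic(err)
	}
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Println(names) // [ImageLocality MyCustomPluginA]
}
```

Registering the plugin only makes it available; it still runs only at the ExtensionPoints where a Profile enables it.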

The community project scheduler-plugins maintains a number of out-of-tree plugins. Here is its rewritten main() as an example:

func main() {
	rand.Seed(time.Now().UnixNano())

	// Register custom plugins to the scheduler framework.
	// Later they can consist of scheduler profile(s) and hence
	// used by various kinds of workloads.
	command := app.NewSchedulerCommand(
		app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
		app.WithPlugin(coscheduling.Name, coscheduling.New),
		app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
		// Sample plugins below.
		app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
		app.WithPlugin(podstate.Name, podstate.New),
		app.WithPlugin(qos.Name, qos.New),
	)
	// ......
}

Summary

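This post covered how the Scheduling Framework is implemented and how to implement a custom plugin. Overall, the framework is well designed; the plugin mechanism and the support for multiple Profiles in particular are good examples of design. The one awkward point is that custom plugins require changing main() and building kube-scheduler separately; there may be a more elegant way to do this.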

Author

hackerain

Published

2020-12-21

Updated

2023-10-26

License