Kubernetes APIServer Storage Framework Explained

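Kubernetes uses etcd as its backend store. etcd is a distributed, highly reliable key-value store. Unlike traditional platforms, Kubernetes keeps all of its data in a key-value database rather than in a relational database such as MySQL, which is what OpenStack uses. I suspect there are two reasons for this choice. First, the data Kubernetes stores is not strongly relational; it is mostly configuration-management data. Second, etcd's features, such as its efficient gRPC interface, transaction support, and the Watch mechanism that Kubernetes depends on heavily, let a single database cover all of its needs without bringing in other components for similar functions, which keeps the architecture simple.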

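This article explains how the Kubernetes APIServer interacts with storage. It does not go into storage internals and stops at the storage interface layer: it covers what the Kubernetes storage framework looks like, how it is abstracted, and how resources end up in etcd. Before walking through the concrete flow, let's look at a few related abstractions in Kubernetes to see the design from the top down.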

Top-level abstractions

Resources, Kinds, and Objects

The API defines three abstractions: Resource, Kind, and Object. The related types are:

// k8s.io/apimachinery/pkg/runtime/schema
type GroupVersionResource struct {
	Group    string
	Version  string
	Resource string
}

type GroupVersionKind struct {
	Group   string
	Version string
	Kind    string
}

type GroupResource struct {
	Group    string
	Resource string
}

type GroupKind struct {
	Group string
	Kind  string
}

// k8s.io/apimachinery/pkg/runtime
type Object interface {
	GetObjectKind() schema.ObjectKind
	DeepCopyObject() Object
}

// k8s.io/apimachinery/pkg/runtime/schema
type ObjectKind interface {
	SetGroupVersionKind(kind GroupVersionKind)
	GroupVersionKind() GroupVersionKind
}

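API objects in Kubernetes are grouped and versioned. Take /apis/networking.k8s.io/v1beta1/ingresses: /apis is the prefix, networking.k8s.io is the Group, v1beta1 is the Version, and ingresses is the Resource mentioned above (the corresponding Kind is Ingress). Object is the abstract interface for API objects, and every concrete API object implements it. In Go, any struct that implements an interface can be referred to through that interface type, much like a parent class, so Object can stand for any object that implements its methods and usually appears as a method parameter or return value. So these three concepts all describe the same things, API objects such as Pod, Service, and Ingress, but in different forms and for different purposes: a Resource names the object as it appears in URL paths, a Kind names its type in serialized form, and an Object is the in-memory value... :-( This is exactly where the complexity of the Kubernetes code I mentioned earlier comes from: the abstractions leave you in a fog :-)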
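To make the Group/Version/Resource split concrete, here is a minimal sketch that takes a collection URL apart into a GVR. The struct mirrors the shape of schema.GroupVersionResource but is redeclared locally so the example is self-contained; parseResourcePath is a hypothetical helper, not a Kubernetes API. It assumes the usual layout: named groups under /apis/<group>/<version>/, and the legacy core group under /api/<version>/ with an empty group name.

```go
package main

import (
	"fmt"
	"strings"
)

// GroupVersionResource mirrors schema.GroupVersionResource from
// k8s.io/apimachinery; redeclared here so the sketch is self-contained.
type GroupVersionResource struct {
	Group    string
	Version  string
	Resource string
}

// parseResourcePath is a hypothetical helper that splits a collection URL
// into its GVR. Named groups: /apis/<group>/<version>/<resource>.
// Core group: /api/<version>/<resource>, with an empty group name.
func parseResourcePath(path string) (GroupVersionResource, error) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	switch {
	case len(parts) == 4 && parts[0] == "apis":
		return GroupVersionResource{Group: parts[1], Version: parts[2], Resource: parts[3]}, nil
	case len(parts) == 3 && parts[0] == "api":
		return GroupVersionResource{Group: "", Version: parts[1], Resource: parts[2]}, nil
	default:
		return GroupVersionResource{}, fmt.Errorf("unsupported path: %s", path)
	}
}

func main() {
	gvr, _ := parseResourcePath("/apis/networking.k8s.io/v1beta1/ingresses")
	fmt.Printf("%+v\n", gvr) // {Group:networking.k8s.io Version:v1beta1 Resource:ingresses}
	core, _ := parseResourcePath("/api/v1/pods")
	fmt.Printf("%+v\n", core) // {Group: Version:v1 Resource:pods}
}
```

Note that the URL only ever carries the lowercase plural resource name; the Kind (Ingress, Pod) shows up in the serialized object's kind field instead.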

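These types and interfaces are defined in the apimachinery library, arguably the highest-level abstraction in Kubernetes. Besides Resource, Kind, and Object, it contains type definitions, serialization, type conversion, and other abstractions: structs and functions that the other libraries build on.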

etcd storage interface

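The etcd storage interface abstracts create, read, update, and delete operations on the etcd database. It is defined in the apiserver library, in apiserver/pkg/storage/interfaces.go: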

type Interface interface {
	Versioner() Versioner
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
	GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		preconditions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
	Count(key string) (int64, error)
}

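Compared with a typical database application, this interface is small: just a handful of methods. Take Create(). When we write an application that needs to save something, we usually write methods like CreateXXX(), for example CreatePerson(person Person) to save a Person to the database, and end up with a whole family of CreateXXX() methods, one per type. Create() here, by contrast, is highly abstract: obj is the object to store, i.e. the Object described above, and what it actually is depends on what the caller constructs and passes in; key is the key it is stored under. When the etcd3 store implements these methods, it encodes (serializes) obj before writing it to the database. This approach greatly reduces the amount of database-layer code and makes full use of what a key-value database offers.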

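Also note that apiserver is a separate library: the generic code for building an API server was extracted into it so that it can be reused, for example by third parties building extension API servers. The Kubernetes APIServer itself relies heavily on this library.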

REST storage interface

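The Kubernetes API is a RESTful API. Every REST resource, such as pods, services, and ingresses, has a corresponding Store in the APIServer, which implements a common set of interfaces to create, read, update, and delete that resource. These interfaces are defined in apiserver/pkg/registry/rest/rest.go. A few of them: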

// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
type Storage interface {
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object

	// Destroy cleans up its resources on shutdown.
	// Destroy has to be implemented in thread-safe way and be prepared
	// for being called more than once.
	Destroy()
}

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}

// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
	New() runtime.Object
	Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}

// GracefulDeleter knows how to pass deletion options to allow delayed deletion of a
// RESTful object.
type GracefulDeleter interface {
	Delete(ctx context.Context, name string, deleteValidation ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error)
}

// Watcher should be implemented by all Storage objects that
// want to offer the ability to watch for changes through the watch api.
type Watcher interface {
	Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}

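In other words, a REST operation on an API object is handled by a method on the corresponding Store, and that Store in turn holds a reference to a store implementing the etcd storage interface, through which it operates on the database.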
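Because a resource's storage only implements the interfaces for the verbs it supports, the server can discover those verbs with Go type assertions. In simplified form this resembles what the endpoint installer in k8s.io/apiserver does when it checks a storage against rest.Getter, rest.Creater and so on; the interfaces below are hypothetical, trimmed stand-ins without the context and options parameters.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified, hypothetical stand-ins for rest.Storage, rest.Getter, rest.Creater.
type Object any

type Storage interface{ New() Object }
type Getter interface{ Get(name string) (Object, error) }
type Creater interface{ Create(obj Object) (Object, error) }

// readOnlyStore supports only GET.
type readOnlyStore struct{}

func (readOnlyStore) New() Object                     { return struct{}{} }
func (readOnlyStore) Get(name string) (Object, error) { return name, nil }

// fullStore embeds readOnlyStore (so it also has New and Get) and adds Create.
type fullStore struct{ readOnlyStore }

func (fullStore) Create(obj Object) (Object, error) { return obj, nil }

// supportedVerbs discovers capabilities by type assertion, so a store only
// has to implement the interfaces for the verbs it actually serves.
func supportedVerbs(s Storage) []string {
	var verbs []string
	if _, ok := s.(Getter); ok {
		verbs = append(verbs, "get")
	}
	if _, ok := s.(Creater); ok {
		verbs = append(verbs, "create")
	}
	sort.Strings(verbs)
	return verbs
}

func main() {
	fmt.Println(supportedVerbs(readOnlyStore{})) // [get]
	fmt.Println(supportedVerbs(fullStore{}))     // [create get]
}
```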

Underlying implementation

The previous section introduced two kinds of storage interfaces, one for etcd and one for REST. Below we look at the structs and methods that implement each of them.

etcd storage interface implementation

The etcd storage interface is implemented in apiserver/pkg/storage/etcd3/store.go. The main struct is:

type store struct {
	client *clientv3.Client
	// getOpts contains additional options that should be passed
	// to all Get() calls.
	getOps        []clientv3.OpOption
	codec         runtime.Codec
	versioner     storage.Versioner
	transformer   value.Transformer
	pathPrefix    string
	watcher       *watcher
	pagingEnabled bool
	leaseManager  *leaseManager
}

Its most important field is client, which comes straight from the etcd client library and is used to talk to etcd. The struct also implements the interface defined in apiserver/pkg/storage/interfaces.go, for example:

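func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
	getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
	if err != nil {
		return err
	}
	if len(getResp.Kvs) == 0 { // the real code honors ignoreNotFound here
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]
	// undo the storage transformation (e.g. encryption at rest)
	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return err
	}
	// deserialize into out; ModRevision becomes the object's resourceVersion
	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	data, err := runtime.Encode(s.codec, obj) // serialize
	if err != nil {
		return err
	}
	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
	if err != nil {
		return err
	}
	// write only if the key does not exist yet (TTL lease options omitted)
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData)),
	).Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(key, 0)
	}
	if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}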

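The code above is simplified: less important logic is omitted, and only Get and Create are shown. Get reads the value for a string key from etcd, decodes it into the Object out, and returns. Create works the other way round: it first encodes obj, then uses the etcd client to save the data in a transaction that succeeds only if the key does not exist yet. Between encoding and storage, the transformer can further transform the bytes, for example to encrypt them at rest. This encoding and decoding is what is usually called serialization.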

REST storage interface implementation

The REST storage interface is implemented in apiserver/pkg/registry/generic/registry/store.go, which defines the following struct:

type Store struct {
	NewFunc        func() runtime.Object
	NewListFunc    func() runtime.Object
	KeyFunc        func(ctx context.Context, name string) (string, error)
	ObjectNameFunc func(obj runtime.Object) (string, error)

	......

	Storage          DryRunnableStorage // etcd3.store, implement the storage.Interface
	DestroyFunc      func()
	StorageVersioner runtime.GroupVersioner
}

This struct implements the REST storage interfaces from the previous section:

func (e *Store) New() runtime.Object {
	return e.NewFunc()
}

func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}

func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}

	if createValidation != nil {
		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
			return nil, err
		}
	}

	name, err := e.ObjectNameFunc(obj)
	if err != nil {
		return nil, err
	}
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	qualifiedResource := e.qualifiedResourceFromContext(ctx) // used by the elided error handling
	ttl, err := e.calculateTTL(obj, 0, false)
	if err != nil {
		return nil, err
	}

	out := e.NewFunc()
	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
		......
	}
	......
	return out, nil
}

The most important field of Store is Storage DryRunnableStorage, which is its reference to the etcd store. It is defined as:

type DryRunnableStorage struct {
	Storage storage.Interface
	Codec   runtime.Codec
}

func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
}

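Its Storage storage.Interface field is the etcd storage interface described earlier. The extra DryRunnableStorage layer exists to support server-side dry run: for requests sent with dryRun=All, write methods such as Create receive an extra dryRun flag and, when it is set, go through the same processing but skip the actual write to the database.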
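The dry-run wrapper idea fits in a few lines. The sketch below is an assumption-laden simplification: Store, mapStore, and dryRunnable are hypothetical names, and values are plain strings instead of runtime.Object. In dry-run mode it still reports the "already exists" conflict a real create would hit, but never writes to the backend.

```go
package main

import "fmt"

// Store is a hypothetical, trimmed-down storage interface.
type Store interface {
	Get(key string) (string, bool)
	Create(key, val string) error
}

// mapStore is an in-memory backend standing in for etcd.
type mapStore map[string]string

func (m mapStore) Get(key string) (string, bool) {
	v, ok := m[key]
	return v, ok
}

func (m mapStore) Create(key, val string) error {
	if _, ok := m[key]; ok {
		return fmt.Errorf("already exists: %s", key)
	}
	m[key] = val
	return nil
}

// dryRunnable wraps a real Store; writes take an extra dryRun flag.
type dryRunnable struct{ s Store }

func (d dryRunnable) Create(key, val string, dryRun bool) error {
	if dryRun {
		// Report the conflict a real write would hit, but persist nothing.
		if _, exists := d.s.Get(key); exists {
			return fmt.Errorf("already exists: %s", key)
		}
		return nil
	}
	return d.s.Create(key, val)
}

func main() {
	backend := mapStore{}
	d := dryRunnable{s: backend}
	fmt.Println(d.Create("/registry/pods/default/web", "v1", true)) // <nil>
	_, stored := backend.Get("/registry/pods/default/web")
	fmt.Println(stored)                                              // false
	fmt.Println(d.Create("/registry/pods/default/web", "v1", false)) // <nil>
	fmt.Println(d.Create("/registry/pods/default/web", "v2", true))  // already exists: /registry/pods/default/web
}
```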

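We have now seen that Kubernetes defines two kinds of Store, one for REST resources and one for the etcd database, each with its own interface methods. From here on we call them the REST store and the etcd store. The relationship between them is one of reference: the REST store holds a reference to the etcd store. When the upper layer instantiates a REST store, an etcd store is instantiated along with it for database operations; calls such as e.Storage.Create() and e.Storage.Get() in the code above go through the etcd store to read and write the database.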

Upper-layer usage

By upper-layer usage we mean how Kubernetes uses the two stores described above, the REST store and the etcd store.

etcd storage

This subsection shows how the etcd store instance is built. Start with the top-level code:

// cmd/kube-apiserver/app/server.go (simplified)

func buildGenericConfig(...) {
	genericConfig = genericapiserver.NewConfig(...)

	// The next three lines build a StorageFactoryConfig and fill in its fields
	storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
	storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
	completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)

	// Create a DefaultStorageFactory from the completed StorageFactoryConfig
	storageFactory, lastErr = completedStorageFactoryConfig.New()

	// Wrap storageFactory in a StorageFactoryRestOptionsFactory and assign it to genericConfig.RESTOptionsGetter
	s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
}

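buildGenericConfig() runs in the CreateServerChain phase introduced in the earlier article "Overview of the Kubernetes APIServer Mechanism". It builds the server configuration, gathering many generic APIServer settings in one place. Here we only care about the storage-related settings, i.e. the lines listed above. Those few lines illustrate two design patterns that are very common in Kubernetes: the Config->Complete->New pattern and the Factory pattern.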

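In the Config->Complete->New pattern, you first build a Config, then call Complete() to fill in and finalize it, and finally create the real object from it; everything needed to create that object lives in the Config. In the example above, StorageFactoryConfig is the Config. Its Complete() method pulls further information from s.Etcd to finish it, yielding completedStorageFactoryConfig, and calling New() on that produces the real object, here storageFactory.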
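Stripped of the Kubernetes specifics, the Config->Complete->New pattern looks roughly like the sketch below. Every name in it (EtcdOptions, FactoryConfig, completedConfig, Factory) is a hypothetical simplification that only mirrors the shape of StorageFactoryConfig, completedStorageFactoryConfig, and DefaultStorageFactory; the JSON media-type default echoes what NewDefaultStorageFactory does, but here it is applied in Complete for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

type EtcdOptions struct {
	Servers   []string
	MediaType string
}

type FactoryConfig struct {
	Prefix    string
	Servers   []string
	MediaType string
}

// completedConfig can only be obtained through Complete, so New never sees a
// half-filled config.
type completedConfig struct{ *FactoryConfig }

type Factory struct {
	prefix, mediaType string
	servers           []string
}

func NewFactoryConfig() *FactoryConfig { return &FactoryConfig{Prefix: "/registry"} }

// Complete fills in the remaining fields from options and applies defaults.
func (c *FactoryConfig) Complete(o *EtcdOptions) (completedConfig, error) {
	if len(o.Servers) == 0 {
		return completedConfig{}, errors.New("no etcd servers configured")
	}
	c.Servers = o.Servers
	c.MediaType = o.MediaType
	if c.MediaType == "" {
		c.MediaType = "application/json"
	}
	return completedConfig{c}, nil
}

// New builds the real object; everything it needs is already in the config.
func (c completedConfig) New() *Factory {
	return &Factory{prefix: c.Prefix, mediaType: c.MediaType, servers: c.Servers}
}

func main() {
	cc, err := NewFactoryConfig().Complete(&EtcdOptions{Servers: []string{"https://127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	f := cc.New()
	fmt.Println(f.prefix, f.mediaType, f.servers) // /registry application/json [https://127.0.0.1:2379]
}
```

Making New a method of the completed type only is the point of the pattern: the compiler rejects any attempt to build the object from a Config that skipped Complete.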

// kubernetes/cmd/kube-apiserver/app/options/options.go

s.Etcd = genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil))

// k8s.io/apiserver/pkg/server/options/etcd.go

type EtcdOptions struct {
	StorageConfig                    storagebackend.Config // set when NewEtcdOptions is called
	EncryptionProviderConfigFilepath string
	EtcdServersOverrides             []string
	DefaultStorageMediaType          string
	DeleteCollectionWorkers          int
	EnableGarbageCollection          bool
	EnableWatchCache                 bool
	DefaultWatchCacheSize            int
	WatchCacheSizes                  []string
}

// kubernetes/pkg/kubeapiserver/default_storage_factory_builder.go

// NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides.
func NewStorageFactoryConfig() *StorageFactoryConfig {

	resources := []schema.GroupVersionResource{
		batch.Resource("cronjobs").WithVersion("v1beta1"),
		networking.Resource("ingresses").WithVersion("v1beta1"),
		networking.Resource("ingressclasses").WithVersion("v1beta1"),
		apisstorage.Resource("csidrivers").WithVersion("v1beta1"),
	}

	return &StorageFactoryConfig{
		Serializer:                legacyscheme.Codecs,
		DefaultResourceEncoding:   serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
		ResourceEncodingOverrides: resources,
	}
}

// StorageFactoryConfig is a configuration for creating storage factory.
type StorageFactoryConfig struct {
	StorageConfig                    storagebackend.Config
	APIResourceConfig                *serverstorage.ResourceConfig
	DefaultResourceEncoding          *serverstorage.DefaultResourceEncodingConfig
	DefaultStorageMediaType          string
	Serializer                       runtime.StorageSerializer
	ResourceEncodingOverrides        []schema.GroupVersionResource
	EtcdServersOverrides             []string
	EncryptionProviderConfigFilepath string
}

// Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig.
func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) {
	c.StorageConfig = etcdOptions.StorageConfig // take the initial StorageConfig from etcdOptions
	c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType
	c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides
	c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath
	return &completedStorageFactoryConfig{c}, nil
}

type completedStorageFactoryConfig struct {
	*StorageFactoryConfig
}

// New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) {
	resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
	storageFactory := serverstorage.NewDefaultStorageFactory(
		c.StorageConfig,
		c.DefaultStorageMediaType,
		c.Serializer,
		resourceEncodingConfig,
		c.APIResourceConfig,
		SpecialDefaultResourcePrefixes)

	......

	return storageFactory, nil
}

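This produces a DefaultStorageFactory instance, which brings us to the Factory pattern mentioned above. As the name suggests, a factory turns out similar objects; what this one produces is the storage configuration, StorageConfig, for a given resource: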

// k8s.io/apiserver/pkg/server/storage/storage_factory.go

type StorageFactory interface {
	NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
	ResourcePrefix(groupResource schema.GroupResource) string
	Backends() []Backend
}

// DefaultStorageFactory implements the StorageFactory interface

type DefaultStorageFactory struct {
	StorageConfig           storagebackend.Config
	Overrides               map[schema.GroupResource]groupResourceOverrides
	DefaultResourcePrefixes map[schema.GroupResource]string
	DefaultMediaType        string
	DefaultSerializer       runtime.StorageSerializer
	ResourceEncodingConfig  ResourceEncodingConfig
	APIResourceConfigSource APIResourceConfigSource
	newStorageCodecFn       func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}

func NewDefaultStorageFactory(
	config storagebackend.Config,
	defaultMediaType string,
	defaultSerializer runtime.StorageSerializer,
	resourceEncodingConfig ResourceEncodingConfig,
	resourceConfig APIResourceConfigSource,
	specialDefaultResourcePrefixes map[schema.GroupResource]string,
) *DefaultStorageFactory {
	config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	if len(defaultMediaType) == 0 {
		defaultMediaType = runtime.ContentTypeJSON
	}
	return &DefaultStorageFactory{
		StorageConfig:           config,
		Overrides:               map[schema.GroupResource]groupResourceOverrides{},
		DefaultMediaType:        defaultMediaType,
		DefaultSerializer:       defaultSerializer,
		ResourceEncodingConfig:  resourceEncodingConfig,
		APIResourceConfigSource: resourceConfig,
		DefaultResourcePrefixes: specialDefaultResourcePrefixes,

		newStorageCodecFn: NewStorageCodec,
	}
}

func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
	chosenStorageResource := s.getStorageGroupResource(groupResource)

	// operate on copy
	storageConfig := s.StorageConfig
	codecConfig := StorageCodecConfig{
		StorageMediaType:  s.DefaultMediaType,
		StorageSerializer: s.DefaultSerializer,
	}

	if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}
	if override, ok := s.Overrides[chosenStorageResource]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}

	var err error
	codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
	if err != nil {
		return nil, err
	}
	codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
	if err != nil {
		return nil, err
	}
	codecConfig.Config = storageConfig

	storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
	if err != nil {
		return nil, err
	}

	return &storageConfig, nil
}

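NewConfig() above creates the storage configuration, StorageConfig, for a given resource. It contains the etcd connection settings, the serialization settings, and so on, so every resource has its own set of storage settings; for example, the --etcd-servers-overrides flag can point an individual resource at a different storage backend. This is quite different from ordinary applications, which usually have one central database configuration shared by all data, whereas here storage is configured per resource type. It looks flexible and powerful, but it also feels a bit over-engineered: who would split the resources of one cluster across different databases? And which resources would need special encoding? At least I have not come across such a use case.

Anyway, enough grumbling; back to the topic. StorageConfig is a storagebackend.Config. Its initial value comes from EtcdOptions, is passed to StorageFactoryConfig, and from there to StorageFactory. When the StorageFactory creates the storage configuration for a resource, it copies its own StorageConfig and then sets further fields on the copy, such as the Codec responsible for encoding and decoding, producing a configuration specific to that resource. The struct behind StorageConfig is defined as: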

// k8s.io/apiserver/pkg/storage/storagebackend/config.go

type Config struct {
	Type                  string
	Prefix                string
	Transport             TransportConfig
	Paging                bool
	Codec                 runtime.Codec
	EncodeVersioner       runtime.GroupVersioner
	CompactionInterval    time.Duration
	CountMetricPollPeriod time.Duration
}
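The "copy the shared config, then apply a per-resource override" behavior of NewConfig() can be sketched as follows. The override syntax follows the documented format of --etcd-servers-overrides (comma-separated entries of the form group/resource#servers, servers separated by semicolons; the core group is written with an empty group, e.g. /events). One commonly cited use is moving events into a dedicated etcd cluster, which is what the example does. storageFactory, Config, and parseOverrides are hypothetical simplifications, not the Kubernetes types.

```go
package main

import (
	"fmt"
	"strings"
)

type GroupResource struct{ Group, Resource string }

// Config is a trimmed, hypothetical version of storagebackend.Config.
type Config struct {
	Prefix  string
	Servers []string
}

type storageFactory struct {
	base      Config
	overrides map[GroupResource][]string
}

// parseOverrides reads "group/resource#server1;server2" entries separated by ",".
func parseOverrides(flag string) (map[GroupResource][]string, error) {
	out := map[GroupResource][]string{}
	if flag == "" {
		return out, nil
	}
	for _, entry := range strings.Split(flag, ",") {
		gr, servers, ok := strings.Cut(entry, "#")
		group, resource, ok2 := strings.Cut(gr, "/")
		if !ok || !ok2 || resource == "" || servers == "" {
			return nil, fmt.Errorf("invalid override %q", entry)
		}
		out[GroupResource{group, resource}] = strings.Split(servers, ";")
	}
	return out, nil
}

// NewConfig copies the shared base config and then applies any override for
// this resource, so one resource's settings never leak into another's.
func (f *storageFactory) NewConfig(gr GroupResource) Config {
	cfg := f.base // struct copy: operate on a copy, as the real NewConfig does
	if servers, ok := f.overrides[gr]; ok {
		cfg.Servers = servers
	}
	return cfg
}

func main() {
	ov, err := parseOverrides("/events#https://10.0.0.2:2379;https://10.0.0.3:2379")
	if err != nil {
		panic(err)
	}
	f := &storageFactory{base: Config{Prefix: "/registry", Servers: []string{"https://10.0.0.1:2379"}}, overrides: ov}
	fmt.Println(f.NewConfig(GroupResource{"", "events"}).Servers) // [https://10.0.0.2:2379 https://10.0.0.3:2379]
	fmt.Println(f.NewConfig(GroupResource{"", "pods"}).Servers)   // [https://10.0.0.1:2379]
}
```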

Finally, s.Etcd.ApplyWithStorageFactoryTo() wraps the storageFactory built above in another struct, StorageFactoryRestOptionsFactory, and assigns it to the RESTOptionsGetter field of genericConfig:

// apiserver/pkg/server/options/etcd.go (simplified)

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) {
	c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}

type StorageFactoryRestOptionsFactory struct {
	Options        EtcdOptions
	StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	// Get the storage config for this resource. Each resource can have its own
	// storage config; this mainly sets the config's Codec and EncodeVersioner.
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, err
	}
	ret := generic.RESTOptions{
		StorageConfig:  storageConfig,
		Decorator:      generic.UndecoratedStorage, // decorator -> factory -> store: yields the final storage.Interface
		ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
	}
	return ret, nil
}

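StorageFactoryRestOptionsFactory implements a very important method, GetRESTOptions(). It first calls the StorageFactory's NewConfig() method, described above, to create the storage configuration for the given resource, and then builds a RESTOptions struct containing another important member, Decorator, whose value here, UndecoratedStorage, is defined as follows: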

// apiserver/pkg/registry/generic/storage_decorator.go

func UndecoratedStorage(
	config *storagebackend.Config,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
	return NewRawStorage(config)
}

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config)
}

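Note that UndecoratedStorage is a function, so Decorator is a function too. Its job is to create an etcd store from the StorageConfig built earlier. Many parameters are passed in, but only config is actually used; the others are most likely there because every decorator shares this one signature, and a caching decorator does need them. NewRawStorage() uses the factory pattern once more, creating the final store from the information in StorageConfig. The flow is: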

// apiserver/pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

// apiserver/pkg/storage/storagebackend/factory/etcd3.go (simplified)

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	client, err := newETCD3Client(c.Transport)
	if err != nil {
		return nil, nil, err
	}
	// construction of transformer and destroyFunc omitted
	return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) {
	// dialOptions and tlsConfig are built from c in omitted code
	cfg := clientv3.Config{
		DialTimeout:          dialTimeout,
		DialKeepAliveTime:    keepaliveTime,
		DialKeepAliveTimeout: keepaliveTimeout,
		DialOptions:          dialOptions,
		Endpoints:            c.ServerList,
		TLS:                  tlsConfig,
	}

	return clientv3.New(cfg)
}

// apiserver/pkg/storage/etcd3/store.go

// Implement storage.Interface in apiserver/pkg/storage/interfaces.go
type store struct {
	client        *clientv3.Client
	getOps        []clientv3.OpOption
	codec         runtime.Codec
	versioner     storage.Versioner
	transformer   value.Transformer
	pathPrefix    string
	watcher       *watcher
	pagingEnabled bool
	leaseManager  *leaseManager
}

func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
	return newStore(c, pagingEnabled, codec, prefix, transformer)
}

func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
	versioner := APIObjectVersioner{}
	result := &store{
		client:        c,
		codec:         codec,
		versioner:     versioner,
		transformer:   transformer,
		pagingEnabled: pagingEnabled,
		pathPrefix:    path.Join("/", prefix),
		watcher:       newWatcher(c, codec, versioner, transformer),
		leaseManager:  newDefaultLeaseManager(c),
	}
	return result
}

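After all those twists and turns, we finally see the etcd store being created, in newETCD3Storage() above: it first creates an etcd client, then passes that client to newStore() to build the etcd store struct, which talks to etcd through client and keeps serialization details in fields such as codec.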
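The last factory step, choosing a backend by Config.Type, is a plain switch, with an empty type treated as etcd3. Below is a self-contained sketch of that dispatch; the constants, Config, Interface, and etcd3Store are hypothetical trimmed stand-ins for the storagebackend and factory types, and no real etcd connection is made.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	StorageTypeUnset = ""
	StorageTypeETCD3 = "etcd3"
)

type Config struct {
	Type    string
	Servers []string
}

type Interface interface{ Backend() string }

type etcd3Store struct{ endpoints []string }

func (s etcd3Store) Backend() string { return fmt.Sprintf("etcd3%v", s.endpoints) }

// DestroyFunc releases backend resources when the server shuts down.
type DestroyFunc func()

func newETCD3Storage(c Config) (Interface, DestroyFunc, error) {
	if len(c.Servers) == 0 {
		return nil, nil, errors.New("no etcd endpoints")
	}
	// A real implementation would dial etcd here and close the client in destroy.
	destroy := func() {}
	return etcd3Store{endpoints: c.Servers}, destroy, nil
}

// Create dispatches on the configured backend type; an empty type means etcd3.
func Create(c Config) (Interface, DestroyFunc, error) {
	switch c.Type {
	case StorageTypeUnset, StorageTypeETCD3:
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

func main() {
	s, destroy, err := Create(Config{Servers: []string{"https://127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	defer destroy()
	fmt.Println(s.Backend()) // etcd3[https://127.0.0.1:2379]
	_, _, err = Create(Config{Type: "etcd2"})
	fmt.Println(err) // unknown storage type: etcd2
}
```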

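In short, although the implementation is complex, using it is simple. The StorageFactory ends up inside the RESTOptionsGetter of GenericConfig, and GenericConfig is built ahead of time in the CreateServerChain phase, so obtaining an etcd store only takes something like this: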

opts, err := genericConfig.RESTOptionsGetter.GetRESTOptions(resource)
// handle err
store, destroyFunc, err := opts.Decorator(
	opts.StorageConfig,
	...
)
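Because Decorator is just a function value, the caller above does not care which decorator it got. The sketch below shows the idea with a hypothetical two-parameter StorageDecorator type: undecorated ignores everything but the config, like UndecoratedStorage, while withCache wraps the raw storage. In Kubernetes, a decorator that wraps storage in a watch cache is used when the watch cache is enabled; the memoizing wrapper here is only a stand-in for "a decorator that wraps", not a model of how that cache behaves.

```go
package main

import "fmt"

// Hypothetical, trimmed types: the real StorageDecorator takes more parameters.
type Config struct{ Prefix string }

type Interface interface {
	Get(key string) string
}

type rawStorage struct{ prefix string }

func (r rawStorage) Get(key string) string { return "etcd:" + r.prefix + key }

// StorageDecorator is a function type, so which storage gets built is chosen
// by passing a different function.
type StorageDecorator func(cfg Config, resourcePrefix string) Interface

// undecorated ignores everything but cfg, like UndecoratedStorage.
func undecorated(cfg Config, _ string) Interface { return rawStorage{prefix: cfg.Prefix} }

// cachingStore wraps another Interface and memoizes reads.
type cachingStore struct {
	inner Interface
	cache map[string]string
	hits  int
}

func (c *cachingStore) Get(key string) string {
	if v, ok := c.cache[key]; ok {
		c.hits++
		return v
	}
	v := c.inner.Get(key)
	c.cache[key] = v
	return v
}

// withCache builds the raw storage and then wraps it.
func withCache(cfg Config, prefix string) Interface {
	return &cachingStore{inner: undecorated(cfg, prefix), cache: map[string]string{}}
}

func main() {
	for _, d := range []StorageDecorator{undecorated, withCache} {
		s := d(Config{Prefix: "/registry"}, "/pods")
		fmt.Println(s.Get("/default/web")) // etcd:/registry/default/web
	}
}
```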

REST storage

In the APIServer, each API object has a corresponding REST store. The REST store logic for the built-in Kubernetes API objects lives under kubernetes/pkg/registry/. Taking Pod as an example, here is how a REST store is built:

// pkg/registry/core/pod/storage/storage.go (simplified)

import (
	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
)

type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

type BindingREST struct {
	store *genericregistry.Store
}

type StatusREST struct {
	store *genericregistry.Store
}

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter...) (PodStorage, error) {
	store := &genericregistry.Store{
		NewFunc:     func() runtime.Object { return &api.Pod{} },
		NewListFunc: func() runtime.Object { return &api.PodList{} },
		......
	}

	options := &generic.StoreOptions{
		RESTOptions: optsGetter,
	}

	......

	if err := store.CompleteWithOptions(options); err != nil {
		return PodStorage{}, err
	}

	......

	statusStore := *store
	statusStore.UpdateStrategy = registrypod.StatusStrategy
	statusStore.ResetFieldsStrategy = registrypod.StatusStrategy

	bindingREST := &BindingREST{store: store}

	......

	return PodStorage{
		Pod:     &REST{store, proxyTransport},
		Binding: bindingREST,
		......
		Status: &StatusREST{store: &statusStore},
		......
	}, nil
}

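The genericregistry.Store above is the REST store introduced in the "REST storage interface implementation" section. A Pod has several REST stores: REST, BindingREST, and StatusREST in the example above are all built on genericregistry.Store (Go has no inheritance: REST embeds a *genericregistry.Store, while BindingREST and StatusREST hold a pointer to one). Note that the function takes a parameter optsGetter generic.RESTOptionsGetter; this is the RESTOptionsGetter stored in genericConfig that we saw in the etcd storage part above. Let's see how genericConfig.RESTOptionsGetter is used here: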

// Every resource is built on this Store. It implements the REST interfaces, and its
// Storage field wraps etcd, implementing the database-level CRUD operations, i.e. storage.Interface.
// Implement rest Interfaces in apiserver/pkg/registry/rest/rest.go
// like Getter, Lister, Creater, Updater, Patcher, Watcher etc.
type Store struct {
	NewFunc        func() runtime.Object
	NewListFunc    func() runtime.Object
	KeyFunc        func(ctx context.Context, name string) (string, error)
	ObjectNameFunc func(obj runtime.Object) (string, error)

	......

	Storage          DryRunnableStorage // etcd3.store, implement the storage.Interface
	DestroyFunc      func()
	StorageVersioner runtime.GroupVersioner
}

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
	if err != nil {
		return err
	}
	// prefix, keyFunc and attrFunc are derived in code omitted here
	e.Storage.Codec = opts.StorageConfig.Codec
	e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
		opts.StorageConfig,
		prefix,
		keyFunc,
		e.NewFunc,
		e.NewListFunc,
		attrFunc,
		options.TriggerFunc,
		options.Indexers,
	)
	if err != nil {
		return err
	}
	e.StorageVersioner = opts.StorageConfig.EncodeVersioner
	return nil
}

That is, store.CompleteWithOptions(options) calls GetRESTOptions() to obtain the storage configuration, then calls Decorator() to create the etcd store and saves it in DryRunnableStorage.
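In the Pod NewStorage() excerpt, statusStore := *store is a shallow copy: the status store gets its own UpdateStrategy but still points at the same backend, because the struct copy duplicates the Storage field's reference rather than the etcd store behind it. The sketch below illustrates this with a hypothetical trimmed Store whose Storage is a map (a reference type, like the interface value inside DryRunnableStorage), plus the embedding-versus-field difference between REST and StatusREST.

```go
package main

import "fmt"

// Hypothetical trimmed version of genericregistry.Store.
type Store struct {
	UpdateStrategy string
	Storage        map[string]string // shared backend
}

// Update writes to the backend and reports which strategy handled it.
func (s *Store) Update(key, val string) string {
	s.Storage[key] = val
	return s.UpdateStrategy
}

// REST embeds *Store, so Store's methods are promoted onto REST.
type REST struct {
	*Store
}

// StatusREST holds its own *Store and exposes only what it chooses to.
type StatusREST struct {
	store *Store
}

func (r *StatusREST) Update(key, val string) string { return r.store.Update(key, val) }

func main() {
	store := &Store{UpdateStrategy: "podStrategy", Storage: map[string]string{}}
	statusStore := *store // shallow copy: new strategy, same Storage map
	statusStore.UpdateStrategy = "statusStrategy"

	pod := &REST{store}
	status := &StatusREST{store: &statusStore}

	fmt.Println(pod.Update("/registry/pods/default/web", "spec"))      // podStrategy
	fmt.Println(status.Update("/registry/pods/default/web", "status")) // statusStrategy
	fmt.Println(store.Storage["/registry/pods/default/web"])           // status
}
```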

Summary

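This article walked through the two layers of storage interfaces involved in persisting API objects, together with their implementations and how they are used: the REST store and the etcd store. The mental picture to build is that every API object has its own REST store and etcd store, related by reference: the REST store uses the etcd store to operate on etcd. The REST store faces the RESTful API side, while the etcd store faces the database side.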

Author

hackerain

Published

2020-09-19

Updated

2023-11-05
