Kubernetes使用etcd 作为后端存储,etcd是一个分布式的,高可靠性的键值存储系统,跟传统的平台系统不同,Kubernetes把所有的数据都存储到了kv数据库中,而没有像OpenStack一样使用像MySQL这种关系型数据库,做这种选型的原因,我想可能一方面是由于Kubernetes中存储的数据,关系性不是很强,更多的是类似配置管理这类数据,一方面是由于etcd的特性,像效率比较高的gRPC接口、支持事务以及Kubernetes严重依赖的Watch机制等,能够通过单一数据库就满足它的需求,不用再引入其他组件实现类似功能,简化了架构的复杂性。
本篇文章主要来介绍下Kubernetes APIServer是如何跟存储打交道的,不涉及存储底层的细节,只到存储接口层,即主要介绍Kubernetes的存储框架是怎么样的,如何做的抽象,它里面的资源是如何存到etcd里面去的。在介绍具体的流程机制之前,我们先来介绍下Kubernetes里面几个相关的抽象,从顶层看下是如何做的设计。
顶层抽象 资源、类别以及对象 在API中抽象出来资源(Resource)、类别(Kind)以及对象(Object)这几个概念,其相关的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type GroupVersionResource struct { Group string Version string Resource string } type GroupVersionKind struct { Group string Version string Kind string } type GroupResource struct { Group string Resource string } type GroupKind struct { Group string Kind string } type Object interface { GetObjectKind() schema.ObjectKind DeepCopyObject() Object } type ObjectKind interface { SetGroupVersionKind(kind GroupVersionKind) GroupVersionKind() GroupVersionKind }
我们知道Kubernetes中的API对象都是带版本以及分组的,比如/apis/networking.k8s.io/v1beta1/ingresses
,/apis
是前缀,networking.k8s.io
就是组(Group),v1beta1
就是版本(Version),ingresses
就是上面提到的资源(Resource)或者类别(Kind),至于Object
则是对API对象的抽象接口,具体的API对象则都实现了这些接口,在golang里,实现了这些接口的结构体,都可以用这个type xxx interface
来统一表示,类似于父类的概念,所以Object
可以代表所有实现了它的接口的对象,通常作为方法的参数或者返回值。可见,这三个概念其实都代表的是同一个意思,都是对像pod
, service
, ingress
等这些API对象的抽象,但是表现形式不同,用途也不一样……:-(所以之前说的Kubernetes代码复杂就复杂在这些地方,抽象的云里雾里的:-)
这些结构体和接口定义在apimachinery 这个库中,这个库可以说是Kubernetes中最高层的抽象,除了上面说的Resource, Kind, Object,还有各种类型定义、序列化、类型转换之类的抽象,都是会被其他的库引用到的一些结构体或者方法。
etcd存储接口 所谓etcd存储接口是对etcd数据库的增删查改的抽象,其定义在apiserver 的apiserver/pkg/storage/interfaces.go
文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Interface interface { Versioner() Versioner Create(ctx context.Context, key string , obj, out runtime.Object, ttl uint64 ) error Delete(ctx context.Context, key string , out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error Watch(ctx context.Context, key string , resourceVersion string , p SelectionPredicate) (watch.Interface, error ) WatchList(ctx context.Context, key string , resourceVersion string , p SelectionPredicate) (watch.Interface, error ) Get(ctx context.Context, key string , resourceVersion string , objPtr runtime.Object, ignoreNotFound bool ) error GetToList(ctx context.Context, key string , resourceVersion string , p SelectionPredicate, listObj runtime.Object) error List(ctx context.Context, key string , resourceVersion string , p SelectionPredicate, listObj runtime.Object) error GuaranteedUpdate( ctx context.Context, key string , ptrToType runtime.Object, ignoreNotFound bool , precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error Count(key string ) (int64 , error ) }
可以看到,跟我们传统的数据库应用不同的地方在于,它的接口比较少,只有这么几个,比如Create()
方法,一般我们写应用程序,要存一个东西,都会有类似CreateXXX()
这样的方法,比如CreatePerson(person Person)
,就是保存一个Person
对象到数据库中,然后会有一堆这样的CreateXXX()
方法来对不同的对象进行存储,但是这里Create()
方法定义的,则是一个高度抽象的方法,obj
是要存进去的对象,即上面说到的Object
,至于这个对象具体是什么,其实是看方法调用者构造了一个什么对象传给它,key
则是其键值,实际上etcd3 store
在实现这些接口时,会将obj
进行编码,即序列化,然后再存到数据库中,这种方法大大减少了数据库层的代码量,也充分利用了kv数据库的特性。
此外,apiserver 这个库是将构建APIServer的一些通用代码抽出来,独立构成了一个库,以便代码复用,可以给第三方应用构建扩展使用,Kubernetes APIServer的实现大量依赖了该库。
REST存储接口 Kubernetes API是RESTful API,它的每一种REST资源,比如pod
, service
, ingress
,在APIServer中,都有一个Store与之对应,通过实现统一的接口,来实现对REST资源的增删改查等操作,这些统一的接口定义在 apiserver/pkg/registry/rest/rest.go
文件中,列举几个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 type Storage interface { New() runtime.Object Destroy() } type Getter interface { Get(ctx context.Context, name string , options *metav1.GetOptions) (runtime.Object, error ) } type Creater interface { New() runtime.Object Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error ) } type GracefulDeleter interface { Delete(ctx context.Context, name string , deleteValidation ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool , error ) } type Watcher interface { Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error ) }
即针对某一个API对象的REST操作,会由对应的Store中的方法进行处理,这个Store又引用了实现了etcd存储接口的Store,从而可以对数据库进行操作。
底层实现 上一小节,主要介绍了两类存储接口,一类是针对etcd的存储接口,一类是针对REST的存储接口,下面我们来分别说一下实现这两类接口的方法和结构体。
etcd存储接口实现 针对etcd存储接口的实现,在apiserver/pkg/storage/etcd3/store.go
这个文件中,最主要的结构体为:
1 2 3 4 5 6 7 8 9 10 11 12 13 type store struct { client *clientv3.Client getOps []clientv3.OpOption codec runtime.Codec versioner storage.Versioner transformer value.Transformer pathPrefix string watcher *watcher pagingEnabled bool leaseManager *leaseManager }
可见该结构体最重要的属性为client
,即直接调用到了etcd的client库,通过该client
可以对etcd进行操作,在该结构体上,还实现了apiserver/pkg/storage/interfaces.go
中定义的接口,比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (s *store) Get(ctx context.Context, key string , resourceVersion string , out runtime.Object, ignoreNotFound bool ){ getResp, err := s.client.KV.Get(ctx, key, s.getOps...) kv := getResp.Kvs[0 ] data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key)) return decode(s.codec, s.versioner, data, out, kv.ModRevision) } func (s *store) Create(ctx context.Context, key string , obj, out runtime.Object, ttl uint64 ){ data, err := runtime.Encode(s.codec, obj) newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key)) txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( clientv3.OpPut(key, string (newData), opts...), ).Commit() if out != nil { putResp := txnResp.Responses[0 ].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } }
以上代码为简化代码,忽略了一些不重要的逻辑,且只列出GET
和Create
两个方法,其他未列出。可以看到GET
方法,通过一个string
类型的key
,从etcd中取出了对应的value
,然后通过decode进行解码,将数据解码到out
这个Object
中,然后将其返回。而CREATE
方法,则反过来,先将obj
进行编码,然后通过etcd client
将数据通过事务的方式保存到etcd中。这个编码解码的过程,就是常说的序列化的过程。
REST存储接口实现 REST存储接口的实现在 apiserver/pkg/registry/generic/retistry/store.go
这个文件中,定义了如下的结构体:
1 2 3 4 5 6 7 8 9 10 11 12 type Store struct { NewFunc func () runtime.Object NewListFunc func () runtime.Object KeyFunc func (ctx context.Context, name string ) (string , error ) ObjectNameFunc func (obj runtime.Object) (string , error ) ...... Storage DryRunnableStorage DestroyFunc func () StorageVersioner runtime.GroupVersioner }
该结构体实现了上面小节中REST存储定义的各种接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func (e *Store) New() runtime.Object { return e.NewFunc() } func (e *Store) Get(ctx context.Context, name string , options *metav1.GetOptions) (runtime.Object, error ) { obj := e.NewFunc() key, err := e.KeyFunc(ctx, name) if err != nil { return nil , err } if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false ); err != nil { return nil , storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name) } if e.Decorator != nil { if err := e.Decorator(obj); err != nil { return nil , err } } return obj, nil } func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error ) { if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { return nil , err } if createValidation != nil { if err := createValidation(ctx, obj.DeepCopyObject()); err != nil { return nil , err } } name, err := e.ObjectNameFunc(obj) key, err := e.KeyFunc(ctx, name) qualifiedResource := e.qualifiedResourceFromContext(ctx) ttl, err := e.calculateTTL(obj, 0 , false ) out := e.NewFunc() if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil { ...... } ...... return out, nil }
在Store
结构体中,有一个非常重要的成员 Storage DryRunnableStorage
,它即是对etcd store的引用,其定义如下:
1 2 3 4 5 6 7 8 type DryRunnableStorage struct { Storage storage.Interface Codec runtime.Codec } func (s *DryRunnableStorage) Get(ctx context.Context, key string , resourceVersion string , objPtr runtime.Object, ignoreNotFound bool ) error { return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) }
它的成员Storage storage.Interface
即是上面小节中提到的etcd存储接口
,之所以中间又套了一层DryRunnableStorage
主要是为了编写单元测试方便,针对真实写数据库的操作,可以让它DryRun,而不实际写数据库。
至此,我们知道Kubernetes中定义了两种Store,分别是针对REST资源的Store,以及针对etcd数据库的Store,包括他们各自实现的接口方法,后文我们将他们称为REST store
以及etcd store
,这两个store之间是引用的关系是REST store
引用了etcd store
。上层实例化一个REST store
,则会同时实例化一个etcd store
,用于对数据库的增删查改操作,即上面代码中的e.Storage.Create()
,e.Storage.Get()
等,即是调用etcd store
去读写数据库。
上层应用 这里所说的上层应用,指的是Kubernetes中是如何使用上面说到的REST store
和etcd store
这两个Store的。
etcd存储 本小节主要是介绍下如何构建出etcd store
实体的,首先来看下最上面的代码逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # cmd/kube-apiserver/app/server.go buildGenericConfig() { genericConfig = genenricapiserver.NewConfig() storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd) storageFactory, lastErr = completedStorageFactoryConfig.New() s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig) }
buildGenericConfig()
就是在Kubernetes APIServer 机制概述 中介绍过的CreateServerChain
阶段,构建的配置项,将和APIServer相关的很多通用的配置项都集合在这个里面,这里我们只关注和存储相关的配置项,即上面列出的那几行代码,其实这几行代码,体现了两个在Kubernetes中非常常见的设计模式,一个是Config->Complete->New
模式,一个是Factory
工厂模式。
所谓Config->Complete->New
模式,即首先构建一个Config,即配置项,然后通过Complete()方法进一步补充完善该Config,然后从该Config创建出真正的实体,创建该实体相关的信息,都在该Config中。在上面的例子中,StorageFactoryConfig,就是Config,然后通过Complete()方法,从s.Etcd中再进一步获取相关信息,补充完善该Config,得到completedStorageFactoryConfig,然后再调用New()方法,就可以得到真正的实体,这里就是storageFactory。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 # kubernetes/cmd/kube-apiserver/app/options/options.go s.Etcd = genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil )) # k8s.io/apiserver/pkg/server/options/etcd.go type EtcdOptions struct { StorageConfig storagebackend.Config EncryptionProviderConfigFilepath string EtcdServersOverrides []string DefaultStorageMediaType string DeleteCollectionWorkers int EnableGarbageCollection bool EnableWatchCache bool DefaultWatchCacheSize int WatchCacheSizes []string } # kubernetes/pkg/kubeapiserver/default_storage_factory_builder.go func NewStorageFactoryConfig () *StorageFactoryConfig { resources := []schema.GroupVersionResource{ batch.Resource("cronjobs" ).WithVersion("v1beta1" ), networking.Resource("ingresses" ).WithVersion("v1beta1" ), networking.Resource("ingressclasses" ).WithVersion("v1beta1" ), apisstorage.Resource("csidrivers" ).WithVersion("v1beta1" ), } return &StorageFactoryConfig{ Serializer: legacyscheme.Codecs, DefaultResourceEncoding: serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme), ResourceEncodingOverrides: resources, } } type StorageFactoryConfig struct { StorageConfig storagebackend.Config APIResourceConfig *serverstorage.ResourceConfig DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig DefaultStorageMediaType string Serializer runtime.StorageSerializer ResourceEncodingOverrides []schema.GroupVersionResource EtcdServersOverrides []string EncryptionProviderConfigFilepath string } func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error ) { c.StorageConfig = etcdOptions.StorageConfig c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath return &completedStorageFactoryConfig{c}, nil } type completedStorageFactoryConfig struct { *StorageFactoryConfig } func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error ) { resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides) storageFactory := serverstorage.NewDefaultStorageFactory( c.StorageConfig, c.DefaultStorageMediaType, c.Serializer, resourceEncodingConfig, c.APIResourceConfig, SpecialDefaultResourcePrefixes) ...... return storageFactory, nil }
通过上面的方式,创建出来DefaultStorageFactory
实例,然后就到了上面说到的Factory
工厂模式,顾名思义,就是从工厂中生产出类似的实体,而它生产的实体,就是针对某个资源resource
的存储配置StorageConfig
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 # k8s.io/apiserver/pkg/server/storage/storage_factory.go type StorageFactory interface { NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error ) ResourcePrefix(groupResource schema.GroupResource) string Backends() []Backend } ## DefaultStorageFactory实现了StorageFactory Interface type DefaultStorageFactory struct { StorageConfig storagebackend.Config Overrides map [schema.GroupResource]groupResourceOverrides DefaultResourcePrefixes map [schema.GroupResource]string DefaultMediaType string DefaultSerializer runtime.StorageSerializer ResourceEncodingConfig ResourceEncodingConfig APIResourceConfigSource APIResourceConfigSource newStorageCodecFn func (opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error ) } func NewDefaultStorageFactory ( config storagebackend.Config, defaultMediaType string , defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource, specialDefaultResourcePrefixes map [schema.GroupResource]string , ) *DefaultStorageFactory { config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) if len (defaultMediaType) == 0 { defaultMediaType = runtime.ContentTypeJSON } return &DefaultStorageFactory{ StorageConfig: config, Overrides: map [schema.GroupResource]groupResourceOverrides{}, DefaultMediaType: defaultMediaType, DefaultSerializer: defaultSerializer, ResourceEncodingConfig: resourceEncodingConfig, APIResourceConfigSource: resourceConfig, DefaultResourcePrefixes: specialDefaultResourcePrefixes, newStorageCodecFn: NewStorageCodec, } } func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error ) { chosenStorageResource := s.getStorageGroupResource(groupResource) storageConfig := s.StorageConfig codecConfig := StorageCodecConfig{ StorageMediaType: s.DefaultMediaType, StorageSerializer: s.DefaultSerializer, } if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok { override.Apply(&storageConfig, &codecConfig) } if override, ok := s.Overrides[chosenStorageResource]; ok { override.Apply(&storageConfig, &codecConfig) } codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource) codecConfig.Config = storageConfig storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig) return &storageConfig, nil }
上面的NewConfig()
是为某个resource
创建出对应的存储配置,即StorageConfig
,该配置中包含了etcd的连接、序列化等信息,即每种资源resource
都有自己的一套存储配置,例如可以通过--etcd-servers-overrides
配置项,来给某个单独的resource
指定不同的后端存储,这种机制跟常规的应用很不一样,一般的应用访问数据库,都会有一个集中的数据库配置,所有的资源都是使用的一套配置,但是这里却细化到了按照资源种类去配置存储,这虽然看起来很灵活强大,但是不免有过度设计的嫌疑,谁会去把同一个集群里的资源分开存放到不同的数据库呢?或者有什么资源是需要有特殊的编解码方式的?至少我没有见过这种使用场景,Anyway,稍微吐槽下,还是转入正题吧。StorageConfig
即 storagebackend.Config
,其初始值是来自于 EtcdOptions
,然后传给 StorageFactoryConfig
,StorageFactoryConfig
再传给 StorageFactory
,StorageFactory
在给某个resource
创建存储配置时,把StorageFactory
中的StorageConfig
复制了一份,再给它赋值了其他一些属性,比如负责编解码的Codec等,构成了针对某种资源特定的配置,StorageConfig
对应的结构体定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 # k8s.io/apiserver/pkg/storage/storagebackend/config.go type Config struct { Type string Prefix string Transport TransportConfig Paging bool Codec runtime.Codec EncodeVersioner runtime.GroupVersioner CompactionInterval time.Duration CountMetricPollPeriod time.Duration }
最终,s.Etcd.ApplyWithStorageFactoryTo()
则是将上面构建出来的storageFactory
构建出另外一个结构体StorageFactoryRestOptionsFactory
,然后将其赋值给genericConfig
的RESTOptionsGetter
属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # apiserver/pkg/server/options/etcd.go (s *EtcdOptions)ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config){ c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} } type StorageFactoryRestOptionsFactory struct { Options EtcdOptions StorageFactory serverstorage.StorageFactory } (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error ){ storageConfig, err := f.StorageFactory.NewConfig(resource) ret := generic.RESTOptions{ StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), } return ret }
StorageFactoryRestOptionsFactory
实现了一个非常重要的方法:GetRESTOptions()
,在这个里面,首先通过上面介绍到的StorageFactory
的NewConfig()
方法来创建出针对某一种resource的存储配置项,然后构建了一个RESTOptions
的结构体,里面包含了另外一个重要的成员:Decorator
,其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # apiserver/pkg/registry/generic/storage_decorator.go func UndecoratedStorage ( config *storagebackend.Config, resourcePrefix string , keyFunc func (obj runtime.Object) (string , error ), newFunc func () runtime.Object, newListFunc func () runtime.Object, getAttrsFunc storage.AttrFunc, trigger storage.IndexerFuncs, indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error ) { return NewRawStorage(config) } func NewRawStorage (config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error ) { return factory.Create(*config) }
注意,UndecoratedStorage
是一个方法,所以 Decorator
也是一个方法,该方法的作用就是要使用前面创建出来的StorageConfig
来创建一个etcd store
出来,可以看到这里传了很多参数进去,但是实际上只用了 config
这一个参数,其他参数可能以后会用到,在 NewRawStorage()
方法中又用到工厂模式,使用StorageConfig
中的信息,来创建最终的Store
,其流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 # apiserver/pkg/storage/storagebackend/factory/factory.go func Create (c storagebackend.Config) (storage.Interface, DestroyFunc, error ) { switch c.Type { case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: return newETCD3Storage(c) default : return nil , nil , fmt.Errorf("unknown storage type: %s" , c.Type) } } # apiserver/pkg/storage/storagebackend/factory/etcd3.go func newETCD3Storage (c storagebackend.Config) (storage.Interface, DestroyFunc, error ) { client, err := newETCD3Client(c.Transport) return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil } func newETCD3Client (c storagebackend.TransportConfig) (*clientv3.Client, error ) { cfg := clientv3.Config{ DialTimeout: dialTimeout, DialKeepAliveTime: keepaliveTime, DialKeepAliveTimeout: keepaliveTimeout, DialOptions: dialOptions, Endpoints: c.ServerList, TLS: tlsConfig, } return clientv3.New(cfg) } # apiserver/pkg/storage/etcd3/store.go type store struct { client *clientv3.Client getOps []clientv3.OpOption codec runtime.Codec versioner storage.Versioner transformer value.Transformer pathPrefix string watcher *watcher pagingEnabled bool leaseManager *leaseManager } func New (c *clientv3.Client, codec runtime.Codec, prefix string , transformer value.Transformer, pagingEnabled bool ) storage.Interface { return newStore(c, pagingEnabled, codec, prefix, transformer) } func newStore (c *clientv3.Client, pagingEnabled bool , codec runtime.Codec, prefix string , transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, codec: codec, versioner: versioner, transformer: transformer, pagingEnabled: pagingEnabled, pathPrefix: path.Join("/" , prefix), watcher: newWatcher(c, codec, versioner, transformer), leaseManager: newDefaultLeaseManager(c), } return result }
经过了山路十八弯,终于看到创建出来了etcd store
,即上面newETCD3Storage()
方法内的逻辑,先创建了一个etcd client
,然后将该client传给newStore()
,构建出etcd store
结构体,可以通过client
跟etcd
打交道,同时序列化等信息存在codec
等变量里。
综上,可以看到,虽然实现非常复杂,但是使用起来还是很简单的,因为实际上构建出来的StorageFactory
是放到了GenericConfig
中的RESTOptionsGetter
,而GenericConfig
是在CreateServerChain
阶段就提前构建好的,因此,只需要向下面这样使用,就可以得到一个etcd store
:
1 2 3 4 5 opts := genericConfig.RESTOptionsGetter.GetRESTOptions(resource) store := opts.Decorator( opts.StorageConfig, ... )
REST存储 在APIServer中,每一个API对象,都有一个REST Store
与之对应,Kubernetes内置的API对象的REST store
的相关逻辑,都位于kubernetes/pkg/registry/
目录下,我们以pod
为例,来说明下REST Store
是如何构建出来的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 # pkg/registry/core/pod/storage/storage.go import ( genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" ) type REST struct { *genericregistry.Store proxyTransport http.RoundTripper } type BindingREST struct { store *genericregistry.Store } type StatusREST struct { store *genericregistry.Store } func NewStorage (optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter...) { store := &genericregistry.Store{ NewFunc: func () runtime.Object { return &api.Pod{} }, NewListFunc: func () runtime.Object { return &api.PodList{} }, ...... } options := &generic.StoreOptions{ RESTOptions: optsGetter } ...... store.CompleteWithOptions(options) ...... statusStore := *store statusStore.UpdateStrategy = registrypod.StatusStrategy statusStore.ResetFieldsStrategy = registrypod.StatusStrategy bindingREST := &BindingREST{store: store} ...... return PodStorage{ Pod: &REST{store, proxyTransport}, Binding: &BindingREST{store: store}, ...... Status: &StatusREST{store: &statusStore}, ...... } }
上面的 genericregistry.Store
就是 REST存储接口实现
介绍到的 REST store
,一个Pod有很多种 REST store
,比如上例中的 REST
, BindingREST
, StatusREST
都是,这些类型都继承自 genericregistry.Store
,注意该方法接受了一个参数optsGetter generic.RESTOptionsGetter
,这个就是在上面etcd 存储
应用中,介绍到的存储到genericConfig
的RESTOptionsGetter
,我们来看看这里是怎么使用这个genericConfig.RESTOptionsGetter
的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type Store struct { NewFunc func () runtime.Object NewListFunc func () runtime.Object KeyFunc func (ctx context.Context, name string ) (string , error ) ObjectNameFunc func (obj runtime.Object) (string , error ) ...... Storage DryRunnableStorage DestroyFunc func () StorageVersioner runtime.GroupVersioner } (e *Store) CompleteWithOptions(options *generic.StoreOptions){ opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource) e.Storage.Codec = opts.StorageConfig.Codec e.Storage.Storage, e.DestroyFunc, err = opts.Decorator( opts.StorageConfig, prefix, keyFunc, e.NewFunc, e.NewListFunc, attrFunc, options.TriggerFunc, options.Indexers, ) e.StorageVersioner = opts.StorageConfig.EncodeVersioner }
即在store.CompleteWithOptions(options)
方法中,调用了GetRESTOptions()
方法获取到存储配置信息,然后再调用Decorator()
方法创建出etcd store
实体,存储到DryRunnableStorage
中。
总结 本文主要梳理了API对象存储相关的两个层面的存储接口以及其实现和应用,即REST store
和etcd store
,在脑海中建立起来的整体画像应该是,每一个API对象,都有对应的REST store
和etcd store
,这两者之间是引用的关系,REST store
引用etcd store
来操作数据库etcd,REST store
是面向RESTful api
侧, etcd store
则面向数据库侧。