kube-controller之storageversion

Lease, storageVersion

Lease(租约)提供了一种机制来锁定共享资源并协调集合成员之间的活动。Kubernetes 也使用 Lease 确保在任何给定时间某个组件只有一个实例在运行:在高可用配置中,kube-controller-manager 和 kube-scheduler 等控制平面组件通过 Lease 保证只有一个实例处于激活运行状态,而其他实例处于待机状态。

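StorageVersion 是定义在 apiserverinternal/v1alpha1 中的内部资源(对象名为 <group>.<resource>),记录了每个 API Server 实例(以 APIServerID 标识)对该资源使用的编码版本(encodingVersion)、可解码版本(decodableVersions)和可服务版本(servedVersions)。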

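这里的 Lease 指 kube-apiserver 的身份(identity)Lease:每个 kube-apiserver 实例在 kube-system 命名空间中维护一个以其 APIServerID 命名的 Lease。当这样的 Lease 被删除时,说明对应的 API Server 实例已经不存在,它在 StorageVersion 中上报的版本记录已经过期,资源的公共编码版本也可能随之变化,所以需要对 StorageVersion 作相应的清理。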

所以 storageversiongc 的作用就是:当 API Server 实例发生变化(其身份 Lease 被删除)时,从 StorageVersion 中移除该实例上报的版本记录。
此外,当 StorageVersion 被创建或更新时,它也会查询 Lease 信息,清理那些找不到对应 kube-apiserver Lease 的记录,从而更新 StorageVersion。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// staging/src/k8s.io/api/apiserverinternal/v1alpha1/types.go

// StorageVersion collects, for a single resource, the storage versions that
// each API server instance reports in Status.
type StorageVersion struct {
	metav1.TypeMeta `json:",inline"`
	// The name is <group>.<resource>.
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Spec is an empty spec. It is here to comply with Kubernetes API style.
	Spec StorageVersionSpec `json:"spec" protobuf:"bytes,2,opt,name=spec"`

	// API server instances report the version they can decode and the version they
	// encode objects to when persisting objects in the backend.
	Status StorageVersionStatus `json:"status" protobuf:"bytes,3,opt,name=status"`
}

// StorageVersionSpec is intentionally empty; it exists only to comply with
// Kubernetes API style. All information lives in StorageVersion.Status.
type StorageVersionSpec struct{}

// StorageVersionStatus is where API server instances report the versions they
// can decode and the version they encode objects to when persisting objects in
// the backend.
type StorageVersionStatus struct {
	// The reported versions per API server instance.
	// +optional
	// +listType=map
	// +listMapKey=apiServerID
	StorageVersions []ServerStorageVersion `json:"storageVersions,omitempty" protobuf:"bytes,1,opt,name=storageVersions"`
	// If all API server instances agree on the same encoding storage version,
	// then this field is set to that version. Otherwise this field is left empty.
	// API servers should finish updating its storageVersionStatus entry before
	// serving write operations, so that this field will be in sync with the reality.
	// +optional
	CommonEncodingVersion *string `json:"commonEncodingVersion,omitempty" protobuf:"bytes,2,opt,name=commonEncodingVersion"`

	// The latest available observations of the storageVersion's state.
	// +optional
	// +listType=map
	// +listMapKey=type
	Conditions []StorageVersionCondition `json:"conditions,omitempty" protobuf:"bytes,3,opt,name=conditions"`
}

// ServerStorageVersion is the report of a single API server instance: the
// versions it can decode and the version it encodes objects to when
// persisting objects in the backend.
type ServerStorageVersion struct {
	// The ID of the reporting API server.
	APIServerID string `json:"apiServerID,omitempty" protobuf:"bytes,1,opt,name=apiServerID"`

	// The version the API server encodes objects to when persisting them in
	// the backend.
	EncodingVersion string `json:"encodingVersion,omitempty" protobuf:"bytes,2,opt,name=encodingVersion"`

	// The versions the API server can decode. EncodingVersion must be one of
	// the DecodableVersions.
	// +listType=set
	DecodableVersions []string `json:"decodableVersions,omitempty" protobuf:"bytes,3,opt,name=decodableVersions"`

	// The versions the API server can serve. DecodableVersions must include
	// all of the ServedVersions.
	// +listType=set
	ServedVersions []string `json:"servedVersions,omitempty" protobuf:"bytes,4,opt,name=servedVersions"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

// NewWithDelegate (excerpt; elided code is shown as "...").
//
// The part shown here starts a goroutine that publishes this apiserver's
// StorageVersion objects via StorageVersionManager.UpdateStorageVersions.
// wait.PollImmediateUntil runs the condition immediately and then every
// 10 minutes. Because the condition always returns (false, nil), polling
// never finishes on its own; it runs until hookContext.StopCh is closed.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	...

	// Technically an apiserver only needs to update storage version once during bootstrap.
	// Reconcile StorageVersion objects every 10 minutes will help in the case that the
	// StorageVersion objects get accidentally modified/deleted by a different agent. In that
	// case, the reconciliation ensures future storage migration still works. If nothing gets
	// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
	// therefore won't change the resource version and trigger storage migration.
	go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
		// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
		// share the same generic apiserver config. The same StorageVersion manager is used
		// to register all built-in resources when the generic apiservers install APIs.
		s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID, c.GenericConfig.MergedResourceConfig)
		// Never report "done": keep reconciling until the stop channel closes.
		return false, nil
	}, hookContext.StopCh)
	...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// pkg/controller/storageversiongc/gc_controller.go

// Controller garbage-collects StorageVersion entries that were reported by
// API servers which no longer exist. It watches two kinds of objects, Leases
// and StorageVersions, and keeps one work queue for each.
type Controller struct {
	kubeclientset kubernetes.Interface

	leaseLister  coordlisters.LeaseLister
	leasesSynced cache.InformerSynced

	storageVersionSynced cache.InformerSynced

	// leaseQueue keys are processed as Lease names by processDeletedLease;
	// storageVersionQueue keys are processed as StorageVersion names by
	// syncStorageVersion.
	leaseQueue          workqueue.RateLimitingInterface
	storageVersionQueue workqueue.RateLimitingInterface
}

// NewStorageVersionGC builds a Controller wired to the given informers.
//
// Event wiring:
//   - Leases: only deletions are handled. Creations are not watched because an
//     API server instance creates its own StorageVersion entries while it
//     starts up.
//   - StorageVersions: additions and updates are handled.
func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
	logger := klog.FromContext(ctx)

	c := &Controller{kubeclientset: clientset}
	c.leaseLister = leaseInformer.Lister()
	c.leasesSynced = leaseInformer.Informer().HasSynced
	c.storageVersionSynced = storageVersionInformer.Informer().HasSynced
	c.leaseQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases")
	c.storageVersionQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions")

	leaseHandlers := cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			c.onDeleteLease(logger, obj)
		},
	}
	storageVersionHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			c.onAddStorageVersion(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			c.onUpdateStorageVersion(logger, oldObj, newObj)
		},
	}

	leaseInformer.Informer().AddEventHandler(leaseHandlers)
	// use the default resync period from the informer
	storageVersionInformer.Informer().AddEventHandler(storageVersionHandlers)

	return c
}

// Run waits for the Lease and StorageVersion caches to sync, then starts two
// workers (one per queue) and blocks until ctx is cancelled. On return both
// queues are shut down.
func (c *Controller) Run(ctx context.Context) {
	logger := klog.FromContext(ctx)
	defer utilruntime.HandleCrash()
	defer c.leaseQueue.ShutDown()
	defer c.storageVersionQueue.ShutDown()
	defer logger.Info("Shutting down storage version garbage collector")

	logger.Info("Starting storage version garbage collector")

	if synced := cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced); !synced {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// Identity lease deletions and storageversion updates are rare, so a single
	// worker per queue is enough. The lease worker handles legitimate identity
	// lease deletions; the storageversion worker handles storageversions that
	// were created/updated with a non-existing API server ID. The two workers
	// may conflict on update, which is acceptable.
	workers := []func(context.Context){c.runLeaseWorker, c.runStorageVersionWorker}
	for _, worker := range workers {
		go wait.UntilWithContext(ctx, worker, time.Second)
	}

	<-ctx.Done()
}

// runLeaseWorker keeps processing lease keys until processNextLease reports
// that the queue has been shut down.
func (c *Controller) runLeaseWorker(ctx context.Context) {
	for {
		if !c.processNextLease(ctx) {
			return
		}
	}
}

// processNextLease pops one key from leaseQueue and hands it, as a Lease name,
// to processDeletedLease. On failure the error is reported through
// utilruntime.HandleError and the key is re-added with AddRateLimited; on
// success Forget is called for the key. It returns false only when the queue
// reports shutdown.
func (c *Controller) processNextLease(ctx context.Context) bool {
	item, shutdown := c.leaseQueue.Get()
	if shutdown {
		return false
	}
	defer c.leaseQueue.Done(item)

	if err := c.processDeletedLease(ctx, item.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", item, err))
		c.leaseQueue.AddRateLimited(item)
		return true
	}
	c.leaseQueue.Forget(item)
	return true
}

// runStorageVersionWorker keeps processing storage version keys until
// processNextStorageVersion reports that the queue has been shut down.
func (c *Controller) runStorageVersionWorker(ctx context.Context) {
	for {
		if !c.processNextStorageVersion(ctx) {
			return
		}
	}
}

// processNextStorageVersion pops one key from storageVersionQueue and hands
// it, as a StorageVersion name, to syncStorageVersion. On failure the error is
// reported through utilruntime.HandleError and the key is re-added with
// AddRateLimited; on success Forget is called for the key. It returns false
// only when the queue reports shutdown.
func (c *Controller) processNextStorageVersion(ctx context.Context) bool {
	item, shutdown := c.storageVersionQueue.Get()
	if shutdown {
		return false
	}
	defer c.storageVersionQueue.Done(item)

	if err := c.syncStorageVersion(ctx, item.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", item, err))
		c.storageVersionQueue.AddRateLimited(item)
		return true
	}
	c.storageVersionQueue.Forget(item)
	return true
}

// processDeletedLease handles a deleted Lease called name: every StorageVersion
// entry whose APIServerID equals name is removed.
//
// The Lease is looked up again in metav1.NamespaceSystem first. If it still
// exists, nothing is done. Any lookup error other than NotFound is returned
// as-is. StorageVersions without a matching entry are skipped. The rest are
// passed to updateOrDeleteStorageVersion with the remaining entries. Per-object
// errors are combined with utilerrors.NewAggregate.
func (c *Controller) processDeletedLease(ctx context.Context, name string) error {
	_, getErr := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
	switch {
	case getErr == nil:
		// The Lease is present again; there is nothing to clean up.
		return nil
	case !apierrors.IsNotFound(getErr):
		return getErr
	}

	// Listing every StorageVersion is affordable here: this path only runs
	// when a Lease has been deleted, which is rare.
	svList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}

	var errs []error
	for i := range svList.Items {
		sv := &svList.Items[i]

		// Keep every entry except the ones reported by the API server whose
		// Lease is gone; those records are stale.
		var remaining []apiserverinternalv1alpha1.ServerStorageVersion
		stale := false
		for _, ssv := range sv.Status.StorageVersions {
			if ssv.APIServerID != name {
				remaining = append(remaining, ssv)
				continue
			}
			stale = true
		}
		if !stale {
			continue
		}

		if err := c.updateOrDeleteStorageVersion(ctx, sv, remaining); err != nil {
			errs = append(errs, err)
		}
	}

	return utilerrors.NewAggregate(errs)
}

// syncStorageVersion removes, from the StorageVersion called name, every entry
// whose APIServerID does not map to a kube-apiserver identity Lease in
// metav1.NamespaceSystem.
//
// An entry is invalid when its Lease is NotFound, or when the Lease exists but
// its IdentityLeaseComponentLabelKey label is not KubeAPIServer. Any other
// error while fetching a Lease is returned so the key is retried. A transient
// API failure (timeout, throttling, 5xx) says nothing about whether the API
// server still exists, and must not cause valid entries to be erased. If the
// StorageVersion itself is NotFound, nothing is done.
func (c *Controller) syncStorageVersion(ctx context.Context, name string) error {
	sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(ctx, name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// The problematic storage version that was added/updated recently is gone.
		// Nothing we need to do here.
		return nil
	}
	if err != nil {
		return err
	}

	hasInvalidID := false
	var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
	for _, v := range sv.Status.StorageVersions {
		// Look up the identity Lease of the API server that reported this entry.
		lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			// Cannot tell whether the API server is gone; retry later instead
			// of dropping a possibly valid entry.
			return err
		}
		if err != nil || lease == nil || lease.Labels == nil ||
			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
			// We cannot find a corresponding identity lease from apiserver as well.
			// We need to clean up this storage version.
			hasInvalidID = true
			continue
		}

		serverStorageVersions = append(serverStorageVersions, v)
	}
	if !hasInvalidID {
		return nil
	}
	return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
}

// updateOrDeleteStorageVersion persists serverStorageVersions as the entries
// of sv.
//
// If the slice is empty, the StorageVersion object is deleted by name.
// Otherwise sv is mutated in place: Status.StorageVersions is replaced and
// storageversion.SetCommonEncodingVersion(sv) is called (presumably to
// recompute Status.CommonEncodingVersion from the remaining entries; verify
// against that helper). The result is then written with UpdateStatus.
func (c *Controller) updateOrDeleteStorageVersion(ctx context.Context, sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
	client := c.kubeclientset.InternalV1alpha1().StorageVersions()
	if len(serverStorageVersions) > 0 {
		sv.Status.StorageVersions = serverStorageVersions
		storageversion.SetCommonEncodingVersion(sv)
		_, err := client.UpdateStatus(ctx, sv, metav1.UpdateOptions{})
		return err
	}
	return client.Delete(ctx, sv.Name, metav1.DeleteOptions{})
}

REF:

  1. Lease
  2. staging/src/k8s.io/api/apiserverinternal/v1alpha1/types.go
  3. staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
  4. pkg/controller/storageversiongc/gc_controller.go