kube-controller: StatefulSet

StatefulSet

StatefulSet is the workload API object used to manage stateful applications.

A StatefulSet manages the deployment and scaling of a set of Pods, and provides persistent storage and persistent identifiers for those Pods.

Like a Deployment, a StatefulSet manages a set of Pods that are based on an identical container spec. Unlike a Deployment, however, a StatefulSet maintains a sticky identity for each of its Pods. The Pods are created from the same spec, but they are not interchangeable: each one has a persistent identifier that it keeps no matter how it is scheduled.

If you want to use storage volumes to provide persistence for your workload, you can use a StatefulSet as part of the solution. Although individual Pods in a StatefulSet can still fail, the persistent Pod identifiers make it easier to match existing volumes to the new Pods that replace any that have failed.

Using StatefulSets

StatefulSets are valuable for applications that require one or more of the following:

  • Stable, unique network identifiers.
  • Stable, persistent storage.
  • Ordered, graceful deployment and scaling.
  • Ordered, automated rolling updates.

In the list above, "stable" means that the identity persists across Pod scheduling and rescheduling. If an application does not require any stable identifiers or ordered deployment, deletion, or scaling, it should be deployed with a workload object that manages a set of stateless replicas; a Deployment or ReplicaSet may be better suited to such stateless needs.
Limitations

  • The storage for a given Pod must either be provisioned by a PersistentVolume Provisioner based on the requested storage class, or pre-provisioned by an administrator.
  • Deleting or scaling down a StatefulSet does not delete the volumes associated with it. This is done to ensure data safety, which is generally more valuable than automatically purging all of the StatefulSet's related resources.
  • StatefulSets currently require a headless Service to be responsible for the network identity of the Pods. You are responsible for creating this Service (a minimal sketch follows this section).
  • StatefulSets do not provide any guarantees about Pod termination when a StatefulSet is deleted. To achieve ordered and graceful termination of the Pods in a StatefulSet, scale the StatefulSet down to 0 before deleting it.
  • When using rolling updates with the default Pod management policy (OrderedReady), it is possible to get into a broken state that requires manual intervention to repair.

The above is taken from the official documentation.
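
As a minimal illustration of the headless Service requirement above, the sketch below creates such a Service with client-go. It assumes a configured clientset; the names "nginx"/"web" and the "default" namespace are illustrative, not taken from the controller source discussed below.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createHeadlessService creates the headless Service a StatefulSet needs for stable
// per-Pod DNS. The StatefulSet would reference it via spec.serviceName: "nginx",
// giving Pods names like web-0.nginx.default.svc.cluster.local.
func createHeadlessService(ctx context.Context, cs kubernetes.Interface) error {
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone, // "None" is what makes the Service headless
			Selector:  map[string]string{"app": "nginx"},
			Ports:     []corev1.ServicePort{{Name: "web", Port: 80}},
		},
	}
	_, err := cs.CoreV1().Services("default").Create(ctx, svc, metav1.CreateOptions{})
	return err
}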


Source code analysis
// pkg/controller/statefulset/stateful_set.go
type StatefulSetController struct {
// client interface
kubeClient clientset.Interface
// control returns an interface capable of syncing a stateful set.
// Abstracted out for testing.
control StatefulSetControlInterface
// podControl is used for patching pods.
podControl controller.PodControlInterface
// podLister is able to list/get pods from a shared informer's store
podLister corelisters.PodLister
// podListerSynced returns true if the pod shared informer has synced at least once
podListerSynced cache.InformerSynced
// setLister is able to list/get stateful sets from a shared informer's store
setLister appslisters.StatefulSetLister
// setListerSynced returns true if the stateful set shared informer has synced at least once
setListerSynced cache.InformerSynced
// pvcListerSynced returns true if the pvc shared informer has synced at least once
pvcListerSynced cache.InformerSynced
// revListerSynced returns true if the rev shared informer has synced at least once
revListerSynced cache.InformerSynced
// StatefulSets that need to be synced.
queue workqueue.RateLimitingInterface
// eventBroadcaster is the core of event processing pipeline.
eventBroadcaster record.EventBroadcaster
}
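
Most of the fields above are listers backed by shared informer caches. As a rough, illustrative sketch (not controller code), this is how a lister such as podLister is read; the namespace and selector are assumptions for the example:

package example

import (
	"k8s.io/apimachinery/pkg/labels"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// podsForSelector reads Pods from the informer's local cache instead of hitting the
// API server, which is how the controller's sync path consumes podLister.
func podsForSelector(podLister corelisters.PodLister, namespace string, sel labels.Selector) ([]string, error) {
	pods, err := podLister.Pods(namespace).List(sel)
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(pods))
	for _, p := range pods {
		names = append(names, p.Name)
	}
	return names, nil
}
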
// pkg/controller/statefulset/stateful_set_control.go
// Defines the operations used to sync a StatefulSet
type StatefulSetControlInterface interface {
// UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
// persistent volume creation, update, and deletion.
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
// Implements creation, update, and deletion of Pods and persistent volumes
UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
// ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returns slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
// successful the returned error is nil.
AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
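
Because the controller only talks to this small interface (the control field of the struct above), tests can substitute a fake. The do-nothing stub below is purely illustrative of that seam; it is not the fake used in the Kubernetes test suite.

package example

import (
	"context"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
)

// noopControl satisfies StatefulSetControlInterface with no-op implementations,
// the kind of substitute the "Abstracted out for testing" comment refers to.
type noopControl struct{}

func (noopControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	return &apps.StatefulSetStatus{}, nil
}

func (noopControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
	return nil, nil
}

func (noopControl) AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error {
	return nil
}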

// ControllerRevision implements an immutable snapshot of state data. Clients are responsible for serializing and deserializing the objects that contain their internal state.
// Once a ControllerRevision has been successfully created it cannot be updated, but it can be deleted. The API server rejects all requests that attempt to mutate the Data field.
// It is primarily used by the DaemonSet and StatefulSet controllers for update and rollback.
// vendor/k8s.io/api/apps/v1/types.go
type ControllerRevision struct {
metav1.TypeMeta `json:",inline"`

metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

// Data is the serialized representation of the state.
Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

// Revision indicates the revision of the state represented by Data.
Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}
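
To see these objects in practice, one can list the revisions a StatefulSet has accumulated. The sketch below uses a plain clientset and an illustrative label selector; the controller itself resolves revisions through the set's own selector and the history package.

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// printRevisions lists ControllerRevisions and prints their monotonically increasing
// revision numbers; Data holds the serialized Pod template snapshot for each one.
func printRevisions(ctx context.Context, cs kubernetes.Interface, namespace string) error {
	revs, err := cs.AppsV1().ControllerRevisions(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "app=nginx", // illustrative: whatever the StatefulSet's selector matches
	})
	if err != nil {
		return err
	}
	for _, r := range revs.Items {
		fmt.Printf("%s revision=%d dataBytes=%d\n", r.Name, r.Revision, len(r.Data.Raw))
	}
	return nil
}
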
// pkg/controller/statefulset/stateful_set.go
// Creates a new StatefulSet controller.
// The usual pattern: register event handlers on the informers, initialize the work queue, and so on.
func NewStatefulSetController(
ctx context.Context,
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
revInformer appsinformers.ControllerRevisionInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
logger := klog.FromContext(ctx)
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(
NewStatefulPodControl(
kubeClient,
podInformer.Lister(),
pvcInformer.Lister(),
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
revListerSynced: revInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

eventBroadcaster: eventBroadcaster,
}

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// lookup the statefulset and enqueue
AddFunc: func(obj interface{}) {
ssc.addPod(logger, obj)
},
// lookup current and old statefulset if labels changed
UpdateFunc: func(oldObj, newObj interface{}) {
ssc.updatePod(logger, oldObj, newObj)
},
// lookup statefulset accounting for deletion tombstones
DeleteFunc: func(obj interface{}) {
ssc.deletePod(logger, obj)
},
})
ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced

setInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ssc.enqueueStatefulSet,
UpdateFunc: func(old, cur interface{}) {
oldPS := old.(*apps.StatefulSet)
curPS := cur.(*apps.StatefulSet)
if oldPS.Status.Replicas != curPS.Status.Replicas {
logger.V(4).Info("Observed updated replica count for StatefulSet", "statefulSet", klog.KObj(curPS), "oldReplicas", oldPS.Status.Replicas, "newReplicas", curPS.Status.Replicas)
}
ssc.enqueueStatefulSet(cur)
},
DeleteFunc: ssc.enqueueStatefulSet,
},
)
ssc.setLister = setInformer.Lister()
ssc.setListerSynced = setInformer.Informer().HasSynced

// TODO: Watch volumes
return ssc
}
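
In kube-controller-manager this constructor is invoked through the controller manager's startup machinery. The following is a simplified, hypothetical standalone wiring that only shows how the informers, the clientset, and Run fit together; it assumes a valid *rest.Config and abbreviates error handling.

package example

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/controller/statefulset"
)

func runStatefulSetController(ctx context.Context, cfg *rest.Config) {
	client := kubernetes.NewForConfigOrDie(cfg)
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	ssc := statefulset.NewStatefulSetController(
		ctx,
		factory.Core().V1().Pods(),
		factory.Apps().V1().StatefulSets(),
		factory.Core().V1().PersistentVolumeClaims(),
		factory.Apps().V1().ControllerRevisions(),
		client,
	)

	factory.Start(ctx.Done()) // start the shared informers so the caches can sync
	ssc.Run(ctx, 5)           // run workers until the context is cancelled
}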

// Run runs the statefulset controller; this is the controller's entry point.
func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
ssc.eventBroadcaster.StartStructuredLogging(0)
ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
defer ssc.eventBroadcaster.Shutdown()

defer ssc.queue.ShutDown()

logger := klog.FromContext(ctx)
logger.Info("Starting stateful set controller")
defer logger.Info("Shutting down statefulset controller")

if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, ssc.worker, time.Second)
}

<-ctx.Done()
}

func (ssc *StatefulSetController) worker(ctx context.Context) {
for ssc.processNextWorkItem(ctx) {
}
}

func (ssc *StatefulSetController) processNextWorkItem(ctx context.Context) bool {
key, quit := ssc.queue.Get()
if quit {
return false
}
defer ssc.queue.Done(key)
// The reconcile function is ssc.sync, so the analysis below concentrates on sync
if err := ssc.sync(ctx, key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
ssc.queue.AddRateLimited(key)
} else {
ssc.queue.Forget(key)
}
return true
}

// The reconcile function for a StatefulSet
func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
}()

// Split the key from the queue into namespace and resource name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Fetch the StatefulSet from the lister cache
set, err := ssc.setLister.StatefulSets(namespace).Get(name)
// If it has already been deleted, return immediately
if errors.IsNotFound(err) {
logger.Info("StatefulSet has been deleted", "key", key)
return nil
}
// Any other error
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
return err
}

// Build the label selector
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
// This is a non-transient error, so don't retry.
return nil
}

// Adopt orphaned ControllerRevisions
if err := ssc.adoptOrphanRevisions(ctx, set); err != nil {
return err
}

// Get the Pods belonging to this StatefulSet via the selector
pods, err := ssc.getPodsForStatefulSet(ctx, set, selector)
if err != nil {
return err
}
// Hand off to the core sync logic
return ssc.syncStatefulSet(ctx, set, pods)
}

// adoptOrphanRevisions adopts ControllerRevisions that match the set but have no controlling owner
func (ssc *StatefulSetController) adoptOrphanRevisions(ctx context.Context, set *apps.StatefulSet) error {

// List the ControllerRevisions of this StatefulSet
revisions, err := ssc.control.ListRevisions(set)
if err != nil {
return err
}
orphanRevisions := make([]*apps.ControllerRevision, 0)
for i := range revisions {
// Look up the controlling OwnerReference
if metav1.GetControllerOf(revisions[i]) == nil {
// No controlling owner: this revision is an orphan
orphanRevisions = append(orphanRevisions, revisions[i])
}
}
// There are orphaned ControllerRevisions
if len(orphanRevisions) > 0 {
// Check once more, with a fresh read, whether this set may adopt them
canAdoptErr := ssc.canAdoptFunc(ctx, set)(ctx)
if canAdoptErr != nil {
return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
}
return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
}
return nil
}
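
For orientation, "adopting" an orphaned ControllerRevision means making the StatefulSet its controlling owner. The real adoption goes through AdoptOrphanRevisions and a patch; the helper below only sketches the OwnerReference that ends up on the revision, following the usual controller conventions rather than quoting the controller code.

package example

import (
	apps "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// controllerOwnerRef builds the OwnerReference that marks set as the controlling
// owner of an adopted object; GetControllerOf (used above) looks for exactly such
// a reference with Controller=true.
func controllerOwnerRef(set *apps.StatefulSet) metav1.OwnerReference {
	isController := true
	blockOwnerDeletion := true
	return metav1.OwnerReference{
		APIVersion:         "apps/v1",
		Kind:               "StatefulSet",
		Name:               set.Name,
		UID:                set.UID,
		Controller:         &isController,
		BlockOwnerDeletion: &blockOwnerDeletion,
	}
}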


func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
var status *apps.StatefulSetStatus
var err error
status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
if err != nil {
return err
}
logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
// One more sync to handle the clock skew. This is also helping in requeuing right after status update
if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
}

return nil
}
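
The enqueueSSAfter call above relies on the delaying side of the work queue. A tiny sketch of that mechanism, with an illustrative key:

package example

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// requeueAfterMinReady re-enqueues the StatefulSet key once minReadySeconds have
// elapsed, so availability is re-evaluated after the window has passed.
func requeueAfterMinReady(queue workqueue.RateLimitingInterface, minReadySeconds int32) {
	queue.AddAfter("default/web", time.Duration(minReadySeconds)*time.Second)
}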

// The core reconcile logic for a StatefulSet. By default it follows the monotonic update strategy: Pods are created in order during scale-up,
// and the next Pod is only created once all of its predecessors are Ready; scale-down proceeds in the reverse order (e.g. scale-up goes 1->2->3, scale-down goes 3->2->1).
// Under the burst strategy there are far fewer restrictions: Pods are created and deleted quickly and in no particular order (see the short allowsBurst sketch after this function).
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.

// Fetch all revisions of the set
revisions, err := ssc.ListRevisions(set)
if err != nil {
return nil, err
}
// Sort the revisions by revision number
history.SortControllerRevisions(revisions)


currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
if err != nil {
return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
}

// maintain the set's revision history limit
// Keep the history bounded so it does not consume too many resources
return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
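
The monotonic/burst distinction mentioned above is derived from the Pod management policy on the spec. The check behind allowsBurst (in pkg/controller/statefulset/stateful_set_utils.go) is essentially the following, so spec.podManagementPolicy: Parallel opts a set out of the ordered guarantees; this is a paraphrased sketch, not a verbatim excerpt.

package example

import apps "k8s.io/api/apps/v1"

// allowsBurst: Parallel Pod management lets the controller create and delete Pods
// without waiting for predecessors to be Running and Ready.
func allowsBurst(set *apps.StatefulSet) bool {
	return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
}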


func (ssc *defaultStatefulSetControl) performUpdate(
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return currentRevision, updateRevision, currentStatus, err
}

// Run the update logic and return the resulting status
currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
if err != nil && currentStatus == nil {
return currentRevision, updateRevision, nil, err
}

// make sure to update the latest status even if there is an error with non-nil currentStatus
statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
if statusErr == nil {
logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
"replicas", currentStatus.Replicas,
"readyReplicas", currentStatus.ReadyReplicas,
"currentReplicas", currentStatus.CurrentReplicas,
"updatedReplicas", currentStatus.UpdatedReplicas)
}

switch {
case err != nil && statusErr != nil:
klog.ErrorS(statusErr, "Could not update status", "statefulSet", klog.KObj(set))
return currentRevision, updateRevision, currentStatus, err
case err != nil:
return currentRevision, updateRevision, currentStatus, err
case statusErr != nil:
return currentRevision, updateRevision, currentStatus, statusErr
}

logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
"currentRevision", currentStatus.CurrentRevision,
"updateRevision", currentStatus.UpdateRevision)

return currentRevision, updateRevision, currentStatus, nil
}
// pkg/controller/statefulset/stateful_set_control.go
// The heart of the StatefulSet controller. updateStatefulSet creates, updates, and deletes the Pods of a StatefulSet in a specific order
// so as to bring the system to the desired state.
// If the update strategy is RollingUpdateStatefulSetStrategyType, all Pods in the set must be at set.Status.CurrentRevision.
// If the update strategy is OnDeleteStatefulSetStrategyType, the target state implies nothing about the revisions of Pods in the set.
// If the update strategy is PartitionStatefulSetStrategyType, all Pods with an ordinal lower than UpdateStrategy.Partition.Ordinal must be at
// Status.CurrentRevision, and all remaining Pods must be at Status.UpdateRevision.
// Return value: if error == nil, the returned StatefulSetStatus is valid and the update must be recorded; if error != nil, the call is retried.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)

// Apply the current revision to the set and return currentSet (the set as it looked at that revision)
currentSet, err := ApplyRevision(set, currentRevision)
if err != nil {
return nil, err
}
// Apply the update revision to the set and return updateSet
updateSet, err := ApplyRevision(set, updateRevision)
if err != nil {
return nil, err
}

// The status to return; fill in the basic fields
status := apps.StatefulSetStatus{}
status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount

// Desired replica count
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
// Holds the Pods whose ordinal falls within the current replica range
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set)
// condemned holds the Pods that fall outside the range and are to be removed
condemned := make([]*v1.Pod, 0, len(pods))
unhealthy := 0
var firstUnhealthyPod *v1.Pod

// First we partition pods into two lists valid replicas and condemned Pods
// Walk the Pods and partition them into replicas and condemned
for _, pod := range pods {
// Count the set's replicas
status.Replicas++

// count the number of running and ready replicas
if isRunningAndReady(pod) {
// Count Ready replicas
status.ReadyReplicas++
// Count replicas that are Running and Available
if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) {
status.AvailableReplicas++
}

}

// count the number of current and update replicas
if isCreated(pod) && !isTerminating(pod) {
if getPodRevision(pod) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(pod) == updateRevision.Name {
status.UpdatedReplicas++
}
}

if podInOrdinalRange(pod, set) {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
// Place the Pod into the replicas slot that corresponds to its ordinal
replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
} else if getOrdinal(pod) >= 0 {
// if the ordinal is valid, but not within the range add it to the condemned list
condemned = append(condemned, pod)
}
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}

// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
// For every empty slot in the range, construct a new Pod object at the correct revision
for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
replicaIdx := ord - getStartOrdinal(set)
if replicas[replicaIdx] == nil {
replicas[replicaIdx] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}

// sort the condemned Pods by their ordinals
// Sort the condemned Pods by their ordinals
sort.Sort(ascendingOrdinal(condemned))

// find the first unhealthy Pod
// Find the first unhealthy Pod and count how many Pods are unhealthy
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = replicas[i]
}
}
}

for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
if firstUnhealthyPod == nil {
firstUnhealthyPod = condemned[i]
}
}
}

if unhealthy > 0 {
logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
}

// If the StatefulSet is being deleted, don't do anything other than updating
// status.
// The StatefulSet is being deleted; just return the computed status
if set.DeletionTimestamp != nil {
return &status, nil
}

monotonic := !allowsBurst(set)

// Examine each replica with respect to its ordinal
// Walk the replicas in ordinal order
for i := range replicas {
// delete and recreate failed pods
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
// Delete the failed Pod
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas--
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas--
}
status.Replicas--
replicaOrd := i + getStartOrdinal(set)
// Construct a replacement Pod object at the appropriate revision
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
}
// If we find a Pod that has not been created we create the Pod
// The Pod has not been created yet, so create it
if !isCreated(replicas[i]) {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
return &status, err
} else if isStale {
// If a pod has a stale PVC, no more work can be done this round.
return &status, err
}
}
// Create the Pod
if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
return &status, err
}
status.Replicas++
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
// if the set does not allow bursting, return immediately
if monotonic {
return &status, nil
}
// pod created, no more work possible for this round
continue
}

// If the Pod is Pending, trigger creation of any missing PVCs
if isPending(replicas[i]) {
klog.V(4).Infof(
"StatefulSet %s/%s is triggering PVC creation for pending Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return &status, err
}
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
// If a Pod is terminating, we must wait for its graceful deletion to complete before making further progress
if isTerminating(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
// If we have a Pod that has been created but is not running and ready we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
// If a Pod is not Running and Ready, we stop this round of reconciliation here
if !isRunningAndReady(replicas[i]) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
// If we have a Pod that has been created but is not available we can not make progress.
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
// Enforce the StatefulSet invariants
retentionMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
var err error
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
if err != nil {
retentionMatch = true
}
}
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
continue
}
// Make a deep copy so we don't mutate the shared cache
replica := replicas[i].DeepCopy()
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
return &status, err
}
}

if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Ensure ownerRefs are set correctly for the condemned pods.
for i := range condemned {
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return &status, err
} else if !matchPolicy {
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return &status, err
}
}
}
}

// At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order.
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
for target := len(condemned) - 1; target >= 0; target-- {
// wait for terminating pods to expire
if isTerminating(condemned[target]) {
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
// block if we are in monotonic mode
if monotonic {
return &status, nil
}
continue
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return &status, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return &status, nil
}
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))

if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
return &status, err
}
if getPodRevision(condemned[target]) == currentRevision.Name {
status.CurrentReplicas--
}
if getPodRevision(condemned[target]) == updateRevision.Name {
status.UpdatedReplicas--
}
if monotonic {
return &status, nil
}
}

// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
}

if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
return updateStatefulSetAfterInvariantEstablished(ctx,
ssc,
set,
replicas,
updateRevision,
status,
)
}

// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
}
// we terminate the Pod with the largest ordinal that does not match the update revision.
for target := len(replicas) - 1; target >= updateMin; target-- {

// delete the Pod if it is not already terminating and does not match the update revision.
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
logger.V(2).Info("Pod of StatefulSet is terminating for update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err
}
}
status.CurrentReplicas--
return &status, err
}

// wait for unhealthy Pods on update
if !isHealthy(replicas[target]) {
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
}

}
return &status, nil
}
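
A detail the partitioning above depends on: a Pod's ordinal is parsed from its name, since StatefulSet Pods are named <set-name>-<ordinal>, and an unparsable name yields a negative ordinal, which the loop ignores. Below is a rough sketch of that parsing, illustrative rather than a verbatim copy of getOrdinal.

package example

import (
	"regexp"
	"strconv"
)

// statefulPodRegex matches names of the form <parent>-<ordinal>.
var statefulPodRegex = regexp.MustCompile(`(.*)-([0-9]+)$`)

// ordinalOf returns the ordinal encoded in a StatefulSet Pod's name, or -1 if the
// name does not match; e.g. ordinalOf("web-2") == 2.
func ordinalOf(podName string) int {
	m := statefulPodRegex.FindStringSubmatch(podName)
	if len(m) < 3 {
		return -1
	}
	ord, err := strconv.Atoi(m[2])
	if err != nil {
		return -1
	}
	return ord
}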

REF:
1. pkg/controller/statefulset/stateful_set.go
2. pkg/controller/statefulset/stateful_set_control.go
3. vendor/k8s.io/api/apps/v1/types.go