kube-controller: ttlafterfinished

TTLAfterFinished

TTLAfterFinished is a mechanism for automatically cleaning up Jobs that have finished (reached either the Complete or the Failed state). It works through a TTL that you set on the Job via the Job.spec.ttlSecondsAfterFinished field.

When the TTLAfterFinished-Controller cleans up a Job, it deletes it in cascade, i.e. the Job's dependent objects (such as its Pods) are deleted together with it. Note that when the Job is deleted, its lifecycle guarantees, such as finalizers, are still honored.

The TTLAfterFinished-Controller is the component that watches Job API objects for changes. It listens for Job create and update events and enqueues every Job whose .spec.ttlSecondsAfterFinished field is non-nil. The controller then takes a Job from the queue and checks whether its TTL has expired. If the TTL has not expired yet, the worker re-enqueues the Job to be checked again once the TTL is expected to expire; if the TTL has expired, the worker sends a request to the APIServer to delete the Job.
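The gist of that decision can be sketched as follows. This is a minimal sketch, not the controller's actual code; the names requeueOrDelete, queue, key and expireAt are illustrative only.

package ttlsketch

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/util/workqueue"
)

// requeueOrDelete re-queues a Job key until its TTL expires, then asks the API
// server to delete the Job (cascading to its dependent Pods).
func requeueOrDelete(ctx context.Context, client kubernetes.Interface,
    queue workqueue.RateLimitingInterface, key, namespace, name string, expireAt time.Time) error {
    if remaining := time.Until(expireAt); remaining > 0 {
        // TTL not expired yet: check this Job again once it should have expired.
        queue.AddAfter(key, remaining)
        return nil
    }
    // TTL expired: delete the Job; dependents are removed in the foreground.
    policy := metav1.DeletePropagationForeground
    return client.BatchV1().Jobs(namespace).Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &policy})
}

The real controller makes the same decision in processTTL and processJob, analyzed below.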

This functionality is implemented separately from the Job controller in order to separate concerns, and so that it can later be extended to other finishable resource types. With this design the Job-Controller only watches and manages Job state, while the TTL-expiry handling lives in an independent component, which keeps the code easier to maintain, extend, and read.

apiVersion: batch/v1
kind: Job
metadata:
  name: pi-with-ttl
spec:
  # The Job pi-with-ttl will be deleted 100 seconds after it finishes.
  # If ttlSecondsAfterFinished is set to 0, the Job is deleted immediately after it finishes.
  ttlSecondsAfterFinished: 100
  template:
    spec:
      containers:
      - name: pi
        image: perl:5.34.0
        command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never
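Once the Job above completes, it should be removed automatically about 100 seconds later. Assuming the manifest is saved as job.yaml, one way to watch this happen:

kubectl apply -f job.yaml
# the Job disappears roughly 100 seconds after it finishes
kubectl get job pi-with-ttl -w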
Source Code Analysis
// pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
type Controller struct {
    client   clientset.Interface
    recorder record.EventRecorder

    // jLister can list/get Jobs from the shared informer's store
    jLister batchlisters.JobLister

    // jStoreSynced returns true if the Job store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    jListerSynced cache.InformerSynced

    // Jobs that the controller will check its TTL and attempt to delete when the TTL expires.
    queue workqueue.RateLimitingInterface

    // The clock for tracking time
    clock clock.Clock
}

// New initializes a Controller.
func New(ctx context.Context, jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    metrics.Register()

    tc := &Controller{
        client:   client,
        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
        queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
    }

    logger := klog.FromContext(ctx)
    // Register add and update event handlers on the Job informer.
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            tc.addJob(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            tc.updateJob(logger, oldObj, newObj)
        },
    })

    tc.jLister = jobInformer.Lister()
    tc.jListerSynced = jobInformer.Informer().HasSynced

    tc.clock = clock.RealClock{}

    return tc
}
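// For context, kube-controller-manager starts this controller roughly like the
// sketch below (paraphrased from cmd/kube-controller-manager/app/batch.go; the
// exact signature and ControllerContext fields vary between versions).
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    go ttlafterfinished.New(
        ctx,
        controllerContext.InformerFactory.Batch().V1().Jobs(),
        controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
    ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
    return nil, true, nil
}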

func (tc *Controller) addJob(logger klog.Logger, obj interface{}) {
    job := obj.(*batch.Job)
    logger.V(4).Info("Adding job", "job", klog.KObj(job))

    // Enqueue the Job if it is not being deleted, its Spec.TTLSecondsAfterFinished
    // is non-nil, and it has already finished.
    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(logger, job)
    }
}

func (tc *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
    job := cur.(*batch.Job)
    logger.V(4).Info("Updating job", "job", klog.KObj(job))
    // Same check as addJob: not being deleted, TTL set, and already finished.
    if job.DeletionTimestamp == nil && needsCleanup(job) {
        tc.enqueue(logger, job)
    }
}
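// The enqueue/enqueueAfter helpers used above derive the namespace/name key for
// the Job and add it to the work queue, either immediately or after a delay.
// Paraphrased sketch of the helpers in the same file, not a verbatim copy:
func (tc *Controller) enqueue(logger klog.Logger, job *batch.Job) {
    logger.V(4).Info("Add job to cleanup", "job", klog.KObj(job))
    key, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
        return
    }
    tc.queue.Add(key)
}

func (tc *Controller) enqueueAfter(job *batch.Job, after time.Duration) {
    key, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
        return
    }
    tc.queue.AddAfter(key, after)
}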

// needsCleanup returns true if TTLSecondsAfterFinished is set and the Job has finished.
func needsCleanup(j *batch.Job) bool {
    return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}
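// For reference, jobutil.IsJobFinished (from the job controller's util package)
// treats a Job as finished when it carries a Complete or Failed condition with
// status True. Paraphrased sketch:
func IsJobFinished(j *batch.Job) bool {
    for _, c := range j.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}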

// Run starts the workers.
func (tc *Controller) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()
    defer tc.queue.ShutDown()

    logger := klog.FromContext(ctx)
    logger.Info("Starting TTL after finished controller")
    defer logger.Info("Shutting down TTL after finished controller")

    if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, tc.worker, time.Second)
    }

    <-ctx.Done()
}

func (tc *Controller) worker(ctx context.Context) {
    for tc.processNextWorkItem(ctx) {
    }
}

func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
    key, quit := tc.queue.Get()
    if quit {
        return false
    }
    defer tc.queue.Done(key)

    // The reconcile function is processJob.
    err := tc.processJob(ctx, key.(string))
    tc.handleErr(err, key)

    return true
}
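// handleErr implements the usual workqueue retry pattern: on success the key's
// rate-limit history is forgotten; on error the key is re-added with rate
// limiting so it is retried with backoff. Paraphrased sketch of the same file:
func (tc *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        tc.queue.Forget(key)
        return
    }

    utilruntime.HandleError(fmt.Errorf("error cleaning up Job %v, will retry: %v", key, err))
    tc.queue.AddRateLimited(key)
}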

// processJob checks the Job's state and TTL and deletes the Job when the Job has
// finished and its TTL has expired. If the Job hasn't finished or its TTL hasn't
// expired yet, the Job is re-enqueued to be checked once the TTL is expected to expire.
//
// Note that this function is not meant to be invoked concurrently with the same key.
func (tc *Controller) processJob(ctx context.Context, key string) error {
    // Split the key into namespace and resource name.
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
    // Fetch the Job from the lister.
    job, err := tc.jLister.Jobs(namespace).Get(name)

    logger := klog.FromContext(ctx)
    logger.V(4).Info("Checking if Job is ready for cleanup", "job", klog.KRef(namespace, name))

    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }

    if expiredAt, err := tc.processTTL(logger, job); err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }

    // The Job's TTL is assumed to have expired, but the Job TTL might be stale.
    // Before deleting the Job, do a final sanity check.
    // If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
    // The latest Job may have a different UID, but it's fine because the checks will be run again.
    // Re-fetch the Job directly from the API server before deleting it.
    fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        return nil
    }
    if err != nil {
        return err
    }
    // Use the latest Job TTL to see if the TTL truly expires.
    expiredAt, err := tc.processTTL(logger, fresh)
    if err != nil {
        return err
    } else if expiredAt == nil {
        return nil
    }
    // Cascade deletes the Jobs if TTL truly expires.
    policy := metav1.DeletePropagationForeground
    options := metav1.DeleteOptions{
        PropagationPolicy: &policy,
        Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
    }
    logger.V(4).Info("Cleaning up Job", "job", klog.KObj(fresh))
    // Delete the Job.
    if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
        return err
    }
    metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
    return nil
}

// processTTL checks whether the Job's TTL has expired; if not, the Job is
// re-enqueued to be checked again once the TTL is expected to expire.
func (tc *Controller) processTTL(logger klog.Logger, job *batch.Job) (expiredAt *time.Time, err error) {

    // We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
    // (i.e. the Job is being deleted, its TTL is nil, or it hasn't finished yet.)
    if job.DeletionTimestamp != nil || !needsCleanup(job) {
        return nil, nil
    }

    now := tc.clock.Now()
    t, e, err := timeLeft(logger, job, &now)
    if err != nil {
        return nil, err
    }

    // TTL has expired
    if *t <= 0 {
        return e, nil
    }

    tc.enqueueAfter(job, *t)
    return nil, nil
}
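timeLeft, used by processTTL above, computes the remaining TTL from the Job's completion time. Together with its helpers it looks roughly like the sketch below, which is a paraphrase of the same source file rather than a verbatim copy:

func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
    if !needsCleanup(j) {
        return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
    }
    t, err := jobFinishTime(j)
    if err != nil {
        return nil, nil, err
    }
    finishAt := t.Time
    // The expiry time is the finish time plus TTLSecondsAfterFinished.
    expireAt := finishAt.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
    return &finishAt, &expireAt, nil
}

func timeLeft(logger klog.Logger, j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
    finishAt, expireAt, err := getFinishAndExpireTime(j)
    if err != nil {
        return nil, nil, err
    }
    if finishAt.After(*since) {
        logger.Info("Found Job finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", "job", klog.KObj(j))
    }
    remaining := expireAt.Sub(*since)
    return &remaining, expireAt, nil
}

// jobFinishTime takes the completion time from the Job's Complete/Failed condition.
func jobFinishTime(finishedJob *batch.Job) (metav1.Time, error) {
    for _, c := range finishedJob.Status.Conditions {
        if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
            finishAt := c.LastTransitionTime
            if finishAt.IsZero() {
                return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
            }
            return finishAt, nil
        }
    }
    return metav1.Time{}, fmt.Errorf("unable to find the status of the finished Job %s/%s", finishedJob.Namespace, finishedJob.Name)
}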

REF:
1. https://kubernetes.io/docs/concepts/workloads/controllers/job/#clean-up-finished-jobs-automatically
2. pkg/controller/ttlafterfinished/ttlafterfinished_controller.go