The taints mechanism in Kubernetes

Using taints

Adding taints to a node

kubectl taint nodes node1 key1=value1:NoSchedule
kubectl taint nodes node1 key1=value1:NoExecute
kubectl taint node node2 node.kubernetes.io/unschedulable=true:NoExecute

kubectl taint node node2 node.kubernetes.io/unschedulable- # remove the taint

Marking a node unschedulable and adding taints

kubectl cordon node2
kubectl taint node node2 node.kubernetes.io/unschedulable:NoExecute
kubectl taint node node2 node.kubernetes.io/unschedulable:NoSchedule

# To undo:
kubectl uncordon node2
kubectl taint node node2 node.kubernetes.io/unschedulable-

After taints are added to a node, the scheduler will not place Pods that cannot tolerate those taints onto that node. Taints with effect: NoExecute are additionally handed to the taint manager (NoExecuteTaintManager, implemented in taint_manager.go), which schedules the Pods already running on the node for deletion. Between the moment the node is considered unavailable and the moment its Pods are deleted there is a grace period; this period is the toleration time, configured through tolerationSeconds:

tolerations:
- key: node.kubernetes.io/not-ready
  operator: Exists
  effect: NoExecute
  tolerationSeconds: 180
- key: node.kubernetes.io/unreachable
  operator: Exists
  effect: NoExecute
  tolerationSeconds: 180

When we create a Pod without configuring any tolerations, Kubernetes automatically adds two default tolerations to it, one for TaintNodeNotReady and one for TaintNodeUnreachable (this is done by the DefaultTolerationSeconds admission plugin). The relevant code:

var (
    defaultNotReadyTolerationSeconds = flag.Int64("default-not-ready-toleration-seconds", 300,
        "Indicates the tolerationSeconds of the toleration for notReady:NoExecute"+
            " that is added by default to every pod that does not already have such a toleration.")

    defaultUnreachableTolerationSeconds = flag.Int64("default-unreachable-toleration-seconds", 300,
        "Indicates the tolerationSeconds of the toleration for unreachable:NoExecute"+
            " that is added by default to every pod that does not already have such a toleration.")

    notReadyToleration = api.Toleration{
        Key:               v1.TaintNodeNotReady,
        Operator:          api.TolerationOpExists,
        Effect:            api.TaintEffectNoExecute,
        TolerationSeconds: defaultNotReadyTolerationSeconds,
    }

    unreachableToleration = api.Toleration{
        Key:               v1.TaintNodeUnreachable,
        Operator:          api.TolerationOpExists,
        Effect:            api.TaintEffectNoExecute,
        TolerationSeconds: defaultUnreachableTolerationSeconds,
    }
)

....

for _, toleration := range tolerations {
    if (toleration.Key == v1.TaintNodeNotReady || len(toleration.Key) == 0) &&
        (toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
        toleratesNodeNotReady = true
    }

    if (toleration.Key == v1.TaintNodeUnreachable || len(toleration.Key) == 0) &&
        (toleration.Effect == api.TaintEffectNoExecute || len(toleration.Effect) == 0) {
        toleratesNodeUnreachable = true
    }
}

if !toleratesNodeNotReady {
    pod.Spec.Tolerations = append(pod.Spec.Tolerations, notReadyToleration)
}

if !toleratesNodeUnreachable {
    pod.Spec.Tolerations = append(pod.Spec.Tolerations, unreachableToleration)
}
....

Processing flow

NoExecuteTaintManager
Handles taint update and delete events on nodes.

// pkg/controller/nodelifecycle/scheduler/taint_manager.go:419
// Only the node's NoExecute taints are handled here; for this kind of taint the Pods on the node
// need to be evicted.
klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
    tc.taintedNodesLock.Lock()
    defer tc.taintedNodesLock.Unlock()
    klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
    if len(taints) == 0 {
        delete(tc.taintedNodes, node.Name)
    } else {
        tc.taintedNodes[node.Name] = taints
    }
}()

....
// After the node's taints are fetched, the Pods running on the node are listed. If the taint list
// is empty, any previously scheduled Pod deletions are cancelled and no Pods are deleted.
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
    klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
    for i := range pods {
        tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
    }
    return
}

....
// Cancelling a Pod eviction emits an event:
func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
    if tc.recorder == nil {
        return
    }
    ref := &v1.ObjectReference{
        Kind:      "Pod",
        Name:      nsName.Name,
        Namespace: nsName.Namespace,
    }
    tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}
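For reference, getNoExecuteTaints simply filters the node's taints down to those whose effect is NoExecute. The sketch below is paraphrased from the same file; the exact code may differ slightly between versions.

// getNoExecuteTaints keeps only the taints whose effect is NoExecute; taints with
// other effects are irrelevant to the eviction logic.
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
    result := []v1.Taint{}
    for i := range taints {
        if taints[i].Effect == v1.TaintEffectNoExecute {
            result = append(result, taints[i])
        }
    }
    return result
}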

What follows is the handling of each Pod on the node, one by one:

// If the Pod's tolerations do not tolerate the node's taints, any scheduled timed work for the
// Pod is cancelled and the Pod is deleted right away by adding it to the eviction queue.
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
    klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
    // We're canceling scheduled work (if any), as we're going to delete the Pod right away.
    tc.cancelWorkWithEvent(podNamespacedName)
    tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
    return
}

// Otherwise compare against the toleration time: if the toleration time has already passed, the
// scheduled deletion event is dropped and the Pod deletion is executed immediately by re-adding
// it to the execution queue.
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime < 0 {
    klog.V(4).Infof("Current tolerations for %v tolerate forever, cancelling any scheduled deletion.", podNamespacedName.String())
    tc.cancelWorkWithEvent(podNamespacedName)
    return
}

startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) // look up the worker already scheduled for this Pod, if any
if scheduledEviction != nil {
    startTime = scheduledEviction.CreatedAt
    if startTime.Add(minTolerationTime).Before(triggerTime) { // still within the toleration window, do not trigger the Pod deletion
        return
    }
    // Why cancel here? Once the trigger time has passed, shouldn't the Pod be deleted immediately?
    // Doesn't cancelling and re-adding to taintEvictionQueue postpone the deletion? Answer: at this
    // point startTime is already past triggerTime, so the AddWork call below triggers the deletion
    // immediately.
    tc.cancelWorkWithEvent(podNamespacedName)
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
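getMinTolerationTime, used above, returns the smallest tolerationSeconds among the matching tolerations, and a negative value when none of them sets tolerationSeconds (i.e. the Pod tolerates the taints forever). The sketch below is paraphrased from taint_manager.go (it uses the standard math and time packages) and may differ slightly between versions.

// getMinTolerationTime returns the shortest toleration duration among the matching tolerations.
// A negative result means no matching toleration sets tolerationSeconds, i.e. tolerate forever.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
    minTolerationTime := int64(math.MaxInt64)
    if len(tolerations) == 0 {
        return 0
    }
    for i := range tolerations {
        if tolerations[i].TolerationSeconds != nil {
            tolerationSeconds := *(tolerations[i].TolerationSeconds)
            if tolerationSeconds <= 0 {
                return 0
            } else if tolerationSeconds < minTolerationTime {
                minTolerationTime = tolerationSeconds
            }
        }
    }
    if minTolerationTime == int64(math.MaxInt64) {
        return -1
    }
    return time.Duration(minTolerationTime) * time.Second
}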
Looking at the code in timed_workers.go answers the question above: tc.taintEvictionQueue.AddWork calls CreateWorker, and when fireAt is not later than createdAt, the Pod deletion is triggered immediately:

func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
    delay := fireAt.Sub(createdAt)
    if delay <= 0 {
        go f(args)
        return nil
    }
    timer := time.AfterFunc(delay, func() { f(args) })
    return &TimedWorker{
        WorkItem:  args,
        CreatedAt: createdAt,
        FireAt:    fireAt,
        Timer:     timer,
    }
}

The work function of taintEvictionQueue tries to delete the Pod; on failure it retries up to 5 times, sleeping 10 ms between attempts:

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
    return func(args *WorkArgs) error {
        ns := args.NamespacedName.Namespace
        name := args.NamespacedName.Name
        klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
        if emitEventFunc != nil {
            emitEventFunc(args.NamespacedName)
        }
        var err error
        for i := 0; i < retries; i++ {
            err = c.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
            if err == nil {
                break
            }
            time.Sleep(10 * time.Millisecond)
        }
        return err
    }
}

After TimedWorkerQueue runs the work function (deletePodHandler), the worker's entry is dropped from the queue and is never run again. So how is a failed deletion handled? Does it simply wait for a later event?

func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs) error {
    return func(args *WorkArgs) error {
        err := q.workFunc(args)
        q.Lock()
        defer q.Unlock()
        if err == nil {
            // To avoid duplicated calls we keep the key in the queue, to prevent
            // subsequent additions.
            q.workers[key] = nil
        } else {
            delete(q.workers, key)
        }
        return err
    }
}

A problem caused by adding and removing node taints in quick succession

Symptom

Run a StatefulSet Pod on node2 and then taint the node; taint_manager evicts the Pods on that node:

kubectl cordon node2
kubectl taint node node2 node.kubernetes.io/unschedulable:NoExecute
kubectl taint node node2 node.kubernetes.io/unschedulable:NoSchedule

Then remove the taints from the node, and the Pod is scheduled back onto node2:

kubectl uncordon node2
kubectl taint node node2 node.kubernetes.io/unschedulable-

Adding taints to the node deletes the StatefulSet Pod, and removing the taints lets the Pod run on the node again. However, if the taints are then re-added after only a short interval, the StatefulSet Pod is not deleted no matter how long you wait.

The logs show that when the node's taints are cleared, the controller-manager empties the eviction queue, logging "Cancelling TimedWorkerQueue item…" for each pending eviction.
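That log line comes from CancelWork in timed_workers.go, which stops the pending timer (if any) and removes the key from the queue. The sketch below is paraphrased from the upstream source and may differ slightly between versions.

// CancelWork removes a scheduled eviction from the queue. It returns true if a pending
// worker was actually cancelled; in either case the key is deleted from the map.
func (q *TimedWorkerQueue) CancelWork(key string) bool {
    q.Lock()
    defer q.Unlock()
    worker, found := q.workers[key]
    result := false
    if found {
        klog.V(4).Infof("Cancelling TimedWorkerQueue item %v at %v", key, time.Now())
        if worker != nil {
            result = true
            worker.Cancel()
        }
        delete(q.workers, key)
    }
    return result
}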

In this log, the add-taint and remove-taint operations were each performed twice; when the taints are removed, there should normally be two lines like the following:

timed_workers.go:129] Cancelling TimedWorkerQueue item 67a41957-f57d-4156-ae53-dc796a3c66f2/cn0r7j27908mp-0

Here there is only one such line, which shows that the key for 67a41957-f57d-4156-ae53-dc796a3c66f2/cn0r7j27908mp-0 had not yet been removed. At that moment the add-taint event arrived; because the key still existed in the queue, CreateWorker was never called, so the Pod was never deleted. See the code and logs below.

// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
    key := args.KeyFromWorkArgs()
    klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)

    q.Lock()
    defer q.Unlock()
    if _, exists := q.workers[key]; exists {
        klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
        return
    }
    worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
    q.workers[key] = worker
}

The logs contain the following entries:

I0102 07:42:44.106288       1 taint_manager.go:440] Updating known taints on node node2: [{node.kubernetes.io/unschedulable  NoExecute <nil>}]
I0102 07:42:44.106350 1 taint_manager.go:352] Not all taints are tolerated after update for Pod kube-system/whereabouts-tqxf6 on node2
I0102 07:42:44.106358 1 timed_workers.go:110] Adding TimedWorkerQueue item kube-system/whereabouts-tqxf6 at 2022-01-02 07:42:44.106354809 +0000 UTC m=+5343.566104243 to be fired at 2022-01-02 07:42:44.106354871 +0000 UTC m=+5343.566104306
I0102 07:42:44.106391 1 timed_workers.go:110] Adding TimedWorkerQueue item 67a41957-f57d-4156-ae53-dc796a3c66f2/cn0r7j27908mp-0 at 2022-01-02 07:42:44.10634238 +0000 UTC m=+5343.566091815 to be fired at 2022-01-02 07:43:14.10634238 +0000 UTC m=+5373.566091815
W0102 07:42:44.106398 1 timed_workers.go:115] Trying to add already existing work for &{NamespacedName:67a41957-f57d-4156-ae53-dc796a3c66f2/cn0r7j27908mp-0}. Skipping.

Root cause analysis

  1. The operations are performed too close together, and taint_manager has multiple workers handling these events, as shown in the code below:
for i := 0; i < UpdateWorkerSize; i++ {
    tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
    tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}

// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
    for {
        item, shutdown := tc.nodeUpdateQueue.Get()
        if shutdown {
            break
        }
        nodeUpdate := item.(nodeUpdateItem)
        hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
        select {
        case <-stopCh:
            tc.nodeUpdateQueue.Done(item)
            return
        case tc.nodeUpdateChannels[hash] <- nodeUpdate:
            // tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
        }
    }
}(stopCh)
  2. Even when the taint manager deletes a Pod successfully, the eviction queue does not remove the Pod's key; it only sets the entry to nil:
func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs) error {
    return func(args *WorkArgs) error {
        err := q.workFunc(args)
        q.Lock()
        defer q.Unlock()
        if err == nil {
            // To avoid duplicated calls we keep the key in the queue, to prevent
            // subsequent additions.
            q.workers[key] = nil
        } else {
            delete(q.workers, key)
        }
        return err
    }
}
  3. When deleting a Pod, the handler only makes a few attempts; on failure it simply returns the error, the Pod's key is removed from the queue, and handling is left to the next event. But if no further event is triggered, the Pod presumably can never be deleted (this is only a guess).