本文转自 Cylon 的笔记收藏册,原文:https://www.cnblogs.com/Cylon/p/16420543.html,版权归原作者所有。
在 Kubernetes 的 kube-controller-manager , kube-scheduler, 以及使用 Operator 的底层实现 controller-rumtime 都支持高可用系统中的 leader 选举,本文将以理解 controller-rumtime (底层的实现是 client-go) 中的 leader 选举以在 kubernetes controller 中是如何实现的。

Background

在运行 kube-controller-manager 时,是有一些参数提供给 cm 进行 leader 选举使用的,可以参考官方文档提供的 参数[1] 来了解相关参数。
--leader-elect                               Default: 
true
--leader-elect-renew-deadline duration       Default: 10s

--leader-elect-resource-lock string          Default: 
"leases"
--leader-elect-resource-name string       Default: 
"kube-controller-manager"
--leader-elect-resource-namespace string     Default: 
"kube-system"
--leader-elect-retry-period duration         Default: 2s

...

本身以为这些组件的选举动作时通过 etcd 进行的,但是后面对 controller-runtime 学习时,发现并没有配置其相关的 etcd 相关参数,这就引起了对选举机制的好奇。怀着这种好奇心搜索了下有关于 kubernetes 的选举,发现官网是这么介绍的,下面是对官方的说明进行一个通俗总结。simple leader election with kubernetes[2]
通过阅读文章得知,kubernetes API 提供了一中选举机制,只要运行在集群内的容器,都是可以实现选举功能的。
Kubernetes API 通过提供了两个属性来完成选举动作的
  • ResourceVersions:每个 API 对象唯一一个 ResourceVersion
  • Annotations:每个 API 对象都可以对这些 key 进行注释
注:这种选举会增加 APIServer 的压力。也就对 etcd 会产生影响
那么有了这些信息之后,我们来看一下,在 Kubernetes 集群中,谁是 cm 的 leader(我们提供的集群只有一个节点,所以本节点就是 leader)。
在 Kubernetes 中所有启用了 leader 选举的服务都会生成一个 EndPoint ,在这个 EndPoint 中会有上面提到的 label(Annotations)来标识谁是 leader。
$ kubectl get ep -n kube-system

NAME                      ENDPOINTS   AGE

kube-controller-manager   <none>      3d4h

kube-dns                              3d4h

kube-scheduler            <none>      3d4h

这里以 kube-controller-manager 为例,来看下这个 EndPoint 有什么信息
[root@master-machine ~]
# kubectl describe ep kube-controller-manager -n kube-system
Name:         kube-controller-manager

Namespace:    kube-system

Labels:       <none>

Annotations:  control-plane.alpha.kubernetes.io/leader:

                {
"holderIdentity"
:
"master-machine_06730140-a503-487d-850b-1fe1619f1fe1"
,
"leaseDurationSeconds"
:15,
"acquireTime"
:
"2022-06-27T15:30:46Z"
,
"re...

Subsets:

Events:

  Type    Reason          Age    From                     Message

  ----    ------          ----   ----                     -------

  Normal  LeaderElection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader

  Normal  LeaderElection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader

可以看出 Annotations: control-plane.alpha.kubernetes.io/leader: 标出了哪个 node 是 leader。

election in controller-runtime

controller-runtime 有关 leader 选举的部分在 pkg/leaderelection[3] 下面,总共 100 行代码,我们来看下做了些什么?
可以看到,这里只提供了创建资源锁的一些选项
type
 Options 
struct
 {

// 在manager启动时,决定是否进行选举
 LeaderElection 
bool
// 使用那种资源锁 默认为租用 lease
 LeaderElectionResourceLock 
string
// 选举发生的名称空间
 LeaderElectionNamespace 
string
// 该属性将决定持有leader锁资源的名称
 LeaderElectionID 
string
}

通过 NewResourceLock 可以看到,这里是走的 client-go/tools/leaderelection[4]下面,而这个 leaderelection 也有一个 example[5] 来学习如何使用它。
通过 example 可以看到,进入选举的入口是一个 RunOrDie() 的函数
// 这里使用了一个lease锁,注释中说愿意为集群中存在lease的监听较少
lock := &resourcelock.LeaseLock{

    LeaseMeta: metav1.ObjectMeta{

        Name:      leaseLockName,

        Namespace: leaseLockNamespace,

    },

    Client: client.CoordinationV1(),

    LockConfig: resourcelock.ResourceLockConfig{

        Identity: id,

    },

}


// 开启选举循环
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{

    Lock: lock,

// 这里必须保证拥有的租约在调用cancel()前终止,否则会仍有一个loop在运行
    ReleaseOnCancel: 
true
,

    LeaseDuration:   
60
 * time.Second,

    RenewDeadline:   
15
 * time.Second,

    RetryPeriod:     
5
 * time.Second,

    Callbacks: leaderelection.LeaderCallbacks{

        OnStartedLeading: 
func(ctx context.Context)
 {

// 这里填写你的代码,
// usually put your code
            run(ctx)

        },

        OnStoppedLeading: 
func()
 {

// 这里清理你的lease
            klog.Infof(
"leader lost: %s"
, id)

            os.Exit(
0
)

        },

        OnNewLeader: 
func(identity string)
 {

// we're notified when new leader elected
if
 identity == id {

// I just got the lock
return
            }

            klog.Infof(
"new leader elected: %s"
, identity)

        },

    },

})

到这里,我们了解了锁的概念和如何启动一个锁,下面看下,client-go 都提供了那些锁。
在代码 tools/leaderelection/resourcelock/interface.go[6] 定义了一个锁抽象,interface 提供了一个通用接口,用于锁定 leader 选举中使用的资源。
type
 Interface 
interface
 {

// Get 返回选举记录
 Get(ctx context.Context) (*LeaderElectionRecord, []
byte
, error)


// Create 创建一个LeaderElectionRecord
 Create(ctx context.Context, ler LeaderElectionRecord) error


// Update will update and existing LeaderElectionRecord
 Update(ctx context.Context, ler LeaderElectionRecord) error


// RecordEvent is used to record events
 RecordEvent(
string
)


// Identity 返回锁的标识
 Identity() 
string

// Describe is used to convert details on current resource lock into a string
 Describe() 
string
}

那么实现这个抽象接口的就是,实现的资源锁,我们可以看到,client-go 提供了四种资源锁
  • leaselock
  • configmaplock
  • multilock
  • endpointlock

leaselock

Lease 是 kubernetes 控制平面中的通过 ETCD 来实现的一个 Leases 的资源,主要为了提供分布式租约的一种控制机制。相关对这个 API 的描述可以参考于:Lease[7]
在 Kubernetes 集群中,我们可以使用如下命令来查看对应的 lease
$ kubectl get leases -A

NAMESPACE         NAME                      HOLDER                                                AGE

kube-node-lease   master-machine            master-machine                                        3d19h

kube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19h

kube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h


$ kubectl describe leases kube-controller-manager -n kube-system

Name:         kube-controller-manager

Namespace:    kube-system

Labels:       <none>

Annotations:  <none>

API Version:  coordination.k8s.io/v1

Kind:         Lease

Metadata:

  Creation Timestamp:  2022-06-24T11:01:51Z

  Managed Fields:

    API Version:  coordination.k8s.io/v1

    Fields Type:  FieldsV1

    fieldsV1:

      f:spec:

        f:acquireTime:

        f:holderIdentity:

        f:leaseDurationSeconds:

        f:leaseTransitions:

        f:renewTime:

    Manager:         kube-controller-manager

    Operation:       Update

    Time:            2022-06-24T11:01:51Z

  Resource Version:  56012

  Self Link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager

  UID:               851a32d2-25dc-49b6-a3f7-7a76f152f071

Spec:

  Acquire Time:            2022-06-27T15:30:46.000000Z

  Holder Identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1

  Lease Duration Seconds:  15

  Lease Transitions:       2

  Renew Time:              2022-06-28T06:09:26.837773Z

Events:                    <none>

下面来看下 leaselock 的实现,leaselock 会实现了作为资源锁的抽象
type
 LeaseLock 
struct
 {

// LeaseMeta 就是类似于其他资源类型的属性,包含name ns 以及其他关于lease的属性
 LeaseMeta  metav1.ObjectMeta

 Client     coordinationv1client.LeasesGetter 
// Client 就是提供了informer中的功能
// lockconfig包含上面通过 describe 看到的 Identity与recoder用于记录资源锁的更改
    LockConfig ResourceLockConfig

// lease 就是 API中的Lease资源,可以参考下上面给出的这个API的使用
 lease      *coordinationv1.Lease

}

下面来看下 leaselock 实现了那些方法?

Get

Get[8] 是从 spec 中返回选举的记录
func(ll *LeaseLock)Get(ctx context.Context)(*LeaderElectionRecord, []byte, error)
 {

var
 err error

 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})

if
 err != 
nil
 {

returnnil
nil
, err

 }

 record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)

 recordByte, err := json.Marshal(*record)

if
 err != 
nil
 {

returnnil
nil
, err

 }

return
 record, recordByte, 
nil
}


// 可以看出是返回这个资源spec里面填充的值
funcLeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord
 {

var
 r LeaderElectionRecord

if
 spec.HolderIdentity != 
nil
 {

  r.HolderIdentity = *spec.HolderIdentity

 }

if
 spec.LeaseDurationSeconds != 
nil
 {

  r.LeaseDurationSeconds = 
int
(*spec.LeaseDurationSeconds)

 }

if
 spec.LeaseTransitions != 
nil
 {

  r.LeaderTransitions = 
int
(*spec.LeaseTransitions)

 }

if
 spec.AcquireTime != 
nil
 {

  r.AcquireTime = metav1.Time{spec.AcquireTime.Time}

 }

if
 spec.RenewTime != 
nil
 {

  r.RenewTime = metav1.Time{spec.RenewTime.Time}

 }

return
 &r

}

Create

Create[9] 是在 kubernetes 集群中尝试去创建一个租约,可以看到,Client 就是 API 提供的对应资源的 REST 客户端,结果会在 Kubernetes 集群中创建这个 Lease
func(ll *LeaseLock)Create(ctx context.Context, ler LeaderElectionRecord)error
 {

var
 err error

 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{

  ObjectMeta: metav1.ObjectMeta{

   Name:      ll.LeaseMeta.Name,

   Namespace: ll.LeaseMeta.Namespace,

  },

  Spec: LeaderElectionRecordToLeaseSpec(&ler),

 }, metav1.CreateOptions{})

return
 err

}

Update

Update[10] 是更新 Lease 的 spec
func(ll *LeaseLock)Update(ctx context.Context, ler LeaderElectionRecord)error
 {

if
 ll.lease == 
nil
 {

return
 errors.New(
"lease not initialized, call get or create first"
)

 }

 ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)


 lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})

if
 err != 
nil
 {

return
 err

 }


 ll.lease = lease

returnnil
}

RecordEvent

RecordEvent[11] 是记录选举时出现的事件,这时候我们回到上部分 在 kubernetes 集群中查看 ep 的信息时可以看到的 event 中存在 became leader 的事件,这里就是将产生的这个 event 添加到 meta-data 中。
func(ll *LeaseLock)RecordEvent(s string)
 {

if
 ll.LockConfig.EventRecorder == 
nil
 {

return
   }

   events := fmt.Sprintf(
"%v %v"
, ll.LockConfig.Identity, s)

   subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}

// Populate the type meta, so we don't have to get it from the schema
   subject.Kind = 
"Lease"
   subject.APIVersion = coordinationv1.SchemeGroupVersion.String()

   ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, 
"LeaderElection"
, events)

}

到这里大致上了解了资源锁究竟是什么了,其他种类的资源锁也是相同的实现的方式,这里就不过多阐述了;下面的我们来看看选举的过程。

election workflow

选举的代码入口是在 leaderelection.go[12] ,这里会继续上面的 example 向下分析整个选举的过程。
前面我们看到了进入选举的入口是一个 RunOrDie()[13] 的函数,那么就继续从这里开始来了解。进入 RunOrDie,看到其实只有几行而已,大致上了解到了 RunOrDie 会使用提供的配置来启动选举的客户端,之后会阻塞,直到 ctx 退出,或停止持有 leader 的租约。
funcRunOrDie(ctx context.Context, lec LeaderElectionConfig)
 {

 le, err := NewLeaderElector(lec)

if
 err != 
nil
 {

panic
(err)

 }

if
 lec.WatchDog != 
nil
 {

  lec.WatchDog.SetLeaderElection(le)

 }

 le.Run(ctx)

}

下面看下 NewLeaderElector[14] 做了些什么?可以看到,LeaderElector 是一个结构体,这里只是创建他,这个结构体提供了我们选举中所需要的一切(LeaderElector 就是 RunOrDie 创建的选举客户端)。
funcNewLeaderElector(lec LeaderElectionConfig)(*LeaderElector, error)
 {

if
 lec.LeaseDuration <= lec.RenewDeadline {

returnnil
, fmt.Errorf(
"leaseDuration must be greater than renewDeadline"
)

 }

if
 lec.RenewDeadline <= time.Duration(JitterFactor*
float64
(lec.RetryPeriod)) {

returnnil
, fmt.Errorf(
"renewDeadline must be greater than retryPeriod*JitterFactor"
)

 }

if
 lec.LeaseDuration < 
1
 {

returnnil
, fmt.Errorf(
"leaseDuration must be greater than zero"
)

 }

if
 lec.RenewDeadline < 
1
 {

returnnil
, fmt.Errorf(
"renewDeadline must be greater than zero"
)

 }

if
 lec.RetryPeriod < 
1
 {

returnnil
, fmt.Errorf(
"retryPeriod must be greater than zero"
)

 }

if
 lec.Callbacks.OnStartedLeading == 
nil
 {

returnnil
, fmt.Errorf(
"OnStartedLeading callback must not be nil"
)

 }

if
 lec.Callbacks.OnStoppedLeading == 
nil
 {

returnnil
, fmt.Errorf(
"OnStoppedLeading callback must not be nil"
)

 }


if
 lec.Lock == 
nil
 {

returnnil
, fmt.Errorf(
"Lock must not be nil."
)

 }

 le := LeaderElector{

  config:  lec,

  clock:   clock.RealClock{},

  metrics: globalMetricsFactory.newLeaderMetrics(),

 }

 le.metrics.leaderOff(le.config.Name)

return
 &le, 
nil
}

LeaderElector[15] 是建立的选举客户端,
type
 LeaderElector 
struct
 {

 config LeaderElectionConfig 
// 这个的配置,包含一些时间参数,健康检查
// recoder相关属性
 observedRecord    rl.LeaderElectionRecord

 observedRawRecord []
byte
 observedTime      time.Time

// used to implement OnNewLeader(), may lag slightly from the
// value observedRecord.HolderIdentity if the transition has
// not yet been reported.
 reportedLeader 
string
// clock is wrapper around time to allow for less flaky testing
 clock clock.Clock

// 锁定 observedRecord
 observedRecordLock sync.Mutex

 metrics leaderMetricsAdapter

}

可以看到 Run 实现的选举逻辑就是在初始化客户端时传入的 三个 callback
func(le *LeaderElector)Run(ctx context.Context)
 {

defer
 runtime.HandleCrash()

deferfunc()
 { 
// 退出时执行callbacke的OnStoppedLeading
  le.config.Callbacks.OnStoppedLeading()

 }()


if
 !le.acquire(ctx) {

return
 }

 ctx, cancel := context.WithCancel(ctx)

defer
 cancel()

go
 le.config.Callbacks.OnStartedLeading(ctx) 
// 选举时,执行 OnStartedLeading
 le.renew(ctx)

}

在 Run 中调用了 acquire,这个是 通过一个 loop 去调用 tryAcquireOrRenew,直到 ctx 传递过来结束信号
func(le *LeaderElector)acquire(ctx context.Context)bool
 {

 ctx, cancel := context.WithCancel(ctx)

defer
 cancel()

 succeeded := 
false
 desc := le.config.Lock.Describe()

 klog.Infof(
"attempting to acquire leader lease %v..."
, desc)

// jitterUntil是执行定时的函数 func() 是定时任务的逻辑
// RetryPeriod是周期间隔
// JitterFactor 是重试系数,类似于延迟队列中的系数 (duration + maxFactor * duration)
// sliding 逻辑是否计算在时间内
// 上下文传递
 wait.JitterUntil(
func()
 {

  succeeded = le.tryAcquireOrRenew(ctx)

  le.maybeReportTransition()

if
 !succeeded {

   klog.V(
4
).Infof(
"failed to acquire lease %v"
, desc)

return
  }

  le.config.Lock.RecordEvent(
"became leader"
)

  le.metrics.leaderOn(le.config.Name)

  klog.Infof(
"successfully acquired lease %v"
, desc)

  cancel()

 }, le.config.RetryPeriod, JitterFactor, 
true
, ctx.Done())

return
 succeeded

}

这里实际上选举动作在 tryAcquireOrRenew 中,下面来看下 tryAcquireOrRenew;tryAcquireOrRenew 是尝试获得一个 leader 租约,如果已经获得到了,则更新租约;否则可以得到租约则为 true,反之 false
func(le *LeaderElector)tryAcquireOrRenew(ctx context.Context)bool
 {

 now := metav1.Now() 
// 时间
 leaderElectionRecord := rl.LeaderElectionRecord{ 
// 构建一个选举record
  HolderIdentity:       le.config.Lock.Identity(), 
// 选举人的身份特征,ep与主机名有关
  LeaseDurationSeconds: 
int
(le.config.LeaseDuration / time.Second), 
// 默认15s
  RenewTime:            now, 
// 重新获取时间
  AcquireTime:          now, 
// 获得时间
 }


// 1. 从API获取或创建一个recode,如果可以拿到则已经有租约,反之创建新租约
 oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)

if
 err != 
nil
 {

if
 !errors.IsNotFound(err) {

   klog.Errorf(
"error retrieving resource lock %v: %v"
, le.config.Lock.Describe(), err)

returnfalse
  }

// 创建租约的动作就是新建一个对应的resource,这个lock就是leaderelection提供的四种锁,
// 看你在runOrDie中初始化传入了什么锁
if
 err = le.config.Lock.Create(ctx, leaderElectionRecord); err != 
nil
 {

   klog.Errorf(
"error initially creating leader election record: %v"
, err)

returnfalse
  }

// 到了这里就已经拿到或者创建了租约,然后记录其一些属性,LeaderElectionRecord
  le.setObservedRecord(&leaderElectionRecord)


returntrue
 }


// 2. 获取记录检查身份和时间
if
 !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {

  le.setObservedRecord(oldLeaderElectionRecord)


  le.observedRawRecord = oldLeaderElectionRawRecord

 }

iflen
(oldLeaderElectionRecord.HolderIdentity) > 
0
 &&

  le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&

  !le.IsLeader() { 
// 不是leader,进行HolderIdentity比较,再加上时间,这个时候没有到竞选其,跳出
  klog.V(
4
).Infof(
"lock is held by %v and has not yet expired"
, oldLeaderElectionRecord.HolderIdentity)

returnfalse
 }


// 3.我们将尝试更新。 在这里leaderElectionRecord设置为默认值。让我们在更新之前更正它。
if
 le.IsLeader() { 
// 到这就说明是leader,修正他的时间
  leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime

  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions

 } 
else
 { 
// LeaderTransitions 就是指leader调整(转变为其他)了几次,如果是,
// 则为发生转变,保持原有值
// 反之,则+1
  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 
1
 }

// 完事之后更新APIServer中的锁资源,也就是更新对应的资源的属性信息
if
 err = le.config.Lock.Update(ctx, leaderElectionRecord); err != 
nil
 {

  klog.Errorf(
"Failed to update lock: %v"
, err)

returnfalse
 }

// setObservedRecord 是通过一个新的record来更新这个锁中的record
// 操作是安全的,会上锁保证临界区仅可以被一个线程/进程操作
 le.setObservedRecord(&leaderElectionRecord)

returntrue
}

到这里,已经完整知道利用 kubernetes 进行选举的流程都是什么了;下面简单回顾下,上述 leader 选举所有的步骤:
  • 首选创建的服务就是该服务的 leader,锁可以为 lease , endpoint 等资源进行上锁
  • 已经是 leader 的实例会不断续租,租约的默认值是 15 秒 (leaseDuration);leader 在租约满时更新租约时间(renewTime)。
  • 其他的 follower,会不断检查对应资源锁的存在,如果已经有 leader,那么则检查 renewTime,如果超过了租用时间(),则表明 leader 存在问题需要重新启动选举,直到有 follower 提升为 leader。
  • 而为了避免资源被抢占,Kubernetes API 使用了 ResourceVersion 来避免被重复修改(如果版本号与请求版本号不一致,则表示已经被修改了,那么 APIServer 将返回错误)

利用 Leader 机制实现 HA 应用

下面就通过一个 example 来实现一个,利用 kubernetes 提供的选举机制完成的高可用应用。

代码实现

如果仅仅是使用 Kubernetes 中的锁,实现的代码也只有几行而已。
package
 main


import
 (

"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

 metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1"
 clientset 
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
)


funcbuildConfig(kubeconfig string)(*rest.Config, error)
 {

if
 kubeconfig != 
""
 {

  cfg, err := clientcmd.BuildConfigFromFlags(
""
, kubeconfig)

if
 err != 
nil
 {

returnnil
, err

  }

return
 cfg, 
nil
 }


 cfg, err := rest.InClusterConfig()

if
 err != 
nil
 {

returnnil
, err

 }

return
 cfg, 
nil
}


funcmain()
 {

 klog.InitFlags(
nil
)


var
 kubeconfig 
string
var
 leaseLockName 
string
var
 leaseLockNamespace 
string
var
 id 
string
// 初始化客户端的部分
 flag.StringVar(&kubeconfig, 
"kubeconfig"
""
"absolute path to the kubeconfig file"
)

 flag.StringVar(&id, 
"id"
""
"the holder identity name"
)

 flag.StringVar(&leaseLockName, 
"lease-lock-name"
""
"the lease lock resource name"
)

 flag.StringVar(&leaseLockNamespace, 
"lease-lock-namespace"
""
"the lease lock resource namespace"
)

 flag.Parse()


if
 leaseLockName == 
""
 {

  klog.Fatal(
"unable to get lease lock resource name (missing lease-lock-name flag)."
)

 }

if
 leaseLockNamespace == 
""
 {

  klog.Fatal(
"unable to get lease lock resource namespace (missing lease-lock-namespace flag)."
)

 }

 config, err := buildConfig(kubeconfig)

if
 err != 
nil
 {

  klog.Fatal(err)

 }

 client := clientset.NewForConfigOrDie(config)


 run := 
func(ctx context.Context)
 {

// 实现的业务逻辑,这里仅仅为实验,就直接打印了
  klog.Info(
"Controller loop..."
)


for
 {

   fmt.Println(
"I am leader, I was working."
)

   time.Sleep(time.Second * 
5
)

  }

 }


// use a Go context so we can tell the leaderelection code when we
// want to step down
 ctx, cancel := context.WithCancel(context.Background())

defer
 cancel()


// 监听系统中断
 ch := 
make
(
chan
 os.Signal, 
1
)

 signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

gofunc()
 {

  <-ch

  klog.Info(
"Received termination, signaling shutdown"
)

  cancel()

 }()


// 创建一个资源锁
 lock := &resourcelock.LeaseLock{

  LeaseMeta: metav1.ObjectMeta{

   Name:      leaseLockName,

   Namespace: leaseLockNamespace,

  },

  Client: client.CoordinationV1(),

  LockConfig: resourcelock.ResourceLockConfig{

   Identity: id,

  },

 }


// 开启一个选举的循环
 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{

  Lock:            lock,

  ReleaseOnCancel: 
true
,

  LeaseDuration:   
60
 * time.Second,

  RenewDeadline:   
15
 * time.Second,

  RetryPeriod:     
5
 * time.Second,

  Callbacks: leaderelection.LeaderCallbacks{

   OnStartedLeading: 
func(ctx context.Context)
 {

// 当选举为leader后所运行的业务逻辑
    run(ctx)

   },

   OnStoppedLeading: 
func()
 {

// we can do cleanup here
    klog.Infof(
"leader lost: %s"
, id)

    os.Exit(
0
)

   },

   OnNewLeader: 
func(identity string)
 { 
// 申请一个选举时的动作
if
 identity == id {

return
    }

    klog.Infof(
"new leader elected: %s"
, identity)

   },

  },

 })

}

注:这种 lease 锁只能在 in-cluster 模式下运行,如果需要类似二进制部署的程序,可以选择 endpoint 类型的资源锁。

生成镜像

这里已经制作好了镜像并上传到 dockerhub(cylonchau/leaderelection:v0.0.2)上了,如果只要学习运行原理,则忽略此步骤
FROM
 golang:alpine AS builder

MAINTAINER
 cylon

WORKDIR /election
COPY . /election
ENV
 GOPROXY https://goproxy.cn,direct

RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o elector main.go

FROM
 alpine AS runner

WORKDIR /go/elector
COPY --from=builder /election/elector .
VOLUME ["/election"]
ENTRYPOINT ["./elector"]

准备资源清单

默认情况下,Kubernetes 运行的 pod 在请求 Kubernetes 集群内资源时,默认的账户是没有权限的,默认服务帐户无权访问协调  API,因此我们需要创建另一个 serviceaccount 并相应地设置  对应的 RBAC 权限绑定;在清单中配置上这个 sa,此时所有的 pod 就会有协调锁的权限了。
apiVersion:v1
kind:ServiceAccount
metadata:
name:sa-leaderelection
---
apiVersion:rbac.authorization.k8s.io/v1
kind:Role
metadata:
name:leaderelection
rules:
-apiGroups:
-coordination.k8s.io
resources:
-leases
verbs:
-'*'
---
apiVersion:rbac.authorization.k8s.io/v1
kind:RoleBinding
metadata:
name:leaderelection
roleRef:
apiGroup:rbac.authorization.k8s.io
kind:Role
name:leaderelection
subjects:
-kind:ServiceAccount
name:sa-leaderelection
---
apiVersion:apps/v1
kind:Deployment
metadata:
labels:
app:leaderelection
name:leaderelection
namespace:default
spec:
replicas:3
selector:
matchLabels:
app:leaderelection
template:
metadata:
labels:
app:leaderelection
spec:
containers:
-image:cylonchau/leaderelection:v0.0.2
imagePullPolicy:IfNotPresent
command:["./elector"]
args:
-"-id=$(POD_NAME)"
-"-lease-lock-name=test"
-"-lease-lock-namespace=default"
env:
-name:POD_NAME
valueFrom:
fieldRef:
apiVersion:v1
fieldPath:metadata.name
name:elector
serviceAccountName:sa-leaderelection

集群中运行

执行完清单后,当 pod 启动后,可以看到会创建出一个 lease。

$ kubectl get lease

NAME   HOLDER                            AGE

test
   leaderelection-5644c5f84f-frs5n   1s



$ kubectl describe lease

Name:         
test
Namespace:    default

Labels:       <none>

Annotations:  <none>

API Version:  coordination.k8s.io/v1

Kind:         Lease

Metadata:

  Creation Timestamp:  2022-06-28T16:39:45Z

  Managed Fields:

    API Version:  coordination.k8s.io/v1

    Fields Type:  FieldsV1

    fieldsV1:

      f:spec:

        f:acquireTime:

        f:holderIdentity:

        f:leaseDurationSeconds:

        f:leaseTransitions:

        f:renewTime:

    Manager:         elector

    Operation:       Update

    Time:            2022-06-28T16:39:45Z

  Resource Version:  131693

  Self Link:         /apis/coordination.k8s.io/v1/namespaces/default/leases/
test
  UID:               bef2b164-a117-44bd-bad3-3e651c94c97b

Spec:

  Acquire Time:            2022-06-28T16:39:45.931873Z

  Holder Identity:         leaderelection-5644c5f84f-frs5n

  Lease Duration Seconds:  60

  Lease Transitions:       0

  Renew Time:              2022-06-28T16:39:55.963537Z

Events:                    <none>

通过其持有者的信息查看对应 pod(因为程序中对 holder Identity 设置的是 pod 的名称),实际上是工作的 pod。
如上实例所述,这是利用 Kubernetes 集群完成的 leader 选举的方案,虽然这不是最完美解决方案,但这是一种简单的方法,因为可以无需在集群上部署更多东西或者进行大量的代码工作就可以利用 Kubernetes 集群来实现一个高可用的 HA 应用。

引用链接

[1]
参数: https://kubernetes.io/docs/reference/command-line-tools-reference/kube-controller-manager/
[2]
simple leader election with kubernetes: https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
[3]
pkg/leaderelection: https://github.com/kubernetes-sigs/controller-runtime/tree/master/pkg/leaderelection
[4]
client-go/tools/leaderelection: https://github.com/kubernetes/client-go/tree/v0.24.0/tools/leaderelection
[5]
example: https://github.com/kubernetes/client-go/blob/v0.24.0/examples/leader-election/main.go
[6]
tools/leaderelection/resourcelock/interface.go: https://www.cnblogs.com/Cylon/p/tools/leaderelection/resourcelock/interface.go
[7]
Lease: https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/
[8]
Get: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/resourcelock/leaselock.go#L41-L53
[9]
Create: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/resourcelock/leaselock.go#L56-L66
[10]
Update: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/resourcelock/leaselock.go#L69-L82
[11]
RecordEvent: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/resourcelock/leaselock.go#L85-L95
[12]
leaderelection.go: https://github.com/kubernetes/client-go/blob/v0.24.0/tools/leaderelection/leaderelection.go
[13]
RunOrDie(): https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/examples/leader-election/main.go#L122
[14]
NewLeaderElector: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/leaderelection.go#L77-L110
[15]
LeaderElector: https://github.com/kubernetes/client-go/blob/cab7ba1d4a523956b6395dcbe38620159ac43fef/tools/leaderelection/leaderelection.go#L177-L195
继续阅读
阅读原文