之前写过用redis实现的分布式锁,这次用etcd来实现一个。

原理

首先获取一个etcd的租约,拿着这个租约用etcd的事务操作去设置一个key,如果设置成功,就表示抢到了锁,否则抢索失败。租约的作用就是实现抢到锁之后的释放功能,防止长期占用。
etcd的txn事务比较特别,是一个IF-THEN-ELSE的形式,其中IF接收的参数是一个比较操作。多说无益,看代码吧。

代码

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	}
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Print(err)
	}
	lease := clientv3.NewLease(client)
	leaseResp, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
	}
	leaseID := leaseResp.ID
	ctx, cancelFunc := context.WithCancel(context.TODO())

    //两个defer用于释放锁
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseID)
    
    //抢锁和占用期间,需要不停的续租,续租方法返回一个只读的channel
	keepChan, err := lease.KeepAlive(ctx, leaseID)
	if err != nil {
		fmt.Println(err)
	}
    //处理续租返回的信息
	go func() {
		for {
			select {
			case keepResp := <-keepChan:
				if keepChan == nil {
					fmt.Println("lease out")
					goto END
				} else {
					fmt.Println("get resp", keepResp.ID)
				}
			}
		}
	END:
	}()
	kv := clientv3.NewKV(client)
	txn := kv.Txn(context.TODO())
    //开始抢锁事务操作
	txn.If(clientv3.Compare(clientv3.CreateRevision("/lock/9"), "=", 0)).Then(clientv3.OpPut("/lock/9", "", clientv3.WithLease(leaseID))).Else(clientv3.OpGet("/lock/9"))
    //提交事务
	txnResp, err := txn.Commit()
    
	if err != nil {
		fmt.Println(err)
		return
	}
    //如果抢锁成功
	if txnResp.Succeeded {
		fmt.Println("success")
	} else {
		fmt.Println("fail")
	}
	time.Sleep(5 * time.Second)
}