zookeeper笔记

入门

概述

ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。

ZooKeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。

image-20210530224147096

特点

image-20210530224553485

  1. ZooKeeper 是由 1个 leader,多个 follower 组成的集群

  2. 集群中只要有 半数以上 节点存活,ZooKeeper 就能正常服务

  3. 全局数据一致,每个 Server 上保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的

  4. 更新请求顺序执行,来自同一个 Client 的更新请求按期发送顺序依次执行

  5. 数据更新具有 原子性,一次更新要么成功,要么失败

  6. 实时性,在一定时间范围内,Client 能读到最新数据

数据结构

ZooKeeper 数据模型的结构与 Unix文件系统 很类似,整体上可以看作一棵树,每个节点称作一个 ZNode,每个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

image-20210530225138353

应用场景

ZooKeeper 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

统一命名服务

在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,例如:IP不好记,但是域名很好记

image-20210530225625548

统一配置管理

分布式环境下,配置文件同步非常常见

  • 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群
  • 对配置文件修改后,希望能够快速同步到各个节点上

配置管理可交由 ZooKeeper 来实现

  • 将配置信息写入 ZooKeeper 的一个 ZNode
  • 各个客户端服务器监听这个 Znode
  • 一旦 Znode 数据修改,ZooKeeper 将通知各个客户端服务器

image-20210530230000214

统一集群管理

分布式环境中,实时掌握每个节点的状态是表要的,可根据节点实时状态做一些调整,ZooKeeper 可实现实时监控节点状态变化:

  1. 可将节点信息写入 ZooKeeper 的一个 ZNode
  2. 监听这个 ZNode 可获取它的实时状态变化

image-20210530230442616

服务器动态上下线

image-20210530230607568

软负载均衡

在 ZooKeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

image-20210530230708011

下载安装

ZooKeeper 集群有多种搭建方式,比如 多台虚拟机,本机搭建,docker搭建 等等,这里使用 docker-compose 来搭建 ZooKeeper 3.7.0 集群。

目录及配置

目录树如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
➜  zookeeper tree
.
├── docker-compose.yaml
├── zoo1
│   ├── data
│   │   ├── myid
│   │   └── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   ├── snapshot.0
│   │   └── snapshot.100000000
│   ├── datalog
│   │   └── version-2
│   └── logs
│   └── zookeeper_audit.log
├── zoo2
│   ├── data
│   │   ├── myid
│   │   └── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   └── snapshot.0
│   ├── datalog
│   │   └── version-2
│   └── logs
│   └── zookeeper_audit.log
└── zoo3
├── data
│   ├── myid
│   └── version-2
│   ├── acceptedEpoch
│   ├── currentEpoch
│   └── snapshot.0
├── datalog
│   └── version-2
└── logs
└── zookeeper_audit.log

docker-compose.yaml 的配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3.8'
services:
zoo1:
image: zookeeper:3.7.0
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo1/data:/data
- ./zoo1/datalog:/datalog
- ./zoo1/logs:/logs

zoo2:
image: zookeeper:3.7.0
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo2/data:/data
- ./zoo2/datalog:/datalog
- ./zoo2/logs:/logs

zoo3:
image: zookeeper:3.7.0
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
volumes:
- ./zoo3/data:/data
- ./zoo3/datalog:/datalog
- ./zoo3/logs:/logs

可执行文件

这里已经将每个节点的 /data, /datalog, /logs 三个目录全部映射到宿主机上,随便进入一个节点容器,查看 /apache-zookeeper-3.7.0-bin/bin 下的可执行文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# ls -l
total 72
-rwxr-xr-x 1 zookeeper zookeeper 232 Mar 17 09:45 README.txt
-rwxr-xr-x 1 zookeeper zookeeper 2066 Mar 17 09:45 zkCleanup.sh
-rwxr-xr-x 1 zookeeper zookeeper 1158 Mar 17 09:45 zkCli.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1620 Mar 17 09:45 zkCli.sh
-rwxr-xr-x 1 zookeeper zookeeper 1843 Mar 17 09:45 zkEnv.cmd
-rwxr-xr-x 1 zookeeper zookeeper 3690 Mar 17 09:45 zkEnv.sh
-rwxr-xr-x 1 zookeeper zookeeper 4559 Mar 17 09:45 zkServer-initialize.sh
-rwxr-xr-x 1 zookeeper zookeeper 1286 Mar 17 09:45 zkServer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 11561 Mar 17 09:45 zkServer.sh
-rwxr-xr-x 1 zookeeper zookeeper 988 Mar 17 09:45 zkSnapShotToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1377 Mar 17 09:45 zkSnapShotToolkit.sh
-rwxr-xr-x 1 zookeeper zookeeper 987 Mar 17 09:45 zkSnapshotComparer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1374 Mar 17 09:45 zkSnapshotComparer.sh
-rwxr-xr-x 1 zookeeper zookeeper 996 Mar 17 09:45 zkTxnLogToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1385 Mar 17 09:45 zkTxnLogToolkit.sh

配置参数

先进入节点容器,打印配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
root@zoo1:/conf# cat /conf/zoo.cfg
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=0.0.0.0:2888:3888;2181
server.2=zoo2:2888:3888;2181
server.3=zoo3:2888:3888;2181
  1. tickTime=2000:通信心跳数

    ZooKeeper 服务端之间以及服务端与客户端心跳时间,单位为毫秒,它是 ZooKeeper 使用的基本时间,ZooKeeper session 的超时时间为该数值的两倍,即 2*tickTime

    对应 docker-compose 的配置:ZOO_TICK_TIME,默认 2000 毫秒

  2. initLimit=5:LF 初始通信时限

    集群中的 Follower 与 Leader 之间初始连接时能容忍的最长时间

    对应 docker-compose 的配置:ZOO_INIT_LIMIT,默认 5*tickTime

  3. syncLimit=2:LF 同步通信时限

    Leader 与 Follower 之间的最大响应时限,如果超时,Leader 会 认为该 Follower 已经挂掉,会从服务端列表踢除该 Follower

    对应 docker-compose 的配置:ZOO_SYNC_LIMIT,默认 2*tickTime

  4. maxClientCnxns=60

    单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。

    对应 docker-compose 的配置:ZOO_MAX_CLIENT_CNXNS,默认 60

  5. server.1=0.0.0.0:2888:3888;2181

    1: 节点id

    0.0.0.0: 节点IP

    2888: 节点与Leader通信的端口

    3888: 选举用的端口

    2181: 客户端访问端口

内部原理

选举机制

半数机制:集群中半数以上机器存活,则集群可用,所以 ZooKeeper 适合安装奇数台服务器。

ZooKeeper 虽然没有在配置文件中指定 master 和 slave,但是 ZooKeeper 工作时只会有一个 Leader,其余都是 Follower,Leader 是通过内部选举机制推选出来的。

假设有五台 ZooKeeper 服务器组成了集群,id分别是 1-5,都是新启动且没有历史数据,每台机器的数据量都一致,按照 id 顺序启动,如下图所示:

image-20210531000530712

选举过程如下:

  1. 服务器1启动,此时只有它一台机器在运行,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
  2. 服务器2启动,它与服务器1通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id值较大的服务器2 胜出,但是由于没有超过半数以上,选举无法完成,服务器1和服务器2的选举状态保持在 LOOKING
  3. 服务器3启动,发起一次选举,此时服务器1和服务器2都会选举3,服务器3的票数为2,超过一半,当选为 Leader,服务器1和2 的状态更改为 FOLLOWING,服务器3的状态更改为 LEADING

  4. 服务器4启动,发起一次选举,由于 1,2,3 的状态都不是 LOOKING,不会更改选票信息,交换选票结果为服务器3获得3票继续当选 Leader,服务器4的状态更改为 FOLLOWING

  5. 服务器5启动,选举流程和4一致。

节点类型

持久(Persistent)

客户端和服务端断开连接后,创建的节点不会删除

持久类节点分为两类:

  1. 持久化目录节点

  2. 持久化顺序编号目录节点

    客户端与 ZooKeeper 断开连接后,该节点依旧存在,只是ZooKeeper 会给该节点进行顺序编号

    创建 ZNode 时设置顺序标识,ZNode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。

    在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以根据顺序号推断事件的顺序。

临时(Ephemeral)

客户端和服务端断开连接后,创建的节点自动删除。

临时节点不能拥有子节点

临时类节点分为两类:

  1. 临时目录节点
  2. 临时顺序编号目录节wqeqwewq

image-20210531003113991

监听器原理

  1. 首先客户端开启一个 main 线程,在 main 线程中创建 ZooKeeper 客户端
  2. main 线程会创建两个线程,一个负责网络连接通信(connect),另一个负责监听(listener)
  3. 通过 connect 线程将注册的监听事件发送给 ZooKeeper
  4. 通过 ZooKeeper 的注册监听列表将注册的监听事件添加到列表中
  5. ZooKeeper 监听到有数据或路径变化,就会将消息发送给客户端的 listener 线程
  6. 客户端 listener 线程内部处理监听事件

image-20210531011335773

常见的监听有:

  1. config [-c] [-w] [-s]
  2. get [-s] [-w] path
  3. ls [-s] [-w] [-R] path
  4. stat [-w] path

写数据流程

  1. client 向 server 发送写请求,这个 server 不一定是 leader
  2. 如果 server 不是 leader,那么会把请求转发给 leader,leader 再把请求转发给所有 follower,各个 follower 写成功后会通知 leader
  3. 当 leader 收到大多数 server 写成功了,那么就说明数据写成功了,写成功后,leader 会告诉接收客户端请求的 server 数据写成功了
  4. 收到请求的 server 会将写入成功的消息再告诉对应的 client

image-20210531011939853

客户端操作

命令基本语法 功能描述
help 显示所有操作命令,实际使用会报 Command not found help,不过也会提示所有的命令
ls [-s] [-w] [-R] path 使用 ls 命令查看当前 znode 中的内容
create [-s] [-e] [-c] [-t ttl] path [data] [acl] 创建,-s:含有序列 -e:临时(重启或者超时会消失)
get [-s] [-w] path 获得节点的值
set [-s] [-v version] path data 设置节点的具体值
stat [-w] path 查看节点状态
delete [-v version] path 删除节点
deleteall path [-b batch size] 递归删除节点

启动客户端

进入容器内部执行 /apache-zookeeper-3.7.0-bin/bin/zkCli.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
➜  ~ docker exec -it 894493ffc262 bash
root@zoo1:/apache-zookeeper-3.7.0-bin# cd bin/
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# ls -l
total 72
-rwxr-xr-x 1 zookeeper zookeeper 232 Mar 17 09:45 README.txt
-rwxr-xr-x 1 zookeeper zookeeper 2066 Mar 17 09:45 zkCleanup.sh
-rwxr-xr-x 1 zookeeper zookeeper 1158 Mar 17 09:45 zkCli.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1620 Mar 17 09:45 zkCli.sh
-rwxr-xr-x 1 zookeeper zookeeper 1843 Mar 17 09:45 zkEnv.cmd
-rwxr-xr-x 1 zookeeper zookeeper 3690 Mar 17 09:45 zkEnv.sh
-rwxr-xr-x 1 zookeeper zookeeper 4559 Mar 17 09:45 zkServer-initialize.sh
-rwxr-xr-x 1 zookeeper zookeeper 1286 Mar 17 09:45 zkServer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 11561 Mar 17 09:45 zkServer.sh
-rwxr-xr-x 1 zookeeper zookeeper 988 Mar 17 09:45 zkSnapShotToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1377 Mar 17 09:45 zkSnapShotToolkit.sh
-rwxr-xr-x 1 zookeeper zookeeper 987 Mar 17 09:45 zkSnapshotComparer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1374 Mar 17 09:45 zkSnapshotComparer.sh
-rwxr-xr-x 1 zookeeper zookeeper 996 Mar 17 09:45 zkTxnLogToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1385 Mar 17 09:45 zkTxnLogToolkit.sh
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# zkCli.sh
2021-05-30 16:41:18,597 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.max=256MB
2021-05-30 16:41:18,597 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.total=64MB
2021-05-30 16:41:18,601 [myid:] - INFO [main:ZooKeeper@637] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@7946e1f4
2021-05-30 16:41:18,604 [myid:] - INFO [main:X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2021-05-30 16:41:18,609 [myid:] - INFO [main:ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
2021-05-30 16:41:18,616 [myid:] - INFO [main:ClientCnxn@1726] - zookeeper.request.timeout value is 0. feature enabled=false
Welcome to ZooKeeper!
2021-05-30 16:41:18,638 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1171] - Opening socket connection to server localhost/127.0.0.1:2181.
2021-05-30 16:41:18,640 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1173] - SASL config status: Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2021-05-30 16:41:18,652 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1005] - Socket connection established, initiating session, client: /127.0.0.1:42454, server: localhost/127.0.0.1:2181
2021-05-30 16:41:18,666 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1438] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100000b1e710001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

help

实际使用会报 Command not found help,不过也会提示所有的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port -client-configuration properties-file cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
whoami
Command not found: Command not found help

ls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 4] ls -s /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[zk: localhost:2181(CONNECTED) 5] create /wcxst

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/
Created /wcxst
[zk: localhost:2181(CONNECTED) 7] create /test "this is test"
Created /test
[zk: localhost:2181(CONNECTED) 8] get /test
this is test
[zk: localhost:2181(CONNECTED) 9] create -s /serial
Created /serial0000000002
[zk: localhost:2181(CONNECTED) 10] create -e /temp
Created /temp
[zk: localhost:2181(CONNECTED) 15] create /wcxst/blog "this is my blog"
Created /wcxst/blog
[zk: localhost:2181(CONNECTED) 15] create /wcxst/blog "this is my blog"
Created /wcxst/blog
[zk: localhost:2181(CONNECTED) 16] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 17] ls /wcxst/
Path must not end with / character
[zk: localhost:2181(CONNECTED) 18] get /wcxst/blog
this is my blog

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[zk: localhost:2181(CONNECTED) 18] get /wcxst/blog
this is my blog
[zk: localhost:2181(CONNECTED) 19] get -s /wcxst/blog
this is my blog
cZxid = 0x10000000c
ctime = Sun May 30 16:54:30 UTC 2021
mZxid = 0x10000000c
mtime = Sun May 30 16:54:30 UTC 2021
pZxid = 0x10000000c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 15
numChildren = 0

set

1
2
3
[zk: localhost:2181(CONNECTED) 20] set /wcxst/blog "change blog"
[zk: localhost:2181(CONNECTED) 21] get /wcxst/blog
change blog

stat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[zk: localhost:2181(CONNECTED) 22] stat /wcxst
cZxid = 0x100000008
ctime = Sun May 30 16:50:15 UTC 2021
mZxid = 0x100000008
mtime = Sun May 30 16:50:15 UTC 2021
pZxid = 0x10000000c
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 23] stat /wcxst/blog
cZxid = 0x10000000c
ctime = Sun May 30 16:54:30 UTC 2021
mZxid = 0x10000000d
mtime = Sun May 30 16:56:47 UTC 2021
pZxid = 0x10000000c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
字段 说明
cZxid 创建znode的zxid
ctime 创建znode的时间,单位毫秒
mZxid 最近一次修改znode的zxid(创建、删除、set直系子节点、set自身节点都会计数)
mtime 最近一次修改znode的时间,单位毫秒
pZxid 最近一次修改子节点的zxid(创建、删除直系子节点都会计数,set子节点不会计数)
cversion 修改子节点的次数(创建、删除直系子节点都会计数,set子节点不会计数)
dataVersion 表示对该znode的数据所做的更改次数
aclVersion 表示对此znode的ACL进行更改的次数
ephemeralOwner 如果znode是ephemeral类型节点,则这是znode所有者的 session ID。 如果znode不是ephemeral节点,则该字段设置为零
dataLength znode数据字段的长度
numChildren 直系子节点的数量(不会递归计算孙节点)

delete

1
2
3
4
5
6
7
8
9
[zk: localhost:2181(CONNECTED) 24] create /wcxst/uid "this is my uid"
Created /wcxst/uid
[zk: localhost:2181(CONNECTED) 25] ls /wcxst
[blog, uid]
[zk: localhost:2181(CONNECTED) 26] delete /wcxst/uid
[zk: localhost:2181(CONNECTED) 27] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 28] delete /wcxst
Node not empty: /wcxst

deleteall

1
2
3
4
5
6
7
8
9
[zk: localhost:2181(CONNECTED) 27] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 28] delete /wcxst
Node not empty: /wcxst
[zk: localhost:2181(CONNECTED) 29] deleteall /wcxst
[zk: localhost:2181(CONNECTED) 31] ls /
[serial0000000002, temp, test, zookeeper]
[zk: localhost:2181(CONNECTED) 33] ls /wcxst
Node does not exist: /wcxst

Api 使用(Golang)

使用的库: github.com/samuel/go-zookeeper

文档:https://pkg.go.dev/github.com/samuel/go-zookeeper/zk#pkg-overview

常规功能

测试了以下功能:

  • 连接 ZooKeeper
  • 全局监听
  • 查看子节点列表
  • 判断节点是否存在以及监听事件
  • 创建四种节点
  • 获取节点详情
  • 删除节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main

import (
"encoding/json"
"fmt"
"time"

"github.com/samuel/go-zookeeper/zk"
)

const (
ZKFlagPersistent = 0 // 创建ZNode时的Flag:永久存储
ZKFlagEphemeral = 1 // 创建ZNode时的Flag:临时存储
ZKFlagPersistentSequence = 2 // 创建ZNode时的Flag:永久存储+序列编号
ZKFlagEphemeralSequence = 3 // 创建ZNode时的Flag:临时存储+序列编号
)

func main() {
// 连接 ZooKeeper 集群
hosts := []string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}
option := zk.WithEventCallback(
func(event zk.Event) {
b, _ := json.Marshal(event)
fmt.Printf("【all event】%s\n", b)
},
)
conn, _, err := zk.Connect(hosts, time.Second*5, option)
if err != nil {
fmt.Printf("【connect zookeeper server error】 %v\n", err)
return
}
defer conn.Close()

parentPath := "/golang"
persistentPath := fmt.Sprintf("%s/persistent", parentPath)
ephemeralPath := fmt.Sprintf("%s/ephemeral", parentPath)
persistentSequencePath := fmt.Sprintf("%s/persistent_sequence", parentPath)
ephemeralSequencePath := fmt.Sprintf("%s/ephemeral_sequence", parentPath)

// 查看根节点下子节点
nodes, stat, err := conn.Children("/")
fmt.Printf("【root children】 %+v, %+v, %v\n", nodes, stat, err)

acl := zk.WorldACL(zk.PermAll)
// 添加golang节点
_, _ = Create(conn, parentPath, "this is a golang znode", ZKFlagPersistent, acl)

// 添加永久存储节点
_, _ = Create(conn, persistentPath, "this is a persistent znode", ZKFlagPersistent, acl)
// 添加临时存储节点
_, _ = Create(conn, ephemeralPath, "this is a ephemeral znode", ZKFlagEphemeral, acl)
// 添加永久存储节点-序列化编号
_, _ = Create(
conn, persistentSequencePath, "this is a persistent sequence znode", ZKFlagPersistentSequence, acl,
)
// 添加临时存储节点-序列化编号
_, _ = Create(
conn, ephemeralSequencePath, "this is a ephemeral sequence znode", ZKFlagEphemeralSequence, acl,
)

// 查看golang节点下子节点
nodes, stat, err = conn.Children(parentPath)
fmt.Printf("【golang children】 %+v, %+v, %v\n", nodes, stat, err)

// 判断golang是否存在
res, stat, err := conn.Exists(parentPath)
fmt.Printf("【golang Exists】 %+v, %+v, %v\n", res, stat, err)

// 查看节点【/golang/persistent】详情
b, stat, err := conn.Get(persistentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", persistentPath, b, stat, err)
// 设置节点【/golang/persistent】数据
stat, err = conn.Set(persistentPath, []byte("change data"), stat.Version)
fmt.Printf("【change path [%s] data result】 %+v, %v\n", persistentPath, stat, err)
// 查看节点【/golang/persistent】详情
b, stat, err = conn.Get(persistentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", persistentPath, b, stat, err)
// 删除结点【/golang/persistent】
err = conn.Delete(persistentPath, stat.Version)
fmt.Printf("【delete path [%s] result】 %v\n", persistentPath, err)

// 查看golang节点下子节点
nodes, stat, err = conn.Children(parentPath)
fmt.Printf("【golang children】 %+v, %+v, %v\n", nodes, stat, err)

// 查看节点【/golang】详情
b, stat, err = conn.Get(parentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", parentPath, b, stat, err)

// 删除golang节点及其子节点
err = conn.Delete("/golang", stat.Version)
fmt.Printf("【delete path [%s] result】 %v\n", parentPath, err)

// 查看根节点下子节点
nodes, stat, err = conn.Children("/")
fmt.Printf("【root children】 %+v, %+v, %v\n", nodes, stat, err)

// exists watch
existsPath := fmt.Sprintf("%s/exists", parentPath)
res, stat, cha, err := conn.ExistsW(existsPath)
go func(<-chan zk.Event) {
for v := range cha {
b, _ := json.Marshal(v)
fmt.Printf("【exists watch event】%s\n", b)
}
}(cha)
_, _ = Create(conn, existsPath, "this is exists data", ZKFlagPersistent, acl)
}

// ZooKeeper 连接
type ZKConn struct {
*zk.Conn
}

// 创建
func Create(conn *zk.Conn, path string, data string, flag int32, acl []zk.ACL) (string, error) {
res, err := conn.Create(path, []byte(data), flag, acl)
if err != nil {
fmt.Printf("【create znode [%s] error】 %v\n", path, err)
return "", err
}
fmt.Printf("【create znode [%s] success】 %v\n", path, res)
return res, err
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
【all event】{"Type":-1,"State":1,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
【all event】{"Type":-1,"State":100,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
2021/05/31 13:25:17 Connected to 127.0.0.1:2181
【all event】{"Type":-1,"State":101,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
2021/05/31 13:25:17 authenticated: id=72057641793290252, timeout=5000
2021/05/31 13:25:17 re-submitting `0` credentials after reconnect
【root children】 [zookeeper], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:25 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:1 Pzxid:4294967385}, <nil>
【create znode [/golang] success】 /golang
【create znode [/golang/persistent] success】 /golang/persistent
【create znode [/golang/ephemeral] success】 /golang/ephemeral
【create znode [/golang/persistent_sequence] success】 /golang/persistent_sequence0000000002
【create znode [/golang/ephemeral_sequence] success】 /golang/ephemeral_sequence0000000003
【golang children】 [ephemeral persistent_sequence0000000002 persistent ephemeral_sequence0000000003], &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:4 Aversion:0 EemeralOwner:0 DataLength:22 NumChildren:4 Pzxid:4294967391}, <nil>
【golang Exists】 true, &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:4 Aversion:0 EphemeralOwner:0 DataLength:22 NumChildren:4 Pzxid:4294967391}, <nil>
【get path [/golang/persistent] info result】 this is a persistent znode, &{Czxid:4294967388 Mzxid:4294967388 Ctime:1622438717126 Mtime:1622438717126 Version:0 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:26umChildren:0 Pzxid:4294967388}, <nil>
【change path [/golang/persistent] data result】 &{Czxid:4294967388 Mzxid:4294967392 Ctime:1622438717126 Mtime:1622438717211 Version:1 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:11 NumChildren:0 Pzxid:42947388}, <nil>
【get path [/golang/persistent] info result】 change data, &{Czxid:4294967388 Mzxid:4294967392 Ctime:1622438717126 Mtime:1622438717211 Version:1 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:11 NumChildren:0 xid:4294967388}, <nil>
【delete path [/golang/persistent] result】 <nil>
【golang children】 [ephemeral persistent_sequence0000000002 ephemeral_sequence0000000003], &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:5 Aversion:0 EphemeralOwn:0 DataLength:22 NumChildren:3 Pzxid:4294967393}, <nil>
【get path [/golang] info result】 this is a golang znode, &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:5 Aversion:0 EphemeralOwner:0 DataLength:22 NumChildren:3 xid:4294967393}, <nil>
【delete path [/golang] result】 zk: node has children
【root children】 [zookeeper golang], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:26 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:2 Pzxid:4294967387}, <nil>
【all event】{"Type":1,"State":3,"Path":"/golang/exists","Err":null,"Server":""}
【create znode [/golang/exists] success】 /golang/exists
【exists watch event】{"Type":1,"State":3,"Path":"/golang/exists","Err":null,"Server":""}
2021/05/31 13:25:17 recv loop terminated: err=EOF
2021/05/31 13:25:17 send loop terminated: err=<nil>

模拟监听服务器动态上下线

这个是模拟业务服务端动态上下线,客户端来做动态的监听响应,这里的服务端与客户端,本质上都是 ZooKeeper 的客户端。

  • ZooKeeper 集群:三台机器,访问地址分别为 127.0.0.1:2181、127.0.0.1:2182、127.0.0.1:2183

  • 业务服务端:负责维护 ZooKeeper 该业务特定的 ZNode 数据,创建的节点数据为临时序列类型,存储的数据为服务的IP

    三个服务端的IP:172.10.01.11、172.10.01.12、172.10.01.13

  • 客户服务端:启动时会连接 ZooKeeper, 之后获取服务端列表信息,并启动监听,每次事件通知过来时都会重新获取服务端信息

    三个客户端的IP:172.10.02.11、172.10.02.12、172.10.02.13

模拟流程
  1. 检测业务服务端的根结点是否存在

    启动业务服务端节点之前,先检测业务服务的根结点是否存在,不存在需要创建业务服务的根结点,需要注意,根结点需要是永久存储类型,因为临时节点不能拥有子节点,并且这里的业务服务端根结点也不能有序列号

  2. 使用 goroutine 启动业务服务端

    启动时设置 host 为 172.10.02.11 的服务端不是持续运行,而是在运行 10 秒后自动停止,用于模拟所有客户端启动成功后,某一台服务宕机

  3. 休息3秒,保证业务服务端都启动成功

  4. 使用 goroutine 启动业务客户端

  5. 业务服务端 172.10.01.11 启动 10 秒后会停止运行,此时可以观察客户端是否监听到并正确处理监听事件

  6. 休息20秒,保证业务客户端都正确处理了监听事件后,再启动 服务端 172.10.01.11,模拟服务端上线

代码结构
1
2
3
4
5
6
7
8
9
➜  distribute tree
.
├── client
│   └── client.go
├── common
│   └── common.go
├── main.go
└── server
└── server.go

总进程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"time"

"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/gohelper"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/client"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/server"
)

func main() {
// 模拟监听服务器动态上下线
// ZooKeeper 集群:三台机器,访问地址分别为 127.0.0.1:2181、127.0.0.1:2182、127.0.0.1:2183
// 服务端与客户端:本质上都是 ZooKeeper 的客户端
// 三个以业务服务端身份运行,负责维护 ZooKeeper 该业务特定的 ZNode 数据,创建的节点数据为临时序列类型,存储的数据为服务的IP
// 三个以业务客户端身份运行,负责获取并监听 ZooKeeper 该业务特定的 ZNode 数据
// 客户端启动时会连接 ZooKeeper, 之后获取服务端列表信息,并启动监听,每次事件通知过来时都会重新获取服务端信息
// 假定三个模拟服务的IP分别是 172.10.01.11、172.10.01.12、172.10.01.13
// 假定三个模拟客户端的IP分别是 172.10.02.11、172.10.02.12、172.10.02.13

// 启动业务服务端节点之前,先检测业务服务的根结点是否存在,不存在需要创建业务服务的根结点,需要注意,根结点需要是永久存储类型,因为临时节点不能拥有子节点,并且不能有序列号
conn, _, err := common.GetConn()
gohelper.Must(err)
exists, _, err := conn.Exists(common.ServerRootPath)
gohelper.Must(err)
if !exists {
_, err = conn.Create(
common.ServerRootPath, []byte("server root znode"), common.ZKFlagPersistent, zk.WorldACL(zk.PermAll),
)
gohelper.Must(err)
}

// 使用 goroutine 启动业务服务端
serverHosts := []string{"172.10.01.11", "172.10.01.12", "172.10.01.13"}
for _, host := range serverHosts {
// 服务端 172.10.01.11 第一次启动时设置不持续在线,用于模拟服务器宕机
alwaysOnline := true
if host == "172.10.01.11" {
alwaysOnline = false
}
go server.Run(host, alwaysOnline)
}

// 休息3秒,保证业务服务端都启动成功
time.Sleep(time.Second * 3)

// 使用 goroutine 启动业务客户端
clientHosts := []string{"172.10.02.11", "172.10.02.12", "172.10.02.13"}
for _, host := range clientHosts {
go client.Run(host)
}

// 业务服务端 172.10.01.11 启动 10 秒后会停止运行,此时可以观察客户端是否监听到并正确处理监听事件

// 休息20秒,保证业务客户端都正确处理了监听事件后,再启动 服务端 172.10.01.11,模拟服务端上线
time.Sleep(time.Second * 20)

go server.Run("172.10.01.11", true)

// 保持进程一直运行
select {}
}

公共库代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package common

import (
"time"

"github.com/samuel/go-zookeeper/zk"
)

const (
ZKFlagPersistent = 0 // 创建ZNode时的Flag:永久存储
ZKFlagEphemeral = 1 // 创建ZNode时的Flag:临时存储
ZKFlagPersistentSequence = 2 // 创建ZNode时的Flag:永久存储+序列编号
ZKFlagEphemeralSequence = 3 // 创建ZNode时的Flag:临时存储+序列编号
)

const ServerRootPath = "/server" // 业务服务根节点

// 获取连接
func GetConn() (*zk.Conn, <-chan zk.Event, error) {
// 连接 ZooKeeper 集群
hosts := []string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}
return zk.Connect(hosts, time.Second*5)
}

服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package server

import (
"fmt"
"time"

"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
)

var conn *zk.Conn

// 运行
func Run(host string, alwaysOnline bool) {
var err error
// 连接 ZooKeeper
conn, _, err = common.GetConn()
if err != nil {
fmt.Printf("【server %s】get zookeeper connection error: %v", host, err)
return
}
defer conn.Close()
// 注册服务
_, _ = RegisterServer(host)
// 根据参数判断进程服务一直在线
if alwaysOnline {
// 保持进程在线,模拟业务运行
select {}
} else {
// 不是一直在线,则运行 10 秒后自动停止,用于模拟所有客户端启动成功后,某一台服务宕机
time.Sleep(time.Second * 10)
fmt.Printf("【server %s】server off line\n", host)
}
}

// 注册服务,需要注意,该类型服务创建的节点是临时类型,这样当服务宕机的时候会自动删除该服务节点信息
func RegisterServer(host string) (string, error) {
res, err := conn.Create(
fmt.Sprintf("%s/server", common.ServerRootPath), []byte(host), common.ZKFlagEphemeralSequence,
zk.WorldACL(zk.PermAll),
)
if err == nil {
fmt.Printf("【server %s】register server success, znode path: %v\n", host, res)
} else {
fmt.Printf("【server %s】register server error: %v\n", host, err)
}
return res, err
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package client

import (
"fmt"

"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
)

var conn *zk.Conn

// 运行
func Run(host string) {
var err error
// 连接 ZooKeeper
conn, _, err = common.GetConn()
if err != nil {
fmt.Printf("【client %s】get zookeeper connection error: %v", host, err)
return
}
defer conn.Close()
// 获取服务列表
_, _ = GetServerList(host)
// 保持进程在线,模拟业务运行
select {}
}

// 获取服务列表并注册监听
func GetServerList(host string) ([]string, error) {
res, _, event, err := conn.ChildrenW(common.ServerRootPath)
if err != nil {
fmt.Printf("【client %s】get server list error: %v\n", host, err)
return nil, err
}

var servers []string
for _, s := range res {
b, _, _ := conn.Get(fmt.Sprintf("%s/%s", common.ServerRootPath, s))
servers = append(servers, fmt.Sprintf("%s", b))
}
fmt.Printf("【client %s】get server list success: %v\n", host, servers)
// 监听
go func(<-chan zk.Event) {
for _ = range event {
fmt.Printf("【client %s】server changed!\n", host)
_, _ = GetServerList(host)
}
}(event)
return servers, nil
}
测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2021/05/31 15:56:46 Connected to 127.0.0.1:2181
2021/05/31 15:56:46 authenticated: id=72057641793290322, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 Connected to 127.0.0.1:2182
2021/05/31 15:56:46 Connected to 127.0.0.1:2182
2021/05/31 15:56:46 Connected to 127.0.0.1:2183
2021/05/31 15:56:46 authenticated: id=216172829808263224, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 authenticated: id=144115235768434755, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 authenticated: id=144115235768434756, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
【server 172.10.01.13】register server success, znode path: /server/server0000000091
【server 172.10.01.11】register server success, znode path: /server/server0000000092
【server 172.10.01.12】register server success, znode path: /server/server0000000093
2021/05/31 15:56:49 Connected to 127.0.0.1:2183
2021/05/31 15:56:49 Connected to 127.0.0.1:2181
2021/05/31 15:56:49 Connected to 127.0.0.1:2181
2021/05/31 15:56:49 authenticated: id=216172829808263225, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
2021/05/31 15:56:49 authenticated: id=72057641793290323, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
2021/05/31 15:56:49 authenticated: id=72057641793290324, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
【client 172.10.02.11】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【client 172.10.02.13】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【server 172.10.01.11】server off line
【client 172.10.02.11】server changed!
【client 172.10.02.13】server changed!
【client 172.10.02.12】server changed!
2021/05/31 15:56:56 recv loop terminated: err=EOF
2021/05/31 15:56:56 send loop terminated: err=<nil>
【client 172.10.02.13】get server list success: [172.10.01.12 172.10.01.13]
【client 172.10.02.11】get server list success: [172.10.01.12 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.12 172.10.01.13]
2021/05/31 15:57:09 Connected to 127.0.0.1:2181
2021/05/31 15:57:09 authenticated: id=72057641793290325, timeout=5000
2021/05/31 15:57:09 re-submitting `0` credentials after reconnect
【server 172.10.01.11】register server success, znode path: /server/server0000000094
【client 172.10.02.11】server changed!
【client 172.10.02.12】server changed!
【client 172.10.02.13】server changed!
【client 172.10.02.13】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]
【client 172.10.02.11】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]

本文作者:Jormin
本文地址https://blog.lerzen.com/zookeeper笔记/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!

----- 到这结束咯 感谢您的阅读 -----