zookeeper补充
Additional zookeeper
Published: 2021-06-09

对于zookeeper的一些补充,可结合课件zookeeper.pdf一起查阅

结点

临时结点

  • 临时结点(包括临时顺序结点)在执行quit命令之后将会被删除(临时结点会在会话过期之后被删除,但在会话中断时不会被删除,也就是说ctrl c命令强行中断会话不会删除临时结点)
  • 临时结点无法创建子节点

持久顺序结点或临时顺序结点

这里主要补充说明顺序的含义:

首先在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建结点过程中,ZK会自动为给定节点名加上一个数字后缀作为新的节点名。这个数字后缀的范围是整形的最大值

使用docker部署zookeeper

$ docker pull zookeeper:3.4.14
$ docker run --name zk -p 2181:2181 -d zookeeper:3.4.14

监听

在shell中,ZK的监听是一次性的,监听过一次之后第二次就不会再监听了

但是在Java中,ZK的监听是无限次数的

Java操作ZK

创建项目引入依赖:

<dependency>
	<groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

具体操作:

// junit测试:
private ZKClient zkClient;
/**
 * 获取ZK客户端连接
 */
@Before
public void before(){
    // 参数1:服务器的ip和端口
    // 参数2:会话的超时时间
    // 参数3:会话的连接时间
    // 参数4:序列化方式
    //// 单机模式
    zkClient = new zkClient("192.168.28.132:2181", 30000, 60000, new SerializableSerializer());
    //// 集群模式
    zkClient = new zkClient("192.168.28.132:2181,192.168.28.132:2182,192.168.28.132:2183", 30000, 60000, new SerializableSerializer());
}

// 在ZK创建结点
@Test
public void testCreateNode(){
    // 1.持久结点
    zkClient.create("/node1", "qyf", CreateMode.PERSISTENT);
    // 2.持久顺序结点
    zkClient.create("/node1/names", "qianyifeng", CreateMode.PERSISTENT_SEQUENTIAL);
    // 3.临时结点
    zkClient.create("/node1/lists", "qq", CreateMode.EPHEMERAL);
    // 4.临时顺序结点
    zkClient.create("/node1/lists11", "qyftemp", CreateMode.EPHEMERAL_SEQUENTIAL);
}
// 删除结点
@Test
public void testDeleteNode(){
    // 删除没有子节点的结点 返回值:是否删除成功
    boolean delete = zkClient.delete("/node1");
    // 递归删除结点信息 返回值:是否删除成功
    boolean recursive = zkClient.deleteRecursive("/node1");
}
// 查看结点的子节点
@Test
public void testFindNodes(){
    // 获取指定路径的结点信息 
    // 返回值为当前节点的子节点信息
    List<String> children = zkClient.getChildren("/");
    children.forEach(System.out::println);
}
// 获取某个结点数据  
// 注意:通过Java客户端操作需要保证节点存储的数据和获取结点时数据序列化方式必须一致(Java是针对对象的序列化和反序列化,所以如果序列化的时候是按照字符串方式序列化,而读取的时候,也就是反序列化的时候按照对象的方式反序列化,那必然就会报错。解决方法就是如果要用Java去读,那就最好用Java去写而不要用shell的方式去写)
@Test
public void testFindNodeData(){
    Object readData = zkClient.readData("/node3");
    System.out.println(readData);
}
// 获取数据以及当前结点的状态信息
@Test
public void testFindNodeDataAndStat(){
    Stat stat = new Stat();
    Object readData = zkClient.readData("/node0000000024", stat);
    System.out.println(readData);
    System.out.println(stat);
    System.out.println(stat.getCversion());
    System.out.println(stat.getCtime());
    System.out.println(stat.getCzxid());
    // ... 只要能在shell中的stat命令能看到的状态信息都可以用Java的方式去得到
}
// 修改结点数据
@Test
public void testWriteData(){
    User user = new User();
    user.setId(1);
    user.setName("消遣");
    user.setAge(23);
    user.setBir(new Date());
    zkClient.writeData("/node1", user);
    User o = zkClient.readData("/node1");
    System.out.println(o);
}
// 监听结点数据的变化
// 需要注意的是,Java中设置的监听必须由Java来执行具体的操作这些操作才能被Java监听到,如果是用shell以命令方式执行具体操作则监听不到
@Test
public void testOnNodeDataChange() throws IOException{
    zkClient.subscribeDataChanges("/node00000000024", new IZkDataListener(){
        // 当结点的值在修改时,会自动调用这个方法将当前修改节点的名字和节点变化之后的数据传递给方法
        public void handleDataChange(String nodeName, Object result) throws Exception{
            System.out.println(nodeName);
            System.out.println(result);
        }
        // 当结点的值被删除的时候,会自动调用这个方法,会将节点的名字以参数形式传递给方法
        public void handleDataDeleted(String nodeName) throws Exception{
            System.out.println("结点名字:" + nodeName);
        }
    });
    // 阻塞客户端,如果不阻塞,程序就会直接执行完毕,读不到监听到的内容。这里我们用阻塞输入流的方式将监听到的内容打印到控制台
    System.in.read();
}
// 监听结点目录的变化
@Test
public void testOnNodesChange() throws IOException{
    zkClient.subscribeChildChanges("/node0000000024", new IZkChildListener(){
        // 当前结点发生变化时,会自动调用这个方法
        // 参数1:父节点名称
        // 参数2:父节点中的所有子节点名称
        public void handleChileChange(String nodeName, List<String> list) throws Exception{
            System.out.println("父节点名称:" + nodeName);
            System.out.println("发生变更后结点的孩子结点名称:");
            list.forEach(System.out::println);
        }
    });
    // 阻塞客户端,如果不阻塞,程序就会直接执行完毕,读不到监听到的内容。这里我们用阻塞输入流的方式将监听到的内容打印到控制台
    System.in.read();
}

/**
 * 关闭资源
 */
@After
public void after(){
    zkClinet.close();
}

集群搭建

参考:https://blog.csdn.net/A_Little_Fish_/article/details/117384614(zookeeper集群的安装部署(详细步骤))、https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html(官方文档)

大致说一下:

  • 集群中机器数量最好是奇数

  • 首先需要创建三个文件夹zkdata1zkdata2zkdata3,每一个文件夹下创建myid文件,里面的内容分别为1、2、3

  • 在ZK的conf目录下创建三个ZK配置文件:zoo1.cfgzoo2.cfgzoo3.cfg

    • zoo1.cfg

      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/root/zkdata1
      clientPort=3001
      server.1=10.15.0.5:3002:3003 // 说明一下这里的3002是集群之间做原子广播的通讯端口,3003是集群之间做心跳检查的通讯端口
      server.2=10.15.0.5:4002:4003
      server.3=10.15.0.5:5002:5003
      
    • zoo2.cfg

      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/root/zkdata2
      clientPort=4001
      server.1=10.15.0.5:3002:3003 // 说明一下这里的3002是集群之间做原子广播的通讯端口,3003是集群之间做心跳检查的通讯端口
      server.2=10.15.0.5:4002:4003
      server.3=10.15.0.5:5002:5003
      
    • zoo3.cfg

      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/root/zkdata3
      clientPort=5001
      server.1=10.15.0.5:3002:3003 // 说明一下这里的3002是集群之间做原子广播的通讯端口,3003是集群之间做心跳检查的通讯端口
      server.2=10.15.0.5:4002:4003
      server.3=10.15.0.5:5002:5003
      
  • 分别启动各个zk服务器

    $ ./bin/zkServer.sh start /usr/zookeeper/conf/zoo1.cfg
    $ ./bin/zkServer.sh start /usr/zookeeper/conf/zoo2.cfg
    $ ./bin/zkServer.sh start /usr/zookeeper/conf/zoo3.cfg
    
  • 查看各个zk服务器的角色信息

    $ ./bin/zkServer.sh status /usr/zookeeper/conf/zoo1.cfg
    
  • 客户端连接任意zk服务器进行结点操作

    $ ./bin/zkCli.sh -server 192.168.0.220:3001
    
  • 停止特定zk服务器

    $ ./bin/zkServer.sh stop /usr/zookeeper/conf/zoo3.cfg
    

集群数据同步

集群中除了主结点(master结点)别的都是从节点(slave结点)

当某个结点收到更改结点数据的commit之后,集群中半数的结点投票通过后就会同步数据