博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper-体验原生api
阅读量:6704 次
发布时间:2019-06-25

本文共 12470 字,大约阅读时间需要 41 分钟。

hot3.png

简介

ZooKeeper API 中,有两个包是我们经常打交道的,分别是 org.apache.zookeeper, org.apache.zookeeper.data 。前一个包提供了一些API操作zk,例如对节点node增删改查,后一个包定义了一些实体类,例如对zk 节点进行权限控制的ACL类、Id类等。

下面将对一些常用的API解析。

创建会话

提供了下面几种api创建zk实体

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
  • connectString: zk的url
  • sessionTimeout: session的超时时间
  • watcher: zookeeper实例的状态监视器,由于zk的创建过程是异步的,这里可以通过注册一个监视器,监听zk的状态,当状态变为 SyncConnected 后,即表示已经连接成功。

下面代码完成会话的创建,需求中主线程需要在zk会话创建完成后再退出,由于创建过程是异步的,所以两个线程使用锁做同步。

private static CountDownLatch zookeeperInitializeLatch = new CountDownLatch(1);    @Test    public void testCreateSession() throws InterruptedException, IOException {        //会话过程创建是异步的        final ZooKeeper zooKeeper = new ZooKeeper(ZookeeperHelper.zkAddress, //连接字符串                ZookeeperHelper.sessionTimeout, // 会话超时时间                new Watcher() {                    @Override                    public void process(WatchedEvent event) {                        System.out.println("-------------receive watch event : " + event);                        if (Event.KeeperState.SyncConnected == event.getState()) {                            //创建完成,唤醒主线程                            System.out.println("-------------zookeeper session established...");                            zookeeperInitializeLatch.countDown();                        }                    }                } //监听器,监听状态的改变        );        zookeeperInitializeLatch.await(); //等待会话创建完成,唤醒        //唤醒后,休眠5s,关闭zk        Thread.sleep(5000);        zooKeeper.close();    }

创建节点Node

API如下:

String create(final String path, byte data[], List
acl, CreateMode createMode) //同步方式create(final String path, byte data[], List
acl, CreateMode createMode, StringCallback cb, Object ctx) //异步方式创建节点
  • acl:是节点的权限控制策略,即控制是否可以查看节点的数据,删除子节点等,详情请参考
  • createMode:节点类型

在前面的文章中介绍过,节点包括临时节点,持久节点,顺序节点,其中临时节点的生命周期是绑定zk会话的,即会话结束了临时节点就不存在了。

下面例子中给予了ID为 word:anyone,权限为 all 的ACL策略,意思是说,所有人都给予最大的权限,可以对节点干嘛都行。

** 另外,zk客户端在创建子节点的时候,必须存在它的父节点,也就是说,不能递归创建。举个例子,当创建 "/parent/child" 的时候,由于"/parent" 这个节点不存在,会抛异常 “NoNodeException: KeeperErrorCode = NoNode for /parent/child” **。

/**     * 创建节点     */    @Test    public void testCreateNode() throws InterruptedException, IOException, KeeperException {        String ephemeral_path = rootPath + "/create_ephemeral"; //节点        String persistent_path = rootPath + "/create_persistent"; //节点        byte data[] = "".getBytes(); //数据        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 String zNodePath1 = zooKeeper.create(ephemeral_path, data, acl, CreateMode.EPHEMERAL); //同步创建临时节点 System.out.println("---------success to create node : " + zNodePath1); String zNodePath2 = zooKeeper.create(ephemeral_path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); //同步创建临时顺序节点 System.out.println("---------success to create node : " + zNodePath2); String zNodePath3 = zooKeeper.create(persistent_path, data, acl, CreateMode.PERSISTENT); //同步创建持久节点 System.out.println("---------success to create node : " + zNodePath3); String zNodePath4 = zooKeeper.create(persistent_path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); //同步创建持久顺序节点 System.out.println("---------success to create node : " + zNodePath4); //可以打断点,通过工具观察节点变化 //清除 zooKeeper.delete(ephemeral_path, -1); zooKeeper.delete(persistent_path, -1); } /** * 下面测试临时节点在session关闭时是否存在 */ @Test public void testEphemeralLifecycle() throws InterruptedException, IOException, KeeperException { String ephemeral_path = rootPath + "/ephemeral_lifecycle"; ZooKeeper zooKeeper1 = ZookeeperUtils.startZookeeper(); //开启session byte data[] = "".getBytes(); //数据 List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 String zNodePath1 = zooKeeper1.create(ephemeral_path, data, acl, CreateMode.EPHEMERAL); //同步创建临时节点 System.out.println("---------success to create node : " + zNodePath1); zooKeeper1.close(); //关闭session ZooKeeper zooKeeper2 = ZookeeperUtils.startZookeeper(); //开启session Stat stat = zooKeeper2.exists(ephemeral_path, null); //查看刚才创建的临时节点还存在吗,存在的情况,stat不为空 Assert.assertNull(stat); zooKeeper2.close(); }

删除节点

API

delete(final String path, int version)  //同步方式delete(final String path, int version, VoidCallback cb,  Object ctx)  //异步方式
  • path:路径
  • version:期望版本号,相当于数据库的乐观锁,在node里面存放着状态信息stat,里面存放有版本号version,如果设置的期待版本号与节点里的不相同,会操作节点失败。当版本号设置为-1时,忽略节点的版本号。

节点的删除需要自行递归删除,当需要删除的节点有子节点的时候,将会抛出异常

@Test    public void testDeleteNode() throws InterruptedException, IOException, KeeperException {        //创建数据        String ephemeral_path = rootPath + "/get_children_ephemeral";        String persistent_path = rootPath + "/get_children_persistent";        byte data[] = "".getBytes(); //数据        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 String zNodeEphemeralPath = zooKeeper.create(ephemeral_path, data, acl, CreateMode.EPHEMERAL); System.out.println("---------success to create node : " + zNodeEphemeralPath); Thread.sleep(2000); String zNodePersistentPath = zooKeeper.create(persistent_path, data, acl, CreateMode.PERSISTENT); System.out.println("---------success to create node : " + zNodePersistentPath); //只能删除叶子节点,不能删除有叶子节点的父节点,需要递归删除 //清除 zooKeeper.delete(ephemeral_path, -1); zooKeeper.delete(persistent_path, -1); }

获取子节点

提供的API

public List
getChildren(String path, boolean watch) //同步List
getChildren(final String path, Watcher watcher) //同步public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx) //异步
  • watcher:观察节点的变化,包括节点的删除,子节点的创建、删除。需要注意的是,watcher是个一次性的东西,当它被回调的时候,将会从节点中删除,所以如果要一致监控该节点,需要反复注册watcher。

getChildren返回的子节点路径是路径名,不是全路径,例如节点路径是/node,子节点的路径是/node/child,通过节点 getChildren 获取的路径是 child。

@Testpublic void testGetChildren() throws InterruptedException, IOException, KeeperException {        //第一次尝试获取子节点,注册监听器        // watcher只作用一次,需要重复注册watcher        zooKeeper.getChildren(rootPath, new Watcher() {            @Override            public void process(WatchedEvent event) {                if (Event.EventType.NodeChildrenChanged == event.getType()) {                    //有子节点发生变化                    try {                        System.out.println("子节点:" + zooKeeper.getChildren(event.getPath(), null));                        if (rootPath != null) {                            zooKeeper.getChildren(event.getPath(), this);                        }                    } catch (Exception e) {                        e.printStackTrace();                    }                }            }        });        //创建数据        String ephemeral_path = rootPath + "/get_children_ephemeral";        String persistent_path = rootPath + "/get_children_persistent";        byte data[] = "".getBytes(); //数据        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 String zNodeEphemeralPath = zooKeeper.create(ephemeral_path, data, acl, CreateMode.EPHEMERAL); System.out.println("---------success to create node : " + zNodeEphemeralPath); Thread.sleep(1000); String zNodePersistentPath = zooKeeper.create(persistent_path, data, acl, CreateMode.PERSISTENT); System.out.println("---------success to create node : " + zNodePersistentPath); Thread.sleep(1000); //防止下面删除节点后再收到监听事件 System.out.println("开始删除节点"); //清除 zooKeeper.delete(ephemeral_path, -1); Thread.sleep(1000); zooKeeper.delete(persistent_path, -1); }

更新节点数据

API

public Stat setData(final String path, byte data[], int version) //同步public void setData(final String path, byte data[], int version,  StatCallback cb, Object ctx)  //异步
  • version:与文章 删除节点 里的version一致
  • cb:异步回调接口
@Test    public void testSetData() throws KeeperException, InterruptedException {        //创建数据        String persistent_path = rootPath + "/set_data_persistent";        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 zooKeeper.create(persistent_path, "123".getBytes(), acl, CreateMode.PERSISTENT); System.out.println("---------success to create node : " + persistent_path); Stat stat = new Stat(); zooKeeper.getData(persistent_path, null, stat); //第一次修改数据,使用对应的version Stat stat1 = zooKeeper.setData(persistent_path, "456".getBytes(), stat.getVersion()); try { //第二次修改数据,使用错误的版本号 // KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /api/set_data_persistent zooKeeper.setData(persistent_path, "789".getBytes(), stat1.getVersion() + 1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } //清除 zooKeeper.delete(persistent_path, -1); }

获取节点数据

提供的部分API

public byte[] getData(final String path, Watcher watcher, Stat stat) //同步public void getData(String path, boolean watch, DataCallback cb, Object ctx)  // 异步
  • watcher:当节点的数据发生变化时回调,也是一次性的
  • stat:如果需要获取节点的状态信息,传一个对象

如果当前节点的数据是“hello”,当我们重新把数据设置为"hello"时,虽然数据内容没有改变,但是zk会把stat的数据版本号加1,所以也会触发watcher回调。

@Test    public void testGetData() throws InterruptedException, IOException, KeeperException {        //创建数据        String persistent_path = rootPath + "/get_data_persistent";        Stat stat = new Stat(); //节点状态        String charset = "utf-8";        String nodeData = new String("node data".getBytes(charset), charset);        byte data[] = nodeData.getBytes(charset); //数据        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 String zNodePersistentPath = zooKeeper.create(persistent_path, data, acl, CreateMode.PERSISTENT); System.out.println("---------success to create node : " + zNodePersistentPath); byte[] receivedData = zooKeeper.getData(persistent_path, new Watcher() { @Override public void process(WatchedEvent event) { try { if (Event.EventType.NodeDataChanged == event.getType()) { System.out.println("-------------节点数据发生改变"); Stat stat = new Stat(); //节点状态 System.out.println("改变后的数据:" + new String(zooKeeper.getData(persistent_path, null, stat), charset)); System.out.println("czxid:" + stat.getCzxid() + " mzxid:" + stat.getMzxid() + " version:" + stat.getVersion()); } } catch (Exception e) { e.printStackTrace(); } } }, stat); System.out.println("初始数据:" + new String(receivedData, charset)); System.out.println("czxid:" + stat.getCzxid() + " mzxid:" + stat.getMzxid() + " version:" + stat.getVersion()); zooKeeper.setData(persistent_path, nodeData.getBytes(charset), -1); //数据没变,但是版本号变化,修改的事务ID变化 Thread.sleep(1000); //清除 zooKeeper.delete(persistent_path, -1); }

判断节点是否存在

提供的部分API

public Stat exists(final String path, Watcher watcher)public void exists(final String path, Watcher watcher,  StatCallback cb, Object ctx)
  • watcher:可以监听子节点的创建、删除,和节点的数据更新
@Test    public void testExists() throws KeeperException, InterruptedException {        //创建数据        String persistent_path = rootPath + "/exists_persistent";        List
acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; //ACL策略 zooKeeper.create(persistent_path, "123".getBytes(), acl, CreateMode.PERSISTENT); System.out.println("---------success to create node : " + persistent_path); Stat stat = zooKeeper.exists(persistent_path, null); Assert.assertNotNull(stat); Stat stat_not_exists = zooKeeper.exists("/not_exists_node", null); Assert.assertNull(stat_not_exists); //清除 zooKeeper.delete(persistent_path, -1); }

##配套代码

转载于:https://my.oschina.net/thinwonton/blog/994668

你可能感兴趣的文章
精选30道Java笔试题解答
查看>>
特殊符号 UNICODE编码
查看>>
C#图解教程 第八章 表达式和运算符
查看>>
解决NavicatPremium导入CSV文件中文乱码的问题
查看>>
Python基础:语法基础(3)
查看>>
单元测试利器 JUnit 4
查看>>
杭电2097
查看>>
Set default value for column[转]
查看>>
个人管理:从影片《横空出世》中学到...
查看>>
WPF:从WPF Diagram Designer Part 4学习分组、对齐、排序、序列化和常用功能
查看>>
STM32 RTC 对晶振的要求实在不地道
查看>>
struct sockaddr与struct sockaddr_in的区别和联系
查看>>
什么是整洁的代码(Clean Code)?
查看>>
编码风格不是编码规范
查看>>
搜索引擎评估与互联网用户行为建设
查看>>
支持取消操作和暂停操作的Backgroundworker
查看>>
程序员职场-三人行,必有我师
查看>>
override与new的区别
查看>>
linux下播放mp3
查看>>
[转载]---通过一个示例,演示利用logminer,恢复delete误删除操作的数据
查看>>