基于Apache Curator框架的ZooKeeper使用详解

source:https://www.cnblogs.com/erbing/p/9799098.html一 简介
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作 。通过查看官方文档,可以发现Curator主要解决了三类问题:
【基于Apache Curator框架的ZooKeeper使用详解】封装ZooKeeper client与ZooKeeper server之间的连接处理提供了一套Fluent风格的操作API提供ZooKeeper各种应用场景(recipe,比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装Curator主要从以下几个方面降低了zk使用的复杂性:重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略 , 并且内部也提供了几种标准的重试策略(比如指数补偿)连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践 , 并考虑了各种极端情况二 基于Curator的ZooKeeper基本用法publicclassCuratorBase{//会话超时时间privatefinalintSESSION_TIMEOUT=30*1000;//连接超时时间privatefinalintCONNECTION_TIMEOUT=3*1000;//ZooKeeper服务地址privatestaticfinalStringCONNECT_ADDR="192.168.1.1:2100,192.168.1.1:2101,192.168.1.:2102";//创建连接实例privateCuratorFrameworkclient=null;publicstaticvoidmain(String[]args)throwsException{//1 重试策略:初试时间为1s 重试10次RetryPolicyretryPolicy=newExponentialBackoffRetry(1000,10);//2通过工厂创建连接CuratorFrameworkclient=CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)//命名空间.namespace("super").build();//3开启连接cf.start();System.out.println(States.CONNECTED);System.out.println(cf.getState());//创建永久节点client.create().forPath("/curator","/curatordata".getBytes());//创建永久有序节点client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequentialdata".getBytes());//创建临时节点client.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/ephemeral","/curator/ephemeraldata".getBytes());//创建临时有序节点client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path1","/curator/ephemeral_path1data".getBytes());client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path2","/curator/ephemeral_path2data".getBytes());//测试检查某个节点是否存在Statstat1=client.checkExists().forPath("/curator");Statstat2=client.checkExists().forPath("/curator2");System.out.println("'/curator'是否存在:"+(stat1!=null?true:false));System.out.println("'/curator2'是否存在:"+(stat2!=null?true:false));//获取某个节点的所有子节点System.out.println(client.getChildren().forPath("/"));//获取某个节点数据System.out.println(newString(client.getData().forPath("/curator")));//设置某个节点数据client.setData().forPath("/curator","/curatormodifieddata".getBytes());//创建测试节点client.create().orSetData().creatingParentContainersIfNeeded().forPath("/curator/del_key1","/curator/del_key1data".getBytes());client.create().orSetData().creatingParentContainersIfNeeded().forPath("/curator/del_key2","/curator/del_key2data".getBytes());client.create().forPath("/curator/del_key2/test_key","test_keydata".getBytes());//删除该节点client.delete().forPath("/curator/del_key1");//级联删除子节点client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");}}orSetData()方法:如果节点存在则Curator将会使用给出的数据设置这个节点的值,相当于 setData() 方法creatingParentContainersIfNeeded()方法:如果指定节点的父节点不存在,则Curator将会自动级联创建父节点guaranteed()方法:如果服务端可能删除成功,但是client没有接收到删除成功的提示,Curator将会在后台持续尝试删除该节点deletingChildrenIfNeeded()方法:如果待删除节点存在子节点,则Curator将会级联删除该节点的子节点
事务管理:
/***事务管理:碰到异常 , 事务会回滚*@throwsException*/@TestpublicvoidtestTransaction()throwsException{//定义几个基本操作CuratorOpcreateOp=client.transactionOp().create().forPath("/curator/one_path","somedata".getBytes());CuratorOpsetDataOp=client.transactionOp().setData().forPath("/curator","otherdata".getBytes());CuratorOpdeleteOp=client.transactionOp().delete().forPath("/curator");//事务执行结果List<CuratorTransactionResult>results=client.transaction().forOperations(createOp,setDataOp,deleteOp);//遍历输出结果for(CuratorTransactionResultresult:results){System.out.println("执行结果是:"+result.getForPath()+"–"+result.getType());}}//因为节点“/curator”存在子节点 , 所以在删除的时候将会报错,事务回滚三 监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新 。产生的事件会传递给注册的PathChildrenCacheListener 。Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地 。Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据 。/***在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理*/ExecutorServicepool=Executors.newFixedThreadPool(2);/***监听数据节点的变化情况*/finalNodeCachenodeCache=newNodeCache(client,"/zk-huey/cnode",false);nodeCache.start(true);nodeCache.getListenable().addListener(newNodeCacheListener(){@OverridepublicvoidnodeChanged()throwsException{System.out.println("Nodedataischanged,newdata:"+newString(nodeCache.getCurrentData().getData()));}},pool);/***监听子节点的变化情况*/finalPathChildrenCachechildrenCache=newPathChildrenCache(client,"/zk-huey",true);childrenCache.start(StartMode.POST_INITIALIZED_EVENT);childrenCache.getListenable().addListener(newPathChildrenCacheListener(){@OverridepublicvoidchildEvent(CuratorFrameworkclient,PathChildrenCacheEventevent)throwsException{switch(event.getType()){caseCHILD_ADDED:System.out.println("CHILD_ADDED:"+event.getData().getPath());break;caseCHILD_REMOVED:System.out.println("CHILD_REMOVED:"+event.getData().getPath());break;caseCHILD_UPDATED:System.out.println("CHILD_UPDATED:"+event.getData().getPath());break;default:break;}}},pool);client.setData().forPath("/zk-huey/cnode","world".getBytes());Thread.sleep(10*1000);pool.shutdown();client.close();四 分布式锁
分布式编程时 , 比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们 。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等 。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒 。运行多次可以观察到 , 有时是t1先拿到锁而t2等待,有时又会反过来 。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据 。
importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.locks.InterProcessMutex;importorg.apache.curator.retry.RetryNTimes;importjava.util.concurrent.TimeUnit;/***Curatorframework'sdistributedlocktest.*/publicclassCuratorDistrLockTest{/**Zookeeperinfo*/privatestaticfinalStringZK_ADDRESS="192.168.1.100:2181";privatestaticfinalStringZK_LOCK_PATH="/zktest";publicstaticvoidmain(String[]args)throwsInterruptedException{//1.ConnecttozkCuratorFrameworkclient=CuratorFrameworkFactory.newClient(ZK_ADDRESS,newRetryNTimes(10,5000));client.start();System.out.println("zkclientstartsuccessfully!");Threadt1=newThread(()->{doWithLock(client);},"t1");Threadt2=newThread(()->{doWithLock(client);},"t2");t1.start();t2.start();}privatestaticvoiddoWithLock(CuratorFrameworkclient){InterProcessMutexlock=newInterProcessMutex(client,ZK_LOCK_PATH);try{if(lock.acquire(10*1000,TimeUnit.SECONDS)){System.out.println(Thread.currentThread().getName()+"holdlock");Thread.sleep(5000L);System.out.println(Thread.currentThread().getName()+"releaselock");}}catch(Exceptione){e.printStackTrace();}finally{try{lock.release();}catch(Exceptione){e.printStackTrace();}}}}五 Leader选举
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法 。Curator提供了LeaderSelector监听器实现Leader选举功能 。同一时刻,只有一个Listener会进入takeLeadership()方法 , 说明它是当前的Leader 。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader 。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership , 如果不设置的话放弃了的Listener是不会再变成Leader的 。
importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.CuratorFrameworkFactory;importorg.apache.curator.framework.recipes.leader.LeaderSelector;importorg.apache.curator.framework.recipes.leader.LeaderSelectorListener;importorg.apache.curator.framework.state.ConnectionState;importorg.apache.curator.retry.RetryNTimes;importorg.apache.curator.utils.EnsurePath;/***Curatorframework'sleaderelectiontest.*Output:*LeaderSelector-2takeleadership!*LeaderSelector-2relinquishleadership!*LeaderSelector-1takeleadership!*LeaderSelector-1relinquishleadership!*LeaderSelector-0takeleadership!*LeaderSelector-0relinquishleadership!*…*/publicclassCuratorLeaderTest{/**Zookeeperinfo*/privatestaticfinalStringZK_ADDRESS="192.168.1.100:2181";privatestaticfinalStringZK_PATH="/zktest";publicstaticvoidmain(String[]args)throwsInterruptedException{LeaderSelectorListenerlistener=newLeaderSelectorListener(){@OverridepublicvoidtakeLeadership(CuratorFrameworkclient)throwsException{System.out.println(Thread.currentThread().getName()+"takeleadership!");//takeLeadership()methodshouldonlyreturnwhenleadershipisbeingrelinquished.Thread.sleep(5000L);System.out.println(Thread.currentThread().getName()+"relinquishleadership!");}@OverridepublicvoidstateChanged(CuratorFrameworkclient,ConnectionStatestate){}};newThread(()->{registerListener(listener);}).start();newThread(()->{registerListener(listener);}).start();newThread(()->{registerListener(listener);}).start();Thread.sleep(Integer.MAX_VALUE);}privatestaticvoidregisterListener(LeaderSelectorListenerlistener){//1.ConnecttozkCuratorFrameworkclient=CuratorFrameworkFactory.newClient(ZK_ADDRESS,newRetryNTimes(10,5000));client.start();//2.Ensurepathtry{newEnsurePath(ZK_PATH).ensure(client.getZookeeperClient());}catch(Exceptione){e.printStackTrace();}//3.RegisterlistenerLeaderSelectorselector=newLeaderSelector(client,ZK_PATH,listener);selector.autoRequeue();selector.start();}}
以上就是朝夕生活(www.30zx.com)关于“基于Apache Curator框架的ZooKeeper使用详解”的详细内容,希望对大家有所帮助!

猜你喜欢