`

ZooKeeper概览

 
阅读更多

 参考博客: http://agapple.iteye.com/blog/1111377      (zookeeper 学习记录)

                    http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/(分布式服务框架ZooKeeper - 管理分布式环境中数据)

1. 是什么?

      是一种分布式的服务框架,主要用来解决分布式环境中的数据管理问题,如统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理让他们

      在集群的环境中,协调一致,就像会话管理(状态同步服务)一样,每台机器上的会话数据都要是最新

      的,那么怎么保证在另一台机器上的修改,及时的反馈到这台机器上了,ZooKeeper就能够帮我们

      做到!

   

   ZooKeeper的逻辑图:(Leader , Follower , Client)

   

 

2. ZooKeeper , 客户端 ? 服务器?在ZooKeeper里面发挥的作用?

      学SVN 也是客户端 , 服务器,开始学的时候根本就没想到 , 这东西咋还分服务端 客户端了!

      而 ZooKeeper 也是一样,分出来一个客户端!

       其实,想想DBMS , 服务端一般都是存储数据,然后后台进程运行在上面,进行维护!直接对数据进行操作(很像DAO)

       而我们,可能有时并不要关心这些后台进程,就像数据库,只要写我们的sql 语句就行,至于他们怎么解析,数据怎么CRUD ,那就是后台

       运行的进程帮我们解决了!

       分层,客户端只要使用服务端提供给我们的接口,就可以解决绝大多数问题!

       就像ZooKeeper , 数据保存在下图所示的树形结构中,对树的操作,我们进行封装,客户端只要调用相应的API就可以与后台交互!

       总之,还是分层!!不要觉得只要Tomcat才叫服务器!!

 

  ZooKeeper 数据模型图:

   

  这个数据模型是什么了?

   他看上不就像一个文件系统吗。我们把信息保存在服务端,总得通过某种方式组织吧!就是通过这种数据结构!是不是很像那个Linux,从"/" 下来,一层一层,没个目录

   都代表着特定的功能!

   看下面这个例子:

     // ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, null); 要与服务端交互,那么就得首先得到ZooKeeper 就像Socket一样!
       得到它,存储数据的结构树,就近在眼前了,通过封装好了的API可以直接对树操作!

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
 
public class ZooKeeperTest {
 
    public static void main(String[] args) throws Exception{
  ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, null); // 记得先运行服务端,要不会ZooKeeper ConnectLoss 
  System.out.println("=========创建节点===========");
  if(zk.exists("/test", false) == null) // 看"/test"节点是否存在,no 返回 null , 布尔参数watch ,如为true则表示如果该节点delete create成功时触发相应的Watcher
  { // 创建节点 后面两个参数下面会介绍  第二个参数 为节点data 注意这里目录信息也是可以存储数据的
    zk.create("/test", "znode1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
  System.out.println("=============查看节点是否安装成功===============");
  System.out.println(new String(zk.getData("/test", false, null)));
  
  System.out.println("=========修改节点的数据==========");
  zk.setData("/test", "zNode2".getBytes(), -1);
  System.out.println("========查看修改的节点是否成功=========");
  System.out.println(new String(zk.getData("/test", false, null)));
  
  System.out.println("=======删除节点==========");
  zk.delete("/test", -1);
  System.out.println("==========查看节点是否被删除============");
  System.out.println("节点状态:" + zk.exists("/test", false));
  zk.close();
    } 
}

 

3. 单机模式,集群模式(安装和配置)

   单机模式 就像启动tomcat一样,首先得修改相应的配置文件,在那个端口上运行....

   可参考这篇文章: http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/()

 

   集群模式

     windows下面一台机子模拟集群:一台机子上运行多到服务器进程!

     待续.......

 

4. 基本对象介绍

 

ZooKeeper : 客户端要连接 Zookeeper 服务器可以通过创建 org.apache.zookeeper. ZooKeeper 的一个实例对象,然后调用这个类提供的接口来和服务器交互。
ACL : 访问权限 , 不具备传递性,父节点不会传递给子节点

CreateMode : ephemeral(短暂的,当与他相关的session expire后,该节点就会移除) , PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,

然后返回给客户端已经成功创建的目录节点名;, Persistent ,(该目录存储的数据不会丢失,客户端shutdown了也不会消失) EPHEMERAL_SEQUENTIAL:临时自动编号节点

 

参考博客:  http://coolxing.iteye.com/blog/1871328 (zookeeper 数据模型)

   ZNode : 数据模型中的每个节点都是一个ZNode节点 , 由三部分组成:
      stat : 状态信息

      data : 该节点关联的数据

      children : 子节点      

 

   ZNode节点的状态信息:

 

czxid. 节点创建时的zxid.
mzxid. 节点最新一次更新发生时的zxid.
ctime. 节点创建时的时间戳.
mtime. 节点最新一次更新发生时的时间戳.
dataVersion. 节点数据的更新次数.
cversion. 其子节点的更新次数.
aclVersion. 节点ACL(授权信息)的更新次数.
ephemeralOwner. 如果该节点为ephemeral节点, ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0. 至于什么是ephemeral节点, 请看后面的讲述.
dataLength. 节点数据的字节数.
numChildren. 子节点个数.

   znode节点的状态信息中包含czxid和mzxid, 那么什么是zxid呢?
    ZooKeeper状态的每一次改变, 都对应着一个递增的Transaction id, 该id称为zxid. 由于zxid的递增性质, 如果zxid1小于zxid2, 那么zxid1肯定先于zxid2发生. 创建任意节点, 或者更新任意节点的数据, 或者删除任意节   点, 都会导致Zookeeper状态发生改变, 从而导致zxid的值增加.

    

    ACL : 访问权限 , 不具备传递性,父节点不会传递给子节点 

    在创建节点,要指定节点的访问权限;

   支持的permission:

CREATE: you can create a child node
READ: you can get data from a node and list its children.
WRITE: you can set data for a node
DELETE: you can delete a child node
ADMIN: you can set permissions    

     

 

    Watcher

    参考博客 : http://www.cnblogs.com/viviman/archive/2013/03/11/2954118.html (zookeeper 如何永久监听)

    官方解释:

This interface specifies the public interface an event handler class must implement. A ZooKeeper client will get various events from the ZooKeepr server it connects to. An application using such a client handles these events by registering a callback object with the client. The callback object is expected to be an instance of a class that implements Watcher interface.

    意思就是说,服务器会返回给客户端各种event, 应用就会调用客户端处理这些事件,客户端注册了一个callback Object , 这个callobject object 实现了Watcher接口!

 

    服务端什么时候会向客户端send events ?

       getData,getChildren(),exists()这三个方法可以针对参数中的path设置watcher,当path对应的Node 有相应变化时,server端会给对应的设置了watcherclient 发送一个一次性的触发通知事件。客户端在收到这个触发通知事件  后,可以根据自己的业务逻辑进行相应地处理。 

 

     在Watcher内定义了一个Event接口,它里面又定义Keeperstate, EventType!

     EventType:

 None (-1),
 NodeCreated (1),
 NodeDeleted (2),
 NodeDataChanged (3),
 NodeChildrenChanged (4);

    Watcher里面还有一个抽象方法: // 下面的例子中实现了这个方法,但是没有显示的调用!它是一个回调方法

abstract public void process(WatchedEvent event); 
watchedEvent 可以监听的事件,通过这个event我们可以获得节点的path,keeperState,eventType

   

    watcher 注意事项:

     1. 这个watcher的功能是一次性的,如果还想继续得到watcher通知,在处理完事件后,要重新register

     2..watch是一次性触发的并且在获取watch事件和设置新watch事件之间有延迟,所以不能可靠的观察到节点的每一次变化。要认识到这一点。  

  

     一个Watcher的简单例子:

      

public class WatcherTest {

	public static void main(String[] args) throws IOException, KeeperException,
			InterruptedException {
		// TODO Auto-generated method stub
		Watcher wh = new Watcher() { // callback Object
			public void process(WatchedEvent event) {
				System.out.println("回调watcher实例: 路径" + event.getPath() + " 类型:"
				+ event.getType());
			}
		};

		ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 500000, wh);
		System.out.println("---------------------");
		// 创建一个节点root,数据是mydata,不进行ACL权限控制,节点为永久性的(即客户端shutdown了也不会消失)
		zk.exists("/root", true);
		zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,
		CreateMode.PERSISTENT);
		System.out.println("---------------------");

		// 在root下面创建一个childone znode,数据为childone,不进行ACL权限控制,节点为永久性的
		zk.exists("/root/childone", true);
		zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,
		CreateMode.PERSISTENT);
		System.out.println("---------------------");
		
		// 删除/root/childone这个节点,第二个参数为版本,-1的话直接删除,无视版本
        // 每次都必须重新注册Watcher , 如果不注册就不会有相应的callback
		zk.exists("/root/childone", true);
		zk.delete("/root/childone", -1);

		System.out.println("---------------------");
		zk.exists("/root", true);
		zk.delete("/root", -1);
		System.out.println("---------------------");

		// 关闭session

		zk.close();
	}

}

    输出:

    

---------------------
回调watcher实例: 路径null 类型:None
回调watcher实例: 路径/root 类型:NodeCreated
---------------------
回调watcher实例: 路径/root/childone 类型:NodeCreated
---------------------
回调watcher实例: 路径/root/childone 类型:NodeDeleted
---------------------
回调watcher实例: 路径/root 类型:NodeDeleted
---------------------

 

     

5. 看一个简单的运用

   这个例子,通过Zookeeper模拟的是Java concurrent 包中 CyclicBarrier , 也即是 只要当累加至某个设定值时才能通过!

  在这里 , 就是max ,只要连接服务端的会话(enter() )达到max个的时候才能通过!leave()

   运行下面程序时,注意 :

    1. 服务端必须手动开启

    2. 输入命令行参数, 分别表示 : 0 , 服务器ip , 1 , max : 连接的会话数目(客户端程序) 

    3. 服务器只要运行一次,但是客户端可以运行多次!这样每次的session数据都会存储在数据模型中! 

 

public class SyncPrimitive implements Watcher { // 实现了Watcher

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
             // 这里也注册了一个callback object this;
                zk = new ZooKeeper(address, 3000, this); // 得到ZooKeeper 注意参数 3000 表示SessionTimeOut
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
        System.out.println(list);
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
       // 注意这个地方,我从官网copy来的代码,原本没有这句 ,下面是这样的,zk.delete(root + "/" + name, 0);
      // 但是,那样的话,运行会出错,因为当你创建节点的模式为Sequence ,所以默认的会加一个后缀!所以root节点下不会有
       // 名字 为 name代表的节点 noNodeException!
     // 注意的是,delete操作是吧树中相应节点的信息删除,节点下不能有子节点,也就是不能是目录!NoEmptyException
     // 他的第二个参数表示节点的版本号,也就是活,一个节点上可能存储有多个版本的数据!
      // 类似于Oracle的乐观锁实现,多版本控制!
	 List<String> path = zk.getChildren(root, true);
            
	 zk.delete(root + "/" + path.get(0), 0);  
            
	 while (true) {
                
		synchronized (mutex) {
                    
			List<String> list = zk.getChildren(root, true);
                        
			if (list.size() > 0) {
                            
			   mutex.wait();
                        
			} else {
                            
			   return true;
                        
			}
                    
         	}
                
  	}	
    }
   public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[0], "/b1", new Integer(args[1]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[1]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){
        	e.printStackTrace();

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
    
    public static void main(String[] args) {
         barrierTest(args);
    }
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics