云服务器

zookeeper部署与使用API

2017-05-08 19:44:38 0

zookeeper介绍 zookeeper它是一个分布式的服务程序协调系统,简单的说它是一个以节点资源管理为中心,围绕节点资源操作来实现事件监听,触发而衍生出各种服务调度管理的系统。比如说,我创建了一个节点/zk_test,然后使用zookeeper的API程序可以监听这个/zk_test节点,当节点有事件发生的时候会通知这个监听程序,然后监听程序就可以实现自己的业务逻辑。

安装配置JDK 本文使用的是Ubuntu Server 14.04,安装配置JDK相对比较简单 sudo aptitude install -y default-jdk

部署zookeeper 访问官网下载二进制压缩包 http://zookeeper.apache.org/releases.html#download 本文用的是zookeeper-3.4.6.tar.gz,下载到/opt目录,然后解压 cd /opt tar zxf zookeeper-3.4.6.tar.gz cd zookeeper-3.4.6/ #配置文件 vim conf/zoo.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage; a persistent directory
# (/var/lib/zookeeper) is used here instead.
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
启动zookeeper服务 bin/zkServer.sh start #jps检查进程是否启动,netstat检查是否成功监听2181端口

启动zookeeper命令行 bin/zkCli.sh -server 127.0.0.1:2181

使用zookeeper #使用zookeeper命令行创建节点znode /zk_test,其中“my_data”是绑定到该znode的字符串 create /zk_test my_data

#查看当前节点列表 ls /

#设置节点数据 set /zk_test test123

#查看节点数据 get /zk_test

测试zookeeper API的Java程序

该测试程序是zookeeper提供的一个例子:它监听节点znode的事件,并在事件发生时调用用户指定的命令及参数。我们继续使用上面创建的节点/zk_test作为测试对象,指定事件响应时执行cat来显示文件内容,而这个文件的内容就是节点的数据内容。

cd /tmp mkdir lib #把zookeeper压缩包里面zookeeper-3.4.6.jar 和 slf4j-api-1.6.1.jar copy到lib目录下 cp /opt/zookeeper-3.4.6/dist-maven/zookeeper-3.4.6.jar /tmp/lib cp /opt/zookeeper-3.4.6/lib/slf4j-api-1.6.1.jar /tmp/lib

vim Executor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/**
 * A simple example program to use DataMonitor to start and
 * stop executables based on a znode. The program watches the
 * specified znode and saves the data that corresponds to the
 * znode in the filesystem. It also starts the specified program
 * with the specified arguments when the znode exists and kills
 * the program if the znode goes away.
 */
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode;

DataMonitor dm;

ZooKeeper zk;

String filename;

String exec[];

Process child;

public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); }

/** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } }

/******************* * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); }

public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } }

public void closing(int rc) { synchronized (this) { notifyAll(); } }

static class StreamWriter extends Thread { OutputStream os;

InputStream is;

StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); }

public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { }

} }

public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }

vim DataMonitor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;

import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

ZooKeeper zk;

String znode;

Watcher chainedWatcher;

boolean dead;

DataMonitorListener listener;

byte prevData[];

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }

/ * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { / * The existence status of the node has changed. */ void exists(byte data[]);

/** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); }

public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }

public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; }

byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }

#编译,运行
1
2
javac -cp .:lib/* Executor.java DataMonitor.java
java -cp .:lib/* Executor 127.0.0.1:2181 /zk_test /tmp/test cat /tmp/test
然后回到zookeeper的命令行里面,对节点/zk_test进行设置,然后观察测试程序的输出结果。这个测试程序的运行参数是
1
2
3
4
5
USAGE: Executor hostPort znode filename program [args ...]
hostPort是zookeeper的地址端口
znode是要监听的节点路径
filename是输出节点内容到指定的文件
program是用户自定义执行的命令
[args...]是用户自定义执行的命令的输入参数
版权声明:本文为博主原创文章,未经博主允许不得转载。
上一篇: 无

微信关注

获取更多技术咨询