- Distributed Queue(分布式队列)
- Distributed Id Queue(分布式ID队列)
- Distributed Priority Queue(分布式优先级队列)
- Distributed Delay Queue(分布式延迟队列)
- Simple Distributed Queue(简单分布式队列)
- 参考资料
Curator官方强烈不建议把Zookeeper当作MQ使用。IMPORTANT – We recommend that you do NOT use ZooKeeper for Queues. Please see Tech Note 4 for details.
目录
Distributed Queue(分布式队列)
An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK’s PERSISTENTSEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Queue - Distributed Queue
*
* @author Freud
*
*/
public class CuratorQueueDistributedQueueZookeeper {
private static final int SECOND = 1000;
private static final String path = "/curator_queue/distributed_queue";
private static final CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int index = i;
service.submit(new Callable<Void>() {
public Void call() throws Exception {
new CuratorQueueDistributedQueueZookeeper().schedule(index);
return null;
}
});
}
down.countDown();
Thread.sleep(10 * SECOND);
service.shutdown();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
// 创建队列
DistributedQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
new StringQueueSerializer(), path).buildQueue();
queue.start();
if (index == 4) {
Thread.sleep(3 * SECOND);
for (int i = 0; i < 20; i++) {
// 生产消息
queue.put("message " + i);
}
}
}
private CuratorFramework getStartedClient(int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
/**
* 消息消费者
*
* @author Freud
*
*/
public class StringQueueConsumer implements QueueConsumer<String> {
private int index;
public StringQueueConsumer(int index) {
super();
this.index = index;
}
public void stateChanged(CuratorFramework cf, ConnectionState state) {
}
public void consumeMessage(String message) throws Exception {
System.out.println("Thread [" + index + "] get the queue value : " + message);
}
}
/**
* 消息序列化和反序列化逻辑
*
* @author Freud
*
*/
public class StringQueueSerializer implements QueueSerializer<String> {
public byte[] serialize(String item) {
return item.getBytes();
}
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
}
打印结果
Thread [0] Server connected...
Thread [3] Server connected...
Thread [2] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [0] get the queue value : message 0
Thread [1] get the queue value : message 1
Thread [1] get the queue value : message 2
Thread [1] get the queue value : message 3
Thread [1] get the queue value : message 4
Thread [4] get the queue value : message 5
Thread [4] get the queue value : message 6
Thread [4] get the queue value : message 7
Thread [3] get the queue value : message 8
Thread [3] get the queue value : message 9
Thread [3] get the queue value : message 10
Thread [3] get the queue value : message 11
Thread [4] get the queue value : message 12
Thread [2] get the queue value : message 13
Thread [2] get the queue value : message 14
Thread [2] get the queue value : message 15
Thread [4] get the queue value : message 16
Thread [2] get the queue value : message 17
Thread [2] get the queue value : message 18
Thread [2] get the queue value : message 19
Distributed Id Queue(分布式ID队列)
A version of DistributedQueue that allows IDs to be associated with queue items. Items can then be removed from the queue if needed.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Queue - Distributed Id Queue
*
* @author Freud
*
*/
public class CuratorQueueDistributedIdQueueZookeeper {
private static final int SECOND = 1000;
private static final String path = "/curator_queue/distributed_id_queue";
private static final CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int index = i;
service.submit(new Callable<Void>() {
public Void call() throws Exception {
new CuratorQueueDistributedIdQueueZookeeper().schedule(index);
return null;
}
});
}
down.countDown();
Thread.sleep(10 * SECOND);
service.shutdown();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
// 创建队列
DistributedIdQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
new StringQueueSerializer(), path).buildIdQueue();
queue.start();
if (index == 4) {
Thread.sleep(3 * SECOND);
for (int i = 0; i < 20; i++) {
// 生产消息
queue.put("message " + i, i + "");
}
// ID Queue的特性是可以通过ID删除消息。
queue.remove("10");
}
}
private CuratorFramework getStartedClient(int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
/**
* 消息消费者
*
* @author Freud
*
*/
public class StringQueueConsumer implements QueueConsumer<String> {
private int index;
public StringQueueConsumer(int index) {
super();
this.index = index;
}
public void stateChanged(CuratorFramework cf, ConnectionState state) {
}
public void consumeMessage(String message) throws Exception {
System.out.println("Thread [" + index + "] get the queue value : " + message);
}
}
/**
* 消息序列化和反序列化逻辑
*
* @author Freud
*
*/
public class StringQueueSerializer implements QueueSerializer<String> {
public byte[] serialize(String item) {
return item.getBytes();
}
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
}
打印结果
Thread [2] Server connected...
Thread [3] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [0] Server connected...
Thread [1] get the queue value : message 0
Thread [3] get the queue value : message 1
Thread [3] get the queue value : message 2
Thread [3] get the queue value : message 3
Thread [3] get the queue value : message 4
Thread [3] get the queue value : message 5
Thread [3] get the queue value : message 6
Thread [3] get the queue value : message 7
Thread [3] get the queue value : message 8
Thread [3] get the queue value : message 9
Thread [4] get the queue value : message 11
Thread [4] get the queue value : message 12
Thread [0] get the queue value : message 13
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 15
Thread [1] get the queue value : message 16
Thread [1] get the queue value : message 17
Thread [1] get the queue value : message 18
Thread [1] get the queue value : message 19
Distributed Priority Queue(分布式优先级队列)
An implementation of the Distributed Priority Queue ZK recipe.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Queue - Distributed Priority Queue
*
* @author Freud
*
*/
public class CuratorQueueDistributedPriorityQueueZookeeper {
private static final int SECOND = 1000;
private static final String path = "/curator_queue/distributed_priority_queue";
private static final CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int index = i;
service.submit(new Callable<Void>() {
public Void call() throws Exception {
new CuratorQueueDistributedPriorityQueueZookeeper().schedule(index);
return null;
}
});
}
down.countDown();
Thread.sleep(10 * SECOND);
service.shutdown();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
// 创建队列
DistributedPriorityQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
new StringQueueSerializer(), path).buildPriorityQueue(3);
queue.start();
if (index == 4) {
Thread.sleep(3 * SECOND);
for (int i = 0; i < 20; i++) {
// 生产消息
queue.put("message " + i, 20 - i);
}
}
}
private CuratorFramework getStartedClient(int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
/**
* 消息消费者
*
* @author Freud
*
*/
public class StringQueueConsumer implements QueueConsumer<String> {
private int index;
public StringQueueConsumer(int index) {
super();
this.index = index;
}
public void stateChanged(CuratorFramework cf, ConnectionState state) {
}
public void consumeMessage(String message) throws Exception {
System.out.println("Thread [" + index + "] get the queue value : " + message);
}
}
/**
* 消息序列化和反序列化逻辑
*
* @author Freud
*
*/
public class StringQueueSerializer implements QueueSerializer<String> {
public byte[] serialize(String item) {
return item.getBytes();
}
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
}
打印结果
Thread [0] Server connected...
Thread [2] Server connected...
Thread [4] Server connected...
Thread [3] Server connected...
Thread [1] Server connected...
Thread [3] get the queue value : message 19
Thread [0] get the queue value : message 18
Thread [0] get the queue value : message 17
Thread [0] get the queue value : message 16
Thread [0] get the queue value : message 15
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 13
Thread [3] get the queue value : message 12
Thread [3] get the queue value : message 11
Thread [3] get the queue value : message 10
Thread [3] get the queue value : message 9
Thread [0] get the queue value : message 8
Thread [1] get the queue value : message 7
Thread [1] get the queue value : message 6
Thread [0] get the queue value : message 5
Thread [0] get the queue value : message 4
Thread [0] get the queue value : message 3
Thread [0] get the queue value : message 2
Thread [0] get the queue value : message 1
Thread [0] get the queue value : message 0
Distributed Delay Queue(分布式延迟队列)
An implementation of a Distributed Delay Queue.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Queue - Distributed Delay Queue
*
* @author Freud
*
*/
public class CuratorQueueDistributedDelayQueueZookeeper {
private static final int SECOND = 1000;
private static final String path = "/curator_queue/distributed_delay_queue";
private static final CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
CuratorFramework client = new CuratorQueueDistributedDelayQueueZookeeper().getStartedClient(-1);
if (client.checkExists().forPath(path) != null) {
client.delete().deletingChildrenIfNeeded().forPath(path);
}
client.close();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int index = i;
service.submit(new Callable<Void>() {
public Void call() throws Exception {
new CuratorQueueDistributedDelayQueueZookeeper().schedule(index);
return null;
}
});
}
down.countDown();
Thread.sleep(20 * SECOND);
service.shutdown();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
// 创建队列
DistributedDelayQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
new StringQueueSerializer(), path).buildDelayQueue();
queue.start();
if (index == 4) {
Thread.sleep(3 * SECOND);
for (int i = 0; i < 20; i++) {
// 生产消息 ,其中DelayUtilEpoch的单位为毫秒,代表触发时间的毫秒数,集群环境下需要注意做时间同步
queue.put("message " + i, System.currentTimeMillis() + ((i + 1) * SECOND / 2));
}
}
}
private CuratorFramework getStartedClient(int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
/**
* 消息消费者
*
* @author Freud
*
*/
public class StringQueueConsumer implements QueueConsumer<String> {
private int index;
public StringQueueConsumer(int index) {
super();
this.index = index;
}
public void stateChanged(CuratorFramework cf, ConnectionState state) {
}
public void consumeMessage(String message) throws Exception {
System.out.println("Thread [" + index + "] get the queue value : " + message);
}
}
/**
* 消息序列化和反序列化逻辑
*
* @author Freud
*
*/
public class StringQueueSerializer implements QueueSerializer<String> {
public byte[] serialize(String item) {
return item.getBytes();
}
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
}
打印结果
Thread [-1] Server connected...
Thread [4] Server connected...
Thread [3] Server connected...
Thread [0] Server connected...
Thread [2] Server connected...
Thread [1] Server connected...
Thread [1] get the queue value : message 0
Thread [1] get the queue value : message 1
Thread [2] get the queue value : message 2
Thread [2] get the queue value : message 3
Thread [0] get the queue value : message 4
Thread [1] get the queue value : message 5
Thread [4] get the queue value : message 6
Thread [1] get the queue value : message 7
Thread [2] get the queue value : message 8
Thread [0] get the queue value : message 9
Thread [2] get the queue value : message 10
Thread [1] get the queue value : message 11
Thread [1] get the queue value : message 12
Thread [1] get the queue value : message 13
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 15
Thread [4] get the queue value : message 16
Thread [0] get the queue value : message 17
Thread [2] get the queue value : message 18
Thread [1] get the queue value : message 19
可以观察到消息打印按照500毫秒的间隔依次有序打印出来。
Simple Distributed Queue(简单分布式队列)
A drop-in replacement for the DistributedQueue that comes with the ZK distribution.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Queue - Simple Distributed Queue
*
* @author Freud
*
*/
public class CuratorQueueSimpleDistributedQueueZookeeper {
private static final int SECOND = 1000;
private static final String path = "/curator_queue/simple_distributed_queue";
private static final CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
CuratorFramework client = new CuratorQueueSimpleDistributedQueueZookeeper().getStartedClient(-1);
if (client.checkExists().forPath(path) != null) {
client.delete().deletingChildrenIfNeeded().forPath(path);
}
client.close();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
final int index = i;
service.submit(new Callable<Void>() {
public Void call() throws Exception {
new CuratorQueueSimpleDistributedQueueZookeeper().schedule(index);
return null;
}
});
}
down.countDown();
Thread.sleep(20 * SECOND);
service.shutdown();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path);
if (index == 4) {
Thread.sleep(3 * SECOND);
for (int i = 0; i < 20; i++) {
// 生产消息
queue.offer(("message " + i).getBytes());
}
} else {
while (true) {
System.out.println("Thread [" + index + "] get queue value :" + new String(queue.take()));
}
}
}
private CuratorFramework getStartedClient(int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
}
打印结果
Thread [-1] Server connected...
Thread [0] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [2] Server connected...
Thread [3] Server connected...
Thread [0] get queue value :message 0
Thread [0] get queue value :message 1
Thread [1] get queue value :message 2
Thread [0] get queue value :message 3
Thread [1] get queue value :message 4
Thread [0] get queue value :message 5
Thread [1] get queue value :message 6
Thread [0] get queue value :message 7
Thread [3] get queue value :message 8
Thread [1] get queue value :message 9
Thread [0] get queue value :message 10
Thread [0] get queue value :message 11
Thread [1] get queue value :message 12
Thread [2] get queue value :message 13
Thread [1] get queue value :message 14
Thread [3] get queue value :message 15
Thread [0] get queue value :message 16
Thread [1] get queue value :message 17
Thread [0] get queue value :message 18
Thread [3] get queue value :message 19
参考资料
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 – 倪超
Curator官网 : http://curator.apache.org/
本文来自转载,原文链接: