目录
Barrier(分布式栅栏)
Distributed systems use barriers to block processing of a set of nodes until a condition is met at which time all the nodes are allowed to proceed.
分布式栅栏 – 等待一定时间,然后将所有数据一起触发
示例代码
package com.freud.zk.curator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Barriers - Barrier
*
* 分布式栅栏或 - 等待一定时间,然后将所有数据一起触发
*
* @author Freud
*
*/
public class CuratorBarriersBarrierZookeeper {
private static final int SECOND = 1000;
private static final int thread = 5;
private static final String path = "/curator_barrier/distribute_barrier";
private final static CountDownLatch down = new CountDownLatch(1);
private static DistributedBarrier barrier;
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(thread);
for (int i = 0; i < thread; i++) {
final int index = i;
service.submit(new Runnable() {
public void run() {
try {
new CuratorBarriersBarrierZookeeper().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
down.countDown();
Thread.sleep(2 * SECOND);
barrier.removeBarrier();
Thread.sleep(1 * SECOND);
service.shutdownNow();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
barrier = new DistributedBarrier(client, path);
System.out.println("Thread [" + index + "] on ready!");
barrier.setBarrier();
barrier.waitOnBarrier();
System.out.println("Thread [" + index + "] finised!");
}
private CuratorFramework getStartedClient(final int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
// System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
}
打印结果
Thread [2] on ready!
Thread [0] on ready!
Thread [3] on ready!
Thread [4] on ready!
Thread [1] on ready!
Thread [4] finised!
Thread [3] finised!
Thread [0] finised!
Thread [2] finised!
Thread [1] finised!
Double Barrier
Double barriers enable clients to synchronize the beginning and the end of a computation. When enough processes have joined the barrier, processes start their computation and leave the barrier once they have finished.
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。
示例代码
package com.freud.zk.curator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Barriers - Barrier
*
* 分布式栅栏 - 等待一定时间,然后将所有数据一起触发
*
* @author Freud
*
*/
public class CuratorBarriersDoubleBarrierZookeeper {
private static final int SECOND = 1000;
private static final int thread = 5;
private static final String path = "/curator_barrier/double_barrier";
private final static CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(thread);
for (int i = 0; i < thread; i++) {
final int index = i;
service.submit(new Runnable() {
public void run() {
try {
new CuratorBarriersDoubleBarrierZookeeper().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
down.countDown();
Thread.sleep(10 * SECOND);
service.shutdownNow();
}
private void schedule(final int index) throws Exception {
down.await();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
new CuratorBarriersDoubleBarrierZookeeper().getStartedClient(), path, thread);
System.out.println("Thread [" + index + "] on ready!");
barrier.enter();
System.out.println("Thread [" + index + "] Running!");
barrier.leave();
System.out.println("Thread [" + index + "] finised!");
}
private CuratorFramework getStartedClient() {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
// System.out.println("Server connected...");
return cfFluent;
}
}
打印结果
Thread [4] on ready!
Thread [1] on ready!
Thread [0] on ready!
Thread [2] on ready!
Thread [3] on ready!
Thread [3] Running!
Thread [0] Running!
Thread [4] Running!
Thread [1] Running!
Thread [2] Running!
Thread [2] finised!
Thread [4] finised!
Thread [0] finised!
Thread [3] finised!
Thread [1] finised!
参考资料
Curator官网 : http://curator.apache.org/
跟着实例学习ZooKeeper的用法: Barrier http://ifeve.com/zookeeper-barrier/
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 – 倪超
本文来自转载,原文链接: