目录
Shared Counter(共享计数器)
Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).
共享计数器,适用于Master操作,并将计数结果同步到其他所有的从服务器的情景,Zk Watcher的一个基础应用
示例代码
package com.freud.zk.curator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Counter - SharedCounter
*
* 共享计数器,适用于Master操作,并将计数结果同步到其他所有的从服务器的情景
*
* @author Freud
*
*/
public class CuratorCounterSharedCounterZookeeper {
private static final int SECOND = 1000;
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int index = i;
service.submit(new Runnable() {
public void run() {
try {
new CuratorCounterSharedCounterZookeeper().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(10 * SECOND);
service.shutdownNow();
}
private void schedule(final int index) throws Exception {
CuratorFramework client = this.getStartedClient(index);
String path = "/curator_counter/shared_counter";
final SharedCount count = new SharedCount(client, path, 100);
count.addListener(new SharedCountListener() {
public void stateChanged(CuratorFramework client, ConnectionState state) {
System.out.println("Thread [" + index + "][Callback]State changed!");
}
public void countHasChanged(SharedCountReader reader, int value) throws Exception {
System.out.println("Thread [" + index + "][Callback]Count changed to [" + value + "]!");
}
});
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep((index + 1) * 1000);
count.setCount(index);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
count.start();
}
private CuratorFramework getStartedClient(final int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
}
打印结果
Thread [2] Server connected...
Thread [1] Server connected...
Thread [0] Server connected...
Thread [0][Callback]State changed!
Thread [2][Callback]State changed!
Thread [1][Callback]State changed!
Thread [2][Callback]Count changed to [0]!
Thread [1][Callback]Count changed to [0]!
Thread [0][Callback]Count changed to [0]!
Thread [2][Callback]Count changed to [1]!
Thread [1][Callback]Count changed to [1]!
Thread [0][Callback]Count changed to [1]!
Thread [0][Callback]Count changed to [2]!
Thread [2][Callback]Count changed to [2]!
Thread [1][Callback]Count changed to [2]!
Distributed Atomic Long(分布式计数器)
A counter that attempts atomic increments. It first tries using optimistic locking. If that fails, an optional InterProcessMutex is taken. For both optimistic and mutex, a retry policy is used to retry the increment.
示例代码
package com.freud.zk.curator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* Zookeeper - Curator - Counter - DistributedAtomicLong
*
* 分布式计数器
*
* @author Freud
*
*/
public class CuratorCounterDistributedAtomicLongZookeeper {
private static final int SECOND = 1000;
private final static CountDownLatch down = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int index = i;
service.submit(new Runnable() {
public void run() {
try {
new CuratorCounterDistributedAtomicLongZookeeper().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
down.countDown();
Thread.sleep(10 * SECOND);
service.shutdownNow();
}
private void schedule(final int index) throws Exception {
down.await();
CuratorFramework client = this.getStartedClient(index);
String path = "/curator_counter/distribute_atomic_long";
DistributedAtomicLong count = new DistributedAtomicLong(client, path, new ExponentialBackoffRetry(1000, 3));
Thread.sleep((index + 1) * SECOND);
AtomicValue<Long> al = count.get();
System.out.println("Thread [" + index + "] get new Long value [" + al.postValue() + "] result status ["
+ al.succeeded() + "]");
count.increment();
}
private CuratorFramework getStartedClient(final int index) {
RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
// Fluent风格创建
CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
cfFluent.start();
System.out.println("Thread [" + index + "] Server connected...");
return cfFluent;
}
}
打印结果
Thread [0] Server connected...
Thread [1] Server connected...
Thread [2] Server connected...
Thread [0] get new Long value [0] result status [true]
Thread [1] get new Long value [1] result status [true]
Thread [2] get new Long value [2] result status [true]
参考资料
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 – 倪超
Curator官网 : http://curator.apache.org/
本文来自转载,原文链接: