数据模型
 
常用命令

创建节点的几种类型: 持久化节点(默认),创建方式为 create /节点path;临时节点,创建方式为 create -e /节点path;顺序节点,创建方式为 create -s /节点path;临时顺序节点,创建方式为 create -es /节点path。
javaAPI操作zookeeper实现节点增删改查
首先导入pom.xml坐标
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the zk-client example project (Curator-based ZooKeeper client demos). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates. -->
<groupId>com.itheima</groupId>
<artifactId>zk-client</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- JUnit 4, test scope only: provides the @Before/@After/@Test annotations used by the example test classes. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!-- Curator core client (CuratorFramework, CuratorFrameworkFactory, retry policies). -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<!-- Curator recipes: the org.apache.curator.framework.recipes.* cache and lock classes used by the examples. -->
<!-- Kept at the same version as curator-framework. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!-- SLF4J logging API. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!-- SLF4J binding that routes log output to log4j 1.2; configured through log4j.properties. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile sources as Java 8 (both source and target level). -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
log4j.properties文件配置
# Root logger: level "off" suppresses all log output; the "stdout" appender below is
# still attached, so switching the level (e.g. to "info") is enough to see log lines.
log4j.rootLogger=off,stdout
# Console appender writing to System.out.
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
# Pattern: [timestamp]LEVEL logger(line:N) NDC-message. The time and "line" separators are
# plain colons; the previous "/:" sequences were printed literally (e.g. "12/:30/:45").
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH:mm:ss}]%-5p %c(line:%L) %x-%m%n
创建连接实现增删改查
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
 * Walk-through of basic Curator CRUD operations against a ZooKeeper server.
 *
 * <p>Each test is a standalone demo that talks to the server configured in
 * {@link #testConnect()}; the tests are not independent of server state (for example
 * {@code testGet1} expects {@code /app1} to exist already).
 */
public class CuratorTest {

    /**
     * Charset used for every byte[] &lt;-&gt; String conversion so results do not depend on the
     * platform default. Fully qualified to avoid touching the file's import block.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** How long {@link #testDelete4()} waits for the background callback before failing. */
    private static final long CALLBACK_TIMEOUT_SECONDS = 30;

    private CuratorFramework client;

    /**
     * Builds and starts the client before every test.
     *
     * <p>Retry policy: exponential backoff starting at 3000 ms, at most 10 retries.
     * All paths used by the tests are relative to the {@code itheima} namespace.
     */
    @Before
    public void testConnect() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("itheima")
                .build();
        client.start();
    }

    /** Creates {@code /app2} with explicit data. */
    @Test
    public void testCreate() throws Exception {
        String path = client.create().forPath("/app2", "hehe".getBytes(UTF_8));
        System.out.println(path);
    }

    /** Creates {@code /app1} without passing explicit data. */
    @Test
    public void testCreate2() throws Exception {
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }

    /** Creates {@code /app3} as an ephemeral node. */
    @Test
    public void testCreate3() throws Exception {
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
    }

    /** Creates {@code /app4/p1}, creating missing parent nodes on the way. */
    @Test
    public void testCreate4() throws Exception {
        String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
        System.out.println(path);
    }

    /** Reads and prints the data stored at {@code /app1}. */
    @Test
    public void testGet1() throws Exception {
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data, UTF_8));
    }

    /** Lists the children of the namespace root. */
    @Test
    public void testGet2() throws Exception {
        List<String> path = client.getChildren().forPath("/");
        System.out.println(path);
    }

    /** Prints a {@link Stat} before and after it is filled in by a read of {@code /app1}. */
    @Test
    public void testGet3() throws Exception {
        Stat status = new Stat();
        System.out.println(status);
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }

    /** Overwrites the data of {@code /app1} unconditionally. */
    @Test
    public void testSet() throws Exception {
        client.setData().forPath("/app1", "itcast".getBytes(UTF_8));
    }

    /**
     * Overwrites the data of {@code /app1} only if its version still matches the one just
     * read (optimistic concurrency check).
     */
    @Test
    public void testSetForVersion() throws Exception {
        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/app1");
        int version = status.getVersion();
        System.out.println(version);
        client.setData().withVersion(version).forPath("/app1", "hehe".getBytes(UTF_8));
    }

    /** Deletes the single node {@code /app1}. */
    @Test
    public void testDelete() throws Exception {
        client.delete().forPath("/app1");
    }

    /** Deletes {@code /app4} together with its children. */
    @Test
    public void testDelete2() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    /** Deletes {@code /app2} using Curator's guaranteed-delete mode. */
    @Test
    public void testDelete3() throws Exception {
        client.delete().guaranteed().forPath("/app2");
    }

    /**
     * Deletes {@code /app1} in the background and prints the callback event.
     *
     * <p>The delete is asynchronous, so the test waits for the callback; otherwise the
     * {@code @After} hook could close the client before the callback is ever invoked.
     */
    @Test
    public void testDelete4() throws Exception {
        final java.util.concurrent.CountDownLatch callbackDone =
                new java.util.concurrent.CountDownLatch(1);
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("我被删除了~");
                System.out.println(event);
                callbackDone.countDown();
            }
        }).forPath("/app1");
        boolean completed = callbackDone.await(
                CALLBACK_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
        if (!completed) {
            throw new AssertionError("background delete callback was not invoked within "
                    + CALLBACK_TIMEOUT_SECONDS + " seconds");
        }
    }

    /** Closes the client after every test. */
    @After
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
事件监听器
基本概念:

JavaAPI操作
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
 * Demonstrates the three Curator cache recipes used as watchers: {@code NodeCache},
 * {@code PathChildrenCache} and {@code TreeCache}.
 *
 * <p>Every test registers a listener and then blocks forever so that changes made from
 * another client can be observed; stop the test manually when done.
 */
public class CuratorWatcherTest {

    /**
     * Charset used when printing node data, so output does not depend on the platform
     * default. Fully qualified to avoid touching the file's import block.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private CuratorFramework client;

    /**
     * Builds and starts the client before every test.
     *
     * <p>Retry policy: exponential backoff starting at 3000 ms, at most 10 retries.
     * All paths are relative to the {@code itheima} namespace.
     */
    @Before
    public void testConnect() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("itheima")
                .build();
        client.start();
    }

    /** Closes the client after every test. */
    @After
    public void close() {
        if (client != null) {
            client.close();
        }
    }

    /** Watches the single node {@code /app1} and prints its data whenever it changes. */
    @Test
    public void testNodeCache() throws Exception {
        // try-with-resources so the cache is closed if the blocked thread is interrupted.
        try (NodeCache nodeCache = new NodeCache(client, "/app1")) {
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("节点变化了~");
                    ChildData current = nodeCache.getCurrentData();
                    // getCurrentData() is null once the node no longer exists (e.g. deleted);
                    // skip printing instead of throwing a NullPointerException.
                    if (current == null || current.getData() == null) {
                        return;
                    }
                    System.out.println(new String(current.getData(), UTF_8));
                }
            });
            // true: build the initial view of the node before start() returns.
            nodeCache.start(true);
            blockForever();
        }
    }

    /** Watches the direct children of {@code /app2} and prints the data of updated children. */
    @Test
    public void testPathChildrenCache() throws Exception {
        // Third argument true: cache the children's data, not only their paths.
        try (PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true)) {
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    System.out.println("子节点变化了~");
                    System.out.println(event);
                    if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                        System.out.println("数据变了!!!");
                        ChildData changed = event.getData();
                        if (changed != null && changed.getData() != null) {
                            System.out.println(new String(changed.getData(), UTF_8));
                        }
                    }
                }
            });
            pathChildrenCache.start();
            blockForever();
        }
    }

    /** Watches {@code /app2} and its whole subtree, printing every event. */
    @Test
    public void testTreeCache() throws Exception {
        try (TreeCache treeCache = new TreeCache(client, "/app2")) {
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    System.out.println("节点变化了");
                    System.out.println(event);
                }
            });
            treeCache.start();
            blockForever();
        }
    }

    /**
     * Parks the calling thread so listeners keep receiving events.
     *
     * <p>Replaces an empty {@code while (true)} loop, which kept one CPU core busy.
     *
     * @throws InterruptedException if the thread is interrupted while parked
     */
    private static void blockForever() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}
分布式锁:
基本概念:
解决跨机器的进程之间的数据同步问题 
锁的原理
创建一个临时的有序节点 
JavaApi实现
多线程测试类:
/**
 * Starts two competing seller threads that share one {@code Ticket12306} task,
 * so both draw from the same ticket pool.
 */
public static void main(String[] args) {
    Ticket12306 sharedSeller = new Ticket12306();
    // Create both named threads first, then start them in order.
    Thread[] agents = {
        new Thread(sharedSeller, "携程"),
        new Thread(sharedSeller, "飞猪"),
    };
    for (Thread agent : agents) {
        agent.start();
    }
}
锁的实现:
package com.itheima.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
/**
 * Ticket seller whose stock is guarded by a ZooKeeper-backed distributed lock
 * ({@link InterProcessMutex} on path {@code /lock}).
 *
 * <p>One instance is meant to be shared by several threads; each thread repeatedly
 * takes the lock, sells one ticket and releases the lock until no tickets remain.
 */
public class Ticket12306 implements Runnable {

    /** Remaining tickets; only read or modified while the lock is held. */
    private int tickets = 10;

    private final InterProcessMutex lock;

    /**
     * Connects to ZooKeeper and prepares the lock.
     *
     * <p>Retry policy: exponential backoff starting at 3000 ms, at most 10 retries.
     */
    public Ticket12306() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.149.135:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        lock = new InterProcessMutex(client, "/lock");
    }

    /**
     * Sells tickets one at a time under the distributed lock.
     *
     * <p>Returns once the tickets are sold out or the thread is interrupted. A failed or
     * timed-out lock attempt is simply retried on the next iteration.
     */
    @Override
    public void run() {
        boolean soldOut = false;
        while (!soldOut) {
            boolean acquired = false;
            boolean interrupted = false;
            try {
                // acquire(...) returns false on timeout; selling without the lock would
                // defeat its purpose, so the result must be checked.
                acquired = lock.acquire(3, TimeUnit.SECONDS);
                if (acquired) {
                    if (tickets > 0) {
                        System.out.println(Thread.currentThread() + ":" + tickets);
                        Thread.sleep(100);
                        tickets--;
                    } else {
                        soldOut = true;
                    }
                }
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release only a lock this thread actually holds; releasing an unowned
                // lock throws.
                if (acquired) {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            if (interrupted) {
                // Restore the interrupt flag (after the release above) and stop selling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Zookeeper集群搭建
 
事务请求:增、删、改;非事务请求:查
搭建教程: https://wws.lanzoui.com/ik1lfrbw9pe
注意点
leader 停掉后,如果剩余节点仍能构成集群(存活节点数过半),会重新选举出一个新的 leader;follower 停掉后,只要仍能构成集群,对数据没有影响。
|