分布式锁
Zookeeper 分布式锁加锁原理
Zookeeper独占式非公平锁

- 多个线程尝试获取锁
- 判断锁是否被线程持有、是则所有线程监听该锁、
- 如果没有被线程持有,则多个线程同时去创建竞争一把锁
- 创建不成功的线程去监听等待锁的释放
- 创建成功的线程执行业务逻辑、完成后释放锁,由于多个线程同时监视这把锁,因此当锁被删除时,多个线程再同时去竞争获取这把锁。
存在的问题: 如果并发量高的情况下、性能回下降比较厉害。所有连接都对同一个结点进行监听,当服务器检测到删除时间时,通知所有连接,所有连接同时收到时间,再次并发竞争。(羊群效应)
Zookeeper独占式公平锁

- 获取锁的请求进来,直接在/lock结点下创建一个临时顺序结点;
- 判断自己是不是/module/lock/product_id节点下最小的结点;
2.1 是最小的则获得锁 2.2 不是,则对前面的结点进行监听(watch) - 获取到锁,处理业务逻辑,处理完释放锁(delete),然后后继第一个结点将收到通知,重复第2步判断。
借助临时顺序节点,可以避免同时多个节点的并发争抢锁,缓解了服务端压力,这种加锁模式都是基于排队加锁的,是公平锁的体现。
存在的问题:每个时刻只能有一个线程持有锁,其他线程全部得等待,在并发量高的情况,且业务逻辑花费时间较长,那么就会导致其他线程等待时间比较长,用户感觉到停顿,性能和用户体验都存在问题。
解决思想:读与读请求不应该相互阻塞,可以共享锁、读操作阻塞写操作,即读写不能同时进行,写操作阻塞读写操作,写写,写读不能同时进行。即使用读写锁;
顺带谈一下缓存数据库双写不一致问题;
- 读写不一致
 上图中,DB最后的值为20,而缓存的值为10,数据就不一致了。 - 写写不一致
 上面图中,数据库的值为20,缓存的值为11;发生不一致。
Zookeeper共享锁实现原理
 1)读锁会监听离得最近的写锁 2)写锁会监听离得最近的读锁和写锁;
代码示例
- 配置类
@Configuration
public class CuratorConfig {
@Bean(initMethod = "start")
public CuratorFramework curatorFramework(){
RetryPolicy policy=new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", policy);
return client;
}
}
TestController
@RestController
public class TestController {
@Autowired
OrderService orderService;
@Autowired
CuratorFramework curatorFramework;
@Value("${server.port}")
String port;
@RequestMapping("/order")
public Object order(int id) throws Exception {
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/product_" + id);
try{
lock.acquire();
orderService.addOrder(id);
}catch(Exception e){
if (e instanceof RuntimeException) {
throw e;
}
} finally{
lock.release();
}
return "ok:";
}
}
- OrderServer
@Service
public class OrderService {
@Autowired
ProductMapper productMapper;
@Autowired
OrderMapper orderMapper;
@Transactional(rollbackFor = Exception.class)
public void addOrder(int id) throws Exception {
Product product = productMapper.getProduct(id);
if (product.getStock()<=0){
throw new Exception("out of stock");
}
int stock = productMapper.deductStock(id);
if (stock==1){
Order order=new Order();
order.setPid(id);
order.setUserId(UUID.randomUUID().toString());
orderMapper.insert(order);
}else {
throw new RuntimeException("deduct stock fail, retry.");
}
}
}
ProductMapper
@Mapper
@Repository
public interface ProductMapper {
@Select(" select * from product where id=#{id} ")
Product getProduct(@Param("id") Integer id);
@Update(" update product set stock=stock-1 where id=#{id} ")
int deductStock(@Param("id") Integer id);
}
OrderMapper
@Mapper
@Repository
public interface OrderMapper {
@Options(useGeneratedKeys = true,keyColumn = "id",keyProperty = "id")
@Insert(" insert into `order`(user_id,pid) values(#{userId},#{pid}) ")
int insert(Order order);
}
Order
public class Order {
private Integer id;
private Integer pid;
private String userId;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getPid() {
return pid;
}
public void setPid(Integer pid) {
this.pid = pid;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
}
Product
public class Product {
private Integer id;
private String productName;
private Integer stock;
private Integer version;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Integer getStock() {
return stock;
}
public void setStock(Integer stock) {
this.stock = stock;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
}
注册中心
 Order-Service需要调用外部服务的User-Service。对外部的服务依赖,我们直接配置在 我们的服务配置文件中,在服务调用关系比较简单的场景,是完全OK的。随着服务的扩张, User-Service 可能需要进行集群部署,  如果系统的调用不是很复杂,可以通过配置管理,然后实现一个简单的客户端负载均衡也是OK 的,但是随着业务的发展,服务模块进行更加细粒度的划分,业务也变得更加复杂,再使用简单 的配置文件管理,将变得难以维护。当然我们可以再前面加一个服务代理,比如nginx做反向代 理, 如下  如果再复杂点,如下:  服务不再是简单的链路,而是错综复杂的调用链。
注册中心简介
这时候就要借助于Zookeeper的基本特性来实现一个注册中心;
-
注册中心,就是让众多的服务,都在Zookeeper中进行注册,啥是注册,注册就是把自己的一些服务信 息,比如IP,端口,还有一些更加具体的服务信息,都写到 Zookeeper节点上, 这样有需要的服务就可以直接从zookeeper上面去拿 -
怎么拿呢? 这时我们可以定义统一的名称,比如, User-Service, 那所有的用户服务在启动的时候,都在User-Service 这个节点下面创建一个子节 点(临时节点),这个子节点保持唯一就好,代表了每个服务实例的唯一标识,有依赖用户服务 的比如Order-Service 就可以通过User-Service 这个父节点,就能获取所有的User-Service 子 节点,并且获取所有的子节点信息(IP,端口等信息),拿到子节点的数据后Order-Service可 以对其进行缓存,然后实现一个客户端的负载均衡 -
同时还可以对这个User-Service 目录进行 监听, 这样有新的节点加入,或者退出,Order-Service都能收到通知,这样Order-Service重 新获取所有子节点,且进行数据更新 -
这个用户服务的子节点的类型都是临时结点,Zookeeper中临时结点的生命周期和Client的Session绑定的,如果Session超时,对应的结点就会被删除,被删除时,Zookeeper会通知对该节点父节点进行监听的客户端,这样客户端刷新本地缓存,当有新服务加入时,同样也会通知客户端,刷新本地缓存,要达到这个目标需要客户端重复的注册对父节点的监听。这样就实现了服务的自动注册和注销。 -
如下图所示:  -
Spring Cloud 生态也提供了Zookeeper注册中心的实现,这个项目叫 Spring Cloud Zookeeper
示例代码:
创建Product-center项目: pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>product-center</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>product-center</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties:
spring.application.name=product-center
#zookeeper 连接地址
spring.cloud.zookeeper.connect-string=192.168.109.200:2181
#将本服务注册到zookeeper
spring.cloud.zookeeper.discovery.register=true
spring.cloud.zookeeper.session-timeout=30000
ProductCenterApplication
@SpringBootApplication
@RestController
public class ProductCenterApplication {
@Value("${server.port}")
private String port;
@Value( "${spring.application.name}" )
private String name;
@GetMapping("/getInfo")
public String getServerPortAndName(){
return this.name +" : "+ this.port;
}
public static void main(String[] args) {
SpringApplication.run(ProductCenterApplication.class, args);
}
}
HeartbeatEventListener
@Component
@Slf4j
public class HeartbeatEventListener implements ApplicationListener<HeartbeatEvent> {
@Override
public void onApplicationEvent(HeartbeatEvent event) {
Object value = event.getValue();
ZookeeperServiceWatch source = (ZookeeperServiceWatch)event.getSource();
log.info(" event:source: {} ,event:value{}",source.getCache().getCurrentChildren("/services"),value.toString());
}
}
ApplicationRunner1
@Component
public class ApplicationRunner1 implements ApplicationRunner{
@Autowired
private ZookeeperServiceRegistry serviceRegistry;
@Override
public void run(ApplicationArguments args) throws Exception {
ZookeeperRegistration registration = ServiceInstanceRegistration.builder()
.defaultUriSpec()
.address("anyUrl")
.port(10)
.name("/a/b/c/d/anotherservice")
.build();
this.serviceRegistry.register(registration);
}
}
创建User-Center项目: pom
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.cloud</groupId>-->
<!--<artifactId>spring-cloud-starter-openfeign</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
application.properties
spring.application.name=user-center
#zookeeper 连接地址
spring.cloud.zookeeper.connect-string=192.168.109.200:2181
启动类:
@SpringBootApplication
public class UserCenterApplication {
public static void main(String[] args) {
SpringApplication.run(UserCenterApplication.class, args);
}
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate;
}
}
TestController
@RestController
public class TestController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private LoadBalancerClient loadBalancerClient;
@GetMapping("/test")
public String test() {
return this.restTemplate.getForObject("http://product-center/getInfo", String.class);
}
@GetMapping("/lb")
public String getLb(){
ServiceInstance choose = loadBalancerClient.choose("product-center");
String serviceId = choose.getServiceId();
int port = choose.getPort();
return serviceId + " : "+port;
}
}
运行即可;
Leader选举
直接贴代码 pom.xml
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.8</version>
</dependency>
</dependencies>
LeaderSelectorDemo
package zookeeper.leaderselector;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class LeaderSelectorDemo {
private static final String CONNECT_STR="192.168.109.200:2181";
private static RetryPolicy retryPolicy=new ExponentialBackoffRetry( 5*1000, 10 );
private static CuratorFramework curatorFramework;
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
String appName = System.getProperty("appName");
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);
LeaderSelectorDemo.curatorFramework = curatorFramework;
curatorFramework.start();
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
public void takeLeadership(CuratorFramework client) throws Exception
{
System.out.println(" I' m leader now . i'm , "+appName);
TimeUnit.SECONDS.sleep(15);
}
};
LeaderSelector selector = new LeaderSelector(curatorFramework, "/cachePreHeat_leader", listener);
selector.autoRequeue();
selector.start();
countDownLatch.await();
}
}
运行即可。
|