Hello Coder


  • 首页

  • 归档

  • 标签

  • 搜索
close

zookeeper 分布式锁

发表于 2016-08-29

分布式锁

分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。

可重入锁Shared Reentrant Lock

Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。它是由类InterProcessMutex来实现。
它的构造函数为:

1
public InterProcessMutex(CuratorFramework client, String path)

通过acquire获得锁,并提供超时机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
* re-entrantly. Each call to acquire must be balanced by a call to release()
*/
public void acquire()
/**
* Acquire the mutex - blocks until it's available or the given time expires. Note: the same
* thread can call acquire re-entrantly. Each call to acquire that returns true must be
* balanced by a call to release()
* Parameters:
* time - time to wait
* unit - time unit
* Returns:
* true if the mutex was acquired, false if not
*/
public boolean acquire(long time,
TimeUnit unit)

通过release()方法释放锁。InterProcessMutex 实例可以重用。

Revoking
ZooKeeper recipes wiki定义了可协商的撤销机制。
为了撤销mutex, 调用下面的方法:

1
2
3
4
5
6
/**
* 将锁设为可撤销的. 当别的进程或线程想让你释放锁是Listener会被调用。
* Parameters:
* listener - the listener
*/
public void makeRevocable(RevocationListener<T> listener)

如果请求撤销当前的锁, 调用Revoker方法。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Utility to mark a lock for revocation. Assuming that the lock has been registered
* with a RevocationListener, it will get called and the lock should be released. Note,
* however, that revocation is cooperative.
* Parameters:
* client - the client
* path - the path of the lock - usually from something like
* InterProcessMutex.getParticipantNodes()
*/
public static void attemptRevoke(CuratorFramework client,
String path)
throws Exception
阅读全文 »

zookeeper leader选举

发表于 2016-08-29

Leader选举

在实际使用ZooKeeper开发中,我们最常用的是Apache Curator。 在使用ZK API开发时会遇到让人头疼的几个问题,ZK连接管理、SESSION失效等一些异常问题的处理,Curator替我们解决了这些问题,通过对ZK连接状态的监控来做出相应的重连等操作,并触发事件。

更好的地方是Curator对ZK的一些应用场景提供了非常好的实现,而且有很多扩充,这些都符合ZK使用规范。
它的主要组件为:

  • Recipes: ZooKeeper的系列recipe实现, 基于 Curator Framework.**
  • Framework: 封装了大量ZooKeeper常用API操作,降低了使用难度, 基于Zookeeper增加了一些新特性,对ZooKeeper链接的管理,对链接丢失自动重新链接。**
  • Utilities: 一些ZooKeeper操作的工具类包括ZK的集群测试工具路径生成等非常有用,在Curator-Client包下org.apache.curator.utils。
  • Client: ZooKeeper的客户端API封装,替代官方 ZooKeeper class,解决了一些繁琐低级的处理,提供一些工具类。
  • Errors: 异常处理, 连接异常等
  • Extensions: 对curator-recipes的扩展实现,拆分为curator-: stuck_out_tongue_closed_eyes: iscovery和curator-: stuck_out_tongue_closed_eyes: iscovery-server提供基于RESTful的Recipes WEB服务.

在分布式计算中, leader election是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader或者coordinator. 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader.
除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

Curator 有两种选举recipe, 可以根据需求选择合适的。

Leader latch

使用LeaderLatch类选举的例子.

构造函数如下:

1
2
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)

启动LeaderLatch: leaderLatch.start();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Add this instance to the leadership election and attempt to acquire leadership.
*
* @throws Exception errors
*/
public void start() throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
{
try
{
internalStart();
}
finally
{
startTask.set(null);
}
}
}));
}

一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader。 可以随时查看一个给定的实例是否是leader:

1
public boolean hasLeadership()

LeaderLatch在请求成为leadership时有block方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void await()
throws InterruptedException, EOFException
{
synchronized (this)
{
while ((this.state.get() == State.STARTED) && (!this.hasLeadership.get()))
{
wait();
}
}
if (this.state.get() != State.STARTED)
{
throw new EOFException();
}
}
Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.
public boolean await(long timeout,
TimeUnit unit)
throws InterruptedException

一旦不使用LeaderLatch了,必须调用close方法。 如果它是leader,会释放leadership, 其它的参与者将会重新选举一个leader。

异常处理
LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader不再认为自己还是leader.当LOST 连接重连后 RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个.
LeaderLatch用户必须考虑导致leadership丢失的连接问题。 强烈推荐你使用ConnectionStateListener。

阅读全文 »

java mock

发表于 2016-08-26

Mock 测试

Mock 最大的功能是帮你把单元测试的耦合分解开,如果你的代码对另一个类或者接口有依赖,它能够帮你模拟这些依赖,并帮你验证所调用的依赖的行为。

比如一段代码有这样的依赖:

img

当我们需要测试A类的时候,如果没有 Mock,则我们需要把整个依赖树都构建出来,而使用 Mock 的话就可以将结构分解开,像下面这样:

img

mock对象就是在调试期间用来作为真实对象的替代品。

mock测试就是在测试过程中,对那些不容易构建的对象用一个虚拟对象来代替测试的方法就叫mock测试。

Mock 对象使用范畴

  • 真实对象具有不可确定的行为,产生不可预测的效果,(如:股票行情,天气预报)
  • 真实对象很难被创建的
  • 真实对象的某些行为很难被触发
  • 真实对象实际上还不存在的(和其他开发小组或者和新的硬件打交道)等等
阅读全文 »

zookeeper 安装

发表于 2016-08-24

Zookeeper 安装及配置(Mac)

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

Zookeeper 的安装

1
下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper

首先从官网下载ZooKeeper压缩包,然后解压下载得到的ZooKeeper压缩包,发现有“bin,conf,lib”等目录。“bin目录”中存放有运行脚本;“conf目录”中存放有配置文件;“lib目录”中存放有运行所需要第三方库。
解压文件:

1
tar zxvf zookeeper-3.4.8.tar.gz

Zookeeper 的配置

伪分布式部署

  1. 在“conf”目录下,新建一个名为“zoo.cfg”的文件,准备部署3个节点,其中内容如下:
1
2
3
4
5
6
7
8
9
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/Users/nali/program/zookeeper-3.4.8_1/data
dataLogDir=/Users/nali/program/zookeeper-3.4.8_1/logs
clientPort=2181(不同节点注意不同)
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

参数说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#tickTime: 客户端和服务端或者服务端多个节点之间维持心跳的时间间隔,单位毫秒。
#initLimit:zookeeper集群中的包含多台server, 其中一台为leader, 集群中其余的server为follower.
initLimit参数配置初始化连接时, follower和leader之间的最长心跳时间. 此时该参数设置为5, 说明时间限制为5 倍tickTime, 即5*2000=10000ms=10s.
#syncLimit:该参数配置leader和follower之间发送消息, 请求和应答的最大时间长度. 此时该参数设置为2, 说明时间限制为2倍tickTime, 即4000ms.
#dataDir: 数据文件存放目录,可以是任意目录
#dataLogDir: 用来重做数据的事务日志文件存放目录.
#clientPort: 监听client连接的端口号.
#server.1,server.2,server.3: 表示节点编号,后边用冒号隔开的三个数字,分别表示节点的ip,交换数据的端口号,某个节点挂掉之后专门用来选举的端口号。
注:initLimit,syncLimit在单节点部署模式下,不需要配置。
  1. 在dataDir目录下创建myid文件,写入该节点的编号 1 。这样一个节点就配置完成了。
  2. 复制 zookeeper-3.4.8-1 到 zookeeper-3.4.8-2和 zookeeper-3.4.8-3,要修改的地方是 zoo.cfg里的 dataDir,dataLogDir,clientPort。还有需要在自己的data目录下新建myid文件,写入自己的编号。
myid Data目录 Client Server Leader 配置文件
1 /zookeeper-3.4.8_1/data 2181 2888 3888 z1.cfg
2 /zookeeper-3.4.8_2/data 2182 2889 3889 z2.cfg
3 /zookeeper-3.4.8_3/data 2183 2890 3890 z3.cfg
  1. 启动和测试

分别进入三个节点的bin目录,启动zookeeper,运行./zkServer.sh start。

注:前边节点启动的时候,会抛出一些错误,可忽略。这是因为另外的节点没启动,导致的通信异常。

1
2
3
4
5
nalideMacBook-Pro-4:program nali$ cd zookeeper-3.4.8_3/bin/
nalideMacBook-Pro-4:bin nali$ ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/nali/program/zookeeper-3.4.8_3/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

​

可以用自带的基于telnet的客户端测试一下,看看是否启动成功。随便进入一个节点的bin目录,比如节点0

./zkCli.sh -server 127.0.0.1:2180,随便输入一个字符,他会跳出help界面。说明服务端启动成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
nalideMacBook-Pro-4:bin nali$ ./zkCli.sh -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
2016-08-25 21:02:28,712 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.8--1, built on 02/06/2016 03:18 GMT
2016-08-25 21:02:28,715 [myid:] - INFO [main:Environment@100] - Client environment:host.name=192.168.120.222
2016-08-25 21:02:28,715 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_91
2016-08-25 21:02:28,717 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2016-08-25 21:02:28,717 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre
2016-08-25 21:02:28,717 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/Users/nali/program/zookeeper-3.4.8_1/bin/../build/classes:/Users/nali/program/zookeeper-3.4.8_1/bin/../build/lib/*.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../lib/slf4j-log4j12-1.6.1.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../lib/slf4j-api-1.6.1.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../lib/netty-3.7.0.Final.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../lib/log4j-1.2.16.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../lib/jline-0.9.94.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../zookeeper-3.4.8.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../src/java/lib/*.jar:/Users/nali/program/zookeeper-3.4.8_1/bin/../conf:
2016-08-25 21:02:28,718 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/Users/nali/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2016-08-25 21:02:28,718 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/var/folders/jr/rgnrtr4n13n3d6tbwsch9_qr0000gn/T/
2016-08-25 21:02:28,718 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2016-08-25 21:02:28,718 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Mac OS X
2016-08-25 21:02:28,719 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=x86_64
2016-08-25 21:02:28,719 [myid:] - INFO [main:Environment@100] - Client environment:os.version=10.11.4
2016-08-25 21:02:28,719 [myid:] - INFO [main:Environment@100] - Client environment:user.name=nali
2016-08-25 21:02:28,719 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/Users/nali
2016-08-25 21:02:28,719 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/Users/nali/program/zookeeper-3.4.8_1/bin
2016-08-25 21:02:28,721 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@531d72ca
Welcome to ZooKeeper!
  1. Zookeeper常用命令
    • ZooKeeper服务端命令:
      • 启动ZK服务: bin/zkServer.sh start
      • 查看ZK服务状态: bin/zkServer.sh status
      • 停止ZK服务: bin/zkServer.sh stop
      • 重启ZK服务: bin/zkServer.sh restart
    • zk客户端命令
      • 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
      • 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
      • 创建文件,并设置初始内容: create /zk “test” 创建一个新的 znode节点“ zk ”以及与它关联的字符
      • 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
      • 修改文件内容: set /zk “zkbak” 对 zk 所关联的字符串进行设置
      • 删除文件: delete /zk 将刚才创建的 znode 删除
      • 退出客户端: quit
      • 帮助命令: help

总结

zookeeper仅仅是维护了一个分布式的树形目录。如下图。它通过fast paxos算法保证多个节点上znode的数据一致性。一套zookeeper可以同时给多个应用程序使用,只需要隔离好各自的path~。在生产环境中,多个程序,比如hadoop,hbase,strom共用一套zookeeper也是常事。结构如下图。

img

Guava cache

发表于 2016-08-24

Guava Cache

Guava cache是一个应用内缓存(一般作为应用的本地缓存,redis作为集中式分布式缓存)。

一个缓存需要考虑的问题:

  • 缓存读取失败如何加载数据
  • 加载策略(同步还是异步)
  • 缓存过期问题
  • 统计缓存命中情况
  • 缓存数据失效时设置监听
  • 缓存满时替换策略(LRU、FIFO)
  • ……

Guava简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test1() {
CacheLoader<String, String> loader = new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("call..");
return key.toUpperCase();
}
};
//fluent风格...
LoadingCache<String, String> cache = CacheBuilder.newBuilder().build(loader);
System.out.println(cache.size());
System.out.println(cache.getUnchecked("aaa"));
System.out.println(cache.size());
System.out.println(cache.getUnchecked("aaa"));
try {
System.out.println(cache.get("cjp"));
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

该示例中,仅仅将一个类型是字符串作为key,value为它的大写形式。运行的结果如下:

1
2
3
4
5
6
7
0
call..
AAA
1
AAA
call..
CJP

Guava的缓存是一个LoadingCache实例,通过CacheBuilder创建该实例,并传入一个CacheLoader,CacheLoader实例注明了在缓存读取失败时如何加载数据,开始时,缓存中没有任何数据,size为0,当取aaa的时候,触发了缓存加载数据,输出call…,虽然缓存的size变成了1。然后再取aaa时,因为缓存中已经有了该key对应的value,就没有触发加载。

需要注意一下getUnchecked方法和get方法的不同,前者不对可能的异常做检查,调用代码不需要显式的捕捉异常,而后者调用代码需要显式的捕获异常。

这是一个非常简单的示例,可以看到使用guava实现一个缓存非常简单,如果将创建CacheLoader实例和build LoadingCache的两行代码合并,使用仅一行代码就可以实现一个缓存,并且Guava的缓存是线程安全的,可以放心的在多线程的环境中使用。

复杂一点的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void test2() throws Exception {
//缓存同步删除
LoadingCache<String, String> cache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.SECONDS).maximumSize(3).recordStats().removalListener(new RemovalListener<String, String>() {
@Override
public void onRemoval(RemovalNotification<String, String> notification) {
System.out.println("remove key[" + notification.getKey() + "],value[" + notification.getValue() + "],remove reason[" + notification.getCause() + "]");
System.out.println("remove thread name " + Thread.currentThread().getName());
}
}).build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("key[" + key + "] to upper case");
return key.toUpperCase();
}
});
System.out.println(cache.getUnchecked("a"));
System.out.println(cache.getUnchecked("b"));
System.out.println("thread name " + Thread.currentThread().getName());
cache.invalidate("b");//删除key为b的值
System.out.println(cache.getUnchecked("a"));
Thread.sleep(5000);
System.out.println(cache.getUnchecked("c"));
System.out.println(cache.getUnchecked("a"));
System.out.println(cache.stats().toString());
System.out.println("end");
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
key[a] to upper case
A
key[b] to upper case
B
thread name main
remove key[b],value[B],remove reason[EXPLICIT]
remove thread name main A
remove key[a],value[A],remove reason[EXPIRED]
remove thread name main key[c] to upper case
C
key[a] to upper case
A
CacheStats{hitCount=1, missCount=4, loadSuccessCount=4, loadExceptionCount=0, totalLoadTime=3460000, evictionCount=1}
end

创建了一个稍复杂的LoadingCache实例。各方法意义如下:

expireAfterWrite:写入缓存后的过期时间
maximumSize:缓存的最多存放元素个数
recordStats:对缓存命中情况进行统计
removalListener:设置缓存数据失效时监听器

Guava中很多地方都是这种fluent的方式.

在删除的监听器中打印线程的名字是为了显示该监听器是同步的还是异步的。可以看到删除监听是同步的,因为和主线程的名字是一样的,其实可以理解,因为我们并没有指定额外的线程池。删除监听器中可以看到删除的key、value、cause。主线程sleep 5s后,缓存中key为a的元素就过期了,可以看到监听器被调用,最后通过cache.stats()取得缓存命中的情况统计。可以看到命中1次,miss了4次(load了4次),事实上的确如此。

可以通过RemovalListeners.asynchronous方法就可以创建一个异步的listener对象。如下方式创建LoadingCache:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
LoadingCache<String, String> cache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.SECONDS).maximumSize(3).recordStats().removalListener(RemovalListeners.asynchronous(new RemovalListener<String, String>() {
//删除缓存监听器异步删除
@Override
public void onRemoval(RemovalNotification<String, String> notification) {
System.out.println("remove key[" + notification.getKey() + "],value[" + notification.getValue() + "],remove reason[" + notification.getCause() + "]");
System.out.println("remove thread name " + Thread.currentThread().getName());
}
}, Executors.newCachedThreadPool())).build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("key[" + key + "] to upper case");
return key.toUpperCase();
}
});

RemovalListeners.asynchronous方法接受两个参数,第一个参数是RemovalListener对象,第二个参数接收一个线程池,这样就可以异步的设置删除监听器了。运行可以看到主线程的线程名和监听器中的线程名是不同的。

上面创建缓存的方式是通过expireAfterWrite指定元素的过期时间,达到重新加载的。也就是说当过期后,这个元素就不存在了,再获取的时候就要通过load重新加载,当加载的时候,获取value的主线程必须同步的等缓存加载完获得数据后才能继续执行。这在一定程度上限制了访问速度。

如果数据量不大的情况下,就不必使用过期时间这种方式,而使用刷新,使用refreshAfterWrite指定刷新的时间间隔。看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void test3() throws InterruptedException {
LoadingCache<String, String> cache = CacheBuilder.newBuilder().recordStats().refreshAfterWrite(3, TimeUnit.SECONDS).build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("load key[" + key + "]");
return key.toUpperCase();
}
@Override
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
System.out.println("reload key[" + key + "],oldValue[" + oldValue + "]");
return super.reload(key, oldValue);
}
});
System.out.println(cache.getUnchecked("a"));
System.out.println(cache.getUnchecked("b"));
cache.refresh("a");
Thread.sleep(3000);
System.out.println(cache.getUnchecked("a"));
System.out.println(cache.getUnchecked("c"));
}

这是一个非常简单的refresh示例,如果使用refreshAfterWrite,需要实现CacheLoader的reload方法,如果不实现,他有一个默认的实现,就是本示例展示的代码,直接调用load方法。代码的运行结果如下:

1
2
3
4
5
6
7
load key[a]
A
load key[b] B
reload key[a],oldValue[A]
load key[a] reload key[a],oldValue[A]
load key[a] A
load key[c] C

本例中刷新的时间设置为3s,再第一次显式的调用cache.refresh("a")的时候,可以看到reload方法被调用了。但是reload直接走默认的实现,调用了load方法,所以接着就输出了load key[a]当主线程sleep 3s后,再取a的值时因为超过刷新间隔,又会调用reload方法。可以想象这里的reload肯定是以同步的方式进行的,因为我们并没有指定额外的线程池用来执行reload方法,也就是说当到达刷新时间间隔后,取value的主线程还是要等refresh结束,才能拿到数据后执行,这和刚才的expireAfterWrite方式差不多。Guava提供了异步刷新的方式,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 缓存失效时异步重现加载,缓存调用者永远不用阻塞等
*
* @throws InterruptedException
*/
@Test
public void test4() throws InterruptedException {
LoadingCache<String, String> cache = CacheBuilder.newBuilder().recordStats().refreshAfterWrite(3, TimeUnit.SECONDS).build(new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("load key[" + key + "]");
return key.toUpperCase();
}
@Override
public ListenableFuture<String> reload(final String key, String oldValue) throws Exception {
ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("reload key[" + key + "] synchronize at thread[" + Thread.currentThread().getName() + "],this will take 1 second...");
Thread.sleep(1000);
System.out.println("reload end...");
return key.toUpperCase();
}
});
Executors.newCachedThreadPool().execute(task);
System.out.println("reload key[" + key + "],oldValue[" + oldValue + "]");
return task;
}
});
//注意:如果重来没有被get过,在缓存中完全没有,第一次调用会执行load,然后加入到cache中,只有被加入到其中的
//到达失效时间后,再被加载的时候才会触发reload
System.out.println(cache.getUnchecked("a"));
System.out.println(cache.getUnchecked("b"));
cache.refresh("a");
Thread.sleep(3000);
//这里的取a 不会触发reload,因为上面refresh需要耗1s才能结束,而主线程这里只需要等3s
//所以这里的a还有1s的存活时间
System.out.println(cache.getUnchecked("a"));
//但是这里的b 就必须reload了,但是reload的过程需要注意下:先调用load方法,然后发现失效了,但是还会返回之前
//缓存中的值,同时会加载reload,因为是异步reload,主线程这里不用等reload结束,继续向下运行获取c的值
System.out.println(cache.getUnchecked("b"));
System.out.println(cache.getUnchecked("c"));
//这里再暂停5s是为了看清楚上面reload b的结束
Thread.sleep(5000);
}

本示例依然设置refresh时间为3s。重点是reload方法,先打印出reload执行所在的线程名,为了能清楚的看到主线程不需要等refresh完,这里sleep了1s。其他代码跟之前的差不多,运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
load key[a]
A
load key[b] B
reload key[a],oldValue[A]
reload key[a] synchronize at thread[pool-1-thread-1],this will take 1 second...
reload end...
A
reload key[b],oldValue[B]
B
load key[c] C
reload key[b] synchronize at thread[pool-2-thread-1],this will take 1 second...
reload end...

当执行cache.refresh("a")代码的时候,调用了reload方法,可以看到reload所在线程名是线程池中的。这句代码紧接着主线程sleep了3s,然后又去取a的值,按理说这时候a应该到达了刷新的时间间隔了,但是因为之前的reload方法执行就需要1s,所以对于a来说,还有1s的刷新时间剩余,所以这时取a的值,并不会触发reload。而紧接着取b的值就不同了,因为b没有被refresh过,这时候取b的值达到了刷新的时间间隔,所以会触发reload b。但是因为是异步的刷新,主线程根本不用等刷新完,所以立即输出了原来旧的值B,并立即输出了load c的结果,然后才看到 reload b的过程在继续进行,直到结束。

异步刷新,主线程永远不用等缓存的加载!现在在工作中所有使用Guava cahe的地方全部采用这种方式。

注意如下几点:

  • refreshAfterWrite和expireAfterWrite的区别
    • refreshAfterWrite只不过在刷新时间间隔到的时候,调用reload方法获取对于的key对于的value后替换当前内存中的key值。原来内存中的key对于的value是一直存在的。
    • expireAfterWrite方式当达到过期时间后,内存中的对应的key-value就被删除了(应该是被动删除方式,其实还在内存中,获取key的瞬间被删除)。只能通过load方法重新加载key对于的value。
  • refresh方式并不是达到时间间隔后就立即刷新,而是在get数据的时候,发现超过刷新时间间隔了才会刷新,是被动的方式。
  • 只有缓存中存在的key,在到达刷新时间时,才会通过reload刷新,如果缓存中没有对应key的value,第一次永远是调用load加载数据。
1…232425…31
David

David

Develop Notes

155 日志
37 标签
GitHub Weibo
© 2016 - 2020 David
由 Hexo 强力驱动
主题 - NexT.Pisces