Guava future

Guava Future

同步、异步、阻塞、非阻塞

  • 同步:所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。也就是必须一件一件事做,等前一件做完了才能做下一件事。

  • 异步:异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

  • 阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。

  • 非阻塞:非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

并发

线程的并发方式一般有两种:

  • 异步执行一个任务,不需要返回结果
  • 需要异步执行返回结果

第一种,直接往线程池中仍Runnable对象,代码大概如下:

1
2
3
4
5
6
Executors.newCachedThreadPool().execute(new Runnable() {
@Override
public void run() {
//do something
}
});

第二种,一般有两种情况:

  • 主线程block到数据
  • 设置回调

JDK提供的方式就是主线程block到数据,类似如下代码:

1
2
3
4
5
6
7
8
Future<Object> future = Executors.newCachedThreadPool().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
Object data = future.get();//block直到拿到数据

这种方式缺点很显然,主线程阻塞。而JDK对Future设置回调没有相应的方法,Guava扩展了这一点,引入ListenableFuture,可以对ListenableFuture设置回调。

ListenableFuture

Guava为Java并行编程Future提供了很多有用扩展,其主要接口为ListenableFuture,并借助于Futures静态扩展。获得ListenableFuture实例有两种方法:

  • 向装饰后的线程池中提交callable
  • 将普通的future转换为listenableFuture

继承至Future的ListenableFuture,允许我们添加回调函数在线程运算完成时返回值或者方法执行完成立即返回。

对ListenableFuture添加回调函数:

1
Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)

其中 FutureCallback是一个包含onSuccess(V),onFailure(Throwable)的接口。

使用如:

1
2
3
4
5
6
7
8
9
10
Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {
public void onSuccess(Object result) {
System.out.printf("onSuccess with: %s%n", result);
}
public void onFailure(Throwable thrown) {
System.out.printf("onFailure %s%n", thrown.getMessage());
}
});

同时Guava中Futures对于Future扩展还有:

  • transform:对于ListenableFuture的返回值进行转换。
  • allAsList:对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。
  • successfulAsList:和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。
  • immediateFuture/immediateCancelledFuture: 立即返回一个待返回值的ListenableFuture。
  • makeChecked: 将ListenableFuture 转换成CheckedFuture。CheckedFuture 是一个ListenableFuture ,其中包含了多个版本的get 方法,方法声明抛出检查异常.这样使得创建一个在执行逻辑中可以抛出异常的Future更加容易
  • JdkFutureAdapters.listenInPoolThread(future): guava同时提供了将JDK Future转换为ListenableFuture的接口函数。

装饰线程池

Guava提供了方法将JDK的线程池装饰成一个“可监听的线程池”。看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
public void test1() throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3, new ThreadFactory() {
private AtomicLong index = new AtomicLong(0);
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "commons-thread-" + index.incrementAndGet());
}
});
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPool);
ListenableFuture<String> listenableFuture = listeningExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
//System.out.println(1 / 0);
return "world";
}
});
//1)监听
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
System.out.println("can't get return value");
}
}, MoreExecutors.directExecutor());
//2)回调
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("the result of future is: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("exception:" + t.getMessage());
}
});
Thread.sleep(5000);
}

上面代码先定义了普通的线程池,Guava通过MoreExecutors.listeningDecorator(threadPool)将普通的线程池装饰成一个可监听的线程池,向装饰后的线程池中提交callable,就可以获得listenableFuture实例了。

可以对listenableFuture设置listener(方法:addListener ),同时指定listen执行的线程池,也可以设置回调(方法:Futures.addCallback ),获取listenableFuture的结果。

  • 方法一:通过ListenableFuture的addListener方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    listenableFuture.addListener(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("get listenable future's result " + listenableFuture.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }, executorService);
  • 方法二:通过Futures的静态方法addCallback给ListenableFuture添加回调函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer result) {
                    System.out.println("get listenable future's result with callback " + result);
                }
                @Override
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            });

推荐使用第二种方法,因为第二种方法可以直接得到Future的返回值,或者处理错误情况。本质上第二种方法是通过调动第一种方法实现的,做了进一步的封装。

另外ListenableFuture接口还有其他几种内置具体实现:

  • SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息
  • CheckedFuture: 这是一个继承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future执行发生异常时,可以抛出指定类型的异常。

可以在callable内部捕获异常,也可以抛出,在回调的onFailure中处理异常。

上面演示的是对JDK普通的线程池,当然也可以对时间调度的线程池进行装饰。示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 演示定时调度线程池装饰成listen的效果
*
* @throws InterruptedException
*/
@Test
public void test2() throws InterruptedException {
ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
//只有callable才对应有future
ListenableScheduledFuture<?> listenableScheduledFuture = listeningScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("hello world");
}
}, 5, 3, TimeUnit.SECONDS);
//因为上面的传的是runnable,所以没有返回值,没有返回值就不会触发future的callBack
Futures.addCallback(listenableScheduledFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
System.out.println("not result: " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(12000);
}

装饰后的可监听的时间调度线程池使用方式和原来一样。这里调度了一个runnable对象。延迟5s后,每3s执行一次。因为runnable没有返回值,所以下面的回调不会被调用。当然可以向装饰后的线程池中提交callable,这样回调就会执行。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test3() throws InterruptedException {
ListeningScheduledExecutorService listeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(3));
ListenableScheduledFuture<String> schedule = listeningScheduledExecutorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
return "world";
}
}, 3, TimeUnit.SECONDS);
Futures.addCallback(schedule, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("hello " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
Thread.sleep(4000);
}

将Future转换为ListenerableFuture

Guava提供了方法将普通的future转换为listenerableFuture,以便于添加回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 演示将jdk的future转换为ListenableFuture
*/
@Test
public void test4() {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
System.out.println(Thread.currentThread().getName());
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName());
return "world";
}
});
ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(future);
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(Thread.currentThread().getName() + ": hello " + result);
}
@Override
public void onFailure(Throwable t) {
}
}, threadPool);//MoreExecutors.directExecutor();这种方式拿到的线程池还是当前的现场环境,还是同步的。
System.out.println("end.");
}

通过JdkFutureAdapters.listenInPoolThread(future)转换一个普通的future。设置回调时,可以指定回调执行的线程池。

Guava提供的listenerableFuture处理可以设置回调,还能通过function进行变换。

热评文章