Hello Coder


  • 首页

  • 归档

  • 标签

  • 搜索
close

Spring Retry

发表于 2017-09-25

​ 在很多场景中,我们需要“重试”,重试意味着反复执行一段代码直至成功,或者重试多次无果后标记失败。比如MQ发送消息失败,会采取重试手段,比如工程中使用RPC请求外部服务,可能因为网络波动出现超时而采取重试手段等等.

MQ自身也有重试机制,但是这种机制不是很灵活,如果某些功能没有使用MQ的话,那么就不是那么方便了.

框架概览

Spring Retry 框架广泛使用于Spring Batch,Spring Integration,Spring for Apache Hadoop等spring项目

如下图所示:

spring



  1. RetryTemplate,重试模板,是进入spring-retry框架的整体流程入口
  2. RetryCallback,重试回调,用户包装业务流,第一次执行和产生重试执行都会调用这个callback代码
  3. RetryPolicy,重试策略,不同策略有不同的重试方式
  4. BackOffPolicy,两次重试之间的回避策略,一般采用超时时间机制
  5. RecoveryCallback,当所有重试都失败后,回调该接口,提供给业务重试回复机制
  6. RetryContext,每次重试都会将其作为参数传入RetryCallback中使用
  7. RetryListener,监听重试行为,主要用于监控。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {
final RetryTemplate retryTemplate = new RetryTemplate();
final SimpleRetryPolicy policy = new SimpleRetryPolicy(3,
Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true));
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(100);
retryTemplate.setRetryPolicy(policy);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
public Object doWithRetry(RetryContext context) throws Exception {
System.out.println("do some thing");
// 设置context一些属性,给RecoveryCallback传递一些属性
context.setAttribute("key", "value");
System.out.println(context.getRetryCount());
throw new Exception("exception");
}
};
// 如果RetryCallback执行出现指定异常, 并且超过最大重试次数依旧出现指定异常的话,就执行RecoveryCallback动作
final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
public Object recover(RetryContext context) throws Exception {
System.out.println("do recory operation");
System.out.println(context.getAttribute("key"));
return null;
}
};
try {
final Object result = retryTemplate.execute(retryCallback, recoveryCallback);
} catch (Exception e) {
e.printStackTrace();
}
}
阅读全文 »

连接池原理

发表于 2017-09-15

​ 连接池的基本思想就是为连接建立一个“缓冲池”。预先在缓冲池中放入一定数量的连接,当需要建立连接时,只需要从缓冲池中取出一个了,使用完毕后再放回去。

apache commons-pool是apache基金会的一个开源对象池组件,我们常用的数据库连接池dpcp和redis的java客户端jedis都使用commons-pool来管理连接。

主要提供这个几种类型的对象池:

类图
这篇文章主要介绍GenericObjectPool的实现。

工作原理:

连接池的核心思想是连接的复用,通过建立一个数据库连接池以及一套连接使用、分配和管理策略,使得该连接池中的连接可以得到高效,安全的复用,避免了数据库连接频繁建立和关闭的开销。

连接池的工作原理主要由三部分组成,分别为连接池的建立,连接池中连接的使用管理,连接池的关闭。

  1. 连接池的建立。一般在系统初始化时,连接池会根据系统配置建立,并在池中建立几个连接对象,以便使用时能从连接池中获取,连接池中的连接不能随意创建和关闭,这样避免了连接随意建立和关闭造成的系统开销。

  2. 连接池的管理。连接池管理策略是连接池机制的核心,连接池内连接的分配和释放对系统的性能有很大的影响。其策略是:

    当客户请求数据库连接时,首先查看连接池中是否有空闲连接,如果存在空闲连接,则将连接分配给客户使用;如果没有控线连接,则查看当前所开的连接数是否已经达到最大连接数,例如如果没有达到就重新创建一个请求的客户;如果达到,就按设定的最大等待时间进行等待,如果超出最大等待时间,则抛出异常给客户。

  3. 连接池的关闭。当应用程序退出时,关闭连接池中所有的链接,释放连接池相关资源,该过程正好与创建相反。

阅读全文 »

Tomcat线程池

发表于 2017-09-15

JDK只有两种典型的线程池,FixedPool 与 CachedPool:

  • FixedPool:固定线程数,忙不过来的全放到无限长的缓冲队列里。
  • CachedPool:忙不过来时无限的增加临时线程,闲时回落,没有缓冲队列。

ScheduledExecutorService定时原理

发表于 2017-09-08

Executor

JUC 包中的 Executor 架构带来了线程的创建与执行的分离。Executor 的继承者 ExecutorService 下面衍生出了两个重要的实现类,他们分别是

  • ThreadPoolExecutor 线程池
  • ScheduledThreadPoolExecutor继承了ThreadPoolExecutor, 支持周期性任务的线程池

通过 ThreadPoolExecutor 可以实现各式各样的自定义线程池,而 ScheduledThreadPoolExecutor 类则在自定义线程池的基础上增加了周期性执行任务的功能。

img

AbstractQueuedSynchronizer

发表于 2017-07-26

AbstractQueuedSynchronizer介绍

JDK1.5中提供的java.util.concurrent包中的大多数的同步器(Synchronizer)如Lock, Semaphore, Latch, Barrier等,这些类之间大多可以互相实现,如使用Lock实现一个Semaphore或者反过来,但是它们都是基于java.util.concurrent.locks.AbstractQueuedSynchronizer这个类的框架实现的

AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架。

一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态.

AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。在java的同步组件中,AQS的子类一般是同步组件的静态内部类。同步组件是面向使用者的,它定义了使用者与组件交互的接口,隐藏了具体的实现细节;而AQS面向的是同步组件的实现者,它简化了具体的实现方式,屏蔽了线程切换相关底层操作.

AQS源码分析

AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点。

Node

Node主要包含以下成员变量:

1
2
3
4
5
6
7
8
class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
}
  • waitStatus:节点状态,主要有这几种状态:
    1. CANCELLED:当前线程等待已经取消,是唯一一个大于0的状态;
    2. SIGNAL:当前节点的后继节点需要运行;
    3. CONDITION:当前节点在等待condition;
    4. PROPAGATE:当前场景下后续的acquireShared可以执行;
  • prev:前驱节点;
  • next:后继节点;
  • thread:进入队列的当前线程;
  • nextWaiter:存储condition队列中的后继节点;

Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
  • 同步队列插入节点

当线程在获取锁的过程中被阻塞之后,这个线程会被包装成一个Node节点并被添加到线程同步队列之中,AQS提供基于CAS的设置尾节点的方法:

1
2
3
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

假如现在有多个线程同时被阻塞,那么各个线程所获取的尾节点就有可能相同,线程就不能被正确地添加到同步队列中去了,使用compareAndSetTail(Node expect,Node update)来对加入同步队列的线程进行安全地插入

img

  • 节点删除
    同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态之后将会唤醒后继节点,后继节点将会在获取同步状态成功的时候将自己设置为首节点。

    ​

    img

    ​

    设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。

    ​

1…91011…31
David

David

Develop Notes

155 日志
37 标签
GitHub Weibo
© 2016 - 2020 David
由 Hexo 强力驱动
主题 - NexT.Pisces