Spring Retry

​ 在很多场景中,我们需要“重试”,重试意味着反复执行一段代码直至成功,或者重试多次无果后标记失败。比如MQ发送消息失败,会采取重试手段,比如工程中使用RPC请求外部服务,可能因为网络波动出现超时而采取重试手段等等.

MQ自身也有重试机制,但是这种机制不是很灵活,如果某些功能没有使用MQ的话,那么就不是那么方便了.

框架概览

Spring Retry 框架广泛使用于Spring Batch,Spring Integration,Spring for Apache Hadoop等spring项目

如下图所示:

spring



  1. RetryTemplate,重试模板,是进入spring-retry框架的整体流程入口
  2. RetryCallback,重试回调,用户包装业务流,第一次执行和产生重试执行都会调用这个callback代码
  3. RetryPolicy,重试策略,不同策略有不同的重试方式
  4. BackOffPolicy,两次重试之间的回避策略,一般采用超时时间机制
  5. RecoveryCallback,当所有重试都失败后,回调该接口,提供给业务重试回复机制
  6. RetryContext,每次重试都会将其作为参数传入RetryCallback中使用
  7. RetryListener,监听重试行为,主要用于监控。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {
final RetryTemplate retryTemplate = new RetryTemplate();
final SimpleRetryPolicy policy = new SimpleRetryPolicy(3,
Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true));
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(100);
retryTemplate.setRetryPolicy(policy);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
public Object doWithRetry(RetryContext context) throws Exception {
System.out.println("do some thing");
// 设置context一些属性,给RecoveryCallback传递一些属性
context.setAttribute("key", "value");
System.out.println(context.getRetryCount());
throw new Exception("exception");
}
};
// 如果RetryCallback执行出现指定异常, 并且超过最大重试次数依旧出现指定异常的话,就执行RecoveryCallback动作
final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
public Object recover(RetryContext context) throws Exception {
System.out.println("do recory operation");
System.out.println(context.getAttribute("key"));
return null;
}
};
try {
final Object result = retryTemplate.execute(retryCallback, recoveryCallback);
} catch (Exception e) {
e.printStackTrace();
}
}

重试策略:RetryPolicy

重试策略定义了当操作失败时如何进行重试操作

重试框架Spring retry实践

  1. NeverRetryPolicy:只调用RetryCallback一次,不重试
  2. AlwaysRetryPolicy:无限重试,最好不要用
  3. SimpleRetryPolicy:重试n次,默认3,也是模板默认的策略
  4. TimeoutRetryPolicy:在n毫秒内不断进行重试,超过这个时间后停止重试
  5. ExceptionClassifierRetryPolicy: 可以根据不同的异常,执行不同的重试策略

回退策略:BackOffPolicy(重试间隔)

当操作执行失败时,根据设置的重试策略进行重试。通过BackoffPolicy可以设定再次重试的时间间隔。

重试框架Spring retry实践

  1. NoBackOffPolicy:不回避
  2. FixedBackOffPolicy:n毫秒退避后再进行重试
有状态重试 OR 无状态重试

所谓无状态重试是指重试在一个线程上下文中完成的重试,反之不在一个线程上下文完成重试的就是有状态重试。之前的SimpleRetryPolicy就属于无状态重试,因为重试是在一个循环中完成的。什么时候后会出现或者说需要有状态重试呢?通常有两种情况:事务回滚和熔断。

使用注解

每次有重试需求的时候都写一个RetryTemplate太臃肿了,使用注解可以大大简化开发,减少重复代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class AnnotationService {
public Logger logger = LoggerFactory.getLogger(AnnotationService.class);
@Retryable(maxAttempts = 5, backoff = @Backoff(random = true))
public String someService() {
int random = (int) (Math.random() * 10);
if (random < 4) {
logger.info("random={} Null Pointer Excep", random);
throw new NullPointerException();
} else if (random < 9) {
logger.info("random={} Arithmetic Excep", random);
throw new ArithmeticException();
}
logger.info("random={} ok !!!!", random);
return "ok";
}
@Recover
public String recover(NullPointerException ne) {
logger.info("{}", "NullPointerException");
return "null pointer recover";
}
@Recover
public String recover(ArithmeticException ne) {
logger.info("{}", "ArithmeticException");
return "ArithmeticException recover";
}
}
@Configuration // 相当于xml bean容器
@EnableRetry
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class AnnotationRetryTest {
public AnnotationRetryTest() {
System.out.println("spring容器启动初始化。。。");
}
@Bean // 注入AnnotationService
public AnnotationService annotationService() {
return new AnnotationService();
}
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AnnotationRetryTest.class);
AnnotationService annoService = context.getBean(AnnotationService.class);
String result = annoService.someService();
System.out.println(result);
}
}
  1. @EnableRetry:能否重试,proxyTargetClass属性为true时(默认false),使用CGLIB代理。默认使用标准JAVA注解。当类中有@Retryable注释的方法时,对该方法生成代理。
  2. @Retryable:注解需要被重试的方法
    • include 指定处理的异常类。默认为空
    • exclude 指定不需要处理的异常。默认为空
    • vaue 指定要重试的异常。默认为空
    • maxAttempts 最大重试次数。默认3次
    • backoff 重试等待策略。默认使用@Backoff注解
  3. @Backoff:重试回退策略(立即重试还是等待一会再重试)
    • 不设置参数时,默认使用FixedBackOffPolicy,重试等待1000ms
    • 只设置delay属性时,使用FixedBackOffPolicy,重试等待指定的毫秒数
    • 当设置delaymaxDealy属性时,重试等待在这两个值之间均态分布
    • 使用delaymaxDealymultiplier属性时,使用ExponentialBackOffPolicy
    • 当设置multiplier属性不等于0时,同时也设置了random属性时,使用ExponentialRandomBackOffPolicy`
    • @Recover: 用于@Retryable失败时的“兜底”处理方法。@Recover注释的方法参数为@Retryable异常类,返回值应与重试方法返回相同,否则无法识别!因此可以针对可能异常设置多个@Recover方法进行“兜底”处理。

源码分析

RetryTemplateexecute 是线程安全的,实现逻辑使用ThreadLocal保存每个执行实例的RetryContext执行上下文。

这里以SimpleRetryPolicyFixedBackOffPolicy策略为例

1
2
3
4
5
6
7
8
// RetrySynchronizationManager.class
private static final ThreadLocal<RetryContext> context = new ThreadLocal<RetryContext>();
public static RetryContext register(RetryContext context) {
RetryContext oldContext = getContext();
RetrySynchronizationManager.context.set(context);
return oldContext;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state) throws E,
ExhaustedRetryException {
....some code ...
// 注册当前的RetryContext,放在ThreadLocal中保证线程安全
RetrySynchronizationManager.register(context);
Throwable lastException = null;
try {
...some code...
// 判断当前context是否满足重试条件,这里是循环
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// Reset the last exception, so if we are successful
// the close interceptors will not think we failed...
lastException = null;
// 执行自定义业务逻辑
return retryCallback.doWithRetry(context);
}catch (Throwable e) {
try {
// 把异常e填充进context,同时重试技术加1
registerThrowable(retryPolicy, state, context, e);
}
catch (Exception ex) {
throw new TerminatedRetryException("Could not register throwable", ex);
}
if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
try {
// 如果满足重试条件,如已重试次数小于预定次数,则线程等待预定的间隔时间
backOffPolicy.backOff(backOffContext);
}
catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (logger.isDebugEnabled()) {
logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
}
throw ex;
}
}
if (shouldRethrow(retryPolicy, context, state)) {
if (logger.isDebugEnabled()) {
logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
}
throw RetryTemplate.<E>wrapIfNecessary(e);
}
}
}
if (context.isExhaustedOnly()) {
rethrow(context, "Retry exhausted after last attempt with no recovery path.");
}
// 所有重试都失败,处理“兜底”方法recover()
return handleRetryExhausted(recoveryCallback, context, state);
}
catch (Throwable e) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}
finally {
close(retryPolicy, context, state, lastException == null);
doCloseInterceptors(retryCallback, context, lastException);
RetrySynchronizationManager.clear();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
// FixedBackOffPolicy.class
private Sleeper sleeper = new ThreadWaitSleeper();
protected void doBackOff() throws BackOffInterruptedException {
try {
// 时间间隔是由线程sleep实现
sleeper.sleep(backOffPeriod);
}
catch (InterruptedException e) {
throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
}
}
1
2
3
4
5
6
7
8
public class ThreadWaitSleeper implements Sleeper {
@Override
public void sleep(long backOffPeriod) throws InterruptedException {
Thread.sleep(backOffPeriod);
}
}

参考

Spring Retry 常用示例

利用Spring-Retry定制化你的RPC重试

Guide to Spring Retry

github:spring-retry

热评文章