Hello Coder


  • 首页

  • 归档

  • 标签

  • 搜索
close

TreeMap

发表于 2018-03-13

概要

  • 类继承关系
1
2
3
NavigableMap<K,V>
java.util.AbstractMap<K,V>
java.util.TreeMap<K,V>
  • 定义
1
2
3
public class TreeMap<K,V>
extends AbstractMap<K,V>
implements NavigableMap<K,V>, Cloneable, java.io.Serializable

TreeMap实现NavigableMap接口,说明支持一系列的导航方法

  • 核心成员变量
1
2
  • 内部节点
1
2
  • 要点

]

阅读全文 »

rabbitmq消息防丢失

发表于 2018-02-22

如何解决消息丢失

生产者与消费者相互解耦,怎么保证生产者已将消息投递到 RabbitMQ 服务端,又如何确认消费者已经消费了该消息?

消息生产可靠性

RabbitMQ为我们提供了两种方式:

  1. 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  2. 通过将channel设置成confirm模式来实现;
RabbitMQ事务机制

注意:使用事务模式会导致服务端吞吐量急剧下降,实际使用场景很少,这里简单介绍一下

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式(通道事务),txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

1
2
3
4
5
6
7
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
Confirm模式(常用模式)

模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息

1
2
3
4
5
6
1. 对于可路由的消息,当所有的队列接收到消息后,broker向client发送basic.ack确认通知;
2. 对于路由到持久队列的持久化消息,当消息持久化到磁盘后,broker向client发送basic.ack确认通知;
3. 对于路由到镜像队列的消息,当所有的镜像队列都接收到消息后,broker向client发送basic.ack确认通知;
4. 对于不可路由的消息,broker一旦确认该消息不可路由时,则向client发送basic.ack确认通知;
5. 对于不可路由且mandatory强制投递的消息,broker一旦确认该消息不可路由时,先向client发送basic.return通知,然后发送basic.ack确认通知;
  • Spring Amqp配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<rabbit:connection-factory id="connectionFactory"
host="${rabbit.connect.host}"
port="${rabbit.connect.port}"
username="${rabbit.connect.username}"
password="${rabbit.connect.password}"
channel-cache-size="${rabbit.connect.channelCacheSize}"
publisher-returns="true"
publisher-confirms="true" />
<bean id="returnCallback"
class="com.ximalaya.trump.business.message.callback.MessageReturnCallback" />
<bean id="confirmCallback"
class="com.ximalaya.trump.business.message.callback.MessageConfirmCallback" />
<!-- mandatory必须设置true,return-callback才生效 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
confirm-callback="confirmCallback"
return-callback="returnCallback"
message-converter="messageConverter"
mandatory="true" />

当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

  • confirmCallback回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MessageConfirmCallback implements ConfirmCallback {
private static final Logger LOG = LoggerFactory.getLogger(MessageConfirmCallback.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack) {
if (ack) {
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true(这是一个坑,需要注意), 这种情况一般需要在returnCallBack中重新投递
LOG.info("消息确认成功");
} else {
// 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
// 处理丢失的消息(nack)
LOG.warn("消息确认失败");
}
}
}
  • returnCallback回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MessageReturnCallback implements ReturnCallback {
private static final Logger LOG = LoggerFactory.getLogger(MessageReturnCallback.class);
private static Charset charset = Charset.forName("UTF-8");
@Autowired
private RabbitTemplate rabbitTemplate;
private JsonParser jsonParser;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String messageContent = new String(message.getBody(), charset);
LOG.warn(
"Return message from exchange {} and routing {}, reply-code is {}, reply-text is {}, message content is {}",
exchange, routingKey, replyCode, replyCode, messageContent);
Map<String, Object> map = jsonParser.toMap(messageContent);
String correlationId = (String) map.get("id");
if (null == correlationId) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
} else {
CorrelationData correlationData = new CorrelationData(correlationId);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
LOG.info("retry send message {} to exchange {} succeed.", messageContent, exchange);
}
}

注意:

  • confirm 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。
  • return 则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。

消息消费可靠性

为了保证消息从队列可靠地到达 Consumer,RabbitMQ提供消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它

消息生产、消费的流程图:

ack 模式
  • NONE

    该模式下, 当 broker 发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的 ACK 回复

  • MANUAL
  • AUTO(默认模式)

    当 broker 发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动 ACK 模式下, 当消费者收到消息并处理完成后, 需要向 broker 显示地发送 ACK 指令.

参考

RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms

rabbitmq 消息可靠投递及消费机制

rabbitmq的发布确认和事务

RabbitMQ之消息确认机制(事务+Confirm)

有货RabbitMQ双活实践

Spring Boot集成RabbitMQ与ACK确认模式

Java 线程状态

发表于 2018-02-09

问题

1
2
同一个线程能不能start执行2次?
---不能

Java虚拟机所暴露的线程状态,与操作系统底层的线程状态是两个不同层面的事

JVM线程状态

JVM中定义的线程状态有以下几种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public enum State {
/**
* 线程刚创建,还未start
*/
NEW,
/**
* 线程正在jvm中执行,对操作系统来说可能正在等待时间片执行(就绪态)
* I/O阻塞或网络等待,对应的JVM线程状态也是RUNNABLE
*/
RUNNABLE,
/**
* 程正在等待获取锁,阻塞状态
*/
BLOCKED,
/**
* 一般通过Object.wait()或者Thread.join(),进入waiting状态;前提是这个线程已经拥有锁了
* 如果一个线程调用了一个对象的wait方法,那么这个线程就会处于waiting状态直到另外一个线程调用这个对
* 象的notify或者notifyAll方法后才会解除这个状态
*/
WAITING,
/**
* 通过sleep(timeout)或wait(timeout)方法进入的限期等待的状态
* 需要注意2者区别
*/
TIMED_WAITING,
/**
* 线程执行结束
*/
TERMINATED;
}

blocked和waiting状态的区别

  • blocked是虚拟机认为程序还不能进入某个区域,因为同时进去就会有问题,这是一块临界区
  • wait操作的先决条件是要进入临界区,也就是线程已经拿到锁了,自己可能进去做了一些事情,但此时通过判定业务上的参数,发现还有一些其他配合的资源没有准备充分,那么自己就等等再做其他事情。

sleep()和wait()操作的区别

  • sleep是Thread类的静态方法,wait是Object类中定义的方法
  • Thread.sleep不会导致锁行为的改变,如果当前线程是拥有锁的,那么Thread.sleep不会让线程释放锁;Object.wait会释放锁,进入等待队列中
  • wait不带计时参数是WAITING状态,带计时参数是TIMED_WAITING状态;不管是通过notify()唤醒还是超时时间已到,都需要重新获取锁才能继续执行

线程start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public synchronized void start() {
// threadStatus=0,表示当前为NEW状态
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
// 本地方法执行线程
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}

start()方法是同步的,而且执行前会判断当前线程状态是否为NEW;也就是说,同一个线程只能执行一次

网络阻塞线程状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class testBlockedSocketState {
public static void main(String[] args) throws InterruptedException {
Thread serverThread = new Thread(new Runnable() {
@Override
public void run() {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(10086);
while (true) {
// 阻塞的accept方法
Socket socket = serverSocket.accept();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}, "socket线程"); // 线程的名字
serverThread.start();
// 确保run已经得到执行
Thread.sleep(500);
// 状态为RUNNABLE
System.out.println(serverThread.getState());
}

使用jstack查看线程状态:

1
2
3
4
5
6
7
8
"socket线程" #9 prio=5 os_prio=31 tid=0x00007ff0bc026800 nid=0x4c03 runnable [0x00007000010c6000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at com.zsr.test.thread.ThreadBlockedIOState$1.run(ThreadBlockedIOState.java:19)
at java.lang.Thread.run(Thread.java:745)

可见JVM线程状态为RUNNABLE;I/O 阻塞也是如此

注意:进行传统上的 IO 操作时,我们也会说“阻塞”,但这个“阻塞”与线程的BLOCKED 状态是两码事

1
当进行阻塞式的 IO 操作时,底层的操作系统线程确实处在阻塞状态

参考

Java线程状态分析

Java 线程状态之 RUNNABLE

What is difference between wait and sleep in Java?

缓存更新分析

发表于 2018-01-24

问题来源

项目中为了加快响应时间以及减轻数据库压力,使用了redis缓存,同时缓存设置了过期时间(1天)

1
2
3
2个线程并发:一个读数据,一个写数据;
假使缓存过期,读缓存未命中,写数据还未开始,这个时候加载数据到缓存中(脏数据);然后,写数据开始。
结果导致,缓存失效前读到的数据都是脏数据,如何解决?

为什么要用缓存

一般数据都存放在关系型数据库中,以常用的MySQL数据库为例,正常情况下响应时间在10ms以内甚至更短;但是当数据上亿条,任何一款关系型数据库的响应时间都不可能控制在10ms以内

同时高并发情况下,比如同时来1万次请求,MySQL单库TPS(每秒事务量)大概只有1500左右,其它的请求只能处于等待状态,严重情况下数据库崩溃

使用缓存的场景

对于缓存来说,数据变更少且查询比较频繁是最好的场景,如果查询量不够大或者数据变动太频繁,缓存也就是失去了意义。

缓存类型

本地缓存

有些数据量不常变化,但是访问十分频繁,例如省、市地区数据。针对这种场景,可以将数据加载到应用的内存中,以提升系统的访问效率,减少数据库访问同时加快响应时间。

  • Guava Cache

比较常见的本地缓存有Guava Cache,使用方式如下:

1
2
3
4
5
6
7
8
9
10
LoadingCache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(1000) // 设置最大大小
.expireAfterWrite(10, TimeUnit.MINUTES) // 设置过期时间, 10分钟
.build(
new CacheLoader<String, String>() {
// 加载缓存内容
public String load(String key) throws Exception {
return getFromDB(key);
}
});
  • 本地缓存的缺点
    1. 数据保存在当前JVM中,无法共享
    2. 重启应用缓存丢失

分布式缓存

提到分布式缓存基本上都会说到 Redis,Redis使用内存作为存储,所以性能上要比数据库要好很多,再加上Redis 还支持很多种数据结构,使用起来比较方便;但是,Redis需要通过网络来访问,所以网络的性能决定了 Reids 的瓶颈

缓存更新策略

更多策略见缓存更新的套路,这里具体分析喜马拉雅的缓存更新策略:先更新数据库,再失效缓存

Cache Aside Pattern

1
2
3
1. 失效:先从缓存中读取数据,没有得到,则从数据库中读取,成功后,放到缓存中。
2. 命中:从缓存中读取数据,取到后返回。
3. 更新:先把数据存到数据库中,成功后,再让缓存失效。

这种策略也会产生问题,比如:

1
2
3
4
两个并发操作,一个是更新操作,一个是查询操作;
读操作先到,没有命中缓存,然后就到数据库中读取。
这时来了一个写操作,写完数据库后,然缓存失效。
然后,之前的读操作再把旧数据写到缓存中,还是会造成脏数据。

但是,这种情况出现的概率非常低。这个场景需要发生在读缓存时缓存失效,并发着有一个写操作。而实际上数据库的写操作比读操作慢得多,而且还要锁表,而读操作需要在写操作之前进入数据库操作,又要在写操作完成后更新缓存,所有的这些条件都具备的概率并不大。所以,Cache Aside Pattern 还是相对靠谱的方式。

参考

缓存系统设计与更新机制

缓存更新的套路

rabbitmq消息重试

发表于 2018-01-17

如何实现消息重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
<rabbit:queue name="microlesson.reward.msg.queue" />
<rabbit:direct-exchange name="business.common.order.confirmed.direct.exchange">
<rabbit:bindings>
<rabbit:binding key="1245" queue="microlesson.reward.msg.queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container prefetch="20"
concurrency="10"
connection-factory="connectionFactory"
message-converter="messageConverter"
requeue-rejected="false"
advice-chain="retryInterceptor">
<rabbit:listener ref="rewordMsgListener" queue-names="microlesson.reward.msg.queue"/>
</rabbit:listener-container>
<bean id="rewordMsgListener" class="com.ximalaya.microlesson.listener.listener.RewardMsgListener"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="10000"/>
<property name="maxInterval" value="600000"/>
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="7"/>
</bean>
</property>
</bean>
<!--retry Interceptor -->
<bean id="retryInterceptor"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="messageRecoverer"/>
<property name="retryOperations" ref="retryTemplate"/>
</bean>
  • requeue-rejected设为false表示一条消息即使没有被ack,也不会再重新发送(默认是会重新加入队列)

如何实现消息延时

RabbitMQ可以针对Queue和Message设置x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

参考

rabbitmq 消息可靠投递及消费机制

1…567…31
David

David

Develop Notes

155 日志
37 标签
GitHub Weibo
© 2016 - 2020 David
由 Hexo 强力驱动
主题 - NexT.Pisces