Hello Coder


  • 首页

  • 归档

  • 标签

  • 搜索
close

Spring事件监听器

发表于 2017-10-23

Spring 事件框架

Spring事件框架包含三个部件:

  • 事件:ApplicationEvent
  • 事件监听器:ApplicationListener,对监听到的事件进行处理
  • 事件发布:ApplicationEventPublisher,ApplicationContext继承了该接口,在ApplicationContext的抽象实现类AbstractApplicationContext中做了实现

Spring 事件原理

Spring提供的事件类型

img

  • ContextStartedEvent:ApplicationContext启动后触发的事件;
  • ContextStoppedEvent:ApplicationContext停止后触发的事件;
  • ContextRefreshedEvent:ApplicationContext初始化或刷新完成后触发的事件;(容器初始化完成后调用)
  • ContextClosedEvent:ApplicationContext关闭后触发的事件;(如web容器关闭时自动会触发spring容器的关闭)

事件发布者

ApplicationContext接口继承了ApplicationEventPublisher,并在AbstractApplicationContext实现了具体代码,实际执行是委托给ApplicationEventMulticaster:

1
2
3
4
5
6
7
8
9
10
// AbstractApplicationContext.class
public void publishEvent(ApplicationEvent event) {
//省略部分代码
}
// 事件发布委托给ApplicationEventMulticaster来执行
getApplicationEventMulticaster().multicastEvent(event);
if (this.parent != null) {
this.parent.publishEvent(event);
}
}

常用的ApplicationContext都继承自AbstractApplicationContext,如ClassPathXmlApplicationContext、XmlWebApplicationContext等,自动拥有这个功能

阅读全文 »

scala Execution Context

发表于 2017-10-19

Implicits.global vs Implicits.defaultContext


Scala Future需要放在执行上下文(线程池)中执行,一般常见的线程池有2种:

  • Scala Execution Context:
1
import scala.concurrent.ExecutionContext.Implicits.global

该线程池是由Scala标准库提供的,它是一种特殊的ForkJoinPool线程池,在高负载时,使用的线程可能比优化时多(对比play或者akka framework),因此性能可能会降低

  • Play Default Execution Context:
1
import play.api.libs.concurrent.Execution.Implicits.defaultContext

该线程池是由play提供,play应用应该使用这种执行上下文。同时,应该避免阻塞操作放入play execution context,好的办法是放入其他的execution context,避免play应用资源用尽。

注意:理解哪个线程运行了future非常重要,导入play的默认执行环境(execution context)。这是一个隐式(implicit)的参数,会被传入所有接受回调的future API方法中。执行环境(execution context)通常等价于线程池,但这并不是一定的。

简单的把同步IO封装入Future并不能将其转换为异步的。如果你不能通过改变应用架构来避免阻塞操作,那么该操作总会在某一时刻被执行的,而相应的线程则会被阻塞。所以,除了将操作封装于Future中,还必须让它运行在配置了足够线程来处理可预计并发的独立执行环境(execution context)中。更多信息请见理解Play线程池

参考

https://stackoverflow.com/questions/40301032/what-is-the-difference-between-scalas-execution-context-and-plays-execution-co

https://stackoverflow.com/questions/30805337/plays-internal-execution-context

ForkJoinPool

发表于 2017-10-18

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework

Fork/Join的思路:通过分而治之(把大任务分割成若干个小任务),只不过划分之后的任务更适合分派给不同的计算资源,可以并行的完成任务。

原理

Fork/Join框架主要依靠fork和join两个操作,一般对这两个操作的解释如下:

  • fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
  • join():等待该任务的处理线程处理完毕,获得返回值。

这里有个问题,不断的fork()如果是不断创建线程,岂不是要“线程数量爆炸”?事实上,ForkJoinPool用了一种work stealing的算法,避免产生大量线程。所以如果一开始设置线程池的线程数为N,实际上使用ForkJoinPool的时候也只会有固定的线程数(默认和CPU核数一样)。

Fork/Join的基本用法

1
2
3
4
5
if (当前这个任务工作量足够小)
直接完成这个任务
else
将这个任务分解成两个部分
分别触发(invoke)这两个子任务的执行,并等待结果

ForkJoin框架组成

  • ForkJoinPool:管理worker线程,类似ThreadPoolExecutor,提供接口用于提交或者执行任务;
  • ForkJoinWorkerThread:worker线程,任务保存在一个deque中;
  • ForkJoinTask:ForkJoin框架中运行的任务,可以fork子任务,可以join子任务完成。
阅读全文 »

RabbitMQ生产者分析

发表于 2017-09-28

分布式消息中间件

在使用RabbitMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。

Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:

  • fanout:生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用
  • topic : 生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息
  • direct:生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)

生产者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<rabbit:connection-factory id="connectionFactory"
host="${rabbit.connect.host}" port="${rabbit.connect.port}" username="${rabbit.connect.username}"
password="${rabbit.connect.password}" channel-cache-size="${rabbit.connect.channelCacheSize}"
publisher-returns="true" publisher-confirms="true" />
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<bean id="returnCallback"
class="com.zsr.test.callback.MessageReturnCallback" />
<bean id="confirmCallback"
class="com.zsr.test.callback.MessageConfirmCallback" />
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" confirm-callback="confirmCallback"
return-callback="returnCallback" message-converter="messageConverter" />
阅读全文 »

RabbitMQ消费者分析

发表于 2017-09-27

消息消费者有2种模式:pull或push,RabbitMQ通常使用push模式(也可以使用拉模式),Kafka采用pull模式。push更关注实时性,pull更关注消费者消费能力。

消费者配置

Java项目中,一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:

1
2
3
4
5
6
7
8
<rabbit:connection-factory id="connectionFactory" />
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
<!-- template非必须,主要用于生产者发送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="test.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" prefetch="1">
<rabbit:listener ref="listener" queue-names="test.queue" />
</rabbit:listener-container>

concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数(默认为1).

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
// Test
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new CachingConnectionFactory(5672);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setQueueNames("spring");
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
});
container.start();
}
阅读全文 »
1…8910…31
David

David

Develop Notes

155 日志
37 标签
GitHub Weibo
© 2016 - 2020 David
由 Hexo 强力驱动
主题 - NexT.Pisces