Storm

Storm 入门教程

topologies topology: 由用户编写的Storm集群中的业务处理逻辑
deamon: 守护进程
worker process: 工作进程
stream: 指Storm中的数据流
tuple: 指stream中的最小单元数据
primitive: 基件 指storm topology 的组成部分,比如 bolt(螺栓) 和 spout(喷嘴)

Storm 集群里的各种组件

从表面上看一个 Storm 集群 与 一个 Hadoop 集群相似,然而在 Hadoop 上运行 “MapReduce jobs”, 在 Storm 上运行 “topologies”, 但是 “jobs” 和 “topologies” 是非常不同的– 一个关键的不同是 MapReduce job 最终会结束,而一个 topology 是永远在等待消息并处理(直到你杀掉它)

一个 Storm 集群中有两种节点(node):主节点和工作节点(指Storm集群中不同角色的服务器节点),主节点运行一个叫 “Nimbus” 的守护进程(daemon)跟 Hadoop 的 “任务跟踪器”(Jobtracker)类似。Nimbus 负责向集群中分发代码, 向各机器分配任务,以及监测故障。

每一个工作节点运行一个名叫 “Supervisor” 的守护进程。Supervisor 监听 Nimbus 指派到这个这台机器的任务,根据 Numbus 的指派信息来停止或启动工作进程(worker process) ,每一个 worker process 执行一个topology的子集,一个运行中的topology由跨越多个主机的多个 worker process 组成。

Storm cluster

在 Nimbus 和 Supervisors 之间的所有协调调度通过一个 Zookeeper 集群来完成。另外,Nimbus 守护进程和 Supervisor 守护进程都是快速失败 (fail-fast)和无状态的;所有的状态保存在 Zookeeper 或者本地磁盘中。这意味着你可以 kill -9 Nimbus 或者 Supervisors 他们会自动恢复,就像什么都没发生过一样。这种设计让 Storm 集群变的不可思议的稳定。

Topologies

在Strom上做实时计算, 需要创建 “Topology”,一个 topology 是一个计算过程的描述,一个 topology 中的每一个节点包含处理逻辑,节点之间的连接表明了数据在节点之间是如何传递的。

运行一个topology是很简单的。首先,将你所有的代码和依赖都打包到一个单独的jar包中,然后运行像下面这样的命令:

1
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这样会传递arg1arg2参数给backtype.storm.MyTopology类,这个类的 main 方法定义topology 并将它提交到 Nimbus。Strom jar 负责连接 Nimbus 并上传jar包.

由于 topology 的定义本来就是 Thrift 结构,并且 Nimbus 是一个 Thrift 服务, 所以可以使用任何编程语言来创建和提交 topology。上面的方法是使用基于 JVM 的编程语言来完成的最简单的方法,参考Running topologies on a production cluster 来获取更多的关于开启和停止 topology 的方法。

Streams

Strom 的核心抽象概念是 “流” (stream),一个 stream 相当于一个无限的元组(tuple) 序列,Storm 提供了以可靠且分布式的方法来将一个 stream 转换成一个新 stream 的基件 (primitive) ,例如:你可以将twitter流转换为热门话题流。

Storm提供基本的用来做流转换的的基件是 “spout” 和 “bolts” ,spout 和 bolt 提供了接口,你可以实现这些接口来处理的你自己的应用程序相关的逻辑。

spout 是流的来源, 例如 spout 可以从一个 Kestrel 队列来读 tuple 并且发射(emit)他们形成一个流,或者 spout 可以连接到 Twitter api,来发射一个推文的流。

一个 bolt 消费任意数量的流, 做一些处理,然后可能会发射出新的流,复杂的流转换,例如从一个推文的流计算出一个热门话题的流,需要多个步骤,多个 bolt 。bolt可以通过运行函数(functions)来做任何事,例如过滤元组,做流聚合,做流连接,跟数据库交互等等。

所有的 spout 和 bolt 被打包到了一个 “topology” 中 ,topology 是你提交给 Storm 集群来执行的计算过程的最高抽象,一个 topology 类似一个流转换的图表,它现显示了哪些 bolt 是绑定(subscribe)哪些 stream 上的 。当一个 spout 或者 bolt 发射出一个 tuple 到 stream 中,它会发送 tuple 到所有绑定了这个 stream 的 bolt 中。

[spout和bolt的关系图]

topology 中节点(指 topology 中的 spout 或者 bolt )之间的连接表明了 tuple 是如何在他们之间传递的。例如如果在 spout A 和 bolt B 之间有一个连接,从 spout A 到 bolt C 之间有一个连接,从 boltB 到 boltC 有一个连接,tuple 会发到 bolt B 和 bolt C 中, 所有 bolt B 的输出 tuple 也会流到 bolt C 中。

topology中的每一个节点都是并行执行的。在topology中,你可以指定每个节点的并行数量n,然后 Storm会启动 n 个线程在集群中运行

一个 topology 是永远运行的,直到你杀掉它,Storm 会自动重新分配失败的任务。另外,Storm 保证没有数据丢失, 即使主机挂掉消息丢失。

数据模型

Storm 使用 tuple 做数据模型,一个 tuple 是被命名过的值列表(A tuple is a named list of values),一个 tuple 中的字段可以是任何类型的对象。它是开箱即用的,Storm 支持所有的简单数据类型,如字符串,字节数组作为 tuple 的字段值。如果要使用另一种类型的对象,只需要为这个类型实现一个 serializer.

topology 中的每一个节点都应该为它要发射的元组声明输出字段, 例如, 下面这个bolt声明了它发射字段为 “double” 和 “triple” 字段的元组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}

declareOutputFields 方法声明了输出字段为["double", "triple"].

简单的topology例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}

这个 topology 包含一个 spout 和两个 bolt,spout 发送单词,每一个 bolt 附加 “!!!” 到它的输入数据中。这些节点排练成一条线:spout 先发射 tuple 到第一个 bolt,然后第一个 bolt 发送到第二个 bolt。如果 spout 发送 [“bob”] 和 [“john”] 元组,然后第二个bolt会发送 [“bob!!!!!!”] 和 [“john!!!!!!”] 元组.

代码中使用 setSpoutsetBolt 方法来定义节点.这些方法需要传入: 一个用户指定的id,一个包含处理逻辑的对象,以及你希望这个节点运行的并行数量。在这个例子中,spout 被指定了id “words”, bolt 被指定了id “exclaim1” 和 “exclaim2”.

传入的 Spout 对象实现了 IRichSpout 接口并包含业务逻辑, Bolt 对象实现了 IRichBolt 接口并包含业务逻辑.

最后一个参数,设置这个节点的并行数量是几,这个参数是可选的,它表明有多少线程应该在集群中运行该组件 ,如果你忽略了它,Storm 会给这个节点(即 spout 或者 bolt)只分配一个线程。

setBolt 返回一个 InputDeclarer 对象用来给 bolt 定义输入。这 “exclaim1”组件 声明了它要想读入所有 “words” 组件的发射的打乱分组过的所有 tuple. “exclaim2” 组件声明了它要读入所有 “exclaim1” 发射的打乱分组过的 tuple,”打乱分组”(shuffile group)意味着 tuple 必须从输入中随机分发到 bolt 的任务中。有许多在组件之间将数据分组的方法,打乱只是其中一种。

如果你希望 “exclaim2” 组件,既读取 “words” 又读取 “exclaim1” 发射的 tuple , 你可以像如下这样实现 “excliam2” :

1
2
3
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");

可以给 bolt 链式的指定多个数据源。

Spouts 负责发射新的消息到 topology中。 在这个 topology 中 TestWordSpouts方法 从 [“nathan”, “mike”, “jackson”, “golda”, “bertels”] 中每 100毫秒 发射一个随机的字符, TestWordSpout 中 nextTuple()方法 的实现是这样的:

1
2
3
4
5
6
7
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

ExclamationBolt类的 prepare 方法给 bolt 提供了一个 OutputCollector 对象用来从这个 bolt 中发射 tuple 。 在这个 bolt 中的任何位置都可以发射 tuples —在 prepare, execute, cleanup 方法中, 甚至在异步的其他线程中。

  • prepare 方法仅仅保持一个 OutputCollector 对象实例以便在后面execute 方法中调用。

  • execute 方法从输入中接收一个 tuple。ExclamationBolt 从元组中取到第一个字段,然后在后面附加 “!!!” 。 如果你实现的 bolt 订阅了多个输入源, 你可以使用Tuple#getSourceComponent 方法查到当前的 tuple 是来自哪个组件.

    execute 方法里还可以做一些其他操作,即将输入的 tuple 作为 emit 的第一个参数传入,这样这个 tuple 会被确认。这是 Storm 可靠api一部分它能保证,不会丢失数据,这些在本教程后面的章节中还会阐述。

  • cleanup 方法会在 Bolt 停止时被调用,用来关闭清理所有打开的资源。不能保证这个方法一定会在集群中被调用,如果正在运行的机器发生了爆炸(作者在搞笑),这样就没办法调用这个方法了。cleanup方法其实是专门为你在本地模式(将Storm集群在一个进程中模拟出来)下运行 topology ,你希望运行和杀掉 topology 而不必担心资源泄露。

declareOutputFields 方法声明 ExclamationBolt 发射包含一个 word 字段的 tuple

getComponentConfiguration 方法允许你配置影响这个 bolt 如何运行的各种参数,关于配置的更多内容 Configuration.

cleanupgetComponentConfiguration 方法通常并不是必须的, 你可以通过继承一个提供了默认实现的基类来更简洁的定义 bolt。 通过继承 BaseRichBolt类 ,ExclamationBolt可以被实现的更简洁,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

在本地模式下运行 ExclamationTopology

Storm 有两种运行模式:本地模式分布式模式

  • 在本地模式中,Storm 完全在一个进程中运行,用线程来模拟各个工作节点。本地模式对与开发和测试topology是非常有用的,当你运行 storm-starter 中的 topology时,它会运行在本地模式下,可以看到每一个组件发射的消息.
  • 在分布式模式下,Storm 运行在一组机器上,当你提交一个 topology 到 master上,就会同时提交所有必要的代码来运行这个 topology,master会负责分发你的代码,并分配工作进程来运行你的 topology,如果工作进程挂掉了,master会在某处重新分配他们.

下面是在本地模式运行 ExclamationTopology 的代码:

1
2
3
4
5
6
7
8
9
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

这段代码通过创建 LocalCluster 对象定义了一个进程内的集群。提交 topology 到虚拟集群和提交到分布式集群是一样的,通过调用 submitTopology 来向 LocalCluster 中提交 topology,它接受三个参数: topology的名字,topology的配置,topology本身。

名字是用来识别这个 topology,以便日后杀掉它。。topology会一直运行直到你杀掉它。

配置是用来调优运行 topology 的各个方面,下面是两个常见的配置:

  1. TOPOLOGY_WORKERS (用 setNumWorkers来设置) 指定你将在集群分配几个进程来运行这个这个topology,topology中的每一个组件会被当做多个线程来运行。一个组件被分配线程的数量通过 setBoltsetSpout 方法来配置,这些线程存在于工作进程中。每个工作进程包含一些组件中的一些线程,例如,你分配了 300 个线程给所有的组件,在配置中设置了50个工作进程,那么每个工作进程会运行6个线程,每一个线程可能属于不同的组件。通过调整每个元件的并行度和运行这些线程的工作进程的数量来对 storm 的并行性能调优。
  2. TOPOLOGY_DEBUG (通过 setDebug 设置),当被设为 true 时,storm 将记录元件发射的每个消息,在本地模式测试topology时这是很有用的,但是在线上模式运行时,你更愿意将它关闭

流分组 Stream groupings

流分组让 topology 知道在组件之间如何发送 tuple,记住 spouts 和 bolts 是被当成很多 tasks (这里的task 就是setBolt和 setSpout 中产生的工作线程,如果设置了数量,就是线程组或者任务组即 set of tasks)并行运行在整个集群中的,如果想看看 topology 是如何在 task 层级运行的,如下图:

Tasks in a topology

当一个运行 Bolt A 的 task 发射了一个 tuple 到 Bolt B,那么它应该发射到哪个 task(当然是运行Bolt B 的task) 呢?

流分组 (Stream grouping)答了这个问题,它告诉 Storm 如何在 set of task(任务组)之间发送 tuple,在我们深入不同种类的流分组以前,让我们看看 storm-start 里的另一个 topology ,WordCountTopology从一个 spout 中读取句子并且从 WordCountBolt 中获取某个单词出现的次数:

1
2
3
4
5
6
7
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));

SplitSentence 把它接收到的每一个句子中的每一个单词当做 tuple 发送出去,WordCount在内存中维护了一个单词和数量的映射关系,每次 WordCount 接收到一个单词,它就更新单词的数量,然后发送新的单词数量。

还有一些不同种类的流分组:

  • 基本的分组类型叫做 “乱序分组(shuffle grouping)” ,它将使 tuple 被随机发个一个 task,WordCountTopology中 使用了乱序分组来从 RandomSentenceSpoutSplitSentence 发送 tuple, 这样所有的处理任务就能够被平均的分配到所有运行SplitSentence Bolt的 task 上。

  • 一个更有趣的分组类型是 字段分组(fields grouping)SplitSentenceWordCount之间使用了一个字段分组,WordCount能够运作的一个极为重要的要求是相同的单词必须被发到同一个 task中,否则会有一个以上的 task 会接收到相同的单词,然后他们会发射错误的计数。字段分组使我们可以用字段将一个流分组,这使得相同字段的内容总是被分到同一个task中。由于WordCountword 字段上使用字段分组订阅了 SplitSentence‘s 的输出流,这样相同的单词总是会进入到相同的task.

    字段分组是流连接和流聚合以及许多其他用力的基本实现,究其原理,字段分组是通过 mod hashing(哈希的一种) 来实现的.

还有一些其他类型的分组,可以在概念里查看更多。

保证消息处理的可靠性

storm保证从spout发出的每个tuple都会被完全处理。

Storm 提供了几个不同层次的保证消息处理的方式,例如最佳努力,至少一次,只有一次三种方式。下文基于“至少一次”方式.

消息被”完整处理”的含义?

从spout发射的一个tuple可以引起其它成千上万个tuple因它而产生 。举例:统计每个单词出现次数的Topology.

1
2
3
4
5
6
7
8
9
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));

这个topology从Kestrel队列中读取句子,并将句子分割成一个个的单词,最后发送的是各个单词出现的次数。从spout发出的tuple将会在下游触发生成更多的tuple:句子中的每个单词会形成一个tuple以及后面每个单词的计数会形成新的tuple。以下是tuple构成的消息树或者说tuple树:

Tuple tree

当这棵tuple树被完全使用并且这棵树中的所有的消息都被完全的处理过了,storm就认为spout发出的tuple(这个tuple是树中的根节点)被”完全处理”了。而当这棵树中的所有消息在特定的时间内没有被完全的处理,storm就认为该tuple就是未被完全处理的。其中,处理的时限可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行设置,默认是30秒。

消息被完全处理或者未被完全处理分别会发生什么?

为了理解这个问题,我们首先来看看spout发出的tuple的生命周期。这里给出了spouts需要实现的接口作为参考:

1
2
3
4
5
6
7
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}

首先,Storm通过调用Spout中的nextTuple方法来请求一个tuple。Spout使用open方法中提供的SpoutOutputCollector的实例来向它的输出流中发送tuple(collector.emit())。当发送一个tuple时,Spout会提供一个”message id”,后面将会用这个”message id”来标识相应的tuple。例如:KestrelSpout从kestrel队列中读取一条消息并以Kestrel中消息的id作为tuple的”message id”来发送该tuple。通过SpoutOutputCollector的实例_collector来发送方式如下:

1
_collector.emit(new Values("field1", "field2", 3) , msgId);

接下来,该tuple被发送至消费bolts,Storm来跟踪以该tuple为根节点生成的消息树。如果Storm检测到一个tuple被”完全处理”了,Storm将会根据message id调用起始时Spout task(这里设计到并发,一个spout可能产生多个spout task,各个task都会产生tuple)中的ack方法。同样的,如果tuple在规定时间内未被”完全处理”,Storm就会调用fail方法。调用ack或者fail在生成该tuple的Spout task上进行的。因此,一个Spout在执行时产生了多个tasks,一个tuple的ack或fail不会又非生成该tuple的task来完成。

用KestrelSpout来看看Spout在消息处理保证机制中做了些什么。当KestrelSpout从Kestrel队列中获取一条消息时,它会”opens”(打开)这个消息。这意味着该消息还并没有从队列中提取出来,而是处于一种”pending”(待处理)的状态,等待确定这条消息被确实的处理完成。处于”pending”状态的消息不会被发送到该队列的其他消费者中。另外,如果客户端断开了连接,所有”pending”状态下的消息都将回归队列中的正常状态。当一个消息被”opend”(打开),Kestrel将会提供该消息的数据以及该消息的唯一的id给相应的客户端。KestrelSpout就使用这个id作为tuple的”message id”。当KestrelSpout中的ack或者fail被调用时,KestrelSpout将会发送ack或者fail消息并附上message id给Kestrel,以便Kestrel将该消息从队列中踢出或者让该消息回归正常状态等待下次”open”。

Storm的可靠性API

作为一个用户,我们使用storm的可靠性能力时需要做两件事。1.当我们在消息树中建立了新的连接的时候,我们需要告诉Storm。2.当我们处理完了单个的tuple(这里tuple是整个过程中产生的tuple不单单只根节点的那个tuple)也需要告诉Storm。通过以上两点,Storm就可以检测tuple树是否处理完成,并调用相应的ack或者fail。Storm API提供了简洁的方案来完成上述任务。

确定tuple树中的一个连接叫做”anchoring”(锚定)。Anchoring将在发送一个新tuple的时候就完成。我们看下面这个bolt例子。这个bolt将一个包含一个句子的tuple分割成多个单词tuple:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

每个单词tuple通过_collector.emit(tuple,new Values(word))中的第一个参数(本例中的tuple)来anchored(锚定)。因为单词tuple已经被锚定了,如果下游的tuple出现处理失败的情况,处于树根节点的spout tuple就会进行重发。如果我们以_collector.emit(new Values(word))的方式发送tuple,显然这种方式发送的tuple是没有被锚定的。如果下游处理失败,也不会重发。我们可以根据具体的容错需要来选择相应的方式。一个输出的tuple可以被多个输入tuple锚定。当需要进行流连接(join)或者汇聚(aggregation)时,这个功能非常有用。一个多锚定的(multi-anchored)tuple处理失败将可能会引起spouts的多个tuple的重发。通过以下方式可以完成多锚定(multi-anchored):

1
2
3
4
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多锚定将输出的tuple加入了一个多tuple树(其实已然不是一个树结构,而是一个有向无环图DAG,其实树就是一个特殊的DAG嘛)可以称为tuple-DAG:

Tuple DAG

Anchoring使得我们能够使tuple树具体化。如何来确定单个tuple的处理是否完成: 这是通过OutputCollector中的ack以及fail来实现的。例如,上面的SplitSentence例子,它在发送完所有的单词tuples后进行了ack。

可以通过OutputCollector中的fail方法来迅速通知位于根节点的spout tuple。这样,我们就无需等到超出时限才发送fail消息。

每个tuple都必须进行ack或者fail。Storm使用内存来跟踪每个tuple,所以如果不对每个tuple进行ack或者fail,负责跟踪tuple的任务将一直运行,直到内存用光。

大多数bolts都以种通用的模式来读取输入的tuple,发送tuple,以及在execute方法的结尾处来ack这个tuple。这种的bolts被归到了过滤器或者说简单函数这一类。Storm中有一个BasicBolt接口包含了这一模式。将SplitSentence改成BasicBolt的形式:

1
2
3
4
5
6
7
8
9
10
11
12
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

这个实现比以前的更加简洁,但是功能上是一样的。Tuples在BasicOutputCollector中是自动与输入的tuple锚定的(不需要我们填tuple参数)。并且当execute方法完成时输入tuple也是自动的进行ack。相反,bolts进行aggregations(汇聚)或者joins(连接)时,将会推迟ack直到在批量的tuples的基础上计算出了结果之后。Aggregations以及joins将会对他们的output tuples进行多锚定,这些东西超出IBasicBolt的简单模式。

Storm如何有效的实现可靠性

在Storm topology中有一系列特殊的”acker”它们会为每个spout tuple追踪DAG中的tuples。当一个acker检测到一个DAG完成了,它将会发送一条消息给生成了该spout tuple的task来进行确认。(这里,大家肯定会有疑惑,每次tuple发送时都进行了ack,为什么还有acker。answer:每次的ack是告知acker本次的处理完了。而acker要进行汇总告诉根节点task所有的tuple都处理完成了)。可以通过Config.TOPOLOGY_ACKER_EXECUTORS来设置acker的个数。Storm默认acker的个数与worker的个数一样。–当我们需要处理大量的消息时,我们可能就需要提高这个acker的量了。

理解Storm的可靠性的最好方式是研究tuples以及tuple DAG的生命周期。当tuple在topology中生成时,无论是spout还是bolt生成的tuple,都将被赋予一个64bit的随机id。这些id就被ackers用来跟踪这个tuple DAG中的每个spout tuple。

每个tuple都知道所有与其位于同个DAG的spout tuples的id。当我们发送一个新的tuple时,spout tuple的id将会通过锚定这个方式拷贝到新的tuple中。当该tuple进行ack,它会发送一条消息给相应的acker task告诉其tupel DAG的变化情况。白话一点:该tuple说:我已经完成了该id号的spout tuple来的计算,然后这些是锚定了我的新的tuple。

举个例子,如果tuples D与E是在C的基础上生成的,那么下面就是当C ack以后,tuple DAG的变化(打红叉说明就是ack了):

img

由于C被ack时,D与E又被加入到了DAG中,故这个DAG还没有被完全处理。

这里有些细节需要说明,上面已经提过,你可以自己设置任意数量的acker tasks。这就会导致出现问题:当一个tuple被ack了,它如何知道它该向哪个acker task发消息?

Storm使用mod hashing的方式将spout tuple id映射给相应的acker task。因为每一个tuple知道它所有的spout的tuple id, 所以它自然可以算出要通知哪个acker(这里注意因为一个tuple可能存在于多个tuple树)。

另一个细节是acker task如何知道向哪个spout task发送消息?

当spout task发出一个新的tuple时,它会简单的发一个消息给一个合适的acker,并且告诉acker它自己的id(taskid),这样storm就有了taskid-tupleid的对应关系。因此acker task可以根据spout task id来确定当tuple完成后向相应的spout task发消息。

Acker tasks并不是显式的追踪tuple DAG(这会使得单单运行acker task就耗光内存),而是acker用了一种不同的方式,使得对于每个spout tuple所需要的内存量是恒定的(20 bytes)。这种追踪算法是Storm的关键,也是storm的一个主要突破。

acker task存储了一个spout tuple id到一个值对(value1,value2)的映射 。value1是spout task id用来在完成处理tuple的时候发送消息用的。value2是一个64bit的数字叫做”ack val”(ack变量):ack val是整个tuple树的状态的一个表示,不管这棵树多大, 它只是简单地把这棵树上的所有创建的tupleid/ack的tupleid一起异或(XOR)。。

当acker task发现ack变量的值为0时,那就说明tuple树已经完全处理了。因为tuple id是随机的64bit数,所以如果因为不同的数异或产生0的概率是特别小的。经数学计算,每秒10k次的ack,也需要50000000年才会出现一次上述错误情况。而且即使出现了上述错误,也仅仅是造成了一次数据的丢失,如果碰巧这次处理是失败的。

现在我们来看看,在各种失败的情况下,storm如何避免数据丢失:

  • 由于task died,tuple未进行ack:这种情况下,处理将超时,spout tuple将重发
  • Acker task dies:这种情况下所有该Acker task跟踪下的spout tuple都将因为超时而重发
  • Spout task dies:这种情况下Spout的源头将进行消息重发,例如:当与消费客户端失去连接后,Kestrel和RabbitMQ将会把所有的”待处理”消息回复正常。

你可以看到,Storm的可靠性机制是完全分布式、可扩展以及容错的。

调整storm的可靠性

Acker tasks是轻量级的,所以我们在topology并不需要部署太多。我们可以通过Storm UI来监控他的性能。如果流量不太对,我们可能就需要加大acker task的量了。

如果可靠性对你来说不太重要,我们就可以不追踪tuple了。这将使消息的传输量下降一半。另外,下游的tuple也不需要拷贝spout tuple id,这也将减少带宽使用。

我们有三种方式来移除可靠性。第一种是设置Config.TOPOLOGY_ACKERS为0 ,这种情况下,Storm将会在spout发送了tuple后就调用spout的ack方法。因此DAG就不会被追踪。

第二种方式是将消息的message id置为null。通过在SpoutOutputcollector.emit参数中设置message id为null,就可以关闭对当前spout tuple的追踪。

第三种方式就是之前提到过的,我们可以不对下游的tuple进行锚定(anchor)。

热评文章