storm是一个流式并行计算框架,该框架的特点流式处理,尽可能快地处理数据,所以一般被应用在需要实时得出结果的场景。对于一个jar包提交上去的任务对应其内部一整个流程,称为topology,而一个topology里的编程模型有两种组件:
spout 负责接收外部数据,下发到blot组件blot 一种是ack的blot,负责处理错误数据,执行重发机制;另一种就是需要我们实现的业务blot,用于接收spout下发的数据,并进行业务处理,处理完毕可以接下一个blot继续处理业务,也可以将数据存储到持久层。
storm框架也有主从节点,主节点成为nimbus,负责唤醒supervisor节点,而从节点称为supervisor,负责提交jar包,并启动设定好的work数量,而一个work就是一个执行任务,负责执行jar包上的程序。
搭建流程
首先需要有zookeeper集群,并启动集群才能将supervisor节点联系在一起配置storm文件,启动storm集群编写storm代码,提交jar包到集群上执行
搭建步骤
下载storm安装包住,这里我使用的是storm-1.2.2版本: storm1.2.2
解压storm安装包
tail -zxvf apache-storm-1.2.2.tar.gz -C /usr/local
修改storm安装路径
mv /usr/local/apache-storm-1.2.2 /usr/local/storm
修改storm文件配置
vi storm.yaml
storm.yaml配置文件信息如下: #指定storm使用的zk集群
storm.zookeeper.servers:
- "hadoop1"
- "hadoop2"
- "hadoop3"
- "hadoop4"
#指定storm本地状态保存地址
storm.local.dir: "/usr/local/storm/workdir"
#指定storm集群中的nimbus节点所在的服务器
nimbus.host: "hadoop1"
#指定nimbus启动JVM最大可用内存大小
nimbus.childopts: "-Xmx1024m"
#指定supervisor启动JVM最大可用内存大小
supervisor.childopts: "-Xmx1024m"
#指定supervisor节点上,每个worker启动JVM最大可用内存大小
worker.childopts: "-Xmx768m"
#指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。
ui.childopts: "-Xmx768m"
#指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
配置文件注意要点:
storm.zookeeper.servers 配置需要集群的storm机器主机名或ipstorm.local.dir 尽量配置在storm安装目录中,以便分发都其他机器上nimbus.host 根据自己需要配置一台机器作为storm的nimbus节点supervisor.slots.ports 配置多个supervisor启动worker的端口,用来建立supervisor与worker的通讯
创建一个保存storm的主从节点以及blot组件状态信息的工作目录
mkdir -p /usr/local/storm/workdir
分发配置好的storm到其他机器上
scp -r /usr/local/storm hadoop2:/usr/local
scp -r /usr/local/storm hadoop3:/usr/local
scp -r /usr/local/storm hadoop4:/usr/local
启动zookeeper集群,如果没有安装zookeeper的,请按照这个步骤搭建zk集群 zookeeper3.4.13集群搭建
在配置nimbus节点的机器上启动storm的nimbus节点:
nohup storm nimbus &
nohup storm ui & //启动storm的管理界面
在其他机器上启动supervisor节点,前提是启动的机器必须是zookeeper集群中的一台:
nohup storm supervisor &
输入网址:hadoop1:8080hadoop1 表示nimbus节点所在机器的主机名
如果出现上图则表示storm搭建成功!!!
storm并发编程
storm里面一个流程从接收数据到处理数据这个过程称为一个topology,可以这么理解,当我们自己编写一个java程序打包成一个jar包提交到storm上,那么storm在执行这个jar的过程就是一个topology。
一个topology里面包含了很多个task,这些task的类型大致分两种,一种是spout,另一种是blot,而storm还有内置的ack blot用于处理错误数据,实现重发机制。spout和blot是storm的编程模型,也就是说我们需要在这上面编写自己的业务代码。
spout 用于接收外部数据,并发射到blot组件上进行处理
blot 接收spout发射过来的数据,进行业务处理,然后发射到另一个bolt进行处理或存储到持久层
从物理上,storm有两种节点类型,一种是nimbus节点,属于主节点,负责分配任务,监控任务;一种是supervisor节点,属于从节点,接收nimbus分配过来的任务,并启动自己的worker,worker数量是根据端口号来的。而worker是执行具体任务的组件,相当于一个线程,一个worker可以存在多个task
在storm中有并发度的概念,指的是一种类型的task(spout / blot)被几个线程同时执行
实例演示
实现一个可以单词实时统计的功能
创建一个maven工程,在pom文件中添加如下信息
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.guoyu</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>MyKafka</name>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
topology代码演示
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import com.guoyu.storm.bolt.SplitBolt;
import com.guoyu.storm.bolt.WordCountBolt;
import com.guoyu.storm.spout.WordCountSpout;
public class WordCountTopology {
public static void main(String[] args) {
TopologyBuilder topology = new TopologyBuilder();
topology.setSpout("spout", new WordCountSpout(),1);
topology.setBolt("blot1", new SplitBolt(),10).shuffleGrouping("spout");
topology.setBolt("blot2", new WordCountBolt(), 2).fieldsGrouping("blot1", new Fields("word"));
Config conf = new Config();
conf.setNumWorkers(3);
//集群模式
//StormSubmitter.submitTopology("mywordcount", conf, topology);
LocalCluster local = new LocalCluster();
local.submitTopology("mywordcount", conf, topology.createTopology());
}
}
topology作为一个程序入口,设置相应的spout和blot,设置具体的配置。storm提供了两种模式:一种是集群模式,一种是本地模式。
spout代码演示
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class WordCountSpout extends BaseRichSpout{
SpoutOutputCollector collector;
Map<String,Integer> map = new HashMap<String,Integer>();
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void nextTuple() {
String str = "I am super man , I am guoyu";
collector.emit(new Values(str));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("god"));
}
清洗数据的bolt代码演示
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class SplitBolt extends BaseRichBolt{
OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String str = input.getString(0);
String[] words = str.split(" ");
for(String word : words) {
collector.emit(new Values(word,1));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","num"));
}
}
单词统计的blot
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class WordCountBolt extends BaseRichBolt{
OutputCollector collector;
Map<String,Integer> map = new HashMap<String,Integer>();
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String word = input.getString(0);
Integer count = input.getInteger(1);
if(map.containsKey(word)) {
map.put(word, map.get(word)+1);
}else {
map.put(word,count);
}
System.out.println("count:"+map);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
这里,我们先使用本地模式测试,效果如图:
在我们编写spout这个task上,里面的nextTuple方法是循环调用的,所以输出的数据会循环统计
集群模式
使用maven,将项目打成jar包
生成jar包的效果图
将jar包放在nimbus节点的机器上,执行提交命令,就可以启动集群模式了
storm jar wordcount-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.guoyu.storm.topology.WordCountTopology wordcount
打开管理页面就可以看到提交上去的jar包
点击wordcount打开详情页面,就可以知道worker在哪台机器上执行jar包
定位执行jar包的机器,在其机器上查看storm的日志打印
cd /usr/local/storm/logs/workers-artifacts/wordcount-1-1558526639/6700
tail -f worker.log
注:pom文件中storm的jar引用需要添加作用域,集群启动才能生效
单词实时统计次数的storm实例就此告一段落!!!