在Storm中需要用HikariCP处理Mysql的数据交互,配置文件放在了src\main\resources\下面
在拓扑启动前,把配置文件中的属性全部取出来,放到JVM中去,这样直接使用System.getProperty就可以拿到,不用写一些全局常量一些工具类。
public static void initRemote() { String fileName = "dbsetting.properties"; try { Properties properties = new Properties(); InputStream ips = PropertyUtil.class.getResourceAsStream("/" + fileName); properties.load(ips); System.getProperties().putAll(properties); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }然后在启动的时候统一调用,并传入拓扑的Spout和Bolt。
本地单机开发环境执行Storm的Topo,一切正常。
但是在部署到集群中的时候,发现Nimbus主节点可以获取到配置文件的内容正常连接,但是Supervisor却获取不到配置文件内容。
1.是不是集群环境没获取到配置文件的IO流
2.使用System进行set赋值的时候是不是没放进去
最后一直搞到天亮,发现详情如下:
1.IO流正常
2.System的set,get正常
初始化读取配置文件set到System中是在Topo提交之前,并且最初的代码是在Spout和Bolt中直接从System中进行get。
这样有一个问题就是,整个工程编译后的jar是在Nimbus节点执行的,这样System的set只是单独的放到了Nimbus的JVM中。
在Supervisor节点中,由于是不同的服务器,JVM不是一个,里面的值肯定和Nimbus不一样,根本没有这些配置文件的属性,这就导致在Nimbus启动的时候一切正常,读取配置文件的log也都能正常打印,但是在Supervisor节点查看日志却发现配置文件读取到的一直是null。
这也是为什么我本机单机启动的时候从来不报错,因为一直都用的同一个JVM。
1.在Nimbus初始化时,放入本机的JVM
2.在创建topo的时候,将配置文件数据读取到之后作为构造函数的参数传入Spout和Bolt中
3.为了满足开闭原则,我把生成HikariCP对象的方法抽了出来做成接口,并且对Spout和Bolt分别又抽了一层做成了抽象类并且实现了这个接口,实现不同的get方法,方便后期新增Spout/Bolt的时候,可以根据是否需要对数据库进行CRUD而选择继承不同的BaseSpout和BaseBolt,或者后期添加其他拥有共性的Spout/Bolt时,直接进行新增,不需要改动原有代码。
4.这么抽象完之后,看起来是真的舒服。
完整版的代码在我的git私服中,关键点代码如下:
package com.zzj.topo;

import com.zzj.topo.manager.DBTopo;
import com.zzj.topo.manager.TopoManager;
import com.zzj.util.PropertyUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Entry point. args[0] == "1" selects cluster (remote) submission,
 * anything else runs the topology in a LocalCluster.
 */
@Slf4j
public class Launcher {
    public static void main(String[] args) {
        if (args != null && args.length > 0 && "1".equals(args[0])) {
            PropertyUtil.initRemote();
            TopoManager.startRemote(new DBTopo());
        } else {
            PropertyUtil.initLocal();
            TopoManager.startLocal(new DBTopo());
        }
    }
}

package com.zzj.util;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Loads dbsetting.properties from the classpath root into the System
 * properties of the CURRENT JVM, so code in this JVM can read them via
 * System.getProperty.
 *
 * NOTE: System properties set here are NOT propagated to other cluster
 * nodes — that is exactly the pitfall this post describes; worker-side
 * components must receive the config through their constructors instead.
 */
@Slf4j
public class PropertyUtil {

    private static final String CONFIG_FILE = "dbsetting.properties";

    private PropertyUtil() {
        // utility class — no instances
    }

    /** Loads the config for a local (single-JVM) run. */
    public static void initLocal() {
        loadIntoSystemProperties();
    }

    /** Loads the config on the submitting (Nimbus-side) JVM. */
    public static void initRemote() {
        loadIntoSystemProperties();
    }

    /**
     * Shared implementation. Fixes three defects of the original:
     * - initLocal used PropertyUtil.class.getClass() (i.e. Class.class) to
     *   resolve the resource instead of PropertyUtil.class itself;
     * - initRemote never closed the InputStream (resource leak) — the
     *   try-with-resources below closes it on every path;
     * - a missing resource produced an NPE instead of a clear log message.
     */
    private static void loadIntoSystemProperties() {
        try (InputStream in = PropertyUtil.class.getResourceAsStream("/" + CONFIG_FILE)) {
            if (in == null) {
                log.error("config file {} not found on classpath", CONFIG_FILE);
                return;
            }
            Properties properties = new Properties();
            properties.load(in);
            System.getProperties().putAll(properties);
        } catch (IOException e) {
            // log with cause instead of printStackTrace()
            log.error("failed to load config file " + CONFIG_FILE, e);
        }
    }
}

package com.zzj.topo.manager;

import com.zzj.util.PropertyUtil;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;

/**
 * Submits a pre-built topology either to an in-process LocalCluster
 * (development) or to a real cluster via StormSubmitter (production).
 */
public class TopoManager {

    private TopoManager() {
        // utility class — no instances
    }

    /** Runs the topology in an embedded LocalCluster (single JVM). */
    public static void startLocal(BuildTopo baseTopo) {
        LocalCluster lc = new LocalCluster();
        try {
            lc.submitTopology(baseTopo.getClass().getSimpleName(),
                    baseTopo.getConfig(), baseTopo.getStormTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Submits the topology to the cluster; runs on the Nimbus-side JVM. */
    public static void startRemote(BuildTopo baseTopo) {
        try {
            StormSubmitter.submitTopology(baseTopo.getClass().getSimpleName(),
                    baseTopo.getConfig(), baseTopo.getStormTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.zzj.topo.manager;

import com.zzj.topo.bolt.DBBolt;
import com.zzj.topo.bolt.MysqlBolt;
import com.zzj.topo.config.HikariConfig;
import com.zzj.topo.spout.DBSpout;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Map;

/**
 * Builds the DB topology. The HikariCP config map is handed to every
 * Spout/Bolt constructor so it gets serialized with the component and is
 * therefore available on every worker JVM — not only on the submitting
 * (Nimbus-side) JVM, which was the original bug.
 */
public class DBTopo implements BuildTopo {

    private Config config;
    private StormTopology stormTopology;

    public DBTopo() {
        // NOTE(review): calling a public (overridable) method from the
        // constructor is unsafe if DBTopo is ever subclassed; kept public
        // here to preserve the existing interface.
        initTopo();
    }

    @Override
    public Config getConfig() {
        return this.config;
    }

    @Override
    public StormTopology getStormTopology() {
        return this.stormTopology;
    }

    /** Builds the Storm Config and the spout/bolt wiring. */
    public void initTopo() {
        Config cfg = new Config();
        cfg.setMaxSpoutPending(1);
        cfg.setDebug(true);
        cfg.setNumWorkers(3);
        this.config = cfg;

        // Read the Hikari configuration once instead of three times;
        // all components serialize the same immutable snapshot.
        Map<String, Object> hikariConfig = HikariConfig.getInstance().getConfig();

        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("DBSpout", new DBSpout(hikariConfig), 1).setNumTasks(1);
        tb.setBolt("DBBoltNone", new DBBolt(hikariConfig), 2)
                .shuffleGrouping("DBSpout").setNumTasks(4);
        tb.setBolt("MysqlBolt", new MysqlBolt(hikariConfig), 2)
                .shuffleGrouping("DBBoltNone").setNumTasks(4);
        this.stormTopology = tb.createTopology();
    }
}
package com.zzj.topo.spout;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.zzj.topo.base.BaseSpout;
import com.zzj.topo.entity.Dot;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Spout that polls the gps_origin table and emits all rows as a single
 * JSON array string on the "fromDB" stream.
 */
@Slf4j
public class DBSpout extends BaseSpout {

    private SpoutOutputCollector _spoutOutputCollector;
    private String sqlQuery = "select * from gps_origin";

    /** No-arg constructor required for serialization; no DB access. */
    public DBSpout() {
    }

    /**
     * The Hikari config map is passed through the constructor so it is
     * serialized with the spout and rebuilt on the worker JVM.
     */
    public DBSpout(Map<String, Object> hikariConfig) {
        super(hikariConfig);
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        log.info("===========================================================open===========================================================open===========================================================open");
        _spoutOutputCollector = spoutOutputCollector;
        // Initialize the connection pool ONCE here. The original called
        // connectionProvider.prepare() inside nextTuple(), creating a new
        // HikariCP pool on every poll (a serious resource leak).
        if (connectionProvider != null) {
            connectionProvider.prepare();
        }
    }

    @Override
    public void nextTuple() {
        // try-with-resources closes Connection, PreparedStatement and
        // ResultSet on every path; the original leaked the latter two.
        try (Connection conn = connectionProvider.getConnection();
             PreparedStatement pst = conn.prepareStatement(sqlQuery);
             ResultSet rs = pst.executeQuery()) {
            if (conn == null) {
                log.info("===========================================================conn is null");
            }
            List<Dot> dotList = Lists.newArrayList();
            while (rs.next()) {
                Dot dot = new Dot();
                dot.setId(rs.getString("userid"));
                dot.setLat(rs.getString("lat"));
                dot.setLon(rs.getString("lon"));
                dot.setMillisecond(rs.getLong("millisecond"));
                dotList.add(dot);
            }
            String jsonString = JSONObject.toJSONString(dotList);
            log.info("==============================================================================================================");
            log.info(jsonString);
            // java.time replaces the deprecated Date#toLocaleString()
            log.info("SPOUT" + LocalDateTime.now());
            log.info("==============================================================================================================");
            _spoutOutputCollector.emit(new Values(jsonString));
        } catch (Exception e) {
            // log with cause instead of printStackTrace()
            log.error("DBSpout nextTuple failed", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("fromDB"));
    }
}
package com.zzj.topo.base;

import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.topology.base.BaseRichSpout;

import java.util.Map;

/**
 * Base class for spouts that talk to MySQL through a HikariCP pool.
 * The Hikari config map travels through the constructor, so it is
 * serialized with the spout and the provider is rebuilt on whichever
 * worker JVM ends up hosting the task.
 */
public abstract class BaseSpout extends BaseRichSpout implements HikariHelper {

    // Provider shared with subclasses; remains null for the no-arg path.
    public ConnectionProvider connectionProvider;

    /** For spouts that do not need database access. */
    public BaseSpout() {
    }

    /** Builds the connection provider from the supplied Hikari config. */
    public BaseSpout(Map<String, Object> hikariConfig) {
        getHikariProviderForSpout(hikariConfig);
    }

    @Override
    public ConnectionProvider getHikariProviderForSpout(Map hikariConfig) {
        this.connectionProvider = new HikariCPConnectionProvider(hikariConfig);
        Validate.notNull(this.connectionProvider);
        return this.connectionProvider;
    }

    /** Bolt-side lookup is not applicable here; bolts extend BaseBolt. */
    @Override
    public ConnectionProvider getHikariProviderForBlot(Map hikariConfig) {
        return null;
    }
}
/*
 * Copyright (c)
 * 2019-05-26 2:42 AM
 */
package com.zzj.topo.base;

import org.apache.storm.jdbc.common.ConnectionProvider;

import java.util.Map;

/**
 * Factory contract for building a HikariCP-backed ConnectionProvider for
 * either a Spout or a Bolt. Implementors typically implement one method
 * and return null from the other (see BaseSpout).
 *
 * NOTE(review): "Blot" looks like a typo for "Bolt", and the raw Map
 * parameter should be Map&lt;String, Object&gt; — both changes would break
 * existing implementors, so they are only flagged here.
 */
public interface HikariHelper {
    ConnectionProvider getHikariProviderForSpout(Map hikariConfig);
    ConnectionProvider getHikariProviderForBlot(Map hikariConfig);
}