spark的问题集锦

    xiaoxiao2022-07-14  179

    和小伙伴们一起做这个项目好久了,上线期间也是遇到了许多问题,这里自我总结下,防止下次再犯! 项目在开发方面主要是基于spark开发的,里面使用到了spark core、spark sql、spark Streaming去进行编程,项目中有多个模块,像用户session分析模块、单条转换率模块、各区域热门商品模块、广告实时点击流统计模块等。 项目中遇到的问题: 1、classNotFound问题 使用maven把项目打成jar包,但是这里一定要主要不要忘记把一些项目中需要的依赖也打进jar包中。 2、数据写入mysql出现问题 经检查发现是mysql关闭了,解决办法:在调度脚本中加上service mysqld start,保证mysql启动。 3、集群内存不足,解决办法:hadoop集群扩容,增加Datanode。 4、spark序列化问题,serialable error等此类型的错误一般都是因为没有把class类序列化,解决办法:在定义pojo类时都实现serializable接口。 5、spark程序运行过程中时不时的报错shuffl …file not found ,产生原因:一般是在shuffle时某个节点向别的节点远程拉取数据,但是那个节点却正在做gc操作,在gc的时候该节点的其他的一切进程都会暂时停止工作,所以如果刚好碰到时间较长的gc时就会拉取数据失败并报错。解决办法:在sparkConf中设置一些参数: spark.shuffle.io.maxRetries, 20 //最大尝试拉取数据次数,默认3次 spark.shuffle.io.retrywait,60 //每次拉取数据等待时间,默认5s 6、spark streaming的batch interal的调试,不能太大,也不要过小,不能低于200ms 7、spark宕机消费kafka的数据保证exactly once,当spark程序停掉或意外死亡后,如果没有任何配置,那么从它死亡到重启之间的kafka的数据就不能被消费了,相当于丢失了,这是我们不能接受的,解决办法:spark在消费kafka数据时我们自己维护好每个topic的partition的消费到的offset,在开始读取kafka数据时要以指定offset的方式读取kafka的数据。代码干货: //从db中读取自己维护的offset数据,kafkaTopic是自定义的pojo List lastestOffset = DBUtil.getLastestOffset(); Map<String,String> kafkaParams =new HashMap<String, String>(); kafkaParams.put(“metadata.broker.list”, “192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092”); Map<TopicAndPartition, Long> fromOffset = new HashMap<TopicAndPartition, Long>(); //可以指定添加多个topic及分区,这里批量从mysql中查询并设置 for (kafkaTopic kafkaTopic : lastestOffset) { fromOffset.put(new TopicAndPartition(kafkaTopic.getTopic(), kafkaTopic.getPartition()), kafkaTopic.getOffset()); } JavaInputDStream source = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, fromOffset, new Function<MessageAndMetadata<String, String>, String>() { //预处理,这里不做任何处理,直接返回message private static final long serialVersionUID = 1L; public String call(MessageAndMetadata<String, String> v1) throws Exception { return v1.message(); } }); //内部在使用对应的Dstream时把offset维护到db中 /** * * @param source源数据流 */ public static void storedToDb(JavaInputDStream source){ source.foreachRDD(new Function<JavaRDD, Void>() { private static final long serialVersionUID = 1L; @Override public Void call(JavaRDD v1) throws Exception { kafkaTopic kafkaTopic = new kafkaTopic(); OffsetRange[] offsets = ((HasOffsetRanges)v1.rdd()).offsetRanges(); System.out.println(“我是offset:”+offsets); for (OffsetRange o : offsets) { System.out.println("我是topic: “+o.topic()+” 我是partition: “+o.partition()+ " 我是fromoffset: “+o.fromOffset()+” 我是untiloffset:”+o.untilOffset()); kafkaTopic.setTopic(o.topic()); kafkaTopic.setPartition(o.partition()); kafkaTopic.setOffset(o.fromOffset()); DBUtil.updateOffset(kafkaTopic); } // update to DB … return null; } }); } 8、OutOfMemory,产生原因:内存不足,一般在shuffle时容易产生,解决办法:增大作业资源、减少不必要的持久化Rdd、broadCast的使用、Kryo序列化的使用,shuffle方面的调优(consolidation机制的开启,适度减小shuffle中reduce task向shuffle map task拉取数据的大小spark.reducer.maxSizeInFlight,默认为48M,试着去调整) 9、spark streaming的batch Durations的调试过程 首先打开spark的作业监控webUI的streaming页面 内部的几个重要指标: (1):该指标为数据源source的接收,每秒从数据源接收的时间事件数,这里我的数据源是kafka。 (2):数据能不能处理完首先看这个指标。该指标表示spark job提交的延迟时间,比较重要,spark 能否在指定的Durations时间内处理完一批数据可以看这个,一般只要延迟时间在一个Durations内就可以,如果Durations过高,或一直持续增长(就像我的这种情况),则表示在你定义的Durations内处理不完数据,数据一直在堆压,此时解决办法为:去优化!第一步也是最有效的一步就是加大集群资源,进行集群扩容,但是扩容可没那么简单啊,哎,等领导同意扩容了黄花菜都凉了,所以扩容知道下就行了,一般这种方案是行不通的。第二步看看你当前可以使用的集群资源是否完充分使用,在sparkConf中设置并行度conf.set(“spark.default.parallelism”, “10”);第三步加大你的Durations时间,这个时间具体为多少就要根据具体业务去慢慢调试了。 (3):该指标表示job的处理时间,也可以关注看看 10:sparkStreaming的高可用 (1):Driver高可用 在程序中创建streaming工厂,一旦Driver在运算过程中挂掉了还会自动重启,当然对于数据的读取依赖于checkpoint,因此必须要设置checkpoint目录。 具体使用方式为:

    public static void main(String[] args) { final String checkPoint="hdfs://master:9000/check/"; JavaStreamingContextFactory javaStreamingContextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { SparkConf conf = new SparkConf() .setAppName("wordCount") .setMaster("local") .set("spark.default.parallelism","10"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2)); jsc.checkpoint(checkPoint); //.......具体处理逻辑都写在这里 return jsc; } }; JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkPoint, javaStreamingContextFactory);//外部获取 jsc.start(); jsc.awaitTermination(); jsc.close(); }

    除此之外想要保证Driver的高可用,在提交spark作业时必须是cluster模式, 且在spark-submit中加上–supervise参数。 (2):WAL预写日志的开启 conf.set(“spark.streaming.receiver.writeAheadLog.enable”,“true”); 11、在项目中是不允许硬编码的,一般都会写到配置文件中以便于后期的维护,而且最好的做法是把配置文件放到项目外部,这样修改起来就很方便了,spark也提供了这个功能,在spark-submit后跟上–files properties文件,在spark的程序内使用流的方式去读取文件properties.load(new FileInPutStream(“fileName”)); 一个简单的小干货如下:

    public static void main(String[] args) throws Exception{ SparkConf conf = new SparkConf() .setAppName("test").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); Properties properties =new Properties(); properties.load(new FileInputStream("test.properties")); String name = properties.getProperty("name"); System.out.println("从配置文件读取的名字为:"+name); jsc.close(); } (持续更新中)
    最新回复(0)