Scala中Akka实战,使用Akka完成wordCount计算任务、计算wordCount只有一个源文件 21

    xiaoxiao2023-10-05  152

    1. 具体需求&思路

    需求:实现单个文件的单词计数源文件:E:\wordCount.txt代码思路: 继承actor重写act()方法调用start()方法发送消息接收消息 a. 读取源文件,得到字符串,切割为list集合(每一行是一个元素),对 list进行flatMap()得到 =>Array[String] k1 v1 b. 自定义map逻辑,对Array[String]进行map() =>得到Array[(String,Int)],也就是得到k2,v2 c. 分组=>拆分,相同k2 发送到同一个reduce上,k2,v2合并成为集合Map[String,Array[(String,Int)]] d. 自定义reduce逻辑,Map[String,Array[(String,Int)]]=>Map[String,Int]。对v2合并的集合计算长度,得到k3,v3

    2. 代码如下

    import java.io.File import scala.actors.Actor import scala.collection.mutable import scala.io.{BufferedSource, Source} //定义模板类,发送和接收消息 case class SubmitTask(fileName:String) //1. 定义异步类,接收消息 class WorldCount extends Actor{ //2. 重写act()方法 override def act():Unit = { //5. 循环接收消息 loop{ react{ //业务代码 //1. 接收消息 case SubmitTask(fileName) => { //1. 接收到了fileName,需要通过url获取文件 val file:BufferedSource = Source.fromFile(new File(fileName)) //操作file对象,得到每一行使用“\r\n”拼接起来=>字符串 val contenStr:String = file.mkString //2. 切割字符串,得到list集合,每一个元素就是每一行内容 //[i love hadoop,i love hadoop too] val lines:Array[String] = contenStr.split("\r\n") //3.1 需要对lines进行map&flat => Array[String] //map就是lines中每一个元素进行切割,flat就是把map处理后的内层抽取出来,抽取为一个list集合 //array [i love hadoop i love hadoop too] val array:Array[String] = lines.flatMap(_.split(" ")) //3.2 还有对array进行map操作,也就是转变为k2(每一个单词) v2(1) //[("i",1),("love",1),("hadoop",1),("i",1),("love",1),("hadoop",1),("too",1)] val arrayTuple:Array[(String,Int)] = array.map((_,1)) //3.3 对元祖数组进行分组 => 分组,k2发送到同一个reduce,(k2,v2)形成一个集合 //{"i":[("i",1),("i",1)],"love":[("love",1),("love",1)],"hadoop":[("hadoop",1),("hadoop",1)]...} val groupBy:Map[String,Array[(String,Int)]] = arrayTuple.groupBy(_._1) //3.4 对map中的每一个元素的value,计算长度,重新写回value的位置 val mapResult:Map[String,Int] = groupBy.mapValues(_.length) //4. 输出结果 println(mapResultn) } //如果没有匹配到消息 case _ => { println("-----------no fileName---------") } } } } } //3. 定义object,发送消息 object WorldCount{ def main(args:Array[String]):Unit = { //1. 封装文件url val files:Array[String] = Array("E:\\wordCount.txt") //2.1 创建伴生类对象,调用start()方法,开启线程 val wordCount:WorldCount = new WorldCount //2.2 启动start()方法 wordCount.start() //4. 发送消息,异步无返回值,因为就计算一个文件 wordCount ! SubmitTask(files(0)) } }
    最新回复(0)