需要计算的目标文件是多个 txt 文档，代码如下：
import java.io.File

import scala.actors.{Actor, Future}
import scala.collection.immutable._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.{BufferedSource, Source}
import scala.util.control.NonFatal

// Word count over several text files using scala.actors.
// Each file is handed to its own WordCountN actor, which counts the words in that file and
// replies with a ReturnMesN; the driver in the companion object merges the per-file counts.

// ---- Messages ----

/** Request: count the words in the file at `fileName`. */
case class SendMesN(fileName: String)

/** Reply: word -> occurrence counts for a single file. */
case class ReturnMesN(map: Map[String, Int])

/** Request: terminate the receiving WordCountN actor. */
case object StopN

/**
 * Worker actor that counts the words of one file per `SendMesN` request.
 *
 * Replies to the sender with a `ReturnMesN`. If the file cannot be read, the error is printed
 * to stderr and an empty count map is sent back, so the requester is never left waiting on a
 * reply that will not come. Terminates when it receives `StopN`.
 */
class WordCountN extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case SendMesN(fileName) =>
          val counts: Map[String, Int] =
            try WordCountN.countWords(WordCountN.readFile(fileName))
            catch {
              case NonFatal(e) =>
                // An exception escaping react would kill this actor without replying, leaving
                // the caller's `!!` future unset forever; report the failure and reply anyway.
                System.err.println(s"WordCountN: failed to read $fileName: $e")
                Map.empty[String, Int]
            }
          sender ! ReturnMesN(counts)
        case StopN =>
          exit()
        case _ =>
          println("没有收到消息")
      }
    }
  }
}

object WordCountN {

  /** Files counted when no paths are passed on the command line. */
  private val DefaultFiles: Array[String] = Array(
    "E:\\01HM\\00pro\\scala-day02\\aaa\\wordCount.txt",
    "E:\\01HM\\00pro\\scala-day02\\aaa\\wordCount1.txt",
    "E:\\01HM\\00pro\\scala-day02\\aaa\\wordCount2.txt")

  /**
   * Reads the whole file into a string, always closing the underlying handle.
   * Uses the platform default charset (NOTE(review): confirm that matches the input files).
   */
  private def readFile(fileName: String): String = {
    val source: BufferedSource = Source.fromFile(new File(fileName))
    try source.mkString finally source.close()
  }

  /**
   * Counts the words in `contents`.
   *
   * Tokens are separated by any run of whitespace (spaces, tabs, `\r\n` or `\n` line breaks),
   * so the result does not depend on the file's line-ending style; empty tokens are dropped.
   * Matching is case-sensitive.
   *
   * @param contents the full text to count
   * @return word -> number of occurrences; empty for blank input
   */
  def countWords(contents: String): Map[String, Int] =
    contents
      .split("\\s+")
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.length) }

  /**
   * Sums several word-count maps into one.
   *
   * @param partials per-file counts
   * @return word -> total occurrences across all `partials`
   */
  def mergeCounts(partials: scala.collection.Iterable[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (total, counts) =>
      counts.foldLeft(total) { case (acc, (word, n)) =>
        acc.updated(word, acc.getOrElse(word, 0) + n)
      }
    }

  /**
   * Counts the words in several files concurrently and prints the merged totals.
   *
   * @param args paths of the files to count; when empty, `DefaultFiles` is used
   */
  def main(args: Array[String]): Unit = {
    val fileNames: Array[String] = if (args.nonEmpty) args else DefaultFiles

    // One actor per file: an actor processes its mailbox one message at a time, so sending
    // every file to a single actor would count them sequentially.
    val pending: Array[(WordCountN, Future[Any])] = fileNames.map { fileName =>
      val worker = new WordCountN
      worker.start()
      (worker, worker !! SendMesN(fileName))
    }

    // Applying a scala.actors Future blocks until its reply is available, so no polling
    // loop over `isSet` is needed.
    val partials: List[Map[String, Int]] = pending.toList.map { case (worker, future) =>
      val fileCounts: Map[String, Int] = future() match {
        case ReturnMesN(counts) => counts
        case other => throw new IllegalStateException(s"unexpected reply: $other")
      }
      // The worker has answered its only request; let it terminate.
      worker ! StopN
      fileCounts
    }

    println(mergeCounts(partials))
  }
}