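1. Requirements and approach
Requirement: count the words in a single file.
Source file: E:\wordCount.txt
Approach:
Extend Actor, override act(), call start(), then send a message to the actor and handle it when it is received. The handler works in four steps:
a. Read the source file into a string and split it into a collection with one element per line, then call flatMap() on that collection to get Array[String]. This is (k1, v1).
b. Custom map logic: call map() on the Array[String] to get Array[(String,Int)]. This is (k2, v2).
c. Group: records with the same k2 go to the same reduce, and the (k2, v2) pairs are merged into Map[String,Array[(String,Int)]].
d. Custom reduce logic: Map[String,Array[(String,Int)]] => Map[String,Int]. The length of each merged v2 collection is the count, which gives (k3, v3).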
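Steps a to d can be tried on their own, without an actor or a file on disk. The following is a minimal sketch, assuming the text is already in memory. The names `WordCountSketch` and `wordCount` are illustrative and do not come from the original code. It also makes two assumptions the original does not: lines are split on `\r?\n` so that Unix line endings work too, and empty tokens produced by repeated spaces or blank lines are dropped.

```scala
object WordCountSketch {
  // Steps a-d applied to an in-memory string (hypothetical helper, no actor, no file I/O).
  def wordCount(content: String): Map[String, Int] = {
    // a. One element per line, then flatMap into single words (k1, v1).
    //    Assumption: accept both \r\n and \n line endings.
    val lines: Array[String] = content.split("\r?\n")
    //    Assumption: drop empty tokens caused by repeated spaces or blank lines.
    val words: Array[String] = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    // b. Map: pair every word with 1 to get (k2, v2).
    val pairs: Array[(String, Int)] = words.map((_, 1))
    // c. Group: collect all pairs that share the same k2.
    val grouped: Map[String, Array[(String, Int)]] = pairs.groupBy(_._1)
    // d. Reduce: the size of each group is the word count (k3, v3).
    grouped.map { case (word, group) => (word, group.length) }
  }

  def main(args: Array[String]): Unit = {
    println(wordCount("hello world\r\nhello scala"))
  }
}
```

Keeping the pipeline in a plain function like this makes it easy to check the counting logic before wiring it into the actor's message handler.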
2. Code
import java.io.File
import scala.actors.Actor
import scala.io.{BufferedSource, Source}

// Message that carries the path of the file to count.
case class SubmitTask(fileName: String)

class WorldCount extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) => {
          // a. Read the whole file, then split it into lines and words (k1, v1).
          val file: BufferedSource = Source.fromFile(new File(fileName))
          // Close the file once its content has been read.
          val contentStr: String = try file.mkString finally file.close()
          // Assumes Windows line endings (\r\n), as in the source file.
          val lines: Array[String] = contentStr.split("\r\n")
          val array: Array[String] = lines.flatMap(_.split(" "))
          // b. Map: pair each word with 1 to get (k2, v2).
          val arrayTuple: Array[(String, Int)] = array.map((_, 1))
          // c. Group: gather all pairs that share the same k2.
          val groupBy: Map[String, Array[(String, Int)]] = arrayTuple.groupBy(_._1)
          // d. Reduce: the length of each group is the word count (k3, v3).
          val mapResult: Map[String, Int] = groupBy.mapValues(_.length)
          println(mapResult)
        }
        // Any message other than SubmitTask carries no file name.
        case _ => {
          println("-----------no fileName---------")
        }
      }
    }
  }
}

object WorldCount {
  def main(args: Array[String]): Unit = {
    val files: Array[String] = Array("E:\\wordCount.txt")
    val wordCount: WorldCount = new WorldCount
    // Start the actor, then send it the task asynchronously.
    wordCount.start()
    wordCount ! SubmitTask(files(0))
  }
}