0. 说明
Spark 下运行job,使用第三方 Jar 包的 3 种方式。
1. 方式一
将第三方 Jar 包分发到所有的 spark/jars 目录下
2. 方式二
将第三方 Jar 打散,和我们自己的 Jar 包打到一起
类似的例子可以参考 在 Spark 集群上运行程序 中的打包部分
3. 方式三
在 spark-submit 命令中,通过 --jars 指定使用的第三方 Jar 包
【案例:使用 spark-shell 执行 taggen】
1. 启动 spark-shell,指定 fastjson 类库。
定位到 fastjson jar 包
D:\maven_repository\com\alibaba\fastjson\1.2.47\fastjson-1.2.47.jar
2. 启动spark-shell
spark-shell --master spark://s101:7077 --jars /home/centos/fastjson-1.2.47.jar
3. 定义函数 extractTag
// 定义函数,抽取标签列表
def extractTag(json: String) = {
import com.alibaba.fastjson.JSON
var list: scala.List[String] = Nil
// 将字符串解析成 json 对象
val obj = JSON.parseObject(json)
val arr = obj.getJSONArray("extInfoList")
if (arr != null && arr.size > 0) { // 得到数组的第一个 json 对象
val firstObj = arr.getJSONObject(0)
val values = firstObj.getJSONArray("values")
if (values != null && values.size > 0) {
var i = 0
while (i < values.size) {
val tag = values.getString(i)
list = tag :: list
i += 1;
}
}
}
list
}
4. 加载文件
// 1. 加载文件
val rdd1 = sc.textFile("/user/centos/temptags.txt")
5. 解析每行的 json 数据成为集合
// 2. 解析每行的json数据成为集合
val rdd2 = rdd1.map(line => {
val arr: Array[String] = line.split("\t")
// 商家id
val busid: String = arr(0)
// json
val json: String = arr(1)
val list: scala.List[String] = extractTag(json)
(busid, list)
})
6. 过滤空集合
// 3. 过滤空集合 (85766086,[干净卫生, 服务热情, 价格实惠, 味道赞])
val rdd3 = rdd2.filter(t => {
!t._2.isEmpty
})
7. 将值压扁
//4. 将值压扁
val rdd4 = rdd3.flatMapValues(list=>{
list
})
8. 滤除数字的tag
//5. 滤除数字的tag
val rdd5 = rdd4.filter(t=>{
try{
//
Integer.parseInt(t._2)
false
}
catch {
case _ => true
}
})
9. 标1成对
//6. 标1成对
val rdd6 = rdd5.map(t=>{
(t,1)
})
10. 聚合
//7. 聚合
val rdd7 = rdd6.reduceByKey(_+_)
11. 重组
//8. 重组
val rdd8 = rdd7.map(t=>{
(t._1._1,(t._1._2 , t._2)::Nil)
})
12. reduceByKey
//9. reduceByKey
val rdd9 =rdd8.reduceByKey(_ ::: _)
13. 分组内排序
//10. 分组内排序
val rdd10=rdd9.mapValues(list=>{
list.sortBy(t=>{
-t._2
}).take(5)
})
14. 商家间排序
//11. 商家间排序
val rdd11= rdd10.sortBy(t=>{
t._2(0)._2
} ,false)
15. collect
rdd11.collect()
16. 查看 Web UI
http://s101:8080/
17. DAG 视图