
Spark Engine and Feature Engineering

Spark Engine and Big Data

A distributed computing platform (nodes exchange data through network communication)

  • Architecture: the manager (master) node schedules and coordinates the work; worker nodes execute the scheduled tasks and return results to the Driver
  • Within a stage: compute nodes (hosts or Docker containers) work in parallel; map/filter process records one at a time and therefore parallelize naturally
  • Across stages: join requires a shuffle/reduce, which is resource-intensive; the shuffle gathers partial statistics and the reduce produces the final result
  • Driver: builds a DAG from RDDs (distributed collections of objects), e.g. textFile (HDFS) -> map -> filter -> join -> map; see the sketch below
  • The data flow exposes operations such as map and repartition
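A minimal sketch of that DAG, assuming a hypothetical HDFS path and a comma-separated (id,event) line format:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DagExample").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// textFile: read lines from HDFS (the path is hypothetical)
val lines = sc.textFile("hdfs:///data/events.txt")
// map/filter work record by record, so each partition runs them in parallel (inside one stage)
val events = lines.map(_.split(",")).filter(_.length == 2).map(f => (f(0), f(1)))
val names = sc.parallelize(Seq(("1", "alice"), ("2", "bob")))
// join forces a shuffle, which closes the stage and moves data across the network
val joined = events.join(names).map { case (id, (event, name)) => s"$id,$name,$event" }
joined.collect().foreach(println)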

Spark's Three Deployment Modes

  • YARN mode
  • Standalone mode
  • local mode (a sketch of selecting the mode via the master URL follows this list)
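A minimal sketch, assuming a standalone master at spark://master:7077 (host and port are illustrative); the same application selects its mode through the master URL:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("ModeExample")
  // .master("yarn")                // YARN mode: resources are managed by YARN
  // .master("spark://master:7077") // Standalone mode: Spark's own cluster manager
  .master("local[*]")               // local mode: one JVM, one worker thread per core
  .getOrCreate()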

Spark Features

  • Stores intermediate results in memory
  • Reads many data sources: HDFS, HBase, MySQL
  • High fault tolerance: RDD computation recovers through checkpoints (Checkpoint Data, Logging the Updates); see the sketch after this list
  • Spark on Hive
  • Spark Streaming: micro-batch processing
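A minimal sketch of the checkpoint mechanism, assuming a local checkpoint directory (the path is illustrative):

val sc = spark.sparkContext
sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical directory
val doubled = sc.parallelize(1 to 1000).map(_ * 2)
doubled.checkpoint() // truncates the lineage; recovery reads the saved data instead of recomputing
doubled.count()      // an action materializes the RDD and triggers the checkpoint write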

Spark Tuning

Spark and Broadcast Variables

Broadcast joins outperform shuffle joins and improve execution performance.

// set of blocked IPs (elements elided in the original)
val blackIp = Set(ip1, ip2, ...)
// sc.broadcast creates the broadcast variable
val blackIpBC = sc.broadcast(blackIp)
// inside a task, .value retrieves the actual content of the broadcast variable
rdd.filter(row => !blackIpBC.value.contains(row.ip))
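The same mechanism underlies broadcast joins. A minimal DataFrame sketch, assuming a large events table and a small dims table (both names are illustrative): the broadcast hint ships the small table to every executor so the join avoids a shuffle.

import org.apache.spark.sql.functions.broadcast

// events is large, dims is small enough to fit on every executor
val joined = events.join(broadcast(dims), Seq("id"))
joined.explain() // the plan should show a BroadcastHashJoin rather than a SortMergeJoin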

Spark and Accumulators

The driver program can read an accumulator through its value property (in Java, use value() or setValue() to access the accumulator's value).

- Python implementation
file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # access the global accumulator
    if line == "":
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print("Blank lines: %d" % blankLines.value)

Spark Processing and Feature Types

Categorical Features

Movie genre, ID, director, actor, gender, geographic location, weather

Numerical Features

Click count, click-through rate, age

One-hot Encoding in Practice

import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def oneHotEncoderExample(samples: DataFrame): Unit = {
  // Each row of samples describes one movie; movieId is the movie's id
  val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType))

  // Create the One-hot encoder from Spark MLlib
  val oneHotEncoder = new OneHotEncoderEstimator()
    .setInputCols(Array("movieIdNumber"))
    .setOutputCols(Array("movieIdVector"))
    .setDropLast(false)

  // Fit the One-hot encoder and convert the id feature into a One-hot vector
  val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
  // Print the schema of the final samples
  oneHotEncoderSamples.printSchema()
  // Show 10 rows to inspect the result
  oneHotEncoderSamples.show(10)
}

Multi-hot Encoding

import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

// UDF that turns a list of genre indexes into a sparse multi-hot vector
val array2vec: UserDefinedFunction = udf { (a: Seq[Int], length: Int) =>
  Vectors.sparse(length, a.sortWith(_ < _).toArray, Array.fill[Double](a.length)(1.0))
}

def multiHotEncoderExample(samples: DataFrame): Unit = {
  // Split the pipe-separated genres field so each row carries a single genre
  val samplesWithGenre = samples.select(col("movieId"), col("title"),
    explode(split(col("genres"), "\\|").cast("array<string>")).as("genre"))
  // Map each genre string to a numeric index
  val genreIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIndex")

  val stringIndexerModel: StringIndexerModel = genreIndexer.fit(samplesWithGenre)

  val genreIndexSamples = stringIndexerModel.transform(samplesWithGenre)
    .withColumn("genreIndexInt", col("genreIndex").cast(sql.types.IntegerType))

  // The vector dimension is the largest index plus one
  val indexSize = genreIndexSamples.agg(max(col("genreIndexInt"))).head().getAs[Int](0) + 1

  // Collect all genre indexes of each movie into one list
  val processedSamples = genreIndexSamples
    .groupBy(col("movieId")).agg(collect_list("genreIndexInt").as("genreIndexes"))
    .withColumn("indexSize", typedLit(indexSize))

  val finalSample = processedSamples.withColumn("vector", array2vec(col("genreIndexes"), col("indexSize")))
  finalSample.printSchema()
  finalSample.show(10)
}

Feature Scale and Feature Distribution

  • Normalization to [0,1]: MinMaxScaler
  • Bucketing: QuantileDiscretizer
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{MinMaxScaler, QuantileDiscretizer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

// UDF that wraps a Double into a dense vector, since MinMaxScaler expects a vector column
val double2vec: UserDefinedFunction = udf { (value: Double) => Vectors.dense(value) }

def ratingFeatures(samples: DataFrame): Unit = {
  samples.printSchema()
  samples.show(10)

  // From the ratings table, compute numerical features per movie:
  // rating count, average rating, and rating variance
  val movieFeatures = samples.groupBy(col("movieId"))
    .agg(count(lit(1)).as("ratingCount"),
      avg(col("rating")).as("avgRating"),
      variance(col("rating")).as("ratingVar"))
    .withColumn("avgRatingVec", double2vec(col("avgRating")))

  movieFeatures.show(10)

  // Bucketing: create a QuantileDiscretizer that splits the rating count into 100 buckets
  val ratingCountDiscretizer = new QuantileDiscretizer()
    .setInputCol("ratingCount")
    .setOutputCol("ratingCountBucket")
    .setNumBuckets(100)

  // Normalization: create a MinMaxScaler that scales the average rating to [0,1]
  val ratingScaler = new MinMaxScaler()
    .setInputCol("avgRatingVec")
    .setOutputCol("scaleAvgRating")

  // Build a pipeline that runs the two feature-processing stages in order
  val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
  val featurePipeline = new Pipeline().setStages(pipelineStage)

  val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
  // Print the final result
  movieProcessedFeatures.show(10)
}

Generating Item2Vec and Graph Embedding with Spark

For Embedding tasks on large datasets, Spark's distributed data processing gives it an advantage over PyTorch and TensorFlow.

Processing Embeddings with Spark

Item2Vec

Item2Vec: Data Processing

Item2Vec treats item sequences like "sentences" of text and learns the similarity between items from them.

  • Example
    1. Filter out movies with low ratings
    2. Keep well-rated movies close together in the sequence
    3. Poorly rated movies should not appear paired with well-rated movies in a sequence
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

def processItemSequence(sparkSession: SparkSession): RDD[Seq[String]] = {
  // Set the path of the rating data and load it with Spark
  val ratingsResourcesPath = this.getClass.getResource("/webroot/sampledata/ratings.csv")
  val ratingSamples = sparkSession.read.format("csv").option("header", "true").load(ratingsResourcesPath.getPath)

  // A user-defined function (UDF) that sorts a user's movies by timestamp
  val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => {
    rows.map { case Row(movieId: String, timestamp: String) => (movieId, timestamp) }
      .sortBy { case (movieId, timestamp) => timestamp }
      .map { case (movieId, timestamp) => movieId }
  })

  // Turn the raw rating data into per-user sequences
  val userSeq = ratingSamples
    .where(col("rating") >= 3.5) // drop rating records below 3.5
    .groupBy("userId") // group by user id
    .agg(sortUdf(collect_list(struct("movieId", "timestamp"))) as "movieIds") // one sequence per user, sorted by timestamp with the UDF defined above
    .withColumn("movieIdStr", array_join(col("movieIds"), " "))
    // join all ids into a single String for the word2vec model

  // Keep only the sequence column and drop the intermediate data
  userSeq.select("movieIdStr").rdd.map(r => r.getAs[String]("movieIdStr").split(" ").toSeq)
}

Item2Vec: Model Training

Set the model hyperparameters (a training sketch follows this list):

  • setVectorSize sets the dimension of the generated Embedding vectors
  • setWindowSize sets the size of the sliding window used when sampling the sequence data
  • setNumIterations sets the number of training iterations
  • Call the fit interface and extract the resulting embedding vectors
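A minimal training sketch, assuming samples is the RDD[Seq[String]] produced by processItemSequence; the hyperparameter values are illustrative only:

import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.rdd.RDD

def trainItem2vec(samples: RDD[Seq[String]]): Word2VecModel = {
  val word2vec = new Word2Vec()
    .setVectorSize(10)    // dimension of the embedding vectors
    .setWindowSize(5)     // sliding window over each sequence
    .setNumIterations(10) // number of training iterations
  val model = word2vec.fit(samples)
  // model.getVectors maps each movieId to its embedding vector
  model
}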

Graph Embedding Based on Random Walks

DeepWalk Implementation

  • 1. Prepare the item-to-item transition probability matrix: build the item relation graph, which defines the probability of jumping from item A to item B
// samples: the input watching-sequence sample set
def graphEmb(samples: RDD[Seq[String]], sparkSession: SparkSession): Unit = {
  // flatMap breaks each watching sequence into adjacent movie pairs
  val pairSamples = samples.flatMap[String](sample => {
    var pairSeq = Seq[String]()
    var previousItem: String = null
    sample.foreach((element: String) => {
      if (previousItem != null) {
        pairSeq = pairSeq :+ (previousItem + ":" + element)
      }
      previousItem = element
    })
    pairSeq
  })
  // Count the occurrences of every movie pair with countByValue
  val pairCount = pairSamples.countByValue()
  // The transition probability matrix as a nested mutable Map
  val transferMatrix = scala.collection.mutable.Map[String, scala.collection.mutable.Map[String, Long]]()
  val itemCount = scala.collection.mutable.Map[String, Long]()
  var lognumber = 0

  // Build the transition probability matrix
  pairCount.foreach(pair => {
    val pairItems = pair._1.split(":")
    val count = pair._2
    lognumber = lognumber + 1
    println(lognumber, pair._1)

    if (pairItems.length == 2) {
      val item1 = pairItems.apply(0)
      val item2 = pairItems.apply(1)
      if (!transferMatrix.contains(item1)) {
        transferMatrix(item1) = scala.collection.mutable.Map[String, Long]()
      }
      transferMatrix(item1)(item2) = count
      itemCount(item1) = itemCount.getOrElse[Long](item1, 0) + count
    }
  })
}

Random Walk Sampling

Generate new sequence samples from the transition probability matrix.
import scala.util.Random
import scala.util.control.Breaks._

// Random walk sampling function
// transferMatrix: the transition probability matrix
// itemCount: the distribution of item occurrence counts
def randomWalk(transferMatrix: scala.collection.mutable.Map[String, scala.collection.mutable.Map[String, Long]], itemCount: scala.collection.mutable.Map[String, Long]): Seq[Seq[String]] = {
  // number of samples to generate
  val sampleCount = 20000
  // length of each sample
  val sampleLength = 10
  val samples = scala.collection.mutable.ListBuffer[Seq[String]]()

  // total number of item occurrences
  var itemTotalCount: Long = 0
  for ((k, v) <- itemCount) itemTotalCount += v

  // perform sampleCount random walks, generating sampleCount sequence samples
  for (w <- 1 to sampleCount) {
    samples.append(oneRandomWalk(transferMatrix, itemCount, itemTotalCount, sampleLength))
  }

  Seq(samples.toList: _*)
}

// Generate one sample through a single random walk
// transferMatrix: the transition probability matrix
// itemCount: the distribution of item occurrence counts
// itemTotalCount: total number of item occurrences
// sampleLength: length of each sample
def oneRandomWalk(transferMatrix: scala.collection.mutable.Map[String, scala.collection.mutable.Map[String, Long]], itemCount: scala.collection.mutable.Map[String, Long], itemTotalCount: Long, sampleLength: Int): Seq[String] = {
  val sample = scala.collection.mutable.ListBuffer[String]()

  // choose the starting point
  val randomDouble = Random.nextDouble()
  var firstElement = ""
  var culCount: Long = 0
  // pick the starting item at random, weighted by item occurrence probability
  breakable { for ((item, count) <- itemCount) {
    culCount += count
    if (culCount >= randomDouble * itemTotalCount) {
      firstElement = item
      break
    }
  }}

  sample.append(firstElement)
  var curElement = firstElement
  // random walk until the sample reaches sampleLength
  breakable { for (w <- 1 until sampleLength) {
    if (!itemCount.contains(curElement) || !transferMatrix.contains(curElement)) {
      break
    }
    // the transition probability vector from curElement to the next hop
    val probDistribution = transferMatrix(curElement)
    val curCount = itemCount(curElement)
    val randomDouble = Random.nextDouble()
    var culCount: Long = 0
    // pick the next item at random according to the transition probabilities
    breakable { for ((item, count) <- probDistribution) {
      culCount += count
      if (culCount >= randomDouble * curCount) {
        curElement = item
        break
      }
    }}
    sample.append(curElement)
  }}
  Seq(sample.toList: _*)
}
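The sequences produced by these random walks can then be fed to the same Word2Vec training shown in the Item2Vec section to obtain the Graph Embedding vectors.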
