High fault tolerance: RDD computation recovers from failures via checkpointing (Checkpoint Data) and by replaying the lineage (Logging the Updates); a minimal checkpoint sketch follows these notes.
Spark on Hive
Spark Streaming: micro-batch processing
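A minimal checkpoint sketch for the fault-tolerance note above (the app name, master, checkpoint directory, and RDD are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[*]"))
// Checkpoint data goes to reliable storage (HDFS in production; a local path here for illustration)
sc.setCheckpointDir("/tmp/spark-checkpoint")

val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.checkpoint()   // cut the lineage; recovery reads the checkpointed data instead of recomputing
rdd.count()        // the first action materializes the RDD and writes the checkpoint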
Spark Tuning
Spark and Broadcast
Broadcast joins outperform shuffle joins and improve execution performance (a broadcast-join sketch follows the broadcast-variable example below).
val blackIp = Set(ip1, ip2, ...)        // placeholder blacklist of IPs
// sc.broadcast creates the broadcast variable
val blackIpBC = sc.broadcast(blackIp)
// broadcastVariable.value fetches the actual broadcast content inside a task
rdd.filter(row => !blackIpBC.value.contains(row.ip))
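The snippet above broadcasts a plain variable for filtering; for the joins themselves, the DataFrame API accepts an explicit broadcast hint. A minimal sketch, assuming hypothetical largeDF and smallDF DataFrames that share an ip column:

import org.apache.spark.sql.functions.broadcast

// Ship smallDF to every executor so largeDF is not shuffled across the cluster
val joined = largeDF.join(broadcast(smallDF), Seq("ip"))
// Spark also broadcasts automatically when a table is smaller than
// spark.sql.autoBroadcastJoinThreshold (10 MB by default)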
def oneHotEncoderExample(samples: DataFrame): Unit = {
  // Each row in samples describes one movie; movieId is the movie's id
  val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType))

  // Build a One-hot encoder with Spark MLlib
  val oneHotEncoder = new OneHotEncoderEstimator()
    .setInputCols(Array("movieIdNumber"))
    .setOutputCols(Array("movieIdVector"))
    .setDropLast(false)

  // Fit the One-hot encoder and convert the id feature into a One-hot vector
  val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
  // Print the schema of the final samples
  oneHotEncoderSamples.printSchema()
  // Show 10 rows to inspect the result
  oneHotEncoderSamples.show(10)
}
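Note that OneHotEncoderEstimator is the Spark 2.3/2.4 name; in Spark 3.x the class was renamed to OneHotEncoder, so the same encoder is built as:

import org.apache.spark.ml.feature.OneHotEncoder

val oneHotEncoder = new OneHotEncoder()
  .setInputCols(Array("movieIdNumber"))
  .setOutputCols(Array("movieIdVector"))
  .setDropLast(false)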
Multi-hot encoding
def multiHotEncoderExample(samples: DataFrame): Unit = {
  // Split the pipe-separated genres column into one row per (movie, genre) pair
  val samplesWithGenre = samples.select(col("movieId"), col("title"),
    explode(split(col("genres"), "\\|").cast("array<string>")).as("genre"))
  // Map each genre string to an integer index
  val genreIndexer = new StringIndexer().setInputCol("genre").setOutputCol("genreIndex")

  val stringIndexerModel: StringIndexerModel = genreIndexer.fit(samplesWithGenre)

  val genreIndexSamples = stringIndexerModel.transform(samplesWithGenre)
    .withColumn("genreIndexInt", col("genreIndex").cast(sql.types.IntegerType))

  // The multi-hot vector dimension is the number of distinct genres
  val indexSize = genreIndexSamples.agg(max(col("genreIndexInt"))).head().getAs[Int](0) + 1

  // Collect all genre indexes of a movie into one list
  val processedSamples = genreIndexSamples
    .groupBy(col("movieId")).agg(collect_list("genreIndexInt").as("genreIndexes"))
    .withColumn("indexSize", typedLit(indexSize))

  // Turn the index list into a sparse multi-hot vector via the array2vec UDF
  val finalSample = processedSamples.withColumn("vector", array2vec(col("genreIndexes"), col("indexSize")))
  finalSample.printSchema()
  finalSample.show(10)
}
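array2vec is not defined in this excerpt; it is a project UDF that turns the index list into a sparse multi-hot vector. A minimal sketch under that assumption:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Assumed sketch: indexes -> sparse vector of the given size with 1.0 at every listed position
val array2vec: UserDefinedFunction = udf { (indexes: Seq[Int], size: Int) =>
  Vectors.sparse(size, indexes.distinct.sorted.map(i => (i, 1.0)))
}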
def ratingFeatures(samples: DataFrame): Unit = {
  samples.printSchema()
  samples.show(10)

  // Use the ratings table to compute numerical movie features: rating count, average rating, rating variance
  val movieFeatures = samples.groupBy(col("movieId"))
    .agg(count(lit(1)).as("ratingCount"),
      avg(col("rating")).as("avgRating"),
      variance(col("rating")).as("ratingVar"))
    .withColumn("avgRatingVec", double2vec(col("avgRating")))

  movieFeatures.show(10)

  // Bucketing: a QuantileDiscretizer splits the rating-count feature into 100 buckets
  val ratingCountDiscretizer = new QuantileDiscretizer()
    .setInputCol("ratingCount")
    .setOutputCol("ratingCountBucket")
    .setNumBuckets(100)

  // Normalization: a MinMaxScaler rescales the average rating
  val ratingScaler = new MinMaxScaler()
    .setInputCol("avgRatingVec")
    .setOutputCol("scaleAvgRating")

  // Chain the two feature-processing steps in a Pipeline
  val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler)
  val featurePipeline = new Pipeline().setStages(pipelineStage)

  val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
  // Print the final result
  movieProcessedFeatures.show(10)
}
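Like array2vec above, double2vec is a project UDF; it is assumed here to wrap the scalar average rating into a 1-dimensional dense vector, since MinMaxScaler expects a Vector input column:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Assumed sketch: Double -> dense Vector so MinMaxScaler can operate on avgRating
val double2vec: UserDefinedFunction = udf { (value: Double) => Vectors.dense(value) }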