add Word2Vec stop

This commit is contained in:
xiaoxiao 2018-10-26 11:21:22 +08:00
parent 0af6b10fb3
commit 84768e8abe
4 changed files with 199 additions and 2 deletions

View File

@@ -0,0 +1,22 @@
server.ip=10.0.86.98
server.port=8001
#spark.master=spark://10.0.86.89:7077
#spark.master=spark://10.0.86.191:7077
spark.master=yarn
spark.deploy.mode=cluster
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
hive.metastore.uris=thrift://10.0.86.191:9083
#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
h2.port=50001
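
The properties above point the PiFlow server at a Spark-on-YARN cluster (cluster deploy mode) together with the HDFS, Hive metastore, and bundle jar locations it needs. As a hedged illustration only (not PiFlow's own loader), the sketch below reads such a file with java.util.Properties; the object name and the path "config.properties" are assumptions.

import java.io.FileInputStream
import java.util.Properties

// Minimal sketch: load the server/Spark settings from a properties file like the one above.
// The path "config.properties" is an assumed location for illustration only.
object ConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    val in = new FileInputStream("config.properties")
    try props.load(in) finally in.close()

    val sparkMaster = props.getProperty("spark.master")      // "yarn"
    val deployMode  = props.getProperty("spark.deploy.mode") // "cluster"
    val serverPort  = props.getProperty("server.port")       // "8001"
    println(s"master=$sparkMaster, deployMode=$deployMode, port=$serverPort")
  }
}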

View File

@@ -0,0 +1,42 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "stops":[
      {
        "uuid":"0000",
        "name":"SelectHiveQL",
        "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
        "properties":{
          "hiveQL":"select * from sparktest.word2vec"
        }
      },
      {
        "uuid":"1111",
        "name":"WordToVec",
        "bundle":"cn.piflow.bundle.ml_feature.WordToVec",
        "properties":{
          "maxIter":"",
          "maxSentenceLength":"",
          "minCount":"",
          "numPartitions":"",
          "stepSize":"",
          "vectorSize":"",
          "colName":"doc",
          "outputCol":"result"
        }
      }
    ],
    "paths":[
      {
        "from":"SelectHiveQL",
        "outport":"",
        "inport":"",
        "to":"WordToVec"
      }
    ]
  }
}
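
This flow wires two stops: SelectHiveQL reads the sparktest.word2vec table, and its default outport feeds WordToVec's default inport; WordToVec tokenizes the doc column and writes the vectors to the result column. As a sketch only, the snippet below parses this JSON the same way the test class does (scala.util.parsing.json.JSON) and prints the stop names and path; the object name and the file path are assumptions taken from FlowTest_XX.

import scala.io.Source
import scala.util.parsing.json.JSON

// Sketch only: read the flow definition above and list its stops and paths.
// Assumes the JSON is saved at src/main/resources/word2vec.json, as in FlowTest_XX.
object FlowJsonSketch {
  def main(args: Array[String]): Unit = {
    val jsonStr = Source.fromFile("src/main/resources/word2vec.json").mkString
    val root = JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, Any]]
    val flow = root("flow").asInstanceOf[Map[String, Any]]

    val stops = flow("stops").asInstanceOf[List[Map[String, Any]]].map(_("name"))
    val paths = flow("paths").asInstanceOf[List[Map[String, Any]]]
      .map(p => s"${p("from")} -> ${p("to")}")

    println(s"stops: ${stops.mkString(", ")}")  // SelectHiveQL, WordToVec
    println(s"paths: ${paths.mkString(", ")}")  // SelectHiveQL -> WordToVec
  }
}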

View File

@@ -0,0 +1,133 @@
package cn.piflow.bundle.ml_feature
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.feature.Word2VecModel
import org.apache.spark.sql.SparkSession
class WordToVec extends ConfigurableStop {

  val authorEmail: String = "xiaoxiao@cnic.cn"
  val description: String = "transform words to vectors"
  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var maxIter: String = _
  var maxSentenceLength: String = _
  var minCount: String = _
  var numPartitions: String = _
  var stepSize: String = _
  var vectorSize: String = _
  var colName: String = _
  var outputCol: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val sqlContext = spark.sqlContext

    val df = in.read()
    df.createOrReplaceTempView("doc")

    // Tokenize the input column: split each sentence on spaces into an array of words.
    sqlContext.udf.register("split", (str: String) => str.split(" "))
    val sqlText: String = "select split(" + colName + ") as " + colName + "_new from doc"
    val dfNew = sqlContext.sql(sqlText)
    dfNew.show()

    // Maximum number of iterations (>= 0). Defaults to 50 when the property is empty.
    var maxIterValue: Int = 50
    if (maxIter != "") {
      maxIterValue = maxIter.toInt
    }

    // Maximum length (in words) of each sentence in the input data; longer sentences
    // are divided into chunks of up to maxSentenceLength words. Spark's default: 1000.
    var maxSentenceLengthValue: Int = 1000
    if (maxSentenceLength != "") {
      maxSentenceLengthValue = maxSentenceLength.toInt
    }

    // Minimum number of times a token must appear to be included in the vocabulary.
    // Spark's default is 5; this stop defaults to 1 when the property is empty.
    var minCountValue: Int = 1
    if (minCount != "") {
      minCountValue = minCount.toInt
    }

    // Number of partitions used when fitting the model. Defaults to 1.
    var numPartitionsValue: Int = 1
    if (numPartitions != "") {
      numPartitionsValue = numPartitions.toInt
    }

    // Step size used for each iteration of optimization (> 0). Defaults to 5.
    var stepSizeValue: Int = 5
    if (stepSize != "") {
      stepSizeValue = stepSize.toInt
    }

    // Dimension of the vector each word is mapped to. Spark's default is 100;
    // this stop defaults to 5 when the property is empty.
    var vectorSizeValue: Int = 5
    if (vectorSize != "") {
      vectorSizeValue = vectorSize.toInt
    }

    // Fit a Word2Vec model on the tokenized column and append the vector column.
    val word2vec = new Word2Vec()
      .setMaxIter(maxIterValue)
      .setMaxSentenceLength(maxSentenceLengthValue)
      .setMinCount(minCountValue)
      .setNumPartitions(numPartitionsValue)
      .setStepSize(stepSizeValue)
      .setVectorSize(vectorSizeValue)
      .setInputCol(colName + "_new")
      .setOutputCol(outputCol)
      .fit(dfNew)

    val dfOut = word2vec.transform(dfNew)
    dfOut.show()
    out.write(dfOut)
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    maxIter = MapUtil.get(map, key = "maxIter").asInstanceOf[String]
    vectorSize = MapUtil.get(map, key = "vectorSize").toString
    maxSentenceLength = MapUtil.get(map, key = "maxSentenceLength").toString
    minCount = MapUtil.get(map, key = "minCount").toString
    numPartitions = MapUtil.get(map, key = "numPartitions").toString
    stepSize = MapUtil.get(map, key = "stepSize").toString
    colName = MapUtil.get(map, key = "colName").asInstanceOf[String]
    outputCol = MapUtil.get(map, key = "outputCol").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val vectorSize = new PropertyDescriptor().name("vectorSize").displayName("VECTOR_SIZE").description("").defaultValue("").required(false)
    val maxSentenceLength = new PropertyDescriptor().name("maxSentenceLength").displayName("MAX_SENTENCE_LENGTH").description("").defaultValue("").required(false)
    val maxIter = new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").description("").defaultValue("").required(false)
    val minCount = new PropertyDescriptor().name("minCount").displayName("MIN_COUNT").description("").defaultValue("").required(false)
    val stepSize = new PropertyDescriptor().name("stepSize").displayName("STEP_SIZE").description("").defaultValue("").required(false)
    val numPartitions = new PropertyDescriptor().name("numPartitions").displayName("NUM_PARTITIONS").description("").defaultValue("").required(false)
    val colName = new PropertyDescriptor().name("colName").displayName("INPUT_COL").description("").defaultValue("").required(true)
    val outputCol = new PropertyDescriptor().name("outputCol").displayName("OUTPUT_COL").description("").defaultValue("").required(true)
    descriptor = vectorSize :: descriptor
    descriptor = maxSentenceLength :: descriptor
    descriptor = maxIter :: descriptor
    descriptor = minCount :: descriptor
    descriptor = stepSize :: descriptor
    descriptor = numPartitions :: descriptor
    descriptor = colName :: descriptor
    descriptor = outputCol :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mllib.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.MLGroup.toString)
  }
}
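
Outside of the PiFlow runtime, the core of perform() can be reproduced with plain Spark ML. The sketch below is illustrative only: it assumes a local SparkSession and a tiny in-memory stand-in for the sparktest.word2vec Hive table, and it uses the DataFrame split function instead of the registered SQL UDF.

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

// Illustrative sketch of the WordToVec stop's core transformation, run locally.
object WordToVecSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("word2vec-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for "select * from sparktest.word2vec": one sentence per row in column "doc".
    val df = Seq("spark makes big data simple", "word2vec maps words to vectors").toDF("doc")

    // Tokenize on spaces, mirroring the "split" UDF the stop registers.
    val tokenized = df.withColumn("doc_new", split($"doc", " "))

    // Small illustrative parameter values; the stop reads these from the flow's properties.
    val model = new Word2Vec()
      .setInputCol("doc_new")
      .setOutputCol("result")
      .setVectorSize(5)
      .setMinCount(1)
      .fit(tokenized)

    model.transform(tokenized).show(false)
    spark.stop()
  }
}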

View File

@@ -14,7 +14,7 @@ class FlowTest_XX {
   def testFlow(): Unit ={
     //parse flow json
-    val file = "src/main/resources/bikmeans.json"
+    val file = "src/main/resources/word2vec.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -49,7 +49,7 @@ class FlowTest_XX {
   def testFlow2json() = {
     //parse flow json
-    val file = "src/main/resources/bikmeans.json"
+    val file = "src/main/resources/word2vec.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]