From 84768e8abe20c75bfeae529ff54ef6cbbe4ed736 Mon Sep 17 00:00:00 2001
From: xiaoxiao
Date: Fri, 26 Oct 2018 11:21:22 +0800
Subject: [PATCH] add Word2Vec stop

---
 piflow-bundle/config.properties | 22 +++
 .../src/main/resources/word2vec.json | 42 ++++++
 .../piflow/bundle/ml_feature/WordToVec.scala | 133 ++++++++++++++++++
 .../scala/cn/piflow/bundle/FlowTest_XX.scala | 4 +-
 4 files changed, 199 insertions(+), 2 deletions(-)
 create mode 100644 piflow-bundle/config.properties
 create mode 100644 piflow-bundle/src/main/resources/word2vec.json
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/ml_feature/WordToVec.scala

diff --git a/piflow-bundle/config.properties b/piflow-bundle/config.properties
new file mode 100644
index 0000000..145678e
--- /dev/null
+++ b/piflow-bundle/config.properties
@@ -0,0 +1,22 @@
+server.ip=10.0.86.98
+server.port=8001
+
+#spark.master=spark://10.0.86.89:7077
+#spark.master=spark://10.0.86.191:7077
+spark.master=yarn
+spark.deploy.mode=cluster
+yarn.resourcemanager.hostname=10.0.86.191
+yarn.resourcemanager.address=10.0.86.191:8032
+yarn.access.namenode=hdfs://10.0.86.191:9000
+yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
+yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
+
+hive.metastore.uris=thrift://10.0.86.191:9083
+
+#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
+piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
+
+yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
+checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
+
+h2.port=50001
\ No newline at end of file
diff --git a/piflow-bundle/src/main/resources/word2vec.json b/piflow-bundle/src/main/resources/word2vec.json
new file mode 100644
index 0000000..1626707
--- /dev/null
+++ b/piflow-bundle/src/main/resources/word2vec.json
@@ -0,0 +1,42 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"SelectHiveQL",
+        "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
+        "properties":{
+          "hiveQL":"select * from sparktest.word2vec"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"WordToVec",
+        "bundle":"cn.piflow.bundle.ml_feature.WordToVec",
+        "properties":{
+          "maxIter":"",
+          "maxSentenceLength":"",
+          "minCount":"",
+          "numPartitions":"",
+          "stepSize":"",
+          "vectorSize":"",
+          "colName":"doc",
+          "outputCol":"result"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"SelectHiveQL",
+        "outport":"",
+        "inport":"",
+        "to":"WordToVec"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_feature/WordToVec.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_feature/WordToVec.scala
new file mode 100644
index 0000000..883cb24
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_feature/WordToVec.scala
@@ -0,0 +1,133 @@
+package cn.piflow.bundle.ml_feature
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.ml.feature.Word2Vec
+import org.apache.spark.ml.feature.Word2VecModel
+import org.apache.spark.sql.SparkSession
+
+class WordToVec extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "transform words into vectors"
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var maxIter:String=_
+  var maxSentenceLength:String=_
+  var minCount:String=_
+  var numPartitions:String=_
+  var stepSize:String=_
+  var vectorSize:String=_
+  var colName:String=_
+  var outputCol:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    val sqlContext=spark.sqlContext
+    val df=in.read()
+    df.createOrReplaceTempView("doc")
+    sqlContext.udf.register("splitWords",(str:String)=>str.split(" "))
+    val sqlText:String="select splitWords("+colName+") as "+colName+"_new from doc"
+    val dfNew=sqlContext.sql(sqlText)
+    dfNew.show()
+
+    //maximum number of iterations (>= 0); this stop defaults to 50
+    var maxIterValue:Int=50
+    if(maxIter!=""){
+      maxIterValue=maxIter.toInt
+    }
+
+    //maximum sentence length (in words); longer sentences are split into chunks of up to this size. Default: 1000
+    var maxSentenceLengthValue:Int=1000
+    if(maxSentenceLength!=""){
+      maxSentenceLengthValue=maxSentenceLength.toInt
+    }
+
+    //minimum number of times a token must appear to be included in the vocabulary; Spark's default is 5, this stop defaults to 1
+    var minCountValue:Int=1
+    if(minCount!=""){
+      minCountValue=minCount.toInt
+    }
+
+    //number of partitions for sentences of words; this stop defaults to 1
+    var numPartitionsValue:Int=1
+    if(numPartitions!=""){
+      numPartitionsValue=numPartitions.toInt
+    }
+
+    //step size used for each iteration of optimization (> 0); parsed as Double so fractional values work
+    var stepSizeValue:Double=5
+    if(stepSize!=""){
+      stepSizeValue=stepSize.toDouble
+    }
+
+    //dimension of the learned word vectors; Spark's default is 100, this stop defaults to 5
+    var vectorSizeValue:Int=5
+    if(vectorSize!=""){
+      vectorSizeValue=vectorSize.toInt
+    }
+
+    //build and fit the Word2Vec model
+    val word2vec=new Word2Vec()
+      .setMaxIter(maxIterValue)
+      .setMaxSentenceLength(maxSentenceLengthValue)
+      .setMinCount(minCountValue)
+      .setNumPartitions(numPartitionsValue)
+      .setStepSize(stepSizeValue)
+      .setVectorSize(vectorSizeValue)
+      .setInputCol(colName+"_new")
+      .setOutputCol(outputCol)
+      .fit(dfNew)
+
+    //apply the trained model to the tokenized column
+    val dfOut=word2vec.transform(dfNew)
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    maxIter=MapUtil.get(map,key="maxIter").asInstanceOf[String]
+    vectorSize=MapUtil.get(map,key="vectorSize").asInstanceOf[String]
+    maxSentenceLength=MapUtil.get(map,key="maxSentenceLength").asInstanceOf[String]
+    minCount=MapUtil.get(map,key="minCount").asInstanceOf[String]
+    numPartitions=MapUtil.get(map,key="numPartitions").asInstanceOf[String]
+    stepSize=MapUtil.get(map,key="stepSize").asInstanceOf[String]
+    colName=MapUtil.get(map,key="colName").asInstanceOf[String]
+    outputCol=MapUtil.get(map,key="outputCol").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val vectorSize = new PropertyDescriptor().name("vectorSize").displayName("VECTOR_SIZE").description("").defaultValue("").required(false)
+    val maxSentenceLength = new PropertyDescriptor().name("maxSentenceLength").displayName("MAX_SENTENCE_LENGTH").description("").defaultValue("").required(false)
+    val maxIter=new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").description("").defaultValue("").required(false)
+    val minCount=new PropertyDescriptor().name("minCount").displayName("MIN_COUNT").description("").defaultValue("").required(false)
+    val stepSize=new PropertyDescriptor().name("stepSize").displayName("STEP_SIZE").description("").defaultValue("").required(false)
+    val numPartitions=new PropertyDescriptor().name("numPartitions").displayName("NUM_PARTITIONS").description("").defaultValue("").required(false)
+    val colName=new PropertyDescriptor().name("colName").displayName("INPUT_COL").description("").defaultValue("").required(true)
+    val outputCol=new PropertyDescriptor().name("outputCol").displayName("OUTPUT_COL").description("").defaultValue("").required(true)
+    descriptor = vectorSize :: descriptor
+    descriptor = maxSentenceLength :: descriptor
+    descriptor = maxIter :: descriptor
+    descriptor = minCount :: descriptor
+    descriptor = stepSize :: descriptor
+    descriptor = numPartitions :: descriptor
+    descriptor = colName :: descriptor
+    descriptor = outputCol :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("mllib.png")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+}
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
index 0fe77a1..01c1d2e 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
@@ -14,7 +14,7 @@ class FlowTest_XX {
   def testFlow(): Unit ={
 
     //parse flow json
-    val file = "src/main/resources/bikmeans.json"
+    val file = "src/main/resources/word2vec.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -49,7 +49,7 @@ class FlowTest_XX {
   def testFlow2json() = {
 
     //parse flow json
-    val file = "src/main/resources/bikmeans.json"
+    val file = "src/main/resources/word2vec.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
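
For reference, a minimal standalone sketch of the transformation the WordToVec stop performs: tokenize a text column on spaces, fit Spark ML's Word2Vec, and append a vector column. This is an illustration, not part of the patch; the local SparkSession, sample rows, and object name Word2VecSketch are assumptions, while the column names and fallback parameter values mirror the stop's defaults above.

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.{SparkSession, functions => F}

object Word2VecSketch {
  def main(args: Array[String]): Unit = {
    // assumption: local run, not the YARN setup from config.properties
    val spark = SparkSession.builder()
      .appName("word2vec-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // one document per row, standing in for "select * from sparktest.word2vec"
    val df = Seq(
      "hi i heard about spark",
      "i wish java could use case classes",
      "logistic regression models are neat"
    ).toDF("doc")

    // the stop splits the input column on spaces into a "<col>_new" array column
    val docs = df.select(F.split($"doc", " ").as("doc_new"))

    // same defaults the stop falls back to when its properties are empty strings
    val model = new Word2Vec()
      .setInputCol("doc_new")
      .setOutputCol("result")
      .setVectorSize(5)
      .setMinCount(1)
      .setMaxIter(50)
      .fit(docs)

    // appends a "result" column holding the averaged word vector per document
    model.transform(docs).show(truncate = false)
    spark.stop()
  }
}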