diff --git a/out/artifacts/piflow_jar/piflow_jar.jar b/out/artifacts/piflow_jar/piflow_jar.jar
new file mode 100644
index 0000000..b22682f
Binary files /dev/null and b/out/artifacts/piflow_jar/piflow_jar.jar differ
diff --git a/piflow-bundle/pom.xml b/piflow-bundle/pom.xml
index 9635a7f..da49213 100644
--- a/piflow-bundle/pom.xml
+++ b/piflow-bundle/pom.xml
@@ -110,6 +110,11 @@
             <artifactId>elasticsearch-spark-20_2.11</artifactId>
             <version>5.6.3</version>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.11</artifactId>
+            <version>2.1.0</version>
+        </dependency>
diff --git a/piflow-bundle/src/main/resources/bayes.json b/piflow-bundle/src/main/resources/bayes.json
new file mode 100644
index 0000000..20a900d
--- /dev/null
+++ b/piflow-bundle/src/main/resources/bayes.json
@@ -0,0 +1,38 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"NaiveBayesTraining",
+        "bundle":"cn.piflow.bundle.ml_classification.NaiveBayesTraining",
+        "properties":{
+          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "smoothing_value":"1.0",
+          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/nb.model"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"NaiveBayesPrediction",
+        "bundle":"cn.piflow.bundle.ml_classification.NaiveBayesPrediction",
+        "properties":{
+          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/nb.model"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"NaiveBayesTraining",
+        "outport":"",
+        "inport":"",
+        "to":"NaiveBayesPrediction"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/piflow-bundle/src/main/resources/logistic.json b/piflow-bundle/src/main/resources/logistic.json
new file mode 100644
index 0000000..a35b1e1
--- /dev/null
+++ b/piflow-bundle/src/main/resources/logistic.json
@@ -0,0 +1,43 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"LogisticRegressionTraining",
+        "bundle":"cn.piflow.bundle.ml_classification.LogisticRegressionTraining",
+        "properties":{
+          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/lr.model",
+          "maxIter":"50",
+          "minTol":"1E-7",
+          "regParam":"0.1",
+          "elasticNetParam":"0.1",
+          "threshold":"0.5",
+          "family":""
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"LogisticRegressionPrediction",
+        "bundle":"cn.piflow.bundle.ml_classification.LogisticRegressionPrediction",
+        "properties":{
+          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/lr.model"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"LogisticRegressionTraining",
+        "outport":"",
+        "inport":"",
+        "to":"LogisticRegressionPrediction"
+      }
+    ]
+  }
+}
\ No newline at end of file
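Note: both flow definitions point their training and prediction stops at watermellonDataset.txt, and the stops read it with spark.read.format("libsvm"). The file is therefore expected to be plain LIBSVM text, one example per line: a numeric label followed by sparse index:value features. The two lines below are only a hypothetical illustration of that layout; the actual labels and feature values of the dataset are not part of this change.

    1.0 1:0.697 2:0.460
    0.0 1:0.666 2:0.091
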
authorEmail: String = "xiaoxiao@cnic.cn" + val description: String = "Make use of a exist LogisticRegressionModel to predict." + val inportCount: Int = 1 + val outportCount: Int = 0 + var test_data_path:String =_ + var model_path:String=_ + + + def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val spark = pec.get[SparkSession]() + //load data stored in libsvm format as a dataframe + val data=spark.read.format("libsvm").load(test_data_path) + //data.show() + + //load model + val model=LogisticRegressionModel.load(model_path) + + val predictions=model.transform(data) + predictions.show() + out.write(predictions) + + } + + def initialize(ctx: ProcessContext): Unit = { + + } + + + def setProperties(map: Map[String, Any]): Unit = { + test_data_path=MapUtil.get(map,key="test_data_path").asInstanceOf[String] + model_path=MapUtil.get(map,key="model_path").asInstanceOf[String] + } + + override def getPropertyDescriptor(): List[PropertyDescriptor] = { + var descriptor : List[PropertyDescriptor] = List() + val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true) + val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true) + descriptor = test_data_path :: descriptor + descriptor = model_path :: descriptor + descriptor + } + + override def getIcon(): Array[Byte] = ??? + + override def getGroup(): List[String] = { + List(StopGroupEnum.MLGroup.toString) + } + +} diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala new file mode 100644 index 0000000..a5b676a --- /dev/null +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala @@ -0,0 +1,131 @@ +package cn.piflow.bundle.ml_classification + +import cn.piflow.conf.bean.PropertyDescriptor +import cn.piflow.conf.util.MapUtil +import cn.piflow.conf.{ConfigurableStop, StopGroupEnum} +import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.ml.classification.LogisticRegression + +class LogisticRegressionTraining extends ConfigurableStop{ + val authorEmail: String = "xiaoxiao@cnic.cn" + val description: String = "Training a LogisticRegressionModel." + val inportCount: Int = 1 + val outportCount: Int = 0 + var training_data_path:String =_ + var model_save_path:String=_ + var maxIter:String=_ + var minTol:String=_ + var regParam:String=_ + var elasticNetParam:String=_ + var threshold:String=_ + var family:String=_ + + def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = { + val spark = pec.get[SparkSession]() + + //load data stored in libsvm format as a dataframe + val data=spark.read.format("libsvm").load(training_data_path) + + //Param for maximum number of iterations (>= 0) + var maxIterValue:Int=50 + if(maxIter!=""){ + maxIterValue=maxIter.toInt + } + + //Param for the convergence tolerance for iterative algorithms (>= 0) + var minTolValue:Double=1E-6 + if(minTol!=""){ + minTolValue=minTol.toDouble + } + + //Param for regularization parameter (>= 0). + var regParamValue:Double=0.2 + if(regParam!=""){ + regParamValue=regParam.toDouble + } + + //Param for the ElasticNet mixing parameter, in range [0, 1]. 
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala
new file mode 100644
index 0000000..a5b676a
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/LogisticRegressionTraining.scala
@@ -0,0 +1,131 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.ml.classification.LogisticRegression
+
+class LogisticRegressionTraining extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Training a LogisticRegressionModel."
+  val inportCount: Int = 1
+  val outportCount: Int = 0
+  var training_data_path:String =_
+  var model_save_path:String=_
+  var maxIter:String=_
+  var minTol:String=_
+  var regParam:String=_
+  var elasticNetParam:String=_
+  var threshold:String=_
+  var family:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(training_data_path)
+
+    //Param for maximum number of iterations (>= 0)
+    var maxIterValue:Int=50
+    if(maxIter!=""){
+      maxIterValue=maxIter.toInt
+    }
+
+    //Param for the convergence tolerance for iterative algorithms (>= 0)
+    var minTolValue:Double=1E-6
+    if(minTol!=""){
+      minTolValue=minTol.toDouble
+    }
+
+    //Param for regularization parameter (>= 0)
+    var regParamValue:Double=0.2
+    if(regParam!=""){
+      regParamValue=regParam.toDouble
+    }
+
+    //Param for the ElasticNet mixing parameter, in range [0, 1]
+    var elasticNetParamValue:Double=0
+    if(elasticNetParam!=""){
+      elasticNetParamValue=elasticNetParam.toDouble
+    }
+
+    //Param for threshold in binary classification prediction, in range [0, 1]
+    var thresholdValue:Double=0.5
+    if(threshold!=""){
+      thresholdValue=threshold.toDouble
+    }
+
+    //Param for the name of family which is a description of the label distribution to be used in the model
+    var familyValue="auto"
+    if(family!=""){
+      familyValue=family
+    }
+
+    //training a Logistic Regression model
+    val model=new LogisticRegression()
+      .setMaxIter(maxIterValue)
+      .setTol(minTolValue)
+      .setRegParam(regParamValue)
+      .setElasticNetParam(elasticNetParamValue)
+      .setThreshold(thresholdValue)
+      .setFamily(familyValue)
+      .fit(data)
+
+    //model persistence
+    model.save(model_save_path)
+
+    import spark.implicits._
+    val dfOut=Seq(model_save_path).toDF
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    training_data_path=MapUtil.get(map,key="training_data_path").asInstanceOf[String]
+    model_save_path=MapUtil.get(map,key="model_save_path").asInstanceOf[String]
+    maxIter=MapUtil.get(map,key="maxIter").asInstanceOf[String]
+    minTol=MapUtil.get(map,key="minTol").asInstanceOf[String]
+    regParam=MapUtil.get(map,key="regParam").asInstanceOf[String]
+    elasticNetParam=MapUtil.get(map,key="elasticNetParam").asInstanceOf[String]
+    threshold=MapUtil.get(map,key="threshold").asInstanceOf[String]
+    family=MapUtil.get(map,key="family").asInstanceOf[String]
+
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
+    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").description("Path to save the trained model").defaultValue("").required(true)
+    val maxIter=new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").description("Maximum number of iterations (>= 0)").defaultValue("").required(true)
+    val minTol=new PropertyDescriptor().name("minTol").displayName("MIN_TOL").description("Convergence tolerance for iterative algorithms (>= 0)").defaultValue("").required(true)
+    val regParam=new PropertyDescriptor().name("regParam").displayName("REG_PARAM").description("Regularization parameter (>= 0)").defaultValue("").required(true)
+    val elasticNetParam=new PropertyDescriptor().name("elasticNetParam").displayName("ELASTIC_NET_PARAM").description("ElasticNet mixing parameter, in range [0, 1]").defaultValue("").required(true)
+    val threshold=new PropertyDescriptor().name("threshold").displayName("THRESHOLD").description("Threshold in binary classification prediction, in range [0, 1]").defaultValue("").required(true)
+    val family=new PropertyDescriptor().name("family").displayName("FAMILY").description("Name of family which is a description of the label distribution to be used in the model").defaultValue("").required(true)
+    descriptor = training_data_path :: descriptor
+    descriptor = model_save_path :: descriptor
+    descriptor = maxIter :: descriptor
+    descriptor = minTol :: descriptor
+    descriptor = regParam :: descriptor
+    descriptor = elasticNetParam :: descriptor
+    descriptor = threshold :: descriptor
+    descriptor = family :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+
+}
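Note: a quick way to sanity-check LogisticRegressionTraining outside of a piflow flow is to fit and evaluate the same Spark ML estimator on a random split of the libsvm file. The sketch below is only a local smoke test under assumed paths, not part of this change; the "label" and "prediction" columns it relies on are the Spark ML defaults produced by the libsvm reader and the model.

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.sql.SparkSession

    object LogisticRegressionSmokeTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("lr-smoke-test").getOrCreate()
        // hypothetical local copy of the dataset referenced in logistic.json
        val data = spark.read.format("libsvm").load("data/watermellonDataset.txt")
        val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

        // same parameter values that logistic.json passes to the stop
        val model = new LogisticRegression()
          .setMaxIter(50)
          .setRegParam(0.1)
          .setElasticNetParam(0.1)
          .setThreshold(0.5)
          .fit(train)

        // accuracy on the held-out split
        val accuracy = new MulticlassClassificationEvaluator()
          .setMetricName("accuracy")
          .evaluate(model.transform(test))
        println(s"accuracy = $accuracy")
        spark.stop()
      }
    }
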
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesPrediction.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesPrediction.scala
new file mode 100644
index 0000000..70c5bab
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesPrediction.scala
@@ -0,0 +1,60 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.ml.classification.NaiveBayesModel
+import org.apache.spark.sql.SparkSession
+
+class NaiveBayesPrediction extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Make use of an existing NaiveBayesModel to predict."
+  val inportCount: Int = 1
+  val outportCount: Int = 0
+  var test_data_path:String =_
+  var model_path:String=_
+
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(test_data_path)
+    //data.show()
+
+    //load model
+    val model=NaiveBayesModel.load(model_path)
+
+    val predictions=model.transform(data)
+    predictions.show()
+    out.write(predictions)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    test_data_path=MapUtil.get(map,key="test_data_path").asInstanceOf[String]
+    model_path=MapUtil.get(map,key="model_path").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true)
+    val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true)
+    descriptor = test_data_path :: descriptor
+    descriptor = model_path :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+
+}
diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesTraining.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesTraining.scala
new file mode 100644
index 0000000..5504223
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/ml_classification/NaiveBayesTraining.scala
@@ -0,0 +1,73 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.ml.classification.NaiveBayes
+import org.apache.spark.sql.SparkSession
+
+class NaiveBayesTraining extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Training a NaiveBayesModel."
+  val inportCount: Int = 1
+  val outportCount: Int = 0
+  var training_data_path:String =_
+  var smoothing_value:String=_
+  var model_save_path:String=_
+
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(training_data_path)
+
+    //get smoothing factor
+    var smoothing_factor:Double=0
+    if(smoothing_value!=""){
+      smoothing_factor=smoothing_value.toDouble
+    }
+
+    //training a NaiveBayes model
+    val model=new NaiveBayes().setSmoothing(smoothing_factor).fit(data)
+
+    //model persistence
+    model.save(model_save_path)
+
+    import spark.implicits._
+    val dfOut=Seq(model_save_path).toDF
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    training_data_path=MapUtil.get(map,key="training_data_path").asInstanceOf[String]
+    smoothing_value=MapUtil.get(map,key="smoothing_value").asInstanceOf[String]
+    model_save_path=MapUtil.get(map,key="model_save_path").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
+    val smoothing_value = new PropertyDescriptor().name("smoothing_value").displayName("SMOOTHING_FACTOR").defaultValue("0").required(false)
+    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").defaultValue("").required(true)
+    descriptor = training_data_path :: descriptor
+    descriptor = smoothing_value :: descriptor
+    descriptor = model_save_path :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
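Note: NaiveBayesTraining only forwards the save path downstream, so if the persisted model itself needs a quick look, something along these lines could be run against the same path. This is only a sketch, not part of the change; the path mirrors the one in bayes.json, and pi/theta are the log class priors and log conditional probabilities exposed by Spark's NaiveBayesModel.

    import org.apache.spark.ml.classification.NaiveBayesModel
    import org.apache.spark.sql.SparkSession

    object InspectNaiveBayesModel {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("inspect-nb-model").getOrCreate()
        // path written by the NaiveBayesTraining stop in bayes.json
        val model = NaiveBayesModel.load("hdfs://10.0.86.89:9000/xx/naivebayes/nb.model")
        println(s"log class priors: ${model.pi}")
        println(s"log conditional probabilities: ${model.theta}")
        spark.stop()
      }
    }
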
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
index b0ec4cd..e157863 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/StopGroupEnum.scala
@@ -18,5 +18,6 @@
   val RedisGroup = Value("RedisGroup")
   val SolrGroup = Value("SolrGroup")
   val ESGroup = Value("ESGroup")
+  val MLGroup = Value("MLGroup")
 
 }
diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
index 0309ef1..e3899b2 100644
--- a/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
+++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/FlowTest_XX.scala
@@ -14,7 +14,7 @@ class FlowTest_XX {
   def testFlow(): Unit ={
 
     //parse flow json
-    val file = "src/main/resources/flow.json"
+    val file = "src/main/resources/logistic.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -30,7 +30,7 @@
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
+      .config("spark.jars","/root/xx/piflow/out/artifacts/piflow_jar/piflow_jar.jar")
       .enableHiveSupport()
      .getOrCreate()
 
@@ -49,7 +49,7 @@
   def testFlow2json() = {
 
     //parse flow json
-    val file = "src/main/resources/flow.json"
+    val file = "src/main/resources/logistic.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]