Merge remote-tracking branch 'origin/master'

yanfqidong0604 2018-10-23 18:30:41 +08:00
commit e3d1dda0e0
7 changed files with 229 additions and 32 deletions

View File

@@ -0,0 +1,40 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "stops":[
      {
        "uuid":"0000",
        "name":"KmeansTraining",
        "bundle":"cn.piflow.bundle.ml_clustering.KmeansTraining",
        "properties":{
          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/kmeans.model",
          "maxIter":"50",
          "minTol":"1E-7",
          "k":"50"
        }
      },
      {
        "uuid":"1111",
        "name":"KmeansPrediction",
        "bundle":"cn.piflow.bundle.ml_clustering.KmeansPrediction",
        "properties":{
          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/kmeans.model"
        }
      }
    ],
    "paths":[
      {
        "from":"KmeansTraining",
        "outport":"",
        "inport":"",
        "to":"KmeansPrediction"
      }
    ]
  }
}
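For orientation, here is a minimal sketch of how a flow definition like the one above is read back into a map, mirroring the FlowTest_XX changes further down in this diff. The import paths for FileUtil and OptionUtil are assumptions, since the test's import list is not shown here.

import scala.util.parsing.json.JSON
// Assumed locations of the helpers used in FlowTest_XX; the actual packages are not visible in this diff.
import cn.piflow.util.{FileUtil, OptionUtil}

object KmeansFlowJsonSketch {
  def main(args: Array[String]): Unit = {
    // Read the flow definition shown above and parse it into a Map, as FlowTest_XX does.
    val file = "src/main/resources/kmeans.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)
  }
}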

View File

@@ -0,0 +1,61 @@
package cn.piflow.bundle.ml_clustering

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.sql.SparkSession

class KmeansPrediction extends ConfigurableStop {

  val authorEmail: String = "xiaoxiao@cnic.cn"
  val description: String = "Make use of an existing KMeansModel to predict."
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var test_data_path: String = _
  var model_path: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()

    //load test data stored in libsvm format as a DataFrame
    val data = spark.read.format("libsvm").load(test_data_path)

    //load the persisted KMeans model and predict cluster assignments
    val model = KMeansModel.load(model_path)
    val predictions = model.transform(data)
    predictions.show()

    out.write(predictions)
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    test_data_path = MapUtil.get(map, key = "test_data_path").asInstanceOf[String]
    model_path = MapUtil.get(map, key = "model_path").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true)
    val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true)
    descriptor = test_data_path :: descriptor
    descriptor = model_path :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mllib.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.MLGroup.toString)
  }
}
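As a usage note, the stop's two properties map one-to-one onto the "properties" block of the KmeansPrediction stop in the flow JSON above. A minimal sketch of setting them directly, outside a flow, could look like this; the demo object name is only illustrative.

import cn.piflow.bundle.ml_clustering.KmeansPrediction

object KmeansPredictionPropsSketch {
  def main(args: Array[String]): Unit = {
    val stop = new KmeansPrediction()
    // Keys mirror the "properties" block of the KmeansPrediction stop in kmeans.json above.
    stop.setProperties(Map(
      "test_data_path" -> "hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
      "model_path" -> "hdfs://10.0.86.89:9000/xx/naivebayes/kmeans.model"
    ))
  }
}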

View File

@@ -0,0 +1,95 @@
package cn.piflow.bundle.ml_clustering

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.SparkSession

class KmeansTraining extends ConfigurableStop {

  val authorEmail: String = "xiaoxiao@cnic.cn"
  val description: String = "K-means clustering."
  val inportList: List[String] = List(PortEnum.NonePort.toString)
  val outportList: List[String] = List(PortEnum.DefaultPort.toString)

  var training_data_path: String = _
  var model_save_path: String = _
  var maxIter: String = _
  var k: Int = _
  var minTol: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()

    //load training data stored in libsvm format as a DataFrame
    val data = spark.read.format("libsvm").load(training_data_path)

    //maximum number of iterations (>= 0); defaults to 50 if not configured
    var maxIterValue: Int = 50
    if (maxIter != "") {
      maxIterValue = maxIter.toInt
    }

    //convergence tolerance for iterative algorithms (>= 0); defaults to 1E-6 if not configured
    var minTolValue: Double = 1E-6
    if (minTol != "") {
      minTolValue = minTol.toDouble
    }

    //train a KMeans model
    val model = new KMeans()
      .setMaxIter(maxIterValue)
      .setTol(minTolValue)
      .setK(k)
      .fit(data)

    //persist the model and emit its save path downstream
    model.save(model_save_path)
    import spark.implicits._
    val dfOut = Seq(model_save_path).toDF
    dfOut.show()
    out.write(dfOut)
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    training_data_path = MapUtil.get(map, key = "training_data_path").asInstanceOf[String]
    model_save_path = MapUtil.get(map, key = "model_save_path").asInstanceOf[String]
    maxIter = MapUtil.get(map, key = "maxIter").asInstanceOf[String]
    minTol = MapUtil.get(map, key = "minTol").asInstanceOf[String]
    k = Integer.parseInt(MapUtil.get(map, key = "k").toString)
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").description("").defaultValue("").required(true)
    val maxIter = new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").description("Param for maximum number of iterations (>= 0).").defaultValue("").required(false)
    val minTol = new PropertyDescriptor().name("minTol").displayName("MIN_TOL").description("Param for the convergence tolerance for iterative algorithms (>= 0).").defaultValue("").required(false)
    val k = new PropertyDescriptor().name("k").displayName("K").description("The number of clusters.").defaultValue("").required(true)
    descriptor = training_data_path :: descriptor
    descriptor = model_save_path :: descriptor
    descriptor = maxIter :: descriptor
    descriptor = minTol :: descriptor
    descriptor = k :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("mllib.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.MLGroup.toString)
  }
}
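To isolate the Spark side of this stop, the following is a minimal standalone sketch of the same org.apache.spark.ml.clustering.KMeans call chain that perform() uses, with parameter values taken from kmeans.json. The local master, input path, and save path are placeholders, not part of this commit.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.SparkSession

object KmeansTrainingSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder local session and paths; the flow above uses HDFS paths instead.
    val spark = SparkSession.builder().master("local[*]").appName("kmeans-sketch").getOrCreate()
    val data = spark.read.format("libsvm").load("data/watermellonDataset.txt")

    // Same parameters as the KmeansTraining stop configured in kmeans.json.
    val model = new KMeans()
      .setMaxIter(50)
      .setTol(1E-7)
      .setK(50)
      .fit(data)

    model.save("target/kmeans.model")
    spark.stop()
  }
}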

View File

@@ -229,7 +229,7 @@ class RdfToDF extends ConfigurableStop{
.map(s => (
get(propertyRegexPattern.matcher(s),"name")
->
get(propertyRegexPattern.matcher(s),"value").replace("\"", "'")
get(propertyRegexPattern.matcher(s),"value").replace("\"", " ")
)
)
.groupBy(i => i._1)
@@ -259,6 +259,7 @@ class RdfToDF extends ConfigurableStop{
var sampleSchema : Seq[String] = sampleEntity.schema
sampleSchema +:= IDName
sampleSchema :+= LABELName
val entityDFSList: Seq[StructField] = for {
s <- sampleSchema
p <- sampleEntity.propSeq

View File

@@ -7,44 +7,44 @@ import scala.collection.mutable.ListBuffer
class Entity( id : String,
label : String,
prop : Map[String, Any],
prop : Map[String, Array[String]],
val schema : Seq[String]
) extends Serializable {
var propSeq : Array[Any] = {
var ret : ListBuffer[Any] = new ListBuffer[Any]()
var propSeq : Array[AnyRef] = {
var ret : ListBuffer[AnyRef] = new ListBuffer[AnyRef]()
ret +:= id
for (name <- schema) {
val value = {
val l = for (name <- schema) yield {
val value : Array[String] = {
if (prop.contains(name))
prop(name)
else
""
}
value match {
case _: String =>
ret += value.asInstanceOf[String]
case _: Array[String] =>
ret += value.asInstanceOf[Array[String]]
case _ => ret += ""
Array("")
}
val str = value
.map(f => if (f == "") "\"\"" else f)
.map(_.replaceAll(";", " "))
.reduce((a,b) => a + ";" + b)
if (str.contains(";")) str.split(";")
else str
}
ret ++= l
ret += label
ret.toArray
ret.map(s => {
s match {
case str: String =>
if (str.contains(","))
"\"" + s + "\""
else
s
case _ =>
s.asInstanceOf[Array[String]].map(a => if (a.contains(",")) "\"" + a + "\"" else a)
}
}).toArray
}
override def toString: String = {
val l = for {
prop : Any <- propSeq
} yield {
prop match {
case _ : String => prop.asInstanceOf[String]
case _ : Array[String] =>
val temp : String = prop.asInstanceOf[Array[String]].reduce(_ + ";" + _)
temp
case _ : Any => ""
}
}
l.reduce(_ + "," + _)
this.propSeq.reduce((a,b) => a + "," + b)
}
def getEntityRow : Row = {
@@ -55,8 +55,8 @@ object Entity{
def main(args: Array[String]): Unit = {
val m : Map[String, Any] = Map("sc1" -> "test1", "sc2" -> Array("2","1"))
val e = new Entity("id1","l1", m, Array("sc1","sc2"))
println(e.toString) //"label1","test1","2;1","id2"
// val m : Map[String, Any] = Map("sc1" -> "test1", "sc2" -> Array("2","1"))
// val e = new Entity("id1","l1", m, Array("sc1","sc2"))
// println(e.toString) //"label1","test1","2;1","id2"
}
}

View File

@@ -14,7 +14,7 @@ class FlowTest_XX {
def testFlow(): Unit ={
//parse flow json
val file = "src/main/resources/gbt.json"
val file = "src/main/resources/kmeans.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)
@@ -49,7 +49,7 @@ class FlowTest_XX {
def testFlow2json() = {
//parse flow json
val file = "src/main/resources/gbt.json"
val file = "src/main/resources/kmeans.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]