Merge remote-tracking branch 'origin/master'
commit 5a3b4d694d
@@ -0,0 +1,42 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "stops":[
      {
        "uuid":"0000",
        "name":"LoadGraph",
        "bundle":"cn.piflow.bundle.graphx.LoadGraph",
        "properties":{
          "dataPath":"hdfs://10.0.86.89:9000/xx/graphTest"
        }
      },
      {
        "uuid":"1111",
        "name":"LabelPropagation",
        "bundle":"cn.piflow.bundle.graphx.LabelPropagation",
        "properties":{
          "maxIter":"20"
        }
      }
    ],
    "paths":[
      {
        "from":"LoadGraph",
        "outport":"edges",
        "inport":"edgesIn",
        "to":"LabelPropagation"
      },
      {
        "from":"LoadGraph",
        "outport":"vertex",
        "inport":"vertexIn",
        "to":"LabelPropagation"
      }
    ]
  }
}
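For context, the test class updated at the bottom of this commit reads a flow definition like the one above and parses it into a Map before execution. A minimal sketch of that step, reusing the FileUtil and OptionUtil helpers that FlowTest_XX already uses (their package path is an assumption here; the test only shows the class names):

// Sketch: load and parse the flow JSON the way FlowTest_XX does.
// cn.piflow.util is an assumed package for FileUtil and OptionUtil.
import cn.piflow.util.{FileUtil, OptionUtil}
import scala.util.parsing.json.JSON

val file = "src/main/resources/labelpropagation.json"
val flowJsonStr = FileUtil.fileReader(file)
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
println(map)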
@@ -0,0 +1,67 @@
package cn.piflow.bundle.graphx

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.graphx._
// Renamed on import so it is not shadowed by the class of the same name below.
import org.apache.spark.graphx.lib.{LabelPropagation => LPA}
import org.apache.spark.sql.SparkSession

class LabelPropagation extends ConfigurableStop {

  val authorEmail: String = "06whuxx@163.com"
  val description: String = "Run label propagation to detect communities in a graph."

  var edgePortIn: String = "edgesIn"
  var vertexPortIn: String = "vertexIn"
  val inportList: List[String] = List(edgePortIn, vertexPortIn)

  var edgePortOut: String = "edgesOut"
  var vertexPortOut: String = "vertexOut"
  val outportList: List[String] = List(edgePortOut, vertexPortOut)

  var maxIter: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()

    // NOTE: these casts expect raw GraphX RDDs on the input ports, while
    // LoadGraph currently writes DataFrames (see its TODO); the transfer
    // format between the two stops is still in flux.
    val edge = in.read(edgePortIn).asInstanceOf[EdgeRDD[Int]]
    val vertex = in.read(vertexPortIn).asInstanceOf[VertexRDD[Int]]
    val graph = Graph(vertex, edge)

    // Fall back to 50 iterations when the property is unset or blank.
    var maxIterValue: Int = 50
    if (maxIter != null && maxIter.nonEmpty) {
      maxIterValue = maxIter.toInt
    }

    val res = LPA.run(graph, maxIterValue)

    import spark.sqlContext.implicits._
    out.write(edgePortOut, res.edges.toDF())
    out.write(vertexPortOut, res.vertices.toDF())
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    maxIter = MapUtil.get(map, "maxIter").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val maxIter = new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = maxIter :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("graphx.jpg")
  }

  override def getGroup(): List[String] = {
    // Matches the GraphX value added to StopGroupEnum in this commit.
    List(StopGroupEnum.GraphX.toString)
  }

}
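For reference, the GraphX algorithm this stop delegates to gives every vertex an initial label and then repeatedly lets each vertex adopt the label most frequent among its neighbors, so densely connected groups converge to a shared label. A self-contained sketch against the public GraphX API (the toy graph and local master are made up for illustration):

// Standalone demo of org.apache.spark.graphx.lib.LabelPropagation.
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.{LabelPropagation => LPA}
import org.apache.spark.sql.SparkSession

object LpaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lpa-demo").getOrCreate()
    val sc = spark.sparkContext

    // Two triangles joined by one bridge edge: two obvious communities.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1),
      Edge(4L, 5L, 1), Edge(5L, 6L, 1), Edge(6L, 4L, 1),
      Edge(3L, 4L, 1)))
    val graph = Graph.fromEdges(edges, defaultValue = 1)

    // Each vertex ends up carrying the id of its community.
    val res = LPA.run(graph, maxSteps = 20)
    res.vertices.collect().sorted.foreach(println)

    spark.stop()
  }
}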
@@ -0,0 +1,61 @@
package cn.piflow.bundle.graphx

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

class LoadGraph extends ConfigurableStop {

  val authorEmail: String = "06whuxx@163.com"
  val description: String = "Load data and construct a graph."
  val inportList: List[String] = List(PortEnum.NonePort.toString)

  var edgePort: String = "edges"
  var vertexPort: String = "vertex"
  val outportList: List[String] = List(edgePort, vertexPort)

  var dataPath: String = _

  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val spark = pec.get[SparkSession]()
    val sc = spark.sparkContext

    import spark.sqlContext.implicits._
    // canonicalOrientation = true reorients every edge to point from the
    // smaller to the larger vertex id.
    val graph = GraphLoader.edgeListFile(sc, dataPath, true).partitionBy(PartitionStrategy.RandomVertexCut)

    // TODO: an EdgeRDD cannot be transferred as a Dataset directly, so edges
    // and vertices are converted to DataFrames before being written out.
    out.write(edgePort, graph.edges.toDF())
    out.write(vertexPort, graph.vertices.toDF())
  }

  def initialize(ctx: ProcessContext): Unit = {
  }

  def setProperties(map: Map[String, Any]): Unit = {
    dataPath = MapUtil.get(map, "dataPath").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val dataPath = new PropertyDescriptor().name("dataPath").displayName("DATA_PATH").defaultValue("").allowableValues(Set("")).required(true)
    descriptor = dataPath :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("graphx.jpg")
  }

  override def getGroup(): List[String] = {
    // Matches the GraphX value added to StopGroupEnum in this commit.
    List(StopGroupEnum.GraphX.toString)
  }

}
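GraphLoader.edgeListFile expects one whitespace-separated (source id, destination id) pair per line, skips lines starting with #, and sets all vertex and edge attributes to 1. A hypothetical stand-in for the HDFS file named in the flow JSON could look like this and would load as a three-edge graph:

# comment lines are ignored
1 2
2 3
3 1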
@@ -26,5 +26,6 @@ object StopGroupEnum extends Enumeration {
   val Spider= Value("Spider")
   val Mongodb= Value("Mongodb")
   val Memcache= Value("Memcache")
+  val GraphX=Value("GraphX")

 }
@@ -15,7 +15,7 @@ class FlowTest_XX {
   def testFlow(): Unit ={

     //parse flow json
-    val file = "src/main/resources/gaussion.json"
+    val file = "src/main/resources/labelpropagation.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)

@@ -52,7 +52,7 @@ class FlowTest_XX {
   def testFlow2json() = {

     //parse flow json
-    val file = "src/main/resources/gaussion.json"
+    val file = "src/main/resources/labelpropagation.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]