forked from opensci/piflow
Merge remote-tracking branch 'origin/master'
commit 6ef6f5a9c2
Binary file not shown.
@@ -5,13 +5,16 @@
     "stops":[

       {
         "uuid":"1111",
         "name":"FolderJsonParser",
         "bundle":"cn.piflow.bundle.json.FolderJsonParser",
         "properties":{
           "FolderPath":"hdfs://10.0.86.89:9000/aYQDJson/",
-          "tag":"students"
+          "tag":"students_student"

         }
       },
       {
@@ -0,0 +1,37 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+
+      {
+        "uuid":"1111",
+        "name":"MultiFolderJsonParser",
+        "bundle":"cn.piflow.bundle.json.MultiFolderJsonParser",
+        "properties":{
+          "jsonPathes":"hdfs://10.0.86.89:9000/aYQDJson/student.json;hdfs://10.0.86.89:9000/aYQDJson/student01.json;hdfs://10.0.86.89:9000/aYQDJson/student02.json",
+          "tag":"students_student"
+        }
+      },
+      {
+        "uuid":"1111",
+        "name":"JsonSave",
+        "bundle":"cn.piflow.bundle.json.JsonSave",
+        "properties":{
+          "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/yqd02.json"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"MultiFolderJsonParser",
+        "outport":"",
+        "inport":"",
+        "to":"JsonSave"
+      }
+    ]
+  }
+}
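The flow above wires the new MultiFolderJsonParser stop into JsonSave; its jsonPathes property is one ";"-separated string of HDFS paths. As a reading aid only, here is a minimal, self-contained Scala sketch of the split-and-union merge that stop performs (the local SparkSession and the paths are illustrative placeholders, not part of this commit; the real stop obtains its SparkSession from the JobContext and reads the paths from the flow JSON):

import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiJsonMergeSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("multi-json-merge-sketch")
      .master("local[*]")
      .getOrCreate()

    // "jsonPathes" is a single ";"-separated string, as the flow JSON above passes it.
    // These paths are placeholders.
    val jsonPathes = "hdfs://host:9000/aYQDJson/student.json;hdfs://host:9000/aYQDJson/student01.json"

    // Read every path, drop empty frames, and union the rest -- the merge the stop performs.
    // (If every file were empty, reduce would throw; the sketch ignores that edge case.)
    val frames: Seq[DataFrame] = jsonPathes.split(";").toSeq
      .map(p => spark.read.option("multiline", "true").json(p))
      .filter(_.count() != 0)

    val merged: DataFrame = frames.reduce(_ union _)
    merged.show(10)

    spark.stop()
  }
}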
@@ -0,0 +1,45 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"GBTTraining",
+        "bundle":"cn.piflow.bundle.ml_classification.GBTTraining",
+        "properties":{
+          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/gbt.model",
+          "maxBins":"20",
+          "maxDepth":"10",
+          "minInfoGain":"0.1",
+          "minInstancesPerNode":"2",
+          "impurity":"entropy",
+          "subSamplingRate":"0.6",
+          "stepSize":"0.2",
+          "lossType":"logistic"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"GBTPrediction",
+        "bundle":"cn.piflow.bundle.ml_classification.GBTPrediction",
+        "properties":{
+          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/gbt.model"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"GBTTraining",
+        "outport":"",
+        "inport":"",
+        "to":"GBTPrediction"
+      }
+    ]
+  }
+}
@@ -0,0 +1,42 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"MultilayerPerceptronTraining",
+        "bundle":"cn.piflow.bundle.ml_classification.MultilayerPerceptronTraining",
+        "properties":{
+          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/mlp.model",
+          "maxIter":"50",
+          "minTol":"1E-7",
+          "stepSize":"0.1",
+          "layers":"(6,5,3,2)",
+          "thresholds":"(0.6,0.4)"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"MultilayerPerceptronPrediction",
+        "bundle":"cn.piflow.bundle.ml_classification.MultilayerPerceptronPrediction",
+        "properties":{
+          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/mlp.model"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"MultilayerPerceptronTraining",
+        "outport":"",
+        "inport":"",
+        "to":"MultilayerPerceptronPrediction"
+      }
+    ]
+  }
+}
@@ -0,0 +1,45 @@
+{
+  "flow":{
+    "name":"test",
+    "uuid":"1234",
+    "stops":[
+      {
+        "uuid":"0000",
+        "name":"RandomForestTraining",
+        "bundle":"cn.piflow.bundle.ml_classification.RandomForestTraining",
+        "properties":{
+          "training_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_save_path":"hdfs://10.0.86.89:9000/xx/naivebayes/rf.model",
+          "maxBins":"20",
+          "maxDepth":"10",
+          "minInfoGain":"0.1",
+          "minInstancesPerNode":"2",
+          "impurity":"entropy",
+          "subSamplingRate":"0.6",
+          "featureSubsetStrategy":"auto",
+          "numTrees":"10"
+        }
+
+      },
+      {
+        "uuid":"1111",
+        "name":"RandomForestPrediction",
+        "bundle":"cn.piflow.bundle.ml_classification.RandomForestPrediction",
+        "properties":{
+          "test_data_path":"hdfs://10.0.86.89:9000/xx/watermellonDataset.txt",
+          "model_path":"hdfs://10.0.86.89:9000/xx/naivebayes/rf.model"
+        }
+
+      }
+
+    ],
+    "paths":[
+      {
+        "from":"RandomForestTraining",
+        "outport":"",
+        "inport":"",
+        "to":"RandomForestPrediction"
+      }
+    ]
+  }
+}
@@ -3,16 +3,21 @@ package cn.piflow.bundle.json
 import java.net.URI
 
+import cn.piflow.bundle.util.JsonUtil
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
 import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql._
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks.{break, breakable}
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.functions._
 
 
 
 class FolderJsonParser extends ConfigurableStop{
   override val authorEmail: String = "yangqidong@cnic.cn"
@@ -24,21 +29,37 @@ class FolderJsonParser extends ConfigurableStop{
   var FolderPath:String = _
   var tag : String = _
 
 
+  var openArrField:String=""
+  var ArrSchame:String=""
   override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
     val spark = pec.get[SparkSession]()
+    val sql: SQLContext = spark.sqlContext
 
     val arrPath: ArrayBuffer[String] = getFileName(FolderPath)
-    val FinalDF = getFinalDF(arrPath,spark)
-    out.write(FinalDF)
+    var FinalDF: DataFrame = getFinalDF(arrPath,sql)
+
+    if(tag.length>0){
+      val writeDF: DataFrame = JsonUtil.ParserJsonDF(FinalDF,tag)
+      FinalDF=writeDF
     }
 
 
-  def getDf(Path: String,ss:SparkSession): DataFrame ={
+    println("##########################################################################")
+    FinalDF.printSchema()
+    FinalDF.show(20)
+    println("##########################################################################")
+    out.write(FinalDF)
+  }
+
+  // Get a DataFrame from the given path
+  def getDf(Path: String,ss:SQLContext): DataFrame ={
     val frame: DataFrame = ss.read.json(Path)
     frame
   }
 
-  def getFinalDF(arrPath: ArrayBuffer[String],ss:SparkSession): DataFrame = {
+  // Iterate over the paths and build the combined DataFrame
+  def getFinalDF(arrPath: ArrayBuffer[String],ss:SQLContext): DataFrame = {
     var index: Int = 0
     breakable {
       for (i <- 0 until arrPath.length) {
@@ -50,12 +71,13 @@ class FolderJsonParser extends ConfigurableStop{
       }
 
     val df01 = ss.read.option("multiline","true").json(arrPath(index))
-    var df: DataFrame = df01.select(tag)
-    df.printSchema()
+    var aaa:String="name"
+    var df: DataFrame = df01
     for(d <- index+1 until(arrPath.length)){
       if(getDf(arrPath(d),ss).count()!=0){
-        val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d)).select(tag)
-        df1.printSchema()
+        val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
+        // df1.printSchema()
         val df2: DataFrame = df.union(df1).toDF()
         df=df2
       }
@@ -64,7 +86,7 @@ class FolderJsonParser extends ConfigurableStop{
     }
 
 
-  // Get the paths of all .xml files
+  // Get the paths of all .json files
   def getFileName(path:String):ArrayBuffer[String]={
     val conf: Configuration = new Configuration()
     val hdfs: FileSystem = FileSystem.get(URI.create(path),conf)
@@ -82,6 +104,7 @@ class FolderJsonParser extends ConfigurableStop{
   override def setProperties(map: Map[String, Any]): Unit = {
     FolderPath = MapUtil.get(map,"FolderPath").asInstanceOf[String]
     tag = MapUtil.get(map,"tag").asInstanceOf[String]
+
   }
 
 
@@ -90,8 +113,10 @@ class FolderJsonParser extends ConfigurableStop{
     var descriptor : List[PropertyDescriptor] = List()
     val FolderPath = new PropertyDescriptor().name("FolderPath").displayName("FolderPath").description("The path of the json folder").defaultValue("").required(true)
     descriptor = FolderPath :: descriptor
-    val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse").defaultValue("").required(true)
+    val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse,If you want to open an array field,you have to write it like this:links_name(MasterField_ChildField)").defaultValue("").required(false)
     descriptor = tag :: descriptor
 
 
     descriptor
   }
 
@@ -1,10 +1,11 @@
 package cn.piflow.bundle.json
 
 import cn.piflow._
+import cn.piflow.bundle.util.JsonUtil
 import cn.piflow.conf._
 import cn.piflow.conf.bean.PropertyDescriptor
 import cn.piflow.conf.util.{ImageUtil, MapUtil}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.beans.BeanProperty
 
@@ -22,10 +23,15 @@ class JsonParser extends ConfigurableStop{
 
     val spark = pec.get[SparkSession]()
 
-    val jsonDF = spark.read.option("multiline","true").json(jsonPath)
-    val jsonDFNew = jsonDF.select(tag)
-    jsonDFNew.printSchema()
-    jsonDFNew.show(10)
+    var jsonDF = spark.read.json(jsonPath)
+
+    if(tag.length>0){
+      val writeDF: DataFrame = JsonUtil.ParserJsonDF(jsonDF,tag)
+      jsonDF=writeDF
+    }
+
+    jsonDF.printSchema()
+    jsonDF.show(10)
     out.write(jsonDF)
   }
 
@@ -41,7 +47,7 @@ class JsonParser extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
     val jsonPath = new PropertyDescriptor().name("jsonPath").displayName("jsonPath").description("The path of the json file").defaultValue("").required(true)
-    val tag=new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse").defaultValue("").required(true)
+    val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse,If you want to open an array field,you have to write it like this:links_name(MasterField_ChildField)").defaultValue("").required(false)
     descriptor = jsonPath :: descriptor
     descriptor = tag :: descriptor
     descriptor
@@ -0,0 +1,86 @@
+package cn.piflow.bundle.json
+
+import cn.piflow.bundle.util.JsonUtil
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.{ImageUtil, MapUtil}
+import org.apache.spark.sql
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.util.control.Breaks.{break, breakable}
+
+class MultiFolderJsonParser extends ConfigurableStop{
+  val authorEmail: String = "yangqidong@cnic.cn"
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val description: String = "Merge JSON files"
+
+  var jsonPathes: String = _
+  var tag : String = _
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val ss = pec.get[SparkSession]()
+
+    val arrPath: Array[String] = jsonPathes.split(";")
+
+    var index: Int = 0
+    breakable {
+      for (i <- 0 until arrPath.length) {
+        if (ss.read.json(arrPath(i)).count() != 0) {
+          index = i
+          break
+        }
+      }
+    }
+
+    var FinalDF = ss.read.option("multiline","true").json(arrPath(index))
+
+    for(d <- index+1 until(arrPath.length)){
+      if(ss.read.json(arrPath(d)).count()!=0){
+        val df1: DataFrame = ss.read.option("multiline","true").json(arrPath(d))
+        // df1.printSchema()
+        val df2: DataFrame = FinalDF.union(df1).toDF()
+        FinalDF=df2
+      }
+    }
+
+    if(tag.length>0){
+      val writeDF: DataFrame = JsonUtil.ParserJsonDF(FinalDF,tag)
+      FinalDF=writeDF
+    }
+
+    FinalDF.show(10)
+
+    out.write(FinalDF)
+  }
+
+  override def setProperties(map: Map[String, Any]): Unit = {
+    jsonPathes = MapUtil.get(map,"jsonPathes").asInstanceOf[String]
+    tag = MapUtil.get(map,"tag").asInstanceOf[String]
+  }
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val jsonPathes = new PropertyDescriptor().name("jsonPathes").displayName("jsonPathes").description("The path of the json file,the delimiter is ;").defaultValue("").required(true)
+    val tag = new PropertyDescriptor().name("tag").displayName("tag").description("The tag you want to parse,If you want to open an array field,you have to write it like this:links_name(MasterField_ChildField)").defaultValue("").required(false)
+    descriptor = jsonPathes :: descriptor
+    descriptor = tag :: descriptor
+    descriptor
+  }
+  override def getIcon(): Array[Byte] = {
+    ImageUtil.getImage("./src/main/resources/selectHiveQL.jpg")
+  }
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.JsonGroup.toString)
+  }
+  override def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+}
@@ -0,0 +1,59 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.GBTClassificationModel
+import org.apache.spark.sql.SparkSession
+
+class GBTPrediction extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Make use of a exist GBT Model to predict."
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var test_data_path:String =_
+  var model_path:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(test_data_path)
+    //data.show()
+
+    //load model
+    val model=GBTClassificationModel.load(model_path)
+
+    val predictions=model.transform(data)
+    predictions.show()
+    out.write(predictions)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    test_data_path=MapUtil.get(map,key="test_data_path").asInstanceOf[String]
+    model_path=MapUtil.get(map,key="model_path").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true)
+    val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true)
+    descriptor = test_data_path :: descriptor
+    descriptor = model_path :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
@@ -0,0 +1,148 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.GBTClassifier
+import org.apache.spark.sql.SparkSession
+
+class GBTTraining extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Training a GBT Model."
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var training_data_path:String =_
+  var model_save_path:String=_
+  var maxBins:String=_
+  var maxDepth:String=_
+  var minInfoGain:String=_
+  var minInstancesPerNode:String=_
+  var impurity:String=_
+  var subSamplingRate:String=_
+  var lossType:String=_
+  var stepSize:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(training_data_path)
+
+    //Maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node. More bins give higher granularity.Must be >= 2 and >= number of categories in any categorical feature.
+    var maxBinsValue:Int=40
+    if(maxBins!=""){
+      maxBinsValue=maxBins.toInt
+    }
+
+    //Maximum depth of the tree (>= 0).The maximum is 30.
+    var maxDepthValue:Int=30
+    if(maxDepth!=""){
+      maxDepthValue=maxDepth.toInt
+    }
+
+    //Minimum information gain for a split to be considered at a tree node.
+    var minInfoGainValue:Double=0.2
+    if(minInfoGain!=""){
+      minInfoGainValue=minInfoGain.toDouble
+    }
+
+    //Minimum number of instances each child must have after split.
+    var minInstancesPerNodeValue:Int=3
+    if(minInstancesPerNode!=""){
+      minInstancesPerNodeValue=minInstancesPerNode.toInt
+    }
+
+    //Param for the name of family which is a description of the label distribution to be used in the model
+    var impurityValue="gini"
+    if(impurity!=""){
+      impurityValue=impurity
+    }
+
+    var subSamplingRateValue:Double=0.6
+    if(subSamplingRate!=""){
+      subSamplingRateValue=subSamplingRate.toDouble
+    }
+
+    var lossTypeValue="logistic"
+    if(lossType!=""){
+      lossTypeValue=lossType
+    }
+
+    var stepSizeValue:Double=0.1
+    if(stepSize!=""){
+      stepSizeValue=stepSize.toDouble
+    }
+
+    //training a GBT model
+    val model=new GBTClassifier()
+      .setMaxBins(maxBinsValue)
+      .setMaxDepth(maxDepthValue)
+      .setMinInfoGain(minInfoGainValue)
+      .setMinInstancesPerNode(minInstancesPerNodeValue)
+      .setImpurity(impurityValue)
+      .setLossType(lossTypeValue)
+      .setSubsamplingRate(subSamplingRateValue)
+      .setStepSize(stepSizeValue)
+      .fit(data)
+
+    //model persistence
+    model.save(model_save_path)
+
+    import spark.implicits._
+    val dfOut=Seq(model_save_path).toDF
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    training_data_path=MapUtil.get(map,key="training_data_path").asInstanceOf[String]
+    model_save_path=MapUtil.get(map,key="model_save_path").asInstanceOf[String]
+    maxBins=MapUtil.get(map,key="maxBins").asInstanceOf[String]
+    maxDepth=MapUtil.get(map,key="maxDepth").asInstanceOf[String]
+    minInfoGain=MapUtil.get(map,key="minInfoGain").asInstanceOf[String]
+    minInstancesPerNode=MapUtil.get(map,key="minInstancesPerNode").asInstanceOf[String]
+    impurity=MapUtil.get(map,key="impurity").asInstanceOf[String]
+    subSamplingRate=MapUtil.get(map,key="subSamplingRate").asInstanceOf[String]
+    lossType=MapUtil.get(map,key="lossType").asInstanceOf[String]
+    stepSize=MapUtil.get(map,key="stepSize").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
+    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").description("").defaultValue("").required(true)
+    val maxBins=new PropertyDescriptor().name("maxBins").displayName("MAX_BINS").description("ddd").defaultValue("").required(false)
+    val maxDepth=new PropertyDescriptor().name("maxDepth").displayName("MAX_DEPTH").description("ddd").defaultValue("").required(false)
+    val minInfoGain=new PropertyDescriptor().name("minInfoGain").displayName("MIN_INFO_GAIN").description("ddd").defaultValue("").required(false)
+    val minInstancesPerNode=new PropertyDescriptor().name("minInstancesPerNode").displayName("MIN_INSTANCES_PER_NODE").description("ddd").defaultValue("").required(false)
+    val impurity=new PropertyDescriptor().name("impurity").displayName("IMPURITY").description("Criterion used for information gain calculation (case-insensitive). Supported: \"entropy\" and \"gini\". (default = gini)").defaultValue("").required(false)
+    val subSamplingRate=new PropertyDescriptor().name("subSamplingRate").displayName("SUB_SAMPLING_RATE").description("ddd").defaultValue("").required(false)
+    val lossType=new PropertyDescriptor().name("lossType").displayName("LOSS_TYPE").description("ddd").defaultValue("").required(false)
+    val stepSize=new PropertyDescriptor().name("stepSize").displayName("STEP_SIZE").description("ddd").defaultValue("").required(false)
+    descriptor = training_data_path :: descriptor
+    descriptor = model_save_path :: descriptor
+    descriptor = maxBins :: descriptor
+    descriptor = maxDepth :: descriptor
+    descriptor = minInfoGain :: descriptor
+    descriptor = minInstancesPerNode :: descriptor
+    descriptor = impurity :: descriptor
+    descriptor = subSamplingRate::descriptor
+    descriptor = lossType :: descriptor
+    descriptor = stepSize :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
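A pattern worth noting in GBTTraining (and repeated in MultilayerPerceptronTraining and RandomForestTraining below): every hyperparameter arrives from the flow JSON as a string and is converted with a hard-coded fallback when the property is left empty. A minimal standalone sketch of that convention, using hypothetical helper names that are not part of the commit:

// Hypothetical helpers (not in the commit) showing the "string property with fallback default"
// convention the new training stops apply to each numeric hyperparameter.
object PropertyDefaultsSketch {
  def intOrDefault(raw: String, default: Int): Int =
    if (raw == null || raw.isEmpty) default else raw.toInt

  def doubleOrDefault(raw: String, default: Double): Double =
    if (raw == null || raw.isEmpty) default else raw.toDouble

  def main(args: Array[String]): Unit = {
    // "20" is what the GBT flow JSON above supplies for maxBins; "" means the user left it blank.
    println(intOrDefault("20", 40))      // 20, passed on to setMaxBins(...)
    println(intOrDefault("", 40))        // 40, matching "var maxBinsValue:Int=40" in the stop
    println(doubleOrDefault("0.1", 0.2)) // 0.1, passed on to setMinInfoGain(...)
  }
}

In the committed stops this is written inline, with one var and one if block per parameter, rather than through helpers like these.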
@@ -0,0 +1,59 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel
+import org.apache.spark.sql.SparkSession
+
+class MultilayerPerceptronPrediction extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Make use of a exist MultilayerPerceptronModel to predict."
+  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var test_data_path:String =_
+  var model_path:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(test_data_path)
+    //data.show()
+
+    //load model
+    val model=MultilayerPerceptronClassificationModel.load(model_path)
+
+    val predictions=model.transform(data)
+    predictions.show()
+    out.write(predictions)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    test_data_path=MapUtil.get(map,key="test_data_path").asInstanceOf[String]
+    model_path=MapUtil.get(map,key="model_path").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true)
+    val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true)
+    descriptor = test_data_path :: descriptor
+    descriptor = model_path :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
@@ -0,0 +1,124 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
+import org.apache.spark.sql.SparkSession
+
+class MultilayerPerceptronTraining extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Training a MultilayerPerceptronModel."
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var training_data_path:String =_
+  var model_save_path:String=_
+  var layers:String=_
+  var maxIter:String=_
+  var stepSize:String=_
+  var thresholds:String=_
+  var minTol:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(training_data_path)
+
+    //Layer sizes including input size and output size.
+    var layersValue:Array[Int]=Array()
+    if(layers==""){
+      throw new IllegalArgumentException
+    }else{
+      layersValue=layers.substring(1,layers.length-1).split(",").map(_.toInt)
+    }
+
+    //Param for Thresholds in multi-class classification to adjust the probability of predicting each class.Sample format:(0.6,0.4)
+    var thresholdsValue:Array[Double]=Array()
+    if(thresholds==""){
+      throw new IllegalArgumentException
+    }else{
+      thresholdsValue=thresholds.substring(1,layers.length-1).split(",").map(_.toDouble)
+    }
+
+
+    //Param for maximum number of iterations (>= 0)
+    var maxIterValue:Int=50
+    if(maxIter!=""){
+      maxIterValue=maxIter.toInt
+    }
+
+    //Param for the convergence tolerance for iterative algorithms (>= 0).
+    var minTolValue:Double=1E-6
+    if(minTol!=""){
+      minTolValue=minTol.toDouble
+    }
+
+    //Param for Step size to be used for each iteration of optimization (> 0).
+    var stepSizeValue:Double=0.8
+    if(stepSize!=""){
+      stepSizeValue=stepSize.toDouble
+    }
+
+    //training a MultilayerPerceptron model
+    val model=new MultilayerPerceptronClassifier()
+      .setMaxIter(maxIterValue)
+      .setTol(minTolValue)
+      .setStepSize(stepSizeValue)
+      .setLayers(layersValue)
+      .fit(data)
+
+    //model persistence
+    model.save(model_save_path)
+
+    import spark.implicits._
+    val dfOut=Seq(model_save_path).toDF
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    training_data_path=MapUtil.get(map,key="training_data_path").asInstanceOf[String]
+    model_save_path=MapUtil.get(map,key="model_save_path").asInstanceOf[String]
+    maxIter=MapUtil.get(map,key="maxIter").asInstanceOf[String]
+    minTol=MapUtil.get(map,key="minTol").asInstanceOf[String]
+    thresholds=MapUtil.get(map,key="thresholds").asInstanceOf[String]
+    stepSize=MapUtil.get(map,key="stepSize").asInstanceOf[String]
+    layers=MapUtil.get(map,key="layers").asInstanceOf[String]
+
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
+    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").description("").defaultValue("").required(true)
+    val maxIter=new PropertyDescriptor().name("maxIter").displayName("MAX_ITER").description("Param for maximum number of iterations (>= 0).").defaultValue("").required(false)
+    val minTol=new PropertyDescriptor().name("minTol").displayName("MIN_TOL").description("Param for the convergence tolerance for iterative algorithms (>= 0).").defaultValue("").required(false)
+    val stepSize=new PropertyDescriptor().name("stepSize").displayName("STEP_SIZE").description("Param for Step size to be used for each iteration of optimization (> 0).").defaultValue("").required(false)
+    val thresholds=new PropertyDescriptor().name("thresholds").displayName("THRESHOLDS").description("DoubleArrayParam.Param for Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0 excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold.").defaultValue("").required(true)
+    val layers=new PropertyDescriptor().name("layers").displayName("LAYERS").description("Layer sizes including input size and output size. ").defaultValue("").required(true)
+    descriptor = training_data_path :: descriptor
+    descriptor = model_save_path :: descriptor
+    descriptor = maxIter :: descriptor
+    descriptor = minTol :: descriptor
+    descriptor = stepSize :: descriptor
+    descriptor = thresholds :: descriptor
+    descriptor = layers :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
@@ -0,0 +1,59 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.RandomForestClassificationModel
+import org.apache.spark.sql.SparkSession
+
+class RandomForestPrediction extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Make use of a exist RandomForest Model to predict."
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var test_data_path:String =_
+  var model_path:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(test_data_path)
+    //data.show()
+
+    //load model
+    val model=RandomForestClassificationModel.load(model_path)
+
+    val predictions=model.transform(data)
+    predictions.show()
+    out.write(predictions)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    test_data_path=MapUtil.get(map,key="test_data_path").asInstanceOf[String]
+    model_path=MapUtil.get(map,key="model_path").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val test_data_path = new PropertyDescriptor().name("test_data_path").displayName("TEST_DATA_PATH").defaultValue("").required(true)
+    val model_path = new PropertyDescriptor().name("model_path").displayName("MODEL_PATH").defaultValue("").required(true)
+    descriptor = test_data_path :: descriptor
+    descriptor = model_path :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
@@ -0,0 +1,148 @@
+package cn.piflow.bundle.ml_classification
+
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import org.apache.spark.ml.classification.RandomForestClassifier
+import org.apache.spark.sql.SparkSession
+
+class RandomForestTraining extends ConfigurableStop{
+  val authorEmail: String = "xiaoxiao@cnic.cn"
+  val description: String = "Training a RandomForestModel."
+  val inportList: List[String] = List(PortEnum.NonePort.toString)
+  val outportList: List[String] = List(PortEnum.DefaultPort.toString)
+  var training_data_path:String =_
+  var model_save_path:String=_
+  var maxBins:String=_
+  var maxDepth:String=_
+  var minInfoGain:String=_
+  var minInstancesPerNode:String=_
+  var impurity:String=_
+  var subSamplingRate:String=_
+  var featureSubsetStrategy:String=_
+  var numTrees:String=_
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    val spark = pec.get[SparkSession]()
+
+    //load data stored in libsvm format as a dataframe
+    val data=spark.read.format("libsvm").load(training_data_path)
+
+    //Maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node. More bins give higher granularity.Must be >= 2 and >= number of categories in any categorical feature.
+    var maxBinsValue:Int=40
+    if(maxBins!=""){
+      maxBinsValue=maxBins.toInt
+    }
+
+    //Maximum depth of the tree (>= 0).The maximum is 30.
+    var maxDepthValue:Int=30
+    if(maxDepth!=""){
+      maxDepthValue=maxDepth.toInt
+    }
+
+    //Minimum information gain for a split to be considered at a tree node.
+    var minInfoGainValue:Double=0.2
+    if(minInfoGain!=""){
+      minInfoGainValue=minInfoGain.toDouble
+    }
+
+    //Minimum number of instances each child must have after split.
+    var minInstancesPerNodeValue:Int=3
+    if(minInstancesPerNode!=""){
+      minInstancesPerNodeValue=minInstancesPerNode.toInt
+    }
+
+    //Param for the name of family which is a description of the label distribution to be used in the model
+    var impurityValue="gini"
+    if(impurity!=""){
+      impurityValue=impurity
+    }
+
+    var subSamplingRateValue:Double=0.6
+    if(subSamplingRate!=""){
+      subSamplingRateValue=subSamplingRate.toDouble
+    }
+
+    var featureSubsetStrategyValue="auto"
+    if(featureSubsetStrategy!=""){
+      featureSubsetStrategyValue=featureSubsetStrategy
+    }
+
+    var numTreesValue:Int=10
+    if(numTrees!=""){
+      numTreesValue=numTrees.toInt
+    }
+
+    //training a RandomForest model
+    val model=new RandomForestClassifier()
+      .setMaxBins(maxBinsValue)
+      .setMaxDepth(maxDepthValue)
+      .setMinInfoGain(minInfoGainValue)
+      .setMinInstancesPerNode(minInstancesPerNodeValue)
+      .setImpurity(impurityValue)
+      .setFeatureSubsetStrategy(featureSubsetStrategyValue)
+      .setSubsamplingRate(subSamplingRateValue)
+      .setNumTrees(numTreesValue)
+      .fit(data)
+
+    //model persistence
+    model.save(model_save_path)
+
+    import spark.implicits._
+    val dfOut=Seq(model_save_path).toDF
+    dfOut.show()
+    out.write(dfOut)
+
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    training_data_path=MapUtil.get(map,key="training_data_path").asInstanceOf[String]
+    model_save_path=MapUtil.get(map,key="model_save_path").asInstanceOf[String]
+    maxBins=MapUtil.get(map,key="maxBins").asInstanceOf[String]
+    maxDepth=MapUtil.get(map,key="maxDepth").asInstanceOf[String]
+    minInfoGain=MapUtil.get(map,key="minInfoGain").asInstanceOf[String]
+    minInstancesPerNode=MapUtil.get(map,key="minInstancesPerNode").asInstanceOf[String]
+    impurity=MapUtil.get(map,key="impurity").asInstanceOf[String]
+    subSamplingRate=MapUtil.get(map,key="subSamplingRate").asInstanceOf[String]
+    featureSubsetStrategy=MapUtil.get(map,key="featureSubsetStrategy").asInstanceOf[String]
+    numTrees=MapUtil.get(map,key="numTrees").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    var descriptor : List[PropertyDescriptor] = List()
+    val training_data_path = new PropertyDescriptor().name("training_data_path").displayName("TRAINING_DATA_PATH").defaultValue("").required(true)
+    val model_save_path = new PropertyDescriptor().name("model_save_path").displayName("MODEL_SAVE_PATH").description("").defaultValue("").required(true)
+    val maxBins=new PropertyDescriptor().name("maxBins").displayName("MAX_BINS").description("ddd").defaultValue("").required(false)
+    val maxDepth=new PropertyDescriptor().name("maxDepth").displayName("MAX_DEPTH").description("ddd").defaultValue("").required(false)
+    val minInfoGain=new PropertyDescriptor().name("minInfoGain").displayName("MIN_INFO_GAIN").description("ddd").defaultValue("").required(false)
+    val minInstancesPerNode=new PropertyDescriptor().name("minInstancesPerNode").displayName("MIN_INSTANCES_PER_NODE").description("ddd").defaultValue("").required(false)
+    val impurity=new PropertyDescriptor().name("impurity").displayName("IMPURITY").description("Criterion used for information gain calculation (case-insensitive). Supported: \"entropy\" and \"gini\". (default = gini)").defaultValue("").required(false)
+    val subSamplingRate=new PropertyDescriptor().name("subSamplingRate").displayName("SUB_SAMPLING_RATE").description("ddd").defaultValue("").required(false)
+    val featureSubsetStrategy=new PropertyDescriptor().name("featureSubsetStrategy").displayName("FEATURE_SUBSET_STRATEGY").description("ddd").defaultValue("").required(false)
+    val numTrees=new PropertyDescriptor().name("numTrees").displayName("NUM_TREES").description("ddd").defaultValue("").required(false)
+    descriptor = training_data_path :: descriptor
+    descriptor = model_save_path :: descriptor
+    descriptor = maxBins :: descriptor
+    descriptor = maxDepth :: descriptor
+    descriptor = minInfoGain :: descriptor
+    descriptor = minInstancesPerNode :: descriptor
+    descriptor = impurity :: descriptor
+    descriptor = subSamplingRate::descriptor
+    descriptor = featureSubsetStrategy :: descriptor
+    descriptor = numTrees :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = {
+    List(StopGroupEnum.MLGroup.toString)
+  }
+
+}
@@ -1,5 +1,24 @@
 package cn.piflow.bundle.rdf
 
-class CsvToNeo4J {
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import cn.piflow.conf.{ConfigurableStop, PortEnum}
+import cn.piflow.conf.bean.PropertyDescriptor
+
+class CsvToNeo4J extends ConfigurableStop{
+  override val authorEmail: String = "sha0w@cnic.cn"
+  override val description: String = "this stop use linux shell & neo4j-import command to lead CSV file data create/into a database"
+  override val inportList: List[String] = List(PortEnum.NonePort.toString)
+  override val outportList: List[String] = List(PortEnum.NonePort.toString)
+
+  override def setProperties(map: Map[String, Any]): Unit = ???
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = ???
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): List[String] = ???
+
+  override def initialize(ctx: ProcessContext): Unit = ???
+
+  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = ???
 }
@@ -1,247 +0,0 @@
-package cn.piflow.bundle.rdf
-
-import java.util.regex.{Matcher, Pattern}
-
-import cn.piflow.bundle.util.Entity
-import cn.piflow.conf.bean.PropertyDescriptor
-import cn.piflow.conf.util.{ImageUtil, MapUtil}
-import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
-import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{Row, SparkSession}
-
-class RdfToCsv extends ConfigurableStop{
-  override val authorEmail: String = "shaow@cnic.cn"
-  val inportList: List[String] = List(PortEnum.DefaultPort.toString)
-  val outportList: List[String] = List("entityOut","relationshipOut")
-  override val description: String = "Simple spark program to convert *.n3 RDF file to CSV(DataFrame) file"
-
-  var rdfFilepath : String = _
-  var isFront : String = _
-  var PRegex : String = _
-  var RRegex : String = _
-  var ERegex : String = _
-  var RSchema : String = _
-  var entityPort : String = "entityOut"
-  var relationshipPort : String = "relationshipOut"
-
-  override def setProperties(map: Map[String, Any]): Unit = {
-    isFront = MapUtil.get(map, "isFromFront").asInstanceOf[String]
-    PRegex = MapUtil.get(map, "propertyRegex").asInstanceOf[String]
-    RRegex = MapUtil.get(map, "relationshipRegex").asInstanceOf[String]
-    ERegex = MapUtil.get(map, "entityRegex").asInstanceOf[String]
-    RSchema = MapUtil.get(map, "relationshipSchema").asInstanceOf[String]
-
-    if (isFront == "false")
-      rdfFilepath = MapUtil.get(map, "filePath").asInstanceOf[String]
-  }
-
-  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
-
-    var descriptor: List[PropertyDescriptor] = List()
-    val filePath = new PropertyDescriptor()
-      .name("filePath")
-      .displayName("inputHDFSFilePath")
-      .description("The path of the input rdf file")
-      .defaultValue("")
-      .required(true)
-
-    val isFromFront = new PropertyDescriptor()
-      .name("isFromFront")
-      .displayName("isFromFront")
-      .description("identify the file path source(should have same schema)")
-      .allowableValues(Set("true", "false"))
-      .defaultValue("false")
-      .required(true)
-
-    val propertyRegex = new PropertyDescriptor()
-      .name("propertyRegex")
-      .displayName("property regex")
-      .description("define the propertyRegex to parse the n3 file's property lines\r\n" +
-        "this regex string should be fully named and regular\r\n" +
-        "you need to SPECIFIC five value's name \r\n" +
-        "1.prefix 2.id 3.pprefix 4.name 5.value" +
-        "the form should be like this : \r\n" +
-        "(?<prefix>...?<id>... ?<pprefix>...?<name> ?<value>...)\r\n" +
-        "check the default value carefully to knowledge the right structure")
-      .defaultValue("<(?<prefix>http:\\/\\/[^>]+\\/)(?<id>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?<pprefix>http:\\/\\/[^>]+\\/)(?<name>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \"(?<value>.+)\" \\.")
-      .required(true)
-
-    val relationshipRegex = new PropertyDescriptor()
-      .name("relationshipRegex")
-      .displayName("relationship regex")
-      .description("define the propertyRegex to parse the n3 file's relationship lines\r\n" +
-        "this regex string should be fully named and regular\r\n" +
-        "you need to SPECIFIC six value's name \r\n" +
-        "1.prefix1 2.id1 3.tprefix 4.type 5.prefix2 6.id2" +
-        "the form should be like this : \r\n" +
-        "(?<prefix1>...?<id1>... ?<tprefix>...?<type> ?<prefix2>...?<id2>)\r\n" +
-        "check the default value carefully to knowledge the right structure")
-      .defaultValue("<(?<prefix1>http:\\/\\/[^>]+\\/)(?<id1>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?<tprefix>http:\\/\\/[^>]+\\/)(?<type>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)(?<!#type)> <(?<prefix2>http:\\/\\/[^>]+\\/)(?<id2>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \\.")
-      .required(true)
-
-    val entityRegex = new PropertyDescriptor()
-      .name("entityRegex")
-      .displayName("entity regex")
-      .description("define the propertyRegex to parse the n3 file's entity lines\r\n" +
-        "this regex string should be fully named and regular\r\n" +
-        "you need to SPECIFIC four value's name \r\n" +
-        "1.prefix 2.id 4.lprefix 5.label" +
-        "the form should be like this : \r\n" +
-        "(?<prefix>...?<id>... ... ?<lprefix>...?<label>)\r\n" +
-        "check the default value carefully to knowledge the right structure")
-      .defaultValue("(<(?<prefix>http:\\/\\/[^>]+\\/)(?<id>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?:http:\\/\\/[^>]+\\/)(?:[^\\/][-A-Za-z0-9._#$%^&*!@~]+)(?:#type)> <(?<lprefix>http:\\/\\/[^>]+\\/)(?<label>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \\.")
-      .required(true)
-
-    val relationshipSchema = new PropertyDescriptor()
-      .name("relationshipSchema")
-      .displayName("relationship's schema")
-      .description("define the schema of relationship, as a user, \r\n" +
-        "you should ponder the name of start id and end id\r\n" +
-        "make sure your schema looks like the default value")
-      .defaultValue("ENTITY_ID:START_ID,role,ENTITY_ID:END_ID,RELATION_TYPE:TYPE")
-      .required(true)
-
-
-    descriptor = filePath :: descriptor
-    descriptor = isFromFront :: descriptor
-    descriptor = propertyRegex :: descriptor
-    descriptor = entityRegex :: descriptor
-    descriptor = relationshipRegex :: descriptor
-    descriptor = relationshipSchema :: descriptor
-    descriptor
-  }
-
-  override def getIcon(): Array[Byte] = {
-    ImageUtil.getImage("./src/main/resources/rdf.png")
-  }
-
-  override def getGroup(): List[String] = {
-    List(StopGroupEnum.RDFGroup.toString)
-  }
-
-  override def initialize(ctx: ProcessContext): Unit = {
-
-  }
-
-  override def perform(in: JobInputStream,
-                       out: JobOutputStream,
-                       pec: JobContext): Unit = {
-
-    val entityRegexPattern : Pattern = Pattern.compile(ERegex)
-    val relationRegexPattern : Pattern = Pattern.compile(RRegex)
-    val propertyRegexPattern : Pattern = Pattern.compile(PRegex)
-    val spark = pec.get[SparkSession]()
-    val sc = spark.sparkContext
-    val sq = spark.sqlContext
-    var hdfsFile : RDD[String] = sc.emptyRDD[String]
-    //in
-    if (isFront == "true") {
-      val inDF : Array[String] = in
-        .read()
-        .collect()
-        .map(r => r.getAs[String](1))
-      var index = 0
-      val iterator : Iterator[String] = inDF.iterator
-
-      while(iterator.hasNext) {//every row's first col should be the exact hdfs path of the n3 file
-        index match {
-          case 0 => hdfsFile = sc.textFile(iterator.next())
-            index += 1
-          case 1000 =>
-            println(hdfsFile.count()) //in some case the num of file will excess 10w, use this way to reduce the depth of DAG
-            index = 1
-          case _ => hdfsFile = hdfsFile.union(sc.textFile(iterator.next))
-            index += 1
-        }
-      }
-    } else {
-      hdfsFile = sc.textFile(rdfFilepath)
-    }
-    val entityRdd = hdfsFile
-      .filter(s => entityRegexPattern.matcher(s).find() ||
-        propertyRegexPattern.matcher(s).find())
-    val relationshipRdd = hdfsFile
-      .filter(s => relationRegexPattern.matcher(s).find())
-    val settleUpEntityRdd = entityRdd.map(s => {
-      val me = entityRegexPattern.matcher(s)
-      val mp = propertyRegexPattern.matcher(s)
-      if (me.find()) {
-        (me.group("prefix") + me.group("id"), me.group())
-      } else {
-        mp.find()
-        (mp.group("prefix") + mp.group("id"), mp.group())
-      }
-    })
-      .groupByKey() //main
-      .values
-      .map(s => s.toList)
-    val entitySchema : Set[String] = settleUpEntityRdd
-      .map(s => s.filter(l => propertyRegexPattern.matcher(l).find()))
-      .map(s => {
-        s.map(line => {
-          val m = propertyRegexPattern.matcher(line)
-          m.find()
-          m.group("name")
-        })
-      })
-      .map(l => {
-        l.toSet
-      })
-      .reduce(_ ++ _)
-
-
-    val finalEntitySchema = "ENTITY_ID:ID," +
-      entitySchema.reduce((a, b) => a + "," + b) + ",ENTITY_TYPE:LABEL"
-    val entityDFSchema : StructType = StructType(finalEntitySchema.split(",")
-      .map(i => StructField(i, StringType, nullable = true)))
-    val relationshipDFSchema : StructType = StructType(RSchema.split(",")
-      .map(i => StructField(i, StringType, nullable = true)))
-
-    val entityRowRdd = settleUpEntityRdd.map(l => {
-      val en = l.filter(s => entityRegexPattern.matcher(s).find()).head
-      val label = get(entityRegexPattern.matcher(en),"lprefix") +
-        get(entityRegexPattern.matcher(en),"label")
-      val id = get(entityRegexPattern.matcher(en),"prefix") + get(entityRegexPattern.matcher(en),"id")
-      val prop = l
-        .filter(s => ?(propertyRegexPattern, s))
-        .map(s => (
-          get(propertyRegexPattern.matcher(s),"name")
-            ->
-            get(propertyRegexPattern.matcher(s),"value").replace("\"", "'")
-          )
-        ).toArray
-        .toMap
-      new Entity(id, label, prop, entitySchema.toSeq).getEntityRow
-    })
-    val relationshipRowRdd = relationshipRdd.map(s => Seq(
-      get(relationRegexPattern.matcher(s),"prefix1") ,
-      get(relationRegexPattern.matcher(s),"id1") ,
-      get(relationRegexPattern.matcher(s),"tprefix") ,
-      get(relationRegexPattern.matcher(s),"type") ,
-      get(relationRegexPattern.matcher(s),"prefix2") ,
-      get(relationRegexPattern.matcher(s),"id2") ,
-      get(relationRegexPattern.matcher(s),"type")
-      )
-    ).map(s => Row(s))
-    val entityDF = sq.createDataFrame(entityRowRdd, entityDFSchema)
-    val relationDF = sq.createDataFrame(relationshipRowRdd, relationshipDFSchema)
-    entityDF.show(1)
-    relationDF.show(1)
-    out.write(entityPort, entityDF)
||||||
out.write(relationshipPort, relationDF)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
def get(m : Matcher, name : String) : String = {
|
|
||||||
if (m.find()) m.group(name)
|
|
||||||
else ""
|
|
||||||
}
|
|
||||||
|
|
||||||
def ? (regex : Pattern, str : String) : Boolean = {
|
|
||||||
regex.matcher(str).find()
|
|
||||||
}
|
|
||||||
}
|
|
|
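Illustrative aside, not part of the commit: a minimal standalone Scala sketch of how the default propertyRegex above is expected to capture a single N3 property line. The sample triple (example.org URIs) and the object name are made up for the example.

import java.util.regex.Pattern

object PropertyRegexSketch {
  def main(args: Array[String]): Unit = {
    // same pattern as the propertyRegex default value above
    val propertyRegex =
      "<(?<prefix>http:\\/\\/[^>]+\\/)(?<id>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> " +
        "<(?<pprefix>http:\\/\\/[^>]+\\/)(?<name>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> " +
        "\"(?<value>.+)\" \\."
    // hypothetical N3 property line
    val line = "<http://example.org/person/p1> <http://example.org/prop/name> \"Alice\" ."
    val m = Pattern.compile(propertyRegex).matcher(line)
    if (m.find()) {
      // prefix + id identifies the entity; name -> value is one of its properties
      println(m.group("prefix") + m.group("id") + " : " + m.group("name") + " = " + m.group("value"))
    }
  }
}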
@ -0,0 +1,296 @@
package cn.piflow.bundle.rdf

import java.util.regex.{Matcher, Pattern}

import cn.piflow.bundle.util.Entity
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import cn.piflow.conf.{ConfigurableStop, PortEnum, StopGroupEnum}
import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

class RdfToDF extends ConfigurableStop{
  override val authorEmail: String = "shaow@cnic.cn"

  override val description: String = "Simple Spark program to convert a *.n3 RDF file to CSV (DataFrame) output"

  var rdfFilepath : String = _
  var isFront : String = _
  var PRegex : String = _
  var RRegex : String = _
  var ERegex : String = _
  var RSchema : String = _
  var IDName : String = _
  var LABELName : String = _

  var entityPort : String = "entityOut"
  var relationshipPort : String = "relationshipOut"

  override def setProperties(map: Map[String, Any]): Unit = {
    isFront = MapUtil.get(map, "isFromFront").asInstanceOf[String]
    PRegex = MapUtil.get(map, "propertyRegex").asInstanceOf[String]
    RRegex = MapUtil.get(map, "relationshipRegex").asInstanceOf[String]
    ERegex = MapUtil.get(map, "entityRegex").asInstanceOf[String]
    RSchema = MapUtil.get(map, "relationshipSchema").asInstanceOf[String]
    IDName = MapUtil.get(map, "entityIdName").asInstanceOf[String]
    LABELName = MapUtil.get(map, "entityLabelName").asInstanceOf[String]
    if (isFront == "false")
      rdfFilepath = MapUtil.get(map, "filePath").asInstanceOf[String]
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {

    var descriptor: List[PropertyDescriptor] = List()
    val filePath = new PropertyDescriptor()
      .name("filePath")
      .displayName("inputHDFSFilePath")
      .description("The path of the input rdf file")
      .defaultValue("")
      .required(true)

    val isFromFront = new PropertyDescriptor()
      .name("isFromFront")
      .displayName("isFromFront")
      .description("identify whether the file paths come from an upstream stop (the files should share the same schema)")
      .allowableValues(Set("true", "false"))
      .defaultValue("false")
      .required(true)

    val propertyRegex = new PropertyDescriptor()
      .name("propertyRegex")
      .displayName("property regex")
      .description("define the propertyRegex used to parse the n3 file's property lines\r\n" +
        "this regex must be a valid regular expression using named capture groups\r\n" +
        "you need to specify five group names \r\n" +
        "1.prefix 2.id 3.pprefix 4.name 5.value" +
        "the form should be like this : \r\n" +
        "(?<prefix>...?<id>... ?<pprefix>...?<name> ?<value>...)\r\n" +
        "check the default value carefully to understand the required structure")
      .defaultValue("<(?<prefix>http:\\/\\/[^>]+\\/)(?<id>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?<pprefix>http:\\/\\/[^>]+\\/)(?<name>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \"(?<value>.+)\" \\.")
      .required(true)

    val relationshipRegex = new PropertyDescriptor()
      .name("relationshipRegex")
      .displayName("relationship regex")
      .description("define the relationshipRegex used to parse the n3 file's relationship lines\r\n" +
        "this regex must be a valid regular expression using named capture groups\r\n" +
        "you need to specify six group names \r\n" +
        "1.prefix1 2.id1 3.tprefix 4.type 5.prefix2 6.id2" +
        "the form should be like this : \r\n" +
        "(?<prefix1>...?<id1>... ?<tprefix>...?<type> ?<prefix2>...?<id2>)\r\n" +
        "check the default value carefully to understand the required structure")
      .defaultValue("<(?<prefix1>http:\\/\\/[^>]+\\/)(?<id1>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?<tprefix>http:\\/\\/[^>]+\\/)(?<type>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)(?<!#type)> <(?<prefix2>http:\\/\\/[^>]+\\/)(?<id2>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \\.")
      .required(true)

    val entityRegex = new PropertyDescriptor()
      .name("entityRegex")
      .displayName("entity regex")
      .description("define the entityRegex used to parse the n3 file's entity lines\r\n" +
        "this regex must be a valid regular expression using named capture groups\r\n" +
        "you need to specify four group names \r\n" +
        "1.prefix 2.id 3.lprefix 4.label" +
        "the form should be like this : \r\n" +
        "(?<prefix>...?<id>... ... ?<lprefix>...?<label>)\r\n" +
        "check the default value carefully to understand the required structure")
      .defaultValue("<(?<prefix>http:\\/\\/[^>]+\\/)(?<id>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> <(?:http:\\/\\/[^>]+\\/)(?:[^\\/][-A-Za-z0-9._#$%^&*!@~]+)(?:#type)> <(?<lprefix>http:\\/\\/[^>]+\\/)(?<label>[^\\/][-A-Za-z0-9._#$%^&*!@~]+)> \\.")
      .required(true)

    val relationshipSchema = new PropertyDescriptor()
      .name("relationshipSchema")
      .displayName("relationship's schema")
      .description("define the schema of the relationship; as a user, \r\n" +
        "you should specify the names of the start id and end id\r\n" +
        "make sure your schema looks like the default value")
      .defaultValue("ENTITY_ID:START_ID,role,ENTITY_ID:END_ID,RELATION_TYPE:TYPE")
      .required(true)

    val entityIdName = new PropertyDescriptor()
      .name("entityIdName")
      .displayName("entity's id")
      .description("define the id column of the entity; as a user, \r\n" +
        "you should follow the style 'id' + :ID\r\n" +
        "make sure your value looks like the default value")
      .defaultValue("ENTITY_ID:ID")
      .required(true)

    val entityLabelName = new PropertyDescriptor()
      .name("entityLabelName")
      .displayName("entity's label")
      .description("define the label column of the entity; as a user, \r\n" +
        "you should follow the style 'label' + :LABEL\r\n" +
        "make sure your value looks like the default value")
      .defaultValue("ENTITY_TYPE:LABEL")
      .required(true)

    descriptor = filePath :: descriptor
    descriptor = isFromFront :: descriptor
    descriptor = propertyRegex :: descriptor
    descriptor = entityRegex :: descriptor
    descriptor = relationshipRegex :: descriptor
    descriptor = relationshipSchema :: descriptor
    descriptor = entityIdName :: descriptor
    descriptor = entityLabelName :: descriptor
    descriptor
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/rdf.png")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.RDFGroup.toString)
  }

  override def initialize(ctx: ProcessContext): Unit = {

  }

  override def perform(in: JobInputStream,
                       out: JobOutputStream,
                       pec: JobContext): Unit = {

    val entityRegexPattern : Pattern = Pattern.compile(ERegex)
    val relationRegexPattern : Pattern = Pattern.compile(RRegex)
    val propertyRegexPattern : Pattern = Pattern.compile(PRegex)
    val spark = pec.get[SparkSession]()
    val sc = spark.sparkContext
    val sq = spark.sqlContext
    var hdfsFile : RDD[String] = sc.emptyRDD[String]
    //in
    if (isFront == "true") {
      val inDF : Array[String] = in
        .read()
        .collect()
        .map(r => r.getAs[String](1))
      var index = 0
      val iterator : Iterator[String] = inDF.iterator

      while (iterator.hasNext) { // every row's first column should be the exact HDFS path of an n3 file
        index match {
          case 0 => hdfsFile = sc.textFile(iterator.next())
            index += 1
          case 1000 =>
            println(hdfsFile.count()) // in some cases there are more than 100,000 files; forcing evaluation here keeps the union DAG from growing too deep
            index = 1
          case _ => hdfsFile = hdfsFile.union(sc.textFile(iterator.next))
            index += 1
        }
      }
    } else {
      hdfsFile = sc.textFile(rdfFilepath)
    }
    val entityRdd = hdfsFile
      .filter(s => entityRegexPattern.matcher(s).find() ||
        propertyRegexPattern.matcher(s).find())
    val relationshipRdd = hdfsFile
      .filter(s => relationRegexPattern.matcher(s).find())
    val settleUpEntityRdd = entityRdd.map(s => {
      val me = entityRegexPattern.matcher(s)
      val mp = propertyRegexPattern.matcher(s)
      if (me.find()) {
        (me.group("prefix") + me.group("id"), me.group())
      } else {
        mp.find()
        (mp.group("prefix") + mp.group("id"), mp.group())
      }
    })
      .groupByKey() //main
      .values
      .map(s => s.toList)
    val entitySchema : Set[String] = settleUpEntityRdd
      .map(s => s.filter(l => propertyRegexPattern.matcher(l).find()))
      .map(s => {
        s.map(line => {
          val m = propertyRegexPattern.matcher(line)
          m.find()
          m.group("name")
        })
      })
      .map(l => {
        l.toSet
      })
      .reduce(_ ++ _)

    val finalEntitySchema = IDName + "," +
      entitySchema.reduce((a, b) => a + "," + b) + "," + LABELName

    val entityObjectRdd = settleUpEntityRdd.map(l => {
      val en = l.filter(s => entityRegexPattern.matcher(s).find()).head
      val label = get(entityRegexPattern.matcher(en),"lprefix") +
        get(entityRegexPattern.matcher(en),"label")
      val id = get(entityRegexPattern.matcher(en),"prefix") + get(entityRegexPattern.matcher(en),"id")
      val prop = l
        .filter(s => ?(propertyRegexPattern, s))
        .map(s => (
          get(propertyRegexPattern.matcher(s),"name")
            ->
            get(propertyRegexPattern.matcher(s),"value").replace("\"", "'")
          )
        ).toArray
        .toMap
      new Entity(id, label, prop, entitySchema.toSeq)
    })

    val entityRowRdd = entityObjectRdd.map(_.getEntityRow)

    val sampleEntity = entityObjectRdd.first()

    val relationshipRowRdd = relationshipRdd.map(s => Seq(
      get(relationRegexPattern.matcher(s),"prefix1") ,
      get(relationRegexPattern.matcher(s),"id1") ,
      get(relationRegexPattern.matcher(s),"tprefix") ,
      get(relationRegexPattern.matcher(s),"type") ,
      get(relationRegexPattern.matcher(s),"prefix2") ,
      get(relationRegexPattern.matcher(s),"id2") ,
      get(relationRegexPattern.matcher(s),"type")
      )
    ).map(s => Row(s))

    var sampleSchema : Seq[String] = sampleEntity.schema
    sampleSchema +:= IDName
    sampleSchema :+= LABELName
    // pair each column name with the corresponding sample value to pick the column type
    val entityDFSList: Seq[StructField] = for {
      (s, p) <- sampleSchema.zip(sampleEntity.propSeq)
    } yield {
      p match {
        case _: String => StructField(s, StringType, nullable = true)
        case _: Array[String] => StructField(s,
          DataTypes.createArrayType(StringType),
          nullable = true)
        case _ => StructField(s, StringType, nullable = true)
      }
    }

    val relationshipDFSchema : StructType = StructType(RSchema.split(",")
      .map(i => StructField(i, StringType, nullable = true)))

    val entityDFSchema : StructType = StructType(entityDFSList)

    val entityDF = sq.createDataFrame(entityRowRdd, entityDFSchema)
    val relationDF = sq.createDataFrame(relationshipRowRdd, relationshipDFSchema)
    entityDF.show(1)
    relationDF.show(1)
    out.write(entityPort, entityDF)
    out.write(relationshipPort, relationDF)
  }

  def get(m : Matcher, name : String) : String = {
    if (m.find()) m.group(name)
    else ""
  }

  def ? (regex : Pattern, str : String) : Boolean = {
    regex.matcher(str).find()
  }

  override val inportList: List[String] = List(PortEnum.DefaultPort.toString)
  override val outportList: List[String] = List(entityPort, relationshipPort)
}
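Illustrative aside, not part of the commit: a minimal sketch of how the entity DataFrame schema is assembled here, with the configured id column first, then the property names discovered in the n3 file, then the label column. The three discovered property names below are made up for the example.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object EntitySchemaSketch {
  def main(args: Array[String]): Unit = {
    val idName = "ENTITY_ID:ID"                            // entityIdName default
    val labelName = "ENTITY_TYPE:LABEL"                    // entityLabelName default
    val discovered = Set("name", "birthDate", "country")   // stands in for the entitySchema set
    val finalEntitySchema = idName + "," + discovered.reduce(_ + "," + _) + "," + labelName
    val entityDFSchema = StructType(finalEntitySchema.split(",")
      .map(i => StructField(i, StringType, nullable = true)))
    entityDFSchema.fields.foreach(f => println(f.name))
  }
}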
@ -5,10 +5,13 @@ import org.apache.spark.sql.Row
 import scala.collection.mutable.ListBuffer

-class Entity( id : String, label : String, prop : Map[String, Any], schema : Seq[String]) extends Serializable
-{
-  def propSeq : Array[String] = {
-    var ret : ListBuffer[String] = new ListBuffer[String]()
+class Entity( id : String,
+              label : String,
+              prop : Map[String, Any],
+              val schema : Seq[String]
+            ) extends Serializable {
+  var propSeq : Array[Any] = {
+    var ret : ListBuffer[Any] = new ListBuffer[Any]()
     ret +:= id
     for (name <- schema) {
       val value = {

@ -21,12 +24,7 @@ class Entity( id : String, label : String, prop : Map[String, Any], schema : Seq
         case _: String =>
           ret += value.asInstanceOf[String]
         case _: Array[String] =>
-          var temp = value.asInstanceOf[Array[String]]
-          val templist: ListBuffer[String] = ListBuffer()
-          for (t <- temp) {
-            templist.+=(t)
-          }
-          ret += templist.reduce((a,b) => a + ";" + b)
+          ret += value.asInstanceOf[Array[String]]
         case _ => ret += ""
       }
     }

@ -35,7 +33,18 @@ class Entity( id : String, label : String, prop : Map[String, Any], schema : Seq
   }

   override def toString: String = {
-    this.propSeq.map(a =>"\"" + a + "\"").reduce((a,b) => a + "," + b)
+    val l = for {
+      prop : Any <- propSeq
+    } yield {
+      prop match {
+        case _ : String => prop.asInstanceOf[String]
+        case _ : Array[String] =>
+          val temp : String = prop.asInstanceOf[Array[String]].reduce(_ + ";" + _)
+          temp
+        case _ : Any => ""
+      }
+    }
+    l.reduce(_ + "," + _)
   }

   def getEntityRow : Row = {

@ -43,9 +52,11 @@ class Entity( id : String, label : String, prop : Map[String, Any], schema : Seq
   }
 }
 object Entity{

   def main(args: Array[String]): Unit = {
     val m : Map[String, Any] = Map("sc1" -> "test1", "sc2" -> Array("2","1"))
-    val e = new Entity("label1", "id2", m, Array("sc1","sc2"))
+    val e = new Entity("id1","l1", m, Array("sc1","sc2"))
     println(e.toString) //"label1","test1","2;1","id2"
   }
 }
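Illustrative aside, not part of the commit: a tiny standalone sketch of the array flattening now used in Entity.toString, where multi-valued properties are joined with ';' rather than being quoted, matching the "2;1" in the example comment above.

object EntityToStringSketch {
  def main(args: Array[String]): Unit = {
    val multiValued: Array[String] = Array("2", "1")
    val flattened: String = multiValued.reduce(_ + ";" + _) // yields "2;1"
    println(flattened)
  }
}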
@ -0,0 +1,82 @@
package cn.piflow.bundle.util

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.{Column, DataFrame, SQLContext, SparkSession}

import scala.collection.mutable.ArrayBuffer

object JsonUtil extends Serializable{

  // The tag you want to parse. If you want to open an array field, write it as links_name (MasterField_ChildField).
  def ParserJsonDF(df:DataFrame,tag:String): DataFrame = {

    var openArrField:String=""
    var ArrSchame:String=""

    var tagARR: Array[String] = tag.split(",")
    var tagNew:String=""

    for(tt<-tagARR){
      if(tt.indexOf("_")> -1){
        // the tag contains "_": split it into the array field and its child field
        val openField: Array[String] = tt.split("_")
        openArrField=openField(0)

        ArrSchame+=(openField(1)+",")
      }else{
        tagNew+=(tt+",")
      }
    }
    tagNew+=openArrField
    ArrSchame=ArrSchame.substring(0,ArrSchame.length-1)

    tagARR = tagNew.split(",")
    var FinalDF:DataFrame=df

    // if the user selected the fields to return
    var strings: Seq[Column] =tagNew.split(",").toSeq.map(p => new Column(p))

    if(tag.length>0){
      val df00 = FinalDF.select(strings : _*)
      FinalDF=df00
    }

    // if the user chose an array field to open and supplied its schema
    if(openArrField.length>0&&ArrSchame.length>0){

      val schames: Array[String] = ArrSchame.split(",")

      var selARR:ArrayBuffer[String]=ArrayBuffer() // the opened fields, taken out one by one
      // iterate over the tags and wrap each field in a Column object
      var coARR:ArrayBuffer[Column]=ArrayBuffer() // used by select() for the opened fields
      val sss = tagNew.split(",") // used by toDF() after the array field is opened
      var co: Column =null
      for(each<-tagARR){
        if(each==openArrField){
          co = explode(FinalDF(openArrField))
          for(x<-schames){

            selARR+=(openArrField+"."+x)
          }
        }else{
          selARR+=each
          co=FinalDF(each)
        }
        coARR+=co
      }
      println("###################")
      selARR.foreach(println(_))
      var selSEQ: Seq[Column] = selARR.toSeq.map(q => new Column(q))

      var df01: DataFrame = FinalDF.select(coARR : _*).toDF(sss:_*)
      FinalDF = df01.select(selSEQ : _*)

    }

    FinalDF

  }
}
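Illustrative aside, not part of the commit: a usage sketch for JsonUtil.ParserJsonDF, assuming a local SparkSession and a JSON file shaped like the sample shown in JsonFolderTest further below; the file path and master URL are placeholders.

import org.apache.spark.sql.SparkSession
import cn.piflow.bundle.util.JsonUtil

object JsonUtilSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("JsonUtilSketch").getOrCreate()
    val df = spark.read.option("multiLine", "true").json("/tmp/bejson-sample.json")
    // "name" and "url" are returned as-is; "links_name" opens the "links" array and keeps its "name" field
    val parsed = JsonUtil.ParserJsonDF(df, "name,url,links_name")
    parsed.show(false)
    spark.stop()
  }
}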
@ -14,7 +14,7 @@ class FlowTest_XX {
   def testFlow(): Unit ={

     //parse flow json
-    val file = "src/main/resources/decisiontree.json"
+    val file = "src/main/resources/gbt.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)

@ -49,7 +49,7 @@ class FlowTest_XX {
   def testFlow2json() = {

     //parse flow json
-    val file = "src/main/resources/decisiontree.json"
+    val file = "src/main/resources/gbt.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
@ -13,6 +13,35 @@ class JsonFolderTest {
   @Test
   def testFlow(): Unit ={

+    // test data
+    /* {
+         "name": "BeJson",
+         "url": "http://www.bejson.com",
+         "page": 88,
+         "isNonProfit": true,
+         "address": {
+           "street": "科技园路.",
+           "city": "江苏苏州",
+           "country": "中国"
+         },
+         "links": [
+           {
+             "name": "Google",
+             "url": "http://www.google.com"
+           },
+           {
+             "name": "Baidu",
+             "url": "http://www.baidu.com"
+           },
+           {
+             "name": "SoSo",
+             "url": "http://www.SoSo.com"
+           }
+         ]
+       }*/
+
     //parse flow json
     val file = "src/main/resources/JsonFolderTest.json"
     val flowJsonStr = FileUtil.fileReader(file)

@ -24,7 +53,6 @@ class JsonFolderTest {
     val flow = flowBean.constructFlow()


     //execute flow
     val spark = SparkSession.builder()
       .master("spark://10.0.86.89:7077")
@ -0,0 +1,63 @@
package cn.piflow.bundle

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test

import scala.util.parsing.json.JSON

class MultiFolderJsonParserTest {

  @Test
  def testFlow(): Unit ={

    //parse flow json
    val file = "src/main/resources/MultiFolderJsonParser.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    //create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    //execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars","/root/Desktop/gitWORK/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow);

    process.awaitTermination();
    val pid = process.pid();
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close();
  }

  @Test
  def testFlow2json() = {

    //parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    //create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }

}