update stop

bao319 2020-03-31 21:19:55 +08:00
parent e3f59f335f
commit e028914c33
10 changed files with 209 additions and 41 deletions

View File

@@ -2,13 +2,12 @@ spark.master=yarn
 spark.deploy.mode=cluster
 #hdfs default file system
-fs.defaultFS=hdfs://10.0.86.191:9000
+fs.defaultFS=hdfs://192.168.3.138:8020
 #yarn resourcemanager hostname
-yarn.resourcemanager.hostname=10.0.86.191
+yarn.resourcemanager.hostname=192.168.3.139
 #if you want to use hive, set hive metastore uris
-hive.metastore.uris=thrift://10.0.88.71:9083
+hive.metastore.uris=thrift://192.168.3.140:9083
 #show data in log, set 0 if you do not show the logs
 data.show=10
@@ -20,4 +19,4 @@ monitor.throughput=true
 server.port=8001
 #h2db port
 h2.port=50001
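
These endpoints are read back at runtime through PropertyUtil, as the tests later in this commit do for hive.metastore.uris. A minimal sketch, assuming the other keys resolve through the same accessor:

import cn.piflow.util.PropertyUtil

// Reading the updated cluster endpoints; hive.metastore.uris is confirmed by
// the tests below, the other keys are assumed to resolve the same way.
val defaultFS = PropertyUtil.getPropertyValue("fs.defaultFS")         // hdfs://192.168.3.138:8020
val metastore = PropertyUtil.getPropertyValue("hive.metastore.uris")  // thrift://192.168.3.140:9083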

View File

@@ -9,7 +9,7 @@
 "name":"CsvParser",
 "bundle":"cn.piflow.bundle.csv.CsvParser",
 "properties":{
-"csvPath":"hdfs://192.168.3.138:8020/test/",
+"csvPath":"hdfs://192.168.3.138:8020/test/csvparse.csv",
 "header": "true",
 "delimiter":",",
 "schema":"id,name,gender,age"

View File

@@ -0,0 +1,43 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"checkpoint":"Merge",
"stops":[
{
"uuid":"1111",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},
{
"uuid":"0000",
"name":"HiveToNeo4j",
"bundle":"cn.piflow.bundle.neo4j.HiveToNeo4j",
"properties": {
"hdfsDirPath": "/test",
"hdfsUrl":"hdfs://192.168.3.138:8020",
"fileName": "user1.csv",
"delimiter":",",
"header": "true",
"url":"bolt://192.168.3.141:7687",
"userName": "neo4j",
"password":"null",
"cypher": "USING PERIODIC COMMIT 100 LOAD CSV WITH HEADERS FROM 'http://192.168.3.138:50070/webhdfs/v1/test/user1.csv? op=OPEN' AS line FIELDTERMINATOR ',' CREATE (n:user{userid:line.id,username:line.name,userscore:line.score,userschool:line.school,userclass:line.class})"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"HiveToNeo4j"
}
]
}
}
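
The new flow can be parsed and constructed exactly as the HiveToNeo4jTest added later in this commit does it; a minimal sketch using only calls that appear in this diff:

import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import scala.util.parsing.json.JSON

// Parse the flow JSON above and build the runnable flow object.
val flowJsonStr = FileUtil.fileReader("src/main/resources/flow/neo4j/HiveToNeo4j.json")
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
val flow = FlowBean(map).constructFlow() // then hand off to Runner with a SparkSession, as in the test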

View File

@@ -0,0 +1,38 @@
{
"flow":{
"name":"test",
"uuid":"1234",
"checkpoint":"Merge",
"stops":[
{
"uuid":"1111",
"name":"SelectHiveQL",
"bundle":"cn.piflow.bundle.hive.SelectHiveQL",
"properties":{
"hiveQL":"select * from test.user1"
}
},
{
"uuid":"0000",
"name":"HiveToNeo4j",
"bundle":"cn.piflow.bundle.neo4j.HiveToNeo4j",
"properties": {
"url":"bolt://192.168.3.141:7687",
"userName": "neo4j",
"password":"null",
"cypher": "USING PERIODIC COMMIT 100 LOAD CSV WITH HEADERS FROM 'http://192.168.3.138:50070/webhdfs/v1/test/user1.csv? op=OPEN' AS line FIELDTERMINATOR ',' CREATE (n:user{userid:line.id,username:line.name,userscore:line.score,userschool:line.school,userclass:line.class})"
}
}
],
"paths":[
{
"from":"SelectHiveQL",
"outport":"",
"inport":"",
"to":"HiveToNeo4j"
}
]
}
}

View File

@@ -11,7 +11,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 class CsvParser extends ConfigurableStop{
   val authorEmail: String = "xjzhu@cnic.cn"
-  val description: String = "Parse csv file"
+  val description: String = "Parse csv file or folder"
   val inportList: List[String] = List(Port.DefaultPort)
   val outportList: List[String] = List(Port.DefaultPort)
@@ -69,7 +69,7 @@ class CsvParser extends ConfigurableStop{
   val csvPath = new PropertyDescriptor()
     .name("csvPath")
     .displayName("CsvPath")
-    .description("The path of csv file")
+    .description("The path of csv file or folder")
    .defaultValue("")
    .required(true)
    .example("hdfs://127.0.0.1:9000/test/")

View File

@@ -14,8 +14,8 @@ class AllFieldsCleanNeo4j extends ConfigurableStop {
   val authorEmail: String = "anhong12@cnic.cn"
   val description: String = "Clean DataFrame for NSFC Neo4j"
-  val inportList: List[String] = List(Port.DefaultPort.toString)
-  val outportList: List[String] = List(Port.DefaultPort.toString)
+  val inportList: List[String] = List(Port.DefaultPort)
+  val outportList: List[String] = List(Port.DefaultPort)
   //
@@ -81,7 +81,7 @@ class AllFieldsCleanNeo4j extends ConfigurableStop {
   }
   override def getGroup(): List[String] = {
-    List(StopGroup.Neo4jGroup.toString)
+    List(StopGroup.Neo4jGroup)
   }
 }

View File

@@ -14,8 +14,8 @@ import scala.collection.mutable.ArrayBuffer
 class HiveToNeo4j extends ConfigurableStop{
   override val authorEmail: String = "anhong12@cnic.cn"
   override val description: String = "Hive to Neo4j"
-  override val inportList: List[String] =List(Port.DefaultPort.toString)
-  override val outportList: List[String] = List(Port.DefaultPort.toString)
+  override val inportList: List[String] =List(Port.DefaultPort)
+  override val outportList: List[String] = List(Port.DefaultPort)
   var hiveQL:String = _
@@ -29,7 +29,6 @@ class HiveToNeo4j extends ConfigurableStop{
   var pathARR:ArrayBuffer[String]=ArrayBuffer()
   var oldFilePath:String = _
   var url : String =_
   var userName : String =_
   var password : String =_
@@ -157,29 +156,86 @@ class HiveToNeo4j extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").required(true)
+    val hiveQL = new PropertyDescriptor()
+      .name("hiveQL")
+      .displayName("HiveQL")
+      .defaultValue("")
+      .required(true)
+      .example("")
-    val hdfsDirPath = new PropertyDescriptor().name("hdfsDirPath").displayName("hdfsDirPath").defaultValue("/piflow-CSV-of-Neo4j/xxxxx").required(true)
+    val hdfsDirPath = new PropertyDescriptor()
+      .name("hdfsDirPath")
+      .displayName("HdfsDirPath")
+      .defaultValue("/piflow-CSV-of-Neo4j/xxxxx")
+      .required(true)
+      .example("")
-    val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("hdfs://192.168.3.138:8020").required(true)
+    val hdfsUrl = new PropertyDescriptor()
+      .name("hdfsUrl")
+      .displayName("HdfsUrl")
+      .defaultValue("hdfs://192.168.3.138:8020")
+      .required(true)
+      .example("")
-    val fileName = new PropertyDescriptor().name("fileName").displayName("fileName").defaultValue("").required(true)
+    val fileName = new PropertyDescriptor()
+      .name("fileName")
+      .displayName("FileName")
+      .defaultValue("")
+      .required(true)
+      .example("")
-    val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").description("type is csv ,please set it ").defaultValue("¤").required(true)
+    val delimiter = new PropertyDescriptor()
+      .name("delimiter")
+      .displayName("Delimiter")
+      .description("type is csv ,please set it ")
+      .defaultValue("¤")
+      .required(true)
+      .example("")
     //header
-    val header = new PropertyDescriptor().name("header").displayName("header").description("Whether the csv file have header or not").defaultValue("true").allowableValues(Set("true", "false")).required(true)
+    val header = new PropertyDescriptor()
+      .name("header")
+      .displayName("Header")
+      .description("Whether the csv file have header or not")
+      .defaultValue("true")
+      .allowableValues(Set("true", "false"))
+      .required(true)
+      .example("")
-    val url=new PropertyDescriptor().name("url").displayName("url").description("for example bolt://0.0.1.1:7687").defaultValue("bolt://192.168.3.141:7687").required(true)
+    val url=new PropertyDescriptor()
+      .name("url")
+      .displayName("Url")
+      .description("for example bolt://0.0.1.1:7687")
+      .defaultValue("bolt://127.0.0.1:7687")
+      .required(true)
+      .example("")
-    val userName=new PropertyDescriptor().name("userName").displayName("userName").description("the user").defaultValue("neo4j").required(true)
+    val userName=new PropertyDescriptor()
+      .name("userName")
+      .displayName("UserName")
+      .description("the user")
+      .defaultValue("neo4j")
+      .required(true)
+      .example("")
-    val password=new PropertyDescriptor().name("password").displayName("password").description("the password").defaultValue("").required(true)
+    val password=new PropertyDescriptor()
+      .name("password")
+      .displayName("Password")
+      .description("the password")
+      .defaultValue("")
+      .required(true)
+      .example("")
-    val cypher=new PropertyDescriptor().name("cypher").displayName("cypher").description(" the Cypher").defaultValue("").required(true)
+    val cypher=new PropertyDescriptor()
+      .name("cypher")
+      .displayName("Cypher")
+      .description(" the Cypher")
+      .defaultValue("")
+      .required(true)
+      .example("")
     descriptor = hiveQL :: descriptor
     descriptor = hdfsUrl :: descriptor
     descriptor = hdfsDirPath :: descriptor
     descriptor = fileName :: descriptor
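
The default cypher in the flow JSON above reads the exported CSV back over WebHDFS. A hedged sketch of how hdfsDirPath and fileName presumably compose into that LOAD CSV source (the helper and the 50070 HTTP port are illustrative assumptions, not code from this commit):

// Hypothetical helper: HDFS dir + file name -> WebHDFS read URL.
def webHdfsUrl(httpAddress: String, hdfsDirPath: String, fileName: String): String =
  s"http://$httpAddress/webhdfs/v1$hdfsDirPath/$fileName?op=OPEN"

webHdfsUrl("192.168.3.138:50070", "/test", "user1.csv")
// => http://192.168.3.138:50070/webhdfs/v1/test/user1.csv?op=OPEN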

View File

@@ -44,16 +44,36 @@ class RunCypher extends ConfigurableStop{
   override def getPropertyDescriptor(): List[PropertyDescriptor] = {
     var descriptor : List[PropertyDescriptor] = List()
-    val url=new PropertyDescriptor().name("url").displayName("url").description("for example bolt://0.0.1.1:7687").defaultValue("").required(true)
+    val url=new PropertyDescriptor().name("url")
+      .displayName("url")
+      .description("for example bolt://0.0.1.1:7687")
+      .defaultValue("")
+      .required(true)
     descriptor = url :: descriptor
-    val userName=new PropertyDescriptor().name("userName").displayName("userName").description("the user").defaultValue("").required(true)
+    val userName=new PropertyDescriptor()
+      .name("userName")
+      .displayName("UserName")
+      .description("The user")
+      .defaultValue("")
+      .required(true)
     descriptor = userName :: descriptor
-    val password=new PropertyDescriptor().name("password").displayName("password").description("the password").defaultValue("").required(true)
+    val password=new PropertyDescriptor()
+      .name("password")
+      .displayName("Password")
+      .description("The password of neo4j")
+      .defaultValue("")
+      .required(true)
+      .sensitive(true)
     descriptor = password :: descriptor
-    val cql=new PropertyDescriptor().name("cql").displayName("cql").description(" the Cypher").defaultValue("").required(true)
+    val cql=new PropertyDescriptor()
+      .name("cql")
+      .displayName("cql")
+      .description(" The Cypher")
+      .defaultValue("")
+      .required(true)
     descriptor = cql :: descriptor
     descriptor
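
RunCypher's perform method is not part of this diff; executing the cql property against the configured bolt endpoint would look roughly like this with the Neo4j Java driver (the 1.x driver package and session API are assumptions):

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}

// Run one Cypher statement over bolt with the stop's url/userName/password.
val driver = GraphDatabase.driver("bolt://192.168.3.141:7687", AuthTokens.basic("neo4j", "password"))
val session = driver.session()
try session.run("MATCH (n:user) RETURN count(n)")
finally { session.close(); driver.close() }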

View File

@@ -1,22 +1,24 @@
-package cn.piflow.bundle.csv
+package cn.piflow.bundle.neo4j
+import java.net.InetAddress
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
-import cn.piflow.util.PropertyUtil
+import cn.piflow.util.{PropertyUtil, ServerIpUtil}
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
 import scala.util.parsing.json.JSON
-class FolderCsvParserTest {
+class HiveToNeo4jTest {
   @Test
   def testFlow(): Unit ={
     //parse flow json
-    val file = "src/main/resources/flow/csv/FolderCsvParser.json"
+    val file = "src/main/resources/flow/neo4j/HiveToNeo4j.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -24,16 +26,20 @@ class FolderCsvParserTest {
     //create flow
     val flowBean = FlowBean(map)
     val flow = flowBean.constructFlow()
+    val ip = InetAddress.getLocalHost.getHostAddress
+    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
     val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
     //execute flow
     val spark = SparkSession.builder()
       .master("local[*]")
-      .appName("piflow-hive-bundle")
+      .appName("CsvParserTest")
       .config("spark.driver.memory", "1g")
       .config("spark.executor.memory", "2g")
       .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .enableHiveSupport()
       .getOrCreate()

View File

@@ -1,22 +1,24 @@
-package cn.piflow.bundle.csv
+package cn.piflow.bundle.neo4j
+import java.net.InetAddress
 import cn.piflow.Runner
 import cn.piflow.conf.bean.FlowBean
 import cn.piflow.conf.util.{FileUtil, OptionUtil}
-import cn.piflow.util.PropertyUtil
+import cn.piflow.util.{PropertyUtil, ServerIpUtil}
 import org.apache.spark.sql.SparkSession
 import org.h2.tools.Server
 import org.junit.Test
 import scala.util.parsing.json.JSON
-class CsvSaveAsOverwriteTest {
+class RunCypherTest {
   @Test
   def testFlow(): Unit ={
     //parse flow json
-    val file = "src/main/resources/flow/csv/CsvSaveAsOverWrite.json"
+    val file = "src/main/resources/flow/neo4j/RunCypher.json"
     val flowJsonStr = FileUtil.fileReader(file)
     val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
     println(map)
@@ -24,16 +26,20 @@ class CsvSaveAsOverwriteTest {
     //create flow
     val flowBean = FlowBean(map)
     val flow = flowBean.constructFlow()
+    val ip = InetAddress.getLocalHost.getHostAddress
+    cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile())
     val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start()
     //execute flow
     val spark = SparkSession.builder()
       .master("local[*]")
-      .appName("piflow-hive-bundle")
+      .appName("CsvParserTest")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
-      .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris"))
+      .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
      .enableHiveSupport()
      .getOrCreate()