From e028914c3316480fa293c74b7ef68bb6b1a72d3e Mon Sep 17 00:00:00 2001 From: bao319 <826013665@qq.com> Date: Tue, 31 Mar 2020 21:19:55 +0800 Subject: [PATCH] update stop --- piflow-bundle/config.properties | 9 +- .../main/resources/flow/csv/CsvParser.json | 2 +- .../resources/flow/neo4j/HiveToNeo4j.json | 43 ++++++++++ .../main/resources/flow/neo4j/RunCypher.json | 38 +++++++++ .../cn/piflow/bundle/csv/CsvParser.scala | 4 +- .../bundle/neo4j/AllFieldsCleanNeo4j.scala | 6 +- .../cn/piflow/bundle/neo4j/HiveToNeo4j.scala | 84 +++++++++++++++---- .../cn/piflow/bundle/neo4j/RunCypher.scala | 28 ++++++- .../HiveToNeo4jTest.scala} | 18 ++-- .../RunCypherTest.scala} | 18 ++-- 10 files changed, 209 insertions(+), 41 deletions(-) create mode 100644 piflow-bundle/src/main/resources/flow/neo4j/HiveToNeo4j.json create mode 100644 piflow-bundle/src/main/resources/flow/neo4j/RunCypher.json rename piflow-bundle/src/test/scala/cn/piflow/bundle/{csv/FolderCsvParserTest.scala => neo4j/HiveToNeo4jTest.scala} (72%) rename piflow-bundle/src/test/scala/cn/piflow/bundle/{csv/CsvSaveAsOverwriteTest.scala => neo4j/RunCypherTest.scala} (72%) diff --git a/piflow-bundle/config.properties b/piflow-bundle/config.properties index 74d256e..7dfb143 100644 --- a/piflow-bundle/config.properties +++ b/piflow-bundle/config.properties @@ -2,13 +2,12 @@ spark.master=yarn spark.deploy.mode=cluster #hdfs default file system -fs.defaultFS=hdfs://10.0.86.191:9000 - +fs.defaultFS=hdfs://192.168.3.138:8020 #yarn resourcemanager hostname -yarn.resourcemanager.hostname=10.0.86.191 +yarn.resourcemanager.hostname=192.168.3.139 #if you want to use hive, set hive metastore uris -hive.metastore.uris=thrift://10.0.88.71:9083 +hive.metastore.uris=thrift://192.168.3.140:9083 #show data in log, set 0 if you do not show the logs data.show=10 @@ -20,4 +19,4 @@ monitor.throughput=true server.port=8001 #h2db port -h2.port=50001 +h2.port=50001 \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/csv/CsvParser.json b/piflow-bundle/src/main/resources/flow/csv/CsvParser.json index 91bb754..1293757 100644 --- a/piflow-bundle/src/main/resources/flow/csv/CsvParser.json +++ b/piflow-bundle/src/main/resources/flow/csv/CsvParser.json @@ -9,7 +9,7 @@ "name":"CsvParser", "bundle":"cn.piflow.bundle.csv.CsvParser", "properties":{ - "csvPath":"hdfs://192.168.3.138:8020/test/", + "csvPath":"hdfs://192.168.3.138:8020/test/csvparse.csv", "header": "true", "delimiter":",", "schema":"id,name,gender,age" diff --git a/piflow-bundle/src/main/resources/flow/neo4j/HiveToNeo4j.json b/piflow-bundle/src/main/resources/flow/neo4j/HiveToNeo4j.json new file mode 100644 index 0000000..4dcb72d --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/neo4j/HiveToNeo4j.json @@ -0,0 +1,43 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "checkpoint":"Merge", + "stops":[ + { + "uuid":"1111", + "name":"SelectHiveQL", + "bundle":"cn.piflow.bundle.hive.SelectHiveQL", + "properties":{ + "hiveQL":"select * from test.user1" + } + }, + { + "uuid":"0000", + "name":"HiveToNeo4j", + "bundle":"cn.piflow.bundle.neo4j.HiveToNeo4j", + "properties": { + "hdfsDirPath": "/test", + "hdfsUrl":"hdfs://192.168.3.138:8020", + "fileName": "user1.csv", + "delimiter":",", + "header": "true", + "url":"bolt://192.168.3.141:7687", + "userName": "neo4j", + "password":"null", + "cypher": "USING PERIODIC COMMIT 100 LOAD CSV WITH HEADERS FROM 'http://192.168.3.138:50070/webhdfs/v1/test/user1.csv? op=OPEN' AS line FIELDTERMINATOR ',' CREATE (n:user{userid:line.id,username:line.name,userscore:line.score,userschool:line.school,userclass:line.class})" + } + + } + ], + "paths":[ + { + "from":"SelectHiveQL", + "outport":"", + "inport":"", + "to":"HiveToNeo4j" + } + + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/resources/flow/neo4j/RunCypher.json b/piflow-bundle/src/main/resources/flow/neo4j/RunCypher.json new file mode 100644 index 0000000..1f68b63 --- /dev/null +++ b/piflow-bundle/src/main/resources/flow/neo4j/RunCypher.json @@ -0,0 +1,38 @@ +{ + "flow":{ + "name":"test", + "uuid":"1234", + "checkpoint":"Merge", + "stops":[ + { + "uuid":"1111", + "name":"SelectHiveQL", + "bundle":"cn.piflow.bundle.hive.SelectHiveQL", + "properties":{ + "hiveQL":"select * from test.user1" + } + }, + { + "uuid":"0000", + "name":"HiveToNeo4j", + "bundle":"cn.piflow.bundle.neo4j.HiveToNeo4j", + "properties": { + "url":"bolt://192.168.3.141:7687", + "userName": "neo4j", + "password":"null", + "cypher": "USING PERIODIC COMMIT 100 LOAD CSV WITH HEADERS FROM 'http://192.168.3.138:50070/webhdfs/v1/test/user1.csv? op=OPEN' AS line FIELDTERMINATOR ',' CREATE (n:user{userid:line.id,username:line.name,userscore:line.score,userschool:line.school,userclass:line.class})" + } + + } + ], + "paths":[ + { + "from":"SelectHiveQL", + "outport":"", + "inport":"", + "to":"HiveToNeo4j" + } + + ] + } +} \ No newline at end of file diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala index 19b9a49..776199f 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/csv/CsvParser.scala @@ -11,7 +11,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} class CsvParser extends ConfigurableStop{ val authorEmail: String = "xjzhu@cnic.cn" - val description: String = "Parse csv file" + val description: String = "Parse csv file or folder" val inportList: List[String] = List(Port.DefaultPort) val outportList: List[String] = List(Port.DefaultPort) @@ -69,7 +69,7 @@ class CsvParser extends ConfigurableStop{ val csvPath = new PropertyDescriptor() .name("csvPath") .displayName("CsvPath") - .description("The path of csv file") + .description("The path of csv file or folder") .defaultValue("") .required(true) .example("hdfs://127.0.0.1:9000/test/") diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/AllFieldsCleanNeo4j.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/AllFieldsCleanNeo4j.scala index 2d6837c..ada6be0 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/AllFieldsCleanNeo4j.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/AllFieldsCleanNeo4j.scala @@ -14,8 +14,8 @@ class AllFieldsCleanNeo4j extends ConfigurableStop { val authorEmail: String = "anhong12@cnic.cn" val description: String = "Clean DataFrame for NSFC Neo4j" - val inportList: List[String] = List(Port.DefaultPort.toString) - val outportList: List[String] = List(Port.DefaultPort.toString) + val inportList: List[String] = List(Port.DefaultPort) + val outportList: List[String] = List(Port.DefaultPort) // @@ -81,7 +81,7 @@ class AllFieldsCleanNeo4j extends ConfigurableStop { } override def getGroup(): List[String] = { - List(StopGroup.Neo4jGroup.toString) + List(StopGroup.Neo4jGroup) } } diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/HiveToNeo4j.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/HiveToNeo4j.scala index 938b377..6483954 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/HiveToNeo4j.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/HiveToNeo4j.scala @@ -14,8 +14,8 @@ import scala.collection.mutable.ArrayBuffer class HiveToNeo4j extends ConfigurableStop{ override val authorEmail: String = "anhong12@cnic.cn" override val description: String = "Hive to Neo4j" - override val inportList: List[String] =List(Port.DefaultPort.toString) - override val outportList: List[String] = List(Port.DefaultPort.toString) + override val inportList: List[String] =List(Port.DefaultPort) + override val outportList: List[String] = List(Port.DefaultPort) var hiveQL:String = _ @@ -29,7 +29,6 @@ class HiveToNeo4j extends ConfigurableStop{ var pathARR:ArrayBuffer[String]=ArrayBuffer() var oldFilePath:String = _ - var url : String =_ var userName : String =_ var password : String =_ @@ -157,29 +156,86 @@ class HiveToNeo4j extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val hiveQL = new PropertyDescriptor().name("hiveQL").displayName("HiveQL").defaultValue("").required(true) + val hiveQL = new PropertyDescriptor() + .name("hiveQL") + .displayName("HiveQL") + .defaultValue("") + .required(true) + .example("") - val hdfsDirPath = new PropertyDescriptor().name("hdfsDirPath").displayName("hdfsDirPath").defaultValue("/piflow-CSV-of-Neo4j/xxxxx").required(true) - val hdfsUrl = new PropertyDescriptor().name("hdfsUrl").displayName("hdfsUrl").defaultValue("hdfs://192.168.3.138:8020").required(true) + val hdfsDirPath = new PropertyDescriptor() + .name("hdfsDirPath") + .displayName("HdfsDirPath") + .defaultValue("/piflow-CSV-of-Neo4j/xxxxx") + .required(true) + .example("") - val fileName = new PropertyDescriptor().name("fileName").displayName("fileName").defaultValue("").required(true) + val hdfsUrl = new PropertyDescriptor() + .name("hdfsUrl") + .displayName("HdfsUrl") + .defaultValue("hdfs://192.168.3.138:8020") + .required(true) + .example("") - val delimiter = new PropertyDescriptor().name("delimiter").displayName("delimiter").description("type is csv ,please set it ").defaultValue("¤").required(true) + val fileName = new PropertyDescriptor() + .name("fileName") + .displayName("FileName") + .defaultValue("") + .required(true) + .example("") + + val delimiter = new PropertyDescriptor() + .name("delimiter") + .displayName("Delimiter") + .description("type is csv ,please set it ") + .defaultValue("¤") + .required(true) + .example("") //header - val header = new PropertyDescriptor().name("header").displayName("header").description("Whether the csv file have header or not").defaultValue("true").allowableValues(Set("true", "false")).required(true) + val header = new PropertyDescriptor() + .name("header") + .displayName("Header") + .description("Whether the csv file have header or not") + .defaultValue("true") + .allowableValues(Set("true", "false")) + .required(true) + .example("") - val url=new PropertyDescriptor().name("url").displayName("url").description("for example bolt://0.0.1.1:7687").defaultValue("bolt://192.168.3.141:7687").required(true) + val url=new PropertyDescriptor() + .name("url") + .displayName("Url") + .description("for example bolt://0.0.1.1:7687") + .defaultValue("bolt://127.0.0.1:7687") + .required(true) + .example("") - val userName=new PropertyDescriptor().name("userName").displayName("userName").description("the user").defaultValue("neo4j").required(true) + val userName=new PropertyDescriptor() + .name("userName") + .displayName("UserName") + .description("the user") + .defaultValue("neo4j") + .required(true) + .example("") - val password=new PropertyDescriptor().name("password").displayName("password").description("the password").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("the password") + .defaultValue("") + .required(true) + .example("") - val cypher=new PropertyDescriptor().name("cypher").displayName("cypher").description(" the Cypher").defaultValue("").required(true) + val cypher=new PropertyDescriptor() + .name("cypher") + .displayName("Cypher") + .description(" the Cypher") + .defaultValue("") + .required(true) + .example("") descriptor = hiveQL :: descriptor - descriptor = hdfsUrl :: descriptor descriptor = hdfsDirPath :: descriptor descriptor = fileName :: descriptor diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/RunCypher.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/RunCypher.scala index 614b8c9..3facf75 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/RunCypher.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/neo4j/RunCypher.scala @@ -44,16 +44,36 @@ class RunCypher extends ConfigurableStop{ override def getPropertyDescriptor(): List[PropertyDescriptor] = { var descriptor : List[PropertyDescriptor] = List() - val url=new PropertyDescriptor().name("url").displayName("url").description("for example bolt://0.0.1.1:7687").defaultValue("").required(true) + val url=new PropertyDescriptor().name("url") + .displayName("url") + .description("for example bolt://0.0.1.1:7687") + .defaultValue("") + .required(true) descriptor = url :: descriptor - val userName=new PropertyDescriptor().name("userName").displayName("userName").description("the user").defaultValue("").required(true) + val userName=new PropertyDescriptor() + .name("userName") + .displayName("UserName") + .description("The user") + .defaultValue("") + .required(true) descriptor = userName :: descriptor - val password=new PropertyDescriptor().name("password").displayName("password").description("the password").defaultValue("").required(true) + val password=new PropertyDescriptor() + .name("password") + .displayName("Password") + .description("The password of neo4j") + .defaultValue("") + .required(true) + .sensitive(true) descriptor = password :: descriptor - val cql=new PropertyDescriptor().name("cql").displayName("cql").description(" the Cypher").defaultValue("").required(true) + val cql=new PropertyDescriptor() + .name("cql") + .displayName("cql") + .description(" The Cypher") + .defaultValue("") + .required(true) descriptor = cql :: descriptor descriptor diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/HiveToNeo4jTest.scala similarity index 72% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/HiveToNeo4jTest.scala index 30a00fe..e97b706 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/FolderCsvParserTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/HiveToNeo4jTest.scala @@ -1,22 +1,24 @@ -package cn.piflow.bundle.csv +package cn.piflow.bundle.neo4j + +import java.net.InetAddress import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil +import cn.piflow.util.{PropertyUtil, ServerIpUtil} import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class FolderCsvParserTest { +class HiveToNeo4jTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/csv/FolderCsvParser.json" + val file = "src/main/resources/flow/neo4j/HiveToNeo4j.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -24,16 +26,20 @@ class FolderCsvParserTest { //create flow val flowBean = FlowBean(map) val flow = flowBean.constructFlow() + + val ip = InetAddress.getLocalHost.getHostAddress + cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile()) + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() //execute flow val spark = SparkSession.builder() .master("local[*]") - .appName("piflow-hive-bundle") + .appName("CsvParserTest") .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate() diff --git a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala b/piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/RunCypherTest.scala similarity index 72% rename from piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala rename to piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/RunCypherTest.scala index 5845fad..95d53e0 100644 --- a/piflow-bundle/src/test/scala/cn/piflow/bundle/csv/CsvSaveAsOverwriteTest.scala +++ b/piflow-bundle/src/test/scala/cn/piflow/bundle/neo4j/RunCypherTest.scala @@ -1,22 +1,24 @@ -package cn.piflow.bundle.csv +package cn.piflow.bundle.neo4j + +import java.net.InetAddress import cn.piflow.Runner import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.{FileUtil, OptionUtil} -import cn.piflow.util.PropertyUtil +import cn.piflow.util.{PropertyUtil, ServerIpUtil} import org.apache.spark.sql.SparkSession import org.h2.tools.Server import org.junit.Test import scala.util.parsing.json.JSON -class CsvSaveAsOverwriteTest { +class RunCypherTest { @Test def testFlow(): Unit ={ //parse flow json - val file = "src/main/resources/flow/csv/CsvSaveAsOverWrite.json" + val file = "src/main/resources/flow/neo4j/RunCypher.json" val flowJsonStr = FileUtil.fileReader(file) val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]] println(map) @@ -24,16 +26,20 @@ class CsvSaveAsOverwriteTest { //create flow val flowBean = FlowBean(map) val flow = flowBean.constructFlow() + + val ip = InetAddress.getLocalHost.getHostAddress + cn.piflow.util.FileUtil.writeFile("server.ip=" + ip, ServerIpUtil.getServerIpFile()) + val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", "50001").start() //execute flow val spark = SparkSession.builder() .master("local[*]") - .appName("piflow-hive-bundle") + .appName("CsvParserTest") .config("spark.driver.memory", "1g") .config("spark.executor.memory", "2g") .config("spark.cores.max", "2") - .config("hive.metastore.uris", PropertyUtil.getPropertyValue("hive.metastore.uris")) + .config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris")) .enableHiveSupport() .getOrCreate()