lj044500 2018-09-04 10:48:35 +08:00
parent ed1c4e47fa
commit e4d51735cc
1 changed file with 50 additions and 5 deletions


@@ -3,7 +3,7 @@ package cn.piflow.bundle
import cn.piflow._
import cn.piflow.bundle.common.SelectField
import cn.piflow.bundle.hive.PutHiveStreaming
-import cn.piflow.bundle.xml.XmlParser
+import cn.piflow.bundle.xml.{FolderXmlParser, XmlParser}
import org.apache.spark.sql.SparkSession
import org.junit.Test
@@ -13,7 +13,6 @@ class XmlTest {
def testNodeXML(): Unit = {
val flow = new FlowImpl();
/*val schema = StructType(Array(
StructField("_key", StringType, nullable = true),
StructField("_mdate", StringType, nullable = true),
@@ -24,9 +23,9 @@ class XmlTest {
StructField("ee", StringType, nullable = true),
StructField("note", StringType, nullable = true)
))*/
-val xmlParserParameters = Map("xmlpath"->"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", "rowTag" -> "phdthesis")
+val xmlParserParameters = Map("xmlpath" -> "hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml", "rowTag" -> "phdthesis")
-val selectedFieldParameters = Map("selectedField"->"title,author,pages")
+val selectedFieldParameters = Map("selectedField" -> "title,author,pages")
val putHiveStreamingParameters = Map("database" -> "sparktest", "table" -> "dblp_phdthesis")
@@ -52,7 +51,7 @@ class XmlTest {
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "2g")
.config("spark.cores.max", "3")
.config("spark.jars","/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.config("spark.jars", "/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
.enableHiveSupport()
.getOrCreate()
@@ -63,4 +62,50 @@ class XmlTest {
process.awaitTermination();
spark.close();
}
+@Test
+def testFolderXml(): Unit = {
+val flow = new FlowImpl();
+val folderXmlParserParameters = Map("xmlpath" -> "hdfs://10.0.86.89:9000/test", "rowTag" -> "phdthesis")
+val selectedFieldParameters = Map("schema" -> "title,author,pages")
+val putHiveStreamingParameters = Map("database" -> "sparktest", "table" -> "xmldblp_phdthesis")
+val folderXmlParserStop = new FolderXmlParser
+folderXmlParserStop.setProperties(folderXmlParserParameters)
+val selectFieldStop = new SelectField
+selectFieldStop.setProperties(selectedFieldParameters)
+val putHiveStreamingStop = new PutHiveStreaming
+putHiveStreamingStop.setProperties(putHiveStreamingParameters)
+flow.addStop("FolderXmlParser", folderXmlParserStop);
+flow.addPath(Path.from("FolderXmlParser").to("SelectField"))
+flow.addStop("SelectField", selectFieldStop);
+flow.addStop("PutHiveStreaming", putHiveStreamingStop);
+flow.addPath(Path.from("SelectField").to("PutHiveStreaming"))
+val spark = SparkSession.builder()
+.master("spark://10.0.86.89:7077")
+.appName("DblpParserTest")
+.config("spark.driver.memory", "4g")
+.config("spark.executor.memory", "2g")
+.config("spark.cores.max", "3")
+.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+// .config("spark.jars", "/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar")
+.enableHiveSupport()
+.getOrCreate()
+val process = Runner.create()
+.bind(classOf[SparkSession].getName, spark)
+.start(flow);
+process.awaitTermination();
+spark.close();
+}
}