yang qi dong
This commit is contained in:
yanfqidong0604 2018-10-12 21:47:51 +08:00
parent 7b61f021dc
commit 13d321d689
7 changed files with 421 additions and 52 deletions

View File

@@ -11,6 +11,16 @@
    <artifactId>piflow-bundle</artifactId>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.solr/solr-solrj -->
        <dependency>
            <groupId>org.apache.solr</groupId>
            <artifactId>solr-solrj</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>piflow</groupId>
            <artifactId>piflow-core</artifactId>
@@ -67,6 +77,7 @@
            <version>0.4.2</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
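The new solr-solrj 7.2.0 dependency supplies the HttpSolrClient that the GetFromSolr and PutIntoSolr stops below are built on. A minimal sketch of the client lifecycle they rely on (the URL and collection are placeholders, not values from this commit):

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrClient

// Placeholder base URL + collection; the stops below read these from their properties.
val client = new HttpSolrClient.Builder("http://localhost:8983/solr/core01").build()
val response = client.query(new SolrQuery("*:*"))  // simple match-all query
println(response.getResults.getNumFound)           // number of matching documents
client.close()                                     // release the underlying HTTP connection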

View File

@@ -0,0 +1,41 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"SelectHiveQL",
        "bundle":"cn.piflow.bundle.hive.SelectHiveQL",
        "properties":{
          "hiveQL":"select * from sparktest.cleanfield"
        }
      },
      {
        "uuid":"1111",
        "name":"PutIntoSolr",
        "bundle":"cn.piflow.bundle.solr.PutIntoSolr",
        "properties":{
          "solrURL":"http://10.0.88.9:8886/solr",
          "SolrCollection":"core01"
        }
      }
    ],
    "paths":[
      {
        "from":"SelectHiveQL",
        "outport":"",
        "inport":"",
        "to":"PutIntoSolr"
      }
    ]
  }
}
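This flow reads rows from Hive with SelectHiveQL and writes them to Solr through the new PutIntoSolr stop. A sketch of running it, modeled on the SolrTest added in this commit; the file name solrPUT.json and the Spark settings are assumptions to be adapted to the local environment:

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSON

// Assumed path for the flow definition above.
val flowJsonStr = FileUtil.fileReader("src/main/resources/solrPUT.json")
val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
val flow = FlowBean(map).constructFlow()

val spark = SparkSession.builder()
  .master("spark://10.0.86.89:7077")   // cluster settings copied from SolrTest below
  .appName("piflow-solr-put")
  .enableHiveSupport()
  .getOrCreate()

val process = Runner.create()
  .bind(classOf[SparkSession].getName, spark)
  .start(flow)
process.awaitTermination()
spark.close()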

View File

@@ -0,0 +1,48 @@
{
  "flow":{
    "name":"test",
    "uuid":"1234",
    "checkpoint":"Merge",
    "stops":[
      {
        "uuid":"0000",
        "name":"GetFromSolr",
        "bundle":"cn.piflow.bundle.solr.GetFromSolr",
        "properties":{
          "solrURL":"http://10.0.88.9:8886/solr",
          "SolrCollection":"core01",
          "QueryStr":"*:*",
          "start":"1",
          "rows":"2",
          "sortBy":"id",
          "DescentOrAscend":"Ascend",
          "fl":"pid,id",
          "fq":"",
          "df":"",
          "indent":"true"
        }
      },
      {
        "uuid":"1111",
        "name":"JsonSave",
        "bundle":"cn.piflow.bundle.json.JsonSave",
        "properties":{
          "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/SolrText01.json"
        }
      }
    ],
    "paths":[
      {
        "from":"GetFromSolr",
        "outport":"",
        "inport":"",
        "to":"JsonSave"
      }
    ]
  }
}
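For reference, the properties above roughly correspond to the following standalone SolrJ query; this is only a sketch of what the GetFromSolr stop assembles below, not code from this commit:

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.SolrQuery.ORDER
import org.apache.solr.client.solrj.impl.HttpSolrClient

val client = new HttpSolrClient.Builder("http://10.0.88.9:8886/solr/core01").build()
val query = new SolrQuery("*:*")   // QueryStr
query.setStart(1)                  // start
query.setRows(2)                   // rows
query.setSort("id", ORDER.asc)     // sortBy + Ascend
query.setFields("pid", "id")       // fl
println(client.query(query).getResults.getNumFound)  // total matching documents
client.close()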

View File

@@ -0,0 +1,177 @@
package cn.piflow.bundle.solr

import java.util

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.SolrQuery.ORDER
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.solr.client.solrj.response.QueryResponse
import org.apache.solr.common.SolrDocumentList
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
class GetFromSolr extends ConfigurableStop {

  override val authorEmail: String = "18525746364@163.com"
  override val description: String = "Get data from Solr"
  override val inportCount: Int = 0
  override val outportCount: Int = 1

  var solrURL: String = _
  var SolrCollection: String = _
  var q: String = _
  var start: String = _
  var rows: String = _
  var sortBy: String = _
  var DescentOrAscend: String = _
  var fl: String = _
  var fq: String = _
  var df: String = _
  var indent: String = _
  var ss: SparkSession = _

  override def setProperties(map: Map[String, Any]): Unit = {
    solrURL = MapUtil.get(map, "solrURL").asInstanceOf[String]
    SolrCollection = MapUtil.get(map, "SolrCollection").asInstanceOf[String]
    q = MapUtil.get(map, "QueryStr").asInstanceOf[String]
    start = MapUtil.get(map, "start").asInstanceOf[String]
    rows = MapUtil.get(map, "rows").asInstanceOf[String]
    sortBy = MapUtil.get(map, "sortBy").asInstanceOf[String]
    DescentOrAscend = MapUtil.get(map, "DescentOrAscend").asInstanceOf[String]
    fl = MapUtil.get(map, "fl").asInstanceOf[String]
    fq = MapUtil.get(map, "fq").asInstanceOf[String]
    df = MapUtil.get(map, "df").asInstanceOf[String]
    indent = MapUtil.get(map, "indent").asInstanceOf[String]
  }
  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val url = solrURL + "/" + SolrCollection
    ss = pec.get[SparkSession]()
    val context: SparkContext = ss.sparkContext

    val client: HttpSolrClient = new HttpSolrClient.Builder(url).build()

    // Build the Solr query from the configured properties; optional
    // parameters are only set when a value was provided.
    val query: SolrQuery = new SolrQuery()
    query.set("q", q)
    if (start.size > 0) {
      query.setStart(start.toInt)
    }
    if (rows.size > 0) {
      query.setRows(rows.toInt)
    }
    if (sortBy.size > 0) {
      if (DescentOrAscend.equals("descent") || DescentOrAscend.equals("Descent")) {
        query.setSort(sortBy, ORDER.desc)
      } else if (DescentOrAscend.equals("Ascend") || DescentOrAscend.equals("ascend")) {
        query.setSort(sortBy, ORDER.asc)
      } else {
        query.setSort(sortBy, ORDER.asc)
      }
    }
    if (fl.size > 0) {
      query.set("fl", fl)
    }
    if (fq.size > 0) {
      query.set("fq", fq)
    }
    if (df.size > 0) {
      query.set("df", df)
    }
    if (indent.size > 0) {
      query.set("indent", indent)
    }

    val response: QueryResponse = client.query(query)
    val resultsList: SolrDocumentList = response.getResults()

    // Collect every document into a map of field name -> value and remember
    // the field names so a schema can be built afterwards.
    val listbuffer: ListBuffer[Map[String, AnyRef]] = new ListBuffer[Map[String, AnyRef]]
    var keySTR: String = ""
    for (i <- resultsList) {
      val SolrDocumentMap: util.Map[String, AnyRef] = i.getFieldValueMap
      val set: util.Set[String] = SolrDocumentMap.keySet()
      var DFmap: Map[String, AnyRef] = Map()
      keySTR = ""
      for (x <- set) {
        val key: String = x
        keySTR += (key + " ")
        DFmap += (key -> SolrDocumentMap.get(key))
      }
      listbuffer += DFmap
    }

    // Convert the collected maps into Rows and build a string-typed DataFrame.
    val rows1: List[Row] = listbuffer.toList.map(map => {
      val values: Iterable[AnyRef] = map.values
      val seqSTR: Seq[String] = values.toSeq.map(x => x.toString)
      val row: Row = Row.fromSeq(seqSTR)
      row
    })
    val rowRDD: RDD[Row] = context.makeRDD(rows1)

    val arrKey: Array[String] = keySTR.split(" ")
    val fields: Array[StructField] = arrKey.map(d => StructField(d, StringType, nullable = true))
    val schema: StructType = StructType(fields)
    val Fdf: DataFrame = ss.createDataFrame(rowRDD, schema)

    Fdf.show()
    out.write(Fdf)
  }
  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/ShellExecutor.jpg")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.SolrGroup.toString)
  }

  override def initialize(ctx: ProcessContext): Unit = {
  }
  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val solrURL = new PropertyDescriptor().name("solrURL").displayName("solrURL").defaultValue("").allowableValues(Set("")).required(true)
    descriptor = solrURL :: descriptor
    val SolrCollection = new PropertyDescriptor().name("SolrCollection").displayName("SolrCollection").defaultValue("").allowableValues(Set("")).required(true)
    descriptor = SolrCollection :: descriptor
    val q = new PropertyDescriptor().name("QueryStr").displayName("QueryStr").defaultValue("").allowableValues(Set("")).description("Query String").required(true)
    descriptor = q :: descriptor
    val start = new PropertyDescriptor().name("start").displayName("start").defaultValue("").allowableValues(Set("")).description("Start Display Position").required(false)
    descriptor = start :: descriptor
    val rows = new PropertyDescriptor().name("rows").displayName("rows").description("Display Number Each Page").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = rows :: descriptor
    val sortBy = new PropertyDescriptor().name("sortBy").displayName("sortBy").description("Sort Field").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = sortBy :: descriptor
    val DescentOrAscend = new PropertyDescriptor().name("DescentOrAscend").displayName("DescentOrAscend").description("Sort Order: Descent or Ascend").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = DescentOrAscend :: descriptor
    val fq = new PropertyDescriptor().name("fq").displayName("fq").description("Filter Query").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = fq :: descriptor
    val fl = new PropertyDescriptor().name("fl").displayName("fl").description("Return Field").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = fl :: descriptor
    val df = new PropertyDescriptor().name("df").displayName("df").description("Default Query Field").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = df :: descriptor
    val indent = new PropertyDescriptor().name("indent").displayName("indent").description("Indent Response: true or on").defaultValue("").allowableValues(Set("")).required(false)
    descriptor = indent :: descriptor
    descriptor
  }
}
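A usage note: the keys read by setProperties above are the property names a flow JSON must provide. A minimal property map for this stop, sketched from the solrGET flow config above:

// Sketch only: keys mirror the MapUtil.get calls in setProperties.
val stop = new GetFromSolr()
stop.setProperties(Map(
  "solrURL" -> "http://10.0.88.9:8886/solr",
  "SolrCollection" -> "core01",
  "QueryStr" -> "*:*",
  "start" -> "1",
  "rows" -> "2",
  "sortBy" -> "id",
  "DescentOrAscend" -> "Ascend",
  "fl" -> "pid,id",
  "fq" -> "",
  "df" -> "",
  "indent" -> "true"
))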

View File

@@ -0,0 +1,82 @@
package cn.piflow.bundle.solr

import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
import cn.piflow.conf.{ConfigurableStop, StopGroupEnum}
import cn.piflow.conf.bean.PropertyDescriptor
import cn.piflow.conf.util.{ImageUtil, MapUtil}
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.solr.client.solrj.response.UpdateResponse
import org.apache.solr.common.SolrInputDocument
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructField
class PutIntoSolr extends ConfigurableStop {

  override val authorEmail: String = "yangqidong@cnic.cn"
  override val description: String = "Put data into Solr"
  override val inportCount: Int = 1
  override val outportCount: Int = 0

  var solrURL: String = _
  var SolrCollection: String = _

  override def setProperties(map: Map[String, Any]): Unit = {
    solrURL = MapUtil.get(map, "solrURL").asInstanceOf[String]
    SolrCollection = MapUtil.get(map, "SolrCollection").asInstanceOf[String]
  }

  var client: HttpSolrClient = _
  var doc: SolrInputDocument = _

  override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
    val df: DataFrame = in.read()
    val SchemaList: List[StructField] = df.schema.toList
    val length: Int = SchemaList.length

    val url = solrURL + "/" + SolrCollection
    client = new HttpSolrClient.Builder(url).build()

    // Turn every row of the incoming DataFrame into a SolrInputDocument,
    // using the DataFrame column names as Solr field names.
    df.collect().foreach(row => {
      doc = new SolrInputDocument()
      for (x <- 0 until length) {
        // box the row value as AnyRef for the Java addField(String, Object) API
        doc.addField(SchemaList(x).name, row.get(x).asInstanceOf[AnyRef])
      }
      val update: UpdateResponse = client.add(doc)
      client.commit()
    })
  }
  override def initialize(ctx: ProcessContext): Unit = {
  }

  override def getIcon(): Array[Byte] = {
    ImageUtil.getImage("./src/main/resources/ShellExecutor.jpg")
  }

  override def getGroup(): List[String] = {
    List(StopGroupEnum.SolrGroup.toString)
  }

  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
    var descriptor: List[PropertyDescriptor] = List()
    val solrURL = new PropertyDescriptor().name("solrURL").displayName("solrURL").defaultValue("").allowableValues(Set("")).required(true)
    descriptor = solrURL :: descriptor
    val SolrCollection = new PropertyDescriptor().name("SolrCollection").displayName("SolrCollection").defaultValue("").allowableValues(Set("")).required(true)
    descriptor = SolrCollection :: descriptor
    descriptor
  }
}
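perform above sends one add and one commit per row. For larger DataFrames a batched write is usually preferable; a sketch under the same assumptions (the helper name putBatch and its signature are illustrative, not part of this commit):

import java.util

import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.solr.common.SolrInputDocument
import org.apache.spark.sql.DataFrame

// Sketch of a batched writer: one update request and one commit for the whole DataFrame.
def putBatch(df: DataFrame, solrURL: String, collection: String): Unit = {
  val client = new HttpSolrClient.Builder(solrURL + "/" + collection).build()
  val fieldNames = df.schema.fieldNames
  val docs = new util.ArrayList[SolrInputDocument]()
  df.collect().foreach(row => {
    val doc = new SolrInputDocument()
    for (i <- fieldNames.indices) {
      doc.addField(fieldNames(i), row.get(i).asInstanceOf[AnyRef])
    }
    docs.add(doc)
  })
  client.add(docs)   // single request carrying all documents
  client.commit()    // make the documents searchable
  client.close()
}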

View File

@@ -1,52 +0,0 @@
package cn.piflow.bundle

import cn.piflow.bundle.json.{JsonPathParser, JsonSave}
import cn.piflow.{FlowImpl, Path, Runner}
import org.apache.spark.sql.SparkSession
import org.junit.Test

class JsonTest {

  @Test
  def testJsonPathParser(): Unit = {

    val JsonPathParserParameters = Map("jsonPath" -> "hdfs://10.0.86.89:9000/xjzhu/student.json", "tag" -> "student")
    val JsonSavePathParameters = Map("jsonSavePath" -> "hdfs://10.0.86.89:9000/xjzhu/example_json_save")

    val flow = new FlowImpl()

    val jsonPathParserStop = new JsonPathParser()
    jsonPathParserStop.setProperties(JsonPathParserParameters)

    val jsonSaveStop = new JsonSave()
    jsonSaveStop.setProperties(JsonSavePathParameters)

    flow.addStop("JsonPathParser", jsonPathParserStop)
    flow.addStop("JsonSave", jsonSaveStop)
    flow.addPath(Path.from("JsonPathParser").to("JsonSave"))

    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars", "/opt/project/piflow-jar-bundle/out/artifacts/piflow-jar-bundle/piflow-jar-bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .start(flow)

    process.awaitTermination()
    spark.close()
  }

  @Test
  def testJsonStringParser(): Unit = {
  }
}

View File

@@ -0,0 +1,62 @@
package cn.piflow.bundle

import cn.piflow.Runner
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.{FileUtil, OptionUtil}
import org.apache.spark.sql.SparkSession
import org.junit.Test

import scala.util.parsing.json.JSON

class SolrTest {

  @Test
  def testFlow(): Unit = {

    // parse flow json
    val file = "src/main/resources/solrGET.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]
    println(map)

    // create flow
    val flowBean = FlowBean(map)
    val flow = flowBean.constructFlow()

    // execute flow
    val spark = SparkSession.builder()
      .master("spark://10.0.86.89:7077")
      .appName("piflow-hive-bundle")
      .config("spark.driver.memory", "1g")
      .config("spark.executor.memory", "2g")
      .config("spark.cores.max", "2")
      .config("spark.jars", "/opt/project/gitwork/out/artifacts/piflow_bundle/piflow_bundle.jar")
      .enableHiveSupport()
      .getOrCreate()

    val process = Runner.create()
      .bind(classOf[SparkSession].getName, spark)
      .bind("checkpoint.path", "hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
      .start(flow)

    process.awaitTermination()
    val pid = process.pid()
    println(pid + "!!!!!!!!!!!!!!!!!!!!!")
    spark.close()
  }

  @Test
  def testFlow2json(): Unit = {

    // parse flow json
    val file = "src/main/resources/flow.json"
    val flowJsonStr = FileUtil.fileReader(file)
    val map = OptionUtil.getAny(JSON.parseFull(flowJsonStr)).asInstanceOf[Map[String, Any]]

    // create flow
    val flowBean = FlowBean(map)
    val flowJson = flowBean.toJson()
    println(flowJson)
  }
}