From 6873944e028144ccb2baa4db2cd51a5dbb18a69e Mon Sep 17 00:00:00 2001 From: judy0131 Date: Thu, 15 Aug 2019 14:26:41 +0800 Subject: [PATCH 1/3] modify flow debug api --- .../scala/cn/piflow/conf/ConfigurableIncrementalStop.scala | 2 +- piflow-core/src/main/scala/cn/piflow/main.scala | 3 ++- piflow-server/src/main/scala/cn/piflow/api/API.scala | 7 +++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala index 70b8b46..49db7b7 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala @@ -12,7 +12,7 @@ abstract class ConfigurableIncrementalStop extends ConfigurableStop with Increme override var incrementalPath: String = _ override def init(flowName : String, stopName : String): Unit = { - incrementalPath = PropertyUtil.getPropertyValue("increment.path") + "/" + flowName + "/" + stopName + incrementalPath = PropertyUtil.getPropertyValue("increment.path").stripSuffix("/") + "/" + flowName + "/" + stopName } diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala index 80838ab..db38011 100644 --- a/piflow-core/src/main/scala/cn/piflow/main.scala +++ b/piflow-core/src/main/scala/cn/piflow/main.scala @@ -514,7 +514,8 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging { val portName = if(en._1.equals("")) "default" else en._1 val portDataPath = debugPath + "/" + portName //println(portDataPath) - en._2.apply().write.json(portDataPath) + println(en._2.apply().schema) + en._2.apply().na.fill("null").write.json(portDataPath) }) } diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 059dfc3..191a9dd 100644 --- 
a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -241,11 +241,14 @@ object API { readLines.takeWhile( _ != null).foreach( line => { println(line) - result += line + "\n" + result += line + "," }) }) - result.stripSuffix("\n") + result = result.stripSuffix(",") + + val json = """{"debugInfo" : [ """ + result + """]}""" + json } def getStopInfo(bundle : String) : String = { From 89fb2fb5b0ce5db6a44c517601a6744e08ed5dbc Mon Sep 17 00:00:00 2001 From: or <43975495+yg000@users.noreply.github.com> Date: Mon, 19 Aug 2019 17:14:21 +0800 Subject: [PATCH 2/3] Update ScopusSearchArticle.scala --- .../nsfc/externalAcquisition/ScopusSearchArticle.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/externalAcquisition/ScopusSearchArticle.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/externalAcquisition/ScopusSearchArticle.scala index 8f219b4..0a7c1ae 100644 --- a/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/externalAcquisition/ScopusSearchArticle.scala +++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/nsfc/externalAcquisition/ScopusSearchArticle.scala @@ -99,9 +99,7 @@ class ScopusSearchArticle extends ConfigurableStop { prismUrls = regula(reg, titleJsonString) for (x <- 0 until prismUrls.size) { - println(num+"----------------------"+scopusIds.get(x)) - num+=1 - + prismUrl = prismUrls.get(x) + s"?field=author&apikey=${apiKey}&httpAccept=application%2Fjson" authorJsonString = getHttp(prismUrl) @@ -128,10 +126,11 @@ class ScopusSearchArticle extends ConfigurableStop { inputStream = new ByteArrayInputStream((titleJsonString.toString + "\n").getBytes("utf-8")) IOUtils.copyBytes(inputStream, scopusOut, 4096, false) + //suspend (ms) + Thread.sleep(waitTime*1000) } - //suspend (ms) - Thread.sleep(waitTime*1000) + }) }catch { From b4d790ce83b617c1baab3fe68144056bc726f9a6 Mon Sep 17 00:00:00 2001 From: judy0131 Date: 
Mon, 19 Aug 2019 18:54:03 +0800 Subject: [PATCH 3/3] modify debug run mode api --- .../src/main/scala/cn/piflow/main.scala | 44 +++++++++++++++++-- .../src/main/scala/cn/piflow/api/API.scala | 11 +++-- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/piflow-core/src/main/scala/cn/piflow/main.scala b/piflow-core/src/main/scala/cn/piflow/main.scala index db38011..cc71390 100644 --- a/piflow-core/src/main/scala/cn/piflow/main.scala +++ b/piflow-core/src/main/scala/cn/piflow/main.scala @@ -11,9 +11,8 @@ import org.apache.spark.sql._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} - import scala.collection.mutable.{ArrayBuffer, Map => MMap} -import org.apache.spark.sql.functions.max +import org.apache.spark.sql.functions.{col, max} trait JobInputStream { def isEmpty(): Boolean; @@ -513,9 +512,46 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging { mapDataFrame.foreach(en => { val portName = if(en._1.equals("")) "default" else en._1 val portDataPath = debugPath + "/" + portName + val portSchemaPath = debugPath + "/" + portName + "_schema" //println(portDataPath) - println(en._2.apply().schema) - en._2.apply().na.fill("null").write.json(portDataPath) + //println(en._2.apply().schema) + val jsonDF = en._2.apply().na.fill("") + var schemaStr = "" + val schema = jsonDF.schema.foreach(f => { + schemaStr = schemaStr + "," + f.name + }) + schemaStr = schemaStr.stripPrefix(",") + HdfsUtil.saveLine(portSchemaPath,schemaStr ) + jsonDF.write.json(portDataPath) + + /*val df = en._2.apply() + val resutl1 = df.withColumn("is_ee_null", col("ee").isNull) + resutl1.show() + + println("Older Schema: " + df.schema) + + val arrayColumnExp = "if(# is null , array(), #) as #" + val arrayColumns = df.schema.filter(_.dataType.typeName == "array").map(_.name) + println(arrayColumns) + + var columnsList = List[String]() + df.schema.foreach(f => { + 
if(arrayColumns.contains(f.name)){ + val t = arrayColumnExp.replace("#",f.name) + columnsList = t +: columnsList + } + else{ + columnsList = f.name +: columnsList + } + + }) + println("Newer Schema: " + df.schema) + + val noneDF = df.selectExpr(columnsList:_*) + + noneDF.show() + noneDF.na.fill("").na.fill(0.0).na.fill(0).write.json(portDataPath)*/ + }) } diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala index 191a9dd..0d81de6 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/API.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala @@ -228,7 +228,7 @@ object API { def getFlowDebugData(appId : String, stopName : String, port : String) : String = { - var result = "" + /*var result = "" val debugPath = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port; val properties = new Properties() val hdfs = FileSystem.get(URI.create(debugPath), new Configuration()) @@ -238,7 +238,7 @@ object API { fileList.filter(!_.equals("_SUCCESS")).foreach( file => { var stream = hdfs.open(new Path(file)) def readLines = Stream.cons(stream.readLine(),Stream.continually(stream.readLine())) - readLines.takeWhile( _ != null).foreach( line => { + readLines.takeWhile( _ != null).foreach( line => { println(line) result += line + "," @@ -248,7 +248,12 @@ object API { result = result.stripSuffix(",") val json = """{"debugInfo" : [ """ + result + """]}""" - json + json*/ + + val debugPath :String = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port; + val schema = HdfsUtil.getLine(debugPath + "_schema") + val result = s"""{"schema":"$schema", "debugDataPath":"$debugPath"}""" + result } def getStopInfo(bundle : String) : String = {