forked from opensci/piflow
Merge remote-tracking branch 'origin/master'
commit b624812503
@@ -99,9 +99,7 @@ class ScopusSearchArticle extends ConfigurableStop {
      prismUrls = regula(reg, titleJsonString)
      for (x <- 0 until prismUrls.size) {
        println(num+"----------------------"+scopusIds.get(x))
        num+=1

        prismUrl = prismUrls.get(x) + s"?field=author&apikey=${apiKey}&httpAccept=application%2Fjson"
        authorJsonString = getHttp(prismUrl)

@@ -128,10 +126,11 @@ class ScopusSearchArticle extends ConfigurableStop {
        inputStream = new ByteArrayInputStream((titleJsonString.toString + "\n").getBytes("utf-8"))
        IOUtils.copyBytes(inputStream, scopusOut, 4096, false)
        //suspend (ms)
        Thread.sleep(waitTime*1000)
      }

      //suspend (ms)
      Thread.sleep(waitTime*1000)

    })

  }catch {
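The two hunks above build an Elsevier author-lookup URL for each prism URL and pause between calls. A minimal, self-contained sketch of that fetch-and-throttle pattern (fetchJson and fetchAuthors are hypothetical names, not the stop's actual helpers; the stop itself uses its own getHttp):

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Hypothetical stand-in for the stop's getHttp helper: GET a URL and return the body.
def fetchJson(url: String): String = {
  val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("GET")
  try Source.fromInputStream(conn.getInputStream, "utf-8").mkString
  finally conn.disconnect()
}

// Fetch author JSON for each prism URL, sleeping waitTime seconds between requests,
// the same Thread.sleep(waitTime*1000) throttle used in the stop above.
def fetchAuthors(prismUrls: Seq[String], apiKey: String, waitTime: Long): Seq[String] =
  prismUrls.map { base =>
    val body = fetchJson(s"$base?field=author&apikey=$apiKey&httpAccept=application%2Fjson")
    Thread.sleep(waitTime * 1000)
    body
  }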
@@ -12,7 +12,7 @@ abstract class ConfigurableIncrementalStop extends ConfigurableStop with Increme
  override var incrementalPath: String = _

  override def init(flowName : String, stopName : String): Unit = {
-   incrementalPath = PropertyUtil.getPropertyValue("increment.path") + "/" + flowName + "/" + stopName
+   incrementalPath = PropertyUtil.getPropertyValue("increment.path").stripSuffix("/") + "/" + flowName + "/" + stopName
  }
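The stripSuffix("/") guards against a configured increment.path that already ends with a slash, which would otherwise leave a double slash in the resulting path. A tiny illustration (the property value below is made up):

val base = "hdfs://ns1/piflow/increment/"              // configured with a trailing slash
base + "/" + "flow1" + "/" + "stop1"                   // hdfs://ns1/piflow/increment//flow1/stop1
base.stripSuffix("/") + "/" + "flow1" + "/" + "stop1"  // hdfs://ns1/piflow/increment/flow1/stop1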
@@ -11,9 +11,8 @@ import org.apache.spark.sql._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}

import scala.collection.mutable.{ArrayBuffer, Map => MMap}
- import org.apache.spark.sql.functions.max
+ import org.apache.spark.sql.functions.{col, max}

trait JobInputStream {
  def isEmpty(): Boolean;
@@ -513,8 +512,46 @@ class JobOutputStreamImpl() extends JobOutputStream with Logging {
    mapDataFrame.foreach(en => {
      val portName = if(en._1.equals("")) "default" else en._1
      val portDataPath = debugPath + "/" + portName
      val portSchemaPath = debugPath + "/" + portName + "_schema"
      //println(portDataPath)
      en._2.apply().write.json(portDataPath)
      //println(en._2.apply().schema)
      val jsonDF = en._2.apply().na.fill("")
      var schemaStr = ""
      val schema = jsonDF.schema.foreach(f => {
        schemaStr = schemaStr + "," + f.name
      })
      schemaStr = schemaStr.stripPrefix(",")
      HdfsUtil.saveLine(portSchemaPath, schemaStr)
      jsonDF.write.json(portDataPath)

      /*val df = en._2.apply()
      val resutl1 = df.withColumn("is_ee_null", col("ee").isNull)
      resutl1.show()

      println("Older Schema: " + df.schema)

      val arrayColumnExp = "if(# is null , array(), #) as #"
      val arrayColumns = df.schema.filter(_.dataType.typeName == "array").map(_.name)
      println(arrayColumns)

      var columnsList = List[String]()
      df.schema.foreach(f => {
        if(arrayColumns.contains(f.name)){
          val t = arrayColumnExp.replace("#", f.name)
          columnsList = t +: columnsList
        }
        else{
          columnsList = f.name +: columnsList
        }
      })
      println("Newer Schema: " + df.schema)

      val noneDF = df.selectExpr(columnsList:_*)

      noneDF.show()
      noneDF.na.fill("").na.fill(0.0).na.fill(0).write.json(portDataPath)*/

    })
  }
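The added debug code writes each port's rows as JSON and a sibling "<port>_schema" file holding the column names as one comma-separated line. A condensed sketch of that convention (writePortDebug is a hypothetical wrapper used only for illustration; HdfsUtil.saveLine is the project's own helper):

import org.apache.spark.sql.DataFrame

// Persist a port's data plus a one-line schema file describing its columns.
def writePortDebug(df: DataFrame, dataPath: String, schemaPath: String): Unit = {
  val jsonDF = df.na.fill("")                              // fill string nulls so fields are not dropped from the JSON
  val schemaStr = jsonDF.schema.map(_.name).mkString(",")  // e.g. "id,title,author"
  HdfsUtil.saveLine(schemaPath, schemaStr)                 // one line: the column names for this port
  jsonDF.write.json(dataPath)                              // the data itself, one JSON object per row
}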
@@ -228,7 +228,7 @@ object API {

  def getFlowDebugData(appId : String, stopName : String, port : String) : String = {
-   var result = ""
+   /*var result = ""
    val debugPath = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
    val properties = new Properties()
    val hdfs = FileSystem.get(URI.create(debugPath), new Configuration())
@@ -238,14 +238,22 @@ object API {
    fileList.filter(!_.equals("_SUCCESS")).foreach( file => {
      var stream = hdfs.open(new Path(file))
      def readLines = Stream.cons(stream.readLine(),Stream.continually(stream.readLine()))
-     readLines.takeWhile( _ != null).foreach( line => {
+     readLines.takeWhile( _ != null).foreach( line => {dfhs
        println(line)
-       result += line + "\n"
+       result += line + ","
      })
    })

-   result.stripSuffix("\n")
+   result = result.stripSuffix(",")

    val json = """{"debugInfo" : [ """ + result + """]}"""
    json*/

    val debugPath :String = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
    val schema = HdfsUtil.getLine(debugPath + "_schema")
    val result = """{"schema":$schema, "debugDataPath": $debugPath}"""
    result
  }

  def getStopInfo(bundle : String) : String = {
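One thing to note in the new getFlowDebugData: the result literal has no s interpolator, so $schema and $debugPath are returned verbatim rather than substituted. A hedged sketch of the interpolated response (assuming substitution was the intent; quoting the values keeps the JSON valid, since the schema line is a plain comma-separated list):

// Assumed fix, not the committed code: prefix with `s` so the placeholders are substituted.
val result = s"""{"schema":"$schema", "debugDataPath":"$debugPath"}"""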