Optimize configuration file

1. Remove yarn-related config
2. Remove bundle path config
3. Remove debug.path, checkpoint.path and increment.path config
This commit is contained in:
judy0131 2020-01-17 18:03:06 +08:00
parent 4f41ba9378
commit c0991b3cda
10 changed files with 160 additions and 54 deletions

View File

@ -1,24 +1,26 @@
server.ip=10.0.86.98
server.port=8001
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
spark.master=yarn
spark.deploy.mode=cluster
fs.defaultFS=hdfs://10.0.86.191:9000
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
#yarn.resourcemanager.address=10.0.86.191:8032
#yarn.access.namenode=hdfs://10.0.86.191:9000
#yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
#yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
hive.metastore.uris=thrift://10.0.88.71:9083
#HDFS paths, these paths will be created automatically
checkpoint.path=hdfs://10.0.86.89:9000/user/piflow/checkpoints/
debug.path=hdfs://10.0.86.89:9000/user/piflow/debug/
increment.path=hdfs://10.0.86.89:9000/user/piflow/increment/
#checkpoint.path=hdfs://10.0.86.89:9000/user/piflow/checkpoints/
#debug.path=hdfs://10.0.86.89:9000/user/piflow/debug/
#increment.path=hdfs://10.0.86.89:9000/user/piflow/increment/
#show data in log, set to 0 if you do not want to show the logs
data.show=10
@ -26,4 +28,7 @@ data.show=10
#monitor the throughput of flow
monitor.throughput=true
h2.port=50001
h2.port=50001
piflow.bundle=piflow-server/target/piflow-server-0.9.jar
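
With checkpoint.path, debug.path and increment.path commented out above, the server now derives those directories from fs.defaultFS (see the new ConfigureUtil below). A minimal sketch of the derived defaults, assuming the property values shown in this file:

// Sketch only, not part of the diff: defaults used when the dedicated path properties are absent.
val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")   // hdfs://10.0.86.191:9000 in this file
val checkpointPath = hdfsFS + "/user/piflow/checkpoints/"
val debugPath = hdfsFS + "/user/piflow/debug/"
val incrementPath = hdfsFS + "/user/piflow/increment/"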

View File

@ -1,6 +1,6 @@
package cn.piflow.conf
import cn.piflow.util.{HdfsUtil, PropertyUtil}
import cn.piflow.util.{ConfigureUtil, HdfsUtil, PropertyUtil}
import cn.piflow.{IncrementalStop, JobContext}
/**
@ -12,7 +12,7 @@ abstract class ConfigurableIncrementalStop extends ConfigurableStop with Increme
override var incrementalPath: String = _
override def init(flowName : String, stopName : String): Unit = {
incrementalPath = PropertyUtil.getPropertyValue("increment.path").stripSuffix("/") + "/" + flowName + "/" + stopName
incrementalPath = ConfigureUtil.getIncrementPath().stripSuffix("/") + "/" + flowName + "/" + stopName
}
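
With this change the incremental path is rooted under fs.defaultFS rather than the removed increment.path property; an illustrative value, assuming flow and stop names MyFlow and MyStop:

// Sketch only, not part of the diff (flow and stop names assumed).
val path = ConfigureUtil.getIncrementPath().stripSuffix("/") + "/" + "MyFlow" + "/" + "MyStop"
// => hdfs://10.0.86.191:9000/user/piflow/increment/MyFlow/MyStop with the fs.defaultFS shown above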

View File

@ -0,0 +1,77 @@
package cn.piflow.util

import java.io.{FileInputStream, InputStream}
import java.util.Properties

object ConfigureUtil {

  val NOT_EXIST_FLAG = 0
  val EXIST_FLAG = 1

  def getCheckpointPath() : String = {
    val item = "checkPointPath"
    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
    val checkpointPath = hdfsFS + "/user/piflow/checkpoints/"
    val isCheckPointPathExist = H2Util.getFlag(item)
    if(isCheckPointPathExist == NOT_EXIST_FLAG){
      if(!HdfsUtil.exists(hdfsFS,checkpointPath)){
        HdfsUtil.mkdir(hdfsFS,checkpointPath)
      }
      H2Util.addFlag(item, EXIST_FLAG)
    }else{
      checkpointPath
    }
    checkpointPath
  }

  def getDebugPath():String = {
    val item = "debugPath"
    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
    val debugPath = hdfsFS + "/user/piflow/debug/"
    val isDebugPathExist = H2Util.getFlag(item)
    if(isDebugPathExist == NOT_EXIST_FLAG){
      if(!HdfsUtil.exists(hdfsFS,debugPath)){
        HdfsUtil.mkdir(hdfsFS,debugPath)
      }
      H2Util.addFlag(item, EXIST_FLAG)
    }else{
      debugPath
    }
    debugPath
  }

  def getIncrementPath():String = {
    val item = "incrementPath"
    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
    val incrementPath = hdfsFS + "/user/piflow/increment/"
    val isIncrementPathExist = H2Util.getFlag(item)
    if(isIncrementPathExist == NOT_EXIST_FLAG){
      if(!HdfsUtil.exists(hdfsFS,incrementPath)){
        HdfsUtil.mkdir(hdfsFS,incrementPath)
      }
      H2Util.addFlag(item, EXIST_FLAG)
    }else{
      incrementPath
    }
    incrementPath
  }

  def getPiFlowBundlePath():String = {
    val userDir = System.getProperty("user.dir")
    var piflowBundlePath = PropertyUtil.getPropertyValue("piflow.bundle")
    if(piflowBundlePath == null){
      piflowBundlePath = userDir + "/lib/piflow-server-0.9.jar"
    }
    else
      piflowBundlePath = userDir + "/" + piflowBundlePath
    piflowBundlePath
  }

  def main(args: Array[String]): Unit = {
    val piflowBundlePath = getPiFlowBundlePath()
  }
}
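
For reference, a hedged usage sketch of the new utility; these calls mirror the replacements made in the diffs below:

// Sketch only, not part of the diff.
val checkpointRoot = ConfigureUtil.getCheckpointPath()   // creates the HDFS dir once, recorded via H2Util flags
val debugRoot = ConfigureUtil.getDebugPath()
val incrementRoot = ConfigureUtil.getIncrementPath()
val bundle = ConfigureUtil.getPiFlowBundlePath()          // <user.dir>/<piflow.bundle>, or <user.dir>/lib/piflow-server-0.9.jar if unset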

View File

@ -30,9 +30,9 @@ object FlowLauncher {
.setAppName(flow.getFlowName())
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
.setAppResource(ConfigureUtil.getPiFlowBundlePath())
.setVerbose(true)
.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
//.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", flow.getDriverMemory())
.setConf("spark.executor.instances", flow.getExecutorNum())
@ -46,8 +46,12 @@ object FlowLauncher {
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
if(PropertyUtil.getPropertyValue("yarn.access.namenode") != null)
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
if(PropertyUtil.getPropertyValue("spark.yarn.access.namenode") != null)
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
else
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("fs.defaultFS"))
if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
if(PropertyUtil.getPropertyValue("yarn.jars") != null)
@ -59,7 +63,7 @@ object FlowLauncher {
println(f.getPath)
sparkLauncher.addJar(f.getPath)
})
sparkLauncher
}
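
The launcher above now reads spark.yarn.access.namenode (previously yarn.access.namenode) and falls back to fs.defaultFS when it is not set; a compact sketch of that resolution, assuming PropertyUtil returns null for missing keys:

// Sketch only, not part of the diff.
val namenode = Option(PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
  .getOrElse(PropertyUtil.getPropertyValue("fs.defaultFS"))
sparkLauncher.setConf("spark.yarn.access.namenode", namenode)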

View File

@ -1,5 +1,6 @@
package cn.piflow.util
import java.net.InetAddress
import java.sql.{Connection, DriverManager, ResultSet}
import java.util.Date
@ -15,7 +16,11 @@ object H2Util {
val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), groupId varchar(255), projectId varchar(255), pid varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
val CREATE_THOUGHPUT_TABLE = "create table if not exists thoughput (flowId varchar(255), stopName varchar(255), portName varchar(255), count long)"
val CREATE_FLAG_TABLE = "create table if not exists configFlag(id bigint auto_increment, item varchar(255), flag int, createTime varchar(255))"
val serverIP = PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port")
//val ip = InetAddress.getLocalHost.getHostAddress
//val serverIP = ip + ":" + PropertyUtil.getPropertyValue("h2.port")
//print("getHostAddress:" + ip + " in H2Util!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1")
val CONNECTION_URL = "jdbc:h2:tcp://" + serverIP + "/~/piflow;AUTO_SERVER=true"
var connection : Connection= null
@ -28,6 +33,7 @@ object H2Util {
statement.executeUpdate(CREATE_FLOW_TABLE)
statement.executeUpdate(CREATE_STOP_TABLE)
statement.executeUpdate(CREATE_THOUGHPUT_TABLE)
statement.executeUpdate(CREATE_FLAG_TABLE)
statement.close()
}catch {
case ex => println(ex)
@ -53,6 +59,7 @@ object H2Util {
statement.executeUpdate("drop table if exists flow")
statement.executeUpdate("drop table if exists stop")
statement.executeUpdate("drop table if exists thoughput")
statement.executeUpdate("drop table if exists configFlag")
statement.close()
} catch{
@ -622,6 +629,27 @@ object H2Util {
Map[String, Any]("project" -> projectInfoMap)
}
def addFlag(item:String, flag:Int) : Unit = {
val createTime = new Date().toString
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
statement.executeUpdate("insert into configFlag(item, flag, createTime) values('" + item + "','" + flag + "','" + createTime +"')")
statement.close()
}
def getFlag(item : String) : Int = {
val statement = getConnectionInstance().createStatement()
statement.setQueryTimeout(QUERY_TIME)
var flag = 0
val flowGroupRS : ResultSet = statement.executeQuery("select flag from configFlag where item='" + item +"'")
if (flowGroupRS.next()){
flag = flowGroupRS.getInt("flag")
}
return flag
}
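// Sketch only, not part of the diff: ConfigureUtil (above) uses these two helpers as a
// create-once guard for its HDFS directories, along the lines of:
//   if (H2Util.getFlag("checkPointPath") == 0) {        // 0 = NOT_EXIST_FLAG, no row yet
//     // create hdfs://<fs.defaultFS>/user/piflow/checkpoints/ if missing
//     H2Util.addFlag("checkPointPath", 1)               // 1 = EXIST_FLAG, recorded in configFlag
//   }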
def main(args: Array[String]): Unit = {

View File

@ -0,0 +1 @@
create table if not exists configFlag(id bigint auto_increment, item varchar(255), flag int, createTime varchar(255))

View File

@ -11,7 +11,7 @@ import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
import cn.piflow.{FlowGroupExecution, Process, ProjectExecution, Runner}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.conf.bean.{FlowGroupBean, ProjectBean}
import cn.piflow.util.{FileUtil, FlowState, H2Util, HdfsUtil}
import cn.piflow.util._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
@ -37,8 +37,8 @@ object API {
val project = projectBean.constructProject()
val projectExecution = Runner.create()
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
.bind("debug.path",ConfigureUtil.getDebugPath())
.start(project);
projectExecution
@ -67,8 +67,8 @@ object API {
val flowGroup = flowGroupBean.constructFlowGroup()
val flowGroupExecution = Runner.create()
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
.bind("debug.path",ConfigureUtil.getDebugPath())
.start(flowGroup);
flowGroupExecution
@ -113,7 +113,7 @@ object API {
.setAppName(appName)
.setMaster(PropertyUtil.getPropertyValue("spark.master"))
.setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
.setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
.setAppResource(ConfigureUtil.getPiFlowBundlePath())
.setVerbose(true)
.setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
.setConf("spark.driver.memory", dirverMem)
@ -136,13 +136,18 @@ object API {
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
if(PropertyUtil.getPropertyValue("yarn.access.namenode") != null)
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
if(PropertyUtil.getPropertyValue("spark.yarn.access.namenode") != null)
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
else
sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("fs.defaultFS"))
if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
if(PropertyUtil.getPropertyValue("yarn.jars") != null)
sparkLauncher.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
val handle = sparkLauncher.startApplication( new SparkAppHandle.Listener {
override def stateChanged(handle: SparkAppHandle): Unit = {
appId = handle.getAppId
@ -230,7 +235,7 @@ object API {
}
def getFlowCheckpoint(appId:String) : String = {
val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path").stripSuffix("/") + "/" + appId
val checkpointPath = ConfigureUtil.getCheckpointPath().stripSuffix("/") + "/" + appId
val checkpointList = HdfsUtil.getFiles(checkpointPath)
"""{"checkpoints":"""" + checkpointList.mkString(",") + """"}"""
}
@ -259,7 +264,7 @@ object API {
val json = """{"debugInfo" : [ """ + result + """]}"""
json*/
val debugPath :String = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
val debugPath :String = ConfigureUtil.getDebugPath().stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
val schema = HdfsUtil.getLine(debugPath + "_schema")
val result ="{\"schema\":\"" + schema+ "\", \"debugDataPath\": \""+ debugPath + "\"}"
result
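
For illustration, the debug-info string built above has roughly this shape (appId, stop name, port and schema are assumed values):

// Sketch only, not part of the diff.
val exampleDebugPath = "hdfs://10.0.86.191:9000/user/piflow/debug/app-0001/SelectField/out"
val exampleResult = "{\"schema\":\"id,name\", \"debugDataPath\": \"" + exampleDebugPath + "\"}"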

View File

@ -1,5 +1,7 @@
package cn.piflow.api
import java.net.InetAddress
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
@ -444,7 +446,9 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
def run = {
val ip = PropertyUtil.getPropertyValue("server.ip")
//val ip = PropertyUtil.getPropertyValue("server.ip")
val ip = InetAddress.getLocalHost.getHostAddress
print("getHostAddress:" + ip + " in HTTPService!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1")
val port = PropertyUtil.getIntPropertyValue("server.port")
Http().bindAndHandleAsync(route, ip, port)
println("Server:" + ip + ":" + port + " Started!!!")
@ -454,30 +458,14 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
object Main {
/*def preparedPath() = {
val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path")
val fsDefaultName = "hdfs://10.0.86.89:9000"
if(!HdfsUtil.exists(fsDefaultName,checkpointPath)){
HdfsUtil.mkdir(fsDefaultName,checkpointPath)
}
val debugPath = PropertyUtil.getPropertyValue("debug.path")
if(!HdfsUtil.exists(debugPath)){
HdfsUtil.mkdir(debugPath)
}
val incrementPath = PropertyUtil.getPropertyValue("increment.path")
if(!HdfsUtil.exists(incrementPath)){
HdfsUtil.mkdir(incrementPath)
}
}*/
def flywayInit() = {
val ip = InetAddress.getLocalHost.getHostAddress
// Create the Flyway instance
val flyway: Flyway = new Flyway();
var url = "jdbc:h2:tcp://"+PropertyUtil.getPropertyValue("server.ip")+":"+PropertyUtil.getPropertyValue("h2.port")+"/~/piflow"
var url = "jdbc:h2:tcp://"+ip+":"+PropertyUtil.getPropertyValue("h2.port")+"/~/piflow"
// Point it to the database
flyway.setDataSource(url,null,null);
flyway.setLocations("db/migrations");

View File

@ -4,6 +4,7 @@ import cn.piflow.{Flow, Runner}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.conf.bean.FlowGroupBean
import cn.piflow.conf.util.OptionUtil
import cn.piflow.util.ConfigureUtil
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
@ -26,12 +27,11 @@ object StartFlowGroupMain {
.appName(flowGroupBean.name)
.enableHiveSupport()
.getOrCreate()
//val checkpointPath = spark.sparkContext.getConf.get("checkpoint.path")
val process = Runner.create()
//.bind(classOf[SparkSession].getName, spark)
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
.bind("debug.path",ConfigureUtil.getDebugPath())
.start(flowGroup);
val applicationId = spark.sparkContext.applicationId
process.awaitTermination();

View File

@ -4,6 +4,7 @@ import cn.piflow.Runner
import cn.piflow.api.util.PropertyUtil
import cn.piflow.conf.bean.FlowBean
import cn.piflow.conf.util.OptionUtil
import cn.piflow.util.ConfigureUtil
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSON
@ -29,14 +30,11 @@ object StartFlowMain {
.getOrCreate()
println("hive.metastore.uris=" + spark.sparkContext.getConf.get("hive.metastore.uris") + "!!!!!!!")
//val checkpointPath = spark.sparkContext.getConf.get("checkpoint.path")
val process = Runner.create()
.bind(classOf[SparkSession].getName, spark)
//.bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
//.bind("debug.path","hdfs://10.0.86.89:9000/xjzhu/piflow/debug/")
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
.bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
.bind("debug.path",ConfigureUtil.getDebugPath())
.start(flow);
val applicationId = spark.sparkContext.applicationId
process.awaitTermination();