diff --git a/config.properties b/config.properties
index deff9f0..a32fd52 100644
--- a/config.properties
+++ b/config.properties
@@ -1,24 +1,26 @@
 server.ip=10.0.86.98
 server.port=8001
-piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
-
 spark.master=yarn
 spark.deploy.mode=cluster
+fs.defaultFS=hdfs://10.0.86.191:9000
+
 yarn.resourcemanager.hostname=10.0.86.191
-yarn.resourcemanager.address=10.0.86.191:8032
-yarn.access.namenode=hdfs://10.0.86.191:9000
-yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
-yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
+#yarn.resourcemanager.address=10.0.86.191:8032
+#yarn.access.namenode=hdfs://10.0.86.191:9000
+#yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
+#yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
 yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
+
+
 hive.metastore.uris=thrift://10.0.88.71:9083
 
 #Hdfs paths, these paths will be created automatically
-checkpoint.path=hdfs://10.0.86.89:9000/user/piflow/checkpoints/
-debug.path=hdfs://10.0.86.89:9000/user/piflow/debug/
-increment.path=hdfs://10.0.86.89:9000/user/piflow/increment/
+#checkpoint.path=hdfs://10.0.86.89:9000/user/piflow/checkpoints/
+#debug.path=hdfs://10.0.86.89:9000/user/piflow/debug/
+#increment.path=hdfs://10.0.86.89:9000/user/piflow/increment/
 
 #show data in log, set 0 if you do not show the logs
 data.show=10
@@ -26,4 +28,7 @@ data.show=10
 
 #monitor the throughput of flow
 monitor.throughput=true
-h2.port=50001
\ No newline at end of file
+h2.port=50001
+
+
+piflow.bundle=piflow-server/target/piflow-server-0.9.jar
\ No newline at end of file
diff --git a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala
index 49db7b7..d5a2ff7 100644
--- a/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala
+++ b/piflow-bundle/src/main/scala/cn/piflow/conf/ConfigurableIncrementalStop.scala
@@ -1,6 +1,6 @@
 package cn.piflow.conf
 
-import cn.piflow.util.{HdfsUtil, PropertyUtil}
+import cn.piflow.util.{ConfigureUtil, HdfsUtil, PropertyUtil}
 import cn.piflow.{IncrementalStop, JobContext}
 
 /**
@@ -12,7 +12,7 @@ abstract class ConfigurableIncrementalStop extends ConfigurableStop with Increme
   override var incrementalPath: String = _
 
   override def init(flowName : String, stopName : String): Unit = {
-    incrementalPath = PropertyUtil.getPropertyValue("increment.path").stripSuffix("/") + "/" + flowName + "/" + stopName
+    incrementalPath = ConfigureUtil.getIncrementPath().stripSuffix("/") + "/" + flowName + "/" + stopName
   }
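Note: with fs.defaultFS now set in config.properties, ConfigurableIncrementalStop no longer reads the (now commented-out) increment.path property; the path is derived from the default filesystem. A minimal standalone sketch of that derivation ("myFlow"/"myStop" are hypothetical names; the HDFS root is the one from config.properties):

```scala
object IncrementalPathSketch {
  // Mirrors ConfigurableIncrementalStop.init after this change: the root comes
  // from fs.defaultFS, then the flow and stop names are appended.
  def incrementalPath(fsDefaultFS: String, flowName: String, stopName: String): String =
    fsDefaultFS.stripSuffix("/") + "/user/piflow/increment/" + flowName + "/" + stopName

  def main(args: Array[String]): Unit = {
    println(incrementalPath("hdfs://10.0.86.191:9000", "myFlow", "myStop"))
    // hdfs://10.0.86.191:9000/user/piflow/increment/myFlow/myStop
  }
}
```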
diff --git a/piflow-core/src/main/scala/cn/piflow/util/ConfigureUtil.scala b/piflow-core/src/main/scala/cn/piflow/util/ConfigureUtil.scala
new file mode 100644
index 0000000..e5dde49
--- /dev/null
+++ b/piflow-core/src/main/scala/cn/piflow/util/ConfigureUtil.scala
@@ -0,0 +1,74 @@
+package cn.piflow.util
+
+import java.io.{FileInputStream, InputStream}
+import java.util.Properties
+
+object ConfigureUtil {
+
+  val NOT_EXIST_FLAG = 0
+  val EXIST_FLAG = 1
+
+  // Derive the checkpoint directory from fs.defaultFS and create it on first use;
+  // the configFlag table in H2 records that the directory already exists.
+  def getCheckpointPath() : String = {
+    val item = "checkPointPath"
+    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
+    val checkpointPath = hdfsFS + "/user/piflow/checkpoints/"
+
+    val isCheckPointPathExist = H2Util.getFlag(item)
+    if(isCheckPointPathExist == NOT_EXIST_FLAG){
+      if(!HdfsUtil.exists(hdfsFS,checkpointPath)){
+        HdfsUtil.mkdir(hdfsFS,checkpointPath)
+      }
+      H2Util.addFlag(item, EXIST_FLAG)
+    }
+    checkpointPath
+  }
+
+  def getDebugPath():String = {
+    val item = "debugPath"
+    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
+    val debugPath = hdfsFS + "/user/piflow/debug/"
+
+    val isDebugPathExist = H2Util.getFlag(item)
+    if(isDebugPathExist == NOT_EXIST_FLAG){
+      if(!HdfsUtil.exists(hdfsFS,debugPath)){
+        HdfsUtil.mkdir(hdfsFS,debugPath)
+      }
+      H2Util.addFlag(item, EXIST_FLAG)
+    }
+    debugPath
+  }
+
+  def getIncrementPath():String = {
+    val item = "incrementPath"
+    val hdfsFS = PropertyUtil.getPropertyValue("fs.defaultFS")
+    val incrementPath = hdfsFS + "/user/piflow/increment/"
+
+    val isIncrementPathExist = H2Util.getFlag(item)
+    if(isIncrementPathExist == NOT_EXIST_FLAG){
+      if(!HdfsUtil.exists(hdfsFS,incrementPath)){
+        HdfsUtil.mkdir(hdfsFS,incrementPath)
+      }
+      H2Util.addFlag(item, EXIST_FLAG)
+    }
+    incrementPath
+  }
+
+  // Resolve the piflow-server jar relative to the working directory;
+  // fall back to lib/piflow-server-0.9.jar when piflow.bundle is not set.
+  def getPiFlowBundlePath():String = {
+    val userDir = System.getProperty("user.dir")
+    var piflowBundlePath = PropertyUtil.getPropertyValue("piflow.bundle")
+    if(piflowBundlePath == null)
+      piflowBundlePath = userDir + "/lib/piflow-server-0.9.jar"
+    else
+      piflowBundlePath = userDir + "/" + piflowBundlePath
+    piflowBundlePath
+  }
+
+  def main(args: Array[String]): Unit = {
+    println(getPiFlowBundlePath())
+  }
+
+}
diff --git a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
index 0dc8d8d..f609d6e 100644
--- a/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
+++ b/piflow-core/src/main/scala/cn/piflow/util/FlowLauncher.scala
@@ -30,9 +30,9 @@ object FlowLauncher {
       .setAppName(flow.getFlowName())
       .setMaster(PropertyUtil.getPropertyValue("spark.master"))
       .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
-      .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
+      .setAppResource(ConfigureUtil.getPiFlowBundlePath())
       .setVerbose(true)
-      .setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
+      //.setConf("spark.jars", PropertyUtil.getPropertyValue("piflow.bundle"))
       .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .setConf("spark.driver.memory", flow.getDriverMemory())
       .setConf("spark.executor.instances", flow.getExecutorNum())
@@ -46,8 +46,12 @@ object FlowLauncher {
       sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
     if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
       sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
-    if(PropertyUtil.getPropertyValue("yarn.access.namenode") != null)
-      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
+
+    if(PropertyUtil.getPropertyValue("spark.yarn.access.namenode") != null)
+      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
+    else
+      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("fs.defaultFS"))
+
     if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
       sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
     if(PropertyUtil.getPropertyValue("yarn.jars") != null)
@@ -59,7 +63,7 @@ object FlowLauncher {
         println(f.getPath)
         sparkLauncher.addJar(f.getPath)
       })
-    
+
     sparkLauncher
   }
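The three path getters above share one create-once pattern: read a flag from H2, create the HDFS directory only when the flag is absent, then record the flag so later calls skip the filesystem checks. A stubbed sketch of just that pattern, with in-memory stand-ins for the H2 flag store and HdfsUtil (the hdfs://nn:9000 URL is hypothetical):

```scala
object CreateOnceSketch {
  // Stand-ins: `flags` plays the configFlag table, `created` records what
  // HdfsUtil.mkdir would have created.
  val flags = scala.collection.mutable.Map[String, Int]()
  var created = List[String]()

  val NOT_EXIST_FLAG = 0
  val EXIST_FLAG = 1

  def ensureDir(item: String, path: String): String = {
    if (flags.getOrElse(item, NOT_EXIST_FLAG) == NOT_EXIST_FLAG) {
      if (!created.contains(path))   // HdfsUtil.exists
        created = path :: created    // HdfsUtil.mkdir
      flags(item) = EXIST_FLAG       // H2Util.addFlag
    }
    path                             // callers always get the path back
  }

  def main(args: Array[String]): Unit = {
    ensureDir("debugPath", "hdfs://nn:9000/user/piflow/debug/")
    ensureDir("debugPath", "hdfs://nn:9000/user/piflow/debug/") // no-op: flag already set
    println(created.size) // 1
  }
}
```

One caveat: the flag only records that the directory was created once; if it is later deleted from HDFS, the cached flag goes stale.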
diff --git a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala
index a7fa238..af92a10 100644
--- a/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala
+++ b/piflow-core/src/main/scala/cn/piflow/util/H2Util.scala
@@ -1,5 +1,6 @@
 package cn.piflow.util
 
+import java.net.InetAddress
 import java.sql.{Connection, DriverManager, ResultSet}
 import java.util.Date
 
@@ -15,7 +16,11 @@ object H2Util {
   val CREATE_FLOW_TABLE = "create table if not exists flow (id varchar(255), groupId varchar(255), projectId varchar(255), pid varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
   val CREATE_STOP_TABLE = "create table if not exists stop (flowId varchar(255), name varchar(255), state varchar(255), startTime varchar(255), endTime varchar(255))"
   val CREATE_THOUGHPUT_TABLE = "create table if not exists thoughput (flowId varchar(255), stopName varchar(255), portName varchar(255), count long)"
+  val CREATE_FLAG_TABLE = "create table if not exists configFlag(id bigint auto_increment, item varchar(255), flag int, createTime varchar(255))"
   val serverIP = PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port")
+  //val ip = InetAddress.getLocalHost.getHostAddress
+  //val serverIP = ip + ":" + PropertyUtil.getPropertyValue("h2.port")
+  //print("getHostAddress:" + ip + " in H2Util")
   val CONNECTION_URL = "jdbc:h2:tcp://" + serverIP + "/~/piflow;AUTO_SERVER=true"
   var connection : Connection= null
 
@@ -28,6 +33,7 @@ object H2Util {
       statement.executeUpdate(CREATE_FLOW_TABLE)
       statement.executeUpdate(CREATE_STOP_TABLE)
       statement.executeUpdate(CREATE_THOUGHPUT_TABLE)
+      statement.executeUpdate(CREATE_FLAG_TABLE)
       statement.close()
     }catch {
       case ex => println(ex)
@@ -53,6 +59,7 @@ object H2Util {
       statement.executeUpdate("drop table if exists flow")
       statement.executeUpdate("drop table if exists stop")
       statement.executeUpdate("drop table if exists thoughput")
+      statement.executeUpdate("drop table if exists configFlag")
       statement.close()
     }
     catch{
@@ -622,6 +629,27 @@ object H2Util {
     Map[String, Any]("project" -> projectInfoMap)
   }
 
+  def addFlag(item:String, flag:Int) : Unit = {
+    val createTime = new Date().toString
+    val statement = getConnectionInstance().createStatement()
+    statement.setQueryTimeout(QUERY_TIME)
+    statement.executeUpdate("insert into configFlag(item, flag, createTime) values('" + item + "'," + flag + ",'" + createTime + "')")
+    statement.close()
+  }
+
+  def getFlag(item : String) : Int = {
+    val statement = getConnectionInstance().createStatement()
+    statement.setQueryTimeout(QUERY_TIME)
+    var flag = 0
+
+    val configFlagRS : ResultSet = statement.executeQuery("select flag from configFlag where item='" + item + "'")
+    if (configFlagRS.next()){
+      flag = configFlagRS.getInt("flag")
+    }
+    statement.close()
+    flag
+  }
+
   def main(args: Array[String]): Unit = {
diff --git a/piflow-server/src/main/resources/db/migrations/V1.07__Init_table.sql b/piflow-server/src/main/resources/db/migrations/V1.07__Init_table.sql
new file mode 100644
index 0000000..46196bd
--- /dev/null
+++ b/piflow-server/src/main/resources/db/migrations/V1.07__Init_table.sql
@@ -0,0 +1 @@
+create table if not exists configFlag(id bigint auto_increment, item varchar(255), flag int, createTime varchar(255))
\ No newline at end of file
diff --git a/piflow-server/src/main/scala/cn/piflow/api/API.scala b/piflow-server/src/main/scala/cn/piflow/api/API.scala
index 1dec0be..f0d36d3 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/API.scala
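The configFlag table can be exercised on its own against an in-memory H2 database. A small round-trip sketch, assuming only that the com.h2database:h2 driver is on the classpath; the DDL is copied from CREATE_FLAG_TABLE / V1.07__Init_table.sql above:

```scala
import java.sql.DriverManager

object ConfigFlagSketch {
  def main(args: Array[String]): Unit = {
    // In-memory database, so no running H2 TCP server is required.
    val conn = DriverManager.getConnection("jdbc:h2:mem:piflow;DB_CLOSE_DELAY=-1")
    val st = conn.createStatement()
    st.executeUpdate("create table if not exists configFlag(id bigint auto_increment, item varchar(255), flag int, createTime varchar(255))")
    st.executeUpdate("insert into configFlag(item, flag, createTime) values('checkPointPath', 1, '" + new java.util.Date().toString + "')")
    val rs = st.executeQuery("select flag from configFlag where item='checkPointPath'")
    if (rs.next()) println("flag = " + rs.getInt("flag")) // flag = 1
    st.close()
    conn.close()
  }
}
```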
+++ b/piflow-server/src/main/scala/cn/piflow/api/API.scala
@@ -11,7 +11,7 @@ import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil}
 import cn.piflow.{FlowGroupExecution, Process, ProjectExecution, Runner}
 import cn.piflow.api.util.PropertyUtil
 import cn.piflow.conf.bean.{FlowGroupBean, ProjectBean}
-import cn.piflow.util.{FileUtil, FlowState, H2Util, HdfsUtil}
+import cn.piflow.util._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
@@ -37,8 +37,8 @@ object API {
     val project = projectBean.constructProject()
 
     val projectExecution = Runner.create()
-      .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
-      .bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
+      .bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
+      .bind("debug.path",ConfigureUtil.getDebugPath())
       .start(project);
 
     projectExecution
@@ -67,8 +67,8 @@ object API {
     val flowGroup = flowGroupBean.constructFlowGroup()
 
     val flowGroupExecution = Runner.create()
-      .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
-      .bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
+      .bind("checkpoint.path",ConfigureUtil.getCheckpointPath())
+      .bind("debug.path",ConfigureUtil.getDebugPath())
       .start(flowGroup);
 
     flowGroupExecution
@@ -113,7 +113,7 @@ object API {
       .setAppName(appName)
       .setMaster(PropertyUtil.getPropertyValue("spark.master"))
       .setDeployMode(PropertyUtil.getPropertyValue("spark.deploy.mode"))
-      .setAppResource(PropertyUtil.getPropertyValue("piflow.bundle"))
+      .setAppResource(ConfigureUtil.getPiFlowBundlePath())
       .setVerbose(true)
       .setConf("spark.hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
       .setConf("spark.driver.memory", dirverMem)
@@ -136,13 +136,18 @@ object API {
       sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.hostname", PropertyUtil.getPropertyValue("yarn.resourcemanager.hostname"))
     if(PropertyUtil.getPropertyValue("yarn.resourcemanager.address") != null)
       sparkLauncher.setConf("spark.hadoop.yarn.resourcemanager.address", PropertyUtil.getPropertyValue("yarn.resourcemanager.address"))
-    if(PropertyUtil.getPropertyValue("yarn.access.namenode") != null)
-      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("yarn.access.namenode"))
+
+    if(PropertyUtil.getPropertyValue("spark.yarn.access.namenode") != null)
+      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("spark.yarn.access.namenode"))
+    else
+      sparkLauncher.setConf("spark.yarn.access.namenode", PropertyUtil.getPropertyValue("fs.defaultFS"))
+
     if(PropertyUtil.getPropertyValue("yarn.stagingDir") != null)
       sparkLauncher.setConf("spark.yarn.stagingDir", PropertyUtil.getPropertyValue("yarn.stagingDir"))
     if(PropertyUtil.getPropertyValue("yarn.jars") != null)
       sparkLauncher.setConf("spark.yarn.jars", PropertyUtil.getPropertyValue("yarn.jars"))
 
+
     val handle = sparkLauncher.startApplication( new SparkAppHandle.Listener {
       override def stateChanged(handle: SparkAppHandle): Unit = {
         appId = handle.getAppId
@@ -230,7 +235,7 @@ object API {
   }
 
   def getFlowCheckpoint(appId:String) : String = {
-    val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path").stripSuffix("/") + "/" + appId
+    val checkpointPath = ConfigureUtil.getCheckpointPath().stripSuffix("/") + "/" + appId
     val checkpointList = HdfsUtil.getFiles(checkpointPath)
     """{"checkpoints":"""" + checkpointList.mkString(",") + """"}"""
   }
@@ -259,7 +264,7 @@ object API {
     val json = """{"debugInfo" : [ """ + result + """]}"""
     json*/
 
-    val debugPath :String = PropertyUtil.getPropertyValue("debug.path").stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
+    val debugPath :String = ConfigureUtil.getDebugPath().stripSuffix("/") + "/" + appId + "/" + stopName + "/" + port;
     val schema = HdfsUtil.getLine(debugPath + "_schema")
     val result ="{\"schema\":\"" + schema+ "\", \"debugDataPath\": \""+ debugPath + "\"}"
     result
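Both launch paths (FlowLauncher and API.scala) now resolve the namenode the same way: an explicitly configured spark.yarn.access.namenode wins, otherwise fs.defaultFS is used, so the launcher always gets a value. A minimal sketch of that resolution order, with a plain Map standing in for PropertyUtil:

```scala
object NamenodeFallbackSketch {
  // Preference order introduced in this change.
  def resolveNamenode(props: Map[String, String]): Option[String] =
    props.get("spark.yarn.access.namenode").orElse(props.get("fs.defaultFS"))

  def main(args: Array[String]): Unit = {
    println(resolveNamenode(Map("fs.defaultFS" -> "hdfs://10.0.86.191:9000")))
    // Some(hdfs://10.0.86.191:9000): the fallback
    println(resolveNamenode(Map(
      "spark.yarn.access.namenode" -> "hdfs://other-nn:9000", // hypothetical override
      "fs.defaultFS"               -> "hdfs://10.0.86.191:9000")))
    // Some(hdfs://other-nn:9000): the explicit setting wins
  }
}
```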
diff --git a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
index 265cadf..4769dc6 100644
--- a/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
+++ b/piflow-server/src/main/scala/cn/piflow/api/HTTPService.scala
@@ -1,5 +1,7 @@
 package cn.piflow.api
 
+import java.net.InetAddress
+
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
@@ -444,7 +446,9 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
 
   def run = {
 
-    val ip = PropertyUtil.getPropertyValue("server.ip")
+    //val ip = PropertyUtil.getPropertyValue("server.ip")
+    val ip = InetAddress.getLocalHost.getHostAddress
+    println("getHostAddress:" + ip + " in HTTPService")
     val port = PropertyUtil.getIntPropertyValue("server.port")
     Http().bindAndHandleAsync(route, ip, port)
     println("Server:" + ip + ":" + port + " Started!!!")
@@ -454,30 +458,14 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
 
 object Main {
 
-  /*def preparedPath() = {
-    val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path")
-    val fsDefaultName = "hdfs://10.0.86.89:9000"
-    if(!HdfsUtil.exists(fsDefaultName,checkpointPath)){
-      HdfsUtil.mkdir(fsDefaultName,checkpointPath)
-    }
-    val debugPath = PropertyUtil.getPropertyValue("debug.path")
-    if(!HdfsUtil.exists(debugPath)){
-      HdfsUtil.mkdir(debugPath)
-    }
-
-    val incrementPath = PropertyUtil.getPropertyValue("increment.path")
-    if(!HdfsUtil.exists(incrementPath)){
-      HdfsUtil.mkdir(incrementPath)
-    }
-
-  }*/
 
   def flywayInit() = {
 
+    val ip = InetAddress.getLocalHost.getHostAddress
+
     // Create the Flyway instance
     val flyway: Flyway = new Flyway();
-    var url = "jdbc:h2:tcp://"+PropertyUtil.getPropertyValue("server.ip")+":"+PropertyUtil.getPropertyValue("h2.port")+"/~/piflow"
+    var url = "jdbc:h2:tcp://"+ip+":"+PropertyUtil.getPropertyValue("h2.port")+"/~/piflow"
     // Point it to the database
     flyway.setDataSource(url,null,null);
     flyway.setLocations("db/migrations");
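The server now binds to the address resolved at runtime instead of the static server.ip property, and Main.flywayInit builds its H2 URL from the same address. A standalone sketch of the lookup; the 50001 port is the h2.port value from config.properties:

```scala
import java.net.InetAddress

object LocalAddressSketch {
  def main(args: Array[String]): Unit = {
    // The same call HTTPService.run and Main.flywayInit now use.
    val ip = InetAddress.getLocalHost.getHostAddress
    println("bind address: " + ip)
    println("jdbc:h2:tcp://" + ip + ":50001/~/piflow")
  }
}
```

Worth noting: InetAddress.getLocalHost can resolve to 127.0.0.1 depending on /etc/hosts, which would make the H2 URL loopback-only even though server.ip remains in config.properties.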
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path")) - .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) + .bind("checkpoint.path",ConfigureUtil.getCheckpointPath()) + .bind("debug.path",ConfigureUtil.getDebugPath()) .start(flowGroup); val applicationId = spark.sparkContext.applicationId process.awaitTermination(); diff --git a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala index 7fdcdd3..1471dd9 100644 --- a/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala +++ b/piflow-server/src/main/scala/cn/piflow/api/StartFlowMain.scala @@ -4,6 +4,7 @@ import cn.piflow.Runner import cn.piflow.api.util.PropertyUtil import cn.piflow.conf.bean.FlowBean import cn.piflow.conf.util.OptionUtil +import cn.piflow.util.ConfigureUtil import org.apache.spark.sql.SparkSession import scala.util.parsing.json.JSON @@ -29,14 +30,11 @@ object StartFlowMain { .getOrCreate() println("hive.metastore.uris=" + spark.sparkContext.getConf.get("hive.metastore.uris") + "!!!!!!!") - //val checkpointPath = spark.sparkContext.getConf.get("checkpoint.path") val process = Runner.create() .bind(classOf[SparkSession].getName, spark) - //.bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/") - //.bind("debug.path","hdfs://10.0.86.89:9000/xjzhu/piflow/debug/") - .bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path")) - .bind("debug.path",PropertyUtil.getPropertyValue("debug.path")) + .bind("checkpoint.path",ConfigureUtil.getCheckpointPath()) + .bind("debug.path",ConfigureUtil.getDebugPath()) .start(flow); val applicationId = spark.sparkContext.applicationId process.awaitTermination();