add h2db version control

judy0131 2019-12-06 10:56:08 +08:00
parent ac6ff3fd88
commit 2a9229103f
5 changed files with 95 additions and 16 deletions

View File

@@ -1,27 +1,26 @@
server.ip=10.0.86.98
server.port=8001
#spark.master=spark://10.0.86.89:7077
#spark.master=spark://10.0.86.191:7077
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
spark.master=yarn
spark.deploy.mode=cluster
yarn.resourcemanager.hostname=10.0.86.191
yarn.resourcemanager.address=10.0.86.191:8032
yarn.access.namenode=hdfs://10.0.86.191:9000
yarn.stagingDir=hdfs://10.0.86.191:9000/tmp/
yarn.jars=hdfs://10.0.86.191:9000/user/spark/share/lib/*.jar
hive.metastore.uris=thrift://10.0.86.191:9083
#piflow.bundle=/opt/project/piflow/out/artifacts/piflow_bundle/piflow-bundle.jar
piflow.bundle=/opt/project/piflow/piflow-server/target/piflow-server-0.9.jar
yarn.url=http://10.0.86.191:8088/ws/v1/cluster/apps/
checkpoint.path=hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/
debug.path=hdfs://10.0.86.89:9000/xjzhu/piflow/debug/
increment.path=hdfs://10.0.86.89:9000/xjzhu/piflow/increment/
#show data in log
hive.metastore.uris=thrift://10.0.88.71:9083
#Hdfs path, these paths will be created automatically
checkpoint.path=hdfs://10.0.86.89:9000/user/piflow/checkpoints/
debug.path=hdfs://10.0.86.89:9000/user/piflow/debug/
increment.path=hdfs://10.0.86.89:9000/user/piflow/increment/
#show data in log; set to 0 to disable showing data in the logs
data.show=10
#monitor the throughput of flow
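
The server reads these keys at runtime through PropertyUtil. As a minimal sketch of how such a lookup works, assuming a plain java.util.Properties file at a hypothetical path (PropertyUtil's actual implementation may differ):

import java.io.FileInputStream
import java.util.Properties

object ConfigSketch {
  // Load a .properties file; the path argument is supplied by the caller.
  def load(path: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical location of the file shown above
    val props = load("piflow-server/src/main/resources/config.properties")
    println(props.getProperty("checkpoint.path"))
  }
}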

View File

@@ -9,7 +9,7 @@
      "name":"XmlParser",
      "bundle":"cn.piflow.bundle.xml.XmlParser",
      "properties":{
        "xmlpath":"hdfs://10.0.86.89:9000/xjzhu/dblp.mini.xml",
        "xmlpath":"hdfs://10.0.86.191:9000/xjzhu/dblp.mini.xml",
        "rowTag":"phdthesis"
      }
@@ -49,7 +49,7 @@
      "name":"JsonSave",
      "bundle":"cn.piflow.bundle.json.JsonSave",
      "properties":{
        "jsonSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis.json"
        "jsonSavePath":"hdfs://10.0.86.191:9000/xjzhu/phdthesis.json"
      }
    },
    {
@@ -57,7 +57,7 @@
      "name":"CsvSave",
      "bundle":"cn.piflow.bundle.csv.CsvSave",
      "properties":{
        "csvSavePath":"hdfs://10.0.86.89:9000/xjzhu/phdthesis_result.csv",
        "csvSavePath":"hdfs://10.0.86.191:9000/xjzhu/phdthesis_result.csv",
        "header":"true",
        "delimiter":","
      }
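
Flows defined this way are submitted to the running server over HTTP. A hedged sketch of such a submission in Scala, assuming a /flow/start endpoint and the server.ip/server.port values from the configuration above (endpoint path and local file name are illustrative):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import scala.io.Source

object SubmitFlowSketch {
  def main(args: Array[String]): Unit = {
    // flow.json is a hypothetical local copy of the flow definition above
    val flowJson = Source.fromFile("flow.json").mkString
    val conn = new URL("http://10.0.86.98:8001/flow/start")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    try out.write(flowJson.getBytes(StandardCharsets.UTF_8)) finally out.close()
    println(s"HTTP ${conn.getResponseCode}")
  }
}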

View File

@@ -183,4 +183,35 @@ object HdfsUtil {
    result
  }

  // Check whether filePath exists on the HDFS addressed by fsDefaultName;
  // returns false if the check fails with an IOException.
  def exists(fsDefaultName: String, filePath: String): Boolean = {
    var result: Boolean = false
    try {
      val conf = new Configuration()
      conf.set("fs.default.name", fsDefaultName)
      fs = FileSystem.get(conf)
      result = HdfsHelper.exists(fs, filePath)
    } catch {
      case ex: IOException => println(ex)
    }
    result
  }

  // Create the folder at path on the same HDFS; returns whether the
  // creation succeeded (false on IOException).
  def mkdir(fsDefaultName: String, path: String): Boolean = {
    var result: Boolean = false
    try {
      val conf = new Configuration()
      conf.set("fs.default.name", fsDefaultName)
      fs = FileSystem.get(conf)
      result = HdfsHelper.createFolder(fs, path)
    } catch {
      case ex: IOException => println(ex)
    }
    result
  }
}
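
A usage sketch for the two new helpers, pre-creating one of the configured HDFS paths the way the commented-out preparedPath further down intends (both values are taken from the configuration file in this commit):

val fsDefaultName = "hdfs://10.0.86.89:9000"
val checkpointPath = "/user/piflow/checkpoints/"
if (!HdfsUtil.exists(fsDefaultName, checkpointPath)) {
  HdfsUtil.mkdir(fsDefaultName, checkpointPath)
}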

View File

@@ -62,6 +62,12 @@
        <artifactId>akka-quartz-scheduler_2.11</artifactId>
        <version>${akka.quartz.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.flywaydb/flyway-core -->
    <dependency>
        <groupId>org.flywaydb</groupId>
        <artifactId>flyway-core</artifactId>
        <version>5.0.7</version>
    </dependency>
</dependencies>
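
Flyway resolves versioned scripts from the classpath location configured in flywayInit below ("db/migrations"). A hypothetical first migration, e.g. piflow-server/src/main/resources/db/migrations/V1__init.sql; the table and columns are illustrative only, since the actual schema is not part of this diff:

-- V1__init.sql: illustrative sketch, not the project's real schema
CREATE TABLE IF NOT EXISTS flow (
    id         VARCHAR(64)  PRIMARY KEY,
    name       VARCHAR(255),
    state      VARCHAR(32),
    start_time TIMESTAMP
);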

View File

@@ -10,13 +10,15 @@ import akka.stream.ActorMaterializer
import cn.piflow.{FlowGroupExecution, ProjectExecution}
import cn.piflow.api.util.PropertyUtil
import cn.piflow.conf.util.{MapUtil, OptionUtil}
import cn.piflow.util.{IdGenerator, JsonUtil}
import cn.piflow.util.{HdfsUtil, IdGenerator, JsonUtil}
import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.util.parsing.json.JSON
import org.apache.spark.launcher.SparkAppHandle
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.FlywayException
import org.h2.tools.Server
import spray.json.DefaultJsonProtocol
@@ -451,8 +453,49 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
}

object Main {

  /*def preparedPath() = {
    val checkpointPath = PropertyUtil.getPropertyValue("checkpoint.path")
    val fsDefaultName = "hdfs://10.0.86.89:9000"
    if(!HdfsUtil.exists(fsDefaultName, checkpointPath)){
      HdfsUtil.mkdir(fsDefaultName, checkpointPath)
    }
    val debugPath = PropertyUtil.getPropertyValue("debug.path")
    if(!HdfsUtil.exists(fsDefaultName, debugPath)){
      HdfsUtil.mkdir(fsDefaultName, debugPath)
    }
    val incrementPath = PropertyUtil.getPropertyValue("increment.path")
    if(!HdfsUtil.exists(fsDefaultName, incrementPath)){
      HdfsUtil.mkdir(fsDefaultName, incrementPath)
    }
  }*/

  def flywayInit() = {
    // Create the Flyway instance
    val flyway: Flyway = new Flyway()

    // Point it to the H2 database that main() exposes over TCP
    val url = "jdbc:h2:tcp://" + PropertyUtil.getPropertyValue("server.ip") + ":" + PropertyUtil.getPropertyValue("h2.port") + "/~/piflow"
    flyway.setDataSource(url, null, null)

    // Versioned migration scripts are resolved from the classpath
    flyway.setLocations("db/migrations")
    flyway.setEncoding("UTF-8")
    flyway.setTable("FLYWAY_SCHEMA_HISTORY")
    flyway.setBaselineOnMigrate(true)

    try {
      // Start the migration
      flyway.migrate()
    } catch {
      case e: FlywayException =>
        // Repair the schema history table, then log the failure
        flyway.repair()
        println(e)
    }
  }

  def main(argv: Array[String]): Unit = {
    HTTPService.run
    // The H2 TCP server must be running before flywayInit() can connect
    val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", PropertyUtil.getPropertyValue("h2.port")).start()
    flywayInit()
  }
}
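
A hedged way to verify the migration once the server is up: query the schema history table over the same TCP endpoint flywayInit uses (requires the H2 driver on the classpath; the IP and port below stand in for the configured server.ip and h2.port). Flyway quotes its column names, so they must be quoted in the query:

import java.sql.DriverManager

object VerifyMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Same URL shape as flywayInit builds; host and port are illustrative
    val conn = DriverManager.getConnection("jdbc:h2:tcp://10.0.86.98:50001/~/piflow")
    val rs = conn.createStatement()
      .executeQuery("SELECT \"version\", \"success\" FROM FLYWAY_SCHEMA_HISTORY")
    while (rs.next()) println(s"${rs.getString(1)} -> ${rs.getBoolean(2)}")
    conn.close()
  }
}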