forked from opensci/piflow
fix bugs:
1. cannot connect to Hive 2. upgrade H2 DB schema to v6.0
This commit is contained in:
parent
5e124a441e
commit
ac6ff3fd88
|
@ -63,6 +63,26 @@ object H2Util {
|
|||
|
||||
}
|
||||
|
||||
/**
  * Upgrade the H2 database schema to the v6.0 layout.
  *
  * Starts the shared TCP H2 server on the configured port, (re)creates the
  * project/group/throughput tables, and adds the `flowCount` column to the
  * `flowgroup` table. Non-fatal errors are logged to stdout; the JDBC
  * statement and the H2 server are always released, even on failure.
  */
def updateToVersion6() = {
  // Start a TCP server so the upgrade runs against the shared H2 instance.
  val h2Server = Server.createTcpServer("-tcp", "-tcpAllowOthers", "-tcpPort", PropertyUtil.getPropertyValue("h2.port")).start()
  var statement: java.sql.Statement = null
  try {
    // v6.0 schema change: flowgroup gains a flowCount column.
    val ALTER_COLUMN = "alter table flowgroup add flowCount Int;"
    statement = getConnectionInstance().createStatement()
    statement.setQueryTimeout(QUERY_TIME)
    statement.executeUpdate(CREATE_PROJECT_TABLE)
    statement.executeUpdate(CREATE_GROUP_TABLE)
    statement.executeUpdate(CREATE_THOUGHPUT_TABLE)
    statement.executeUpdate(ALTER_COLUMN)
  } catch {
    // NonFatal only: let OutOfMemoryError, InterruptedException, etc. propagate
    // instead of being silently printed and swallowed.
    case scala.util.control.NonFatal(ex) => println(ex)
  } finally {
    // Close in finally so the statement is not leaked when executeUpdate throws.
    if (statement != null) statement.close()
    h2Server.shutdown()
  }
}
|
||||
|
||||
def addFlow(appId:String,pId:String, name:String)={
|
||||
val startTime = new Date().toString
|
||||
val statement = getConnectionInstance().createStatement()
|
||||
|
@ -626,7 +646,18 @@ object H2Util {
|
|||
}catch {
|
||||
case ex => println(ex)
|
||||
}*/
|
||||
cleanDatabase()
|
||||
if (args.size != 1){
|
||||
println("Error args!!! Please enter Clean or UpdateToVersion6")
|
||||
}
|
||||
val operation = args(0)
|
||||
if(operation == "Clean"){
|
||||
cleanDatabase()
|
||||
}else if( operation == "UpdateToVersion6"){
|
||||
updateToVersion6()
|
||||
}else{
|
||||
println("Error args!!! Please enter Clean or UpdateToVersion6")
|
||||
}
|
||||
|
||||
//println(getFlowGroupInfo("group_9b41bab2-7c3a-46ec-b716-93b636545e5e"))
|
||||
|
||||
//val flowInfoMap = getFlowInfoMap("application_1544066083705_0864")
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
<configuration>
|
||||
|
||||
<property>
|
||||
<name>hive.metastore.schema.verification</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
</configuration>
|
|
@ -12,6 +12,7 @@ object StartFlowMain {
|
|||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val flowJson = args(0)
|
||||
|
||||
println(flowJson)
|
||||
val map = OptionUtil.getAny(JSON.parseFull(flowJson)).asInstanceOf[Map[String, Any]]
|
||||
println(map)
|
||||
|
@ -23,12 +24,17 @@ object StartFlowMain {
|
|||
//execute flow
|
||||
val spark = SparkSession.builder()
|
||||
.appName(flowBean.name)
|
||||
.config("hive.metastore.uris",PropertyUtil.getPropertyValue("hive.metastore.uris"))
|
||||
.enableHiveSupport()
|
||||
.getOrCreate()
|
||||
|
||||
println("hive.metastore.uris=" + spark.sparkContext.getConf.get("hive.metastore.uris") + "!!!!!!!")
|
||||
//val checkpointPath = spark.sparkContext.getConf.get("checkpoint.path")
|
||||
|
||||
val process = Runner.create()
|
||||
.bind(classOf[SparkSession].getName, spark)
|
||||
//.bind("checkpoint.path","hdfs://10.0.86.89:9000/xjzhu/piflow/checkpoints/")
|
||||
//.bind("debug.path","hdfs://10.0.86.89:9000/xjzhu/piflow/debug/")
|
||||
.bind("checkpoint.path",PropertyUtil.getPropertyValue("checkpoint.path"))
|
||||
.bind("debug.path",PropertyUtil.getPropertyValue("debug.path"))
|
||||
.start(flow);
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue