forked from opensci/piflow
parent
39dabc0eb7
commit
a887eefac5
|
@ -6,19 +6,26 @@ import java.util
|
|||
import java.net.URL
|
||||
|
||||
import cn.piflow.conf.ConfigurableStop
|
||||
import cn.piflow.util.PropertyUtil
|
||||
import org.clapper.classutil.ClassFinder
|
||||
|
||||
import scala.collection.mutable.{Map => MMap}
|
||||
|
||||
class PluginManager {
|
||||
|
||||
private var pluginPath = PropertyUtil.getClassPath()
|
||||
private val pluginMap = MMap[String, PluginClassLoader]()
|
||||
|
||||
def PlugInManager() = {}
|
||||
|
||||
def getPluginPath() : String = {
|
||||
this.pluginPath
|
||||
}
|
||||
|
||||
def getConfigurableStop(plugName: String, bundleName: String): ConfigurableStop = {
|
||||
try {
|
||||
val forName = Class.forName(bundleName, true, getLoader(plugName))
|
||||
val plugin = pluginPath + plugName
|
||||
val forName = Class.forName(bundleName, true, getLoader(plugin))
|
||||
val ins = forName.newInstance.asInstanceOf[ConfigurableStop]
|
||||
ins
|
||||
} catch {
|
||||
|
@ -35,11 +42,11 @@ class PluginManager {
|
|||
def getConfigurableStop(bundleName: String): ConfigurableStop = {
|
||||
val it = pluginMap.keys.iterator
|
||||
while (it.hasNext) {
|
||||
val pluginName = it.next
|
||||
val plugin = it.next
|
||||
try {
|
||||
val forName = Class.forName(bundleName, true, getLoader(pluginName))
|
||||
val forName = Class.forName(bundleName, true, getLoader(plugin))
|
||||
val ins = forName.newInstance.asInstanceOf[ConfigurableStop]
|
||||
System.out.println(bundleName + " is found in " + pluginName)
|
||||
System.out.println(bundleName + " is found in " + plugin)
|
||||
return ins
|
||||
|
||||
} catch {
|
||||
|
@ -48,7 +55,7 @@ class PluginManager {
|
|||
case e: InstantiationException =>
|
||||
e.printStackTrace()
|
||||
case e: ClassNotFoundException =>
|
||||
System.err.println(bundleName + " can not be found in " + pluginName)
|
||||
System.err.println(bundleName + " can not be found in " + plugin)
|
||||
//e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -60,22 +67,20 @@ class PluginManager {
|
|||
var stopList = List[ConfigurableStop]()
|
||||
val pluginIterator = pluginMap.keys.iterator
|
||||
while (pluginIterator.hasNext) {
|
||||
val pluginName : String = pluginIterator.next
|
||||
val finder = ClassFinder(Seq(new File(pluginName)))
|
||||
val plugin : String = pluginIterator.next
|
||||
val finder = ClassFinder(Seq(new File(plugin)))
|
||||
val classes = finder.getClasses
|
||||
//val it = classes.iterator
|
||||
|
||||
try{
|
||||
//while(it.hasNext){
|
||||
|
||||
for( externalClass <- classes){
|
||||
|
||||
try {
|
||||
if(externalClass.superClassName.equals(ClassUtil.configurableStopClass) &&
|
||||
!externalClass.name.equals(ClassUtil.configurableStreamingStop) &&
|
||||
!externalClass.name.equals(ClassUtil.configurableIncrementalStop)){
|
||||
val forName = Class.forName(externalClass.name, true, getLoader(pluginName))
|
||||
val forName = Class.forName(externalClass.name, true, getLoader(plugin))
|
||||
val ins = forName.newInstance.asInstanceOf[ConfigurableStop]
|
||||
System.out.println("Find ConfigurableStop: " + externalClass.name + " in " + pluginName)
|
||||
System.out.println("Find ConfigurableStop: " + externalClass.name + " in " + plugin)
|
||||
stopList = ins +: stopList
|
||||
}
|
||||
|
||||
|
@ -83,10 +88,51 @@ class PluginManager {
|
|||
case e: IllegalAccessException =>
|
||||
e.printStackTrace()
|
||||
case e: InstantiationException =>
|
||||
System.err.println(externalClass.name + " can not be instantiation in " + pluginName)
|
||||
System.err.println(externalClass.name + " can not be instantiation in " + plugin)
|
||||
//e.printStackTrace()
|
||||
case e: ClassNotFoundException =>
|
||||
System.err.println(externalClass.name + " can not be found in " + pluginName)
|
||||
System.err.println(externalClass.name + " can not be found in " + plugin)
|
||||
}
|
||||
}
|
||||
}catch {
|
||||
case e: UnsupportedOperationException => {
|
||||
System.err.println("external plugin throw UnsupportedOperationException.")
|
||||
//e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
stopList
|
||||
}
|
||||
|
||||
def getPluginConfigurableStops(pluginName : String): List[ConfigurableStop] = {
|
||||
|
||||
var stopList = List[ConfigurableStop]()
|
||||
val plugin = this.getPluginPath() + pluginName
|
||||
if(pluginMap.contains(plugin)){
|
||||
|
||||
val finder = ClassFinder(Seq(new File(plugin)))
|
||||
val classes = finder.getClasses
|
||||
try{
|
||||
for( externalClass <- classes){
|
||||
|
||||
try {
|
||||
if(externalClass.superClassName.equals(ClassUtil.configurableStopClass) &&
|
||||
!externalClass.name.equals(ClassUtil.configurableStreamingStop) &&
|
||||
!externalClass.name.equals(ClassUtil.configurableIncrementalStop)){
|
||||
val forName = Class.forName(externalClass.name, true, getLoader(plugin))
|
||||
val ins = forName.newInstance.asInstanceOf[ConfigurableStop]
|
||||
System.out.println("Find ConfigurableStop: " + externalClass.name + " in " + plugin)
|
||||
stopList = ins +: stopList
|
||||
}
|
||||
|
||||
} catch {
|
||||
case e: IllegalAccessException =>
|
||||
e.printStackTrace()
|
||||
case e: InstantiationException =>
|
||||
System.err.println(externalClass.name + " can not be instantiation in " + plugin)
|
||||
//e.printStackTrace()
|
||||
case e: ClassNotFoundException =>
|
||||
System.err.println(externalClass.name + " can not be found in " + plugin)
|
||||
}
|
||||
}
|
||||
}catch {
|
||||
|
@ -108,7 +154,6 @@ class PluginManager {
|
|||
def loadPlugin(pluginName: String): Unit = {
|
||||
this.pluginMap.remove(pluginName)
|
||||
val loader = new PluginClassLoader
|
||||
//String pluginurl = "jar:file:/opt/project/piflow/classpath/" + pluginName + ".jar!/";
|
||||
val pluginurl = "jar:file:" + pluginName + "!/"
|
||||
var url : URL = null
|
||||
try
|
||||
|
|
|
@ -6,19 +6,16 @@ import java.text.SimpleDateFormat
|
|||
import java.util.{Date, Properties}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import cn.piflow.api.HTTPService.pluginManager
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import cn.piflow.conf.util.{ClassUtil, MapUtil, OptionUtil, PluginManager}
|
||||
import cn.piflow.{GroupExecution, Process, Runner}
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.bean.{FlowBean, GroupBean}
|
||||
import cn.piflow.conf.util.ClassUtil.getJarFile
|
||||
import cn.piflow.util._
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}
|
||||
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPut}
|
||||
import org.apache.http.entity.StringEntity
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.apache.http.util.EntityUtils
|
||||
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
|
||||
import org.apache.spark.launcher.{SparkAppHandle}
|
||||
|
||||
import scala.util.parsing.json.JSON
|
||||
import scala.util.control.Breaks._
|
||||
|
@ -27,8 +24,7 @@ object API {
|
|||
|
||||
def addPlugin(pluginManager:PluginManager, pluginName : String) : Boolean = {
|
||||
var result = false
|
||||
val classpath = System.getProperty("user.dir")+ "/classpath/"
|
||||
val classpathFile = new File(classpath)
|
||||
val classpathFile = new File(pluginManager.getPluginPath())
|
||||
val jarFile = FileUtil.getJarFile(classpathFile)
|
||||
breakable{
|
||||
jarFile.foreach( i => {
|
||||
|
@ -46,8 +42,7 @@ object API {
|
|||
|
||||
def removePlugin(pluginManager:PluginManager, pluginName : String) : Boolean = {
|
||||
var result = false
|
||||
val classpath = System.getProperty("user.dir")+ "/classpath/"
|
||||
val classpathFile = new File(classpath)
|
||||
val classpathFile = new File(pluginManager.getPluginPath())
|
||||
val jarFile = FileUtil.getJarFile(classpathFile)
|
||||
breakable{
|
||||
jarFile.foreach( i => {
|
||||
|
@ -62,7 +57,15 @@ object API {
|
|||
result
|
||||
}
|
||||
|
||||
def getConfigurableStopInPlugin(pluginManager:PluginManager, pluginName : String) : String = {
|
||||
var bundleList = List[String]()
|
||||
val stops = pluginManager.getPluginConfigurableStops(pluginName)
|
||||
stops.foreach(s => {
|
||||
bundleList = s.getClass.getName +: bundleList
|
||||
})
|
||||
|
||||
"""{"bundles":"""" + bundleList.mkString(",") + """"}"""
|
||||
}
|
||||
|
||||
def getResourceInfo() : String = {
|
||||
|
||||
|
|
|
@ -12,8 +12,6 @@ import akka.http.scaladsl.server.Directives
|
|||
import akka.stream.ActorMaterializer
|
||||
import cn.piflow.GroupExecution
|
||||
import cn.piflow.api.HTTPService.pluginManager
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.util.ClassUtil.getJarFile
|
||||
import cn.piflow.conf.util.{MapUtil, OptionUtil, PluginManager}
|
||||
import cn.piflow.util._
|
||||
import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
|
||||
|
@ -415,11 +413,13 @@ object HTTPService extends DefaultJsonProtocol with Directives with SprayJsonSup
|
|||
val data = toJson(entity)
|
||||
val pluginName = data.get("plugin").getOrElse("").asInstanceOf[String]
|
||||
val isOk = API.addPlugin(pluginManager, pluginName)
|
||||
|
||||
val result = "Ok"
|
||||
Future.successful(HttpResponse(SUCCESS_CODE, entity = result))
|
||||
if(isOk){
|
||||
val bundles = API.getConfigurableStopInPlugin(pluginManager, pluginName)
|
||||
Future.successful(HttpResponse(SUCCESS_CODE, entity = bundles))
|
||||
}else{
|
||||
Future.successful(HttpResponse(FAIL_CODE, entity = "Fail"))
|
||||
}
|
||||
}
|
||||
|
||||
case ex => {
|
||||
println(ex)
|
||||
Future.successful(HttpResponse(FAIL_CODE, entity = "Fail"))
|
||||
|
@ -494,8 +494,7 @@ object Main {
|
|||
}
|
||||
|
||||
def initPlugin() = {
|
||||
val classpath = System.getProperty("user.dir")+ "/classpath/"
|
||||
val classpathFile = new File(classpath)
|
||||
val classpathFile = new File(pluginManager.getPluginPath())
|
||||
val jarFile = FileUtil.getJarFile(classpathFile)
|
||||
jarFile.foreach( i => {
|
||||
println(i.getAbsolutePath)
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
package cn.piflow.api
|
||||
|
||||
import cn.piflow.Runner
|
||||
import cn.piflow.api.util.PropertyUtil
|
||||
import cn.piflow.conf.bean.FlowBean
|
||||
import cn.piflow.conf.util.OptionUtil
|
||||
import cn.piflow.util.ConfigureUtil
|
||||
import cn.piflow.util.{ConfigureUtil, PropertyUtil}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import scala.util.parsing.json.JSON
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
package cn.piflow.api.util
|
||||
|
||||
import java.io.{File, FileInputStream, InputStream}
|
||||
import java.util.Properties
|
||||
|
||||
object PropertyUtil {
|
||||
private val prop: Properties = new Properties()
|
||||
var fis: InputStream = null
|
||||
var path :String = ""
|
||||
var classPath:String = ""
|
||||
try{
|
||||
//val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath
|
||||
//fis = this.getClass.getResourceAsStream("")
|
||||
val userDir = System.getProperty("user.dir")
|
||||
path = userDir + "/config.properties"
|
||||
classPath = userDir + "/classpath/"
|
||||
prop.load(new FileInputStream(path))
|
||||
} catch{
|
||||
case ex: Exception => ex.printStackTrace()
|
||||
}
|
||||
|
||||
def getConfigureFile() : String = {
|
||||
path
|
||||
}
|
||||
|
||||
def getClassPath():String = {
|
||||
classPath
|
||||
}
|
||||
|
||||
def getPropertyValue(propertyKey: String): String ={
|
||||
val obj = prop.get(propertyKey)
|
||||
if(obj != null){
|
||||
return obj.toString
|
||||
}
|
||||
null
|
||||
}
|
||||
|
||||
def getIntPropertyValue(propertyKey: String): Int ={
|
||||
val obj = prop.getProperty(propertyKey)
|
||||
if(obj != null){
|
||||
return obj.toInt
|
||||
}
|
||||
throw new NullPointerException
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue