forked from opensci/piflow
fix bug: can not find user defined Configurable Stop
This commit is contained in:
parent
fd2ef07acd
commit
9102bf97bb
Binary file not shown.
|
@ -16,11 +16,6 @@
|
|||
<artifactId>piflow-core</artifactId>
|
||||
<version>0.9</version>
|
||||
</dependency>
|
||||
<!--<dependency>
|
||||
<groupId>piflow</groupId>
|
||||
<artifactId>piflow-conf</artifactId>
|
||||
<version>0.9</version>
|
||||
</dependency>-->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.clapper</groupId>
|
||||
|
|
|
@ -10,7 +10,7 @@ import cn.piflow.conf.util.MapUtil
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class EmailClean extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var columnName:String=_
|
||||
|
|
|
@ -14,7 +14,7 @@ import org.apache.spark.sql.SparkSession
|
|||
import scala.reflect.macros.ParseException
|
||||
|
||||
class IdentityNumberClean extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
//var regex:String =_
|
||||
|
|
|
@ -8,7 +8,7 @@ import cn.piflow.conf.util.MapUtil
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class PhoneNumberClean extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var columnName:String=_
|
||||
|
|
|
@ -8,7 +8,7 @@ import cn.piflow.conf.util.MapUtil
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class TitleClean extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var columnName:String=_
|
||||
|
|
|
@ -9,7 +9,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class Fork extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = -1
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class Merge extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = -1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class SelectField extends ConfigurableStop {
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
|
|||
|
||||
class CsvParser extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -7,9 +7,9 @@ import cn.piflow.conf.{ConfigurableStop, CsvGroup, StopGroup, StopGroupEnum}
|
|||
import org.apache.spark.sql.SaveMode
|
||||
|
||||
class CsvSave extends ConfigurableStop{
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
override val inportCount: Int = 1
|
||||
override val outportCount: Int = 0
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 0
|
||||
|
||||
var csvSavePath: String = _
|
||||
var header: Boolean = _
|
||||
|
|
|
@ -10,7 +10,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
|
||||
class FetchFile extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var hdfs_path:String =_
|
||||
|
|
|
@ -13,7 +13,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class PutFile extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var hdfs_path:String =_
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
|
|||
|
||||
|
||||
class RegexTextProcess extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var regex:String =_
|
||||
|
|
|
@ -13,7 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry}
|
|||
import scala.reflect.io.Directory
|
||||
|
||||
class LoadFromFtp extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var url_str:String =_
|
||||
|
|
|
@ -13,7 +13,7 @@ import sun.net.ftp.{FtpClient, FtpDirEntry}
|
|||
import scala.reflect.io.Directory
|
||||
|
||||
class UploadToFtp extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var url_str:String =_
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.apache.spark.sql.SparkSession
|
|||
|
||||
class PutHiveQL extends ConfigurableStop {
|
||||
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class PutHiveStreaming extends ConfigurableStop {
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 0
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class SelectHiveQL extends ConfigurableStop {
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
|
|||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
class LoadZipFromUrl extends ConfigurableStop{
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
var url_str:String =_
|
||||
|
|
|
@ -11,7 +11,7 @@ import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInp
|
|||
import org.apache.spark.sql.{DataFrame, SparkSession}
|
||||
|
||||
class UnGZip extends ConfigurableStop {
|
||||
override val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val authorEmail: String = "xiaoxiao@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
val fileTypes:List[String]=List("tar.gz","tar","gz")
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.apache.spark.sql.SparkSession
|
|||
import scala.beans.BeanProperty
|
||||
|
||||
class JdbcRead extends ConfigurableStop {
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class JdbcWrite extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 0
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class JsonPathParser extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
@ -49,7 +49,7 @@ class JsonPathParser extends ConfigurableStop{
|
|||
}
|
||||
|
||||
class JsonStringParser extends ConfigurableStop{
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class JsonSave extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 0
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class DataFrameRowParser extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import sys.process._
|
|||
|
||||
class ShellExecutor extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 0
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -17,9 +17,9 @@ import scala.util.control.Breaks._
|
|||
* Created by admin on 2018/8/27.
|
||||
*/
|
||||
class FolderXmlParser extends ConfigurableStop{
|
||||
override val authorEmail: String = "lijie"
|
||||
override val inportCount: Int = -1
|
||||
override val outportCount: Int = 1
|
||||
val authorEmail: String = "lijie"
|
||||
val inportCount: Int = -1
|
||||
val outportCount: Int = 1
|
||||
|
||||
var rowTag:String = _
|
||||
var xmlpath:String = _
|
||||
|
|
|
@ -11,7 +11,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class XmlParser extends ConfigurableStop {
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 1
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import scala.beans.BeanProperty
|
|||
|
||||
class XmlSave extends ConfigurableStop{
|
||||
|
||||
override val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val authorEmail: String = "xjzhu@cnic.cn"
|
||||
val inportCount: Int = 1
|
||||
val outportCount: Int = 0
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import cn.piflow.Stop
|
|||
import cn.piflow.conf.bean.PropertyDescriptor
|
||||
|
||||
|
||||
trait ConfigurableStop extends Stop{
|
||||
abstract class ConfigurableStop extends Stop{
|
||||
|
||||
val authorEmail : String
|
||||
val inportCount : Int
|
||||
|
|
|
@ -16,7 +16,7 @@ object ClassUtil {
|
|||
val configurableStopClass:String = "cn.piflow.conf.ConfigurableStop"
|
||||
//val classpath:String = "/opt/project/piflow/classpath"
|
||||
|
||||
/*def findAllConfigurableStop() : List[String] = {
|
||||
/*def findAllConfigurableStopByClassFinder() : List[String] = {
|
||||
|
||||
val classpath = System.getProperty("user.dir")
|
||||
var stopList : List[String] = List()
|
||||
|
@ -63,17 +63,29 @@ object ClassUtil {
|
|||
var stopList:List[ConfigurableStop] = List()
|
||||
|
||||
val classpathFile = new File(classpath)
|
||||
println("classpath is " + classpath)
|
||||
//println("classpath is " + classpath)
|
||||
val finder = ClassFinder(getJarFile(classpathFile))
|
||||
val classes = finder.getClasses
|
||||
val classMap = ClassFinder.classInfoMap(classes)
|
||||
val it = classes.iterator
|
||||
|
||||
while(it.hasNext) {
|
||||
val externalClass = it.next()
|
||||
if(externalClass.superClassName.equals(configurableStopClass)){
|
||||
|
||||
val stopIntance = Class.forName(externalClass.name).newInstance()
|
||||
stopList = stopIntance.asInstanceOf[ConfigurableStop] +: stopList
|
||||
}
|
||||
}
|
||||
stopList
|
||||
|
||||
/*val classMap = ClassFinder.classInfoMap(classes)
|
||||
val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap)
|
||||
plugins.foreach{
|
||||
pluginClassInfo =>
|
||||
val plugin = Class.forName(pluginClassInfo.name).newInstance()
|
||||
stopList = plugin.asInstanceOf[ConfigurableStop] +: stopList
|
||||
}
|
||||
stopList
|
||||
stopList*/
|
||||
}
|
||||
|
||||
def findAllGroups() : List[String] = {
|
||||
|
@ -89,10 +101,22 @@ object ClassUtil {
|
|||
var stop:Option[ConfigurableStop] = None
|
||||
|
||||
val classpathFile = new File(classpath)
|
||||
println("classpath is " + classpath)
|
||||
//println("classpath is " + classpath)
|
||||
val finder = ClassFinder(getJarFile(classpathFile))
|
||||
val classes = finder.getClasses
|
||||
val classMap = ClassFinder.classInfoMap(classes)
|
||||
val it = classes.iterator
|
||||
|
||||
while(it.hasNext) {
|
||||
val externalClass = it.next()
|
||||
if(externalClass.superClassName.equals(configurableStopClass)){
|
||||
if (externalClass.name.equals(bundle)){
|
||||
val stopIntance = Class.forName(externalClass.name).newInstance()
|
||||
stop = Some(stopIntance.asInstanceOf[ConfigurableStop])
|
||||
return stop
|
||||
}
|
||||
}
|
||||
}
|
||||
/*val classMap = ClassFinder.classInfoMap(classes)
|
||||
val plugins = ClassFinder.concreteSubclasses(configurableStopClass,classMap)
|
||||
plugins.foreach{
|
||||
pluginClassInfo =>
|
||||
|
@ -101,7 +125,7 @@ object ClassUtil {
|
|||
stop = Some(plugin.asInstanceOf[ConfigurableStop])
|
||||
return stop
|
||||
}
|
||||
}
|
||||
}*/
|
||||
stop
|
||||
}
|
||||
|
||||
|
@ -153,9 +177,10 @@ object ClassUtil {
|
|||
("sensitive" -> property.sensitive.toString)) }) )
|
||||
val jsonString = compactRender(json)
|
||||
jsonString
|
||||
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
/*def main(args: Array[String]): Unit = {
|
||||
//val stop = findConfigurableStop("cn.piflow.bundle.Class1")
|
||||
//val allConfigurableStopList = findAllConfigurableStop()
|
||||
/*val propertyDescriptorList = findConfigurableStopPropertyDescriptor("cn.piflow.bundle.Xjzhu")
|
||||
|
@ -166,13 +191,15 @@ object ClassUtil {
|
|||
val str = propertyJsonList.mkString(start, ",", end)
|
||||
println(str)*/
|
||||
|
||||
val stop = findConfigurableStopInClasspath("cn.piflow.bundle.Xjzhu")
|
||||
val temp = 1
|
||||
//println(findAllGroups());
|
||||
/*val stop = findAllConfigurableStop()
|
||||
stop.foreach(s => println(s.getClass.getName))
|
||||
val temp = 1*/
|
||||
|
||||
//val stoplist = findAllGroups();
|
||||
//println(stoplist)
|
||||
|
||||
}
|
||||
/*val stop = findConfigurableStop("cn.piflow.bundle.Xjzhu")
|
||||
println(stop.getClass.getName)*/
|
||||
|
||||
|
||||
}*/
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue