This commit is contained in:
bluejoe2008@gmail.com 2018-05-03 18:03:54 +08:00
commit 1228948ddb
7 changed files with 397 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
# Created by .ignore support plugin (hsz.mobi)
### Scala template
*.class
*.log
/.idea/
.DS_Store

151
piflow.iml Normal file
View File

@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/scala" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/scala" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="scala-sdk-2.11.8" level="application" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-core_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-mapred:hadoop2:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:tests:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
<orderEntry type="library" name="Maven: com.twitter:chill_2.11:0.8.0" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:kryo-shaded:3.0.3" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:minlog:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" name="Maven: com.twitter:chill-java:0.8.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.xbean:xbean-asm5-shaded:4.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-client:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: commons-cli:commons-cli:1.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-math:2.1" level="project" />
<orderEntry type="library" name="Maven: xmlenc:xmlenc:0.52" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.1" level="project" />
<orderEntry type="library" name="Maven: commons-lang:commons-lang:2.5" level="project" />
<orderEntry type="library" name="Maven: commons-configuration:commons-configuration:1.6" level="project" />
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
<orderEntry type="library" name="Maven: commons-digester:commons-digester:1.8" level="project" />
<orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils:1.7.0" level="project" />
<orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils-core:1.8.0" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.5.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-auth:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-compress:1.4.1" level="project" />
<orderEntry type="library" name="Maven: org.tukaani:xz:1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-hdfs:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-app:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-client:2.2.0" level="project" />
<orderEntry type="library" name="Maven: com.google.inject:guice:3.0" level="project" />
<orderEntry type="library" name="Maven: javax.inject:javax.inject:1" level="project" />
<orderEntry type="library" name="Maven: aopalliance:aopalliance:1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-server-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-api:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-core:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-common:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-jobclient:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-annotations:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-launcher_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-network-common_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.fusesource.leveldbjni:leveldbjni-all:1.8" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.6.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-network-shuffle_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-unsafe_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: net.java.dev.jets3t:jets3t:0.7.1" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.3" level="project" />
<orderEntry type="library" name="Maven: commons-httpclient:commons-httpclient:3.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-recipes:2.4.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-framework:2.4.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-client:2.4.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.5" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:14.0.1" level="project" />
<orderEntry type="library" name="Maven: javax.servlet:javax.servlet-api:3.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.4.1" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.16" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.16" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:jcl-over-slf4j:1.7.16" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.16" level="project" />
<orderEntry type="library" name="Maven: com.ning:compress-lzf:1.0.3" level="project" />
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.2.6" level="project" />
<orderEntry type="library" name="Maven: net.jpountz.lz4:lz4:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.roaringbitmap:RoaringBitmap:0.5.11" level="project" />
<orderEntry type="library" name="Maven: commons-net:commons-net:2.2" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.11.8" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-jackson_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-core_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: org.json4s:json4s-ast_2.11:3.2.11" level="project" />
<orderEntry type="library" name="Maven: com.thoughtworks.paranamer:paranamer:2.6" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scalap:2.11.0" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.0" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.0.1" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-client:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.ws.rs:javax.ws.rs-api:2.0.1" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-api:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-utils:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:aopalliance-repackaged:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2.external:javax.inject:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-locator:2.4.0-b34" level="project" />
<orderEntry type="library" name="Maven: org.javassist:javassist:3.18.1-GA" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-common:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.annotation:javax.annotation-api:1.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.bundles.repackaged:jersey-guava:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.hk2:osgi-resource-locator:1.0.1" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-server:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-jaxb:2.22.2" level="project" />
<orderEntry type="library" name="Maven: javax.validation:validation-api:1.1.0.Final" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-servlet:2.22.2" level="project" />
<orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-servlet-core:2.22.2" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-all:4.0.42.Final" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty:3.8.0.Final" level="project" />
<orderEntry type="library" name="Maven: com.clearspring.analytics:stream:2.7.0" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-jvm:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-json:3.1.2" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-graphite:3.1.2" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.6.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.6.5" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-scala_2.11:2.6.5" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-reflect:2.11.7" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-paranamer:2.6.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.ivy:ivy:2.4.0" level="project" />
<orderEntry type="library" name="Maven: oro:oro:2.0.8" level="project" />
<orderEntry type="library" name="Maven: net.razorvine:pyrolite:4.13" level="project" />
<orderEntry type="library" name="Maven: net.sf.py4j:py4j:0.10.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-tags_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.scalatest:scalatest_2.11:2.2.6" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-crypto:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.spark-project.spark:unused:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-sql_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: com.univocity:univocity-parsers:2.2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-sketch_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.spark:spark-catalyst_2.11:2.1.0" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:janino:3.0.0" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:commons-compiler:3.0.0" level="project" />
<orderEntry type="library" name="Maven: org.antlr:antlr4-runtime:4.5.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-column:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-common:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-encoding:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-hadoop:1.8.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-format:2.3.0-incubating" level="project" />
<orderEntry type="library" name="Maven: org.apache.parquet:parquet-jackson:1.8.1" level="project" />
</component>
</module>

View File

@ -0,0 +1,58 @@
package cn.piflow.util
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.commons.lang.StringUtils
object FormatUtils {
def format(x: Any, nullString: String = "(null)"): String = {
x match {
case null => nullString
case None => nullString
case Some(m) => format(m, nullString);
case date: Date => new SimpleDateFormat("yyyy-MM-dd HH:mm:SS").format(date)
case _ => x.toString
}
}
def printTable(columns: Seq[String], data: Seq[Seq[Any]], nullString: String = "(null)") = {
val formatedColumns: Seq[String] = columns.map(format(_, nullString));
val formatedData: Seq[Seq[String]] = data.map(_.map(format(_, nullString)));
val sb = new StringBuilder
val numCols = formatedColumns.length
// Initialise the width of each column to a minimum value of '3'
val colWidths = Array.fill(numCols)(3)
// Compute the width of each column
for (row formatedData :+ formatedColumns) {
for ((cell, i) row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), cell.length)
}
}
// Create SeparateLine
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()
// column names
formatedColumns.zipWithIndex.map {
case (cell, i)
StringUtils.rightPad(cell, colWidths(i))
}.addString(sb, "|", "|", "|\n")
sb.append(sep)
// data
formatedData.map {
_.zipWithIndex.map {
case (cell, i)
StringUtils.rightPad(cell, colWidths(i))
}.addString(sb, "|", "|", "|\n")
}
sb.append(sep);
println(sb);
}
}

View File

@ -0,0 +1,10 @@
package util
import org.apache.log4j.Logger
/**
* Created by bluejoe on 2017/10/14.
*/
trait Logging {
protected lazy val logger = Logger.getLogger(this.getClass);
}

View File

@ -0,0 +1,59 @@
package util
/**
* @author bluejoe2008@gmail.com
*/
import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream, PipedInputStream, PipedOutputStream, PrintWriter}
import java.net.ServerSocket
object MockNetCat {
def start(port: Int) = new MockNetCat(port);
}
class MockNetCat(port: Int) {
val pout = new PipedOutputStream();
val pin = new PipedInputStream(pout);
val serverSocket = new ServerSocket(port);
val listenThread = new Thread() {
override def run() {
val socket = serverSocket.accept();
val sout = socket.getOutputStream();
val sin = socket.getInputStream();
val thread1 = createSyncThread(pin, sout);
val thread2 = createSyncThread(sin, System.out);
thread1.start();
thread2.start();
thread1.join();
socket.shutdownOutput();
thread2.join();
}
};
listenThread.start();
def writeData(text: String) {
pout.write(text.getBytes());
}
def stop() = {
serverSocket.close();
listenThread.stop();
}
private def createSyncThread(in: InputStream, out: OutputStream) = new Thread() {
override def run() {
val writer = new PrintWriter(out);
val reader = new BufferedReader(new InputStreamReader(in));
var line = "";
do {
line = reader.readLine();
if (line != null) {
writer.println(line);
writer.flush();
}
} while (line != null)
}
}
}

View File

@ -0,0 +1,52 @@
package util
/**
* Created by bluejoe on 2017/9/30.
*/
object ReflectUtils {
implicit def reflected(o: AnyRef) = new ReflectedObject(o);
def singleton[T](implicit m: Manifest[T]): AnyRef = {
val field = Class.forName(m.runtimeClass.getName + "$").getDeclaredField("MODULE$");
field.setAccessible(true);
field.get();
}
def instanceOf[T](args: Any*)(implicit m: Manifest[T]): T = {
val constructor = m.runtimeClass.getDeclaredConstructor(args.map(_.getClass): _*);
constructor.setAccessible(true);
constructor.newInstance(args.map(_.asInstanceOf[Object]): _*).asInstanceOf[T];
}
def instanceOf(className: String)(args: Any*) = {
val constructor = Class.forName(className).getDeclaredConstructor(args.map(_.getClass): _*);
constructor.setAccessible(true);
constructor.newInstance(args.map(_.asInstanceOf[Object]): _*);
}
}
class ReflectedObject(o: AnyRef) {
//employee._get("company.name")
def _get(name: String): AnyRef = {
var o2 = o;
for (fn <- name.split("\\.")) {
val field = o2.getClass.getDeclaredField(fn);
field.setAccessible(true);
o2 = field.get(o2);
}
o2;
}
def _getLazy(name: String): AnyRef = {
_call(s"${name}$$lzycompute")();
}
def _call(name: String)(args: Any*): AnyRef = {
//val method = o.getClass.getDeclaredMethod(name, args.map(_.getClass): _*);
//TODO: supports overloaded methods?
val methods = o.getClass.getDeclaredMethods.filter(_.getName.equals(name));
val method = methods(0);
method.setAccessible(true);
method.invoke(o, args.map(_.asInstanceOf[Object]): _*);
}
}

View File

@ -0,0 +1,60 @@
import java.io.{File, FileInputStream, FileOutputStream}
import cn.piflow.{ChainImpl, Process, ProcessContext, RunnerImpl}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.spark.sql.SparkSession
import org.junit.Test
class ChainTest {
@Test
def test1() {
val chain = new ChainImpl();
val id1 = chain.addProcess("CopyTextFile", new CopyTextFile(), "comment");
val id2 = chain.addProcess("CountWords", new CountWords());
val id3 = chain.addProcess("PrintCount", new PrintCount());
chain.link(id1, id2);
chain.link(id2, id3);
val runner = new RunnerImpl();
val exe = runner.run(chain, id1);
FileUtils.deleteDirectory(new File("./out/wordcount"));
FileUtils.deleteQuietly(new File("./out/honglou.txt"));
exe.awaitComplete();
}
}
class CopyTextFile extends Process {
def run(pc: ProcessContext): Unit = {
val is = new FileInputStream(new File("/Users/bluejoe/testdata/honglou.txt"));
val os = new FileOutputStream(new File("./out/honglou.txt"));
IOUtils.copy(is, os);
}
}
class CountWords extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
count.write.json("./out/wordcount");
spark.close();
}
}
class PrintCount extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.json("./out/wordcount").sort($"count".desc);
count.show(40);
spark.close();
}
}