spark process

bluejoe2008@gmail.com 2018-05-06 21:29:31 +08:00
parent df50b9b40e
commit 215d68820f
4 changed files with 168 additions and 99 deletions

src/main/scala/etl.scala (new file)

@ -0,0 +1,10 @@
/**
 * Created by bluejoe on 2018/5/6.
 */
package cn.piflow

class SparkETLProcess extends Process {
  override def run(pc: ProcessContext): Unit = {
  }
}
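The stub above compiles but does nothing yet, while FlowTest.test2 further down already calls createNode, pipe, DoLoad, DoMap, DoFlatMap, DoWrite, TextFile and JsonFile on a SparkETLProcess. None of that API exists in this commit; the sketch below is only one possible shape for it, with the names taken from the test and the evaluation of the script strings deliberately left open:

// sketch only: one possible shape of the node/pipe API expected by FlowTest.test2
package cn.piflow

import scala.collection.mutable.ArrayBuffer

case class TextFile(path: String)
case class JsonFile(path: String)

// operations referenced by the test; the script strings (DoMap/DoFlatMap) are kept opaque here
sealed trait EtlOp
case class DoLoad(source: TextFile) extends EtlOp
case class DoMap(script: String) extends EtlOp
case class DoFlatMap(script: String) extends EtlOp
case class DoWrite(sink: JsonFile) extends EtlOp

// a node wraps one operation and remembers which nodes it feeds
class EtlNode(val op: EtlOp) {
  val successors = ArrayBuffer[EtlNode]();
}

class SparkETLProcess extends Process {
  private val nodes = ArrayBuffer[EtlNode]();

  def createNode(op: EtlOp): EtlNode = {
    val node = new EtlNode(op);
    nodes += node;
    node;
  }

  // wires the given nodes into a linear pipeline: node1 -> node2 -> ...
  def pipe(chain: EtlNode*): Unit = {
    chain.zip(chain.drop(1)).foreach { case (from, to) => from.successors += to };
  }

  override def run(pc: ProcessContext): Unit = {
    // a full implementation would walk the node graph and translate it into Spark
    // transformations; the committed stub leaves run() empty
  }
}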


@ -3,8 +3,10 @@ package cn.piflow
import cn.piflow.util.Logging
import org.quartz.Trigger.CompletedExecutionInstruction
import org.quartz.impl.StdSchedulerFactory
import org.quartz.impl.matchers.GroupMatcher
import org.quartz.{Trigger => QuartzTrigger, _}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => MMap}
/**
@ -31,42 +33,65 @@ trait Event {
}
case class ProcessCompletedListener(boundProcessName: String, executionContext: ExecutionContext, predecessors: String*) extends EventListener {
val dps = predecessors.distinct;
var fired = false;
val completed = MMap[String, Boolean]();
dps.foreach { processName =>
completed(processName) = false;
};
def handle(event: Event, args: Any) {
completed(event.asInstanceOf[ProcessCompleted].processName) = true;
if (completed.values.filter(!_).isEmpty) {
fired = true;
executionContext.fire(LaunchProcess(), boundProcessName);
}
}
}
/**
 * starts a process once all of the processes it depends on have completed
 */
object DependencyTrigger {
def dependOn(triggeredProcessName: String, dependencies: String*): Trigger = new Trigger() {
override def activate(context: ExecutionContext): Unit = {
val listener = new ProcessCompletedListener(triggeredProcessName, context, dependencies: _*);
dependencies.foreach { dependency =>
context.on(ProcessCompleted(dependency), listener);
def dependency(processName: String, dependentProcesses: String*): Trigger = new Trigger() {
override def activate(executionContext: ExecutionContext): Unit = {
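// anonymous listener: marks each dependent process as completed when its ProcessCompleted event arrives and, once all have completed, fires LaunchProcess for the process being triggered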
val listener = new EventListener {
var fired = false;
val completed = MMap[String, Boolean]();
dependentProcesses.foreach { processName =>
completed(processName) = false;
};
def handle(event: Event, args: Any) {
completed(event.asInstanceOf[ProcessCompleted].processName) = true;
if (completed.values.filter(!_).isEmpty) {
fired = true;
executionContext.fire(LaunchProcess(), processName);
}
}
};
dependentProcesses.foreach { dependency =>
executionContext.on(ProcessCompleted(dependency), listener);
}
}
override def getTriggeredProcesses(): Seq[String] = Seq(triggeredProcessName);
override def getTriggeredProcesses(): Seq[String] = Seq(processName);
}
}
object CronTrigger {
/**
 * starts processes repeatedly on a cron schedule
 */
object TimerTrigger {
def cron(cronExpr: String, processNames: String*): Trigger = new Trigger() {
override def activate(context: ExecutionContext): Unit = {
processNames.foreach { processName =>
context.cronProcess(processName, cronExpr);
context.scheduleProcessCronly(processName, cronExpr);
}
}
override def getTriggeredProcesses(): Seq[String] = processNames;
}
}
/**
 * starts processes when a given event is fired
 */
object EventTrigger {
def listen(event: Event, processNames: String*): Trigger = new Trigger() {
override def activate(context: ExecutionContext): Unit = {
processNames.foreach { processName =>
context.on(event, new EventListener() {
override def handle(event: Event, args: Any): Unit = {
context.scheduleProcess(processName);
}
});
}
}
@ -75,13 +100,17 @@ object CronTrigger {
}
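Neither test below uses EventTrigger; a hypothetical registration, reusing the ProcessCompleted event defined in this file, could look like:
flow.addTrigger(EventTrigger.listen(ProcessCompleted("CopyTextFile"), "CountWords"));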
trait Execution {
def start();
def stop();
def getRunningProcesses(): Seq[(String, String)];
def getScheduledProcesses(): Seq[String];
}
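getRunningProcesses() and getScheduledProcesses() are new in this commit and not exercised by the tests; assuming exe is the Execution returned by runner.run, a hypothetical check while a flow is running might be:
val exe = runner.run(flow, "CleanHouse");
exe.start();
Thread.sleep(5000);
println("scheduled: " + exe.getScheduledProcesses());
println("running: " + exe.getRunningProcesses());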
trait Runner {
def run(chain: Flow, starts: String*): Execution;
def run(flow: Flow, starts: String*): Execution;
}
trait ProcessContext {
@ -132,9 +161,9 @@ trait EventEmiter {
trait ExecutionContext extends EventEmiter {
def getFlow(): Flow;
def startProcess(name: String): Unit;
def scheduleProcess(name: String): Unit;
def cronProcess(cronExpr: String, processName: String): Unit;
def scheduleProcessCronly(processName: String, cronExpr: String): Unit;
}
class EventEmiterImpl extends EventEmiter with Logging {
@ -160,44 +189,7 @@ class EventEmiterImpl extends EventEmiter with Logging {
}
}
case class LaunchProcessListener(executionContext: ExecutionContext) extends EventListener {
def handle(event: Event, args: Any) {
executionContext.startProcess(args.asInstanceOf[String]);
}
}
class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logging {
val executionContext = new EventEmiterImpl() with ExecutionContext {
//listens on LaunchProcess
this.on(LaunchProcess(), new LaunchProcessListener(this));
override def startProcess(processName: String): Unit = {
val quartzTrigger = TriggerBuilder.newTrigger().startNow()
.build();
val quartzJob = JobBuilder.newJob(classOf[ProcessAsQuartzJob])
.usingJobData("processName", processName)
.build();
quartzScheduler.scheduleJob(quartzJob, quartzTrigger);
}
override def cronProcess(processName: String, cronExpr: String): Unit = {
val quartzTrigger = TriggerBuilder.newTrigger().startNow()
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpr))
.build();
val quartzJob = JobBuilder.newJob(classOf[CronAsQuartzJob])
.usingJobData("processName", processName)
.build();
quartzScheduler.scheduleJob(quartzJob, quartzTrigger);
}
override def getFlow(): Flow = flow;
};
def start(): Unit = {
triggers.foreach(_.activate(executionContext));
quartzScheduler.start();
@ -210,6 +202,56 @@ class ExecutionImpl(flow: Flow, starts: Seq[String]) extends Execution with Logg
quartzScheduler.shutdown();
}
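// the two queries below read the Quartz scheduler: currently executing jobs as (fireInstanceId, processName) pairs, and the names of all scheduled processes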
def getRunningProcesses(): Seq[(String, String)] = {
quartzScheduler.getCurrentlyExecutingJobs()
.map { jec: JobExecutionContext =>
(jec.getFireInstanceId,
jec.getJobDetail.getJobDataMap.get("processName").asInstanceOf[String])
};
}
def getScheduledProcesses(): Seq[String] = {
quartzScheduler.getJobKeys(GroupMatcher.anyGroup())
.map(quartzScheduler.getJobDetail(_))
.map(_.getJobDataMap.get("processName").asInstanceOf[String])
.toSeq;
}
val executionContext = new EventEmiterImpl() with ExecutionContext {
//listens on LaunchProcess
this.on(LaunchProcess(), new EventListener() {
override def handle(event: Event, args: Any): Unit = {
scheduleProcess(args.asInstanceOf[String]);
}
});
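// shared by scheduleProcess and scheduleProcessCronly: builds a Quartz job carrying the process name and a trigger that starts now, optionally with an attached schedule (e.g. cron)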
private def _scheduleProcess(processName: String, scheduleBuilder: Option[ScheduleBuilder[_]] = None): Unit = {
val quartzTriggerBuilder = TriggerBuilder.newTrigger().startNow();
if (scheduleBuilder.isDefined) {
quartzTriggerBuilder.withSchedule(scheduleBuilder.get)
};
val quartzTrigger = quartzTriggerBuilder.build();
val quartzJob = JobBuilder.newJob(classOf[ProcessAsQuartzJob])
.usingJobData("processName", processName)
.build();
logger.debug(s"scheduled process: $processName");
quartzScheduler.scheduleJob(quartzJob, quartzTrigger);
}
override def scheduleProcess(processName: String): Unit = {
_scheduleProcess(processName);
}
override def scheduleProcessCronly(processName: String, cronExpr: String): Unit = {
_scheduleProcess(processName, Some(CronScheduleBuilder.cronSchedule(cronExpr)));
}
override def getFlow(): Flow = flow;
};
val quartzScheduler = StdSchedulerFactory.getDefaultScheduler();
quartzScheduler.getContext.put("executionContext", executionContext);
val triggers = flow.getTriggers();
@ -256,13 +298,4 @@ class ProcessAsQuartzJob extends Job with Logging {
val executionContext = context.getScheduler.getContext.get("executionContext").asInstanceOf[ExecutionContext];
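// looks up the process by name in the flow and runs it; note run(null): no ProcessContext is passed yet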
executionContext.getFlow().getProcess(processName).run(null);
}
}
class CronAsQuartzJob extends Job with Logging {
override def execute(context: JobExecutionContext): Unit = {
val map = context.getJobDetail.getJobDataMap;
val processName = map.get("processName").asInstanceOf[String];
val executionContext = context.getScheduler.getContext.get("executionContext").asInstanceOf[ExecutionContext];
executionContext.fire(LaunchProcess(), processName);
}
}

log4j.properties

@ -8,4 +8,4 @@ log4j.logger.org=WARN
log4j.logger.io=WARN
log4j.logger.java=WARN
log4j.logger.org.quartz=WARN
log4j.logger.cn.piflow.ExecutionImpl=WARN
#log4j.logger.cn.piflow.ExecutionImpl=WARN

FlowTest.scala

@ -7,19 +7,18 @@ import org.apache.spark.sql.SparkSession
import org.junit.Test
class FlowTest {
@Test
def test1() {
private def _testFlow1(processCountWords: Process) {
val flow = new FlowImpl();
flow.addProcess("CleanHouse", new CleanHouse());
flow.addProcess("CopyTextFile", new CopyTextFile());
flow.addProcess("CountWords", new CountWords());
flow.addProcess("CountWords", processCountWords);
flow.addProcess("PrintCount", new PrintCount());
flow.addProcess("PrintMessage", new PrintMessage());
flow.addTrigger(DependencyTrigger.dependOn("CopyTextFile", "CleanHouse"));
flow.addTrigger(DependencyTrigger.dependOn("CountWords", "CopyTextFile"));
flow.addTrigger(DependencyTrigger.dependOn("PrintCount", "CountWords"));
flow.addTrigger(CronTrigger.cron("0/5 * * * * ? ", "PrintMessage"));
flow.addTrigger(DependencyTrigger.dependency("CopyTextFile", "CleanHouse"));
flow.addTrigger(DependencyTrigger.dependency("CountWords", "CopyTextFile"));
flow.addTrigger(DependencyTrigger.dependency("PrintCount", "CountWords"));
flow.addTrigger(TimerTrigger.cron("0/5 * * * * ? ", "PrintMessage"));
val runner = new RunnerImpl();
val exe = runner.run(flow, "CleanHouse");
@ -27,6 +26,48 @@ class FlowTest {
Thread.sleep(20000);
exe.stop();
}
@Test
def test1() {
_testFlow1(new CountWords());
class CountWords extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
count.write.json("./out/wordcount");
spark.close();
}
}
}
@Test
def test2() {
val fg = new SparkETLProcess();
val node1 = fg.createNode(DoLoad(TextFile("./out/honglou.txt")));
val node2 = fg.createNode(DoMap(
"""
function(s){
return s.replaceAll("[\\x00-\\xff]||。|||“|”||| ", "");
}"""));
val node3 = fg.createNode(DoFlatMap(
"""
function(s){
return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
}"""));
val node4 = fg.createNode(DoWrite(JsonFile("./out/wordcount")));
fg.pipe(node1, node2, node3, node4);
_testFlow1(fg);
}
}
class PrintMessage extends Process {
@ -50,21 +91,6 @@ class CopyTextFile extends Process {
}
}
class CountWords extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")
.getOrCreate();
import spark.implicits._
val count = spark.read.textFile("./out/honglou.txt")
.map(_.replaceAll("[\\x00-\\xff]||。|||“|”||| ", ""))
.flatMap(s => s.zip(s.drop(1)).map(t => "" + t._1 + t._2))
.groupBy("value").count.sort($"count".desc);
count.write.json("./out/wordcount");
spark.close();
}
}
class PrintCount extends Process {
def run(pc: ProcessContext): Unit = {
val spark = SparkSession.builder.master("local[4]")