rename bundleIn to inport

This commit is contained in:
bluejoe2008@gmail.com 2018-07-05 14:18:47 +08:00
parent 4da512ba7f
commit a95cc77c20
2 changed files with 26 additions and 1 deletions

View File

@ -0,0 +1,19 @@
package cn.piflow.lib
import cn.piflow._
class DoMerge extends Stop {
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
out.write(in.ports().map(in.read(_)).reduce((x, y) => x.union(y)));
}
}
class DoFork(outports: Seq[String] = Seq("op1", "op2")) extends Stop {
override def initialize(ctx: ProcessContext): Unit = {}
override def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
outports.foreach(out.write(_, in.read()));
}
}

View File

@ -12,6 +12,8 @@ trait JobInputStream {
def read(): DataFrame; def read(): DataFrame;
def ports(): Seq[String];
def read(inport: String): DataFrame; def read(inport: String): DataFrame;
} }
@ -20,7 +22,7 @@ trait JobOutputStream {
def write(data: DataFrame); def write(data: DataFrame);
def write(outport: String, data: DataFrame); def write(bundle: String, data: DataFrame);
def sendError(); def sendError();
} }
@ -248,6 +250,10 @@ class JobInputStreamImpl() extends JobInputStream {
.map(x => (x._1.inport, x._2.getDataFrame(x._1.outport))); .map(x => (x._1.inport, x._2.getDataFrame(x._1.outport)));
}; };
override def ports(): Seq[String] = {
inputs.keySet.toSeq;
}
override def read(): DataFrame = { override def read(): DataFrame = {
if (inputs.isEmpty) if (inputs.isEmpty)
throw new NoInputAvailableException(); throw new NoInputAvailableException();