write transformer in javascript
commit 8aab03c2b7
parent a3e5ac00d0
@@ -7,7 +7,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Map => JMap}
 import javax.script.{Compilable, CompiledScript => JCompiledScript, ScriptEngineManager}
 
-import jdk.nashorn.api.scripting.ScriptObjectMirror
 import org.apache.spark.api.java.function.{FlatMapFunction, MapFunction}
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
 
@@ -217,30 +216,10 @@ class JavaScriptEngine extends ScriptEngine {
       bindings.asInstanceOf[JMap[String, Any]].putAll(args);
 
       val value = compiled.eval(bindings);
-      if (value.isInstanceOf[java.lang.Integer]
-        || value.isInstanceOf[java.lang.String]
-        || value.isInstanceOf[java.lang.Double]
-        || value.isInstanceOf[java.lang.Boolean]) {
       value;
       }
-
-      else if (value.isInstanceOf[ScriptObjectMirror]) {
-        if (value.asInstanceOf[ScriptObjectMirror].isArray) {
-          value.asInstanceOf[ScriptObjectMirror].values().asInstanceOf[java.util.Collection[_]]
-        }
-      }
-
-      else {
-        throw new UnrecognizedValueException(value);
-      }
-
     }
   }
-}
-
-class UnrecognizedValueException(value: Any)
-  extends RuntimeException(s"unrecognized value: $value") {
-
 }
 
 object Tool {
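Note on the hunk above: JavaScriptEngine.eval now returns the Nashorn result as-is instead of branching on its runtime type, and UnrecognizedValueException is dropped. A minimal standalone sketch of the same compile-and-eval flow (illustration only, not code from this repository; it assumes a JDK that still bundles Nashorn, i.e. 8 through 14):

    import javax.script.{Bindings, Compilable, CompiledScript, ScriptEngineManager}

    object NashornEvalSketch {
      def main(args: Array[String]): Unit = {
        val engine = new ScriptEngineManager().getEngineByName("nashorn")

        // Compile once, evaluate many times with different bindings.
        val compiled: CompiledScript = engine.asInstanceOf[Compilable]
          .compile("(function () { return java.util.Arrays.asList(s, s.toUpperCase()); })();")

        val bindings: Bindings = engine.createBindings()
        bindings.put("s", "bluejoe")

        // eval hands back the script's value untouched; because the script already
        // builds a java.util.List, no ScriptObjectMirror unwrapping is needed.
        val value = compiled.eval(bindings)
        println(value) // [bluejoe, BLUEJOE]
      }
    }

The scripts in the tests below follow the same pattern: return a java.util value from the JavaScript side rather than a raw JS object.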
@@ -52,16 +52,10 @@ class FlowTest {
     val fg = new SparkETLProcess();
     val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
     val s2 = fg.transform(s1, DoMap(
-      """
-      function(s){
-        return s.replaceAll("[\\x00-\\xff]|,|。|:|.|“|”|?|!| ", "");
-      }""", classOf[String]));
+      SparkETLTest.SCRIPT_1, classOf[String]));
 
     val s3 = fg.transform(s2, DoFlatMap(
-      """
-      function(s){
-        return s.zip(s.drop(1)).map(t => "" + t._1 + t._2);
-      }""", classOf[String]));
+      SparkETLTest.SCRIPT_2, classOf[String]));
 
     fg.writeStream(s3, TextFile("./out/wordcount", "json"));
 
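FlowTest now feeds DoMap and DoFlatMap the shared script constants from SparkETLTest instead of inline literals. For orientation, a rough sketch of how a JavaScript mapper can be bridged into Spark's Java-friendly Dataset API. JsMap is a hypothetical helper named here for illustration, not this project's DoMap; the transient lazy engine is one way to keep Nashorn out of task serialization:

    import javax.script.{Invocable, ScriptEngineManager}
    import org.apache.spark.api.java.function.MapFunction
    import org.apache.spark.sql.{Encoders, SparkSession}

    // Hypothetical wrapper: turns a JS function source string into a MapFunction.
    class JsMap(script: String) extends MapFunction[String, String] {
      // Built lazily on first use so the engine itself is never serialized.
      @transient private lazy val engine = {
        val e = new ScriptEngineManager().getEngineByName("nashorn")
        e.eval(s"var f = $script;") // bind the anonymous function to a global name
        e
      }
      override def call(s: String): String =
        String.valueOf(engine.asInstanceOf[Invocable].invokeFunction("f", s))
    }

    object JsMapDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[1]").appName("js-map").getOrCreate()
        val ds = spark.createDataset(Seq("abc", "def"))(Encoders.STRING)
        val upper = ds.map(new JsMap("function(s){ return s.toUpperCase(); }"), Encoders.STRING)
        upper.show()
        spark.stop()
      }
    }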
@@ -17,14 +17,8 @@ class ScriptEngineTest {
     Assert.assertEquals("BLUEJOE", s);
     Assert.assertEquals(classOf[String], s.getClass);
 
-    engine.eval("function arr(){return [1,2,3];}");
-    val s2 = engine.asInstanceOf[Invocable].invokeFunction("arr");
-    val som = s2.asInstanceOf[ScriptObjectMirror];
-    Assert.assertEquals(true, som.isArray);
-    println(som.values());
-    println(som.values().getClass);
-    println(som.values().asInstanceOf[util.Collection[_]].iterator());
-
+    val s2 = engine.eval("(function (){return java.util.Arrays.asList([1,2,3]);})();");
+    println(s2);
 
     println(engine.eval("1;").getClass);
     println(engine.eval("'abc';").getClass);
@@ -42,6 +36,15 @@ class ScriptEngineTest {
     println(m);
   }
 
+  @Test
+  def testTs(): Unit = {
+    val engine = new ScriptEngineManager().getEngineByName("typescript");
+    val s = engine.eval("""["1","2","3"]""");
+    println(s);
+    val m = ScriptUtils.convert(s, classOf[Array[String]]);
+    println(m);
+  }
+
   @Test
   def testScala(): Unit = {
     val engine = new ScriptEngineManager().getEngineByName("scala");
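The updated test evaluates an IIFE that hands back java.util.Arrays.asList(...) and drops the Invocable/ScriptObjectMirror round trip, while the new testTs relies on a "typescript" ScriptEngine being registered on the classpath. A small Nashorn-only sketch of the conversion these tests exercise (illustrative, not repository code):

    import javax.script.ScriptEngineManager
    import jdk.nashorn.api.scripting.ScriptUtils

    object ConvertSketch {
      def main(args: Array[String]): Unit = {
        val engine = new ScriptEngineManager().getEngineByName("nashorn")

        // A bare JS array literal typically comes back as a script object mirror...
        val raw = engine.eval("""["1","2","3"]""")
        println(raw.getClass)

        // ...which ScriptUtils.convert can coerce into a plain Java array.
        val arr = ScriptUtils.convert(raw, classOf[Array[String]]).asInstanceOf[Array[String]]
        println(arr.mkString(",")) // 1,2,3
      }
    }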
@@ -14,12 +14,25 @@ class SparkETLTest {
     val fg = new SparkETLProcess();
     val s1 = fg.loadStream(TextFile("./out/honglou.txt", "text"));
     val s2 = fg.transform(s1, DoMap(
+      SparkETLTest.SCRIPT_1, classOf[String]));
+
+    val s3 = fg.transform(s2, DoFlatMap(
+      SparkETLTest.SCRIPT_2, classOf[String]));
+
+    fg.writeStream(s3, TextFile("./out/wordcount", "json"));
+    FileUtils.deleteDirectory(new File("./out/wordcount"));
+    fg.run(new SparkProcessContext());
+  }
+}
+
+object SparkETLTest {
+  val SCRIPT_1 =
     """
     function(s){
       return s.replaceAll("[\\x00-\\xff]|,|。|:|.|“|”|?|!| ", "");
-    }""", classOf[String]));
-
-    val s3 = fg.transform(s2, DoFlatMap(
+    }""";
+
+  val SCRIPT_2 =
     """
     function (s) {
       var arr = Array();
@@ -28,13 +41,7 @@ class SparkETLTest {
       arr.push(s.substring(i, i + 2));
     }
 
-      return arr;
-    }
-    """, classOf[String]));
-
-    fg.writeStream(s3, TextFile("./out/wordcount", "json"));
-
-    FileUtils.deleteDirectory(new File("./out/wordcount"));
-    fg.run(new SparkProcessContext());
+      return java.util.Arrays.asList(arr);
     }
+    """;
 }
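SCRIPT_2 now ends with return java.util.Arrays.asList(arr) instead of returning the raw JS array, which lines up with Spark's Java flat-map contract: FlatMapFunction.call must produce a java.util.Iterator. A sketch of that consuming side, under the assumption that the transformer feeds the script's result straight into a FlatMapFunction (an illustration, not this project's DoFlatMap implementation; ScriptResultFlatMap is a hypothetical name):

    import java.util.{Collection => JCollection, Collections, Iterator => JIterator}
    import org.apache.spark.api.java.function.FlatMapFunction

    // Hypothetical adapter: forwards whatever the script evaluated to as an iterator.
    class ScriptResultFlatMap(evalScript: String => Any)
      extends FlatMapFunction[String, String] {

      override def call(line: String): JIterator[String] = evalScript(line) match {
        // A java.util collection (e.g. from java.util.Arrays.asList in the script)
        // can be forwarded directly.
        case c: JCollection[_] => c.iterator().asInstanceOf[JIterator[String]]
        // Anything else is wrapped as a single element; a raw JS array would land
        // here and would still need an explicit conversion step.
        case other => Collections.singletonList(String.valueOf(other)).iterator()
      }
    }

On the Dataset side, the matching call is ds.flatMap(new ScriptResultFlatMap(...), Encoders.STRING).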