modify flow example

This commit is contained in:
judy0131 2020-05-22 16:37:01 +08:00
parent 73ebc17e57
commit 5a4d0440b7
5 changed files with 334 additions and 60 deletions


@ -37,8 +37,7 @@ Try PiFlow v0.6 with: http://piflow.cstcloud.cn/piflow-web/
## Requirements
* JDK 1.8
* Scala-2.11.8
* Apache Maven 3.1.0 or newer
* Git Client (used during build process by 'bower' plugin)
* Spark-2.1.0, Spark-2.2.0, Spark-2.3.0
* Hadoop-2.6.0
@ -47,10 +46,10 @@ Try PiFlow v0.6 with: http://piflow.cstcloud.cn/piflow-web/
### To Build:
- `install external package`
mvn install:install-file -Dfile=/../piflow/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
mvn install:install-file -Dfile=/../piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
mvn install:install-file -Dfile=/../piflow/piflow-bundle/lib/ojdbc6-11.2.0.3.jar -DgroupId=oracle -DartifactId=ojdbc6 -Dversion=11.2.0.3 -Dpackaging=jar
mvn install:install-file -Dfile=/../piflow/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar
- `mvn clean package -Dmaven.test.skip=true`
@ -73,10 +72,27 @@ Try PiFlow v0.6 with: http://piflow.cstcloud.cn/piflow-web/
### Run Piflow Server
- `run piflow server on Intellij`:
- download piflow: git clone https://github.com/cas-bigdatalab/piflow.git
- import piflow into Intellij
- edit config.properties file
- build piflow to generate piflow jar:
- Edit Configurations --> Add New Configuration --> Maven
- Name: package
- Command line: clean package -Dmaven.test.skip=true -X
- run 'package' (piflow jar file will be built in ../piflow/piflow-server/target/piflow-server-0.9.jar)
- run HttpService:
- Edit Configurations --> Add New Configuration --> Application
- Name: HttpService
- Main class : cn.piflow.api.Main
- Environment Variable: SPARK_HOME=/opt/spark-2.2.0-bin-hadoop2.6 (change the path to your Spark home)
- run 'HttpService'
- test HttpService:
- run /../piflow/piflow-server/src/main/scala/cn/piflow/api/HTTPClientStartMockDataFlow.scala
- change the piflow server IP and port to match your configuration
- `run piflow server by release version`:
- download piflow.tar.gz:
@ -87,8 +103,25 @@ Try PiFlow v0.6 with: http://piflow.cstcloud.cn/piflow-web/
- unzip piflow.tar.gz:
tar -zxvf piflow.tar.gz
- edit config.properties
- run start.sh, stop.sh, restart.sh, status.sh
- test piflow server
- set PIFLOW_HOME
- vim /etc/profile
export PIFLOW_HOME=/yourPiflowPath
export PATH=$PATH:$PIFLOW_HOME/bin
- command
piflow flow start example/mockDataFlow.json
piflow flow stop appID
piflow flow info appID
piflow flow log appID
piflow flowGroup start example/mockDataGroup.json
piflow flowGroup stop groupId
piflow flowGroup info groupId
- `how to configure config.properties`
#spark and yarn config


@ -0,0 +1,43 @@
{
  "flow": {
    "name": "MockData",
    "executorMemory": "1g",
    "executorNumber": "1",
    "uuid": "8a80d63f720cdd2301723b7461d92600",
    "paths": [
      {
        "inport": "",
        "from": "MockData",
        "to": "ShowData",
        "outport": ""
      }
    ],
    "executorCores": "1",
    "driverMemory": "1g",
    "stops": [
      {
        "name": "MockData",
        "bundle": "cn.piflow.bundle.common.MockData",
        "uuid": "8a80d63f720cdd2301723b7461d92604",
        "properties": {
          "schema": "title:String, author:String, age:Int",
          "count": "10"
        },
        "customizedProperties": {}
      },
      {
        "name": "ShowData",
        "bundle": "cn.piflow.bundle.external.ShowData",
        "uuid": "8a80d63f720cdd2301723b7461d92602",
        "properties": {
          "showNumber": "5"
        },
        "customizedProperties": {}
      }
    ]
  }
}
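The `schema` property is a comma-separated list of `name:Type` fields that MockData uses to generate rows. As an illustration only, splitting such a string into field name/type pairs might look like the sketch below; this is not PiFlow's actual parsing code.

```scala
// Illustrative sketch only (not PiFlow's implementation): split a
// MockData-style schema string such as "title:String, author:String, age:Int"
// into (fieldName, fieldType) pairs.
object SchemaStringSketch {
  def parse(schema: String): Seq[(String, String)] =
    schema.split(",").toSeq.map { field =>
      // Each field looks like "name:Type"; trim stray whitespace around both.
      val Array(name, tpe) = field.trim.split(":").map(_.trim)
      (name, tpe)
    }

  def main(args: Array[String]): Unit =
    parse("title:String, author:String, age:Int")
      .foreach { case (n, t) => println(s"$n: $t") }
}
```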


@ -0,0 +1,153 @@
{
  "group" : {
    "flows" : [ {
      "flow" : {
        "executorNumber" : "1",
        "driverMemory" : "1g",
        "executorMemory" : "1g",
        "executorCores" : "1",
        "paths" : [ {
          "inport" : "",
          "from" : "MockData",
          "to" : "ShowData",
          "outport" : ""
        } ],
        "name" : "f4",
        "stops" : [ {
          "customizedProperties" : { },
          "name" : "MockData",
          "uuid" : "8a80d63f720cdd2301723b7745b72649",
          "bundle" : "cn.piflow.bundle.common.MockData",
          "properties" : {
            "schema" : "title:String, author:String, age:Int",
            "count" : "10"
          }
        }, {
          "customizedProperties" : { },
          "name" : "ShowData",
          "uuid" : "8a80d63f720cdd2301723b7745b72647",
          "bundle" : "cn.piflow.bundle.external.ShowData",
          "properties" : {
            "showNumber" : "5"
          }
        } ],
        "uuid" : "8a80d63f720cdd2301723b7745b62645"
      }
    }, {
      "flow" : {
        "executorNumber" : "1",
        "driverMemory" : "1g",
        "executorMemory" : "1g",
        "executorCores" : "1",
        "paths" : [ {
          "inport" : "",
          "from" : "MockData",
          "to" : "ShowData",
          "outport" : ""
        } ],
        "name" : "f3",
        "stops" : [ {
          "customizedProperties" : { },
          "name" : "MockData",
          "uuid" : "8a80d63f720cdd2301723b7745b9265d",
          "bundle" : "cn.piflow.bundle.common.MockData",
          "properties" : {
            "schema" : "title:String, author:String, age:Int",
            "count" : "10"
          }
        }, {
          "customizedProperties" : { },
          "name" : "ShowData",
          "uuid" : "8a80d63f720cdd2301723b7745b9265b",
          "bundle" : "cn.piflow.bundle.external.ShowData",
          "properties" : {
            "showNumber" : "5"
          }
        } ],
        "uuid" : "8a80d63f720cdd2301723b7745b82659"
      }
    } ],
    "name" : "SimpleGroup",
    "groups" : [ {
      "group" : {
        "flows" : [ {
          "flow" : {
            "executorNumber" : "1",
            "driverMemory" : "1g",
            "executorMemory" : "1g",
            "executorCores" : "1",
            "paths" : [ {
              "inport" : "",
              "from" : "MockData",
              "to" : "ShowData",
              "outport" : ""
            } ],
            "name" : "MockData",
            "stops" : [ {
              "customizedProperties" : { },
              "name" : "MockData",
              "uuid" : "8a80d63f720cdd2301723b7745b4261a",
              "bundle" : "cn.piflow.bundle.common.MockData",
              "properties" : {
                "schema" : "title:String, author:String, age:Int",
                "count" : "10"
              }
            }, {
              "customizedProperties" : { },
              "name" : "ShowData",
              "uuid" : "8a80d63f720cdd2301723b7745b32618",
              "bundle" : "cn.piflow.bundle.external.ShowData",
              "properties" : {
                "showNumber" : "5"
              }
            } ],
            "uuid" : "8a80d63f720cdd2301723b7745b32616"
          }
        }, {
          "flow" : {
            "executorNumber" : "1",
            "driverMemory" : "1g",
            "executorMemory" : "1g",
            "executorCores" : "1",
            "paths" : [ {
              "inport" : "",
              "from" : "MockData",
              "to" : "ShowData",
              "outport" : ""
            } ],
            "name" : "MockData",
            "stops" : [ {
              "customizedProperties" : { },
              "name" : "MockData",
              "uuid" : "8a80d63f720cdd2301723b7745b5262e",
              "bundle" : "cn.piflow.bundle.common.MockData",
              "properties" : {
                "schema" : "title:String, author:String, age:Int",
                "count" : "10"
              }
            }, {
              "customizedProperties" : { },
              "name" : "ShowData",
              "uuid" : "8a80d63f720cdd2301723b7745b5262c",
              "bundle" : "cn.piflow.bundle.external.ShowData",
              "properties" : {
                "showNumber" : "5"
              }
            } ],
            "uuid" : "8a80d63f720cdd2301723b7745b4262a"
          }
        } ],
        "name" : "g1",
        "uuid" : "8a80d63f720cdd2301723b7745b22615"
      }
    } ],
    "conditions" : [ {
      "entry" : "f4",
      "after" : "g1"
    }, {
      "entry" : "f3",
      "after" : "g1"
    } ],
    "uuid" : "8a80d63f720cdd2301723b7745b22614"
  }
}
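The `conditions` array states that flows `f4` and `f3` may only start after group `g1` has finished. A minimal sketch of how such after-constraints determine an execution order is shown below; it is a simplified stand-in for illustration, not PiFlow's actual scheduler.

```scala
// Simplified sketch (not PiFlow's scheduler): each (entry, after) condition
// means `entry` may only start once `after` has completed. Repeatedly taking
// the "ready" units yields an order that respects every constraint.
object ConditionOrderSketch {
  def order(units: Seq[String], conditions: Seq[(String, String)]): Seq[String] = {
    // Map each entry to the set of units it must wait for.
    val deps: Map[String, Set[String]] =
      conditions.groupBy(_._1).map { case (entry, cs) => entry -> cs.map(_._2).toSet }
    var done = Vector.empty[String]
    var remaining = units
    while (remaining.nonEmpty) {
      // A unit is ready when everything it must run after is already done.
      val (ready, blocked) = remaining.partition { u =>
        deps.getOrElse(u, Set.empty).forall(done.contains)
      }
      require(ready.nonEmpty, "conditions contain a cycle")
      done = done ++ ready
      remaining = blocked
    }
    done
  }

  def main(args: Array[String]): Unit =
    // For the example above, g1 is scheduled before f4 and f3.
    println(order(Seq("f4", "f3", "g1"), Seq("f4" -> "g1", "f3" -> "g1")))
}
```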


@ -0,0 +1,80 @@
package cn.piflow.api

import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils

object HTTPClientStartMockDataFlow {

  def main(args: Array[String]): Unit = {

    // Flow definition to submit (same content as example/mockDataFlow.json).
    val json =
      """
        |{
        |  "flow": {
        |    "name": "MockData",
        |    "executorMemory": "1g",
        |    "executorNumber": "1",
        |    "uuid": "8a80d63f720cdd2301723b7461d92600",
        |    "paths": [
        |      {
        |        "inport": "",
        |        "from": "MockData",
        |        "to": "ShowData",
        |        "outport": ""
        |      }
        |    ],
        |    "executorCores": "1",
        |    "driverMemory": "1g",
        |    "stops": [
        |      {
        |        "name": "MockData",
        |        "bundle": "cn.piflow.bundle.common.MockData",
        |        "uuid": "8a80d63f720cdd2301723b7461d92604",
        |        "properties": {
        |          "schema": "title:String, author:String, age:Int",
        |          "count": "10"
        |        },
        |        "customizedProperties": {}
        |      },
        |      {
        |        "name": "ShowData",
        |        "bundle": "cn.piflow.bundle.external.ShowData",
        |        "uuid": "8a80d63f720cdd2301723b7461d92602",
        |        "properties": {
        |          "showNumber": "5"
        |        },
        |        "customizedProperties": {}
        |      }
        |    ]
        |  }
        |}
      """.stripMargin

    // Change the IP and port to match your piflow server configuration.
    val url = "http://10.0.85.83:8001/flow/start"

    // Generous timeouts (in seconds); RequestConfig expects milliseconds.
    val timeout = 1800
    val requestConfig = RequestConfig.custom()
      .setConnectTimeout(timeout * 1000)
      .setConnectionRequestTimeout(timeout * 1000)
      .setSocketTimeout(timeout * 1000)
      .build()

    // POST the flow definition and print the server's response.
    val client = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build()
    val post: HttpPost = new HttpPost(url)
    post.addHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(json))

    val response: CloseableHttpResponse = client.execute(post)
    val entity = response.getEntity
    val str = EntityUtils.toString(entity, "UTF-8")
    println("Code is " + str)
  }
}
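The server address in the client above is hard-coded, so testing against a different server means editing the source. As a small sketch, the host and port could instead be read from program arguments; the default address below is a placeholder, not a PiFlow convention.

```scala
// Sketch: build the /flow/start URL from optional host/port arguments instead
// of editing the source. The fallback host and port are placeholders only.
object FlowStartUrlSketch {
  def flowStartUrl(args: Array[String]): String = {
    val host = if (args.length > 0) args(0) else "127.0.0.1"
    val port = if (args.length > 1) args(1) else "8001"
    s"http://$host:$port/flow/start"
  }

  def main(args: Array[String]): Unit =
    println(flowStartUrl(args))
}
```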


@ -1,55 +1,20 @@
1.maven error
apt-get install maven
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/spark-xml_2.11-0.4.2.jar -DgroupId=com.databricks -DartifactId=spark-xml_2.11 -Dversion=0.4.2 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/java_memcached-release_2.6.6.jar -DgroupId=com.memcached -DartifactId=java_memcached-release -Dversion=2.6.6 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/ojdbc6-11.2.0.3.jar -DgroupId=oracle -DartifactId=ojdbc6 -Dversion=11.2.0.3 -Dpackaging=jar
mvn install:install-file -Dfile=/opt/project/piflow/piflow-bundle/lib/edtftpj.jar -DgroupId=ftpClient -DartifactId=edtftp -Dversion=1.0.0 -Dpackaging=jar
2.Packaging by Intellij
1)Edit Configurations --> add Maven
Command line: clean package -Dmaven.test.skip=true -X
2)Build piflow-server-0.9.jar
clean package -Dmaven.test.skip=true -U
3.set SPARK_HOME in Configurations
Edit Configurations --> Application(HttpService) --> Configurations --> Environment Variable
4. yarn log aggregation
Edit yarn-site.xml, add the following content
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.debug-enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
<value>3600</value>
</property>
5.kafka related jars are needed to put on the spark cluster
spark-streaming-kafka-0-10_2.11-2.1.0.jar
kafka_2.11-2.1.1.jar
kafka-clients-2.1.1.jar
start kafka server: ./bin/kafka-server-start.sh -daemon config/server.properties
stop kafka server: ./bin/kafka-server-stop.sh
start kafka producer: ./bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic streaming
start kafka consumer: ./bin/kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic streaming
list topics:
./bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
create topics:
./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic newTopic
6.flume related jars are needed to put on the spark cluster
spark-streaming-flume_2.11-2.1.0.jar
start flume agent: bin/flume-ng agent -n streamingAgent -c conf -f conf/streaming.conf -Dflume.root.logger=INFO,console
7.socket text stream
nc -lk 9999