From 65d73aa9cb3264fd67f19ba9002f54ab0aea2a69 Mon Sep 17 00:00:00 2001
From: xiaoxiao
Date: Fri, 10 Aug 2018 10:33:46 +0800
Subject: [PATCH] 8.10

---
 .../piflow/bundle/http/LoadZipFromUrl.scala   | 78 +++++++++++++++++++
 1 file changed, 78 insertions(+)
 create mode 100644 piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala

diff --git a/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala
new file mode 100644
index 0000000..17d44b4
--- /dev/null
+++ b/piflow-bundle/src/main/scala/cn/piflow/bundle/http/LoadZipFromUrl.scala
@@ -0,0 +1,78 @@
+package cn.piflow.bundle.http
+
+import java.io._
+import java.net.{HttpURLConnection, URL}
+
+import cn.piflow.conf.{ConfigurableStop, HttpGroup, StopGroup}
+import cn.piflow.conf.bean.PropertyDescriptor
+import cn.piflow.conf.util.MapUtil
+import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
+import org.apache.spark.sql.SparkSession
+
+class LoadZipFromUrl extends ConfigurableStop {
+  val inportCount: Int = 0
+  val outportCount: Int = 1
+  var url_str: String = _
+
+  def perform(in: JobInputStream, out: JobOutputStream, pec: JobContext): Unit = {
+    // Open an HTTP connection to the configured URL.
+    val url = new URL(url_str)
+    val uc: HttpURLConnection = url.openConnection().asInstanceOf[HttpURLConnection]
+    uc.setDoInput(true)
+    uc.connect()
+    val inputStream: InputStream = uc.getInputStream()
+    val size = uc.getContentLength
+    println(size)
+
+    // Derive the file name from the last path segment of the URL.
+    val filename = url_str.split("/").last
+
+    // Buffer the remote content into memory, 1 MB at a time. An assignment
+    // returns Unit in Scala, so test the read count outside the condition.
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val buffer = new Array[Byte](1024 * 1024)
+    var byteRead = inputStream.read(buffer)
+    while (byteRead != -1) {
+      byteArrayOutputStream.write(buffer, 0, byteRead)
+      byteRead = inputStream.read(buffer)
+    }
+    inputStream.close()
+
+    // Wrap the downloaded bytes and the file name in a single-row DataFrame.
+    val spark = pec.get[SparkSession]()
+    import spark.sqlContext.implicits._
+    val df_create_start_time = System.currentTimeMillis()
+    val byteArray = byteArrayOutputStream.toByteArray
+    val df = Seq((byteArray, filename)).toDF()
+    val df_create_end_time = System.currentTimeMillis()
+    println("df_create_time=" + (df_create_end_time - df_create_start_time))
+
+    val df_write_start_time = System.currentTimeMillis()
+    out.write(df)
+    val df_write_end_time = System.currentTimeMillis()
+    println("df_write_time=" + (df_write_end_time - df_write_start_time))
+  }
+
+  def initialize(ctx: ProcessContext): Unit = {
+
+  }
+
+  def setProperties(map: Map[String, Any]): Unit = {
+    url_str = MapUtil.get(map, key = "url_str").asInstanceOf[String]
+  }
+
+  override def getPropertyDescriptor(): List[PropertyDescriptor] = {
+    // Start from an empty list; prepending to null would throw a NullPointerException.
+    var descriptor: List[PropertyDescriptor] = List()
+    val url_str = new PropertyDescriptor().name("url_str").displayName("URL").defaultValue("").required(true)
+    descriptor = url_str :: descriptor
+    descriptor
+  }
+
+  override def getIcon(): Array[Byte] = ???
+
+  override def getGroup(): StopGroup = {
+    HttpGroup
+  }
+
+}
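
For reference, the download-and-buffer loop at the heart of perform() can be exercised on its own. Below is a minimal, self-contained sketch of the same technique (JDK only); the object name, helper name, and sample URL are placeholders for illustration, not part of the patch:

import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{HttpURLConnection, URL}

object DownloadSketch {
  // Fetch a URL into memory and return (file name, raw bytes),
  // mirroring the read loop used by LoadZipFromUrl.perform().
  def fetchBytes(urlStr: String): (String, Array[Byte]) = {
    val uc = new URL(urlStr).openConnection().asInstanceOf[HttpURLConnection]
    uc.setDoInput(true)
    uc.connect()
    val in: InputStream = uc.getInputStream()
    val out = new ByteArrayOutputStream()
    val buffer = new Array[Byte](1024 * 1024) // 1 MB chunks, as in the stop
    var n = in.read(buffer)
    while (n != -1) {
      out.write(buffer, 0, n)
      n = in.read(buffer)
    }
    in.close()
    uc.disconnect()
    (urlStr.split("/").last, out.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val (name, bytes) = fetchBytes("https://example.com/sample.zip") // placeholder URL
    println(s"$name: ${bytes.length} bytes")
  }
}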