upload threading fulfilled
This commit is contained in:
parent
5d69645827
commit
2b09c47302
75
client.py
75
client.py
|
@ -2,7 +2,7 @@ from tornado.httpclient import HTTPRequest, AsyncHTTPClient
|
|||
from tornado.simple_httpclient import SimpleAsyncHTTPClient
|
||||
import tornado.ioloop
|
||||
import tornado.web
|
||||
import os,sys
|
||||
import os,sys,re,threading,time
|
||||
from tornado import gen
|
||||
from functools import partial
|
||||
import urllib.parse
|
||||
|
@ -41,8 +41,8 @@ class Client:
|
|||
blen = math.ceil(math.log(file_size, 2))
|
||||
bcnt = max(blen / 32, 1)
|
||||
return "".join([str(uuid.uuid1()).replace("-", "") for _ in range(bcnt)])
|
||||
|
||||
def put_stream(self, url, filename, on_response, chunk_size=8192):
|
||||
print("_gen_boundary file_size",file_size)
|
||||
def put_stream(self, url, pos, size, filename, on_response, chunk_size=8192):
|
||||
"""Uploads file to provided url.
|
||||
|
||||
:param url: URL to PUT the file data to
|
||||
|
@ -52,10 +52,11 @@ class Client:
|
|||
|
||||
:return: Future content value under the provided url as binary string.
|
||||
"""
|
||||
|
||||
print("filename is :",filename)
|
||||
file_size = os.path.getsize(filename) # Be aware: this could also block! In production, you need to do this in a separated thread too!
|
||||
uploadpos = pos
|
||||
#file_size = os.path.getsize(filename) # Be aware: this could also block! In production, you need to do this in a separated thread too!
|
||||
file_size = size # Be aware: this could also block! In production, you need to do this in a separated thread too!
|
||||
|
||||
print("filesize in put_stream is :",file_size)
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
if ext:
|
||||
content_type = mimetypes.types_map.get(ext, "application/octet-stream")
|
||||
|
@ -78,6 +79,7 @@ class Client:
|
|||
CRLF, '--', boundary, '--', CRLF
|
||||
]))
|
||||
content_length = len(post_head) + int(file_size) + len(post_tail)
|
||||
print("content_length is:",content_length)
|
||||
headers = {
|
||||
'Content-Type': 'multipart/form-data; boundary=' + boundary,
|
||||
'Content-Transfer-Encoding': 'binary',
|
||||
|
@ -90,11 +92,19 @@ class Client:
|
|||
sys.stdout.write(post_head.decode('ascii'))
|
||||
write(post_head)
|
||||
remaining = file_size
|
||||
print("remaining in body_produceris :",remaining)
|
||||
with open(filename, "rb") as fileobj:
|
||||
fileobj.seek(int(uploadpos))
|
||||
while remaining > 0:
|
||||
data = yield threadpool.submit(fileobj.read, chunk_size)
|
||||
#print("uploadpos in while is :",int(uploadpos))
|
||||
# data = yield threadpool.submit(fileobj.read(int(remaining)), chunk_size)
|
||||
data = fileobj.read(int(remaining))
|
||||
# print(str(data,encoding = "utf-8"))
|
||||
if data:
|
||||
remaining -= len(data)
|
||||
print("len(data) in if is :",len(data),"uploadpos in while is :",int(uploadpos),"remaining in if is :",len(data))
|
||||
#print("data in if is :",data)
|
||||
#print("remaining in if is :",len(data))
|
||||
if DEBUG:
|
||||
sys.stdout.write(data.decode('utf-8'))
|
||||
yield write(data)
|
||||
|
@ -103,9 +113,9 @@ class Client:
|
|||
if DEBUG:
|
||||
sys.stdout.write(post_tail.decode('ascii'))
|
||||
write(post_tail)
|
||||
|
||||
request = tornado.httpclient.HTTPRequest(url=url, request_timeout=1200, method='POST', headers=headers,body_producer=body_producer)
|
||||
|
||||
request = tornado.httpclient.HTTPRequest(url=url, request_timeout=1200, method='POST', headers=headers,
|
||||
body_producer=body_producer)
|
||||
return tornado.httpclient.AsyncHTTPClient().fetch(request, callback=on_response)
|
||||
|
||||
|
||||
|
@ -115,9 +125,9 @@ def geturlread(action,host,filepath,uid,gid,pos,size):
|
|||
print(url)
|
||||
return url
|
||||
|
||||
def geturlupload(action,host,targetpath):
|
||||
def geturlupload(action,host,targetpath,pos,size,totalsize):
|
||||
if action=="upload":
|
||||
url = "http://"+host+"/upload?targetpath="+targetpath
|
||||
url = "http://"+host+"/upload?targetpath="+targetpath+"&pos="+str(pos)+"&size="+str(size)+"&totalsize="+str(totalsize)
|
||||
print(url)
|
||||
return url
|
||||
|
||||
|
@ -151,16 +161,21 @@ def writer(host,filepath,targetdir,uid,gid,pos,size):
|
|||
print("total bytes downloaded was", total_downloaded)
|
||||
|
||||
@gen.coroutine
|
||||
def upload(host,filepath,targetpath):
|
||||
def upload(host,filepath,targetpath,pos,size):
|
||||
def on_response(request):
|
||||
print("=============== GOT RESPONSE ======")
|
||||
print(request.body.decode('ascii'))
|
||||
#print(request.body)
|
||||
print("===================================")
|
||||
tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is omplete.
|
||||
tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.
|
||||
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
client = Client()
|
||||
yield client.put_stream(geturlupload("upload",host,targetpath), filepath, on_response)
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
yield client.put_stream(geturlupload("upload",host,targetpath,pos,size,os.path.getsize(filepath)), pos, size, filepath, on_response)
|
||||
# print(geturlupload("upload",host,targetpath,pos,size))
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
# print("ioloop has already started")
|
||||
|
||||
|
||||
|
||||
|
@ -170,7 +185,33 @@ def readentrance(host,filepath,targetdir,uid,gid,pos,size):
|
|||
tornado.ioloop.IOLoop.instance().start()
|
||||
|
||||
def uploadentrance(host,filepath,targetpath):
|
||||
upload(host,filepath,targetpath)
|
||||
streamno = 5
|
||||
filesize = os.path.getsize(filepath)
|
||||
streamsize = filesize // (streamno-1)
|
||||
i = 0
|
||||
threads = []
|
||||
print("filesize is:",filesize)
|
||||
while i < (streamno-1):
|
||||
threads.append(threading.Thread(target=upload,args=(host,filepath,targetpath,streamsize*i,streamsize)))
|
||||
#threads.append(threading.Thread(target=work))
|
||||
print(streamsize*i,streamsize)
|
||||
i=i+1
|
||||
if (streamsize*i) < filesize:
|
||||
threads.append(threading.Thread(target=upload,args=(host,filepath,targetpath,streamsize*i,filesize-streamsize*i)))
|
||||
#threads.append(threading.Thread(target=work))
|
||||
print(streamsize*i,filesize-streamsize*i)
|
||||
for t in threads:
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
t.join()
|
||||
print("upload all!")
|
||||
|
||||
def work():
|
||||
print ("worker")
|
||||
time.sleep(1)
|
||||
return
|
||||
|
||||
if __name__=="__main__":
|
||||
uploadentrance("202.122.37.90:28003","/root/leaf/transfer/night.mkv","/home/wangcong/leaf/upload/night.mkv")
|
||||
#uploadentrance("202.122.37.90:28006","/root/leaf/test.cpp","/root/leaf/pytoc/upload/test.cpp")
|
||||
tornado.ioloop.IOLoop.instance().start()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue