client with pos
This commit is contained in:
parent 2d82d7eb9f
commit 93e4c6ec50

client.py | 156 changed lines
@@ -2,7 +2,8 @@ from tornado.httpclient import HTTPRequest, AsyncHTTPClient
from tornado.simple_httpclient import SimpleAsyncHTTPClient
import tornado.ioloop
import tornado.web
import os,sys,re,threading,time
import os,sys,re,time
import threading
from tornado import gen
from functools import partial
import urllib.parse

@@ -14,6 +15,7 @@ import tornado.iostream
from tornado.escape import utf8
from tornado.log import gen_log
from ctypes import *
import struct
import os
#import time,datat

@@ -45,16 +47,14 @@ class Client:
        bcnt = max(blen / 32, 1)
        return "".join([str(uuid.uuid1()).replace("-", "") for _ in range(bcnt)])
        print("_gen_boundary file_size",file_size)
    def put_stream(self, url, pos, size, filename, on_response, chunk_size=8192):
        """Uploads file to provided url.

    def put_stream(self, url, pos, size, filename, on_response, chunk_size=1024):
        '''Uploads file to provided url.
        :param url: URL to PUT the file data to
        :param filename: Name of the file (Content-Disposition header)
        :param file_size: Size of the file in bytes. Used to produce a Content-Size header from file_size.
        :param on_response: See AsyncHTTPClient.fetch

        :return: Future content value under the provided url as binary string.
        """
        '''
        uploadpos = pos
        #file_size = os.path.getsize(filename) # Be aware: this could also block! In production, you need to do this in a separated thread too!
        file_size = size # Be aware: this could also block! In production, you need to do this in a separated thread too!

@@ -118,6 +118,7 @@ class Client:
            write(post_tail)

        request = tornado.httpclient.HTTPRequest(url=url, request_timeout=1200, method='POST', headers=headers,body_producer=body_producer)
        print("before1:"+threading.current_thread().getName()+url)

        return tornado.httpclient.AsyncHTTPClient().fetch(request, callback=on_response)

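For reference, the request above streams its body through Tornado's body_producer hook instead of buffering the whole file in memory. A minimal generic sketch of that pattern follows; put_file and produce_body are illustrative names, not functions from this file, and the chunked transfer encoding used when no Content-Length is given is Tornado's default behaviour, not something this commit configures.

# Sketch of Tornado's body_producer pattern (illustrative, not this file's put_stream).
from tornado import gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

@gen.coroutine
def produce_body(write, filename, chunk_size=1024):
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield write(chunk)  # wait for the chunk to be flushed before reading more

@gen.coroutine
def put_file(url, filename):
    request = HTTPRequest(url=url, method='PUT',
                          body_producer=lambda write: produce_body(write, filename))
    response = yield AsyncHTTPClient().fetch(request)
    raise gen.Return(response)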
@@ -134,8 +135,9 @@ def geturlupload(action,host,targetpath,pos,size,totalsize):
    print(url)
    return url

def chunky(path,pos,totalsize,chunk):
    # print("self._max_body_size",self._max_body_size)
'''def chunky(path,pos,totalsize,chunk):
    #print("self._max_body_size",self._max_body_size)
    print("chunk length",len(chunk))
    global total_downloaded
    global readchunky
    if readchunky == False and os.path.exists(path):
@@ -144,15 +146,18 @@ def chunky(path,pos,totalsize,chunk):
    if not os.path.exists(path):
        f = open(path,'w')
        f.close()
    f = open(path,'ab+')
    f.seek(int(pos))
    f.write(chunk)
    pos = pos+len(chunk)
    f.flush()
    f = open(path,'rb+')
    chunklength = len(chunk)-8
    pos,chunknew = struct.unpack('l%ds'%chunklength,chunk)
    print("decode pos is",pos)
    f.seek(pos)
    f.write(chunknew)
    print("after write position,chunksize",f.tell(),len(chunknew))
    pos = pos+len(chunknew)
    #f.flush()
    if pos==totalsize:
        f.close()
    total_downloaded += len(chunk)
    # print("chunk size",len(chunk))
    total_downloaded += len(chunknew)'''

# the OS blocks on file reads + writes -- beware how big the chunks is as it could effect things

def sizebwchunky(chunk):
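The chunk format unpacked in chunky above (and again in writer below) is a native 8-byte long carrying the write offset, followed by the raw payload. A small round-trip sketch of that framing, assuming a 64-bit Linux build where struct's native 'l' occupies 8 bytes; pack_chunk stands in for whatever the server side does and is not part of this file.

# Sketch of the assumed chunk framing: an 8-byte native long offset, then the payload.
import struct

def pack_chunk(pos, data):
    # 'l' is 8 bytes on LP64 (64-bit Linux); '%ds' appends the raw payload with no padding
    return struct.pack('l%ds' % len(data), pos, data)

def unpack_chunk(chunk):
    payload_len = len(chunk) - 8
    pos, data = struct.unpack('l%ds' % payload_len, chunk)
    return pos, data

framed = pack_chunk(4096, b'hello')
print(unpack_chunk(framed))   # (4096, b'hello')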
@@ -173,83 +178,122 @@ def sizebw(host,filepath):

@gen.coroutine
def writer(host,filepath,targetdir,uid,gid,pos,size):
    #print("writer function")
    #tornado.ioloop.IOLoop.instance().start()
    file_name = targetdir+os.path.basename(filepath)
    '''if os.path.exists(targetdir):
        pass
    else:
        os.makedirs(targetdir)
    f = open(file_name,'w')
    f.close()'''
    request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,size), streaming_callback=partial(chunky, file_name, pos, size), decompress_response=True, request_timeout=300)
    AsyncHTTPClient.configure('tornado.simple_httpclient.SimpleAsyncHTTPClient', max_body_size=1024*1024*1024)
    http_client = AsyncHTTPClient(force_instance=True)
    #AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
    #http_client = AsyncHTTPClient()
    response = yield http_client.fetch(request)
    # tornado.ioloop.IOLoop.instance().stop()
    print("total bytes downloaded was", total_downloaded)
    if total_downloaded==FILESIZE:
    file_name = targetdir
    path = file_name
    chunk_size = 80*1024*1024
    no = int(size) // chunk_size
    i = 0
    global total_downloaded
    global readchunky
    #if readchunky == False and os.path.exists(path):
    # os.remove(path)
    # readchunky = True
    #if not os.path.exists(path):
    # f = open(path,'w')
    # f.close()
    while i<no:
        request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,str(chunk_size)),request_timeout=300)
        pos = str(int(pos)+chunk_size)
        i = i+1
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(request)
        response = response.body
        print("chunk length",len(response))
        f = open(path,'rb+')
        chunklength = len(response)-8
        posnew,chunknew = struct.unpack('l%ds'%chunklength,response)
        #print("posnew is",posnew)
        f.seek(posnew)
        f.write(chunknew)
        total_downloaded=total_downloaded+len(chunknew)
        #print("after write position,chunksize",f.tell(),len(chunknew))
        f.close()
        print("total bytes downloaded was", total_downloaded)
    if (int(size) % chunk_size) != 0:
        last = int(size) % chunk_size
        request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,str(last)),request_timeout=300)
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(request)
        response = response.body
        #print("chunk length",len(response))
        f = open(path,'rb+')
        chunklength = len(response)-8
        posnew,chunknew = struct.unpack('l%ds'%chunklength,response)
        #print("posnew is",posnew)
        f.seek(posnew)
        f.write(chunknew)
        total_downloaded=total_downloaded+len(chunknew)
        #print("after write position,chunksize",f.tell(),len(chunknew))
        f.close()
        print("total bytes downloaded was", total_downloaded)
    if total_downloaded==realsize:
        tornado.ioloop.IOLoop.instance().stop()
    ''' lib=cdll.LoadLibrary('./libpycall.so')
=======
    lib=cdll.LoadLibrary('./libpycall.so')
>>>>>>> 2d82d7eb9ff824fb9c04107ae3974609b756dc08
    print("start")
    func=lib.update_bitmap
    func.argtypes=(c_int,c_int,c_char_p)
    func=lib.update_bitmap(int(pos),int(size),filepath.encode("utf-8"))
    print("finish")
<<<<<<< HEAD
    '''

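The commented-out (and still conflict-marked) block above calls into a native helper through ctypes. As a generic illustration of that pattern, with ./libpycall.so and update_bitmap taken from the diff, while the argument values and the int return type are assumptions:

# Generic ctypes pattern illustrated by the commented-out block above (placeholder arguments).
from ctypes import cdll, c_int, c_char_p

lib = cdll.LoadLibrary('./libpycall.so')
update_bitmap = lib.update_bitmap
update_bitmap.argtypes = (c_int, c_int, c_char_p)   # declare the C signature before calling
update_bitmap.restype = c_int                       # assumed return type
ret = update_bitmap(0, 4096, b'/tmp/example.file')  # hypothetical pos, size and path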
@gen.coroutine
def upload(host,filepath,targetpath,pos,size):
    def on_response(request):
        print("on_response:"+threading.current_thread().getName())

        print("=============== GOT RESPONSE ======")
        print(request.body.decode('ascii'))
        #print(request.body)
        print("===================================")
        tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.

    #tornado.ioloop.IOLoop.instance().start()
    client = Client()
    print("before"+threading.current_thread().getName())
    #tornado.ioloop.IOLoop.instance().start()
    yield client.put_stream(geturlupload("upload",host,targetpath,pos,size,os.path.getsize(filepath)), pos, size, filepath, on_response)
    # print(geturlupload("upload",host,targetpath,pos,size))
    #tornado.ioloop.IOLoop.instance().start()
    # print("ioloop has already started")
    print("after:"+threading.current_thread().getName())
    #tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.

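upload relies on on_response stopping IOLoop.current() once the transfer finishes. For comparison, a single coroutine like this can also be driven to completion with IOLoop.run_sync, a standard Tornado pattern rather than anything this commit uses; the host, paths and byte range below are placeholders.

# Generic alternative for running one upload coroutine to completion (placeholder arguments).
from functools import partial
import tornado.ioloop

tornado.ioloop.IOLoop.current().run_sync(
    partial(upload, "127.0.0.1:28003", "/tmp/local.file", "/tmp/remote.file", 0, 1024))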
def readentrance(host,filepath,targetdir,uid,gid,pos,size):
    print(host,filepath,targetdir,uid,gid,pos,size)
    sizebw(host,filepath)
    tornado.ioloop.IOLoop.instance().start()
    filesize = FILESIZE
    streamno = 2
    streamno = 3
    start_time = time.time()
    global realsize
    if(int(size)>=filesize):
        realsize = filesize
        streamsize = (filesize-int(pos)) // (streamno-1)
    else:
        realsize = int(size)
        streamsize = (int(size)) // (streamno-1)

    i = 0
    threads = []
    while i < (streamno-1):
        threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,streamsize*i,streamsize)))
        # print(streamsize*i,streamsize)
        threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,int(pos)+streamsize*i,streamsize)))
        i=i+1
    if (streamsize*i) < filesize:
        threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,streamsize*i,filesize-streamsize*i)))
        # print(streamsize*i,filesize-streamsize*i)
    if (streamsize*i) < realsize:
        threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,int(pos)+streamsize*i,realsize-streamsize*i)))
    for t in threads:
        t.setDaemon(True)
        print("thread name is :",t.getName())
        t.start()
        # tornado.ioloop.IOLoop.instance().start()
        # t.join()
    tornado.ioloop.IOLoop.instance().start()
    i=0
    for t in threads:
        t.join()
    #tornado.ioloop.IOLoop.instance().stop()
    end_time = time.time()
    print("Total time :{}".format(end_time-start_time))

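readentrance divides the requested byte range into streamno-1 equal slices plus one remainder slice, and hands each slice to a writer thread; uploadentrance below uses the same split for upload. The arithmetic is sketched here; split_range is an illustrative helper, not a function in this file.

# Illustrative helper mirroring the slicing arithmetic in readentrance/uploadentrance.
def split_range(pos, size, streamno):
    streamsize = size // (streamno - 1)
    slices = [(pos + streamsize * i, streamsize) for i in range(streamno - 1)]
    done = streamsize * (streamno - 1)
    if done < size:                      # leftover bytes go to one final slice
        slices.append((pos + done, size - done))
    return slices

print(split_range(0, 100, 3))   # [(0, 50), (50, 50)]
print(split_range(0, 101, 3))   # [(0, 50), (50, 50), (100, 1)]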
def uploadentrance(host,filepath,targetpath):
    streamno = 2
    streamno = 3
    start_time = time.time()
    filesize = os.path.getsize(filepath)
    streamsize = filesize // (streamno-1)
    i = 0

@@ -259,17 +303,15 @@ def uploadentrance(host,filepath,targetpath):
        i=i+1
    if (streamsize*i) < filesize:
        threads.append(threading.Thread(target=upload,args=(host,filepath,targetpath,streamsize*i,filesize-streamsize*i)))

    for t in threads:
        t.setDaemon(True)
        print("thread name is :",t.getName())
        t.start()
        # tornado.ioloop.IOLoop.instance().start()
        #t.join()
    #print("upload all!")
    tornado.ioloop.IOLoop.instance().start()
    #for t in threads:
    t.join()
    end_time = time.time()
    print("Total time :{}".format(end_time-start_time))
    #tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.


'''if __name__=="__main__":
    uploadentrance("202.122.37.90:28003","/root/leaf/transfer/night.mkv","/home/wangcong/leaf/upload/night.mkv")
    #uploadentrance("202.122.37.90:28006","/root/leaf/test.cpp","/root/leaf/pytoc/upload/test.cpp")
    tornado.ioloop.IOLoop.instance().start()'''
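A hypothetical invocation mirroring the commented-out __main__ block above; the host and source path are taken from that block, while the target directory, uid/gid and byte range are placeholder values.

# Hypothetical entry point (placeholder values, not a real deployment).
if __name__ == "__main__":
    # download bytes [0, 1048576) of the remote file into /tmp/ using multiple streams
    readentrance("202.122.37.90:28003", "/root/leaf/transfer/night.mkv", "/tmp/", "0", "0", "0", "1048576")
    # or push a local file to a remote target path
    #uploadentrance("202.122.37.90:28003", "/root/leaf/transfer/night.mkv", "/home/wangcong/leaf/upload/night.mkv")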