lastest version
This commit is contained in:
parent
9129e94d5b
commit
594e622a5f
|
@ -1,5 +1,5 @@
|
|||
#include "client.h"
|
||||
void transread(const char *host,const char *filepath,const char *targetdir,const char *uid,const char *gid,const char *position,const char *size)
|
||||
void transread(const char *host,const char *filepath,const char *targetdir,const char *uid,const char *gid,int position,int size)
|
||||
{
|
||||
|
||||
Py_Initialize();
|
||||
|
@ -20,8 +20,8 @@ void transread(const char *host,const char *filepath,const char *targetdir,const
|
|||
PyTuple_SetItem(pArgs,2,Py_BuildValue("s",targetdir));
|
||||
PyTuple_SetItem(pArgs,3,Py_BuildValue("s",uid));
|
||||
PyTuple_SetItem(pArgs,4,Py_BuildValue("s",gid));
|
||||
PyTuple_SetItem(pArgs,5,Py_BuildValue("s",position));
|
||||
PyTuple_SetItem(pArgs,6,Py_BuildValue("s",size));
|
||||
PyTuple_SetItem(pArgs,5,Py_BuildValue("i",position));
|
||||
PyTuple_SetItem(pArgs,6,Py_BuildValue("i",size));
|
||||
pModule = PyImport_ImportModule("client");
|
||||
if(pModule == NULL)
|
||||
{
|
||||
|
@ -33,6 +33,7 @@ void transread(const char *host,const char *filepath,const char *targetdir,const
|
|||
printf("can't load function\n");
|
||||
}
|
||||
result = PyEval_CallObject(pFunc,pArgs);
|
||||
cout<<result<<endl;
|
||||
Py_DECREF(pArgs);
|
||||
if(result == NULL)
|
||||
cout<<"null"<<endl;
|
||||
|
|
2
client.h
2
client.h
|
@ -3,6 +3,6 @@
|
|||
#include <iostream>
|
||||
#include <Python.h>
|
||||
using namespace std;
|
||||
void transread(const char *host,const char *filepath,const char *targetdir,const char *uid,const char *gid,const char *position,const char *size);
|
||||
void transread(const char *host,const char *filepath,const char *targetdir,const char *uid,const char *gid,int position,int size);
|
||||
void transupload(const char *host,const char *filepath,const char *targetdir);
|
||||
#endif
|
||||
|
|
147
client.py
147
client.py
|
@ -13,6 +13,9 @@ from concurrent.futures import ThreadPoolExecutor
|
|||
import tornado.iostream
|
||||
from tornado.escape import utf8
|
||||
from tornado.log import gen_log
|
||||
from ctypes import *
|
||||
import struct
|
||||
import os
|
||||
#import time,datat
|
||||
|
||||
readchunky = False
|
||||
|
@ -43,16 +46,14 @@ class Client:
|
|||
bcnt = max(blen / 32, 1)
|
||||
return "".join([str(uuid.uuid1()).replace("-", "") for _ in range(bcnt)])
|
||||
print("_gen_boundary file_size",file_size)
|
||||
def put_stream(self, url, pos, size, filename, on_response, chunk_size=8192):
|
||||
"""Uploads file to provided url.
|
||||
|
||||
def put_stream(self, url, pos, size, filename, on_response, chunk_size=1024):
|
||||
'''Uploads file to provided url.
|
||||
:param url: URL to PUT the file data to
|
||||
:param filename: Name of the file (Content-Disposition header)
|
||||
:param file_size: Size of the file in bytes. Used to produce a Content-Size header from file_size.
|
||||
:param on_response: See AsyncHTTPClient.fetch
|
||||
|
||||
:return: Future content value under the provided url as binary string.
|
||||
"""
|
||||
'''
|
||||
uploadpos = pos
|
||||
#file_size = os.path.getsize(filename) # Be aware: this could also block! In production, you need to do this in a separated thread too!
|
||||
file_size = size # Be aware: this could also block! In production, you need to do this in a separated thread too!
|
||||
|
@ -116,6 +117,7 @@ class Client:
|
|||
write(post_tail)
|
||||
|
||||
request = tornado.httpclient.HTTPRequest(url=url, request_timeout=1200, method='POST', headers=headers,body_producer=body_producer)
|
||||
print("before1:"+threading.current_thread().getName()+url)
|
||||
|
||||
return tornado.httpclient.AsyncHTTPClient().fetch(request, callback=on_response)
|
||||
|
||||
|
@ -132,8 +134,9 @@ def geturlupload(action,host,targetpath,pos,size,totalsize):
|
|||
print(url)
|
||||
return url
|
||||
|
||||
def chunky(path,pos,totalsize,chunk):
|
||||
# print("self._max_body_size",self._max_body_size)
|
||||
'''def chunky(path,pos,totalsize,chunk):
|
||||
#print("self._max_body_size",self._max_body_size)
|
||||
print("chunk length",len(chunk))
|
||||
global total_downloaded
|
||||
global readchunky
|
||||
if readchunky == False and os.path.exists(path):
|
||||
|
@ -142,15 +145,18 @@ def chunky(path,pos,totalsize,chunk):
|
|||
if not os.path.exists(path):
|
||||
f = open(path,'w')
|
||||
f.close()
|
||||
f = open(path,'ab+')
|
||||
f.seek(int(pos))
|
||||
f.write(chunk)
|
||||
pos = pos+len(chunk)
|
||||
f.flush()
|
||||
f = open(path,'rb+')
|
||||
chunklength = len(chunk)-8
|
||||
pos,chunknew = struct.unpack('l%ds'%chunklength,chunk)
|
||||
print("decode pos is",pos)
|
||||
f.seek(pos)
|
||||
f.write(chunknew)
|
||||
print("after write position,chunksize",f.tell(),len(chunknew))
|
||||
pos = pos+len(chunknew)
|
||||
#f.flush()
|
||||
if pos==totalsize:
|
||||
f.close()
|
||||
total_downloaded += len(chunk)
|
||||
# print("chunk size",len(chunk))
|
||||
total_downloaded += len(chunknew)'''
|
||||
# the OS blocks on file reads + writes -- beware how big the chunks is as it could effect things
|
||||
|
||||
def sizebwchunky(chunk):
|
||||
|
@ -171,78 +177,115 @@ def sizebw(host,filepath):
|
|||
|
||||
@gen.coroutine
|
||||
def writer(host,filepath,targetdir,uid,gid,pos,size):
|
||||
#print("writer function")
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
file_name = targetdir+os.path.basename(filepath)
|
||||
'''if os.path.exists(targetdir):
|
||||
pass
|
||||
else:
|
||||
os.makedirs(targetdir)
|
||||
f = open(file_name,'w')
|
||||
f.close()'''
|
||||
request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,size), streaming_callback=partial(chunky, file_name, pos, size), decompress_response=True, request_timeout=300)
|
||||
AsyncHTTPClient.configure('tornado.simple_httpclient.SimpleAsyncHTTPClient', max_body_size=1024*1024*1024)
|
||||
http_client = AsyncHTTPClient(force_instance=True)
|
||||
#AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
|
||||
#http_client = AsyncHTTPClient()
|
||||
response = yield http_client.fetch(request)
|
||||
# tornado.ioloop.IOLoop.instance().stop()
|
||||
print("total bytes downloaded was", total_downloaded)
|
||||
file_name = targetdir
|
||||
path = file_name
|
||||
chunk_size = 80*1024*1024
|
||||
no = int(size) // chunk_size
|
||||
i = 0
|
||||
global total_downloaded
|
||||
global readchunky
|
||||
#if readchunky == False and os.path.exists(path):
|
||||
# os.remove(path)
|
||||
# readchunky = True
|
||||
#if not os.path.exists(path):
|
||||
# f = open(path,'w')
|
||||
# f.close()
|
||||
while i<no:
|
||||
request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,str(chunk_size)),request_timeout=300)
|
||||
pos = str(int(pos)+chunk_size)
|
||||
i = i+1
|
||||
http_client = AsyncHTTPClient()
|
||||
response = yield http_client.fetch(request)
|
||||
response = response.body
|
||||
#print("chunk length",len(response))
|
||||
f = open(path,'rb+')
|
||||
chunklength = len(response)-8
|
||||
posnew,chunknew = struct.unpack('l%ds'%chunklength,response)
|
||||
#print("posnew is",posnew)
|
||||
f.seek(posnew)
|
||||
f.write(chunknew)
|
||||
total_downloaded=total_downloaded+len(chunknew)
|
||||
#print("after write position,chunksize",f.tell(),len(chunknew))
|
||||
f.close()
|
||||
print("total bytes downloaded was", total_downloaded)
|
||||
if (int(size) % chunk_size) != 0:
|
||||
last = int(size) % chunk_size
|
||||
request = HTTPRequest(geturlread("read",host,filepath,uid,gid,pos,str(last)),request_timeout=300)
|
||||
http_client = AsyncHTTPClient()
|
||||
response = yield http_client.fetch(request)
|
||||
response = response.body
|
||||
#print("chunk length",len(response))
|
||||
f = open(path,'rb+')
|
||||
chunklength = len(response)-8
|
||||
posnew,chunknew = struct.unpack('l%ds'%chunklength,response)
|
||||
#print("posnew is",posnew)
|
||||
f.seek(posnew)
|
||||
f.write(chunknew)
|
||||
total_downloaded=total_downloaded+len(chunknew)
|
||||
#print("after write position,chunksize",f.tell(),len(chunknew))
|
||||
f.close()
|
||||
print("total bytes downloaded was", total_downloaded)
|
||||
if total_downloaded==FILESIZE:
|
||||
tornado.ioloop.IOLoop.instance().stop()
|
||||
|
||||
''' lib=cdll.LoadLibrary('./libpycall.so')
|
||||
print("start")
|
||||
func=lib.update_bitmap
|
||||
func.argtypes=(c_int,c_int,c_char_p)
|
||||
func=lib.update_bitmap(int(pos),int(size),filepath.encode("utf-8"))
|
||||
print("finish")
|
||||
'''
|
||||
|
||||
@gen.coroutine
|
||||
def upload(host,filepath,targetpath,pos,size):
|
||||
def on_response(request):
|
||||
print("on_response:"+threading.current_thread().getName())
|
||||
|
||||
print("=============== GOT RESPONSE ======")
|
||||
print(request.body.decode('ascii'))
|
||||
#print(request.body)
|
||||
print("===================================")
|
||||
tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.
|
||||
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
client = Client()
|
||||
print("before"+threading.current_thread().getName())
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
yield client.put_stream(geturlupload("upload",host,targetpath,pos,size,os.path.getsize(filepath)), pos, size, filepath, on_response)
|
||||
# print(geturlupload("upload",host,targetpath,pos,size))
|
||||
#tornado.ioloop.IOLoop.instance().start()
|
||||
# print("ioloop has already started")
|
||||
print("after:"+threading.current_thread().getName())
|
||||
#tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.
|
||||
|
||||
|
||||
|
||||
def readentrance(host,filepath,targetdir,uid,gid,pos,size):
|
||||
print(host,filepath,targetdir,uid,gid,pos,size)
|
||||
sizebw(host,filepath)
|
||||
tornado.ioloop.IOLoop.instance().start()
|
||||
filesize = FILESIZE
|
||||
streamno = 2
|
||||
streamno = 15
|
||||
start_time = time.time()
|
||||
if(int(size)>=filesize):
|
||||
streamsize = (filesize-int(pos)) // (streamno-1)
|
||||
else:
|
||||
streamsize = (int(size)) // (streamno-1)
|
||||
|
||||
i = 0
|
||||
threads = []
|
||||
while i < (streamno-1):
|
||||
threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,streamsize*i,streamsize)))
|
||||
# print(streamsize*i,streamsize)
|
||||
i=i+1
|
||||
if (streamsize*i) < filesize:
|
||||
threads.append(threading.Thread(target=writer,args=(host,filepath,targetdir,uid,gid,streamsize*i,filesize-streamsize*i)))
|
||||
# print(streamsize*i,filesize-streamsize*i)
|
||||
for t in threads:
|
||||
t.setDaemon(True)
|
||||
print("thread name is :",t.getName())
|
||||
t.start()
|
||||
# tornado.ioloop.IOLoop.instance().start()
|
||||
# t.join()
|
||||
tornado.ioloop.IOLoop.instance().start()
|
||||
i=0
|
||||
for t in threads:
|
||||
t.join()
|
||||
#tornado.ioloop.IOLoop.instance().stop()
|
||||
end_time = time.time()
|
||||
print("Total time :{}".format(end_time-start_time))
|
||||
|
||||
def uploadentrance(host,filepath,targetpath):
|
||||
streamno = 2
|
||||
streamno = 3
|
||||
start_time = time.time()
|
||||
filesize = os.path.getsize(filepath)
|
||||
streamsize = filesize // (streamno-1)
|
||||
i = 0
|
||||
|
@ -252,17 +295,15 @@ def uploadentrance(host,filepath,targetpath):
|
|||
i=i+1
|
||||
if (streamsize*i) < filesize:
|
||||
threads.append(threading.Thread(target=upload,args=(host,filepath,targetpath,streamsize*i,filesize-streamsize*i)))
|
||||
|
||||
for t in threads:
|
||||
t.setDaemon(True)
|
||||
print("thread name is :",t.getName())
|
||||
t.start()
|
||||
# tornado.ioloop.IOLoop.instance().start()
|
||||
#t.join()
|
||||
#print("upload all!")
|
||||
tornado.ioloop.IOLoop.instance().start()
|
||||
#for t in threads:
|
||||
t.join()
|
||||
end_time = time.time()
|
||||
print("Total time :{}".format(end_time-start_time))
|
||||
#tornado.ioloop.IOLoop.current().stop() # Stop the loop when the upload is complete.
|
||||
|
||||
|
||||
'''if __name__=="__main__":
|
||||
uploadentrance("202.122.37.90:28003","/root/leaf/transfer/night.mkv","/home/wangcong/leaf/upload/night.mkv")
|
||||
#uploadentrance("202.122.37.90:28006","/root/leaf/test.cpp","/root/leaf/pytoc/upload/test.cpp")
|
||||
tornado.ioloop.IOLoop.instance().start()'''
|
||||
|
|
3
main.cpp
3
main.cpp
|
@ -1,11 +1,12 @@
|
|||
#include "client.h"
|
||||
#include "client.cpp"
|
||||
int main(int argc,char *argv[])
|
||||
{ char *command = argv[1];
|
||||
//printf("%s\n",command);
|
||||
if(strcmp(command,"read")==0)
|
||||
{
|
||||
//transread("202.122.37.90:28003","/root/leaf/pytoc/upload/night.mkv","/root/leaf/","0","0","0","1000000000000");
|
||||
transread("202.122.37.90:28001","/home/wangcong/leaf/upload/night.mkv","/root/leaf/","0","0","0","1000000000000");
|
||||
transread("202.122.37.90:28001","/home/wangcong/leaf/upload/night.mkv","/root/leaf/night.mkv","0","0",0,800*1024*1024);
|
||||
}
|
||||
else if(strcmp(command,"upload")==0)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue