first commit

This commit is contained in:
yangchenwo 2017-10-23 20:46:52 +08:00
parent f1f0d1abf6
commit 82087130ba
52 changed files with 182553 additions and 0 deletions

File diff suppressed because it is too large

View File

@ -0,0 +1,61 @@
# gwac_dbgen
Tools to generate GWAC simulation data, combine it, convert it to binary, and load it into MonetDB.
Configuration
1. create directories: gwac and gwac/gwac_pipeline
2. clone a copy of the GWAC data generation code:
git clone https://github.com/wan-meng/gwac_dbgen.git
3. copy RA240_DEC10_sqd225.cat to the parent directory of gwac_dbgen;
4. pipeline.py
add the absolute paths of the above directories to your Python path, e.g.:
sys.path.append('/data/gwac/gwac_dbgen')
sys.path.append('/data/gwac/gwac_pipeline')
change all the "cbddir" and "binarydir" arguments of the pipeline() calls.
5. simulator_pipeline.py
change the prefixes of "destdir" and "templatefile" in the dictionary "stargenparams" according to your current machine.
in the dictionary "machine_tableno", append the hostname of your current machine to the end.
Start generating simulated catalogs.
1. Generate simulated catalogs for one night (2400 catalogs/day).
Uncomment the first pipeline() call while keeping the next four pipeline() calls commented out.
you can change this number in the for loop of pipeline.py.
pipeline.py will call functions from simulator_pipeline.py to do the real generation work.
it will create the directory containing the simulated catalogs under the top gwac directory: catalog.csv.
2. Combine multiple catalog files into larger ones to speed up the database loading process.
Uncomment the second pipeline(), comment out the first pipeline().
configure the combination rate parameter: cmbrate.
pipeline.py will call functions from combineCata.py to do the real combination work.
the combined large files are created in the cbddir directory under the top level, e.g. combinedcsv-200in1-270M.
a log file is also created in the gwac_dbgen dir, named like logcomb-20160909151931-200in1.
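under the hood the combination is a plain cat concatenation (see combineCata.py); for cmbrate=200 the generated command has this form, where ... stands for the intervening 198 input catalogs:
cat RA240_DEC10_sqd225-ccd16-0001.cat ... RA240_DEC10_sqd225-ccd16-0200.cat > ../combinedcsv-200in1-270M/RA240_DEC10_sqd225-ccd16-0001.cat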
3. Convert the combined files from CSV format to binary column files, which load even faster thanks to MonetDB's binary bulk-loading facility; the conversion uses multiprocessing.
compile csv2bin.c:
gcc array.c -c
gcc csv2bin.c -c
gcc array.o csv2bin.o -o csv2bin
uncomment the third pipeline(), comment the second pipeline() back out.
pipeline.py will call functions from multicsv2bin.py to do the real conversion work.
the binary files are created in a binarydir directory under the top level, like binarycatalogs-200in1, with files like RA240_DEC10_sqd225-ccd16-0001.cat-1, ..., RA240_DEC10_sqd225-ccd16-0001.cat-22.
4. Load the binary column files into MonetDB.
First install MonetDB from a source tarball:
wget https://www.monetdb.org/downloads/sources/Jun2016-SP1/MonetDB-11.23.7.tar.bz2
tar xjvf MonetDB-11.23.7.tar.bz2
./bootstrap
./configure --prefix=/data/monetdbJul2016 --enable-optimize && make -j8 && make install
create ~/.monetdb file on the node:
cat ~/.monetdb
user=monetdb
password=monetdb
save_history=true
width=42
language=sql
add monetdb to PATH: vi ~/.bashrc
PATH=$PATH:/data/monetdbJul2016/bin
and create a dbfarm and db named 'gwacdb':
monetdbd create dbfarm
monetdbd start dbfarm
monetdb create gwacdb
monetdb start gwacdb
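then uncomment the fourth pipeline() to load. Under the hood (see load_binarytoDB.py) each combined catalog is bulk-loaded with a single COPY BINARY statement listing all 22 column files, of this form (the ... stands for the remaining column files; table number and paths follow your configuration above):
mclient gwacdb -s "COPY BINARY INTO targetss16 FROM ('.../RA240_DEC10_sqd225-ccd16-0001.cat-1', ..., '.../RA240_DEC10_sqd225-ccd16-0001.cat-22')"
the rows are then inserted from the staging table targetss16 into the final targets16 table.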

View File

@ -0,0 +1,82 @@
#include <stdlib.h>
#include <stdio.h>
#include "array.h"
void initIntArray(IntArray *a, size_t initialSize) {
a->array = (int *)malloc(initialSize * sizeof(int));
a->used = 0;
a->size = initialSize;
}
void insertIntArray(IntArray *a, int element) {
if (a->used == a->size) {
a->size *= 2;
a->array = (int *)realloc(a->array, a->size * sizeof(int));
}
a->array[a->used++] = element;
}
void freeIntArray(IntArray *a) {
free(a->array);
a->array = NULL;
a->used = a->size = 0;
}
void initShortArray(ShortArray *a, size_t initialSize) {
a->array = (short *)malloc(initialSize * sizeof(short));
a->used = 0;
a->size = initialSize;
}
void insertShortArray(ShortArray *a, short element) {
if (a->used == a->size) {
a->size *= 2;
a->array = (short *)realloc(a->array, a->size * sizeof(short));
}
a->array[a->used++] = element;
}
void freeShortArray(ShortArray *a) {
free(a->array);
a->array = NULL;
a->used = a->size = 0;
}
void initDoubleArray(DoubleArray *a, size_t initialSize) {
a->array = (double *)malloc(initialSize * sizeof(double));
a->used = 0;
a->size = initialSize;
}
void insertDoubleArray(DoubleArray*a, double element) {
if (a->used == a->size) {
a->size *= 2;
a->array = (double *)realloc(a->array, a->size * sizeof(double));
}
a->array[a->used++] = element;
}
void freeDoubleArray(DoubleArray *a) {
free(a->array);
a->array = NULL;
a->used = a->size = 0;
}
void initTupleArray(TupleArray *a, size_t initialSize) {
a->array = (Tuple *)malloc(initialSize * sizeof(Tuple));
a->used = 0;
a->size = initialSize;
}
void insertTupleArray(TupleArray *a, Tuple element){
if (a->used == a->size) {
a->size *= 2;
a->array = (Tuple *)realloc(a->array, a->size * sizeof(Tuple));
}
a->array[a->used++] = element;
}
void freeTupleArray(TupleArray *a){
free(a->array);
a->array = NULL;
a->used = a->size = 0;
}

View File

@ -0,0 +1,98 @@
//array.h
#define ARRAY_INITIAL_SIZE 100000
typedef struct {
int *array;
size_t used;
size_t size;
} IntArray;
void initIntArray(IntArray *a, size_t initialSize);
void insertIntArray(IntArray *a, int element);
void freeIntArray(IntArray *a);
typedef struct {
double *array;
size_t used;
size_t size;
} DoubleArray;
void initDoubleArray(DoubleArray *a, size_t initialSize);
void insertDoubleArray(DoubleArray *a, double element);
void freeDoubleArray(DoubleArray *a);
typedef struct {
short *array;
size_t used;
size_t size;
} ShortArray;
void initShortArray(ShortArray *a, size_t initialSize);
void insertShortArray(ShortArray *a, short element);
void freeShortArray(ShortArray *a);
/*one file line maps to one Tuple struct; the number of fields is hard-coded to 22. Adding or removing fields requires modifying this struct.*/
/*no alignment: sizeof(structure) == sizeof(first_member) + ... + sizeof(last_member).*/
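/*with 22 fields: sizeof(Tuple) == 2 + 22*4 + (4 + 2 + 19*8 + 4) == 252 bytes.*/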
typedef struct __attribute__((packed))
{
char fnum[2]; //the number of fields in this tuple, actually unsigned short.
char f1_lens[4]; //the bytes in the first field, actually unsigned int.
char f1[4]; //data content of the first field, actually int.
char f2_lens[4]; //the bytes in the second field, actually unsigned int.
char f2[2]; //data content of the second field, actually short.
char f3_lens[4];
char f3[8];
char f4_lens[4];
char f4[8];
char f5_lens[4];
char f5[8];
char f6_lens[4];
char f6[8];
char f7_lens[4];
char f7[8];
char f8_lens[4];
char f8[8];
char f9_lens[4];
char f9[8];
char f10_lens[4];
char f10[8];
char f11_lens[4];
char f11[8];
char f12_lens[4];
char f12[8];
char f13_lens[4];
char f13[8];
char f14_lens[4];
char f14[8];
char f15_lens[4];
char f15[8];
char f16_lens[4];
char f16[8];
char f17_lens[4];
char f17[8];
char f18_lens[4];
char f18[8];
char f19_lens[4];
char f19[8];
char f20_lens[4];
char f20[8];
char f21_lens[4];
char f21[8];
char f22_lens[4];
char f22[4];
} Tuple;
typedef struct __attribute__ ((__packed__))
{
Tuple *array;
size_t used;
size_t size;
} TupleArray;
void initTupleArray(TupleArray *a, size_t initialSize);
void insertTupleArray(TupleArray *a, Tuple element);
void freeTupleArray(TupleArray *a);

BIN
dataGen/gwac_dbgen/array.o Normal file

Binary file not shown.

0
dataGen/gwac_dbgen/cmd Normal file
View File

View File

@ -0,0 +1,66 @@
import os, multiprocessing,numpy
from timer import Timer
def genCombingname(catano, sourcedir, prefix, suffix):
combingname = sourcedir+prefix+"%04d" %catano+suffix+' '
return combingname
def genCombingnames(catano, ratio, sourcedir, prefix, suffix):
start = catano
end = catano+ratio-1
combingnames = ''
for catano in range(start,end+1):
combingnames += genCombingname(catano, sourcedir, prefix, suffix)
#print combingnames
#like: /data/sim-catalogs/RA240_DEC10_sqd300-0001.cat /data/sim-catalogs/RA240_DEC10_sqd300-0002.cat /data/sim-catalogs/RA240_DEC10_sqd300-0003.cat
return combingnames
def genDestname(round, ratio, destdir, prefix, suffix):
destname = destdir+prefix+"%04d" %round+suffix
return destname
def combine(startno, endno, ratio, destdir='/data/sim-240cata-7G/',sourcedir='/data/sim-catalogs/', prefix='RA240_DEC10_sqd300-', suffix='.cat'):
"""
combine ratio catalogs into one larger catalog.
"""
round = numpy.ceil(float(startno)/ratio)
for catano in range(startno,endno,ratio): #start=1,end=86400,step=240
combinednames = genCombingnames(catano,ratio,sourcedir,prefix,suffix)
destname = genDestname(round, ratio, destdir, prefix, suffix)
cmd = "cat %s > %s" %(combinednames, destname)
os.system(cmd)
round += 1
print "finish combine %d files into %s" %(ratio, destname)
def combine_once(catano, combinednames, destname, ratio):
cmd = "cat %s > %s" %(combinednames, destname)
os.system(cmd)
print "finish combine %d files into %s" %(ratio, destname)
def multicombine(startno, endno, ratio, destdir='/data/sim-240cata-7G/',sourcedir='/data/sim-catalogs/', prefix='RA240_DEC10_sqd300-', suffix='.cat'):
pool = multiprocessing.Pool(multiprocessing.cpu_count()-1)
round = numpy.ceil(float(startno)/ratio)
for x in range(startno,endno,ratio):
combinednames = genCombingnames(x,ratio,sourcedir,prefix,suffix)
destname = genDestname(round, ratio, destdir, prefix, suffix)
pool.apply_async(combine_once, (x,), dict(combinednames=combinednames, destname=destname, ratio=ratio))
round += 1
pool.close()
pool.join()
#################
#Usage: first clean up /data/sim-240cata-7G/ directory.
#nohup python combineCata.py >logcomb-20141017-48in1 &
startno = 1
endno = 10008
ratio = 48
destdir = "/data/sim-240cata-7G/"
if __name__ == '__main__':
with Timer() as t:
#combine(1, 86400, ratio=7200, destdir="/data/sim-240cata-7G/")
combine(1, 10008, ratio=ratio, destdir="/data/sim-240cata-7G/")
#combine(1, 1000, ratio=100, destdir="/data/sim-240cata-7G/")
#combine(1, 10000, ratio=1000, destdir="/data/sim-240cata-7G/")
print "combine %d catalogs, %d:1, elasped time: %.3f s" %(endno-startno+1, ratio, t.secs)

Binary file not shown.

View File

@ -0,0 +1,145 @@
import os, sys, socket
from combineCata import combine, multicombine
from multicsv2bin import convert, multiconvert
from load_binarytoDB import load
from timer import Timer
import numpy as np
import scipy as S
from time import strftime
import stargen_notemplt_mp as stargen
import pywcs
import pyfits
#from WU Chao
def readcat(fil):
"""
return:
ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag
"""
ff=open(fil)
oo = []
for f0 in ff:
f0t = f0.strip()
id = f0t[0:10]
ra = f0t[11:21]
dec = f0t[23:36]
mag1 = f0t[36:43]
magB = f0t[168:175]
magV = f0t[175:182]
gmag = f0t[182:189]
rmag = f0t[189:196]
imag = f0t[196:203]
if len(imag.strip())==0:
imag= -9999
sigi= f0t[227:233]
if len(sigi.strip()) ==0:
sigi = -9999
if len(magB.strip())==0:
magB = -9999
if len(magV.strip())==0:
magV = -9999
if len(gmag.strip())==0:
gmag = -9999
if len(rmag.strip())==0:
rmag = -9999
oo.append([ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag])
return S.array(oo,dtype='float')
#from WU Chao
def calSquareDegree_conver(sqdeg,dec0):
"""
input sqdeg: unit in deg.
dec0: dec center in decimal deg.
assume decw = sqrt(sqdeg)
return: raw
ref:http://en.wikipedia.org/wiki/Solid_angle
"""
sa = sqdeg/((180./S.pi)**2)
decw = S.sqrt(sqdeg)
raw = S.degrees(sa /(S.sin(S.radians(dec0+decw/2.) ) - S.sin(S.radians(dec0-decw/2.))))
return raw
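#e.g. sqd=225, dec0=10: decw=15, sa = 225/(180/pi)**2 ~ 0.0685 sr,
#raw = degrees(0.0685/(sin(17.5deg)-sin(2.5deg))) ~ 15.27 deg, slightly wider than decw.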
def genwcs(rac,decc,pixscal=11.7,sizes=[4096,4096]):
"""
rac: center ra,
decc: center dec,
pixscal: pixel scale in arcsec/pixel
sizes: CCD pixels size, e.g. 1k= [1024,1024], 4k= [4096,4096]
"""
pixd= pixscal/3600.
xc = sizes[0]/2. + 0.5
yc = sizes[1]/2. + 0.5
ddtm= S.zeros(sizes)
wcs = pywcs.WCS()
wcs.naxis1 = sizes[0]
wcs.naxis2 = sizes[1]
wcs.wcs.ctype = ['RA---TAN', 'DEC--TAN']
wcs.wcs.crpix = S.array([xc,yc])
wcs.wcs.crval = S.array([rac,decc])
wcs.wcs.cd = S.array([[pixd,0],[0,pixd]])
wcs.wcs.cdelt = S.array([ 1., 1.])
hh = wcs.to_header()
hh.update('IMAGEW',wcs.naxis1)
hh.update('IMAGEH',wcs.naxis2)
hdu = pyfits.PrimaryHDU(header=hh)
return wcs
def getcats(rac,decc,decw,sqd=180):
"""
get point sources from UCAC4 catalog.
"""
raw = calSquareDegree_conver(sqd,decc)
#generate a ucac4.txt catalog.
cmd = "/home/mengw/ucac4/access/u4test %s %s %s %s /home/mengw/ucac4/u4b/" %(rac,decc,raw,decw)
print "cmd = %s " %cmd
os.system('rm -fr ucac4.txt')
os.system(cmd)
dd=readcat('ucac4.txt')
#ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag
#only use ra,dec,mag1
radecmag=S.array(dd).take([0,1,4], axis=1) # only take column number of 0,1,4
extfile="RA%s_DEC%s_sqd%s.txt" %(rac,decc,sqd)
S.savetxt(extfile, radecmag)
return radecmag
def getxy(rac,decc,extfile,outfile, pixscal=11.7, sizes=[4096,4096],outwcsfile='tt1.wcs'):
'''
get xy from catalog file with RA DEC values, add x,y as last two columns into outfile.
'''
wcs= genwcs(rac,decc, pixscal,sizes) #the wcs is generated at runtime every time.
radecmag = S.loadtxt(extfile)
x,y=wcs.wcs_sky2pix(radecmag[:,0],radecmag[:,1],0)
xy =S.array(zip(x,y))
oo =S.concatenate((radecmag,xy),axis=1)
#filter x,y out of [0, 4096][0, 4096]
oo=oo[(oo[:,3]>0) & (oo[:,3]<4096) &(oo[:,4]>0) &(oo[:,4]<4096)]
#add id column as zeroth column
id=np.arange(1,len(oo)+1, dtype=np.uint32)
oo=np.concatenate((zip(id),oo),axis=1)
S.savetxt(outfile, oo, fmt=["%d","%10.4f","%10.4f","%10.4f","%10.4f","%10.4f"])
return oo
def gettempfromUCAC4(rac,decc,sqd=225,pixscal=11.7,sizes=[4096,4096]):
decw=np.floor(np.sqrt(sqd))
getcats(rac,decc,decw,sqd)
outfile="RA%s_DEC%s_sqd%s.cat" %(rac,decc,sqd)
extfile="RA%s_DEC%s_sqd%s.txt" %(rac,decc,sqd)
oo=getxy(rac,decc,extfile,outfile,pixscal,sizes)
gettempfromUCAC4(240, 10, 180)

BIN
dataGen/gwac_dbgen/csv2bin Normal file

Binary file not shown.

View File

@ -0,0 +1,212 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "array.h"
void itoa (int n,char s[])
{
int i,j,sign,k;
char tmp;
if((sign=n)<0)//record positive or negative
n=-n;//let n be positive
i=0;
do{
s[i++]=n%10+'0';//fetch next digit
}while ((n/=10)>0);//delete this digit
if(sign<0)
s[i++]='-';
s[i]='\0';
for(j=i-1,k=0;j>k;j--,k++)
{
tmp = s[k];
s[k] = s[j];
s[j] = tmp;
}
}
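/* main: reads a whitespace-separated 22-column catalog line by line and writes
each column to its own raw binary file named <input>-<col> in native byte
order -- the per-column layout the README loads with MonetDB COPY BINARY. */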
int main(int argc, char* argv[])
{
char const* const fileName = argv[1]; /* should check that argc > 1 */
FILE* file;
char line[256];
IntArray a1,a22;
ShortArray a2;
DoubleArray a3,a4,a5,a6,a7,a8,a9,a10,a11,a12,a13,a14,a15,a16,a17,a18,a19,a20,a21;
int v1,v22;
short v2;
double v3,v4,v5,v6,v7,v8,v9,v10,v11,v12,v13,v14,v15,v16,v17,v18,v19,v20,v21;
int i;
FILE *fp;
char fname[128];
file = fopen(fileName, "r"); /* should check the result */
if(file == NULL)
{
puts("Couldn't open file.");
exit(1);
}
/*initially 100000 numbers.*/
initIntArray(&a1, ARRAY_INITIAL_SIZE);
initIntArray(&a22, ARRAY_INITIAL_SIZE);
initShortArray(&a2, ARRAY_INITIAL_SIZE);
initDoubleArray(&a3, ARRAY_INITIAL_SIZE);
initDoubleArray(&a4, ARRAY_INITIAL_SIZE);
initDoubleArray(&a5, ARRAY_INITIAL_SIZE);
initDoubleArray(&a6, ARRAY_INITIAL_SIZE);
initDoubleArray(&a7, ARRAY_INITIAL_SIZE);
initDoubleArray(&a8, ARRAY_INITIAL_SIZE);
initDoubleArray(&a9, ARRAY_INITIAL_SIZE);
initDoubleArray(&a10, ARRAY_INITIAL_SIZE);
initDoubleArray(&a11, ARRAY_INITIAL_SIZE);
initDoubleArray(&a12, ARRAY_INITIAL_SIZE);
initDoubleArray(&a13, ARRAY_INITIAL_SIZE);
initDoubleArray(&a14, ARRAY_INITIAL_SIZE);
initDoubleArray(&a15, ARRAY_INITIAL_SIZE);
initDoubleArray(&a16, ARRAY_INITIAL_SIZE);
initDoubleArray(&a17, ARRAY_INITIAL_SIZE);
initDoubleArray(&a18, ARRAY_INITIAL_SIZE);
initDoubleArray(&a19, ARRAY_INITIAL_SIZE);
initDoubleArray(&a20, ARRAY_INITIAL_SIZE);
initDoubleArray(&a21, ARRAY_INITIAL_SIZE);
while (fgets(line, sizeof(line), file)) {
/* note that fgets doesn't strip the terminating \n; checking its
* presence would allow handling lines longer than sizeof(line) */
sscanf(line,"%d %hi %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %lf %d",&v1,&v2,&v3,&v4,&v5,&v6,&v7,&v8,&v9,&v10,&v11,&v12,&v13,&v14,&v15,&v16,&v17,&v18,&v19,&v20,&v21,&v22);
insertIntArray(&a1,v1);
insertShortArray(&a2,v2);
insertDoubleArray(&a3,v3);
insertDoubleArray(&a4,v4);
insertDoubleArray(&a5,v5);
insertDoubleArray(&a6,v6);
insertDoubleArray(&a7,v7);
insertDoubleArray(&a8,v8);
insertDoubleArray(&a9,v9);
insertDoubleArray(&a10,v10);
insertDoubleArray(&a11,v11);
insertDoubleArray(&a12,v12);
insertDoubleArray(&a13,v13);
insertDoubleArray(&a14,v14);
insertDoubleArray(&a15,v15);
insertDoubleArray(&a16,v16);
insertDoubleArray(&a17,v17);
insertDoubleArray(&a18,v18);
insertDoubleArray(&a19,v19);
insertDoubleArray(&a20,v20);
insertDoubleArray(&a21,v21);
insertIntArray(&a22,v22);
}
//finish loading to dynamic array.
for(i=1;i<23;i++){
memset(fname, 0, 128);
//fname should be catano-column.
sprintf(fname,"%s-%d",fileName,i);
fp=fopen(fname,"wb");
switch(i) {
case 1:
fwrite(a1.array, sizeof(int), a1.used, fp);
break;
case 2:
fwrite(a2.array, sizeof(short),a2.used, fp);
break;
case 3:
fwrite(a3.array, sizeof(double),a3.used, fp);
break;
case 4:
fwrite(a4.array, sizeof(double),a4.used, fp);
break;
case 5:
fwrite(a5.array, sizeof(double),a5.used, fp);
break;
case 6:
fwrite(a6.array, sizeof(double),a6.used, fp);
break;
case 7:
fwrite(a7.array, sizeof(double),a7.used, fp);
break;
case 8:
fwrite(a8.array, sizeof(double),a8.used, fp);
break;
case 9:
fwrite(a9.array, sizeof(double),a9.used, fp);
break;
case 10:
fwrite(a10.array, sizeof(double),a10.used, fp);
break;
case 11:
fwrite(a11.array, sizeof(double),a11.used, fp);
break;
case 12:
fwrite(a12.array, sizeof(double),a12.used, fp);
break;
case 13:
fwrite(a13.array, sizeof(double),a13.used, fp);
break;
case 14:
fwrite(a14.array, sizeof(double),a14.used, fp);
break;
case 15:
fwrite(a15.array, sizeof(double),a15.used, fp);
break;
case 16:
fwrite(a16.array, sizeof(double),a16.used, fp);
break;
case 17:
fwrite(a17.array, sizeof(double),a17.used, fp);
break;
case 18:
fwrite(a18.array, sizeof(double),a18.used, fp);
break;
case 19:
fwrite(a19.array, sizeof(double),a19.used, fp);
break;
case 20:
fwrite(a20.array, sizeof(double),a20.used, fp);
break;
case 21:
fwrite(a21.array, sizeof(double),a21.used, fp);
break;
case 22:
fwrite(a22.array, sizeof(int),a22.used, fp);
break;
default:
printf("wrong column number.\n");
break;
}
fclose(fp);
}
printf("succesfully convert catalog to binary files and write to disk: %s.\n",fname);
freeIntArray(&a1);
freeIntArray(&a22);
freeShortArray(&a2);
freeDoubleArray(&a3);
freeDoubleArray(&a4);
freeDoubleArray(&a5);
freeDoubleArray(&a6);
freeDoubleArray(&a7);
freeDoubleArray(&a8);
freeDoubleArray(&a9);
freeDoubleArray(&a10);
freeDoubleArray(&a11);
freeDoubleArray(&a12);
freeDoubleArray(&a13);
freeDoubleArray(&a14);
freeDoubleArray(&a15);
freeDoubleArray(&a16);
freeDoubleArray(&a17);
freeDoubleArray(&a18);
freeDoubleArray(&a19);
freeDoubleArray(&a20);
freeDoubleArray(&a21);
/* may check feof here to make a difference between eof and io failure -- network
* timeout for instance */
return 0;
}

Binary file not shown.

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,78 @@
import numpy as np
import os
from timer import Timer
import multiprocessing as mp
def genBinfilenames(catastart=1, prefix="RA240_DEC10_sqd300-", suffix=".cat", bindir="/data/binarycatalogs/"):
"""
#binary filenames format:dir/prefix-index.suffix-littleindex
#like: /home/mengw/simulator/ucac4/RA240_DEC10_sqd300-0001.cat-1
"""
binfilenames = ""
for i in range(1,23):
binfilenames += "'%s%s%04d%s-%d'," %(bindir,prefix,catastart,suffix,i)
binfilenames = binfilenames[0:-1]
return binfilenames
#usage: getSqlCmd(genBinfilenames(1))
def getSqlCmd(tblno, infilenames):
cmd = "COPY BINARY INTO targets%s FROM (%s)\"" %(tblno, binfilenames)
print cmd.strip('"\'')
return cmd
#usage: getBashMclientCmd(genBinfilenames(1))
def getBashMclientCmd(dbname, tblno, binfilenames):
cmd = "mclient %s -s \"COPY BINARY INTO targets%s FROM (%s)\"" %(dbname, tblno, binfilenames)
print cmd.strip('"\'')
return cmd
def load(log, tblno, startno, num, prefix="RA240_DEC10_sqd300-",suffix=".cat",bindir="/data/binarycatalogs", dbname='gwacdb'):
"""
load the binary data into single table target#tblno and measure loading time, record into a log.
"""
logfile=open(log, 'w+')
with Timer() as T:
for catano in range(startno, startno+num):
binfilenames = genBinfilenames(catano, prefix, suffix, bindir)
cmd = "echo 'mcilent %s -s COPY INTO targetss%d'; mclient %s -s \"COPY BINARY INTO targetss%d FROM (%s)\" " %(dbname, tblno, dbname, tblno, binfilenames)
with Timer() as t:
os.system(cmd)
print >>logfile,"=> loading catano %d elapsed time: %.3f s\n" %(catano, t.secs)
print >>logfile,"Total loading elapsed time: %.3f s\n" %(T.secs)
cmd = """echo 'mclient %s -s INSERT INTO targets%d..FROM targetss%d'; mclient %s -i -s "insert into targets%d(imageid, zone, ra, \\"dec\\", mag, pixel_x, pixel_y, ra_err, dec_err, x, y, z, flux, flux_err, normmag, flag, background, threshold, mag_err, ellipticity, class_star, orig_catid) select * from targetss%d;" """ %(dbname, tblno, tblno, dbname, tblno, tblno)
with Timer() as t1:
os.system(cmd)
print >>logfile,"=> INSERT INTO target%d ...FROM targetss%d time: %.3f\n" %(tblno, tblno, t1.secs)
logfile.close()
def loadOneTable(tableno, prefix="RA240_DEC10_sqd300-",suffix=".cat",bindir="/data/binarycatalogs"):
"""
load the binary data into MonetDB and measure loading time, record into a log.
"""
catastart = ids[tableno-1,1]
cataend = ids[tableno-1,2]
for catano in range(catastart, cataend+1):
binfilenames = genBinfilenames(catano, prefix, suffix, bindir)
cmd = "mclient -d gwacdb -f tab -s \"trace COPY BINARY INTO loadtarget%d FROM (%s)\" 1>>logcopy%d 2>&1" %(tableno, binfilenames, tableno)
with Timer() as t:
os.system(cmd)
print "=> loading catano %d into table %d elapsed time: %.3f s" %(catano, tableno, t.secs)
def multiLoad(catastart, catacount, prefix="RA240_DEC10_sqd300-",suffix=".cat",bindir="/data/binarycatalogs"):
"""
load multiple tables simultaneously, one worker process per table.
"""
segcount = catacount/tablecount
for i in range(tablecount):
ids[i] = [i+1, segcount*i+1, segcount*(i+1)]
pool = mp.Pool(processes=tablecount)
results = pool.map(loadOneTable, ids[:,0])
# Synchronize the main process with the job processes to ensure proper cleanup.
pool.close()
pool.join()

Binary file not shown.

View File

@ -0,0 +1,105 @@
#!/usr/bin/env python
# script name: multicsv2bin.py
# Usage : multicsv2bin.py
# purpose: convert csv catalogs into MonetDB compliant binary format.
#
# Author: Wan Meng <wanmeng@bao.ac.cn>
#
#------------------------------------
import numpy as np
import os, sys
from itertools import count
import multiprocessing as mp
from timer import Timer
def gen_filenames(catastart, catadir="/data/sim-240cata-7G/", prefix="RA240_DEC10_sqd300-",suffix=".cat"):
"""
return a generator of a list of auto increment filenames
cataname like "RA240_DEC10_sqd300-0001.cat"
from the start_th number of catalog file.
"""
for i in count(catastart):
yield "%s%s%04d%s" %(catadir, prefix, i, suffix)
#bindir = "/data/binarycatalogs-12in1"
#bindir = "/data/binarycatalogs-24in1"
#bindir = "/data/binarycatalogs-48in1"
catadir = "/data/sim-240cata-7G/"
prefix = "RA240_DEC10_sqd300-"
def convert(catastart, bindir, catadir, prefix, suffix, n=None):
"""
convert one or n catalogues from CSV format to binary. A different destination dir may be designated to store the resulting binary catalogs.
input filenames are given in several parts, so a batch of input ASCII files can be assembled.
format:{prefix}{catastart}{suffix}. prefix must be an absolute path!
like: {RA240_DEC10_sqd300-}{1}{.cat}
output binary file names: prefix-index.suffix-littleindex. The index field is padded with leading zeros.
like: RA240_DEC10_sqd300-0001.cat-1
"""
if n is not None: #single process version, convert a batch of files at a time.
g=gen_filenames(catastart,catadir,prefix,suffix)
for x in range(n):
catname=g.next()
outputname = "%s-*" %catname
content = '''/usr/bin/time -f %e '''
#csv2bincmd = os.path.join(sys.path[0], 'csv2bin ')
csv2bincmd = os.path.join(os.getcwd(), 'csv2bin ')
#cmd = content + csv2bincmd + catname + "; mv %s %s" %(outputname, bindir)
#cmd1 = content + csv2bincmd + catname
#print cmd1
#os.system(cmd1)
cmd2 = "cp %s %s" %(outputname, bindir)
#cmd2 = "cp /home/wamdm/wm/gwac/combinedcsv-200in1-270M/RA240_DEC10_sqd225-ccd17-0001.cat-{1..22} /home/wamdm/wm/gwac/binarycatalogs-200in1"
print cmd2
os.system(cmd2)
else: #multiprocessing version, convert one file at a time.
catname = "%s%s%04d%s" %(catadir, prefix, catastart, suffix)
outputname = "%s-*" %catname
content = '''/usr/bin/time -f %e '''
#csv2bincmd = os.path.join(sys.path[0], 'csv2bin ')
csv2bincmd = os.path.join(os.getcwd(), 'csv2bin ')
cmd = content + csv2bincmd + catname + "; mv %s %s" %(outputname, bindir)
cmd1 = content + csv2bincmd + catname
os.system(cmd1)
cmd2 = "mv %s %s" %(outputname, bindir)
#cmd2 = "cp /home/wamdm/wm/gwac/combinedcsv-200in1-270M/RA240_DEC10_sqd225-ccd17-0001.cat-1 /home/wamdm/wm/gwac/binarycatalogs-200in1"
print cmd2
os.system(cmd2)
def convert_wrap(args):
return convert(*args)
#one day has 86400 catalogs, create to 86400.
#run this script in the /data filesystem
#convert(42066,prefix="/data/sim-catalogs/RA240_DEC10_sqd300-",catastart=10001)
#def multiconvert(processes, n1cata=14658, n2cata=54999):
def multiconvert(n1cata, n2cata, bindir, catadir, prefix, suffix):
"""
benefit from multiprocessing by splitting the task of converting thousands of catalogs
into individual catalogs to be processed simultaneously.
"""
# Create a pool class and run the jobs-the number of jobs is equal to the number of catalogs needed to generate.
pool = mp.Pool(processes=mp.cpu_count()-1)
args = [(catano, bindir,catadir,prefix,suffix) for catano in range(n1cata, n2cata+1)]
#results = pool.map(convert, range(n1cata, n2cata+1))
results = pool.map(convert_wrap, args)
# Synchronize the main process with the job processes to ensure proper cleanup.
pool.close()
pool.join()
#usage1: in shell
#nohup python multicsv2bin.py > logmult-20141017-cs63M-48in1 &
#start = 1
#end = 209
#binarydir = "/data/binarycatalogs-48in1"
#if __name__ == '__main__':
# with Timer() as t:
# if __name__ == "__main__":
# #launch 40 cores to convert binary catalogs from 1 to 86400.
# multiconvert(start, end)
# print "multicsv2bin: convert catalogs from %d to %d elapsed time: %.3f s." %(start, end, t.secs)
#usage2: in ipython
# from multicsv2bin import *
# convert(1)
#convert(1, "/home/wamdm/wm/gwac/binarycatalogs-200in1", "/home/wamdm/wm/gwac/combinedcsv-200in1-270M/", "RA240_DEC10_sqd225-ccd17-", ".cat", 1)

Binary file not shown.

View File

@ -0,0 +1,78 @@
import os, sys, socket
sys.path.append(os.path.split( os.path.realpath( sys.argv[0] ) )[0])
#sys.path.append('../gwac_pipeline')
from simulator_pipeline import pipeline, stargenparams, machine_tableno
#import simulator_pipeline
####################time col
import datetime
t_str = '2017-10-12 00:00:00'
d = datetime.datetime.strptime(t_str, '%Y-%m-%d %H:%M:%S')
now=datetime.datetime.now()
timedelta=now-d
timestamp=timedelta.days*24*3600 + timedelta.seconds
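#timestamp = whole seconds elapsed since t_str; used below as n1cata/n2cata so each
#run gets monotonically increasing catalog numbers (one catalog per 15 s).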
####################time col
ccdno=int(sys.argv[1])
rac=int(sys.argv[2])
decc=int(sys.argv[3])
# the initial x and y
xi=int(sys.argv[4])
yi=int(sys.argv[5])
abnormpoint=int(sys.argv[6])
fp=open(os.path.split( os.path.realpath( sys.argv[0] ) )[0]+'/init_imageid.txt','r')
c=fp.readline()
fp.close()
catano=int(c)+1
fp=open(os.path.split( os.path.realpath( sys.argv[0] ) )[0]+'/init_imageid.txt','w')
fp.write(str(catano))
fp.close()
#os.system("sudo sysctl -w vm.swappiness=70")
#2400 is the number of each day's catalogs of one CCD.
#n1=i
#n2=i+2399
for i in '1':
cmbrate = 200 #2400/200=12 and 34M*200=6.6G ,*15proc=100G, plus copy-on-write = 200G, can fit in total mem 256G.
ratiosize = {200:'270M'}
startno=1
endno=1
stargenparams['ccdno']=ccdno
stargenparams['newcatano']=catano
stargenparams['n1cata']=timestamp
stargenparams['n2cata']=timestamp+(endno-1)*15
stargenparams['rac']=rac
stargenparams['decc']=decc
stargenparams['xi']=xi
stargenparams['yi']=yi
stargenparams['abnormpoint']=abnormpoint
tblno=1 #tblno is only used when loading into MonetDB; it has no effect on generating star data
#tblno=machine_tableno[socket.gethostname()]
srcdir=stargenparams['destdir']
cbddir="../combinedcsv-%din1-%s/" %(cmbrate, ratiosize[cmbrate])
binarydir="../binarycatalogs-%din1/" %cmbrate
prefix="RA%03d_DEC%d_sqd%d-ccd%s-" %(stargenparams['rac'], stargenparams['decc'], stargenparams['sqd'],stargenparams['ccdno'])
suffix=".cat"
dbname='gwacdb'
# print "\nInitial parameters:\nstartno="+str(startno)+"\nendno="+str(endno)+"\ncmbrate="+str(cmbrate)+"\nhostname:"+socket.gethostname()+"\nccdno(tblno)="+str(tblno)+"\nsrcdir="+str(srcdir)+"\ncbddir="+str(cbddir)+"\nbinarydir="+str(binarydir)+"\nprefix="+prefix+"\nsuffix="+suffix+"\nratiosize[%d]=" %cmbrate +ratiosize[cmbrate] +"\ndbname="+dbname+"\n"
#1.Generate simulated catalog files. Uncomment the next line if this step is finished.
#edit the prefix of cbddir, binarydir according to your machine.
os.chdir('.')
pipeline(ifsimcat=True, ifcomb=False, ifconv=False, ifload=False, startno=startno, endno=endno, ratio=cmbrate, tblno=tblno, cbddir=cbddir, srcdir=srcdir, binarydir=binarydir, prefix=prefix, suffix=suffix, dbname=dbname)
#2.Combine multiple catalogs into one so as to speed up db loading. Uncomment the next line if this step is finished.
# pipeline(ifsimcat=False, ifcomb=True, ifconv=False, ifload=False, startno=startno, endno=endno, ratio=cmbrate, tblno=tblno, cbddir=cbddir, srcdir=srcdir, binarydir=binarydir, prefix=prefix, suffix=suffix, dbname=dbname)
#startTime = datetime.now()
#3. Convert the combined files from CSV format to binary. Uncomment the next line if this step is finished.
#pipeline(ifsimcat=False, ifcomb=False, ifconv=True, ifload=False, startno=int(stargenparams['n1cata']), endno=int(stargenparams['n2cata']), ratio=cmbrate, tblno=machine_tableno[socket.gethostname()], cbddir="/home/wamdm/wm/gwac/combinedcsv-%din1-%s/" %(cmbrate, ratiosize[cmbrate]), srcdir=stargenparams['destdir'], binarydir="/home/wamdm/wm/gwac/binarycatalogs-%din1/" %cmbrate, prefix="RA%03d_DEC%d_sqd%d-ccd%s-" %(stargenparams['rac'], stargenparams['decc'], stargenparams['sqd'],stargenparams['ccdno']), suffix=".cat", dbname='gwacdb')
#pipeline(ifsimcat=False, ifcomb=False, ifconv=True, ifload=False, startno=startno, endno=endno, ratio=cmbrate, tblno=tblno, cbddir=cbddir, srcdir=srcdir, binarydir=binarydir, prefix=prefix, suffix=suffix, dbname=dbname)
#4. Load the binary files into MonetDB.
#pipeline(ifsimcat=False, ifcomb=False, ifconv=False, ifload=True, startno=int(stargenparams['n1cata']), endno=int(stargenparams['n2cata']), ratio=cmbrate, tblno=machine_tableno[socket.gethostname()], cbddir="/data/gwac/combinedcsv-%din1-%s/" %(cmbrate, ratiosize[cmbrate]), srcdir=stargenparams['destdir'], binarydir="/data/gwac/binarycatalogs-%din1/" %cmbrate, prefix="RA%03d_DEC%d_sqd%d-ccd%s-" %(stargenparams['rac'], stargenparams['decc'], stargenparams['sqd'],stargenparams['ccdno']), suffix=".cat", dbname='gwacdb')
#print datetime.now() - startTime
#os.chdir('/data/gwac/gwac_pipeline/')
#cmd="./5_gwac_uniquecatalog.sh %d %d" %(n1,n2)
#os.system(cmd)
sys.exit(0)
#os.system("sudo sysctl -w vm.swappiness=0")

View File

@ -0,0 +1,173 @@
import os, sys, socket
from combineCata import combine, multicombine
from multicsv2bin import convert, multiconvert
from load_binarytoDB import load
from timer import Timer
import numpy as np
from time import strftime
import stargen_notemplt_mp as stargen
year = strftime("%Y")
mon = strftime("%m")
day = strftime("%d")
hour = strftime("%H")
min = strftime("%M")
sec = strftime("%S")
def todaystr():
'''
get date string
date format="YYYYMMDDHHMMSS"
'''
return year+mon+day+hour+min+sec
#ratiosize = {96:'130M', 192:'260M', 384:'520M', 768:'1G', 1536:'2G', 300:'400M'}
ratiosize = {200:'270M'}
#ratiosize = {1:'1.35M'}
def call_combine(today,ratio,startno,endno, cbddir, srcdir, prefix,suffix):
log1file=os.path.join(sys.path[0], 'logcomb-%s-%din1' %(today, ratio))
log1 = open(log1file, 'w+')
cmd = "mkdir -p %s" %cbddir
os.system(cmd)
#combine ratio catalogs into a larger one.
with Timer() as t1:
combine(startno, endno, ratio, destdir=cbddir, sourcedir=srcdir, prefix=prefix, suffix=suffix)
print >>log1, "combine %d catalogs, %d:1, elapsed time: %.3f s" %(endno-startno+1, ratio, t1.secs)
log1.close()
def call_multicombine(today,ratio,startno,endno, cbddir, srcdir, prefix,suffix):
log1file=os.path.join(sys.path[0], 'logcomb-%s-%din1' %(today, ratio))
log1 = open(log1file, 'w+')
cmd = "mkdir -p %s" %cbddir
os.system(cmd)
#combine ratio catalogs into a larger one.
with Timer() as t1:
multicombine(startno, endno, ratio, destdir=cbddir, sourcedir=srcdir, prefix=prefix, suffix=suffix)
print >>log1, "combine %d catalogs, %d:1, elapsed time: %.3f s" %(endno-startno+1, ratio, t1.secs)
log1.close()
def call_multiconvert(today, ratio, startno, endno, binarydir, cbddir, prefix,suffix):
#create a directory first for placing the converted binary catalogs.
start = int(np.ceil(float(startno)/ratio))
end = int(np.ceil(float(endno)/ratio))
#arguments for convert
cmd = "mkdir -p %s" %binarydir
os.system(cmd)
log2file=os.path.join(sys.path[0], 'logmult-%s-cs%s-%din1-%dto%d' %(today, ratiosize[ratio], ratio, endno, end))
log2 = open(log2file, 'w+')
with Timer() as t2:
multiconvert(start, end, bindir=binarydir, catadir=cbddir, prefix=prefix, suffix=suffix)
print >>log2,"convert catalogs from %d to %d elapsed time: %.3f s." %(start, end, t2.secs)
log2.close()
def call_convert(today, ratio, startno, endno, binarydir, cbddir, prefix, suffix):
#create a directory first for placing the converted binary catalogs.
start = startno
end = int(np.ceil(float(endno)/ratio))
#arguments for convert
cmd = "mkdir -p %s" %binarydir
os.system(cmd)
log2file=os.path.join(sys.path[0], 'logmult-%s-cs%s-%din1-%dto%d' %(today, ratiosize[ratio], ratio, endno, end))
log2 = open(log2file, 'w+')
with Timer() as t2:
convert(start, bindir=binarydir, catadir=cbddir, prefix=prefix, suffix=suffix, n=end)
print >>log2,"convert catalogs from %d to %d elapsed time: %.3f s." %(start, end, t2.secs)
log2.close()
def call_load(today, ratio, tblno, startno, endno, binarydir, prefix, suffix, dbname):
#only drop the targets tables when startno == 1, i.e. loading from empty; otherwise loading continues with existing data and the tables are kept.
if startno == 1:
os.system("echo 'drop table targetss%d and targets%s and targets_seq'; mclient %s -s 'drop table targetss%s'" %(tblno, tblno, dbname, tblno))
os.system("mclient %s -s 'drop table targets%s CASCADE'" %(dbname, tblno))
os.system("mclient %s -s 'drop sequence targets_seq'" %dbname);
os.system("echo 'create table targetss and targets' ");
os.system(" mclient %s -s 'create table targetss%s(imageid int, zone smallint, ra double, \"dec\" double, mag double, pixel_x double, pixel_y double, ra_err double, dec_err double, x double, y double, z double, flux double, flux_err double, normmag double, flag double, background double, threshold double, mag_err double, ellipticity double, class_star double, orig_catid int)' " %(dbname, tblno))
os.system(" mclient %s -s 'CREATE SEQUENCE \"targets_seq\" as BIGINT'" %dbname)
os.system(""" mclient %s -s 'create table targets%s(id bigint DEFAULT NEXT VALUE FOR \"targets_seq\", imageid int, zone smallint, ra double, \"dec\" double, mag double, pixel_x double, pixel_y double, ra_err double, dec_err double, x double, y double, z double, flux double, flux_err double, normmag double, flag double, background double, threshold double, mag_err double, ellipticity double, class_star double, orig_catid int, PRIMARY KEY (id) )' """ %(dbname, tblno))
else: #when startno is not 1, loading continues with existing data, so there is no need to delete the targets1 table.
os.system("echo 'drop table targetss%d '; mclient %s -s 'drop table targetss%s'" %(tblno, dbname, tblno))
os.system("echo 'create table targetss'");
os.system(" mclient %s -s 'create table targetss%s(imageid int, zone smallint, ra double, \"dec\" double, mag double, pixel_x double, pixel_y double, ra_err double, dec_err double, x double, y double, z double, flux double, flux_err double, normmag double, flag double, background double, threshold double, mag_err double, ellipticity double, class_star double, orig_catid int)' " %(dbname, tblno))
#load binary files into MonetDB.
log3 = os.path.join(sys.path[0], 'logload-%s-cs%s-%din1-%dto%d' %(today, ratiosize[ratio], ratio, startno, endno))
if ratio != 1:
#using one core to load one table.
num = int(np.ceil(float(endno-startno+1)/ratio))
#combinedstartno is the first combined-catalog number to look for in the binarycatalogs-xin1 folder; it should only be an integer multiple of ratio +1.
combinedstartno = int(startno/ratio) + 1
else:
num = int(np.ceil(float(endno-startno+1)))
combinedstartno = int(startno)
with Timer() as t3:
load(log3, tblno, startno=combinedstartno, num=num, prefix=prefix,suffix=suffix,bindir=binarydir, dbname=dbname)
etime = t3.secs
with open(log3, 'a') as logfile:
logfile.write("finished loading %d %s catalogs into MonetDB, elapsed time: %.3f s." %(num, ratiosize[ratio], etime))
def pipeline(ifsimcat, ifcomb, ifconv, ifload, startno, endno, ratio, tblno, cbddir, srcdir, binarydir, prefix="RA240_DEC10_sqd225-",suffix=".cat", dbname='gwacdb'):
#first check if startno is valid. startno should only be an integer multiple of ratio +1.
if startno!= 1 and startno % ratio != 1:
sys.exit("error: startno should only be an integer multiple of ratio +1, invalid startno: %d" %startno)
today = todaystr()
ratio=ratiosize.keys()[0]
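#NOTE: this overrides the ratio argument with the single key of ratiosize (currently 200).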
if ifsimcat==True:
#stargenparams['n1cata']=startno
#stargenparams['n2cata']=endno
stargen.star_generator(**stargenparams)
if ifcomb==True:
call_multicombine(today,ratio,startno,endno, cbddir, srcdir, prefix,suffix)
if ifconv==True:
call_multiconvert(today,ratio,startno, endno, binarydir, cbddir, prefix, suffix)
if ifload==True:
call_load(today, ratio, tblno, startno, endno, binarydir, prefix, suffix, dbname)
#when simulator_pipeline.py is used independently, uncomment machine_tableno and stargenparams
machine_tableno = {
'stones01.scilens.private' : 0,
'stones02.scilens.private' : 1,
'stones03.scilens.private' : 2,
'stones04.scilens.private' : 3,
'stones05.scilens.private' : 4,
'stones06.scilens.private' : 5,
'stones07.scilens.private' : 6,
'stones08.scilens.private' : 7,
'stones09.scilens.private' : 8,
'stones10.scilens.private' : 9,
'stones11.scilens.private' : 10,
'stones12.scilens.private' : 11,
'stones13.scilens.private' : 12,
'stones14.scilens.private' : 13,
'stones15.scilens.private' : 14,
'stones16.scilens.private' : 15,
'gwacdb' : 16,
'wamdm80' : 17
}
stargenparams = {
'n1cata' : 1,
'n2cata' : 24, #12000,
'abnormpoint' : 0,
'newcatano' : 1,
'ccdno' : 1,
'destdir' : os.path.split( os.path.realpath( sys.argv[0] ) )[0]+"/../catalog.csv/",
#'templatefile' : '/data/gwac/RA240_DEC10_sqd180_233.2_246.8_3.4_16.6_175637.cat',
'templatefile' : os.path.split( os.path.realpath( sys.argv[0] ) )[0]+'/RA240_DEC10_sqd225.cat',
'rac' : 240,
'decc' : 10,
'sqd' : 225,
'pixscal' : 11.7,
'xi' : 0,
'yi' : 0,
'sizes' : [4096,4096],
'zoneheight' : 10. #arcsec
}
#
#cmbrate = 1 #2400/200=12 and 34M*200=6.6G ,*15proc=100G, plus copy-on-write = 200G, can fit in total mem 256G.
#pipeline(ifsimcat=True, ifcomb=True, ifconv=True, ifload=False, startno=int(stargenparams['n1cata']), endno=int(stargenparams['n2cata']), ratio=cmbrate, tblno=machine_tableno[socket.gethostname()], cbddir="/scratch/meng/gwac/combinedcsv-%din1-%s/" %(cmbrate, ratiosize[cmbrate]), srcdir=stargenparams['destdir'], binarydir="/scratch/meng/gwac/binarycatalogs-%din1/" %cmbrate, prefix="RA%03d_DEC%d_sqd%d-ccd%s-" %(stargenparams['rac'], stargenparams['decc'], stargenparams['sqd'],stargenparams['ccdno']), suffix=".cat", dbname='gwacdb')
#usage:
#nohup python pipeline.py &

Binary file not shown.

View File

@ -0,0 +1,445 @@
# script name: binary_simulator_notemplt_mp.py
# Usage : from binary_simulator_notemplt_mp.py
# purpose:
# one day GWAC has 86400 catalogs.
# this script generates simulated ASCII catalogs only; it does not generate a template from UCAC4!
# it can work in either single-process or multi-process mode via parameter overloading.
#
# *note* :
# To generate a template from UCAC4, use binary_simulatorbinary.py
# Author: Wan Meng <wanmeng@bao.ac.cn>
#
#------------------------------------
import os, sys, shutil
from os.path import getsize
#import matplotlib.pyplot as plt
import datetime
import scipy as S
import numpy as np
import pywcs
import pyfits
import re
import pdb
#import monetdb.sql
import struct
import ctypes
import multiprocessing as mp, logging, functools
from itertools import count
import ntpath
import warnings
import random
warnings.simplefilter("ignore")
#from WU Chao
def readcat(fil):
"""
return:
ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag
"""
ff=open(fil)
oo = []
for f0 in ff:
f0t = f0.strip()
id = f0t[0:10]
ra = f0t[11:21]
dec = f0t[23:36]
mag1 = f0t[36:43]
magB = f0t[168:175]
magV = f0t[175:182]
gmag = f0t[182:189]
rmag = f0t[189:196]
imag = f0t[196:203]
if len(imag.strip())==0:
imag= -9999
sigi= f0t[227:233]
if len(sigi.strip()) ==0:
sigi = -9999
if len(magB.strip())==0:
magB = -9999
if len(magV.strip())==0:
magV = -9999
if len(gmag.strip())==0:
gmag = -9999
if len(rmag.strip())==0:
rmag = -9999
oo.append([ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag])
return S.array(oo,dtype='float')
#from WU Chao
def calSquareDegree_conver(sqdeg,dec0):
"""
input sqdeg: unit in deg.
dec0: dec center in decimal deg.
assume decw = sqrt(sqdeg)
return: raw
ref:http://en.wikipedia.org/wiki/Solid_angle
"""
sa = sqdeg/((180./S.pi)**2)
decw = S.sqrt(sqdeg)
raw = S.degrees(sa /(S.sin(S.radians(dec0+decw/2.) ) - S.sin(S.radians(dec0-decw/2.))))
return raw
def getcats(rac,decc,decw,sqd=180,outfile=None,pixscal=11.7,sizes=[4096,4096]):
"""
get point sources from UCAC4 catalog.
"""
raw = calSquareDegree_conver(sqd,decc)
#generate a ucac4.txt catalog.
cmd = "u4test %s %s %s %s u4b/" %(rac,decc,raw,decw)
print "cmd = %s " %cmd
os.system('rm -fr ucac4.txt')
os.system(cmd)
dd=readcat('ucac4.txt')
#ra,dec,imag,sigi,mag1,magB,magV,gmag,rmag
#only use ra,dec,mag1
radecmag=S.array(dd).take([0,1,4], axis=1) # only take column number of 0,1,4
#wcs need to change according to ra,dec, used to (ra, dec)-->(x, y)
wcs= genwcs(rac, decc, pixscal, sizes)
#use wcs to translate RA,DEC to x,y, according to new rac,decc.
ra=radecmag[:,0]
dec=radecmag[:,1]
x,y = wcs.wcs_sky2pix(ra, dec, 0) #(RA,DEC)-->(x,y)
#add the x,y fields as the next two columns.
dd=np.concatenate((radecmag, zip(x)), axis=1)
dd=np.concatenate((dd, zip(y)), axis=1)
if outfile:
S.savetxt(outfile,radecmag)
else:
outfile="RA%s_DEC%s_sqd%s.txt" %(rac,decc,sqd)
S.savetxt(outfile, dd)
return radecmag
#from WU Chao
def genwcs(rac,decc,pixscal=11.7,sizes=[4096,4096]):
"""
rac: center ra,
decc: center dec,
pixscal: pixel scale in arcsec/pixel
sizes: CCD pixels size, e.g. 1k= [1024,1024], 4k= [4096,4096]
"""
pixd= pixscal/3600.
xc = sizes[0]/2. + 0.5
yc = sizes[1]/2. + 0.5
ddtm= S.zeros(sizes)
wcs = pywcs.WCS()
wcs.naxis1 = sizes[0]
wcs.naxis2 = sizes[1]
wcs.wcs.ctype = ['RA---TAN', 'DEC--TAN']
wcs.wcs.crpix = S.array([xc,yc])
wcs.wcs.crval = S.array([rac,decc])
wcs.wcs.cd = S.array([[pixd,0],[0,pixd]])
wcs.wcs.cdelt = S.array([ 1., 1.])
hh = wcs.to_header()
hh.update('IMAGEW',wcs.naxis1)
hh.update('IMAGEH',wcs.naxis2)
hdu = pyfits.PrimaryHDU(header=hh)
return wcs
#produce one simulation catalog
def addGaussNoisetoxy(dd, wcs_transtool,sigma=0.07):
"""
This is the first kind of x,y noise: add a noise distribution ~N(0, 0.1*pixel) of shape (radecmagxy.shape[0], 2) to the samplefile [x,y columns].
change RA,DEC according to x,y. wcs is a local variable produced by firstimg.py.
r^2=x^2+y^2 with sigma_x=sigma_y=0.07, so sigma_r = sqrt(2)*0.07 ~ 0.1 pixel,
"""
#set the number of stars
n=dd.shape[0]
#noise = np.random.normal(0, sigma,(n,2)) # scale is standard deviation
#add noise to xy
#dd[:,3:5] += 0#noise
# mapping to CCD, filter to reserve only column x,y both >0 and <4096!
dd =dd[(dd[:,3]>0) & (dd[:,4]>0) & (dd[:,3]<4096) & (dd[:,4]<4096)]
#change RA,DEC of noise xy
RA,DEC= wcs_transtool.wcs_pix2sky(dd[:,3], dd[:,4], 0) #(x,y)->(RA,DEC)
dd[:,0] = RA # dd[:,0:2]=RA,DEC doesn't work!
dd[:,1] = DEC
return dd
#add delta_ra, delta_dec to RA,DEC, then translate RA,DEC to x,y
def addMountMovetoRADEC(dd, rac=150, decc=40, peakval=10., pixscal=11.7, sizes=[4096,4096]):
"""
Mount motion needs to be added to x,y; this is the second kind of x,y error, and each image gets the same error value,
ranging over (-10., 10.) pixels.
the ra,dec of stars within the CCD camera are constant (star positions are constant); only their x,y change.
"""
#change delta in pixel to degree. 10 pix=0.0325 deg
mov_deg = peakval*pixscal/3600.
#delta_ra, delta_dec =np.random.uniform(-mov_deg, mov_deg, size=2)
delta_ra = 0
delta_dec = 0
#add mount moving to ra,dec.
ra=dd[:,0]
dec=dd[:,1]
#mount is moving, but stars position is constant,so star ra,dec aren't changed.
#ra[:] = ra+delta_ra #ra[:] is a view of ra!
#dec[:] = dec+delta_dec
#mount center is a new ra,dec.
rac=rac+delta_ra
decc=decc+delta_dec
#wcs need to change according to new ra,dec.
wcs= genwcs(rac, decc, pixscal, sizes)
#use wcs to translate RA,DEC to x,y, according to new rac,decc.
x,y = wcs.wcs_sky2pix(ra, dec, 0) #(RA,DEC)-->(x,y)
dd[:,3] = x # xy[:,3:4]=RA,DEC doesn't work!
dd[:,4] = y
# mapping to CCD, filter to reserve only column x,y both >0 and <4096!
dd =dd[(dd[:,3]>0) & (dd[:,4]>0) & (dd[:,3]<sizes[0]) & (dd[:,4]<sizes[1])]
return dd
def addGaussMagErrtoMag(dd):
"""
add merr directly to mag column. for each mag, the magerr follows Gauss distribution, only difference is sigma.
from xlp, the magerr~N(0,sigma)
for those objects with mag>14 the relation is:
m(x)=m0+m1*x+m2*x*x+m3*x*x*x
m0 = -73.7154
m1 = 15.7868
m2 = -1.13085
m3 = 0.0271142
where magerr sigma is m(x), x is mag
for those obj. with mag <14, it will be
n(x)=n0+n1*x+n2*x*x+n3*x*x*x
n0 = -1.2375
n1 = 0.362355
n2 = -0.035126
n3 = 0.00113084
"""
mag=dd[:,2] #mag is the 3rd column.
m0 = -73.7154
m1 = 15.7868
m2 = -1.13085
m3 = 0.0271142
#m(x)=m0+m1*x+m2*x*x+m3*x*x*x
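#e.g. at mag=15: m(15) = -73.7154 + 236.802 - 254.441 + 91.510 ~ 0.156, so sigma ~ 0.16 mag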
n0 = -1.2375
n1 = 0.362355
n2 = -0.035126
n3 = 0.00113084
#n(x)=n0+n1*x+n2*x*x+n3*x*x*x
merr_sigma= np.where(mag>14, m0+m1*mag+m2*np.power(mag,2)+m3*np.power(mag,3), n0+n1*mag+n2*np.power(mag,2)+n3*np.power(mag,3))
#plt.figure()
#plt.ylim(0,1)
#plt.plot(mag, merr_sigma,'r.')
merr_sigma =np.abs(merr_sigma) #for 6,7,8 mag, there are minus values.
merr = [np.random.normal(0,sigma_i) for sigma_i in merr_sigma]
merr = np.array(merr)
#add a column merr to[ra,dec,mag,x,y] after mag as 4th column. zip(merr) must be a column vector when axis=1
#become [ra,dec,mag,merr,x,y]
#oo =np.concatenate((dd[:,0:3],zip(merr),dd[:,3:5]),axis=1)
#add merr to mag.
mag[:] = mag +merr
return dd
def addExtinctiontoMag(dd, abnormpoint, peakval=0.5) :
"""
add atmospheric extinction to mag, this is second kind of mag error, each image has the same err value.
range from (-0.5, 0.5)
"""
mag=dd[:,2]
n=mag.shape[0]
delta_mag2=np.random.uniform(-peakval,peakval, size=n)
for i in range(abnormpoint):
a=random.randint(0, n-1)
mag[a]=mag[a]+50
#directly add delta_mag2 to mag column of dd.
dd[:,2] =mag + delta_mag2 #mag[:] is a view of mag.
return dd
def genOneSimuCatalog(catano, abnormpoint, newcatano, ccd, destdir, templatefile, rac, decc, xi, yi, sqd, wcs_transtool, origintime, pixscal=11.7, sizes=[4096,4096], zoneheig=10.):
"""
do the simulation according to catano number.
"""
dirname = ntpath.dirname(templatefile) #like '/data/sim-catalogs'
#basename = ntpath.basename(templatefile) #like 'RA240_DEC10_sqd300.cat'
#samplefilepurename= re.findall(r'.+?(?=.cat)', basename)[0] #like 'RA240_DEC10_sqd300'
#catalog number start from 1,not 0.
#- zerofill zfill() or %04d is ok.
timestamp = catano
#catano = (timestamp - origintime)/15 + 1
catano=newcatano
simcatafilename = "%sRA%03d_DEC%d_sqd%d-ccd%s-%04d.cat" %(destdir, rac, decc, sqd, ccd, catano)
#String that will be written at the beginning of the file.
head=''
#head="# 1 NUMBER Running object number\n"
#head += "# 2 ALPHA_J2000 Right ascension of barycenter (J2000) [deg]\n"
#head += "# 3 DELTA_J2000 Declination of barycenter (J2000) [deg]\n"
#head += "# 4 MAG [mag]\n"
#head += "# 5 X_IMAGE Object position along x [pixel]\n"
#head += "# 6 Y_IMAGE Object position along y [pixel]"
#use the template catalog to generate simulation catalog
dd = np.loadtxt(templatefile,usecols=(1,2,3,4,5))
#add x,y noise and to x,y column.
dd=addGaussNoisetoxy(dd, wcs_transtool,sigma=0.0000001)#0.07
#print(dd[:,3])
oo=addMountMovetoRADEC(dd, rac=rac, decc=decc, peakval=1.709)
#add mag gauss err and atmospheric extinction to mag column.
uu=addGaussMagErrtoMag(oo)
dd=addExtinctiontoMag(uu, abnormpoint, 0.5)
#add the zone as the first column, using decl/20 arcsec.
zone=np.floor(dd[:,1]/zoneheig*3600)
dd=np.concatenate((zip(zone),dd), axis=1)
#add the imageid field:catano as the zeroth column.
n=dd.shape[0]
imageid=np.ones(n,dtype=np.uint32)*catano
dd=np.concatenate((zip(imageid),dd), axis=1)
ccdid=np.ones(n,dtype=np.uint32)*ccd
dd=np.concatenate((zip(ccdid),dd), axis=1)
#add ra_err and dec_err as seventh and eighth column, using random values.
radec_err=np.random.random((n,2))
dd=np.concatenate((dd,radec_err),axis=1)
#add x,y,z as the ninth,tenth,eleventh column, calculated by ra,dec.
x=np.cos(np.radians(dd[:,2]))*np.cos(np.radians(dd[:,3]))
y=np.sin(np.radians(dd[:,2]))*np.cos(np.radians(dd[:,3]))
z=np.sin(np.radians(dd[:,3]))
dd=np.concatenate((dd,zip(x),zip(y),zip(z)),axis=1)
#compute flux = 10^(-0.4*magnitude)*zero_point_flux as the twelve column.
#zero point flux(Jy) at V band is 3640 in Johnson system. output unit is mJy.
Fo=3640
flux=np.power(10, -0.4*dd[:,3])*Fo*1000
dd=np.concatenate((dd,zip(flux)),axis=1)
#add 13th-20th fields,all random.
fillrandoms=np.random.random((n,8))
dd=np.concatenate((dd,fillrandoms),axis=1)
#add the orig_catid as 21th column.
num=np.arange(1,n+1,dtype=np.uint32)
dd=np.concatenate((dd,zip(num)),axis=1)
####################time col
date=np.arange(1,n+1, dtype=np.uint32)
date[date>0]=timestamp
dd=np.concatenate((dd,zip(date)),axis=1)
####################time col
#add the shifting to xy
dd[:,6]=dd[:,6]+xi
dd[:,7]=dd[:,7]+yi
#save to simulation file.
if head:
np.savetxt(simcatafilename, dd, fmt=["%d","%10.4f","%10.4f","%10.4f","%10.4f","%10.4f"], header=head)
else:
#every field is separated by a blank space; only the number of decimal places needs to be assigned. x/y/z need more accuracy.
np.savetxt(simcatafilename, dd, fmt=["%d","%d","%d","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%.16f","%.16f","%.16f","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%.4f","%d","%d"])
#print str(simcatafilename)
filesize=getsize(simcatafilename)/1024.0/1024.0
now=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print str(n)+' '+str(filesize)+' '+now
#transform the txt format to binary catalog, one column writes to one separate binary file.
# if not set format, int type in file will be read as float type.
"""
aa=np.loadtxt(simcatafilename,dtype={'names':('imageid','zone','ra','dec','mag','x_pix','y_pix','ra_err','dec_err','x','y','z','flux','flux_err','normmag','flag','background','threshold','mag_err','ellipticity','class_star','orig_catid'),'formats':(np.int32,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.float64,np.int32)}) #read the format.
col=22
for i in range(0,col):
fp=open("%s-%d" %(simcatafilename,i),'wb+')
if type(aa[0][i]) is np.int32:
bytes = ctypes.create_string_buffer(4*n)
for j in range(0,n):
struct.pack_into('i',bytes,j*4,aa[j][i])
elif type(aa[0][i]) is np.float64:
bytes = ctypes.create_string_buffer(8*n)
for j in range(0,n):
struct.pack_into('d',bytes,j*8,aa[j][i])
fp.write(bytes)
fp.close()
"""
# return simulated catalog file name can be used to convert to binary format.
return simcatafilename
def getOneNightCatalog(catano, abnormpoint, newcatano, ccd, destdir, templatefile, rac, decc, xi, yi, sqd, pixscal, sizes, zoneheig, origintime):
"""
produce one night's catalog from UCAC4 sample catalog file just produced by firstimg.py, name like: RA200_DEC40_sqd250.txt
generate a single catalog in multi processing mode.
"""
wcs=genwcs(rac, decc, pixscal, sizes)
genOneSimuCatalog(catano, abnormpoint, newcatano, ccd, destdir, templatefile, rac, decc, xi, yi, sqd, wcs, origintime, pixscal, sizes, zoneheig)
def star_generator(**workparams):
"""
benefit from multiprocessing by splitting the task of generating thousands of catalogs
into individual catalogs to be processed simultaneously.
"""
if not os.path.exists(workparams['templatefile']):
sys.exit('%s: template file does not exist.' %workparams['templatefile'])
if os.path.exists(workparams['destdir']):
shutil.rmtree(workparams['destdir'])
if not os.path.exists(workparams['destdir']):
os.makedirs(workparams['destdir'])
# Create a pool class and run the jobs-the number of jobs is equal to the number of catalogs needed to generate.
pool = mp.Pool(processes=mp.cpu_count()-1)
pool.map(functools.partial(getOneNightCatalog, abnormpoint=workparams['abnormpoint'], newcatano=workparams['newcatano'], ccd=workparams['ccdno'], destdir=workparams['destdir'], templatefile=workparams['templatefile'], rac=workparams['rac'], decc=workparams['decc'], xi=workparams['xi'], yi=workparams['yi'], sqd=workparams['sqd'], pixscal=workparams['pixscal'], sizes=workparams['sizes'], zoneheig=workparams['zoneheight'], origintime=workparams['n1cata']), range(workparams['n1cata'], workparams['n2cata']+1,15))
# Synchronize the main process with the job processes to ensure proper cleanup.
pool.close()
pool.join()
'''
workparams = {
'n1cata' : 1,
'n2cata' : 2400,
'ccdno' : machine_tableno[],
'destdir' : "../catalog.csv",
'templatefile' : 'RA240_DEC10_sqd225.cat',
'rac' : 240,
'decc' : 10,
'sqd' : 225,
'pixscal' : 11.7,
'sizes' : [4096,4096],
'zoneheight' : 10. #arcsec
}
'''
# End main
if __name__ == "__main__":
star_generator(**workparams) #NOTE: requires uncommenting the workparams block above

Binary file not shown.

View File

@ -0,0 +1,6 @@
#!/bin/bash
TIME_1=`date +%s`
echo $TIME_1
python pipeline.py
TIME_2=`date +%s`
echo $TIME_2

View File

@ -0,0 +1,16 @@
import time
class Timer(object):
def __init__(self, verbose=False):
self.verbose = verbose
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, *args):
self.end = time.time()
self.secs = self.end - self.start
self.msecs = self.secs * 1000 # millisecs
if self.verbose:
print 'elapsed time: %f ms' % self.msecs

Binary file not shown.

View File

@ -0,0 +1,16 @@
Distributed GWAC data generation and database loading system
The system relies on redis cluster and redis-cluster-tool: before use, set up a redis cluster, move the redis-cluster-tool folder under $REDIS_HOME, and configure the redis cluster in high-availability mode.
Function description: this is a distributed system that runs the GWAC cross-match program as a daemon. Data are generated every 15 seconds and the data file path is handed to the daemon, which loads the data into the database. If a load does not finish within the allotted time, the unfinished data are staged in a first-level cache and the next load can proceed, making this a non-blocking loading scheme.
One node acts as master and the rest are slaves; on the master node, $GWAC_HOME/gwac_dbgen_cluster/nodehostname lists the hostnames of the slave nodes.
Main workflow:
1 After configuring nodehostname, copy the gwac folder to all slave nodes.
2 ./genTemplateStartSquirrel.sh starts the cross-match and loading program Squirrel. If a template star catalog already exists, no new one is generated; otherwise a new template catalog is built. Before this step you can run deleteTemplateTable.sh to clear the template catalog so that a new one is generated, or clearLogs.sh to clear the log files on the master and the slaves. Note that by default the master writes a log entry every 5 generation cycles (one cycle defaults to 15 s) and once before shutting down; the next run reads the last entry and keeps accumulating on top of it (total files produced, total star rows generated, and so on). This design exists because GWAC only runs at night and stops by day, so each night's run continues the running totals accumulated since the very first start. If you want the totals to keep accumulating, never delete the log file on the master.
3 ./sumLineAndDataSize.sh $NUM starts the whole pipeline, generating catalogs and loading them into the database. $NUM is the number of generation cycles (one cycle defaults to 15 seconds); the remaining parameters can be changed inside sumLineAndDataSize.sh. While it is running, you can execute the following on any node that can reach the cluster:
./stopGen.sh $STOP stops the cluster. $STOP takes one of two values: stop means the system stops normally after the next generation cycle (15 s); force means the system stops immediately, for use when the system malfunctions.
./addAbnormalStar.sh $HOST $STARNUM generates $STARNUM abnormal stars on node $HOST; by default an abnormal star is one whose brightness is greater than 50.
./getSumData.sh reports the system's current running totals, by default including the total star rows, the total data volume, and the data timestamp.
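A typical master-node session, assembled from the steps above ($NUM=100 is illustrative):
cd $GWAC_HOME/gwac_dbgen_cluster
./clearLogs.sh                  # optional: clear logs on master and slaves (resets the running totals)
./genTemplateStartSquirrel.sh   # start Squirrel; builds the template catalog only if absent
./sumLineAndDataSize.sh 100     # generate and load for 100 cycles (about 15 s each)
./stopGen.sh stop               # stop cleanly after the current cycle
./getSumData.sh                 # print the accumulated totals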

View File

@ -0,0 +1,11 @@
#!/bin/bash
sumport=1984
masterhost="wamdm100"
abhost=$1
bernoulliParam=$2
geometricParam=$3
echo "abnormal $abhost $bernoulliParam $geometricParam" | nc -q 0 $masterhost 1984
echo "finished."
exit 0

View File

@ -0,0 +1,14 @@
#!/bin/bash
rm -rf logs/*
echo "clear $(hostname)'s logs"
username=$(whoami)
for line in `cat nodehostname | awk -F "-" '{print $1}'`
do
{
ssh $username@$line "rm -rf gwac/gwac_dbgen_cluster/logs/*"
echo "clear $line's logs"
} &
done
wait

View File

@ -0,0 +1,14 @@
#!/bin/bash
rm -rf ../template_table/*
echo "delete $(hostname)'s template table."
username=$(whoami)
for line in `awk -F "-" '{print $1}' nodehostname`
do
{
ssh $username@$line "rm -rf gwac/template_table/*"
echo "delete $line's template table"
} &
done
wait

View File

@ -0,0 +1,8 @@
#!/bin/bash
username="wamdm"
./sumLineAndDataSize.sh 1 "gt"

View File

@ -0,0 +1,16 @@
#!/bin/bash
# if a template table already exists, the previous template is used.
if [ -f "../template_table/template" ]
then
echo "A template Table is existing."
else
rm -rf ../template_table
mkdir ../template_table
file=$(ls ../catalog.csv/)
templateFile=`echo $file | awk '{gsub(substr($0,index($0,"sqd")),"template");print $0}'`
mv ../catalog.csv/$file ../template_table/$templateFile
fi
#awk '{$2=null;$24=null; print }' template_table/$file | awk -F " " '{print $1" "$2}' >template_table/template
#rm -rf template_table/$file

View File

@ -0,0 +1,8 @@
#!/bin/bash
masterhost="wamdm100"
hostname=$(hostname)
echo "print $hostname" | nc -q 0 $masterhost 1984
tmp=`nc -lp 1985`
echo $tmp > stalog.txt
#echo "17000 33.6789 2016-11-17 12:12:12" >stalog.txt
echo "Saving is finished."

View File

@ -0,0 +1,43 @@
#!/bin/bash
fpid=$1
masterhost=$2
stoppost=1986
hostname=$(hostname)
ps ax | grep [.]/sendStaDataPerNode.sh | awk '{print $1}' | while read pid
do
if [ $fpid -ne $pid ]
then
kill $pid
fi
done
#lsof -Pnl +M -i4 | grep $stoppost | awk '{print $2}' | xargs kill >/dev/null 2>&1
tmp=`nc -lp $stoppost`
#echo "$tmp" >>1.txt
if [ "$tmp" == "force" ]
then
spid=`ps -ax | awk '{ print $1 }' | grep $fpid`
while [ "$fpid" == "$spid" ]
do
kill -9 $fpid
spid=`ps -ax | awk '{ print $1 }' | grep $fpid`
done
pgrep "astroDB_cache" | while read Spid
do
kill $Spid
done
echo "$hostname has stopped!" | ./send.py $masterhost $stoppost
exit 0
elif [ "$tmp" == "kill" ]
then
echo "0 0 slave_exit 0 0" > /tmp/slave.pipe
echo "Squirrel_exit" > /tmp/Squirrel_pipe_test
exit 0
elif [ "$tmp" == "exit" ]
then
exit 0
else
echo "$hostname's \"force\" is error, and the real message is $tmp." | ./send.py $masterhost $stoppost
fi

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import socket
import threading
import SocketServer
import json, types,string
import os, time
import sys
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)
        #jdata = json.loads(data)
        #print "Receive data from '%r'"% (data)
        #rec_src = jdata[0]['src']
        #rec_dst = jdata
        # append every received message to the cache file read by the master
        file_object = open('listen_cache.txt', 'a')
        file_object.write(data)
        file_object.close()
        #cur_thread = threading.current_thread()
        #response = [{'thread':cur_thread.name,'src':rec_src,'dst':rec_dst}]
        #jresp = json.dumps(response)
        #self.request.sendall(jresp)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    pass
if __name__ == "__main__":
    HOST, PORT = sys.argv[1], int(sys.argv[2])
    #print HOST
    # truncate the cache file, then kill any process still bound to PORT
    filename = 'listen_cache.txt'
    clean = ":>%s" % filename
    os.system(clean)
    stoppid = "lsof -Pnl +M -i4 | grep %d | awk '{print $2}' | xargs kill >/dev/null 2>&1" % PORT
    os.system(stoppid)
    SocketServer.TCPServer.allow_reuse_address = True
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address
    # Start a thread with the server -- that thread will then start one
    # more thread for each request
    server_thread = threading.Thread(target=server.serve_forever)
    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()
    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()

View File

@ -0,0 +1,4 @@
#!/bin/bash
: > listen_cache.txt
lsof -Pnl +M -i4 | grep 1984 | awk '{print $2}' | xargs kill >/dev/null 2>&1
nc -lk 1984 >> listen_cache.txt

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import socket
import threading
import SocketServer
import json, types,string
import os, time
import sys
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)
        # append every received stop message to the file read by stopGen.sh
        file_object = open('listen_stop.txt', 'a')
        file_object.write(data)
        file_object.close()
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    pass
if __name__ == "__main__":
    HOST, PORT = sys.argv[1], int(sys.argv[2])
    #print HOST
    # kill any process still bound to PORT, then truncate the stop file
    stoppid = "lsof -Pnl +M -i4 | grep %d | awk '{print $2}' | xargs kill >/dev/null 2>&1" % PORT
    os.system(stoppid)
    filename = 'listen_stop.txt'
    clean = ":>%s" % filename
    os.system(clean)
    SocketServer.TCPServer.allow_reuse_address = True
    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address
    # Start a thread with the server -- that thread will then start one
    # more thread for each request
    server_thread = threading.Thread(target=server.serve_forever)
    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()
    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()

View File

@ -0,0 +1,5 @@
#!/bin/bash
: > listen_stop.txt
lsof -Pnl +M -i4 | grep 1986 | awk '{print $2}' | xargs kill >/dev/null 2>&1
nc -lk 1986 >>listen_stop.txt
#wait $$ 2>/dev/null #suppress Terminated message

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,20 @@
wamdm101 has stopped!
wamdm102 has stopped!
wamdm103 has stopped!
wamdm104 has stopped!
wamdm105 has stopped!
wamdm106 has stopped!
wamdm107 has stopped!
wamdm108 has stopped!
wamdm109 has stopped!
wamdm110 has stopped!
wamdm111 has stopped!
wamdm112 has stopped!
wamdm113 has stopped!
wamdm114 has stopped!
wamdm115 has stopped!
wamdm116 has stopped!
wamdm117 has stopped!
wamdm118 has stopped!
wamdm119 has stopped!
finished

View File

@ -0,0 +1,387 @@
TotalLine TotalSize(MB) TotalTime(s) StorageLine CrossCertifiedErrorLine Timestamp
0 0 0 0 0 0 0
702812 135.573 40 702812 0 2017-10-15 23:29:09
1581327 305.374 80 1581327 0 2017-10-15 23:29:50
2459842 475.679 120 2459842 0 2017-10-15 23:30:31
3338357 645.984 160 3338357 0 2017-10-15 23:31:13
4216872 816.289 200 4216872 0 2017-10-15 23:31:54
5095387 986.594 240 5095387 0 2017-10-15 23:32:35
5973902 1156.89 280 5973902 0 2017-10-15 23:33:16
6852417 1327.19 320 6852417 0 2017-10-15 23:33:57
7730932 1497.49 360 7730932 0 2017-10-15 23:34:38
8609447 1667.79 400 8609447 0 2017-10-15 23:35:19
9487962 1838.09 440 9487962 0 2017-10-15 23:36:01
10366477 2008.39 480 10366477 0 2017-10-15 23:36:42
11244992 2178.69 520 11244992 0 2017-10-15 23:37:23
12123507 2348.99 560 12123507 0 2017-10-15 23:38:04
13002022 2519.29 600 13002022 0 2017-10-15 23:38:45
13880537 2689.59 640 13880537 0 2017-10-15 23:39:25
14759052 2859.89 680 14759052 0 2017-10-15 23:40:06
15637567 3030.19 720 15637567 0 2017-10-15 23:40:47
16516082 3200.49 760 16516082 0 2017-10-15 23:41:28
17394597 3371.13 800 17394597 0 2017-10-15 23:42:09
18273112 3542.28 840 18273112 0 2017-10-15 23:42:50
19151627 3713.43 880 19151627 0 2017-10-15 23:43:31
20030142 3884.58 920 20030142 0 2017-10-15 23:44:12
20908657 4055.73 960 20908657 0 2017-10-15 23:44:53
21787172 4226.88 1000 21787172 0 2017-10-15 23:45:34
22665687 4398.03 1040 22665687 0 2017-10-15 23:46:15
23544202 4569.18 1080 23544202 0 2017-10-15 23:46:56
24422717 4740.33 1120 24422717 0 2017-10-15 23:47:37
25301232 4911.48 1160 25301232 0 2017-10-15 23:48:18
26179747 5082.63 1200 26179747 0 2017-10-15 23:48:59
27058262 5253.78 1240 27058262 0 2017-10-15 23:49:40
27936777 5424.93 1280 27936777 0 2017-10-15 23:50:20
28815292 5596.08 1320 28815292 0 2017-10-15 23:51:02
29693807 5767.23 1360 29693807 0 2017-10-15 23:51:43
30572322 5938.38 1400 30572322 0 2017-10-15 23:52:24
31450837 6109.53 1440 31450837 0 2017-10-15 23:53:05
32329352 6280.68 1480 32329352 0 2017-10-15 23:53:47
33207867 6451.83 1520 33207867 0 2017-10-15 23:54:27
34086382 6622.98 1560 34086382 0 2017-10-15 23:55:08
34964897 6794.13 1600 34964897 0 2017-10-15 23:55:49
35843412 6965.28 1640 35843412 0 2017-10-15 23:56:30
36721927 7136.43 1680 36721927 0 2017-10-15 23:57:11
37600442 7307.58 1720 37600442 0 2017-10-15 23:57:52
38478957 7478.73 1760 38478957 0 2017-10-15 23:58:33
39357472 7649.88 1800 39357472 0 2017-10-15 23:59:14
40235987 7821.03 1840 40235987 0 2017-10-15 23:59:54
41114502 7992.18 1880 41114502 0 2017-10-16 00:00:35
41993017 8163.33 1920 41993017 0 2017-10-16 00:01:16
42871532 8334.48 1960 42871532 0 2017-10-16 00:01:57
43750047 8505.63 2000 43750047 0 2017-10-16 00:02:38
44628562 8676.78 2040 44628562 0 2017-10-16 00:03:19
45507077 8847.93 2080 45507077 0 2017-10-16 00:04:00
46385592 9019.08 2120 46385592 0 2017-10-16 00:04:41
47264107 9190.23 2160 47264107 0 2017-10-16 00:05:22
48142622 9361.38 2200 48142622 0 2017-10-16 00:06:04
49021137 9532.53 2240 49021137 0 2017-10-16 00:06:45
49899652 9703.68 2280 49899652 0 2017-10-16 00:07:27
50778167 9874.83 2320 50778167 0 2017-10-16 00:08:08
51656682 10045.9 2360 51656682 0 2017-10-16 00:08:48
52535197 10216.9 2400 52535197 0 2017-10-16 00:09:29
53413712 10387.9 2440 53413712 0 2017-10-16 00:10:10
54292227 10558.9 2480 54292227 0 2017-10-16 00:10:51
55170742 10729.9 2520 55170742 0 2017-10-16 00:11:32
56049257 10900.9 2560 56049257 0 2017-10-16 00:12:14
56927772 11071.9 2600 56927772 0 2017-10-16 00:12:55
57806287 11242.9 2640 57806287 0 2017-10-16 00:13:35
58684802 11413.9 2680 58684802 0 2017-10-16 00:14:16
59563317 11584.9 2720 59563317 0 2017-10-16 00:14:57
60441832 11755.9 2760 60441832 0 2017-10-16 00:15:39
61320347 11926.9 2800 61320347 0 2017-10-16 00:16:19
62198862 12097.9 2840 62198862 0 2017-10-16 00:17:00
63077377 12268.9 2880 63077377 0 2017-10-16 00:17:41
63955892 12439.9 2920 63955892 0 2017-10-16 00:18:22
64834407 12610.9 2960 64834407 0 2017-10-16 00:19:03
65712922 12781.9 3000 65712922 0 2017-10-16 00:19:44
66591437 12952.9 3040 66591437 0 2017-10-16 00:20:25
67469952 13123.9 3080 67469952 0 2017-10-16 00:21:06
68348467 13294.9 3120 68348467 0 2017-10-16 00:21:46
69226982 13465.9 3160 69226982 0 2017-10-16 00:22:27
70105497 13636.9 3200 70105497 0 2017-10-16 00:23:08
70984012 13807.9 3240 70984012 0 2017-10-16 00:23:49
71862527 13978.9 3280 71862527 0 2017-10-16 00:24:30
72741042 14149.9 3320 72741042 0 2017-10-16 00:25:11
73619557 14320.9 3360 73619557 0 2017-10-16 00:25:52
74498072 14491.9 3400 74498072 0 2017-10-16 00:26:33
75376587 14662.9 3440 75376587 0 2017-10-16 00:27:14
76255102 14833.9 3480 76255102 0 2017-10-16 00:27:55
77133617 15004.9 3520 77133617 0 2017-10-16 00:28:36
78012132 15175.9 3560 78012132 0 2017-10-16 00:29:17
78890647 15346.9 3600 78890647 0 2017-10-16 00:29:58
79769162 15517.9 3640 79769162 0 2017-10-16 00:30:39
80647677 15688.9 3680 80647677 0 2017-10-16 00:31:20
81526192 15859.9 3720 81526192 0 2017-10-16 00:32:00
82404707 16030.9 3760 82404707 0 2017-10-16 00:32:41
83283222 16201.9 3800 83283222 0 2017-10-16 00:33:22
84161737 16372.9 3840 84161737 0 2017-10-16 00:34:03
85040252 16543.9 3880 85040252 0 2017-10-16 00:34:44
85918767 16714.9 3920 85918767 0 2017-10-16 00:35:25
86797282 16885.9 3960 86797282 0 2017-10-16 00:36:06
87675797 17056.9 4000 87675797 0 2017-10-16 00:36:47
88554312 17227.9 4040 88554312 0 2017-10-16 00:37:27
89432827 17398.9 4080 89432827 0 2017-10-16 00:38:09
90311342 17569.9 4120 90311342 0 2017-10-16 00:38:50
91189857 17740.9 4160 91189857 0 2017-10-16 00:39:31
92068372 17911.9 4200 92068372 0 2017-10-16 00:40:12
92946887 18082.9 4240 92946887 0 2017-10-16 00:40:53
93825402 18253.9 4280 93825402 0 2017-10-16 00:41:34
94703917 18424.9 4320 94703917 0 2017-10-16 00:42:15
95582432 18595.9 4360 95582432 0 2017-10-16 00:42:56
96460947 18766.9 4400 96460947 0 2017-10-16 00:43:37
97339462 18937.9 4440 97339462 0 2017-10-16 00:44:18
98217977 19108.9 4480 98217977 0 2017-10-16 00:44:59
99096492 19279.9 4520 99096492 0 2017-10-16 00:45:40
99975007 19450.9 4560 99975007 0 2017-10-16 00:46:22
100853522 19621.9 4600 100853522 0 2017-10-16 00:47:02
101732037 19792.9 4640 101732037 0 2017-10-16 00:47:43
102610552 19963.9 4680 102610552 0 2017-10-16 00:48:24
103489067 20134.9 4720 103489067 0 2017-10-16 00:49:05
104367582 20305.9 4760 104367582 0 2017-10-16 00:49:45
105246097 20476.9 4800 105246097 0 2017-10-16 00:50:26
106124612 20647.9 4840 106124612 0 2017-10-16 00:51:07
107003127 20818.9 4880 107003127 0 2017-10-16 00:51:48
107881642 20989.9 4920 107881642 0 2017-10-16 00:52:29
108760157 21160.9 4960 108760157 0 2017-10-16 00:53:10
109638672 21331.9 5000 109638672 0 2017-10-16 00:53:52
110517187 21502.9 5040 110517187 0 2017-10-16 00:54:33
111395702 21673.9 5080 111395702 0 2017-10-16 00:55:13
112274217 21844.9 5120 112274217 0 2017-10-16 00:55:55
113152732 22015.9 5160 113152732 0 2017-10-16 00:56:36
114031247 22186.9 5200 114031247 0 2017-10-16 00:57:16
114909762 22357.9 5240 114909762 0 2017-10-16 00:57:57
115788277 22528.9 5280 115788277 0 2017-10-16 00:58:38
116666792 22699.9 5320 116666792 0 2017-10-16 00:59:19
117545307 22870.9 5360 117545307 0 2017-10-16 01:00:00
118423822 23041.9 5400 118423822 0 2017-10-16 01:00:41
119302337 23212.9 5440 119302337 0 2017-10-16 01:01:22
120180852 23383.9 5480 120180852 0 2017-10-16 01:02:03
121059367 23554.9 5520 121059367 0 2017-10-16 01:02:44
121937882 23725.9 5560 121937882 0 2017-10-16 01:03:25
122816397 23896.9 5600 122816397 0 2017-10-16 01:04:07
123694912 24067.9 5640 123694912 0 2017-10-16 01:04:47
124573427 24238.9 5680 124573427 0 2017-10-16 01:05:28
125451942 24409.9 5720 125451942 0 2017-10-16 01:06:09
126330457 24580.9 5760 126330457 0 2017-10-16 01:06:50
127208972 24751.9 5800 127208972 0 2017-10-16 01:07:32
128087487 24922.9 5840 128087487 0 2017-10-16 01:08:13
128966002 25093.9 5880 128966002 0 2017-10-16 01:08:54
129844517 25264.9 5920 129844517 0 2017-10-16 01:09:35
130723032 25435.9 5960 130723032 0 2017-10-16 01:10:16
131601547 25606.9 6000 131601547 0 2017-10-16 01:10:57
132480062 25777.9 6040 132480062 0 2017-10-16 01:11:38
133358577 25948.9 6080 133358577 0 2017-10-16 01:12:19
134237092 26119.9 6120 134237092 0 2017-10-16 01:13:00
135115607 26290.9 6160 135115607 0 2017-10-16 01:13:42
135994122 26461.9 6200 135994122 0 2017-10-16 01:14:23
136872637 26632.9 6240 136872637 0 2017-10-16 01:15:03
137751152 26803.9 6280 137751152 0 2017-10-16 01:15:44
138629667 26974.9 6320 138629667 0 2017-10-16 01:16:25
139508182 27145.9 6360 139508182 0 2017-10-16 01:17:06
140386697 27316.9 6400 140386697 0 2017-10-16 01:17:47
141265212 27487.9 6440 141265212 0 2017-10-16 01:18:28
142143727 27658.9 6480 142143727 0 2017-10-16 01:19:10
143022242 27829.9 6520 143022242 0 2017-10-16 01:19:51
143900757 28000.9 6560 143900757 0 2017-10-16 01:20:31
144779272 28171.9 6600 144779272 0 2017-10-16 01:21:14
145657787 28342.9 6640 145657787 0 2017-10-16 01:21:55
146536302 28513.9 6680 146536302 0 2017-10-16 01:22:36
147414817 28684.9 6720 147414817 0 2017-10-16 01:23:17
148293332 28855.9 6760 148293332 0 2017-10-16 01:23:58
149171847 29026.9 6800 149171847 0 2017-10-16 01:24:39
150050362 29197.9 6840 150050362 0 2017-10-16 01:25:20
150928877 29368.9 6880 150928877 0 2017-10-16 01:26:01
151807392 29539.9 6920 151807392 0 2017-10-16 01:26:42
152685907 29710.9 6960 152685907 0 2017-10-16 01:27:23
153564422 29881.9 7000 153564422 0 2017-10-16 01:28:04
154442937 30052.9 7040 154442937 0 2017-10-16 01:28:45
155321452 30223.9 7080 155321452 0 2017-10-16 01:29:27
156199967 30394.9 7120 156199967 0 2017-10-16 01:30:08
157078482 30565.9 7160 157078482 0 2017-10-16 01:30:49
157956997 30736.9 7200 157956997 0 2017-10-16 01:31:30
158835512 30907.9 7240 158835512 0 2017-10-16 01:32:10
159714027 31078.9 7280 159714027 0 2017-10-16 01:32:51
160592542 31249.9 7320 160592542 0 2017-10-16 01:33:32
161471057 31420.9 7360 161471057 0 2017-10-16 01:34:13
162349572 31591.9 7400 162349572 0 2017-10-16 01:34:54
163228087 31762.9 7440 163228087 0 2017-10-16 01:35:35
164106602 31933.9 7480 164106602 0 2017-10-16 01:36:16
164985117 32104.9 7520 164985117 0 2017-10-16 01:36:57
165863632 32275.9 7560 165863632 0 2017-10-16 01:37:38
166742147 32446.9 7600 166742147 0 2017-10-16 01:38:18
167620662 32617.9 7640 167620662 0 2017-10-16 01:38:59
168499177 32788.9 7680 168499177 0 2017-10-16 01:39:40
169377692 32959.9 7720 169377692 0 2017-10-16 01:40:22
170256207 33130.9 7760 170256207 0 2017-10-16 01:41:02
171134722 33301.9 7800 171134722 0 2017-10-16 01:41:43
172013237 33472.9 7840 172013237 0 2017-10-16 01:42:24
172891752 33643.9 7880 172891752 0 2017-10-16 01:43:05
173770267 33814.9 7920 173770267 0 2017-10-16 01:43:46
174648782 33985.9 7960 174648782 0 2017-10-16 01:44:27
175527297 34157.3 8000 175527297 0 2017-10-16 01:45:08
176405812 34329.3 8040 176405812 0 2017-10-16 01:45:49
177284327 34501.3 8080 177284327 0 2017-10-16 01:46:30
178162842 34673.3 8120 178162842 0 2017-10-16 01:47:11
179041357 34845.3 8160 179041357 0 2017-10-16 01:47:51
179919872 35017.3 8200 179919872 0 2017-10-16 01:48:33
180798387 35189.3 8240 180798387 0 2017-10-16 01:49:13
181676902 35361.3 8280 181676902 0 2017-10-16 01:49:54
182555417 35533.3 8320 182555417 0 2017-10-16 01:50:35
183433932 35705.3 8360 183433932 0 2017-10-16 01:51:16
184312447 35877.3 8400 184312447 0 2017-10-16 01:51:57
185190962 36049.3 8440 185190962 0 2017-10-16 01:52:38
186069477 36221.3 8480 186069477 0 2017-10-16 01:53:19
186947992 36393.3 8520 186947992 0 2017-10-16 01:54:00
187826507 36565.3 8560 187826507 0 2017-10-16 01:54:41
188705022 36737.3 8600 188705022 0 2017-10-16 01:55:22
189583537 36909.3 8640 189583537 0 2017-10-16 01:56:03
190462052 37081.3 8680 190462052 0 2017-10-16 01:56:44
191340567 37253.3 8720 191340567 0 2017-10-16 01:57:25
192219082 37425.3 8760 192219082 0 2017-10-16 01:58:06
193097597 37597.3 8800 193097597 0 2017-10-16 01:58:47
193976112 37769.3 8840 193976112 0 2017-10-16 01:59:28
194854627 37941.3 8880 194854627 0 2017-10-16 02:00:09
195733142 38113.3 8920 195733142 0 2017-10-16 02:00:51
196611657 38285.3 8960 196611657 0 2017-10-16 02:01:31
197490172 38457.3 9000 197490172 0 2017-10-16 02:02:12
198368687 38629.3 9040 198368687 0 2017-10-16 02:02:53
199247202 38801.3 9080 199247202 0 2017-10-16 02:03:34
200125717 38973.3 9120 200125717 0 2017-10-16 02:04:15
201004232 39145.3 9160 201004232 0 2017-10-16 02:04:57
201882747 39317.3 9200 201882747 0 2017-10-16 02:05:38
202761262 39489.3 9240 202761262 0 2017-10-16 02:06:19
203639777 39661.3 9280 203639777 0 2017-10-16 02:07:00
204518292 39833.3 9320 204518292 0 2017-10-16 02:07:41
205396807 40005.3 9360 205396807 0 2017-10-16 02:08:22
206275322 40177.3 9400 206275322 0 2017-10-16 02:09:03
207153837 40349.3 9440 207153837 0 2017-10-16 02:09:44
208032352 40521.3 9480 208032352 0 2017-10-16 02:10:25
208910867 40693.3 9520 208910867 0 2017-10-16 02:11:06
209789382 40865.3 9560 209789382 0 2017-10-16 02:11:47
210667897 41037.3 9600 210667897 0 2017-10-16 02:12:28
211546412 41209.3 9640 211546412 0 2017-10-16 02:13:09
212424927 41381.3 9680 212424927 0 2017-10-16 02:13:50
213303442 41553.3 9720 213303442 0 2017-10-16 02:14:31
214181957 41725.3 9760 214181957 0 2017-10-16 02:15:12
215060472 41897.3 9800 215060472 0 2017-10-16 02:15:53
215938987 42069.3 9840 215938987 0 2017-10-16 02:16:34
216817502 42241.3 9880 216817502 0 2017-10-16 02:17:15
217696017 42413.3 9920 217696017 0 2017-10-16 02:17:56
218574532 42585.3 9960 218574532 0 2017-10-16 02:18:37
219453047 42757.3 10000 219453047 0 2017-10-16 02:19:18
220331562 42929.3 10040 220331562 0 2017-10-16 02:19:59
221210077 43101.3 10080 221210077 0 2017-10-16 02:20:40
222088592 43273.3 10120 222088592 0 2017-10-16 02:21:20
222967107 43445.3 10160 222967107 0 2017-10-16 02:22:01
223845622 43617.3 10200 223845622 0 2017-10-16 02:22:42
224724137 43789.3 10240 224724137 0 2017-10-16 02:23:23
225602652 43961.3 10280 225602652 0 2017-10-16 02:24:04
226481167 44133.3 10320 226481167 0 2017-10-16 02:24:45
227359682 44305.3 10360 227359682 0 2017-10-16 02:25:26
228238197 44477.3 10400 228238197 0 2017-10-16 02:26:07
229116712 44649.3 10440 229116712 0 2017-10-16 02:26:48
229995227 44821.3 10480 229995227 0 2017-10-16 02:27:29
230873742 44993.3 10520 230873742 0 2017-10-16 02:28:10
231752257 45165.3 10560 231752257 0 2017-10-16 02:28:51
232630772 45337.3 10600 232630772 0 2017-10-16 02:29:32
233509287 45509.3 10640 233509287 0 2017-10-16 02:30:13
234387802 45681.3 10680 234387802 0 2017-10-16 02:30:54
235266317 45853.3 10720 235266317 0 2017-10-16 02:31:35
236144832 46025.3 10760 236144832 0 2017-10-16 02:32:16
237023347 46197.3 10800 237023347 0 2017-10-16 02:32:57
237901862 46369.3 10840 237901862 0 2017-10-16 02:33:38
238780377 46541.3 10880 238780377 0 2017-10-16 02:34:19
239658892 46713.3 10920 239658892 0 2017-10-16 02:35:00
240537407 46885.3 10960 240537407 0 2017-10-16 02:35:41
241415922 47057.3 11000 241415922 0 2017-10-16 02:36:22
242294437 47229.3 11040 242294437 0 2017-10-16 02:37:03
243172952 47401.3 11080 243172952 0 2017-10-16 02:37:44
244051467 47573.3 11120 244051467 0 2017-10-16 02:38:25
244929982 47745.3 11160 244929982 0 2017-10-16 02:39:06
245808497 47917.3 11200 245808497 0 2017-10-16 02:39:47
246687012 48089.3 11240 246687012 0 2017-10-16 02:40:29
247565527 48261.3 11280 247565527 0 2017-10-16 02:41:10
248444042 48433.3 11320 248444042 0 2017-10-16 02:41:51
249322557 48605.3 11360 249322557 0 2017-10-16 02:42:32
250201072 48777.3 11400 250201072 0 2017-10-16 02:43:13
251079587 48949.3 11440 251079587 0 2017-10-16 02:43:54
251958102 49121.3 11480 251958102 0 2017-10-16 02:44:35
252836617 49293.3 11520 252836617 0 2017-10-16 02:45:16
253715132 49465.3 11560 253715132 0 2017-10-16 02:45:58
254593647 49637.3 11600 254593647 0 2017-10-16 02:46:39
255472162 49809.3 11640 255472162 0 2017-10-16 02:47:20
256350677 49981.3 11680 256350677 0 2017-10-16 02:48:01
257229192 50153.3 11720 257229192 0 2017-10-16 02:48:43
258107707 50325.3 11760 258107707 0 2017-10-16 02:49:24
258986222 50497.3 11800 258986222 0 2017-10-16 02:50:05
259864737 50669.3 11840 259864737 0 2017-10-16 02:50:46
260743252 50841.3 11880 260743252 0 2017-10-16 02:51:27
261621767 51013.3 11920 261621767 0 2017-10-16 02:52:08
262500282 51185.3 11960 262500282 0 2017-10-16 02:52:49
263378797 51357.3 12000 263378797 0 2017-10-16 02:53:30
264257312 51529.3 12040 264257312 0 2017-10-16 02:54:12
265135827 51701.3 12080 265135827 0 2017-10-16 02:54:53
266014342 51873.3 12120 266014342 0 2017-10-16 02:55:34
266892857 52045.3 12160 266892857 0 2017-10-16 02:56:15
267771372 52217.3 12200 267771372 0 2017-10-16 02:56:56
268649887 52389.3 12240 268649887 0 2017-10-16 02:57:37
269528402 52561.3 12280 269528402 0 2017-10-16 02:58:18
270406917 52733.3 12320 270406917 0 2017-10-16 02:58:59
271285432 52905.3 12360 271285432 0 2017-10-16 02:59:40
272163947 53077.3 12400 272163947 0 2017-10-16 03:00:21
273042462 53249.3 12440 273042462 0 2017-10-16 03:01:02
273920977 53421.3 12480 273920977 0 2017-10-16 03:01:43
274799492 53593.3 12520 274799492 0 2017-10-16 03:02:25
275678007 53765.3 12560 275678007 0 2017-10-16 03:03:06
276556522 53937.3 12600 276556522 0 2017-10-16 03:03:47
277435037 54109.3 12640 277435037 0 2017-10-16 03:04:28
278313552 54281.3 12680 278313552 0 2017-10-16 03:05:09
279192067 54453.3 12720 279192067 0 2017-10-16 03:05:51
280070582 54625.3 12760 280070582 0 2017-10-16 03:06:32
280949097 54797.3 12800 280949097 0 2017-10-16 03:07:13
281827612 54969.3 12840 281827612 0 2017-10-16 03:07:54
282706127 55141.3 12880 282706127 0 2017-10-16 03:08:35
283584642 55313.3 12920 283584642 0 2017-10-16 03:09:16
284463157 55485.3 12960 284463157 0 2017-10-16 03:09:57
285341672 55657.3 13000 285341672 0 2017-10-16 03:10:38
286220187 55829.3 13040 286220187 0 2017-10-16 03:11:19
287098702 56001.3 13080 287098702 0 2017-10-16 03:12:00
287977217 56173.3 13120 287977217 0 2017-10-16 03:12:42
288855732 56345.3 13160 288855732 0 2017-10-16 03:13:23
289734247 56517.3 13200 289734247 0 2017-10-16 03:14:04
290612762 56689.3 13240 290612762 0 2017-10-16 03:14:45
291491277 56861.3 13280 291491277 0 2017-10-16 03:15:27
292369792 57033.3 13320 292369792 0 2017-10-16 03:16:08
293248307 57205.3 13360 293248307 0 2017-10-16 03:16:49
294126822 57377.3 13400 294126822 0 2017-10-16 03:17:30
295005337 57549.3 13440 295005337 0 2017-10-16 03:18:12
295883852 57721.3 13480 295883852 0 2017-10-16 03:18:53
296762367 57893.3 13520 296762367 0 2017-10-16 03:19:34
297640882 58065.3 13560 297640882 0 2017-10-16 03:20:16
298519397 58237.3 13600 298519397 0 2017-10-16 03:20:58
299397912 58409.3 13640 299397912 0 2017-10-16 03:21:39
300276427 58581.3 13680 300276427 0 2017-10-16 03:22:20
301154942 58753.3 13720 301154942 0 2017-10-16 03:23:01
302033457 58925.3 13760 302033457 0 2017-10-16 03:23:42
302911972 59097.3 13800 302911972 0 2017-10-16 03:24:23
303790487 59269.3 13840 303790487 0 2017-10-16 03:25:04
304669002 59441.3 13880 304669002 0 2017-10-16 03:25:45
305547517 59613.3 13920 305547517 0 2017-10-16 03:26:27
306426032 59785.3 13960 306426032 0 2017-10-16 03:27:08
307304547 59957.3 14000 307304547 0 2017-10-16 03:27:49
308183062 60129.3 14040 308183062 0 2017-10-16 03:28:30
309061577 60301.3 14080 309061577 0 2017-10-16 03:29:11
309940092 60473.3 14120 309940092 0 2017-10-16 03:29:53
310818607 60645.3 14160 310818607 0 2017-10-16 03:30:34
311697122 60817.3 14200 311697122 0 2017-10-16 03:31:15
312575637 60989.3 14240 312575637 0 2017-10-16 03:31:57
313454152 61161.3 14280 313454152 0 2017-10-16 03:32:37
314332667 61333.3 14320 314332667 0 2017-10-16 03:33:19
315211182 61505.3 14360 315211182 0 2017-10-16 03:34:00
316089697 61677.3 14400 316089697 0 2017-10-16 03:34:42
316968212 61849.3 14440 316968212 0 2017-10-16 03:35:23
317846727 62021.3 14480 317846727 0 2017-10-16 03:36:04
318725242 62193.3 14520 318725242 0 2017-10-16 03:36:46
319603757 62365.3 14560 319603757 0 2017-10-16 03:37:27
320482272 62537.3 14600 320482272 0 2017-10-16 03:38:09
321360787 62709.3 14640 321360787 0 2017-10-16 03:38:49
322239302 62881.3 14680 322239302 0 2017-10-16 03:39:30
323117817 63053.3 14720 323117817 0 2017-10-16 03:40:12
323996332 63225.3 14760 323996332 0 2017-10-16 03:40:53
324874847 63397.3 14800 324874847 0 2017-10-16 03:41:35
325753362 63569.3 14840 325753362 0 2017-10-16 03:42:16
326631877 63741.3 14880 326631877 0 2017-10-16 03:42:57
327510392 63913.3 14920 327510392 0 2017-10-16 03:43:38
328388907 64085.3 14960 328388907 0 2017-10-16 03:44:19
329267422 64257.3 15000 329267422 0 2017-10-16 03:45:00
330145937 64429.3 15040 330145937 0 2017-10-16 03:45:41
331024452 64601.3 15080 331024452 0 2017-10-16 03:46:22
331902967 64773.3 15120 331902967 0 2017-10-16 03:47:03
332781482 64945.3 15160 332781482 0 2017-10-16 03:47:44
333659997 65117.3 15200 333659997 0 2017-10-16 03:48:25
334538512 65289.3 15240 334538512 0 2017-10-16 03:49:07
335417027 65461.3 15280 335417027 0 2017-10-16 03:49:48
336295542 65633.3 15320 336295542 0 2017-10-16 03:50:29
337174057 65805.3 15360 337174057 0 2017-10-16 03:51:10
337174057 65805.3 15360 337174057 0 2017-10-16 03:51:10

View File

@ -0,0 +1 @@
wamdm101-208-10-0-0

View File

@ -0,0 +1,19 @@
wamdm101-208-10-0-0
wamdm102-208-26-0-4096
wamdm103-208-42-0-8192
wamdm104-208-58-0-12288
wamdm105-224-10-4096-0
wamdm106-224-26-4096-4096
wamdm107-224-42-4096-8192
wamdm108-224-58-4096-12288
wamdm109-240-10-8192-0
wamdm110-240-26-8192-4096
wamdm111-240-42-8192-8192
wamdm112-240-58-8192-12288
wamdm113-256-10-12288-0
wamdm114-256-26-12288-4096
wamdm115-256-42-12288-8192
wamdm116-256-58-12288-12288
wamdm117-272-10-16384-0
wamdm118-272-26-16384-4096
wamdm119-272-42-16384-8192

View File

@ -0,0 +1,90 @@
nc: Address already in use
nc: Address already in use
nc: Address already in use
nc: Address already in use
nc: Address already in use
nc: Address already in use
nc: Address already in use
./listenStop.sh: line 3: 30733 Terminated nc -lk 1986 >> listen_stop.txt
./listenStop.sh: line 3: 22443 Terminated nc -lk 1986 >> listen_stop.txt
./listenStop.sh: line 3: 27811 Terminated nc -lk 1986 >> listen_stop.txt
nc: Address already in use
./listenSlaves.sh: line 4: 18444 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 19255 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 11680 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 7313 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 2893 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 30784 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 25889 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 4810 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 29016 Terminated nc -lk 1984 >> listen_cache.txt
./listenSlaves.sh: line 4: 18146 Terminated nc -lk 1984 >> listen_cache.txt
Traceback (most recent call last):
File "./listenSlaves.py", line 37, in <module>
HOST, PORT = sys.argv[1], int(sys.argv[2])
IndexError: list index out of range
Traceback (most recent call last):
File "./listenSlaves.py", line 37, in <module>
HOST, PORT = sys.argv[1], int(sys.argv[2])
IndexError: list index out of range
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
Traceback (most recent call last):
File "./listenSlaves.py", line 45, in <module>
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
File "/usr/lib/python2.7/SocketServer.py", line 419, in __init__
self.server_bind()
File "/usr/lib/python2.7/SocketServer.py", line 430, in server_bind
self.socket.bind(self.server_address)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 98] Address already in use
./killSend.sh: line 11: kill: (24331) - No such process

View File

@ -0,0 +1,33 @@
#!/bin/bash
# The parallel dispatch cannot live inside sumLineAndDataSize.sh, because "wait" waits for all children of the calling process; sumLineAndDataSize.sh has already created child processes of its own, so "wait" there would never return.
abhost=$1
order=$2
aborder=$3
username=$4
# in nodehostname, each line is hostname-ra0-dec0-xi-yi
for line in `cat nodehostname`
do
ccdno=$(($ccdno+1))
eval $(echo $line | awk -F "-" '{print "host="$1";ra0="$2";dec0="$3";xi="$4";yi="$5}')
{
sen=""
if [ "$host" == "$abhost" ]
then
orderSen=$aborder
else
orderSen=$order
fi
for word in `echo $orderSen | awk -v co=$ccdno -v ra=$ra0 -v dec=$dec0 -v x=$xi -v y=$yi '{for(i=1;i<=NF;++i) if($i=="ccdno") print co; else if($i=="ra0") print ra; else if($i=="dec0") print dec; else if($i=="xi") print x; else if($i=="yi") print y; else print $i}'`
do
sen="$sen $word"
done
#echo "$username@$host \"$sen\""
ssh -f -n -t $username@$host "$sen"
echo "Simulated data generator is running on $host."
}& # the trailing & dispatches each node in a separate background process
done
wait
#

View File

@ -0,0 +1,38 @@
#!/bin/bash
recoverRedisSleepTime=$1
username="wamdm"
while true
do
ip=`ps ax | grep [r]edis-server | awk 'FNR==1{print $6}'`
#echo $ip
if [ "$ip" == "" ]
then
for host in `awk -F "-" '{print $1}' nodehostname`
do
ip=`ssh $username@$host "ps ax | grep [r]edis-server | awk 'FNR==1{print $"6"}'"`
if [ "$ip" != "" ]
then
break
fi
done
if [ "$ip" == "" ]
then
echo "there is no avilible ip"
exit 0
fi
fi
cd $HOME/redis-3.2.11
ipset=`redis-cluster-tool/redis-cluster-tool -a $ip -C "cluster_cluster_info 11" | awk 'BEGIN{i=1} {split($1,a,"[");split(a[2],b,"]"); err[i]=$3;ms[i]=a[1];ip[i++]=b[1]} END{ for(j=1;j<i;j++) if(err[j]=="error") {if(ms[j]=="master" && ms[j+1]=="" || ms[j]=="slave") {printf("%s\n",ip[j])} else print "" }}'`
cd utils/create-cluster
for LINE in `echo $ipset`
do
#echo $LINE
./create-cluster renode $LINE $ip
echo $LINE has been restarted
done
sleep $recoverRedisSleepTime
cd ../..
done

View File

@ -0,0 +1,4 @@
#!/bin/bash
hostname=`hostname`
free -k | awk 'FNR==3{print $3}' >> ${hostname}_memUsage.txt

View File

@ -0,0 +1,43 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import socket
import threading
import SocketServer
import sys
def client(ip, port, message):
    # open a TCP connection, send the whole message, then close
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    sock.sendall(message)
    sock.close()
if __name__ == "__main__":
    HOST, PORT = sys.argv[1], int(sys.argv[2])
    # read the message from stdin when it is not passed as the third argument
    if len(sys.argv) < 4:
        msg1 = sys.stdin.read()
    else:
        msg1 = sys.argv[3]
    msg1 = msg1 + '\n'
    client(HOST, PORT, msg1)

View File

@ -0,0 +1,166 @@
#!/bin/bash
#genFileNum=$1
masterhost=$1
ccdno=$2
bernoulliParam=$3
geometricParam=$4
redisIPBackup=$5
templateTable=$6
sourceFilePath=$7
threadNumber=$8
getTemplateFlag=$9
redisWriteMode=${10}
blockNum=${11}
ra0=${12}
dec0=${13}
xi=${14}
yi=${15}
#sleeptime=15
post=1984
stoppost=1986
tmp=""
hostname=$(hostname)
maxLogSize=10485760
maxTimeOut=2
#redisIP=`ps -ef | grep [r]edis-server | awk BEGIN{i=0}'{a[i++]=$9} END{srand();j=int(rand()*100%NR);print a[j]}'`
echo "exit" | nc -q 0 $hostname $stoppost #Terminate the previous killSend.sh
nohup ./killSend.sh $$ $masterhost &
#for i in $(seq 1 $genFileNum)
#do
#t1=$((time -p (python ../gwac_dbgen/pipeline.py $ccdno >dtmp)) 2>&1 | head -1 |awk '{print $2}')
#eval $(python ../gwac_dbgen/pipeline.py $ccdno | awk -F '{a=$1} END{print "tmp="a}')
#backup the original star table
#nohup scp -r ../catalog.csv/* ../data_backup &
###############################+ test template data to put in redis without cross validation.
#dt=`date "+%Y-%m-%d %H:%M:%S"` #annotate pipelinedata=`python ../gwac_dbgen/pipeline.py $ccdno $abnum`and the catalog.csv/$sourceFile and template_table/template are the same.
#pipelinedata="0 0 $dt"
##############################+
resultNum=0
abStarNum=0
stopinfo="stop $hostname $resultNum $abStarNum"
if [ "$getTemplateFlag" == "gt" ]
then
pipelinedata=`python ../gwac_dbgen/pipeline.py $ccdno $ra0 $dec0 $xi $yi 0`
./genTemplateTable.sh
./send.py $masterhost $post "$stopinfo"
exit 0
fi
##########jedis client
#sourceFile=$sourceFilePath/$(ls ../$sourceFilePath)
#/home/wamdm/jdk1.7.0_79/bin/java -jar ../JedisTest.jar ../$sourceFile 5 1 >>1.txt 2>&1
#redisMonitorData="1 1 1"
##########jedis client
if [ "$getTemplateFlag" == "first" ]
then
redisIP=`ps -ef | grep [r]edis-server | awk 'FNR==1{print $9}'`
if [ "$redisIP" == "" ]
then
redisIP=$redisIPBackup
fi
eval $(echo $redisIP | awk -F ":" '{i=$1;p=$2} END{print "ip="i";port="p}')
rm -rf /tmp/slave.pipe
mkfifo /tmp/slave.pipe
pgrep "astroDB_cache" | while read Spid
do
kill $Spid
done
rm -rf /tmp/Squirrel_pipe_test # delete the Squirrel pipe file; otherwise the first write to it (echo "$hostname $pipelinedata ../$sourceFile" > /tmp/Squirrel_pipe_test) may block for unknown reasons
#rm -rf ${hostname}_memUsage.txt # delete the last the file of memory usage
templateTable="$templateTable/$(ls ../$templateTable)"
paramStr="-times 1 -redisHost $redisIP -method plane -grid 4,4 -errorRadius 1.5 -searchRadius 50 -ref ../$templateTable -threadNumber $threadNumber -width 3016 -height 3016 -terminal -writeRedisMode $redisWriteMode -xi $xi -yi $yi"
if [ "$blockNum" != "-1" ]
then
paramStr="$paramStr -blockNum $blockNum"
fi
nohup ../astroDB_cache/astroDB_cache $paramStr &
getTemplateFlag="next"
fi
while true
do
if [ "$getTemplateFlag" == "slave_exit" ]
then
exit 0
fi
pipelinedata=`python ../gwac_dbgen/pipeline.py $ccdno $ra0 $dec0 $xi $yi 0`
sourceFile=$sourceFilePath/$(ls ../$sourceFilePath)
# if astroDB_cache failed to launch, this script will block here
echo "$hostname $pipelinedata $HOME/gwac/$sourceFile $bernoulliParam $geometricParam" > /tmp/Squirrel_pipe_test
sumresult=""
for((timeout=1;timeout<=$maxTimeOut;timeout++))
do
sleep 0.5
result=`$HOME/redis-3.2.11/src/redis-cli -h $ip -p $port -c lpop $hostname`
if [ "$result" == "" ]
then
continue
fi
echo "$result" | ./send.py $masterhost $post
sumresult="$sumresult\n$result"
eval $(echo "$result" | awk -v abs=$abStarNum '{abs+=$NF} END{print "abStarNum="abs}')
resultNum=$(($resultNum+1))
# if [ $resultNum -eq 1 ] # break until receiving one result, but need a large maxTimeOut. E.g., maxTimeOut=10000000
# then
# break
# fi
done
#./sampleMemUsage.sh # sample the usage amount of memory
stopinfo="stop $hostname $resultNum $abStarNum"
sumresult=`echo $sumresult | cut -c 3-`
if [ "$sumresult" == "" ]
then
stopinfo="$stopinfo timeout"
else
#### log
newLogName=`ls logs/ | grep "slave" | tail -1`
if [ "$newLogName" == "" ]
then
newLogName="slave_${hostname}_1_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "PipelineLine Pipelinesize PipelineDate RedisTime(s) RedisStorageLine CrossCertifiedErrorLine\n0 0 0 0 0 0" > logs/$newLogName
fi
logSize=`ls -lk logs/$newLogName 2>/dev/null | awk '{print $5}'`
if [ $logSize -le $maxLogSize ]
then
echo -e $sumresult >>logs/$newLogName
else
newLogNum=$(echo $newLogName | awk -F "_" '{print $3}')
newLogNum=$(($newLogNum+1))
newLogName="slave_${hostname}_${newLogNum}_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "PipelineLine Pipelinesize PipelineDate RedisTime(s) RedisStorageLine CrossCertifiedErrorLine\n$sumresult" > logs/$newLogName
fi
####log
fi
#echo $(hostname) >>dtmp
#t2=$((time -p (cat dtmp | nc -q 0 $masterhost $post)) 2>&1 | head -1 |awk '{print $2}')
#timeleft=`echo "$sleeptime-($t1+$t2)"| bc`
#sleep $timeleft
#done
./send.py $masterhost $post "$stopinfo"
#pkill killSend.sh >/dev/null 2>&1 # easily cause redis-server terminates unexpectedly
#lsof -Pnl +M -i4 | grep $stoppost | awk '{print $2}' | xargs kill >/dev/null 2>&1
#rm -rf dtmp
resultNum=0
abStarNum=0
eval $(cat < /tmp/slave.pipe | awk '{bp=$1;gp=$2;gt=$3;ra=$4;dec=$5;x=$6;y=$7} END{print "bernoulliParam="bp";geometricParam="gp";getTemplateFlag="gt";ra0="ra";dec0="dec";xi="x";yi="y}')
done

View File

@ -0,0 +1,60 @@
#!/bin/bash
masterhost="wamdm100"
stoppostMC=1987
hostname=$(hostname)
lsof -Pnl +M -i4 | grep $stoppostMC | awk '{print $2}' | xargs kill >/dev/null 2>&1
if [ "$1" == "force" ]
then
nohup >/dev/null 2>&1 ./listenStop.py $hostname $stoppostMC 2>&1 &
#pid=`lsof -Pnl +M -i4 | grep $stopPost | awk '{print $2}'`
#if [ "$pid" == "" ]
#then
# echo "run \" nohup ./listenStop.sh & \" first."
# exit 0
#fi
sleep 0.2
echo "force $hostname" | nc -q 0 $masterhost 1984
:>listen_stop.txt
#./send.py $masterhost 1984 "stop! $hostname"
#nohup nc -lk $stopPost >>listen_stop.txt 2>&1 &
#./printStopInfo.sh
#./listenStop.sh &
tmp=""
i=1
until [ "$tmp" == "finished" ]
do
tmp=`sed -n ${i}p listen_stop.txt`
#echo $i
#sleep 1
if [ "$tmp" == "" ]
then
continue
fi
echo $tmp
i=$(($i+1))
done
#echo $tmp
pid=`lsof -Pnl +M -i4 | grep $stoppostMC | awk '{print $2}'`
while [ "$pid" != "" ]
do
#echo "!!!"
kill $pid
wait $pid 2>/dev/null #suppress Terminated message
pid=`lsof -Pnl +M -i4 | grep $stoppostMC | awk '{print $2}'`
done
exit 0
fi
if [ "$1" == "stop!" ]
then
echo "stop! $hostname" | nc -q 0 $masterhost 1984
echo "waiting..."
tmp=`nc -lp $stoppostMC`
echo $tmp
exit 0
fi
echo "Usage: $0 [stop! | force ]"
echo "stop! -- Stop cluster next time in a normal way."
echo "force -- Forced to stop cluster now.."

View File

@ -0,0 +1,15 @@
#!/bin/bash
HDFSPath="AstroDB_simulation_original_data"
dataBackupPath="data_backup"
username="wamdm"
echo -e "\nBackup last simulation data into HDFS... \n"
for host in `awk -F "-" '{print $1}' nodehostname`
do
{
ssh $username@$host "source /etc/profile; cd gwac; hadoop fs -put $dataBackupPath/* /$HDFSPath; rm -rf $dataBackupPath/* " #"source" loads the environment value of a remote node.
echo "$host has finished the data backup."
}&
done
wait

View File

@ -0,0 +1,289 @@
#!/bin/bash
sumLine=0
sumDZ=0
itertime=0
storageLine=0
ccLine=0
datestamp=""
timestamp=""
sleeptime=8 # assigned from nextsleeptime or firstsleeptime
nextsleeptime=8 # the normal sleep time for all later rounds
firstsleeptime=30 # the first round needs extra preparation time
username="wamdm"
mastername="wamdm100"
nodenum=`cat nodehostname | wc -l`
recoverRedisSleepTime=10
gennum=$1 #the total number of images per ccd
sumpost=1984
stoppost=1986 # master and slave
printpost=1985
stoppostMC=1987 #master and client
icRowNum=1
abhost="null" #abnormal star node,
#abnormal star bernoulli Param.
#control the abnorm is occuring or not. bernoulliParam [0,1] is larger,
#the abnorm will occur more easily.
bernoulliDefault="0.1"
geometricDefault="0.995"
bernoulliParam="0"
geometricParam="0"
#geometric Param control the number of the abnormal number
#control the number of abnormal stars if the abnorm is occuring.
#if geometricParam is larger in abnorm phase, the abnormal stars will be more.
aborder="" #the order which the abnormal node need to run
ordre="" #the order which the normal node need to run
redisWriteMode="perBlockAsaString"
blockNum=10000 # use in "perBlockAsaString" , "-1" represents the current parameter is invalid
redisIP="10.0.82.111:7001"
maxLogSize=10485760 #B, 10MB
maxCacheSize=10485760 #B, 1MB
threadNumber=20 # number of threads writing to redis
writeLogRound=5 # write the master log every writeLogRound rounds (5 * $sleeptime seconds)
stopFlag="" # used for detecting the "stop!" order
stopClient="" # used for detecting the "stop!" order
# getTemplateFlag can be "first": first-run flag used to launch astroDB_cache;
# "gt" means the template-table generation phase
getTemplateFlag="first"
if [ -n "$2" ] ;then
getTemplateFlag="gt" #genTemplateTable
fi
templateTable="template_table" #gwac/ is root
sourceFilePath="catalog.csv" #gwac/ is root
newLogName=$(ls logs/ | grep "master" | tail -1)
#pid=`lsof -Pnl +M -i4 | grep $sumpost | awk '{ print $2}'`
cRowNum=1 #the current listen_cache.txt row line
#echo $pid
if [ ! -n "$1" ] ;then
echo "input the total number of images per ccd."
exit 0
fi
#elif [ "$pid" == "" ]
#then
# echo "run \" nohup ./listenSlaves.sh & \" first."
# exit 0
#fi
if [ "$getTemplateFlag" == "first" ]
then
if [ "$newLogName" == "" ]
then
newLogName="master_${mastername}_1_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "TotalLine TotalSize(MB) TotalTime(s) StorageLine CrossCertifiedErrorLine Timestamp\n0 0 0 0 0 0 0" > logs/$newLogName
fi
# put the recoverRedisNode in the background to recover the redis server.
nohup >/dev/null 2>&1 ./recoverRedisNode.sh $recoverRedisSleepTime 2>&1 &
eval $(tail -1 logs/$newLogName | awk '{sl=$1;sd=$2;it=$3;stl=$4;ccl=$5;dt=$6;time=$7} END{print "sumLine="sl";sumDZ="sd";itertime="it";storageLine="stl";ccLine="ccl";datestamp="dt";timestamp="time}')
fi
:>listen_cache.txt
lsof -Pnl +M -i4 | grep $sumpost | awk '{print $2}' | xargs kill >/dev/null 2>&1
nohup >/dev/null 2>&1 ./listenSlaves.py $mastername $sumpost 2>&1 &
for((gentime=1;gentime<=$gennum;gentime++))
do
ccdno=0
nodesta=0
current=`date "+%Y-%m-%d %H:%M:%S"`
timeStamp=`date -d "$current" +%s`
nsec=`date "+%N"`
currentTimeStamp1=$(echo "$timeStamp*1000.0+$nsec/1000000.0"|bc)
#echo $currentTimeStamp1
#abhost="wamdm81"
if [ "$getTemplateFlag" == "gt" -o "$getTemplateFlag" == "first" ]
then
order="cd gwac/gwac_dbgen_cluster/; nohup ./sendStaDataPerNode.sh $mastername ccdno $bernoulliDefault $geometricDefault $redisIP $templateTable $sourceFilePath $threadNumber $getTemplateFlag $redisWriteMode $blockNum ra0 dec0 xi yi >/dev/null 2>&1 &"
aborder="cd gwac/gwac_dbgen_cluster/; nohup ./sendStaDataPerNode.sh $mastername ccdno $bernoulliParam $geometricParam $redisIP $templateTable $sourceFilePath $threadNumber $getTemplateFlag $redisWriteMode $blockNum ra0 dec0 xi yi >/dev/null 2>&1 &"
if [ "$getTemplateFlag" == "first" ]
then
sleeptime=$firstsleeptime
getTemplateFlag="next"
fi
else
sleeptime=$nextsleeptime
order="echo $bernoulliDefault $geometricDefault $getTemplateFlag ra0 dec0 xi yi >/tmp/slave.pipe"
aborder="echo $bernoulliParam $geometricParam $getTemplateFlag ra0 dec0 xi yi >/tmp/slave.pipe"
fi
# abhost cannot be "", or else the shell will skip this parameter
./parallelSendOrder.sh $abhost "$order" "$aborder" "$username"
#sleep 1000
gentimecopy=$gentime
echo "finished! ($gentime times)"
abhost="null"
bernoulliParam="0"
geometricParam="0"
while true
do
tmp=`sed -n ${cRowNum}p listen_cache.txt`
if [ "$tmp" == "" ]
then
#clean listen_cache to reduce its size
cacheSize=`ls -lk listen_cache.txt 2>/dev/null | awk '{print $5}'`
if [ $cacheSize -gt $maxCacheSize ]
then
:>listen_cache.txt
cRowNum=1
fi
#clean listen_cache to reduce its size
continue
fi
cRowNum=$(($cRowNum+1))
pretmp=`echo $tmp | awk '{print $1}'`
suftmp=`echo $tmp | awk '{print substr($0,length($1)+2)}'`
#echo $tmp
if [ "$pretmp" == "stop" ]
then
nodesta=$(($nodesta+1))
eval $(echo $suftmp | awk '{h=$1;rn=$2;abs=$3;t=$4} END{print "host="h";resultNum="rn";abStarNum="$3";timeout="t}')
#echo $suftmp $resultNum
if [ "$timeout" == "" ]
then
if [ $abStarNum -eq 0 ]
then
echo "dataGen on $host has stopped, including $resultNum results ($nodesta/$nodenum nodes)."
else
echo -e "dataGen on $host has stopped, including $resultNum results and \033[33;1m$abStarNum\033[0m abnormal stars ($nodesta/$nodenum nodes)."
fi
else
if [ $abStarNum -eq 0 ]
then
echo -e "\033[31;1mdataGen on $host is $timeout, including $resultNum results ($nodesta/$nodenum nodes).\033[0m"
else
echo -e "\033[31;1mdataGen on $host is $timeout, including $resultNum results and \033[33;1m$abStarNum\033[0m abnormal stars ($nodesta/$nodenum nodes).\033[0m"
fi
fi
if [ $nodesta -eq $nodenum ]
then
#echo "all nodes have finished data generation ($i time)."
current=`date "+%Y-%m-%d %H:%M:%S"`
timeStamp=`date -d "$current" +%s`
nsec=`date "+%N"`
currentTimeStamp2=$(echo "$timeStamp*1000.0+$nsec/1000000.0"|bc)
interval=$(echo "$sleeptime-($currentTimeStamp2-$currentTimeStamp1)/1000.0" | bc)
echo "all nodes have finished data generation ($gentimecopy times), and sleep $interval(s)."
sleep $interval
#lsof -Pnl +M -i4 | grep $sumpost | awk '{print $2}' | xargs kill
#pkill listenSlaves.sh
break
fi
continue
elif [ "$pretmp" == "force" ]
then
echo -e "\033[33;1mStop cluster now!\033[0m"
for line1 in `awk -F "-" '{print $1}' nodehostname`
do
echo "force" | nc -q 0 $line1 $stoppost #master and slave
info=`nc -lp $stoppost` # master and slave
echo "$info"
echo "$info" | ./send.py $suftmp $stoppostMC #master and client
done
echo "finished" | ./send.py $suftmp $stoppostMC #master and client
lsof -Pnl +M -i4 | grep $sumpost | awk '{print $2}' | xargs kill
ps ax | grep [.]/recoverRedisNode.sh | awk '{print $1}' | while read rpid
do
kill $rpid
done
itertime=$(($itertime+($gentimecopy-1)*$sleeptime))
if [ "$getTemplateFlag" != "gt" ]
then
#### log
logSize=`ls -lk logs/$newLogName 2>/dev/null | awk '{print $5}'`
if [ $logSize -le $maxLogSize ]
then
echo $sumLine $sumDZ $itertime $storageLine $ccLine $datestamp $timestamp >>logs/$newLogName
exit 0
fi
newLogNum=$(echo $newLogName | awk -F "_" '{print $3}')
newLogNum=$(($newLogNum+1))
newLogName="master_${mastername}_${newLogNum}_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "TotalLine TotalSize(MB) TotalTime(s) StorageLine CrossCertifiedErrorLine Timestamp\n$sumLine $sumDZ $itertime $storageLine $ccLine $datestamp $timestamp" > logs/$newLogName
####log
fi
elif [ "$pretmp" == "stop!" ]
then
echo -e "\033[33;1mCluster will stop next time in the normal way!\033[0m"
gentime=$gennum
stopFlag="finished!"
stopClient=$suftmp
elif [ "$pretmp" == "print" ]
then
itertimetmp=$(($itertime+($gentimecopy-1)*$sleeptime))
echo $sumLine $sumDZ $datestamp $timestamp | nc -q 0 $suftmp $printpost
elif [ "$pretmp" == "abnormal" ]
then
eval $(echo $suftmp | awk '{h=$1;b=$2;g=$3} END{print "abhost="h";bernoulliParam="b";geometricParam="g}')
echo -e "\033[33;1mset abnormal stars on $abhost next time!\033[0m"
else
eval $(echo $suftmp | awk -v sl=$sumLine -v sd=$sumDZ -v stl=$storageLine -v ccl=$ccLine -v dt=$datestamp -v time=$timestamp '{sl+=$1;sd+=$2;stl+=$6;ccl+=$7;dt=$3;time=$4} END{print "sumLine="sl";sumDZ="sd";storageLine="stl";ccLine="ccl";datestamp="dt";timestamp="time}')
# echo $sumLine $sumDZ $storageLine $ccLine $datestamp $timestamp
fi
done
logRound=`expr $gentimecopy % $writeLogRound`
if [ $logRound -eq 0 ]
then
itertimeRound=$(($itertime+$gentimecopy*$sleeptime))
####log
logSize=`ls -lk logs/$newLogName 2>/dev/null | awk '{print $5}'`
if [ $logSize -le $maxLogSize ]
then
echo $sumLine $sumDZ $itertimeRound $storageLine $ccLine $datestamp $timestamp >>logs/$newLogName
else
newLogNum=$(echo $newLogName | awk -F "_" '{print $3}')
newLogNum=$(($newLogNum+1))
newLogName="master_${mastername}_${newLogNum}_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "TotalLine TotalSize(MB) TotalTime(s) StorageLine CrossCertifiedErrorLine Timestamp\n$sumLine $sumDZ $itertimeRound $storageLine $ccLine $datestamp $timestamp" > logs/$newLogName
####log
fi
fi
done 2>/dev/null
if [ "$getTemplateFlag" == "next" ]
then
ps ax | grep [.]/recoverRedisNode.sh | awk '{print $1}' | while read rpid
do
kill $rpid
done
for line in `awk -F "-" '{print $1}' nodehostname` #
do
./send.py $line $stoppost "kill"
done
elif [ "$getTemplateFlag" == "gt" ]
then
for line in `awk -F "-" '{print $1}' nodehostname`
do
./send.py $line $stoppost "exit"
done
fi 2>/dev/null
pid=`lsof -Pnl +M -i4 | grep $sumpost | awk '{print $2}'`
kill $pid
wait $pid 2>/dev/null # do not display "kill" information
itertime=$(($itertime+$gentimecopy*$sleeptime))
#echo $gentimecopy $gentime
if [ "$getTemplateFlag" == "next" ]
then
####log
logSize=`ls -lk logs/$newLogName 2>/dev/null | awk '{print $5}'`
if [ $logSize -le $maxLogSize ]
then
echo $sumLine $sumDZ $itertime $storageLine $ccLine $datestamp $timestamp >>logs/$newLogName
else
newLogNum=$(echo $newLogName | awk -F "_" '{print $3}')
newLogNum=$(($newLogNum+1))
newLogName="master_${mastername}_${newLogNum}_`date "+%Y-%m-%d-%H:%M:%S"`"
echo -e "TotalLine TotalSize(MB) TotalTime(s) StorageLine CrossCertifiedErrorLine Timestamp\n$sumLine $sumDZ $itertime $storageLine $ccLine $datestamp $timestamp" > logs/$newLogName
####log
fi
fi
if [ "$stopFlag" != "" ]
then
echo $stopFlag | nc -q 0 $stopClient $stoppostMC
fi

View File

@ -0,0 +1,21 @@
April 28, 2017
The current version is gwac_dbgen_cluster 2.0.
Release notes:
1. The Squirrel write-cache system is replaced by astroDB_cache. astroDB_cache supports four write modes and two storage formats. The two storage formats are KEY-LIST and KEY-ZSET; for convenient querying, KEY-ZSET is the default. The four redis write modes are (a launch sketch follows the list):
"perLineWithCache": one logical table per star, i.e. one list or one zset per star. Introduced to mitigate network latency: astroDB_cache keeps a local in-memory cache and writes only part of the stars' data to Redis each round. The drawback is query latency, because locally cached data are not written to Redis immediately.
"perLineWithNoCache": one logical table per star (one list or one zset per star); all stars' data are written to Redis every round.
"starTableAsaString": one logical table per CCD (one list or one zset per CCD).
"perBlockAsaString": stars are clustered by their x,y position into blockNum blocks, one logical table (one list or one zset) per block. This effectively mitigates the performance loss caused by network latency without introducing query latency; it is the default.
2. A random abnormal-star generation algorithm and an interval-index creation algorithm based on abnormal stars are introduced. A generated anomaly is no longer a single-point outlier but a phase anomaly, behaving as follows:
abnormal star   brightness
abstar1 .......... ............
abstar2 .......... .. ......
The abnormal brightness values of abnormal stars are stored in perLineWithNoCache mode.
3. sumLineAndDataSize.sh in gwac_dbgen_cluster now dispatches run commands to multiple machines concurrently, and sendStaDataPerNode.sh is now a resident process instead of being restarted over ssh every round, which reduces unnecessary overhead. Once sumLineAndDataSize.sh has run the specified number of rounds, sendStaDataPerNode.sh and astroDB_cache terminate.
4. Different nodes can generate catalogs for different sky regions; each ccd catalog has its own ra and dec center. To support this, each line of nodehostname becomes hostname-ra0-dec0, where ra0 and dec0 are the central right ascension and declination of that ccd's catalog.
5. The x,y coordinates produced by all CCDs now live in one large common x,y coordinate system, so a range query can span the fields of view of multiple ccds; that is, search ranges larger than a single ccd's field of view are supported.
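A usage sketch for the abnormal-phase feature: in this commit addAbnormalStar.sh takes a hostname plus the Bernoulli and geometric parameters that sumLineAndDataSize.sh forwards to the chosen node (the hostname and parameter values below are illustrative):

    # ask wamdm105 to enter an abnormal phase; 0.3 is the Bernoulli probability
    # of the phase starting, 0.99 the geometric parameter governing how many
    # abnormal stars appear while it lasts (values illustrative)
    ./addAbnormalStar.sh wamdm105 0.3 0.99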