improve db sync and task log bugfix

This commit is contained in:
itsikkes 2016-06-06 11:50:33 +03:00
parent 7a0092c0f4
commit fdc562bbed
4 changed files with 25 additions and 13 deletions

View File

@ -71,7 +71,7 @@ function updateJobs() {
var jobsList = json.objects; var jobsList = json.objects;
for (var i = 0; i < jobsList.length; i++) { for (var i = 0; i < jobsList.length; i++) {
jobsTable.row.add([jobsList[i].id, jobsList[i].creation_time, jobsList[i].type,jobsList[i].execution.state, JSON.stringify(jobsList[i].properties)]); jobsTable.row.add([jobsList[i].id, jobsList[i].creation_time, jobsList[i].type,jobsList[i].state, JSON.stringify(jobsList[i].properties)]);
} }
jobsTable.draw(); jobsTable.draw();

View File

@ -42,8 +42,10 @@ class NetControllerConnector(object):
def set_logger(self, logger): def set_logger(self, logger):
self.log = logger self.log = logger
class NetControllerJob(object): class NetControllerJob(object):
connector_type = NetControllerConnector connector_type = NetControllerConnector
_connector = None _connector = None
_logger = None _logger = None

View File

@ -143,9 +143,12 @@ class JobCreation(restful.Resource):
job.load_job_properties(loaded_job.get("properties")) job.load_job_properties(loaded_job.get("properties"))
if action == "delete": if action == "delete":
if loaded_job.get("execution")["state"] == "pending": if loaded_job.get("state") == "pending":
mongo.db.job.remove({"_id": bson.ObjectId(jobid)}) res = mongo.db.job.remove({"_id": bson.ObjectId(jobid), "state": "pending"})
if res["nModified"] == 1:
return {'status': 'ok'} return {'status': 'ok'}
else:
return {'status': 'error deleting'}
else: else:
return {'status': 'bad state'} return {'status': 'bad state'}
@ -202,19 +205,17 @@ class JobCreation(restful.Resource):
return {'status': 'failed'} return {'status': 'failed'}
else: else:
execution_state = {"taskid": "",
"state" : "pending"}
new_job = { new_job = {
"creation_time": datetime.now(), "creation_time": datetime.now(),
"type": jobtype, "type": jobtype,
"properties": parsed_prop, "properties": parsed_prop,
"execution": execution_state, "taskid": "",
"state" : "pending",
} }
jobid = mongo.db.job.insert(new_job) jobid = mongo.db.job.insert(new_job)
async = tasks_manager.run_task.delay(jobid) async = tasks_manager.run_task.delay(jobid)
execution_state["taskid"] = async.id
mongo.db.job.update({"_id": jobid}, mongo.db.job.update({"_id": jobid},
{"$set": {"execution": execution_state}}) {"$set": {"taskid": async.id}})
return {'status': 'created'} return {'status': 'created'}

View File

@ -33,13 +33,17 @@ class JobExecution(object):
def __init__(self, mongo, jobinfo): def __init__(self, mongo, jobinfo):
self._mongo = mongo self._mongo = mongo
self._jobinfo = jobinfo self._jobinfo = jobinfo
self.update_job_state("processing")
job_class = get_jobclass_by_name(self._jobinfo["type"]) job_class = get_jobclass_by_name(self._jobinfo["type"])
con = job_class.connector_type() con = job_class.connector_type()
refresh_connector_config(self._mongo, con) refresh_connector_config(self._mongo, con)
self._job = job_class(con, self) self._job = job_class(con, self)
self._job.load_job_properties(self._jobinfo["properties"]) self._job.load_job_properties(self._jobinfo["properties"])
prev_log = self._mongo.db.results.find_one({"jobid": self._jobinfo["_id"]})
if prev_log:
self._log = prev_log["log"]
else:
self._log = []
def get_job(self): def get_job(self):
return self._job return self._job
@ -48,9 +52,8 @@ class JobExecution(object):
self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]}) self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]})
def update_job_state(self, state): def update_job_state(self, state):
self._jobinfo["execution"]["state"] = state
self._mongo.db.job.update({"_id": self._jobinfo["_id"]}, self._mongo.db.job.update({"_id": self._jobinfo["_id"]},
{"$set": {"execution": self._jobinfo["execution"]}}) {"$set": {"state": state}})
def _log_resutls(self, res): def _log_resutls(self, res):
self._mongo.db.results.update({"jobid": self._jobinfo["_id"]}, self._mongo.db.results.update({"jobid": self._jobinfo["_id"]},
@ -65,13 +68,15 @@ class JobExecution(object):
def run(self): def run(self):
self.log("Starting job") self.log("Starting job")
res = False
res = None
try: try:
res = self._job.run() res = self._job.run()
except Exception, e: except Exception, e:
self.log("Exception raised while running: %s" % e) self.log("Exception raised while running: %s" % e)
self.update_job_state("error") self.update_job_state("error")
return False return False
if res: if res:
self.log("Done job startup") self.log("Done job startup")
self.update_job_state("running") self.update_job_state("running")
@ -95,6 +100,10 @@ class JobExecution(object):
@celery.task @celery.task
def run_task(jobid): def run_task(jobid):
print "searching for ", jobid print "searching for ", jobid
aquire_task = mongo.db.job.update({"_id": jobid, "state": "pending"}, {"$set": {"state": "processing"}})
if aquire_task["nModified"] != 1:
return False
job_info = mongo.db.job.find_one({"_id": jobid}) job_info = mongo.db.job.find_one({"_id": jobid})
if not job_info: if not job_info:
return False return False