import time
from datetime import datetime

from flask import Flask
from flask_pymongo import PyMongo
from celery import Celery

from common import get_jobclass_by_name, refresh_connector_config


def make_celery(app):
    # Build a Celery app that shares the Flask app's configuration and wraps
    # every task so it runs inside the Flask application context.
    celery = Celery(main='MONKEY_TASKS', backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


fapp = Flask(__name__)
fapp.config.from_object('dbconfig')
celery = make_celery(fapp)
mongo = PyMongo(fapp)


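# For reference only: a minimal sketch of what the 'dbconfig' module loaded
# above might provide. BROKER_URL and CELERY_RESULT_BACKEND are read in
# make_celery(), and MONGO_URI is the key Flask-PyMongo expects; the example
# values (Redis broker, local MongoDB, database name) are assumptions, not
# part of this codebase:
#
#     BROKER_URL = "redis://localhost:6379/0"
#     CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
#     MONGO_URI = "mongodb://localhost:27017/monkey"

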
class JobExecution(object):
    """Wraps one run of a job: resolves the job class, tracks its state in
    MongoDB and persists a per-job log and results."""

    _jobinfo = None
    _job = None
    _mongo = None
    _log = []

    def __init__(self, mongo, jobinfo):
        self._mongo = mongo
        self._jobinfo = jobinfo

        # Resolve the concrete job class for this job's "type" and give it a
        # freshly configured connector.
        job_class = get_jobclass_by_name(self._jobinfo["type"])
        con = job_class.connector_type()
        refresh_connector_config(self._mongo, con)
        self._job = job_class(con, self)
        self._job.load_job_properties(self._jobinfo["properties"])

        # Continue an existing log if this job already has a results document.
        prev_log = self._mongo.db.results.find_one({"jobid": self._jobinfo["_id"]})
        if prev_log:
            self._log = prev_log["log"]
        else:
            self._log = []

    def get_job(self):
        return self._job

    def refresh_job_info(self):
        self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]})

    def update_job_state(self, state):
        self._mongo.db.job.update_one({"_id": self._jobinfo["_id"]},
                                      {"$set": {"state": state}})

    def _log_results(self, res):
        self._mongo.db.results.update_one({"jobid": self._jobinfo["_id"]},
                                          {"$set": {"results": {"time": datetime.now(), "res": res}}},
                                          upsert=True)

    def log(self, text):
        self._log.append([datetime.now().isoformat(), text])
        self._mongo.db.results.update_one({"jobid": self._jobinfo["_id"]},
                                          {"$set": {"log": self._log}},
                                          upsert=True)

    def run(self):
        self.log("Starting job")

        res = None
        try:
            res = self._job.run()
        except Exception as e:
            self.log("Exception raised while running: %s" % e)
            self.update_job_state("error")
            return False

        if res:
            self.log("Done job startup")
            self.update_job_state("running")
        else:
            self.log("Job startup error")
            self.update_job_state("error")
        return res

    def get_results(self):
        self.log("Trying to get results")
        res = []
        try:
            res = self._job.get_results()
        except Exception as e:
            self.log("Exception raised while getting results: %s" % e)
            return False
        self._log_results(res)
        return True

    def stop(self):
        self.log("Trying to stop...")
        res = None

        try:
            res = self._job.stop()
        except Exception as e:
            self.log("Exception raised while stopping: %s" % e)
            self.update_job_state("error")
            return False

        if res:
            self.log("Done stop job")
            self.update_job_state("ended")
        else:
            self.log("Job stopping error")
            self.update_job_state("error")

        return res


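# Illustrative only: the duck-typed interface JobExecution expects from the
# classes returned by get_jobclass_by_name() in 'common' (not shown in this
# file). The shape below is inferred from the calls made above; the method
# bodies and the ExampleConnector name are assumptions.
#
#     class ExampleJob(object):
#         connector_type = ExampleConnector      # instantiated with no arguments
#
#         def __init__(self, connector, execution):
#             ...
#
#         def load_job_properties(self, properties):
#             ...
#
#         def run(self):          # truthy on successful startup
#             ...
#
#         def get_results(self):  # serialisable results for the results collection
#             ...
#
#         def stop(self):         # truthy if the job stopped cleanly
#             ...

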
@celery.task
def run_task(jobid):
    # Atomically claim the job: proceed only if this worker moved it from
    # "pending" to "processing".
    acquire_task = mongo.db.job.update_one({"_id": jobid, "state": "pending"},
                                           {"$set": {"state": "processing"}})
    if acquire_task.modified_count != 1:
        return False

    job_info = mongo.db.job.find_one({"_id": jobid})
    if not job_info:
        return False

    job_exec = None
    try:
        job_exec = JobExecution(mongo, job_info)
    except Exception as e:
        print("init JobExecution exception -", e)
        return False

    if not job_exec.get_job():
        job_exec.update_job_state("error")
        return False

    if not job_exec.run():
        return False

    if not job_exec.get_results():
        return False

    return "done task: " + run_task.request.id


@celery.task
def stop_task(jobid):
    # Atomically claim the job: proceed only if this worker moved it from
    # "running" to "stopping".
    acquire_task = mongo.db.job.update_one({"_id": jobid, "state": "running"},
                                           {"$set": {"state": "stopping"}})
    if acquire_task.modified_count != 1:
        print("could not acquire lock on job")
        return False

    job_info = mongo.db.job.find_one({"_id": jobid})
    if not job_info:
        print("could not get job info")
        return False

    job_exec = None
    try:
        job_exec = JobExecution(mongo, job_info)
    except Exception as e:
        print("init JobExecution exception -", e)
        return False

    if not job_exec.get_job():
        job_exec.update_job_state("error")
        return False

    # Collect whatever results are available before shutting the job down.
    job_exec.get_results()

    if not job_exec.stop():
        print("error stopping")
        return False

    return "done stop_task"


@celery.task
def update_cache(connector):
    time.sleep(30)
    return "connector: " + repr(connector)
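

# Example usage (an assumption, not part of this module): callers would queue
# these tasks asynchronously by job _id, e.g.
#
#     run_task.delay(job_id)
#     stop_task.delay(job_id)
#
# with a Celery worker started against this module, for example
# (the module name "tasks" is hypothetical):
#
#     celery -A tasks worker --loglevel=info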