monkey/monkey_business/cc/tasks_manager.py

186 lines
5.1 KiB
Python

import time
from flask import Flask
from datetime import datetime
from flask.ext.pymongo import PyMongo
from celery import Celery
from common import *
def make_celery(app):
celery = Celery(main='MONKEY_TASKS', backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
fapp = Flask(__name__)
fapp.config.from_object('dbconfig')
celery = make_celery(fapp)
mongo = PyMongo(fapp)
class JobExecution(object):
_jobinfo = None
_job = None
_mongo = None
_log = []
def __init__(self, mongo, jobinfo):
self._mongo = mongo
self._jobinfo = jobinfo
job_class = get_jobclass_by_name(self._jobinfo["type"])
con = job_class.connector_type()
refresh_connector_config(self._mongo, con)
self._job = job_class(con, self)
self._job.load_job_properties(self._jobinfo["properties"])
prev_log = self._mongo.db.results.find_one({"jobid": self._jobinfo["_id"]})
if prev_log:
self._log = prev_log["log"]
else:
self._log = []
def get_job(self):
return self._job
def refresh_job_info(self):
self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]})
def update_job_state(self, state):
self._mongo.db.job.update({"_id": self._jobinfo["_id"]},
{"$set": {"state": state}})
def _log_resutls(self, res):
self._mongo.db.results.update({"jobid": self._jobinfo["_id"]},
{"$set": {"results": {"time" : datetime.now(), "res" : res}}},
upsert=True)
def log(self, text):
self._log.append([datetime.now().isoformat(), text])
self._mongo.db.results.update({"jobid": self._jobinfo["_id"]},
{"$set": {"log": self._log}},
upsert=True)
def run(self):
self.log("Starting job")
res = None
try:
res = self._job.run()
except Exception, e:
self.log("Exception raised while running: %s" % e)
self.update_job_state("error")
return False
if res:
self.log("Done job startup")
self.update_job_state("running")
else:
self.log("Job startup error")
self.update_job_state("error")
return res
def get_results(self):
self.log("Trying to get results")
res = []
try:
res = self._job.get_results()
except Exception, e:
self.log("Exception raised while getting results: %s" % e)
return False
self._log_resutls(res)
return True
def stop(self):
self.log("Trying to stop...")
res = None
try:
res = self._job.stop()
except Exception, e:
self.log("Exception raised while running: %s" % e)
self.update_job_state("error")
return False
if res:
self.log("Done stop job")
self.update_job_state("ended")
else:
self.log("Job stopping error")
self.update_job_state("error")
return res
@celery.task
def run_task(jobid):
acquire_task = mongo.db.job.update({"_id": jobid, "state": "pending"}, {"$set": {"state": "processing"}})
if acquire_task["nModified"] != 1:
return False
job_info = mongo.db.job.find_one({"_id": jobid})
if not job_info:
return False
job_exec = None
try:
job_exec = JobExecution(mongo, job_info)
except Exception, e:
print "init JobExecution exception - ", e
return False
if not job_exec.get_job():
job_exec.update_job_state("error")
return False
if not job_exec.run():
return False
if not job_exec.get_results():
return False
return "done task: " + run_task.request.id
@celery.task
def stop_task(jobid):
acquire_task = mongo.db.job.update({"_id": jobid, "state": "running"}, {"$set": {"state": "stopping"}})
if acquire_task["nModified"] != 1:
print "could not acquire lock on job"
return False
job_info = mongo.db.job.find_one({"_id": jobid})
if not job_info:
print "could not get job info"
return False
job_exec = None
try:
job_exec = JobExecution(mongo, job_info)
except Exception, e:
print "init JobExecution exception - ", e
return False
if not job_exec.get_job():
job_exec.update_job_state("error")
return False
job_exec.get_results()
if not job_exec.stop():
print "error stopping"
return False
return "done stop_task"
@celery.task
def update_cache(connector):
time.sleep(30)
return "connector: " + repr(connector)