From 35befba5a996c6e52ee3551122d08b73999f39d7 Mon Sep 17 00:00:00 2001 From: itsikkes Date: Sun, 5 Jun 2016 13:55:41 +0300 Subject: [PATCH] build structure for async job execution --- monkey_business/cc/common.py | 31 ++++++++ monkey_business/cc/connectors/__init__.py | 11 ++- monkey_business/cc/connectors/demo.py | 6 +- monkey_business/cc/connectors/vcenter.py | 3 + monkey_business/cc/dbconfig.py | 1 - monkey_business/cc/main.py | 34 +-------- monkey_business/cc/tasks_manager.py | 89 ++++++++++++++++++++--- 7 files changed, 129 insertions(+), 46 deletions(-) create mode 100644 monkey_business/cc/common.py diff --git a/monkey_business/cc/common.py b/monkey_business/cc/common.py new file mode 100644 index 000000000..79822a492 --- /dev/null +++ b/monkey_business/cc/common.py @@ -0,0 +1,31 @@ +from connectors.vcenter import VCenterJob, VCenterConnector +from connectors.demo import DemoJob, DemoConnector + +available_jobs = [VCenterJob, DemoJob] + + +def get_connector_by_name(name): + for jobclass in available_jobs: + if name == jobclass.connector.__name__: + return jobclass.connector() + return None + + +def get_jobclass_by_name(name): + for jobclass in available_jobs: + if jobclass.__name__ == name: + return jobclass + + +def refresh_connector_config(mongo, connector): + properties = mongo.db.connector.find_one({"type": connector.__class__.__name__}) + if properties: + connector.load_properties(properties) + + +def load_connector(mongo, name): + con = get_connector_by_name(name) + if not con: + return None + refresh_connector_config(mongo, con) + return con \ No newline at end of file diff --git a/monkey_business/cc/connectors/__init__.py b/monkey_business/cc/connectors/__init__.py index 71520bc0c..c1688d2f6 100644 --- a/monkey_business/cc/connectors/__init__.py +++ b/monkey_business/cc/connectors/__init__.py @@ -36,6 +36,7 @@ class NetControllerConnector(object): def disconnect(self): return + class NetControllerJob(object): connector = NetControllerConnector _properties = { @@ -46,8 +47,9 @@ class NetControllerJob(object): } - def __init__(self): - pass + def __init__(self, existing_connector = None): + if existing_connector: + self.connector = existing_connector def get_job_properties(self): return self._properties @@ -61,4 +63,7 @@ class NetControllerJob(object): return None def run(self): - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() + + def get_results(self): + return [] \ No newline at end of file diff --git a/monkey_business/cc/connectors/demo.py b/monkey_business/cc/connectors/demo.py index a294b6526..e89ebce17 100644 --- a/monkey_business/cc/connectors/demo.py +++ b/monkey_business/cc/connectors/demo.py @@ -41,4 +41,8 @@ class DemoJob(NetControllerJob): } _enumerations = { "vlan": "get_vlans_list", - } \ No newline at end of file + } + + def run(self): + import time + time.sleep(30) \ No newline at end of file diff --git a/monkey_business/cc/connectors/vcenter.py b/monkey_business/cc/connectors/vcenter.py index 61e913100..0486ac396 100644 --- a/monkey_business/cc/connectors/vcenter.py +++ b/monkey_business/cc/connectors/vcenter.py @@ -162,3 +162,6 @@ class VCenterJob(NetControllerJob): "vlan": "get_vlans_list", } + def run(self): + pass + diff --git a/monkey_business/cc/dbconfig.py b/monkey_business/cc/dbconfig.py index ec88943b6..7aa6b26e4 100644 --- a/monkey_business/cc/dbconfig.py +++ b/monkey_business/cc/dbconfig.py @@ -1,5 +1,4 @@ SRV_ADDRESS = 'localhost:27017' - BROKER_URL = 'mongodb://%(srv)s/monkeybusiness' % {'srv': SRV_ADDRESS} MONGO_URI = BROKER_URL CELERY_RESULT_BACKEND = 'mongodb://%(srv)s/' % {'srv': SRV_ADDRESS} diff --git a/monkey_business/cc/main.py b/monkey_business/cc/main.py index 6fe51b504..1d18fda6b 100644 --- a/monkey_business/cc/main.py +++ b/monkey_business/cc/main.py @@ -1,5 +1,3 @@ -import os -import sys from flask import Flask, request, abort, send_from_directory from flask.ext import restful from flask.ext.pymongo import PyMongo @@ -7,17 +5,13 @@ from flask import make_response import bson.json_util import json from datetime import datetime -import dateutil.parser -from connectors.vcenter import VCenterJob, VCenterConnector -from connectors.demo import DemoJob, DemoConnector +from common import * import tasks_manager app = Flask(__name__) app.config.from_object('dbconfig') mongo = PyMongo(app) -available_jobs = [VCenterJob, DemoJob] - active_connectors = {} class Root(restful.Resource): @@ -98,20 +92,6 @@ class Connector(restful.Resource): {"$set": settings_json}, upsert=True) - -def get_connector_by_name(name): - for jobclass in available_jobs: - if name == jobclass.connector.__name__: - return jobclass.connector() - return None - - -def get_jobclass_by_name(name): - for jobclass in available_jobs: - if jobclass.__name__ == name: - return jobclass() - - class JobCreation(restful.Resource): def get(self, **kw): jobtype = request.args.get('type') @@ -127,11 +107,11 @@ class JobCreation(restful.Resource): job = None if not jobid: - job = get_jobclass_by_name(jobtype) + job = get_jobclass_by_name(jobtype)() else: loaded_job = mongo.db.job.find_one({"_id": bson.ObjectId(jobid)}) if loaded_job: - job = get_jobclass_by_name(loaded_job.get("type")) + job = get_jobclass_by_name(loaded_job.get("type"))() job.load_job_properties(loaded_job.get("properties")) if action == "delete": @@ -249,12 +229,6 @@ def output_json(obj, code, headers=None): return resp -def refresh_connector_config(name): - properties = mongo.db.connector.find_one({"type": name}) - if properties: - active_connectors[name].load_properties(properties) - - def update_connectors(): for con in available_jobs: connector_name = con.connector.__name__ @@ -262,7 +236,7 @@ def update_connectors(): active_connectors[connector_name] = con.connector() if not active_connectors[connector_name].is_connected(): - refresh_connector_config(connector_name) + refresh_connector_config(mongo, active_connectors[connector_name]) try: app.logger.info("Trying to activate connector: %s" % connector_name) active_connectors[connector_name].connect() diff --git a/monkey_business/cc/tasks_manager.py b/monkey_business/cc/tasks_manager.py index 953a8859d..e3995a744 100644 --- a/monkey_business/cc/tasks_manager.py +++ b/monkey_business/cc/tasks_manager.py @@ -1,8 +1,9 @@ -import os import time from flask import Flask +from datetime import datetime from flask.ext.pymongo import PyMongo from celery import Celery +from common import * def make_celery(app): celery = Celery(main='MONKEY_TASKS', backend=app.config['CELERY_RESULT_BACKEND'], @@ -22,24 +23,90 @@ fapp.config.from_object('dbconfig') celery = make_celery(fapp) mongo = PyMongo(fapp) +class JobExecution(object): + _jobinfo = None + _job = None + _mongo = None + _log = [] + + def __init__(self, mongo, jobinfo): + self._mongo = mongo + self._jobinfo = jobinfo + self.update_job_state("processing") + + job_class = get_jobclass_by_name(self._jobinfo["type"]) + con = job_class.connector() + refresh_connector_config(self._mongo, con) + self._job = job_class(con) + + def get_job(self): + return self._job + + def refresh_job_info(self): + self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]}) + + def update_job_state(self, state): + self._jobinfo["execution"]["state"] = state + self._mongo.db.job.update({"_id": self._jobinfo["_id"]}, + {"$set": {"execution": self._jobinfo["execution"]}}) + + def _log_resutls(self, res): + self._mongo.db.results.update({"jobid": self._jobinfo["_id"]}, + {"$set": {"results": {"time" : datetime.now(), "res" : res}}}, + upsert=True) + + def log(self, text): + self._log.append("[%s] %s" % (datetime.now(), text)) + self._mongo.db.results.update({"jobid": self._jobinfo["_id"]}, + {"$set": {"log": self._log}}, + upsert=True) + + def run(self): + self.log("Starting job") + try: + self._job.run() + except Exception, e: + self.log("Exception raised while running: %s" % e) + self.update_job_state("error") + return False + self.log("done job startup") + self.update_job_state("running") + return True + + def get_results(self): + self.log("Trying to get results") + res = [] + try: + res = self._job.get_results() + except Exception, e: + self.log("Exception raised while getting results: %s" % e) + return False + self._log_resutls(res) + return True + + @celery.task def run_task(jobid): - task_id = run_task.request.id print "searching for ", jobid - job = mongo.db.job.find_one({"_id": jobid}) - if not job: + job_info = mongo.db.job.find_one({"_id": jobid}) + if not job_info: return False - job["execution"]["state"] = "processing" - mongo.db.job.update({"_id": jobid}, job) - time.sleep(30) + job_exec = JobExecution(mongo, job_info) + if not job_exec.get_job(): + job_exec.update_job_state(job_info, "error") + return False - job["execution"]["state"] = "done" - mongo.db.job.update({"_id": jobid}, job) - return "task: " + task_id + if not job_exec.run(): + return False + + if not job_exec.get_results(): + return False + + return "done task: " + run_task.request.id @celery.task def update_cache(connector): time.sleep(30) - return "job: " + repr(job) + return "connector: " + repr(connector)