build structure for async job execution

This commit is contained in:
itsikkes 2016-06-05 13:55:41 +03:00
parent 1acab50f51
commit 35befba5a9
7 changed files with 129 additions and 46 deletions

View File

@ -0,0 +1,31 @@
from connectors.vcenter import VCenterJob, VCenterConnector
from connectors.demo import DemoJob, DemoConnector
available_jobs = [VCenterJob, DemoJob]
def get_connector_by_name(name):
for jobclass in available_jobs:
if name == jobclass.connector.__name__:
return jobclass.connector()
return None
def get_jobclass_by_name(name):
for jobclass in available_jobs:
if jobclass.__name__ == name:
return jobclass
def refresh_connector_config(mongo, connector):
properties = mongo.db.connector.find_one({"type": connector.__class__.__name__})
if properties:
connector.load_properties(properties)
def load_connector(mongo, name):
con = get_connector_by_name(name)
if not con:
return None
refresh_connector_config(mongo, con)
return con

View File

@ -36,6 +36,7 @@ class NetControllerConnector(object):
def disconnect(self):
return
class NetControllerJob(object):
connector = NetControllerConnector
_properties = {
@ -46,8 +47,9 @@ class NetControllerJob(object):
}
def __init__(self):
pass
def __init__(self, existing_connector = None):
if existing_connector:
self.connector = existing_connector
def get_job_properties(self):
return self._properties
@ -61,4 +63,7 @@ class NetControllerJob(object):
return None
def run(self):
raise NotImplementedError()
raise NotImplementedError()
def get_results(self):
return []

View File

@ -41,4 +41,8 @@ class DemoJob(NetControllerJob):
}
_enumerations = {
"vlan": "get_vlans_list",
}
}
def run(self):
import time
time.sleep(30)

View File

@ -162,3 +162,6 @@ class VCenterJob(NetControllerJob):
"vlan": "get_vlans_list",
}
def run(self):
pass

View File

@ -1,5 +1,4 @@
SRV_ADDRESS = 'localhost:27017'
BROKER_URL = 'mongodb://%(srv)s/monkeybusiness' % {'srv': SRV_ADDRESS}
MONGO_URI = BROKER_URL
CELERY_RESULT_BACKEND = 'mongodb://%(srv)s/' % {'srv': SRV_ADDRESS}

View File

@ -1,5 +1,3 @@
import os
import sys
from flask import Flask, request, abort, send_from_directory
from flask.ext import restful
from flask.ext.pymongo import PyMongo
@ -7,17 +5,13 @@ from flask import make_response
import bson.json_util
import json
from datetime import datetime
import dateutil.parser
from connectors.vcenter import VCenterJob, VCenterConnector
from connectors.demo import DemoJob, DemoConnector
from common import *
import tasks_manager
app = Flask(__name__)
app.config.from_object('dbconfig')
mongo = PyMongo(app)
available_jobs = [VCenterJob, DemoJob]
active_connectors = {}
class Root(restful.Resource):
@ -98,20 +92,6 @@ class Connector(restful.Resource):
{"$set": settings_json},
upsert=True)
def get_connector_by_name(name):
for jobclass in available_jobs:
if name == jobclass.connector.__name__:
return jobclass.connector()
return None
def get_jobclass_by_name(name):
for jobclass in available_jobs:
if jobclass.__name__ == name:
return jobclass()
class JobCreation(restful.Resource):
def get(self, **kw):
jobtype = request.args.get('type')
@ -127,11 +107,11 @@ class JobCreation(restful.Resource):
job = None
if not jobid:
job = get_jobclass_by_name(jobtype)
job = get_jobclass_by_name(jobtype)()
else:
loaded_job = mongo.db.job.find_one({"_id": bson.ObjectId(jobid)})
if loaded_job:
job = get_jobclass_by_name(loaded_job.get("type"))
job = get_jobclass_by_name(loaded_job.get("type"))()
job.load_job_properties(loaded_job.get("properties"))
if action == "delete":
@ -249,12 +229,6 @@ def output_json(obj, code, headers=None):
return resp
def refresh_connector_config(name):
properties = mongo.db.connector.find_one({"type": name})
if properties:
active_connectors[name].load_properties(properties)
def update_connectors():
for con in available_jobs:
connector_name = con.connector.__name__
@ -262,7 +236,7 @@ def update_connectors():
active_connectors[connector_name] = con.connector()
if not active_connectors[connector_name].is_connected():
refresh_connector_config(connector_name)
refresh_connector_config(mongo, active_connectors[connector_name])
try:
app.logger.info("Trying to activate connector: %s" % connector_name)
active_connectors[connector_name].connect()

View File

@ -1,8 +1,9 @@
import os
import time
from flask import Flask
from datetime import datetime
from flask.ext.pymongo import PyMongo
from celery import Celery
from common import *
def make_celery(app):
celery = Celery(main='MONKEY_TASKS', backend=app.config['CELERY_RESULT_BACKEND'],
@ -22,24 +23,90 @@ fapp.config.from_object('dbconfig')
celery = make_celery(fapp)
mongo = PyMongo(fapp)
class JobExecution(object):
_jobinfo = None
_job = None
_mongo = None
_log = []
def __init__(self, mongo, jobinfo):
self._mongo = mongo
self._jobinfo = jobinfo
self.update_job_state("processing")
job_class = get_jobclass_by_name(self._jobinfo["type"])
con = job_class.connector()
refresh_connector_config(self._mongo, con)
self._job = job_class(con)
def get_job(self):
return self._job
def refresh_job_info(self):
self._jobinfo = self._mongo.db.job.find_one({"_id": self._jobinfo["_id"]})
def update_job_state(self, state):
self._jobinfo["execution"]["state"] = state
self._mongo.db.job.update({"_id": self._jobinfo["_id"]},
{"$set": {"execution": self._jobinfo["execution"]}})
def _log_resutls(self, res):
self._mongo.db.results.update({"jobid": self._jobinfo["_id"]},
{"$set": {"results": {"time" : datetime.now(), "res" : res}}},
upsert=True)
def log(self, text):
self._log.append("[%s] %s" % (datetime.now(), text))
self._mongo.db.results.update({"jobid": self._jobinfo["_id"]},
{"$set": {"log": self._log}},
upsert=True)
def run(self):
self.log("Starting job")
try:
self._job.run()
except Exception, e:
self.log("Exception raised while running: %s" % e)
self.update_job_state("error")
return False
self.log("done job startup")
self.update_job_state("running")
return True
def get_results(self):
self.log("Trying to get results")
res = []
try:
res = self._job.get_results()
except Exception, e:
self.log("Exception raised while getting results: %s" % e)
return False
self._log_resutls(res)
return True
@celery.task
def run_task(jobid):
task_id = run_task.request.id
print "searching for ", jobid
job = mongo.db.job.find_one({"_id": jobid})
if not job:
job_info = mongo.db.job.find_one({"_id": jobid})
if not job_info:
return False
job["execution"]["state"] = "processing"
mongo.db.job.update({"_id": jobid}, job)
time.sleep(30)
job_exec = JobExecution(mongo, job_info)
if not job_exec.get_job():
job_exec.update_job_state(job_info, "error")
return False
job["execution"]["state"] = "done"
mongo.db.job.update({"_id": jobid}, job)
return "task: " + task_id
if not job_exec.run():
return False
if not job_exec.get_results():
return False
return "done task: " + run_task.request.id
@celery.task
def update_cache(connector):
time.sleep(30)
return "job: " + repr(job)
return "connector: " + repr(connector)