123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- from sqlalchemy.orm import Session
- from app import models, schemas
- from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
- from app.utils.cron_utils import joint_cron_expression
- import app.crud as crud
- def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
- jm_job_info_create = item.dict()
- # 定时对象转为cron表达式
- cron_expression_item = jm_job_info_create.pop('cron_expression', None)
- if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
- cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
- cron_select_type = cron_expression_item["cron_select_type"]
- jm_job_info_create.update({
- 'cron_select_type': cron_select_type,
- 'cron_expression': cron_expression,
- })
- # 节点与边的剥离
- nodes = jm_job_info_create.pop('nodes', None)
- edges = jm_job_info_create.pop('edges', None)
- db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
- .filter(models.JmJobInfo.delete_status != 0).first()
- if db_item:
- raise Exception('定时任务名称已存在')
- # 标签的存储
- tag = jm_job_info_create['tag']
- crud.find_and_update(db, '任务标签', tag)
- jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
- 'status': 0,
- 'delete_status': 1,
- })
- # 创建airflow端任务
- af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
- # 创建local端任务
- jm_job_info = crud.create_jm_job_info(db,jm_job_info)
- # 创建多作业节点与节点关系
- create_jm_job_node(db, nodes, edges, jm_job_info.id)
- # 创建关系
- crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
- return jm_job_info
- def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
- jm_job_info_update = item.dict(exclude_unset=True)
- # 定时对象转为cron表达式
- cron_expression_item = jm_job_info_update.pop('cron_expression', None)
- if jm_job_info_update['cron_type'] == 2:
- cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
- cron_select_type = cron_expression_item["cron_select_type"]
- jm_job_info_update.update({
- 'cron_select_type': cron_select_type,
- 'cron_expression': cron_expression,
- })
- # 节点与边的剥离
- nodes = jm_job_info_update.pop('nodes', None)
- edges = jm_job_info_update.pop('edges', None)
- db_item = db.query(models.JmJobInfo)\
- .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
- if not db_item:
- raise Exception('未找到该定时任务')
- db_name_item = db.query(models.JmJobInfo)\
- .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
- .filter(models.JmJobInfo.delete_status != 0)\
- .filter(models.JmJobInfo.id != item.id).first()
- if db_name_item:
- raise Exception('定时任务名称已存在')
- # 标签的存储
- tag = jm_job_info_update['tag']
- crud.find_and_update(db, '任务标签', tag)
- for k, v in jm_job_info_update.items():
- setattr(db_item, k, v)
- # 修改airflow端任务
- af_job = jm_job_update_job(db_item,nodes,edges,db)
- # 修改local端任务
- db_item = crud.update_jm_job_info(db,db_item)
- # 删除之前的作业节点并创建新作业节点与节点关系
- crud.delete_job_node(db, db_item.id)
- create_jm_job_node(db, nodes, edges, db_item.id)
- return db_item
- def create_jm_job_node(db: Session, nodes, edges, job_id):
- uuid_node_id = {}
- if nodes is None or len(nodes) == 0:
- return
- for node in nodes:
- uuid = node['id']
- node_item = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': node['homework_id'],
- 'homework_name': node['homework_name'],
- 'start_point': 1,
- })
- node_item = crud.create_jm_job_node(db,node_item)
- node_id = node_item.id
- uuid_node_id.update({uuid:node_id})
- if nodes is None or len(nodes) == 0:
- return
- for edge in edges:
- edge_item = models.JmJobEdge(**{
- 'job_id': job_id,
- 'in_node_id': uuid_node_id[edge['source']],
- 'out_node_id': uuid_node_id[edge['target']]
- })
- edge = crud.create_jm_job_edge(db,edge_item)
- return
- def update_jm_job_status_services(db: Session, job_id: int, status: int):
- job_relation = crud.get_af_id(db,job_id,'job')
- on_off_control(job_relation.af_id, status)
- return crud.update_jm_job_status(db,job_id,status)
- def execute_job_services(db: Session, jm_job_id: int):
- relation = crud.get_af_id(db, jm_job_id, 'job')
- res = execute_job(relation.af_id)
- return res
|