jm_job_info.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import time
  2. from sqlalchemy.orm import Session
  3. from app import models, schemas
  4. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  5. from app.utils.cron_utils import joint_cron_expression
  6. import app.crud as crud
  7. def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
  8. create_time = int(time.time())
  9. jm_job_info_create = item.dict()
  10. # 定时对象转为cron表达式
  11. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  12. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  13. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  14. cron_select_type = cron_expression_item["cron_select_type"]
  15. jm_job_info_create.update({
  16. 'cron_select_type': cron_select_type,
  17. 'cron_expression': cron_expression,
  18. })
  19. # 节点与边的剥离
  20. nodes = jm_job_info_create.pop('nodes', None)
  21. edges = jm_job_info_create.pop('edges', None)
  22. db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  23. .filter(models.JmJobInfo.delete_status != 0).first()
  24. if db_item:
  25. raise Exception('定时任务名称已存在')
  26. # 标签的存储
  27. tag = jm_job_info_create['tag']
  28. crud.find_and_update(db, '任务标签', tag)
  29. jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
  30. 'status': 0,
  31. 'delete_status': 1,
  32. 'create_time': create_time,
  33. 'update_time': create_time,
  34. })
  35. # 创建airflow端任务
  36. af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
  37. # 创建local端任务
  38. jm_job_info = crud.create_jm_job_info(db,jm_job_info)
  39. # 创建多作业节点与节点关系
  40. create_jm_job_node(db, nodes, edges, jm_job_info.id)
  41. # 创建关系
  42. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  43. return jm_job_info
  44. def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
  45. jm_job_info_update = item.dict(exclude_unset=True)
  46. # 定时对象转为cron表达式
  47. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  48. if jm_job_info_update['cron_type'] == 2:
  49. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  50. cron_select_type = cron_expression_item["cron_select_type"]
  51. jm_job_info_update.update({
  52. 'cron_select_type': cron_select_type,
  53. 'cron_expression': cron_expression,
  54. })
  55. # 节点与边的剥离
  56. nodes = jm_job_info_update.pop('nodes', None)
  57. edges = jm_job_info_update.pop('edges', None)
  58. db_item = db.query(models.JmJobInfo)\
  59. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  60. if not db_item:
  61. raise Exception('未找到该定时任务')
  62. db_name_item = db.query(models.JmJobInfo)\
  63. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  64. .filter(models.JmJobInfo.delete_status != 0)\
  65. .filter(models.JmJobInfo.id != item.id).first()
  66. if db_name_item:
  67. raise Exception('定时任务名称已存在')
  68. # 标签的存储
  69. tag = jm_job_info_update['tag']
  70. crud.find_and_update(db, '任务标签', tag)
  71. for k, v in jm_job_info_update.items():
  72. setattr(db_item, k, v)
  73. # 修改airflow端任务
  74. af_job = jm_job_update_job(db_item,nodes,edges,db)
  75. # 修改local端任务
  76. db_item.update_time = int(time.time())
  77. db_item = crud.update_jm_job_info(db,db_item)
  78. # 删除之前的作业节点并创建新作业节点与节点关系
  79. crud.delete_job_node(db, db_item.id)
  80. create_jm_job_node(db, nodes, edges, db_item.id)
  81. return db_item
  82. def create_jm_job_node(db: Session, nodes, edges, job_id):
  83. uuid_node_id = {}
  84. if nodes is None or len(nodes) == 0:
  85. return
  86. for node in nodes:
  87. uuid = node['id']
  88. node_item = models.JmJobNode(**{
  89. 'job_id': job_id,
  90. 'homework_id': node['homework_id'],
  91. 'homework_name': node['homework_name'],
  92. 'start_point': 1,
  93. })
  94. node_item = crud.create_jm_job_node(db,node_item)
  95. node_id = node_item.id
  96. uuid_node_id.update({uuid:node_id})
  97. if nodes is None or len(nodes) == 0:
  98. return
  99. for edge in edges:
  100. edge_item = models.JmJobEdge(**{
  101. 'job_id': job_id,
  102. 'in_node_id': uuid_node_id[edge['source']],
  103. 'out_node_id': uuid_node_id[edge['target']]
  104. })
  105. edge = crud.create_jm_job_edge(db,edge_item)
  106. return
  107. def update_jm_job_status_services(db: Session, job_id: int, status: int):
  108. job_relation = crud.get_af_id(db,job_id,'job')
  109. on_off_control(job_relation.af_id, status)
  110. return crud.update_jm_job_status(db,job_id,status)
  111. def execute_job_services(db: Session, jm_job_id: int):
  112. relation = crud.get_af_id(db, jm_job_id, 'job')
  113. res = execute_job(relation.af_id)
  114. return res