job_info.py 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import time
  2. from app import models, schemas
  3. from app.services.datax import datax_create_job, datax_update_job, execute_job
  4. from app.utils.cron_utils import joint_cron_expression
  5. from sqlalchemy.orm import Session
  6. import app.crud as crud
  7. def create_job_info_services(db: Session, item: schemas.JobInfoCreate):
  8. create_time: int = int(time.time())
  9. item_dict = item.dict()
  10. # 定时任务对象转为cron表达式
  11. cron_expression_dict = item_dict.pop('cron_expression')
  12. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  13. cron_select_type = cron_expression_dict["cron_select_type"]
  14. item_dict.update({
  15. 'cron_select_type': cron_select_type,
  16. 'job_cron': cron_expression,
  17. })
  18. # 分区信息拼接
  19. partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
  20. partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
  21. partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
  22. partition_info_str = ''
  23. if partition_info is not None and partition_time is not None and partition_num is not None:
  24. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  25. elif partition_info is not None and (partition_time is None or partition_num is None):
  26. raise Exception('分区信息不完善')
  27. item_dict.update({
  28. 'partition_info': partition_info_str,
  29. })
  30. db_item = models.JobInfo(**item_dict, **{
  31. 'trigger_status': 0,
  32. 'create_time': create_time,
  33. 'update_time': create_time,
  34. 'delete_status': 1,
  35. })
  36. # 创建airflow端同步任务
  37. af_job = datax_create_job(db_item, db)
  38. # 创建本地同步任务
  39. db_item = crud.create_job_info(db, db_item)
  40. crud.create_relation(db, db_item.id,'datax', af_job['id'])
  41. return db_item
  42. def update_job_info_services(db: Session, id: int, update_item: schemas.JobInfoUpdate):
  43. # 获取任务信息
  44. db_item = crud.get_job_info(db,id)
  45. update_dict = update_item.dict(exclude_unset=True)
  46. # 定时任务对象转为cron表达式
  47. cron_expression_dict = update_dict.pop('cron_expression')
  48. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  49. cron_select_type = cron_expression_dict["cron_select_type"]
  50. update_dict.update({
  51. 'cron_select_type': cron_select_type,
  52. 'job_cron': cron_expression,
  53. })
  54. # 分区信息拼接
  55. partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
  56. partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() and update_dict['partition_time'] != '' else None
  57. partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() and update_dict['partition_num'] != '' else None
  58. partition_info_str = ''
  59. if partition_info is not None and partition_time is not None and partition_num is not None:
  60. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  61. elif partition_info is not None and (partition_time is None or partition_num is None):
  62. raise Exception('分区信息不完善')
  63. update_dict.update({
  64. 'partition_info': partition_info_str,
  65. })
  66. for k, v in update_dict.items():
  67. setattr(db_item, k, v)
  68. db_item.update_time = int(time.time())
  69. # 修改airflow端同步任务
  70. af_job = datax_update_job(db_item, db)
  71. crud.update_job_info(db,id,db_item)
  72. return db_item
  73. def execute_job_services(db: Session, job_id: int):
  74. relation = crud.get_af_id(db, job_id, 'datax')
  75. res = execute_job(relation.af_id)
  76. return res