job_info.py 4.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import time
  2. from app import models, schemas
  3. from app.services.datax import datax_create_job, datax_update_job, execute_job
  4. from app.utils.cron_utils import joint_cron_expression
  5. from sqlalchemy.orm import Session
  6. import app.crud as crud
  7. from configs.globals import g
  8. def create_job_info_services(db: Session, item: schemas.JobInfoCreate):
  9. create_time: int = int(time.time())
  10. item_dict = item.dict()
  11. name_item = db.query(models.JobInfo)\
  12. .filter(models.JobInfo.job_desc == item.job_desc)\
  13. .filter(models.JobInfo.project_id == g.project_id)\
  14. .filter(models.JobInfo.delete_status == 1).first()
  15. if name_item:
  16. raise Exception('同步配置名称重复')
  17. # 定时任务对象转为cron表达式
  18. cron_expression_dict = item_dict.pop('cron_expression')
  19. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  20. cron_select_type = cron_expression_dict["cron_select_type"]
  21. item_dict.update({
  22. 'cron_select_type': cron_select_type,
  23. 'job_cron': cron_expression,
  24. })
  25. # 分区信息拼接
  26. partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
  27. partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
  28. partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
  29. partition_info_str = ''
  30. if partition_info is not None and partition_time is not None and partition_num is not None:
  31. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  32. elif partition_info is not None and (partition_time is None or partition_num is None):
  33. raise Exception('分区信息不完善')
  34. item_dict.update({
  35. 'partition_info': partition_info_str,
  36. })
  37. db_item = models.JobInfo(**item_dict, **{
  38. 'user_id': g.user_id,
  39. 'project_id': g.project_id,
  40. 'trigger_status': 0,
  41. 'create_time': create_time,
  42. 'update_time': create_time,
  43. 'delete_status': 1,
  44. })
  45. # 创建airflow端同步任务
  46. af_job = datax_create_job(db_item, db)
  47. # 创建本地同步任务
  48. db_item = crud.create_job_info(db, db_item)
  49. job_info = db_item.to_dict()
  50. crud.create_relation(db, db_item.id,'datax', af_job['id'])
  51. return job_info
  52. def update_job_info_services(db: Session, id: int, update_item: schemas.JobInfoUpdate):
  53. # 获取任务信息
  54. db_item = crud.get_job_info(db,id)
  55. name_item = db.query(models.JobInfo)\
  56. .filter(models.JobInfo.job_desc == update_item.job_desc)\
  57. .filter(models.JobInfo.project_id == g.project_id)\
  58. .filter(models.JobInfo.delete_status == 1)\
  59. .filter(models.JobInfo.id != id).first()
  60. if name_item:
  61. raise Exception('同步配置名称重复')
  62. update_dict = update_item.dict(exclude_unset=True)
  63. # 定时任务对象转为cron表达式
  64. cron_expression_dict = update_dict.pop('cron_expression')
  65. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  66. cron_select_type = cron_expression_dict["cron_select_type"]
  67. update_dict.update({
  68. 'cron_select_type': cron_select_type,
  69. 'job_cron': cron_expression,
  70. })
  71. # 分区信息拼接
  72. partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
  73. partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() and update_dict['partition_time'] != '' else None
  74. partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() and update_dict['partition_num'] != '' else None
  75. partition_info_str = ''
  76. if partition_info is not None and partition_time is not None and partition_num is not None:
  77. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  78. elif partition_info is not None and (partition_time is None or partition_num is None):
  79. raise Exception('分区信息不完善')
  80. update_dict.update({
  81. 'partition_info': partition_info_str,
  82. })
  83. for k, v in update_dict.items():
  84. setattr(db_item, k, v)
  85. db_item.update_time = int(time.time())
  86. # 修改airflow端同步任务
  87. af_job = datax_update_job(db_item, db)
  88. crud.update_job_info(db,id,db_item)
  89. return db_item
  90. def execute_job_services(db: Session, job_id: int):
  91. relation = crud.get_af_id(db, job_id, 'datax')
  92. res = execute_job(relation.af_id)
  93. return res