import time from app import models, schemas from app.services.datax import datax_create_job, datax_update_job, execute_job from app.utils.cron_utils import joint_cron_expression from sqlalchemy.orm import Session from sqlalchemy import func import app.crud as crud from configs.globals import g def create_job_info_services(db: Session, item: schemas.JobInfoCreate): create_time: int = int(time.time()) item_dict = item.dict() name_item = db.query(models.JobInfo)\ .filter(models.JobInfo.job_desc == func.binary(item.job_desc))\ .filter(models.JobInfo.project_id == g.project_id)\ .filter(models.JobInfo.delete_status == 1).first() if name_item: raise Exception('同步配置名称重复') # 定时任务对象转为cron表达式 cron_expression_dict = item_dict.pop('cron_expression') cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict)) cron_select_type = cron_expression_dict["cron_select_type"] item_dict.update({ 'cron_select_type': cron_select_type, 'job_cron': cron_expression, }) # 分区信息拼接 partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None partition_info_str = '' if partition_info is not None and partition_time is not None and partition_num is not None: partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time elif partition_info is not None and (partition_time is None or partition_num is None): raise Exception('分区信息不完善') item_dict.update({ 'partition_info': partition_info_str, }) db_item = models.JobInfo(**item_dict, **{ 'user_id': g.user_id, 'project_id': g.project_id, 'trigger_status': 0, 'create_time': create_time, 'update_time': create_time, 'delete_status': 1, }) # 创建airflow端同步任务 af_job = datax_create_job(db_item, db) # 创建本地同步任务 db_item = crud.create_job_info(db, db_item) job_info = db_item.to_dict() crud.create_relation(db, db_item.id,'datax', af_job['id']) return job_info def update_job_info_services(db: Session, id: int, update_item: schemas.JobInfoUpdate): # 获取任务信息 db_item = crud.get_job_info(db,id) name_item = db.query(models.JobInfo)\ .filter(models.JobInfo.job_desc == func.binary(update_item.job_desc))\ .filter(models.JobInfo.project_id == g.project_id)\ .filter(models.JobInfo.delete_status == 1)\ .filter(models.JobInfo.id != id).first() if name_item: raise Exception('同步配置名称重复') update_dict = update_item.dict(exclude_unset=True) # 定时任务对象转为cron表达式 cron_expression_dict = update_dict.pop('cron_expression') cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict)) cron_select_type = cron_expression_dict["cron_select_type"] update_dict.update({ 'cron_select_type': cron_select_type, 'job_cron': cron_expression, }) # 分区信息拼接 partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() and update_dict['partition_time'] != '' else None partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() and update_dict['partition_num'] != '' else None partition_info_str = '' if partition_info is not None and partition_time is not None and partition_num is not None: partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time elif partition_info is not None and (partition_time is None or partition_num is None): raise Exception('分区信息不完善') update_dict.update({ 'partition_info': partition_info_str, }) for k, v in update_dict.items(): setattr(db_item, k, v) db_item.update_time = int(time.time()) # 修改airflow端同步任务 af_job = datax_update_job(db_item, db) crud.update_job_info(db,id,db_item) return db_item def execute_job_services(db: Session, job_id: int): relation = crud.get_af_id(db, job_id, 'datax') res = execute_job(relation.af_id) return res