
Merge remote-tracking branch 'origin/master'

luoyulong 2 years ago
parent commit c782409309

+ 3 - 0
Dockerfile

@@ -35,6 +35,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
 RUN locale-gen zh_CN.UTF-8
 RUN dpkg-reconfigure locales
 
+ENV TZ Asia/Shanghai
+RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime &&\
+    echo "Asia/Shanghai" > /etc/timezone
 
 CMD ["supervisord", "-n"]
 
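The two timezone settings cooperate: ENV TZ covers processes that honor the variable, while the /etc/localtime symlink and /etc/timezone cover glibc lookups. A quick sanity check inside the built container (a sketch; assumes Python 3 is available in the image):

import time
from datetime import datetime

print(time.tzname)                              # expect ('CST', 'CST') after this change
print(datetime.now().astimezone().utcoffset())  # expect 8:00:00 for Asia/Shanghai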

+ 1 - 1
app/crud/jm_job_info.py

@@ -12,7 +12,7 @@ def create_jm_job_info(db: Session, jm_job_info: models.JmJobInfo):
 def get_jm_job_infos(db: Session):
     res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
         .filter(models.JmJobInfo.delete_status != 0)\
-        .order_by(models.JmJobInfo.id.desc()).all()
+        .order_by(models.JmJobInfo.create_time.desc()).all()
     return res
 
 def get_jm_job_info(db: Session, jm_job_id: int):

+ 3 - 1
app/crud/job_info.py

@@ -13,7 +13,9 @@ def create_job_info(db: Session, item: models.JobInfo):
     return item
 
 def get_job_infos(db: Session):
-    res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all()  # TODO: sort
+    res: List[models.JobInfo] = db.query(models.JobInfo)\
+        .filter(models.JobInfo.delete_status == 1)\
+        .order_by(models.JobInfo.create_time.desc()).all()  # TODO: sort
     return res
 
 def update_job_info(db: Session, id: int, update_item: models.JobInfo):

+ 4 - 0
app/models/jm_job_info.py

@@ -33,3 +33,7 @@ class JmJobInfo(BaseModel):
     user_name = Column(String, nullable=False)
     # project id
     project_id = Column(String, nullable=False)
+    # creation time
+    create_time = Column(Integer)
+    # update time
+    update_time = Column(Integer)

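Both new columns hold Unix epoch seconds rather than datetimes, matching how app/services/jm_job_info.py populates them below; converting back for display is a one-liner. A sketch:

import time
from datetime import datetime

create_time = int(time.time())              # the value stored in the Integer column
print(datetime.fromtimestamp(create_time))  # local-time rendering for display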
+ 23 - 17
app/routers/dag.py

@@ -1,9 +1,11 @@
+import json
 from sqlalchemy.orm import Session
 from fastapi import Depends
 from fastapi import APIRouter
 from app.services.dag import dag_job_submit, get_tmp_table_name
 from app import crud, models, schemas
-from app.utils.send_util import get_job_run_status
+from app.utils.send_util import get_job_run_status, get_task_log
+from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from app.common.hive import hiveDs
@@ -28,14 +30,19 @@ def execute_dag(dag: schemas.Dag,db: Session = Depends(get_db)):
 def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
     relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
     af_job_run = crud.get_airflow_run_once_debug_mode(db,relation.af_id)
-    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks'])>0 else {}
-    nodes = {}
-    for task_id in tasks:
-        task = tasks[task_id]
-        node_id = task_id.split('_')[1]
-        nodes.update({node_id:task['status']})
     job_run_res = get_job_run_status(af_job_run.id)
     job_run_status = job_run_res['data']['status']
+    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
+    task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
+    nodes = {}
+    if task:
+        task_script = json.loads(task['script'])
+        for node in task_script['sub_nodes']:
+            task_id = str(task['id'])+'_'+node['id']
+            task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
+            task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
+            if task_log:
+                nodes.update({node['id']:task_log['status'] if task_log['status'] else 'running'})
     res = {
         "job":job_run_status if job_run_status else af_job_run.status,
         "nodes": nodes
@@ -48,16 +55,15 @@ def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
 def get_dag_debug_status(dag_uuid: str, node_id: str,db: Session = Depends(get_db)):
     relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
     af_job_run = crud.get_airflow_run_once_debug_mode(db,relation.af_id)
-    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks'])>0 else {}
-    node_log = {}
-    for task_id in tasks:
-        task = tasks[task_id]
-        task_id = task_id.split('_')[1]
-        node_log.update({task_id:task['log']})
-    if node_id in node_log.keys():
-        return node_log[node_id]
-    else:
-        return None
+    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
+    task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
+    if task:
+        task_id = str(task['id'])+'_'+node_id
+        task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
+        task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
+        if task_log:
+            return task_log['log']
+    return None
 
 @router.get("/node_result")
 @web_try()

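Node statuses are no longer read from af_job_run.details; instead each sub-node's task id is rebuilt as '<task id>_<node id>' and looked up live via get_task_log. A condensed sketch of the new lookup (assumes the payload shapes shown in the diff):

import json
from app.utils.send_util import get_task_log

def collect_node_statuses(af_job, af_run_id):
    # take the job's single task and walk its script's sub_nodes
    task = list(af_job.tasks)[0] if len(list(af_job.tasks)) > 0 else None
    nodes = {}
    if task:
        for node in json.loads(task['script'])['sub_nodes']:
            task_id = str(task['id']) + '_' + node['id']
            res = get_task_log(af_job.id, af_run_id, task_id)  # same helper as in the diff
            task_log = res.get('data')
            if task_log:
                nodes[node['id']] = task_log['status'] or 'running'
    return nodes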
+ 39 - 17
app/routers/jm_job_info.py

@@ -1,4 +1,7 @@
-import datetime
+import base64
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
 import croniter
 import re
 from typing import Optional, List
@@ -45,21 +48,11 @@ def get_jm_job_infos(db: Session = Depends(get_db)):
     af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
     res = {}
     for af_job_run in af_job_runs:
-        tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
-        if len(tasks) > 0:
-            task = tasks[-1]
-            task.pop('log',None)
+        if af_job_run.status in [2,3]:
             job_id = af_to_datax[int(af_job_run.job_id)]
             log = {
-                "id": af_job_run.id,
-                "job_id": job_id,
-                "af_job_id": int(af_job_run.job_id),
-                "run_id": af_job_run.af_run_id,
-                "trigger_time": af_job_run.start_time,
-                "trigger_result": 1 if task else 0,
-                "execute_time": task['start_time'] if task else 0,
+                "start_time": af_job_run.start_time,
                 "execute_result": af_job_run.status,
-                "end_time": task['end_time'] if task else 0,
             }
             if job_id in res.keys():
                 res[job_id].append(log)
@@ -67,9 +60,11 @@ def get_jm_job_infos(db: Session = Depends(get_db)):
                 res.update({job_id: [log]})
     for jm_job in jm_job_list:
         history = res[jm_job.id] if jm_job.id in res.keys() else []
-        history.sort(key=lambda x: x['trigger_time'], reverse=True)
+        history.sort(key=lambda x: x['start_time'], reverse=True)
         jm_job_dict = jm_job.to_dict()
-        jm_job_dict.update({'history':history[0:10]})
+        history = history[0:10]
+        history.sort(key=lambda x: x['start_time'], reverse=False)
+        jm_job_dict.update({'history':history})
         res_list.append(jm_job_dict)
     return res_list
 
@@ -129,6 +124,31 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     return res['data']
 
 
+@router.get("/api/{jm_job_id}")
+@web_try()
+@sxtimeit
+def copy_api_path(jm_job_id: int, db: Session = Depends(get_db)):
+    initial = str(jm_job_id).encode('utf-8')
+    encryption_id = base64.b64encode(initial).decode('utf-8')
+    return f'/jpt/jm_job_info/api_execute/{encryption_id}'
+
+@router.get("/api_execute/{encryption_id}")
+@web_try()
+@sxtimeit
+def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
+    jm_job_id = 0
+    try:
+        initial = base64.b64decode(encryption_id)
+        jm_job_id = int(initial.decode('utf-8'))
+    except Exception as e:
+        raise Exception('Failed to parse the job path')
+    jm_job = crud.get_jm_job_info(db,jm_job_id)
+    if jm_job.status == 0:
+        raise Exception('The job has been disabled')
+    res = execute_job_services(db,jm_job_id)
+    return res['data']
+
+
 @router.post("/cron_expression")
 @web_try()
 @sxtimeit
@@ -146,11 +166,13 @@ def get_cron_next_execute(cron_expression: str):
 
 
 def run_get_next_time(cron_expression):
-    now = datetime.datetime.now()
+    SHA_TZ = timezone(timedelta(hours=8),name='Asia/Shanghai',)
+    utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+    now = utc_now.astimezone(SHA_TZ)
     cron_str = cron_expression.replace('?','*')
     cron = croniter.croniter(cron_str, now)
     execute_list = []
     for i in range(0, 5):
-        next_time = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
+        next_time = cron.get_next(datetime).strftime("%Y-%m-%d %H:%M")
         execute_list.append(next_time)
     return execute_list

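For reference, the new encrypted-path endpoints are a plain base64 round trip, and the cron preview is now pinned to UTC+8 regardless of host timezone. A minimal sketch (job id 42 and the cron string are arbitrary; requires croniter):

import base64
from datetime import datetime, timedelta, timezone
import croniter

# round-trip the job id the way copy_api_path / api_execute_jm_job do
encryption_id = base64.b64encode(str(42).encode('utf-8')).decode('utf-8')
assert int(base64.b64decode(encryption_id).decode('utf-8')) == 42

# preview the next five fire times in Asia/Shanghai
SHA_TZ = timezone(timedelta(hours=8), name='Asia/Shanghai')
now = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(SHA_TZ)
cron = croniter.croniter('0 9 * * *'.replace('?', '*'), now)
print([cron.get_next(datetime).strftime("%Y-%m-%d %H:%M") for _ in range(5)])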
+ 58 - 60
app/routers/jm_job_log.py

@@ -1,3 +1,4 @@
+import json
 from typing import List, Optional
 from fastapi import APIRouter
 
@@ -28,6 +29,7 @@ router = APIRouter(
 @sxtimeit
 def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
     jm_job_list = []
+    # optional filter by job id
     if job_id is not None:
         jm_job_list = [crud.get_jm_job_info(db, job_id)]
     else:
@@ -35,84 +37,79 @@ def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Sessi
     id_to_job = {job.id:job for job in jm_job_list}
     relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
+    # fetch the job run records
 af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    # sort by start time
 af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
 total = len(af_job_runs)
+    # paginate
     af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
     res = []
     for af_job_run in af_job_runs:
-        tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
-        if len(tasks) > 0:
-            task = tasks[-1]
-            task.pop('log',None)
-            job_id = af_to_datax[int(af_job_run.job_id)]
-            execute_result = None
-            if af_job_run.status <= 1:
-                run_status = get_job_run_status(af_job_run.id)
-                execute_result = run_status['data']['status']
-            log = {
-                "id": af_job_run.id,
-                "job_id": job_id,
-                "job_name": id_to_job[job_id].name,
-                "job_type": id_to_job[job_id].type,
-                "job_tag": id_to_job[job_id].tag,
-                "af_job_id": int(af_job_run.job_id),
-                "run_id": af_job_run.af_run_id,
-                "trigger_time": af_job_run.start_time,
-                "trigger_result": 1,
-                "execute_time": task['start_time'] if task else 0,
-                "execute_result": execute_result if execute_result else af_job_run.status,
-                "end_time": task['end_time'] if task else 0,
-            }
-            res.append(log)
-    return page_help(res,params['page'],params['size'],total)
-
-@router.get("/logs")
-@web_try()
-@sxtimeit
-def get_job_log_once(run_id: str, db: Session = Depends(get_db)):
-    af_job_run = crud.get_airflow_run_once(db, run_id)
-    tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
-    res = []
-    for task in tasks:
+        job_id = af_to_datax[int(af_job_run.job_id)]
+        execute_result = None
+        # if this run has not finished, fetch its live status
+        if af_job_run.status <= 1:
+            run_status = get_job_run_status(af_job_run.id)
+            execute_result = run_status['data']['status']
         log = {
             "id": af_job_run.id,
+            "job_id": job_id,
+            "job_name": id_to_job[job_id].name,
+            "job_type": id_to_job[job_id].type,
+            "job_tag": id_to_job[job_id].tag,
             "af_job_id": int(af_job_run.job_id),
             "run_id": af_job_run.af_run_id,
-            "trigger_time": af_job_run.start_time,
-            "trigger_result": 1,
-            "execute_time": task['start_time'] if task else 0,
-            "execute_result": af_job_run.status,
-            "end_time": task['end_time'] if task else 0,
-            "log": task['log'] if task else None
+            "start_time": af_job_run.start_time,
+            "result": execute_result if execute_result else af_job_run.status,
         }
         res.append(log)
-    res.sort(key=lambda x: x['trigger_time'], reverse=True)
-    return res
+    return page_help(res,params['page'],params['size'],total)
 
 @router.get("/all_task")
 @web_try()
 @sxtimeit
 def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
     af_job_run = crud.get_airflow_run_once(db, run_id)
-    af_job_id = af_job_run.job_id
-    af_job = crud.get_airflow_job_once(db, af_job_id)
+    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
+    tasks = list(af_job.tasks) if len(list(af_job.tasks))>0 else []
     res = []
-    for task in af_job.tasks:
-        task.update({
-            'job_id':af_job_id,
-            'af_run_id':af_job_run.af_run_id,
-            'task_id':task['id'],
-            })
-        task_log_res = get_task_log(af_job_id, af_job_run.af_run_id, task['id'])
-        task_log = task_log_res['data'] if 'data' in task_log_res else None
-        if task_log:
-            task.update({
-                'execute_result':task_log['status'] if 'status' in task_log else None,
-                'execute_time':task_log['start_time'] if 'start_time' in task_log else None,
-                'log': task_log['log'] if 'log' in task_log else None
-                })
-        res.append(task)
+    for task in tasks:
+        if task['task_type'] == 'sparks':
+            task_script = json.loads(task['script'])
+            for node in task_script['sub_nodes']:
+                task_id = str(task['id'])+'_'+node['id']
+                log ={
+                    'name':task['name']+'-'+node['name'],
+                    'job_id':af_job.id,
+                    'af_run_id':af_job_run.af_run_id,
+                    'task_id': task_id,
+                }
+                task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
+                task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
+                if task_log:
+                    log.update({
+                        'execute_result': RUN_STATUS[task_log['status']] if task_log['status'] else 0,
+                        'execute_time':task_log['start_time'],
+                        'log': task_log['log']
+                        })
+                res.append(log)
+        else:
+            log ={
+                'name':task['name'],
+                'job_id':af_job.id,
+                'af_run_id':af_job_run.af_run_id,
+                'task_id':task['id'],
+            }
+            task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
+            task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
+            if task_log:
+                log.update({
+                    'execute_result':RUN_STATUS[task_log['status']] if task_log['status'] else 0,
+                    'execute_time':task_log['start_time'],
+                    'log': task_log['log']
+                    })
+            res.append(log)
     return res
 
 
@@ -121,7 +118,8 @@ def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
 @sxtimeit
 def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
     res = get_task_log(job_id, af_run_id, task_id)
-    return res['data']
+    log = res['data'] if 'data' in res else None
+    return log
 
 @router.get("/logs_status/{ids}")
 @web_try()

+ 23 - 31
app/routers/job_log.py

@@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
 from app import get_page, page_help, schemas
 
 import app.crud as crud
-from app.utils.send_util import get_job_run_status
+from app.utils.send_util import get_job_run_status, get_task_log
 from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -19,19 +19,12 @@ router = APIRouter(
     tags=["joblog-日志管理"],
 )
 
-
-@router.post("/")
-@web_try()
-@sxtimeit
-def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
-    return crud.create_job_log(db, item)
-
-
 @router.get("/")
 @web_try()
 @sxtimeit
 def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
     job_infos = []
+    # optional filter by job id
     if job_id is None:
         job_infos = crud.get_job_infos(db)
     else:
@@ -39,30 +32,31 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
     id_to_job = {job.id:job for job in job_infos}
     relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
+    # fetch the run records
 af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    # sort by start time
 af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
 total = len(af_job_runs)
+    # paginate
 af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
 res = []
+    # build one log entry per run
     for af_job_run in af_job_runs:
-        task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
         job_id = af_to_datax[int(af_job_run.job_id)]
-        execute_result = None
-        if af_job_run.status <= 1:
-            run_status = get_job_run_status(af_job_run.id)
-            execute_result = run_status['data']['status']
+        # fetch the af_job
+        af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
+        task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
+        log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
+        job_log = log_res['data'] if 'data' in log_res.keys() else None
         log = {
             "id": af_job_run.id,
             "job_id": job_id,
             "job_desc": id_to_job[job_id].job_desc,
             "af_job_id": int(af_job_run.job_id),
-            "run_id": af_job_run.af_run_id,
-            "trigger_time": af_job_run.start_time,
-            "trigger_result": 1,
-            "execute_time": task['start_time'] if task else 0,
-            "execute_result": execute_result if execute_result else af_job_run.status,
-            "end_time": task['end_time'] if task else 0,
-            "log": task['log'] if task else None
+            "run_id": af_job_run.id,
+            "af_run_id": af_job_run.af_run_id,
+            "start_time": job_log['start_time'],
+            "result": RUN_STATUS[job_log['status']] if job_log['status'] else 0,
         }
         res.append(log)
     return page_help(res,params['page'],params['size'],total)
@@ -72,18 +66,16 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
 @web_try()
 @sxtimeit
 def get_job_logs_once(run_id: int, db: Session = Depends(get_db)):
+    # fetch the af_run
 af_job_run = crud.get_airflow_run_once(db, run_id)
-    task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
+    # fetch the af_job
+    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
+    # take its single task
+    task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
+    log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
+    job_log = log_res['data'] if 'data' in log_res.keys() else None
     log = {
-        "id": af_job_run.id,
-        "af_job_id": int(af_job_run.job_id),
-        "run_id": af_job_run.af_run_id,
-        "trigger_time": af_job_run.start_time,
-        "trigger_result": 1,
-        "execute_time": task['start_time'] if task else 0,
-        "execute_result": af_job_run.status,
-        "end_time": task['end_time'] if task else 0,
-        "log": task['log'] if task else None
+        "log": job_log['log'] if 'log' in job_log.keys() else None
     }
     return log
 

+ 2 - 2
app/services/datax.py

@@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
 def datax_create_job(job_info: models.JobInfo, db: Session):
     af_task = datax_create_task(job_info)
     cron: str = job_info.job_cron
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,
@@ -67,7 +67,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     old_af_task = old_af_job['tasks'][0]
     af_task = datax_put_task(job_info,old_af_task)
     cron: str = job_info.job_cron
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,

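The one-character fix in both functions matters because Python strings are immutable: str.replace returns a new string and leaves the original untouched, so the old code silently kept the '?' in the cron expression. For example:

cron = '0 0 ? * *'
cron.replace('?', '*')         # result discarded; cron still contains '?'
cron = cron.replace('?', '*')  # rebinds cron to '0 0 * * *'
print(cron)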
+ 3 - 3
app/services/jm_job.py

@@ -69,7 +69,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
@@ -99,7 +99,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
@@ -220,7 +220,7 @@ def on_off_control(af_job_id: int,status: int):
             print(f"{af_job_id}<==status update succeeded==>{last_parsed_time}")
             break
         if i >= 10:
-            raise Exception(f"{af_job_id}==>execution failed")
+            raise Exception(f"{af_job_id}==>status update failed")
         time.sleep(2)
 
 def execute_job(af_job_id: int):

+ 5 - 0
app/services/jm_job_info.py

@@ -1,3 +1,4 @@
+import time
 from sqlalchemy.orm import Session
 from app import models, schemas
 from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
@@ -5,6 +6,7 @@ from app.utils.cron_utils import joint_cron_expression
 import app.crud as crud
 
 def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
+    create_time = int(time.time())
     jm_job_info_create = item.dict()
     # convert the schedule object to a cron expression
     cron_expression_item = jm_job_info_create.pop('cron_expression', None)
@@ -28,6 +30,8 @@ def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
     jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
         'status': 0,
         'delete_status': 1,
+        'create_time': create_time,
+        'update_time': create_time,
     })
     # create the airflow-side job
     af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
@@ -72,6 +76,7 @@ def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
     # update the airflow-side job
     af_job = jm_job_update_job(db_item,nodes,edges,db)
     # update the local-side record
+    db_item.update_time = int(time.time())
     db_item = crud.update_jm_job_info(db,db_item)
     # delete the old job nodes, then recreate the nodes and their relations
     crud.delete_job_node(db, db_item.id)

+ 4 - 4
app/utils/cron_utils.py

@@ -65,10 +65,10 @@ def check_cron_expression(cron_expression):
     unit_list = ['minute', 'hour', 'day', 'month', 'week']
     reg_list = [
         "^((([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
-        "^((([0-9]|[01][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[01][0-9]|2[0-3]))|([0-9]|[01][0-9]|2[0-3])|(\\*))$",
-        "^((([0-9]|[0-2][0-9]|3[01])(\\,|\\-|\\/){1}([0-9]|[0-2][0-9]|3[01]))|([0-9]|[0-2][0-9]|3[01])|(\\*)|(\\?))$",
-        "^((([0-9]|[0-1][0-2])(\\,|\\-|\\/){1}([0-9]|[0-1][0-2]))|([0-9]|[0-1][0-2])|(\\*))$",
-        "^((([1-9]|0[1-9]|1[0-2])(\\,|\\-|\\/){1}([1-9]|0[1-9]|1[0-2]))|([1-9]|0[1-9]|1[0-2])|(\\*)|(\\?))$"
+        "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
+        "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
+        "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
+        "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
         ]
     for cron, unit, reg in zip(cron_list, unit_list, reg_list):
         match_obj = re.match(reg, cron)

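A quick spot-check of the tightened day-of-month pattern, which now rejects 0 and still allows '?' (the sample values are hypothetical):

import re

day_reg = "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$"
for field in ['1', '31', '0', '?', '1-15']:
    print(field, bool(re.match(day_reg, field)))
# expected: 1 True, 31 True, 0 False, ? True, 1-15 True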
+ 1 - 5
constants/constants.py

@@ -8,8 +8,4 @@ CONSTANTS = {
     'DATASOURCES': DATASOURCES
 }
 
-RUN_STATUS = {
-    'success': 1,
-    'failed': 0,
-    'running': 2
-}
+RUN_STATUS = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3, 'upstream_failed': 3}

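The mapping now mirrors Airflow's task states and collapses upstream_failed into failed; the routers index into it directly. A sketch of the consuming pattern (the task_log payload is hypothetical):

RUN_STATUS = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3, 'upstream_failed': 3}

task_log = {'status': 'upstream_failed'}  # hypothetical get_task_log payload
execute_result = RUN_STATUS[task_log['status']] if task_log['status'] else 0
print(execute_result)  # 3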
+ 7 - 0
data/data.sql

@@ -329,4 +329,11 @@ MODIFY COLUMN `value` varchar(500) CHARACTER SET utf8 COLLATE utf8_unicode_ci NO
 ALTER TABLE `job_jdbc_datasource`
ADD COLUMN `use_ssl` tinyint(1) NULL COMMENT 'whether to use ssl (0: no, 1: yes)' AFTER `principal`;
 
+-- ----------------------------
+-- Alter for jm_job_info
+-- ----------------------------
+ALTER TABLE `jm_job_info`
+ADD COLUMN `create_time` int(20) NULL COMMENT 'creation time' AFTER `project_id`,
+ADD COLUMN `update_time` int(20) NULL COMMENT 'update time' AFTER `create_time`;
+
 SET FOREIGN_KEY_CHECKS = 1;