Browse Source

日志模块、任务模块调整

liweiquan 2 years ago
parent
commit
03cdea6859

+ 3 - 2
app/crud/jm_job_history.py

@@ -6,12 +6,13 @@ from sqlalchemy.orm import Session
 def get_one_job_historys(db: Session, job_id: int):
     res: List[models.JmJobHistory] = db.query(models.JmJobHistory)\
             .filter(models.JmJobHistory.job_id == job_id)\
-            .order_by(models.JmJobHistory.executor_time.desc()).all()
+            .order_by(models.JmJobHistory.trigger_time.desc()).all()
     return res
 
 def get_historys_by_job_ids(db: Session,job_id_list: List[int]):
     res: List[models.JmJobHistory] = db.query(models.JmJobHistory)\
-        .filter(models.JmJobHistory.job_id.in_(job_id_list)).all()
+        .filter(models.JmJobHistory.job_id.in_(job_id_list))\
+        .order_by(models.JmJobHistory.trigger_time.desc()).all()
     return res
 
 def get_jm_job_history_info(db: Session,jm_job_history_id: int):

+ 18 - 8
app/crud/jm_job_info.py

@@ -9,8 +9,13 @@ from app.utils.cron_utils import *
 def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     jm_job_info_create = item.dict()
     cron_expression_item = jm_job_info_create.pop('cron_expression', None)
-    cron_expression = joint_cron_expression(cron_expression_item)
-    cron_select_type = cron_expression_item.cron_select_type
+    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_create.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
     nodes = jm_job_info_create.pop('nodes', None)
     edges = jm_job_info_create.pop('edges', None)
     db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
@@ -20,9 +25,7 @@ def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     tag = jm_job_info_create['tag']
     find_and_update(db, '任务标签', tag)
     jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
-        'cron_select_type': cron_select_type,
-        'cron_expression': cron_expression,
-        'status': 1,
+        'status': 0,
         'delete_status': 1,
     })
     db.add(jm_job_info)
@@ -47,8 +50,13 @@ def get_jm_job_info(db: Session, jm_job_id: int):
 def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
     jm_job_info_update = item.dict(exclude_unset=True)
     cron_expression_item = jm_job_info_update.pop('cron_expression', None)
-    cron_expression = joint_cron_expression(cron_expression_item)
-    cron_select_type = cron_expression_item.cron_select_type
+    if jm_job_info_update['cron_type'] == 2:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_update.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
     nodes = jm_job_info_update.pop('nodes', None)
     edges = jm_job_info_update.pop('edges', None)
     db_item = db.query(models.JmJobInfo)\
@@ -58,7 +66,7 @@ def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
     db_name_item = db.query(models.JmJobInfo)\
         .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
         .filter(models.JmJobInfo.delete_status != 0)\
-        .filter(models.JmJobInfo.id != id).first()
+        .filter(models.JmJobInfo.id != item.id).first()
     if db_name_item:
         raise Exception('定时任务名称已存在')
     tag = jm_job_info_update['tag']
@@ -92,3 +100,5 @@ def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
     db.flush()
     db.refresh(jm_job_info)
     return jm_job_info
+
+

+ 2 - 0
app/models/jm_homework.py

@@ -21,6 +21,8 @@ class JmHomework(BaseModel):
     dag_url = Column(String)
     # 脚本文件
     script_file = Column(String)
+    # 执行命令
+    execute_command = Column(String)
     # 更新时间
     update_time = Column(Integer)
     # 创建时间

+ 48 - 85
app/routers/jm_job_info.py

@@ -1,3 +1,5 @@
+import datetime
+import croniter
 import re
 from typing import Optional, List
 from fastapi import APIRouter
@@ -27,67 +29,36 @@ router = APIRouter(
 def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
     jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
     job_id = jm_job_info.id
-    if edges is not None and len(edges) > 0:
-        nodes_dict = {node['id']:node for node in nodes}
-        for node_id in nodes_dict:
-            node = nodes_dict[node_id]
-            if node['start_point'] == 0:
-                next_nodes = format_nodes(nodes_dict,edges,node_id)
-                node['child_nodes'] = next_nodes
-                node_tree = node
-                break
-        node = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': node_tree['homework_id'],
-            'homework_name': node_tree['homework_name'],
-            'start_point': node['start_point'],
-        })
-        node = crud.create_jm_job_node(db,node)
-        if edges is not None and len(edges) > 0:
-            create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
-    else:
-        node = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': nodes[0]['homework_id'],
-            'homework_name': nodes[0]['homework_name'],
-            'start_point': nodes[0]['start_point'],
-        })
-        node = crud.create_jm_job_node(db,node)
+    create_jm_job_node(db, nodes, edges, job_id)
     return jm_job_info.to_dict()
 
 
-def create_jm_job_node(db: Session, nodes, job_id, last_node_id):
+def create_jm_job_node(db: Session, nodes, edges, job_id):
+    uuid_node_id = {}
     if nodes is None or len(nodes) == 0:
         return
     for node in nodes:
-        next_node = models.JmJobNode(**{
+        uuid = node['id']
+        node_item = models.JmJobNode(**{
             'job_id': job_id,
             'homework_id': node['homework_id'],
             'homework_name': node['homework_name'],
-            'start_point': node['start_point'],
+            'start_point': 1,
         })
-        next_node = crud.create_jm_job_node(db,next_node)
-        out_node_id = next_node.id
-        edge = models.JmJobEdge(**{
-            'in_node_id': last_node_id,
-            'out_node_id': out_node_id,
+        node_item = crud.create_jm_job_node(db,node_item)
+        node_id = node_item.id
+        uuid_node_id.update({uuid:node_id})
+    if nodes is None or len(nodes) == 0:
+        return
+    for edge in edges:
+        edge_item = models.JmJobEdge(**{
             'job_id': job_id,
+            'in_node_id': uuid_node_id[edge['source']],
+            'out_node_id': uuid_node_id[edge['target']]
         })
-        edge = crud.create_jm_job_edge(db,edge)
-        create_jm_job_node(db, node['child_nodes'], job_id, out_node_id)
+        edge = crud.create_jm_job_edge(db,edge_item)
     return
 
-
-def format_nodes(nodes_dict,edges: List[schemas.JmJobEdge],last_node: int):
-    nodes = []
-    for edge in edges:
-        if edge['source'] == last_node:
-            node = nodes_dict[edge['target']]
-            next_nodes = format_nodes(nodes_dict,edges,edge['target'])
-            node['child_nodes'] = next_nodes
-            nodes.append(node)
-    return nodes
-
 @router.get("/")
 @web_try()
 @sxtimeit
@@ -108,16 +79,6 @@ def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_jm_job_info(db,jm_job_id)
     jm_job_dict = jm_job.to_dict()
     nodes = crud.get_one_job_nodes(db,jm_job_id)
-    edges = crud.get_one_job_edges(db,jm_job_id)
-    edges_list = [
-        {
-            'id': edge.id,
-            'job_id': edge.job_id,
-            'source': edge.in_node_id,
-            'target': edge.out_node_id,
-        }
-        for edge in edges
-    ]
     cron_type, cron_select_type, cron_expression = jm_job_dict['cron_type'], jm_job_dict['cron_select_type'], jm_job_dict['cron_expression']
     cron_expression_dict = {}
     if cron_type == 2:
@@ -129,7 +90,6 @@ def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
 
     jm_job_dict.update({
         'nodes': nodes,
-        'edges': edges_list,
         'cron_expression_dict': cron_expression_dict
     })
     return jm_job_dict
@@ -141,32 +101,8 @@ def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_
     jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
     job_id = jm_job_info.id
     crud.delete_job_node(db, job_id)
-    if edges is not None and len(edges) > 0:
-        nodes_dict = {node['id']:node for node in nodes}
-        for node_id in nodes_dict:
-            node = nodes_dict[node_id]
-            if node['start_point'] == 0:
-                next_nodes = format_nodes(nodes_dict,edges,node_id)
-                node['child_nodes'] = next_nodes
-                node_tree = node
-                break
-        node = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': node_tree['homework_id'],
-            'homework_name': node_tree['homework_name'],
-            'start_point': node['start_point'],
-        })
-        node = crud.create_jm_job_node(db,node)
-        if edges is not None and len(edges) > 0:
-            create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
-    else:
-        node = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': nodes[0]['homework_id'],
-            'homework_name': nodes[0]['homework_name'],
-            'start_point': nodes[0]['start_point'],
-        })
-        node = crud.create_jm_job_node(db,node)
+    job_id = jm_job_info.id
+    create_jm_job_node(db, nodes, edges, job_id)
     return jm_job_info.to_dict()
 
 @router.delete("/")
@@ -189,4 +125,31 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     if jm_job.status == 0:
         raise Exception('任务已被停用')
     # 进行api调用
-    return jm_job
+    return jm_job
+
+
+@router.post("/cron_expression")
+@web_try()
+@sxtimeit
+def get_cron_expression(cron_expression: schemas.CronExpression):
+    print(cron_expression)
+    cron = joint_cron_expression(cron_expression)
+    return cron
+
+@router.get("/cron_next_execute")
+@web_try()
+@sxtimeit
+def get_cron_next_execute(cron_expression: str):
+    execute_list = run_get_next_time(cron_expression)
+    return execute_list
+
+
+def run_get_next_time(cron_expression):
+    now = datetime.datetime.now()
+    cron_str = cron_expression.replace('?','*')
+    cron = croniter.croniter(cron_str, now)
+    execute_list = []
+    for i in range(0, 5):
+        next_time = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
+        execute_list.append(next_time)
+    return execute_list

+ 27 - 26
app/routers/jm_job_log.py

@@ -23,8 +23,12 @@ router = APIRouter(
 @router.get("/")
 @web_try()
 @sxtimeit
-def get_job_logs(db: Session = Depends(get_db)):
-    jm_job_infos = crud.get_jm_job_infos(db)
+def get_job_logs(job_id: int = None, db: Session = Depends(get_db)):
+    jm_job_infos = []
+    if job_id is not None:
+        jm_job_infos = [crud.get_jm_job_info(db, job_id)]
+    else:
+        jm_job_infos = crud.get_jm_job_infos(db)
     job_id_to_job = {jm_job.id:jm_job for jm_job in jm_job_infos}
     jm_job_id_list = job_id_to_job.keys()
     job_history_list = crud.get_historys_by_job_ids(db,jm_job_id_list)
@@ -33,8 +37,11 @@ def get_job_logs(db: Session = Depends(get_db)):
         jm_job = job_id_to_job[job_history.job_id]
         job_history_dict = job_history.__dict__
         job_history_dict.update({"job_name":jm_job.name})
+        job_history_dict.update({"job_type":jm_job.type})
+        job_history_dict.update({"job_tag":jm_job.tag})
         res.append(job_history_dict)
     return res
+
 @router.get("/logs")
 @web_try()
 @sxtimeit
@@ -44,29 +51,23 @@ def get_job_logs(job_history_id: int,db: Session = Depends(get_db)):
     job_logs = crud.get_jm_job_logs_by_history_id(db,job_history_id)
     if len(job_logs) <= 0:
         raise Exception("未找到该任务此次运行的日志")
-    homework_ids = [node.homework_id for node in crud.find_homework_by_job(db,job_info.id)]
-    homeworks = crud.get_jm_homeworks_by_ids(db,homework_ids)
-    homework_logs = {}
+    if job_info.type == '单作业离线任务':
+        return {
+            'job_type': job_info.type,
+            'logs': job_logs,
+        }
+    res = {}
     for job_log in job_logs:
-        if job_log.homework_id in homework_logs.keys():
-            homework_logs[job_log.homework_id].append(job_log)
-        else:
-            homework_logs.update({job_log.homework_id:[job_log]})
-    if job_info.type == "单作业离线任务":
-        homework = homeworks[0]
-        if homework.type == "Dag":
-            res = homework_logs[homework.id]
+        if job_log.homework_id in res.keys():
+            res[job_log.homework_id]['nodes'].append(job_log)
         else:
-            res = homework_logs[homework.id][0]
-    else:
-        res = []
-        for homework in homeworks:
-            print(homework.__dict__)
-            if homework.type == "Dag":
-                nodes = homework_logs[homework.id]
-                homework_dict = homework.__dict__.copy()
-                homework_dict.update({'nodes': nodes})
-                res.append(homework_dict)
-            else:
-                res.append(homework_logs[homework.id][0])
-    return res
+            res.update({job_log.homework_id:{
+                "homework_name":job_log.homework_name,
+                "nodes": [job_log]
+            }})
+
+    logs = [v for k, v in res.items()]
+    return {
+        'job_type': job_info.type,
+        'logs': logs,
+    }

+ 4 - 0
app/schemas/jm_homework.py

@@ -19,6 +19,8 @@ class JmHomeworkBase(BaseModel):
     dag_url: str
     # 脚本文件
     script_file: str
+    # 执行命令
+    execute_command: str
     # 用户ID
     user_id: str
     # 项目ID
@@ -38,6 +40,7 @@ class JmHomeworkCreate(JmHomeworkBase):
                 "dag_uuid": "",
                 "dag_url": "",
                 "script_file": "/test/scripts/example",
+                "execute_command": "ls",
                 "user_id": "test",
                 "project_id": "test",
                 "relation_list": [
@@ -63,6 +66,7 @@ class JmHomeworkUpdate(JmHomeworkBase):
                 "dag_uuid": "",
                 "dag_url": "",
                 "script_file": "/test/scripts/example",
+                "execute_command": "ls",
                 "user_id": "test",
                 "project_id": "test",
                 "relation_list": [

+ 0 - 2
app/schemas/jm_job_node.py

@@ -8,8 +8,6 @@ class JmJobNode(BaseModel):
     homework_id: Optional[int]
     # 作业名称
     homework_name: str
-    # 起始点
-    start_point: int
 
 class JmJobEdge(BaseModel):
     # 入节点

+ 5 - 4
app/utils/cron_utils.py

@@ -8,7 +8,7 @@ def joint_cron_expression(item: schemas.CronExpression):
         if item.minute is not None:
             cron += '0/'+str(item.minute) + ' *'
         elif item.hour is not None:
-            cron += '* 0/'+str(item.hour)
+            cron += '0 0/'+str(item.hour)
         else: cron += '*'
         cron += ' * * *'
     elif item.cron_select_type == 1:
@@ -46,7 +46,7 @@ def joint_cron_expression(item: schemas.CronExpression):
         else: cron += '*'
         cron += ' '
         if item.month is not None:
-            cron += '0/'+str(item.month)
+            cron += '1/'+str(item.month)
         else: cron += '*'
         if item.day is not None:
             cron += ' ?'
@@ -82,5 +82,6 @@ def parsing_cron_expression(cron_expression):
     cron_dict = {}
     for cron, unit in zip(cron_list, unit_list):
         if not bool(re.match('^((\\*)|(\\?))$',cron)):
-            cron_dict.update({unit: cron.replace('0/','')})
-    return cron_dict
+            cron_dict.update({unit: cron.replace('0/','').replace('1/','')})
+    return cron_dict
+

+ 676 - 0
dag/get_last_day_data.dag

@@ -0,0 +1,676 @@
+{
+  "user_name": "test",
+  "user_id": "test",
+  "project_name": "test",
+  "project_id": "test",
+  "itermidate_data": [
+    "hdfs://host:port/uri"
+  ],
+  "nodes": [
+    {
+      "id": "77a6e831-bb62-42e1-be8d-97699db00e73",
+      "name": "DataSource",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "ed5475dc-b611-47ce-b2ab-475e3ab27e03",
+      "name": "last_day",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "371ce58d-4931-455c-921e-ea5007726aaf",
+      "name": "last_week",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "fa7c114d-2fe2-4c5c-84e4-f13ad9549e2e",
+      "name": "last_month",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "b20c3b19-6dd1-417a-ba36-387fed05b41c",
+      "name": "last_year",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "110cc156-50d9-4475-bfd4-52b45664567a",
+      "name": "next_day",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "35960a11-4666-4877-85a0-8f55737684bf",
+      "name": "next_week",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "93984bef-575c-4e97-856c-2850c295a0b1",
+      "name": "next_month",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "96d5fa5b-88d4-4905-9dd8-4f5eb171cd6b",
+      "name": "next_year",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "ff47c3f8-2551-4fe5-b182-e94cbd31ddf8",
+      "name": "next_oooo",
+      "op": "datasource",
+      "data": {
+        "input_source": [
+          {
+            "dataSelect": true,
+            "dataField": "id",
+            "dataType": "int"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "ssn",
+            "dataType": "string"
+          },
+          {
+            "dataSelect": true,
+            "dataField": "test2",
+            "dataType": "int"
+          }
+        ],
+        "input_table": 5
+      }
+    },
+    {
+      "id": "143cfcb1-f566-4f97-b38b-9b62d40fe4cd",
+      "name": "SQL",
+      "op": "script",
+      "data": {
+        "input": 1,
+        "output": [
+          {
+            "outputVar": "r3"
+          }
+        ],
+        "script": "Select * from table11",
+        "param": "test SQL Param"
+      }
+    },
+    {
+      "id": "6b437370-0468-44da-a0bb-e637e33f3efb",
+      "name": "PySpark",
+      "op": "script",
+      "data": {
+        "input": 1,
+        "output": [
+          {
+            "outputVar": "r2"
+          }
+        ],
+        "script": "import pySpark\n\nprint('hello wolrd pySpark')",
+        "param": "test PySpark Param"
+      }
+    },
+    {
+      "id": "c79943c6-9513-4cf8-9c24-d5fa89b6cdde",
+      "name": "Python",
+      "op": "script",
+      "data": {
+        "input": 1,
+        "output": [
+          {
+            "outputVar": "r1"
+          }
+        ],
+        "script": "import test\nprint('hello world')",
+        "param": "test param",
+        "package": "test = 1.1\ntest1 = 2.2"
+      }
+    }
+  ],
+  "edges": [
+    {
+      "id": "b29bcdc7-b543-4d4a-9bd7-7fa74f3a548d",
+      "source": "77a6e831-bb62-42e1-be8d-97699db00e73",
+      "target": "143cfcb1-f566-4f97-b38b-9b62d40fe4cd"
+    },
+    {
+      "id": "e87690d4-f71d-44f5-85fc-3c14b32b323e",
+      "source": "77a6e831-bb62-42e1-be8d-97699db00e73",
+      "target": "c79943c6-9513-4cf8-9c24-d5fa89b6cdde"
+    },
+    {
+      "id": "4c312a41-5abf-47a8-a6ad-c2fbba147c4e",
+      "source": "77a6e831-bb62-42e1-be8d-97699db00e73",
+      "target": "6b437370-0468-44da-a0bb-e637e33f3efb"
+    }
+  ],
+  "graph": {
+    "cells": [
+      {
+        "shape": "dag-edge",
+        "attrs": {
+          "line": {
+            "strokeDasharray": ""
+          }
+        },
+        "id": "b29bcdc7-b543-4d4a-9bd7-7fa74f3a548d",
+        "zIndex": -1,
+        "source": {
+          "cell": "77a6e831-bb62-42e1-be8d-97699db00e73",
+          "port": "bottomPort"
+        },
+        "target": {
+          "cell": "143cfcb1-f566-4f97-b38b-9b62d40fe4cd",
+          "port": "topPort"
+        }
+      },
+      {
+        "shape": "dag-edge",
+        "attrs": {
+          "line": {
+            "strokeDasharray": ""
+          }
+        },
+        "id": "e87690d4-f71d-44f5-85fc-3c14b32b323e",
+        "zIndex": -1,
+        "source": {
+          "cell": "77a6e831-bb62-42e1-be8d-97699db00e73",
+          "port": "bottomPort"
+        },
+        "target": {
+          "cell": "c79943c6-9513-4cf8-9c24-d5fa89b6cdde",
+          "port": "topPort"
+        }
+      },
+      {
+        "shape": "dag-edge",
+        "attrs": {
+          "line": {
+            "strokeDasharray": ""
+          }
+        },
+        "id": "4c312a41-5abf-47a8-a6ad-c2fbba147c4e",
+        "zIndex": -1,
+        "source": {
+          "cell": "77a6e831-bb62-42e1-be8d-97699db00e73",
+          "port": "bottomPort"
+        },
+        "target": {
+          "cell": "6b437370-0468-44da-a0bb-e637e33f3efb",
+          "port": "topPort"
+        }
+      },
+      {
+        "position": {
+          "x": 240,
+          "y": 130
+        },
+        "size": {
+          "width": 180,
+          "height": 80
+        },
+        "view": "react-shape-view",
+        "shape": "dag-node",
+        "component": {
+          "key": null,
+          "ref": null,
+          "props": {},
+          "_owner": null,
+          "_store": {}
+        },
+        "ports": {
+          "groups": {
+            "top": {
+              "position": "top",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            },
+            "bottom": {
+              "position": "bottom",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            }
+          },
+          "items": [
+            {
+              "id": "bottomPort",
+              "group": "bottom"
+            }
+          ]
+        },
+        "id": "77a6e831-bb62-42e1-be8d-97699db00e73",
+        "data": {
+          "label": "DataSource",
+          "status": "default",
+          "type": "datasource",
+          "id": "77a6e831-bb62-42e1-be8d-97699db00e73",
+          "inputSource": [
+            {
+              "dataSelect": false,
+              "dataField": "name",
+              "dataType": "string"
+            },
+            {
+              "dataSelect": true,
+              "dataField": "address",
+              "dataType": "string"
+            }
+          ],
+          "dataTable": 5
+        },
+        "zIndex": 1
+      },
+      {
+        "position": {
+          "x": 480,
+          "y": 300
+        },
+        "size": {
+          "width": 180,
+          "height": 36
+        },
+        "view": "react-shape-view",
+        "shape": "dag-node",
+        "component": {
+          "key": null,
+          "ref": null,
+          "props": {},
+          "_owner": null,
+          "_store": {}
+        },
+        "ports": {
+          "groups": {
+            "top": {
+              "position": "top",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            },
+            "bottom": {
+              "position": "bottom",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            }
+          },
+          "items": [
+            {
+              "id": "topPort",
+              "group": "top"
+            },
+            {
+              "id": "bottomPort",
+              "group": "bottom"
+            }
+          ]
+        },
+        "id": "143cfcb1-f566-4f97-b38b-9b62d40fe4cd",
+        "data": {
+          "label": "SQL",
+          "status": "default",
+          "type": "script",
+          "id": "143cfcb1-f566-4f97-b38b-9b62d40fe4cd",
+          "paramText": "test SQL Param",
+          "scriptText": "Select * from table11",
+          "outputData": [
+            {
+              "outputVar": "r3"
+            }
+          ],
+          "inputNumber": 1
+        },
+        "zIndex": 2
+      },
+      {
+        "position": {
+          "x": 230,
+          "y": 310
+        },
+        "size": {
+          "width": 180,
+          "height": 36
+        },
+        "view": "react-shape-view",
+        "shape": "dag-node",
+        "component": {
+          "key": null,
+          "ref": null,
+          "props": {},
+          "_owner": null,
+          "_store": {}
+        },
+        "ports": {
+          "groups": {
+            "top": {
+              "position": "top",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            },
+            "bottom": {
+              "position": "bottom",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            }
+          },
+          "items": [
+            {
+              "id": "topPort",
+              "group": "top"
+            },
+            {
+              "id": "bottomPort",
+              "group": "bottom"
+            }
+          ]
+        },
+        "id": "6b437370-0468-44da-a0bb-e637e33f3efb",
+        "data": {
+          "label": "PySpark",
+          "status": "default",
+          "type": "script",
+          "id": "6b437370-0468-44da-a0bb-e637e33f3efb",
+          "paramText": "test PySpark Param",
+          "scriptText": "import pySpark\n\nprint('hello wolrd pySpark')",
+          "outputData": [
+            {
+              "outputVar": "r2"
+            }
+          ],
+          "inputNumber": 1
+        },
+        "zIndex": 3
+      },
+      {
+        "position": {
+          "x": -10,
+          "y": 310
+        },
+        "size": {
+          "width": 180,
+          "height": 36
+        },
+        "view": "react-shape-view",
+        "shape": "dag-node",
+        "component": {
+          "key": null,
+          "ref": null,
+          "props": {},
+          "_owner": null,
+          "_store": {}
+        },
+        "ports": {
+          "groups": {
+            "top": {
+              "position": "top",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            },
+            "bottom": {
+              "position": "bottom",
+              "attrs": {
+                "circle": {
+                  "r": 4,
+                  "magnet": true,
+                  "stroke": "#C2C8D5",
+                  "strokeWidth": 1,
+                  "fill": "#fff"
+                }
+              }
+            }
+          },
+          "items": [
+            {
+              "id": "topPort",
+              "group": "top"
+            },
+            {
+              "id": "bottomPort",
+              "group": "bottom"
+            }
+          ]
+        },
+        "id": "c79943c6-9513-4cf8-9c24-d5fa89b6cdde",
+        "data": {
+          "label": "Python",
+          "status": "default",
+          "type": "script",
+          "id": "c79943c6-9513-4cf8-9c24-d5fa89b6cdde",
+          "paramText": "test param",
+          "scriptText": "import test\nprint('hello world')",
+          "outputData": [
+            {
+              "outputVar": "r1"
+            }
+          ],
+          "inputNumber": 1,
+          "packageData": "test = 1.1\ntest1 = 2.2"
+        },
+        "zIndex": 4
+      }
+    ]
+  }
+}

+ 1 - 1
data/data.sql

@@ -148,6 +148,7 @@ CREATE TABLE `jm_homework` (
   `dag_uuid` varchar(32) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'DAG_ID',
   `dag_url` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'DAG文件地址',
   `script_file` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '脚本文件',
+  `execute_command` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '执行命令',
   `update_time` int(13) DEFAULT NULL COMMENT '更新时间',
   `create_time` int(13) NOT NULL COMMENT '创建时间',
   `user_id` varchar(50) COLLATE utf8_unicode_ci NOT NULL COMMENT '用户ID',
@@ -158,7 +159,6 @@ CREATE TABLE `jm_homework` (
 
 
 
-
 -- ----------------------------
 -- Table structure for jm_homework_datasource_relation
 -- ----------------------------

+ 1 - 0
environment.yml

@@ -35,5 +35,6 @@ dependencies:
       - pandas
       - minio==5.0.1
       - Pillow==9.1.1
+      - croniter==1.3.7
       - -i https://mirror.baidu.com/pypi/simple
 prefix: /opt/conda/envs/py38