Browse Source

任务管理模块

liweiquan 2 years ago
parent
commit
51606ebed8

+ 5 - 1
app/crud/__init__.py

@@ -5,4 +5,8 @@ from app.crud.job_log import *
 from app.crud.data_management import *
 from app.crud.constant import *
 from app.crud.jm_homework import *
-from app.crud.jm_homework_datasource_relation import *
+from app.crud.jm_homework_datasource_relation import *
+from app.crud.jm_job_info import *
+from app.crud.jm_job_node import *
+from app.crud.jm_job_history import *
+from app.crud.jm_job_log import *

+ 4 - 2
app/crud/jm_homework.py

@@ -9,7 +9,8 @@ from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, get_
 
 def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
     jm_homework_create = item.dict()
-    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == jm_homework_create['name']).first()
+    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == jm_homework_create['name'])\
+        .filter(models.JmHomework.status != 0).first()
     if db_item:
         raise Exception('JmHomework name already exists')
     relation_list = jm_homework_create.pop('relation_list')
@@ -47,7 +48,8 @@ def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpda
     db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
     if not db_item:
         raise Exception('JmHomework not found')
-    db_name_item = db.query(models.JmHomework).filter(models.JmHomework.name == update_item.name).first()
+    db_name_item = db.query(models.JmHomework).filter(models.JmHomework.name == update_item.name)\
+        .filter(models.JmHomework.status != 0).first()
     if db_name_item:
         raise Exception('JmHomework name already exists')
     update_dict = update_item.dict(exclude_unset=True)

+ 10 - 0
app/crud/jm_job_history.py

@@ -0,0 +1,10 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
+def get_one_job_historys(db: Session, job_id: int):
+    res: List[models.JmJobHistory] = db.query(models.JmJobHistory)\
+            .filter(models.JmJobHistory.job_id == job_id)\
+            .order_by(models.JmJobHistory.executor_time.desc()).all()
+    return res

+ 76 - 0
app/crud/jm_job_info.py

@@ -0,0 +1,76 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
+from app.crud.constant import find_and_update
+
+def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
+    jm_job_info_create = item.dict()
+    nodes = jm_job_info_create.pop('nodes', None)
+    edges = jm_job_info_create.pop('edges', None)
+    db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
+        .filter(models.JmJobInfo.delete_status != 0).first()
+    if db_item:
+        raise Exception('JmJobInfo name already exists')
+    tag = jm_job_info_create['tag']
+    find_and_update(db, '作业分类', tag)
+    jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
+        'status': 1,
+        'delete_status': 1,
+    })
+    db.add(jm_job_info)
+    db.commit()
+    db.refresh(jm_job_info)
+    return jm_job_info,nodes,edges
+
+def get_jm_job_infos(db: Session):
+    res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.delete_status != 0).all()
+    return res
+
+def get_jm_job_info(db: Session, jm_job_id: int):
+    item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == jm_job_id)\
+        .filter(models.JmJobInfo.delete_status != 0).first()
+    if not item:
+        raise Exception('JmJobInfo not found')
+    return item
+
+def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
+    jm_job_info_update = item.dict(exclude_unset=True)
+    nodes = jm_job_info_update.pop('nodes', None)
+    edges = jm_job_info_update.pop('edges', None)
+    db_item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
+    if not db_item:
+        raise Exception('JmJobInfo not found')
+    for k, v in jm_job_info_update.items():
+        setattr(db_item, k, v)
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return db_item,nodes,edges
+
+def delete_jm_job_info(db: Session, jm_job_id: int):
+    jm_job_info = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == jm_job_id).first()
+    if not jm_job_info:
+        raise Exception("JmJobInfo not found")
+    jm_job_info.delete_status = 0
+    db.commit()
+    db.flush()
+    db.refresh(jm_job_info)
+    return jm_job_info
+
+def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
+    jm_job_info = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == item.id)\
+        .filter(models.JmJobInfo.delete_status != 0).first()
+    if not jm_job_info:
+        raise Exception("JmJobInfo not found")
+    jm_job_info.status = item.status
+    db.commit()
+    db.flush()
+    db.refresh(jm_job_info)
+    return jm_job_info

+ 9 - 0
app/crud/jm_job_log.py

@@ -0,0 +1,9 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
+def get_jm_job_logs(db: Session,job_id_list: List[int]):
+    res: List[models.JmJobLog] = db.query(models.JmJobLog)\
+        .filter(models.JmJobLog.job_id.in_(job_id_list)).all()
+    return res

+ 31 - 0
app/crud/jm_job_node.py

@@ -0,0 +1,31 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
+
+def create_jm_job_node(db: Session, item: models.JmJobNode):
+    db.add(item)
+    db.commit()
+    db.refresh(item)
+    return item
+
+def create_jm_job_edge(db: Session, item: models.JmJobEdge):
+    db.add(item)
+    db.commit()
+    db.refresh(item)
+    return item
+
+def get_one_job_nodes(db: Session, job_id: int):
+    res: List[models.JmJobNode] = db.query(models.JmJobNode)\
+        .filter(models.JmJobNode.job_id == job_id).all()
+    return res
+
+def get_one_job_edges(db: Session, job_id: int):
+    res: List[models.JmJobEdge] = db.query(models.JmJobEdge)\
+        .filter(models.JmJobEdge.job_id == job_id).all()
+    return res
+
+def delete_job_node(db: Session, job_id: int):
+    db.query(models.JmJobNode).filter(models.JmJobNode.job_id == job_id).delete()
+    db.query(models.JmJobEdge).filter(models.JmJobEdge.job_id == job_id).delete()

+ 6 - 1
app/models/__init__.py

@@ -4,4 +4,9 @@ from app.models.job_log import *
 from app.models.data_management import *
 from app.models.constant import *
 from app.models.jm_homework import *
-from app.models.jm_homework_datasource_relation import *
+from app.models.jm_homework_datasource_relation import *
+from app.models.jm_job_info import *
+from app.models.jm_job_node import *
+from app.models.jm_job_edge import *
+from app.models.jm_job_history import *
+from app.models.jm_job_log import *

+ 15 - 0
app/models/jm_job_edge.py

@@ -0,0 +1,15 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class JmJobEdge(BaseModel):
+    __tablename__ = "jm_job_edge"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 进节点
+    in_node_id = Column(Integer)
+    # 出节点
+    out_node_id = Column(Integer)
+    # 任务id
+    job_id = Column(Integer, nullable=False)

+ 19 - 0
app/models/jm_job_history.py

@@ -0,0 +1,19 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class JmJobHistory(BaseModel):
+    __tablename__ = "jm_job_history"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 任务id
+    job_id = Column(Integer, nullable=False)
+    # 调度时间
+    trigger_time = Column(Integer)
+    # 调度结果
+    trigger_result = Column(Integer)
+    # 执行时间
+    executor_time = Column(Integer)
+    # 执行结果
+    executor_result = Column(Integer)

+ 33 - 0
app/models/jm_job_info.py

@@ -0,0 +1,33 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class JmJobInfo(BaseModel):
+    __tablename__ = "jm_job_info"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 任务名称
+    name = Column(String, nullable=False)
+    # 任务类型
+    type = Column(String, nullable=False)
+    # 任务分类
+    tag = Column(String, nullable=False)
+    # 周期类型
+    cron_type = Column(Integer, nullable=False)
+    # 周期num
+    cron_num = Column(Integer, nullable=False)
+    # 周期单位
+    cron_unit = Column(String, nullable=False)
+    # api
+    api = Column(String)
+    # 状态
+    status = Column(Integer, nullable=False)
+    # 是否被删除
+    delete_status = Column(Integer, nullable=False)
+    # 用户id
+    user_id = Column(String, nullable=False)
+    # 用户名称
+    user_name = Column(String, nullable=False)
+    # 项目id
+    project_id = Column(String, nullable=False)

+ 21 - 0
app/models/jm_job_log.py

@@ -0,0 +1,21 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class JmJobLog(BaseModel):
+    __tablename__ = "jm_job_log"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 任务id
+    job_id = Column(Integer, nullable=False)
+    # 任务历史运行id
+    job_history_id = Column(Integer)
+    # 作业id
+    homework_id = Column(Integer)
+    # 作业名称
+    homework_name = Column(String)
+    # 执行结果
+    executor_result = Column(Integer)
+    # 日志
+    job_log_uri = Column(String)

+ 17 - 0
app/models/jm_job_node.py

@@ -0,0 +1,17 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class JmJobNode(BaseModel):
+    __tablename__ = "jm_job_node"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 任务id
+    job_id = Column(Integer)
+    # 作业id
+    homework_id = Column(Integer)
+    # 作业名称
+    homework_name = Column(String)
+    # 起始点(0:起始点;1:非起始点)
+    start_point = Column(Integer)

+ 3 - 1
app/routers/code_check.py

@@ -8,6 +8,7 @@ from app import schemas
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from pylint import epylint
+import sqlfluff
 
 
 router = APIRouter(
@@ -35,5 +36,6 @@ def code_check(code: str, code_type: str):
         os.remove(file.name)
         res = json.loads(lint_stdout.getvalue())
     else:
-        res = []
+        lint_out = sqlfluff.lint(code,'ansi')
+        res = [l for l in lint_out if l['code'] == 'PRS']
     return res

+ 177 - 0
app/routers/jm_job_info.py

@@ -0,0 +1,177 @@
+from typing import Optional, List
+from fastapi import APIRouter
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from app import models, schemas
+
+import app.crud as crud
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from fastapi_pagination import Page, add_pagination, paginate, Params
+
+from app import get_db
+
+
+
+router = APIRouter(
+    prefix="/jpt/jm_job_info",
+    tags=["jm_job_info-定时任务管理"],
+)
+
+@router.post("/")
+@web_try()
+@sxtimeit
+def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
+    jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
+    job_id = jm_job_info.id
+    if edges is not None and len(edges) > 0:
+        nodes_dict = {node['id']:node for node in nodes}
+        for node_id in nodes_dict:
+            node = nodes_dict[node_id]
+            if node['start_point'] == 0:
+                next_nodes = format_nodes(nodes_dict,edges,node_id)
+                node['child_nodes'] = next_nodes
+                node_tree = node
+                break
+        node = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': node_tree['homework_id'],
+            'homework_name': node_tree['homework_name'],
+            'start_point': node['start_point'],
+        })
+        node = crud.create_jm_job_node(db,node)
+        if edges is not None and len(edges) > 0:
+            create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
+    else:
+        node = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': nodes[0]['homework_id'],
+            'homework_name': nodes[0]['homework_name'],
+            'start_point': nodes[0]['start_point'],
+        })
+        node = crud.create_jm_job_node(db,node)
+    return jm_job_info.to_dict()
+
+
+def create_jm_job_node(db: Session, nodes, job_id, last_node_id):
+    if nodes is None or len(nodes) == 0:
+        return
+    for node in nodes:
+        next_node = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': node['homework_id'],
+            'homework_name': node['homework_name'],
+            'start_point': node['start_point'],
+        })
+        next_node = crud.create_jm_job_node(db,next_node)
+        out_node_id = next_node.id
+        edge = models.JmJobEdge(**{
+            'in_node_id': last_node_id,
+            'out_node_id': out_node_id,
+            'job_id': job_id,
+        })
+        edge = crud.create_jm_job_edge(db,edge)
+        create_jm_job_node(db, node['child_nodes'], job_id, out_node_id)
+    return
+
+
+def format_nodes(nodes_dict,edges: List[schemas.JmJobEdge],last_node: int):
+    nodes = []
+    for edge in edges:
+        if edge['source'] == last_node:
+            node = nodes_dict[edge['target']]
+            next_nodes = format_nodes(nodes_dict,edges,edge['target'])
+            node['child_nodes'] = next_nodes
+            nodes.append(node)
+    return nodes
+
+@router.get("/")
+@web_try()
+@sxtimeit
+def get_jm_job_infos(db: Session = Depends(get_db)):
+    res_list = []
+    jm_job_list = crud.get_jm_job_infos(db)
+    for jm_job in jm_job_list:
+        history = crud.get_one_job_historys(db, jm_job.id)
+        jm_job_dict = jm_job.to_dict()
+        jm_job_dict.update({'history':history[0:10]})
+        res_list.append(jm_job_dict)
+    return res_list
+
+@router.get("/info")
+@web_try()
+@sxtimeit
+def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_jm_job_info(db,jm_job_id)
+    jm_job_dict = jm_job.to_dict()
+    nodes = crud.get_one_job_nodes(db,jm_job_id)
+    edges = crud.get_one_job_edges(db,jm_job_id)
+    edges_list = [
+        {
+            'id': edge.id,
+            'job_id': edge.job_id,
+            'source': edge.in_node_id,
+            'target': edge.out_node_id,
+        }
+        for edge in edges
+    ]
+    jm_job_dict.update({
+        'nodes': nodes,
+        'edges': edges_list,
+    })
+    return jm_job_dict
+
+@router.put("/")
+@web_try()
+@sxtimeit
+def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
+    jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
+    job_id = jm_job_info.id
+    crud.delete_job_node(db, job_id)
+    if edges is not None and len(edges) > 0:
+        nodes_dict = {node['id']:node for node in nodes}
+        for node_id in nodes_dict:
+            node = nodes_dict[node_id]
+            if node['start_point'] == 0:
+                next_nodes = format_nodes(nodes_dict,edges,node_id)
+                node['child_nodes'] = next_nodes
+                node_tree = node
+                break
+        node = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': node_tree['homework_id'],
+            'homework_name': node_tree['homework_name'],
+            'start_point': node['start_point'],
+        })
+        node = crud.create_jm_job_node(db,node)
+        if edges is not None and len(edges) > 0:
+            create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
+    else:
+        node = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': nodes[0]['homework_id'],
+            'homework_name': nodes[0]['homework_name'],
+            'start_point': nodes[0]['start_point'],
+        })
+        node = crud.create_jm_job_node(db,node)
+    return jm_job_info.to_dict()
+
+@router.delete("/")
+@web_try()
+@sxtimeit
+def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
+    return crud.delete_jm_job_info(db,jm_job_id)
+
+@router.put("/status")
+@web_try()
+@sxtimeit
+def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
+    return crud.update_jm_job_status(db,item)
+
+@router.post("/execute/{jm_job_id}")
+@web_try()
+@sxtimeit
+def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_jm_job_info(db,jm_job_id)
+    # 进行api调用
+    return jm_job

+ 29 - 0
app/routers/jm_job_log.py

@@ -0,0 +1,29 @@
+from typing import Optional
+from fastapi import APIRouter
+
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from app import schemas
+
+import app.crud as crud
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from fastapi_pagination import Page, add_pagination, paginate, Params
+
+from app import get_db
+
+
+
+router = APIRouter(
+    prefix="/jpt/jm_job_log",
+    tags=["jm_job_log-定时任务日志管理"],
+)
+
+
+@router.get("/")
+@web_try()
+@sxtimeit
+def get_job_logs(db: Session = Depends(get_db)):
+    jm_log_infos = crud.get_jm_job_infos(db)
+    jm_job_id_list = [jm_job.id for jm_job in jm_log_infos]
+    return crud.get_jm_job_logs(db,jm_job_id_list)

+ 5 - 1
app/schemas/__init__.py

@@ -1,7 +1,11 @@
+from re import I
 from app.schemas.job_jdbc_datasouce import *
 from app.schemas.job_info import *
 from app.schemas.datax_json import *
 from app.schemas.job_log import *
 from app.schemas.data_management import *
 from app.schemas.constant import *
-from app.schemas.jm_homework import *
+from app.schemas.jm_homework import *
+from app.schemas.jm_homework_datasource_relation import *
+from app.schemas.jm_job_info import *
+from app.schemas.jm_job_node import *

+ 115 - 0
app/schemas/jm_job_info.py

@@ -0,0 +1,115 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+from app.schemas.jm_job_node import JmJobEdge, JmJobNode
+
+class JmJobInfoBase(BaseModel):
+    # 任务名称
+    name: str
+    # 任务类型
+    type: str
+    # 任务分类
+    tag: str
+    # 周期类型(1:单次;2:循环)
+    cron_type: int
+    # 周期num
+    cron_num: int
+    # 周期单位
+    cron_unit: str
+    # 用户id
+    user_id: str
+    # 用户名称
+    user_name: str
+    # 项目id
+    project_id: str
+
+
+class JmJobInfoCreate(JmJobInfoBase):
+    nodes: List[JmJobNode]
+    edges: Optional[List[JmJobEdge]] = None
+    class Config:
+        schema_extra = {
+            "example": {
+                "name": "example",
+                "type": "单作业离线任务",
+                "tag": "业务预测",
+                "cron_type": "2",
+                "cron_num": "1",
+                "cron_unit": "时",
+                "user_id": "test",
+                "user_name": "test",
+                "project_id": "test",
+                "nodes": [
+                    {
+                        "id": 1,
+                        "homework_id": 0,
+                        "homework_name": "开始",
+                        "start_point": 0,
+                    },
+                    {
+                        "id": 2,
+                        "homework_id": 1,
+                        "homework_name": "test",
+                        "start_point": 1,
+                    }
+                ],
+                "edges": [
+                    {
+                        "source": 1,
+                        "target": 2,
+                    }
+                ],
+            }
+        }
+
+class JmJobInfoUpdate(JmJobInfoBase):
+    id: int
+    nodes: List[JmJobNode]
+    edges: Optional[List[JmJobEdge]] = None
+    class Config:
+        schema_extra = {
+            "example": {
+                "id": 11,
+                "name": "example",
+                "type": "单作业离线任务",
+                "tag": "业务预测",
+                "cron_type": "2",
+                "cron_num": "1",
+                "cron_unit": "时",
+                "user_id": "test",
+                "user_name": "test",
+                "project_id": "test",
+                "nodes": [
+                    {
+                        "id": 1,
+                        "homework_id": 0,
+                        "homework_name": "开始",
+                        "start_point": 0,
+                    },
+                    {
+                        "id": 2,
+                        "homework_id": 1,
+                        "homework_name": "test",
+                        "start_point": 1,
+                    }
+                ],
+                "edges": [
+                    {
+                        "source": 1,
+                        "target": 2,
+                    }
+                ],
+            }
+        }
+
+class JmJobInfoStatusUpdate(BaseModel):
+    id: int
+    status: int
+    class Config:
+        schema_extra = {
+            "example": {
+                "id": 1,
+                "status": 0,
+            }
+        }

+ 21 - 0
app/schemas/jm_job_node.py

@@ -0,0 +1,21 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+class JmJobNode(BaseModel):
+    id: int
+    # 作业id
+    homework_id: Optional[int]
+    # 作业名称
+    homework_name: str
+    # 起始点
+    start_point: int
+
+class JmJobEdge(BaseModel):
+    # 入节点
+    source: int
+    # 出节点
+    target: int
+
+
+

+ 75 - 0
data/data.sql

@@ -171,3 +171,78 @@ CREATE TABLE `jm_homework_datasource_relation` (
   `homework_id` bigint(20) NOT NULL COMMENT '作业ID',
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='作业与数据源关系表';
+
+
+-- ----------------------------
+-- Table structure for jm_job_info
+-- ----------------------------
+DROP TABLE IF EXISTS `jm_job_info`;
+CREATE TABLE `jm_job_info` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `name` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '任务名称',
+  `type` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '任务类型',
+  `tag` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '任务分类',
+  `cron_type` tinyint(4) DEFAULT NULL COMMENT '周期类型',
+  `cron_num` int(4) DEFAULT NULL COMMENT '周期num',
+  `cron_unit` varchar(5) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '周期单位',
+  `status` int(11) NOT NULL COMMENT '状态',
+  `user_id` int(11) NOT NULL COMMENT '用户ID',
+  `user_name` varchar(255) COLLATE utf8_unicode_ci NOT NULL COMMENT '用户名称',
+  `project_id` int(11) NOT NULL COMMENT '项目ID',
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='定时任务详情';
+
+-- ----------------------------
+-- Table structure for jm_job_node
+-- ----------------------------
+DROP TABLE IF EXISTS `jm_job_node`;
+CREATE TABLE `jm_job_node` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `job_id` bigint(20) NOT NULL COMMENT '任务ID',
+  `homework_id` bigint(20) DEFAULT NULL COMMENT '作业ID',
+  `homework_name` varchar(50) COLLATE utf8_unicode_ci NOT NULL COMMENT '作业名称',
+  `start_point` tinyint(4) NOT NULL COMMENT '起始点(0:起始点;1:非起始点)',
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='Job节点表';
+
+-- ----------------------------
+-- Table structure for jm_job_edge
+-- ----------------------------
+DROP TABLE IF EXISTS `jm_job_edge`;
+CREATE TABLE `jm_job_edge` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `in_node_id` bigint(20) DEFAULT NULL,
+  `out_node_id` bigint(20) DEFAULT NULL,
+  `job_id` bigint(20) NOT NULL,
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='Job节点关系表';
+
+-- ----------------------------
+-- Table structure for jm_job_history
+-- ----------------------------
+DROP TABLE IF EXISTS `jm_job_history`;
+CREATE TABLE `jm_job_history` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `job_id` bigint(20) NOT NULL COMMENT '任务id',
+  `trigger_time` int(13) DEFAULT NULL COMMENT '调度时间',
+  `trigger_result` tinyint(4) DEFAULT NULL COMMENT '调度结果',
+  `executor_time` int(13) DEFAULT NULL COMMENT '执行时间',
+  `executor_result` tinyint(4) DEFAULT NULL COMMENT '执行结果',
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='定时任务历史运行情况';
+
+-- ----------------------------
+-- Table structure for jm_job_log
+-- ----------------------------
+DROP TABLE IF EXISTS `jm_job_log`;
+CREATE TABLE `jm_job_log` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `job_id` bigint(20) NOT NULL COMMENT '任务id',
+  `job_history_id` bigint(20) DEFAULT NULL COMMENT '任务历史运行id',
+  `homework_id` bigint(20) DEFAULT NULL COMMENT '作业id',
+  `homework_name` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '作业名称',
+  `executor_result` tinyint(4) DEFAULT NULL COMMENT '执行结果',
+  `job_log_uri` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '日志',
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='定时任务日志';
+

+ 1 - 0
environment.yml

@@ -23,6 +23,7 @@ dependencies:
       - fastapi-pagination==0.9.3
       - pymysql==1.0.2
       - pylint==2.5.3
+      - sqlfluff==1.3.1
       - PyHive==0.6.5
       - pure-sasl==0.6.2
       # - sasl==0.3.1

+ 2 - 1
requirements.txt

@@ -5,4 +5,5 @@ pure-sasl==0.6.2
 sasl==0.3.1
 thrift==0.16.0
 thrift-sasl==0.4.3
-pylint==2.5.3
+pylint==2.5.3
+sqlfluff==1.3.1

+ 4 - 0
server.py

@@ -15,6 +15,8 @@ import app.routers.intermediate as router_intermediate
 import app.routers.jm_homework as router_jm_homework
 import app.routers.dag as router_dag
 import app.routers.code_check as router_code_check
+import app.routers.jm_job_info as router_jm_job_info
+import app.routers.jm_job_log as router_jm_job_log
 
 Base.metadata.create_all(bind=engine)
 app = FastAPI( docs_url='/jpt/docs', redoc_url='/jpt/redoc', title="DAG管理系统")
@@ -40,6 +42,8 @@ app.include_router(router_files.router)
 app.include_router(router_intermediate.router)
 app.include_router(router_jm_homework.router)
 app.include_router(router_dag.router)
+app.include_router(router_jm_job_info.router)
+app.include_router(router_jm_job_log.router)
 app.include_router(router_code_check.router)