Browse Source

add job info

Zhang Li 2 years ago
parent
commit
d8955734e2

+ 2 - 1
app/crud/__init__.py

@@ -1 +1,2 @@
-from app.crud.job_jdbc_datasource import *
+from app.crud.job_jdbc_datasource import *
+from app.crud.job_info import *

+ 43 - 0
app/crud/job_info.py

@@ -0,0 +1,43 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+from app.models.database import SessionLocal
+
+
def create_job_info(db: Session, item: schemas.JobInfoCreate):
    """Create and persist a JobInfo record built from *item*.

    Server-managed fields (timestamps, trigger status, user id) are filled
    in here rather than taken from the client payload.

    :param db: active SQLAlchemy session
    :param item: validated creation payload
    :return: the persisted ``models.JobInfo`` instance (with generated id)
    """
    now: int = int(time.time())
    db_item = models.JobInfo(
        **item.dict(),
        trigger_status=0,  # new jobs start stopped (0 = stopped, 1 = running)
        create_time=now,
        update_time=now,
        user_id=1,  # TODO: take from the authenticated user
    )
    db.add(db_item)
    db.commit()
    db.refresh(db_item)  # reload server-generated fields (e.g. id)
    return db_item
+
+
+
def get_job_infos(db: Session, skip: int = 0, limit: int = 20):
    """Return every JobInfo row.

    ``skip`` and ``limit`` are accepted for interface symmetry but are not
    applied here — the router paginates the full result set in memory.
    """
    # TODO: add an explicit ORDER BY
    jobs: List[models.JobInfo] = db.query(models.JobInfo).all()
    return jobs
+
+
+
def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
    """Apply the fields set on *update_item* to the JobInfo with *id*.

    :param db: active SQLAlchemy session
    :param id: primary key of the JobInfo to update
    :param update_item: partial update payload; only fields the client
        explicitly set are applied (``exclude_unset``)
    :raises Exception: if no JobInfo with the given id exists
    :return: the refreshed ``models.JobInfo`` instance
    """
    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
    if not db_item:
        raise Exception('JobInfo not found')
    # Only overwrite fields the client actually sent.
    for field, value in update_item.dict(exclude_unset=True).items():
        setattr(db_item, field, value)
    db_item.update_time = int(time.time())
    db.commit()  # commit flushes; the previous extra db.flush() was redundant
    db.refresh(db_item)
    return db_item
+
+

+ 2 - 1
app/models/__init__.py

@@ -1 +1,2 @@
-from app.models.job_jdbc_datasource import *
+from app.models.job_jdbc_datasource import *
+from app.models.job_info import *

+ 43 - 0
app/models/job_info.py

@@ -0,0 +1,43 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
class JobInfo(BaseModel):
    """ORM model for a scheduled data-sync job (table ``job_info``)."""

    __tablename__ = "job_info"

    id = Column(Integer, primary_key=True, index=True)
    # Cron expression controlling when the job runs
    job_cron = Column(String, nullable=False, index=True)
    # Human-readable job description
    job_desc = Column(String, nullable=False)
    # Creation timestamp (unix seconds)
    create_time = Column(Integer)
    # Last-update timestamp (unix seconds)
    update_time = Column(Integer)
    # Id of the creating user
    user_id = Column(Integer)
    # Executor routing strategy
    executor_route_strategy = Column(String)
    # Executor task handler name
    executor_handler = Column(String)
    # Parameters passed to the executor task
    executor_param = Column(String)
    # Strategy when a run is blocked by a previous one
    executor_block_strategy = Column(String)
    # Job timeout, in minutes
    executor_timeout = Column(Integer, nullable=False)
    # Number of retries on failure
    executor_fail_retry_count = Column(Integer, nullable=False)
    # Schedule status: 0 = stopped, 1 = running
    trigger_status = Column(Integer, nullable=False)
    # Last trigger time
    trigger_last_time = Column(Integer)
    # Next scheduled trigger time
    trigger_next_time = Column(Integer)
    # datax run script
    job_json = Column(String)
    # Initial timestamp for incremental sync
    inc_start_time = Column(Integer)
    # Status of the most recent execution
    last_handle_code = Column(Integer)

+ 20 - 0
app/routers/constants.py

@@ -0,0 +1,20 @@
+
+
+from fastapi import APIRouter
+from constants.constants import CONSTANTS
+from utils import *
+
# Router exposing constant/lookup tables under /jpt/constants.
router = APIRouter(
    prefix="/jpt/constants",
    tags=["constants-常量管理"],
)
+
+
def format_constants(constants: dict):
    """Flatten a constants mapping into ``[{'id': key, 'value': value}, ...]``."""
    entries = []
    for const_id, const_value in constants.items():
        entries.append({'id': const_id, 'value': const_value})
    return entries
+
@router.post("/datasources")
@web_try()
@sxtimeit
def get_datasources():
    # Return the datasource-type constants as [{'id': ..., 'value': ...}].
    # NOTE(review): registered as POST although it only reads data — confirm
    # this is intentional (GET would be conventional).
    return format_constants(CONSTANTS['DATASOURCES'])

+ 42 - 0
app/routers/job_info.py

@@ -0,0 +1,42 @@
+from typing import Optional
+from fastapi import APIRouter
+
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from app import schemas
+
+import app.crud as crud
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from fastapi_pagination import Page, add_pagination, paginate, Params
+
+from app import get_db
+
+
+
# Router exposing job-management endpoints under /jpt/jobinfo.
router = APIRouter(
    prefix="/jpt/jobinfo",
    tags=["jobinfo-任务管理"],
)
+
+
@router.post("/")
@web_try()
@sxtimeit
def create_job_info(ds: schemas.JobInfoCreate, db: Session = Depends(get_db)):
    # Create a new job from the posted payload.
    # NOTE(review): parameter name `ds` looks copied from the datasource
    # router — `item` would be clearer; pydantic body params are read from
    # the request body, not by name, so a rename would not affect clients.
    return crud.create_job_info(db, ds)
+
+
+
@router.get("/")
@web_try()
@sxtimeit
def get_job_infos(params: Params=Depends(), db: Session = Depends(get_db)):
    # List jobs; pagination is applied in memory over the full result set.
    return paginate(crud.get_job_infos(db), params)
+
+
@router.put("/{id}")
@web_try()
@sxtimeit
def update_job_info(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
    """Update the JobInfo identified by *id* with the submitted fields.

    Renamed from ``update_datasource`` (copy-paste from the datasource
    router); the route path is unchanged, so HTTP clients are unaffected —
    only the OpenAPI operation name becomes accurate.
    """
    return crud.update_job_info(db, id, update_item)

+ 2 - 1
app/schemas/__init__.py

@@ -1 +1,2 @@
-from app.schemas.job_jdbc_datasouce import *
+from app.schemas.job_jdbc_datasouce import *
+from app.schemas.job_info import *

+ 88 - 0
app/schemas/job_info.py

@@ -0,0 +1,88 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
class JobInfoBase(BaseModel):
    """JobInfo fields shared by the create/update/read schemas."""

    # Cron expression controlling when the job runs
    job_cron: str
    # Human-readable job description
    job_desc: str
    # Executor routing strategy
    executor_route_strategy: str
    # Executor task handler name
    executor_handler: Optional[str]
    # Parameters passed to the executor task
    executor_param: Optional[str]
    # Strategy when a run is blocked by a previous one
    executor_block_strategy: str
    # Job timeout, in minutes
    executor_timeout: int
    # Number of retries on failure
    executor_fail_retry_count: int
    # Initial timestamp for incremental sync
    inc_start_time: Optional[int]
    # datax run script
    job_json: str
+
+
+
class JobInfoCreate(JobInfoBase):
    """Payload for creating a job; server-managed fields are not accepted."""

    class Config:
        # Example request body shown in the OpenAPI docs.
        schema_extra = {
            "example": {
                "job_cron": "0 0/2 * * * ?",
                "job_desc": "mysql-mysql同步",
                "executor_route_strategy": "FIRST",
                "executor_handler": "",
                "executor_param": "",
                "executor_block_strategy": "SERIAL_EXECUTION",
                "executor_timeout": 60,
                "executor_fail_retry_count": 2,
                "inc_start_time": 0,
                "job_json": ""
            }
        }
+
class JobInfoUpdate(JobInfoBase):
    """Payload for updating a job; additionally allows toggling the schedule."""

    # Schedule status: 0 = stopped, 1 = running
    trigger_status: int

    class Config:
        # Example request body shown in the OpenAPI docs.
        schema_extra = {
            "example": {
                "job_cron": "0 0/2 * * * ?",
                "job_desc": "mysql-mysql同步",
                "executor_route_strategy": "FIRST",
                "executor_handler": "",
                "executor_param": "",
                "executor_block_strategy": "SERIAL_EXECUTION",
                "executor_timeout": 60,
                "executor_fail_retry_count": 2,
                "inc_start_time": 0,
                "job_json": "",
                "trigger_status": 1,
            }
        }
+
+
+
class JobInfo(JobInfoBase):
    """JobInfo as returned to clients, including server-managed fields.

    ``trigger_last_time``, ``trigger_next_time`` and ``last_handle_code``
    map to nullable columns that are not set when a job is created, so they
    must be Optional — declaring them plain ``int`` would make orm_mode
    serialization fail on freshly created rows.  The redundant ``job_json``
    redeclaration (already inherited from JobInfoBase) was removed.
    """

    id: int
    # Creation timestamp (unix seconds)
    create_time: int
    # Last-update timestamp (unix seconds)
    update_time: int
    # Id of the creating user
    user_id: int
    # Schedule status: 0 = stopped, 1 = running
    trigger_status: int
    # Last trigger time; None until the job has been triggered
    trigger_last_time: Optional[int]
    # Next scheduled trigger time; None until scheduled
    trigger_next_time: Optional[int]
    # Status of the most recent execution; None before the first run
    last_handle_code: Optional[int]

    class Config:
        # Allow constructing this schema directly from ORM objects.
        orm_mode = True

+ 0 - 0
constants/__init__.py


+ 9 - 0
constants/constants.py

@@ -0,0 +1,9 @@
# Supported datasource types, keyed by numeric id.
DATASOURCES = {
    0: 'mysql',
    1: 'hive',
}


# Registry of all constant tables exposed by the constants router.
CONSTANTS = {
    'DATASOURCES': DATASOURCES
}

+ 7 - 8
server.py

@@ -2,19 +2,15 @@
 from fastapi import  FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
-
 from app.models.database import  engine, Base
 
 import app.routers.job_jdbc_datasource as router_jjds
+import app.routers.constants as router_constants
+import app.routers.job_info as router_job_info
 
 Base.metadata.create_all(bind=engine)
-
 app = FastAPI( docs_url='/jpt/docs', redoc_url='/jpt/redoc', title="DAG管理系统")
 
-
-app.include_router(router_jjds.router)
-
-
 # CORS 跨源资源共享
 app.add_middleware(
     CORSMiddleware,
@@ -26,6 +22,11 @@ app.add_middleware(
 
 
 
+app.include_router(router_jjds.router)
+app.include_router(router_constants.router)
+app.include_router(router_job_info.router)
+
+
 # Get 健康检查
 @app.get("/ping", description="健康检查")
 def ping():
@@ -33,8 +34,6 @@ def ping():
 
 
 
-
-
 if __name__ == '__main__':
     import uvicorn
     import argparse