Browse Source

datax 模块对接完成

liweiquan 2 years ago
parent
commit
b49ae01591

+ 1 - 0
app/crud/__init__.py

@@ -13,3 +13,4 @@ from app.crud.jm_job_log import *
 from app.crud.af_task import *
 from app.crud.af_task import *
 from app.crud.af_job import *
 from app.crud.af_job import *
 from app.crud.af_run import *
 from app.crud.af_run import *
+from app.crud.relation import *

+ 49 - 2
app/crud/job_info.py

@@ -16,12 +16,21 @@ def create_job_info(db: Session, item: schemas.JobInfoCreate):
         'cron_select_type': cron_select_type,
         'cron_select_type': cron_select_type,
         'job_cron': cron_expression,
         'job_cron': cron_expression,
     })
     })
+    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() else None
+    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() else None
+    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    item_dict.update({
+        'partition_info': partition_info_str,
+    })
     db_item = models.JobInfo(**item_dict, **{
     db_item = models.JobInfo(**item_dict, **{
         'trigger_status': 0,
         'trigger_status': 0,
         'create_time': create_time,
         'create_time': create_time,
         'update_time': create_time,
         'update_time': create_time,
-        'user_id': 1, # TODO 用户
-        'trigger_status': 0,
         'delete_status': 1,
         'delete_status': 1,
     })
     })
     db.add(db_item)
     db.add(db_item)
@@ -42,6 +51,24 @@ def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
     if not db_item:
     if not db_item:
         raise Exception('未找到该任务')
         raise Exception('未找到该任务')
     update_dict = update_item.dict(exclude_unset=True)
     update_dict = update_item.dict(exclude_unset=True)
+    cron_expression_dict = update_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    update_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() else None
+    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() else None
+    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    update_dict.update({
+        'partition_info': partition_info_str,
+    })
     for k, v in update_dict.items():
     for k, v in update_dict.items():
         setattr(db_item, k, v)
         setattr(db_item, k, v)
     db_item.update_time = int(time.time())
     db_item.update_time = int(time.time())
@@ -50,10 +77,30 @@ def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
     db.refresh(db_item)
     db.refresh(db_item)
     return db_item
     return db_item
 
 
def update_job_trigger_status(db: Session, id: int, trigger_status: int):
    """Set the schedule status (0=stopped, 1=running) on a JobInfo row.

    Raises Exception when no job with this id exists.
    Returns the refreshed ORM row.
    """
    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
    if not db_item:
        raise Exception('未找到该任务')
    db_item.trigger_status = trigger_status
    # bump update_time so the change is visible in listings
    db_item.update_time = int(time.time())
    db.commit()
    db.flush()
    db.refresh(db_item)
    return db_item
+
+
def get_job_info(db: Session, id: int):
    """Return the JobInfo row with the given id; raise if it does not exist."""
    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
    if not db_item:
        raise Exception('未找到该任务')
    return db_item
+
 def delete_job_info(db: Session, job_id: int):
 def delete_job_info(db: Session, job_id: int):
     job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
     job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
     if not job_item:
     if not job_item:
         raise Exception('未找到该任务')
         raise Exception('未找到该任务')
+    if job_item.trigger_status == 1:
+        raise Exception('该任务未停用,不能删除')
     job_item.delete_status = 0
     job_item.delete_status = 0
     db.commit()
     db.commit()
     db.flush()
     db.flush()

+ 25 - 0
app/crud/relation.py

@@ -0,0 +1,25 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
def create_relation(db: Session, se_id: int, type: str, af_id: int):
    """Persist a (server-side id, type) -> airflow id mapping row.

    type: entity kind, e.g. 'datax' / 'job' / 'task'.
    """
    db_item = models.Relation(**{"se_id": se_id,
                                 "type": type,
                                 "af_id": af_id})
    db.add(db_item)
    db.commit()
    db.refresh(db_item)  # reload so the generated primary key is populated
    return db_item
+
def get_af_id(db: Session, se_id: int, type: str):
    """Return the first Relation row matching (se_id, type), or None if absent."""
    res: models.Relation = db.query(models.Relation)\
        .filter(models.Relation.se_id == se_id)\
        .filter(models.Relation.type == type).first()
    return res
+
def delete_relation(db: Session, se_id: int, type: str):
    """Delete the Relation row(s) matching (se_id, type).

    Returns the number of rows deleted (Query.delete() returns an int,
    not a model instance — the previous annotation was misleading).
    """
    deleted: int = db.query(models.Relation)\
        .filter(models.Relation.se_id == se_id)\
        .filter(models.Relation.type == type).delete()
    # Query.delete() only stages the deletion in the session; the sibling
    # CRUD helpers in this module commit explicitly, so commit here too or
    # the rows silently reappear.
    db.commit()
    return deleted

+ 2 - 1
app/models/__init__.py

@@ -12,4 +12,5 @@ from app.models.jm_job_history import *
 from app.models.jm_job_log import *
 from app.models.jm_job_log import *
 from app.models.af_task import *
 from app.models.af_task import *
 from app.models.af_job import *
 from app.models.af_job import *
-from app.models.af_run import *
+from app.models.af_run import *
+from app.models.relation import *

+ 1 - 1
app/models/job_info.py

@@ -18,7 +18,7 @@ class JobInfo(BaseModel):
     # 更新时间
     # 更新时间
     update_time = Column(Integer)
     update_time = Column(Integer)
     # 创建人
     # 创建人
-    user_id = Column(Integer)
+    user_id = Column(String, nullable=False)
     # 执行器路由策略
     # 执行器路由策略
     executor_route_strategy = Column(String)
     executor_route_strategy = Column(String)
     # 执行器任务handler
     # 执行器任务handler

+ 15 - 0
app/models/relation.py

@@ -0,0 +1,15 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
class Relation(BaseModel):
    """Mapping between a server-side entity and its airflow-side counterpart."""
    __tablename__ = "relation"

    id = Column(Integer, primary_key=True, index=True)
    # server-side id
    se_id = Column(Integer, nullable=False)
    # entity kind: one of 'datax', 'job', 'task'. Stored as text everywhere
    # (crud.create_relation is called with the literal 'datax'), so the
    # column must be String — Integer would fail on insert in strict modes.
    type = Column(String, nullable=False)
    # airflow-side id
    af_id = Column(Integer, nullable=False)

+ 57 - 2
app/routers/job_info.py

@@ -1,15 +1,23 @@
+from pyexpat import model
+import time
 from typing import Optional
 from typing import Optional
 from fastapi import APIRouter
 from fastapi import APIRouter
 
 
 from fastapi import Depends
 from fastapi import Depends
 from sqlalchemy.orm import Session
 from sqlalchemy.orm import Session
-from app import schemas
+from app import models, schemas
 
 
 import app.crud as crud
 import app.crud as crud
+from app.crud import job_info
+from app.services.datax import datax_create_job, datax_update_job
+from app.utils.cron_utils import parsing_cron_expression
+from app.utils.send_util import *
+from app.utils.utils import *
 from utils.sx_time import sxtimeit
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
 from fastapi_pagination import Page, add_pagination, paginate, Params
 
 
+
 from app import get_db
 from app import get_db
 
 
 
 
@@ -34,12 +42,59 @@ def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
 def get_job_infos(params: Params=Depends(), db: Session = Depends(get_db)):
 def get_job_infos(params: Params=Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_job_infos(db), params)
     return paginate(crud.get_job_infos(db), params)
 
 
@router.get("/info")
@web_try()
@sxtimeit
def get_job_info(job_id: int, db: Session = Depends(get_db)):
    """Fetch one job and reshape it for the UI.

    - expands the stored cron string back into its component dict
    - splits the packed partition_info string ('field,days,format')
      back into partition_info / partition_num / partition_time fields
    Raises (via crud.get_job_info) when the job does not exist.
    """
    job_info = crud.get_job_info(db, job_id)
    job_info_dict = job_info.to_dict()
    cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
    cron_expression_dict = parsing_cron_expression(cron_expression)
    cron_expression_dict.update({
        'cron_select_type': cron_select_type,
        'cron_expression': cron_expression
    })
    job_info_dict.update({
        'cron_expression_dict': cron_expression_dict
    })
    # partition_info is stored packed as 'field,num,timeformat'
    # (see the packing in crud.create_job_info / update_job_info)
    partition_info_str = job_info_dict.pop('partition_info', None)
    if partition_info_str:
        partition_list = partition_info_str.split(',')
        job_info_dict.update({
            'partition_info': partition_list[0],
            'partition_time': partition_list[2],
            'partition_num': int(partition_list[1])

        })
    return job_info_dict
+
+
 
 
 @router.put("/{id}")
 @router.put("/{id}")
 @web_try()
 @web_try()
 @sxtimeit
 @sxtimeit
 def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
 def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
-    return crud.update_job_info(db, id, update_item)
+    job_info = crud.update_job_info(db, id, update_item)
+    relation = crud.get_af_id(db, job_info.id, 'datax')
+    if not relation:
+        datax_create_job(job_info,db)
+    else:
+        datax_update_job(job_info,db)
+    return job_info
+
@router.put("/update_trigger_status/")
@web_try()
@sxtimeit
def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
    """Start/stop a job: push the new status to airflow first, then persist it."""
    job_info = crud.get_job_info(db, item.id)
    relation = crud.get_af_id(db, job_info.id, 'datax')
    # mutate the in-memory row first so the airflow payload carries the new status
    job_info.trigger_status = item.trigger_status
    if not relation:
        datax_create_job(job_info,db)
    else:
        datax_update_job(job_info,db)
    # only persist locally after the airflow call succeeded
    job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
    return job_info
 
 
 @router.delete("/{job_id}")
 @router.delete("/{job_id}")
 @web_try()
 @web_try()

+ 40 - 9
app/schemas/job_info.py

@@ -25,10 +25,16 @@ class JobInfoBase(BaseModel):
     job_json: str
     job_json: str
      # 增量时间
      # 增量时间
     replace_param: Optional[str]
     replace_param: Optional[str]
-    # 分区信息
+    # 分区字段
     partition_info: Optional[str]
     partition_info: Optional[str]
+    # 分区时间格式
+    partition_time: Optional[str]
+    # 分区时间倒退天数
+    partition_num: Optional[int]
     # jvm参数
     # jvm参数
     jvm_param: Optional[str]
     jvm_param: Optional[str]
+    # 创建人
+    user_id: str
 
 
 
 
 
 
@@ -57,20 +63,31 @@ class JobInfoCreate(JobInfoBase):
                 "inc_start_time": 0,
                 "inc_start_time": 0,
                 "job_json": "",
                 "job_json": "",
                 "replace_param": "-DlastTime='%s' -DcurrentTime='%s'",
                 "replace_param": "-DlastTime='%s' -DcurrentTime='%s'",
-                "partition_info": "txn_date,0,yyyy-MM-dd",
+                "partition_info": "txn_date",
+                "partition_time": "yyyy-MM-dd",
+                "partition_num": 0,
                 "jvm_param": "",
                 "jvm_param": "",
+                "user_id": "test",
             }
             }
         }
         }
 
 
 class JobInfoUpdate(JobInfoBase):
 class JobInfoUpdate(JobInfoBase):
-    # 运行周期
-    job_cron: str
+    # 周期表达式
+    cron_expression: CronExpression
     # 调度状态: 0-停止 1-运行
     # 调度状态: 0-停止 1-运行
     trigger_status: int
     trigger_status: int
     class Config:
     class Config:
         schema_extra = {
         schema_extra = {
             "example": {
             "example": {
-                "job_cron": "0 0 1 1/2 ?",
+                "cron_expression": {
+                    "cron_select_type": 3,
+                    "cron_expression": "",
+                    "minute": 0,
+                    "hour": 0,
+                    "day": 1,
+                    "week": 3,
+                    "month": 2,
+                },
                 "job_desc": "mysql-mysql同步",
                 "job_desc": "mysql-mysql同步",
                 "executor_route_strategy": "FIRST",
                 "executor_route_strategy": "FIRST",
                 "executor_handler": "",
                 "executor_handler": "",
@@ -82,8 +99,11 @@ class JobInfoUpdate(JobInfoBase):
                 "job_json": "",
                 "job_json": "",
                 "trigger_status": 1,
                 "trigger_status": 1,
                 "replace_param": "-DlastTime='%s' -DcurrentTime='%s'",
                 "replace_param": "-DlastTime='%s' -DcurrentTime='%s'",
-                "partition_info": "txn_date,0,yyyy-MM-dd",
+                "partition_info": "txn_date",
+                "partition_time": "yyyy-MM-dd",
+                "partition_num": 0,
                 "jvm_param": "",
                 "jvm_param": "",
+                "user_id": "test",
             }
             }
         }
         }
 
 
@@ -95,8 +115,6 @@ class JobInfo(JobInfoBase):
     create_time: int
     create_time: int
     # 更新时间
     # 更新时间
     update_time: int
     update_time: int
-    # 创建人
-    user_id: int
     # 调度状态: 0-停止 1-运行
     # 调度状态: 0-停止 1-运行
     trigger_status: int
     trigger_status: int
     # 上次调度时间
     # 上次调度时间
@@ -111,4 +129,17 @@ class JobInfo(JobInfoBase):
     job_cron: str
     job_cron: str
 
 
     class Config:
     class Config:
-        orm_mode = True
+        orm_mode = True
+
class JobInfoTriggerStatus(BaseModel):
    """Request body for toggling a job's schedule status."""
    # job id
    id: int
    # schedule status: 0=stopped, 1=running
    trigger_status: int
    class Config:
        schema_extra = {
            "example": {
                "id": 1,
                "trigger_status": 0,
            }
        }

+ 0 - 0
app/services/__init__.py


+ 84 - 0
app/services/datax.py

@@ -0,0 +1,84 @@
+
+
+from app import crud, models
+from app.utils.send_util import *
+from app.utils.utils import get_cmd_parameter
+from sqlalchemy.orm import Session
+
def datax_create_job(job_info: models.JobInfo, db: Session):
    """Create the airflow-side job for a datax JobInfo and submit it.

    Builds a single-task af_job payload from the JobInfo row, POSTs it to
    the airflow service, records the se_id -> af_id relation, and triggers
    a submit so the DAG gets generated.
    """
    af_task = datax_create_task(job_info)
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": job_info.job_cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
        "job_type": 0,
        "user_id": 0,  # NOTE(review): airflow-side user id is hard-coded — confirm intended
    }
    res = send_post('/jpt/af_job', af_job)
    af_job = res['data']
    # remember which airflow job belongs to this datax job
    crud.create_relation(db, job_info.id,'datax', af_job['id'])
    send_submit(af_job['id'])
+
def datax_create_task(job_info: models.JobInfo):
    """POST a datax af_task built from the JobInfo row; return the created task dict."""
    # assemble the datax CLI flags (-j jvm, -p replace params, -Dpartition)
    cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": {},
        "run_image": "",
        "task_type": "datax",
        "user_id": 0,  # NOTE(review): hard-coded airflow-side user id — confirm intended
    }
    res = send_post('/jpt/af_task', af_task)
    af_task = res['data']
    return af_task
+
+
def datax_update_job(job_info: models.JobInfo, db: Session):
    """Push changes of a datax JobInfo to its existing airflow job.

    Looks up the stored relation to find the airflow job id, fetches the
    current af_job, updates its task and job fields, then re-submits so
    the DAG is regenerated. Assumes the relation exists (callers check
    get_af_id before choosing update over create).
    """
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id
    res = send_get("/jpt/af_job/getOnce",af_job_id)
    old_af_job = res['data']
    # datax jobs are created with exactly one task (see datax_create_job)
    old_af_task = old_af_job['tasks'][0]
    af_task = datax_put_task(job_info,old_af_task)
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": job_info.job_cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }
    res = send_put('/jpt/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
+
+
def datax_put_task(job_info: models.JobInfo,old_af_task):
    """PUT updated fields onto an existing airflow task; return the updated task dict.

    old_af_task: the task dict previously fetched from airflow (only its
    'id' is used here, to address the PUT).
    """
    cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": {},
        "run_image": "",
    }
    res = send_put('/jpt/af_task', old_af_task['id'],af_task)
    af_task = res['data']
    return af_task

+ 40 - 0
app/utils/send_util.py

@@ -0,0 +1,40 @@
+from unittest import result
+import requests
+from configs.settings import config
+
+HOST = config.get('AIRFLOW', 'HOST')
+PORT = config.get('AIRFLOW', 'PORT')
+
def send_post(uri, data):
    """POST `data` as JSON to the airflow service and return the parsed body.

    Raises Exception when the response body does not carry code == 200.
    """
    res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
    result = res.json()
    # parse once and reuse — the original called res.json() a second time
    if result.get('code') == 200:
        return result
    raise Exception('请求airflow失败')
+
def send_submit(af_job_id):
    """Ask airflow to (re)submit the job with this id so its DAG is generated.

    Raises Exception when the response body does not carry code == 200.
    """
    res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
    result = res.json()
    # parse once and reuse; stray debug print removed
    if result.get('code') == 200:
        return result
    raise Exception('请求airflow失败')
+
+
def send_put(uri, path_data, data):
    """PUT `data` as JSON to {uri}/{path_data} on the airflow service.

    Returns the parsed response body; raises Exception when code != 200.
    """
    res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
    result = res.json()
    # parse once and reuse — the original called res.json() a second time
    if result.get('code') == 200:
        return result
    raise Exception('请求airflow失败')
+
def send_get(uri, path_data):
    """GET {uri}/{path_data} from the airflow service.

    Returns the parsed response body; raises Exception when code != 200.
    """
    res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
    result = res.json()
    # parse once and reuse — the original called res.json() a second time
    if result.get('code') == 200:
        return result
    raise Exception('请求airflow失败')

+ 19 - 0
app/utils/utils.py

@@ -1,5 +1,6 @@
 
 
 import base64
 import base64
+import time
 from configs.logging import logger
 from configs.logging import logger
 
 
 def decode_user(username, password):
 def decode_user(username, password):
@@ -27,3 +28,21 @@ def byte_conversion(size):
         return str("%.1f"%(size/1024/1024)) + 'MB'
         return str("%.1f"%(size/1024/1024)) + 'MB'
     else:
     else:
         return str("%.1f"%(size/1024/1024/1024)) + 'GB'
         return str("%.1f"%(size/1024/1024/1024)) + 'GB'
+
+
def get_cmd_parameter(jvm_param, inc_start_time, replace_param, partition_info):
    """Assemble the datax CLI parameter string from job settings.

    jvm_param      -> emitted as  -j "<jvm_param>"
    replace_param  -> emitted as  -p "<replace_param % (inc_start_time, now)>"
    partition_info -> 'field,days,format' string; emitted as
                      -Dpartition=field=<date `days` days ago in `format`>
                      (appended INSIDE the -p quotes when replace_param is
                      present; NOTE(review): without replace_param it is
                      emitted unquoted — confirm that is intended)
    """
    now = int(time.time())
    pieces = []
    if jvm_param is not None:
        pieces.append(f'-j "{jvm_param}" ')
    if replace_param is not None:
        # opening quote only; it is closed at the end so -Dpartition lands inside
        pieces.append('-p "' + (replace_param % (inc_start_time, now)))
    if partition_info is not None:
        field, back_days, date_fmt = partition_info.split(',')[0], partition_info.split(',')[1], partition_info.split(',')[2]
        stamp = now - 86400 * int(back_days)
        strf_fmt = date_fmt.replace('yyyy', '%Y').replace('MM', '%m').replace('dd', '%d')
        pieces.append(f' -Dpartition={field}={time.strftime(strf_fmt, time.localtime(stamp))}')
    if replace_param is not None:
        pieces.append('" ')
    return ''.join(pieces)

+ 1 - 2
auo_tests/tasks/add_datax_task.py

@@ -21,5 +21,4 @@ data = {
 }
 }
 
 
 print(f'http://{host}/jpt/af_task')
 print(f'http://{host}/jpt/af_task')
-ret = requests.post(url=f'http://{host}/jpt/af_task', json=data)
-# print(ret.json())
+

+ 5 - 3
data/data.sql

@@ -259,6 +259,8 @@ CREATE TABLE `jm_job_log` (
 -- ----------------------------
 -- ----------------------------
 ALTER TABLE `job_info`
 ALTER TABLE `job_info`
 ADD COLUMN `cron_select_type` tinyint(4) NOT NULL COMMENT '周期选择类型' AFTER `id`,
 ADD COLUMN `cron_select_type` tinyint(4) NOT NULL COMMENT '周期选择类型' AFTER `id`,
-ADD COLUMN `replace_param` varchar(20) NULL COMMENT '增量时间' AFTER `delete_status`,
-ADD COLUMN `partition_info` varchar(20) NULL COMMENT '分区信息' AFTER `replace_param`,
-ADD COLUMN `jvm_param` varchar(50) NULL COMMENT 'jvm参数' AFTER `partition_info`;
+ADD COLUMN `replace_param` varchar(50) NULL COMMENT '增量时间' AFTER `delete_status`,
+ADD COLUMN `partition_info` varchar(50) NULL COMMENT '分区信息' AFTER `replace_param`,
+ADD COLUMN `jvm_param` varchar(50) NULL COMMENT 'jvm参数' AFTER `partition_info`;
+ALTER TABLE `job_info`
+MODIFY COLUMN `user_id` varchar(50) NOT NULL COMMENT '用户ID' AFTER `update_time`;

+ 4 - 1
development.ini

@@ -13,4 +13,7 @@ port = 10086
 [MINIO]
 [MINIO]
 url = minio-api.sxkj.com
 url = minio-api.sxkj.com
 access_key = admin
 access_key = admin
-secret_key = sxkjadmin
+secret_key = sxkjadmin
+[AIRFLOW]
+host = 192.168.199.109
+port = 18082