
Integrate the scheduled-task module with Airflow

liweiquan committed 2 years ago
commit 0ee72011ad

+ 1 - 1
app/core/datax/hdfs.py

@@ -134,7 +134,7 @@ class HdfsWriter(WriterBase):
         parameter['fileType'] = param.hive_writer.writer_file_type
         parameter['path'] = param.hive_writer.writer_path
         parameter['fileName'] = param.hive_writer.writer_filename
-        parameter['writerMode'] = param.hive_writer.writer_mode
+        parameter['writeMode'] = param.hive_writer.writer_mode
         parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
         parameter['column'] = self._build_column(param.writer_columns)
         return parameter
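
Note on the fix: DataX's HdfsWriter reads the key writeMode (values such as append or nonConflict), so the misspelled writerMode key never reached the writer. Roughly what the corrected _build_parameter returns, as a sketch with illustrative values:

    parameter = {
        'fileType': 'text',          # or 'orc'
        'path': '/user/hive/warehouse/demo_db.db/demo_table',
        'fileName': 'demo',
        'writeMode': 'append',       # the key DataX actually recognizes
        'fieldDelimiter': ',',
        'column': [{'name': 'col_a', 'type': 'string'}],
    }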

+ 3 - 2
app/crud/jm_homework.py

@@ -5,7 +5,7 @@ from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
 from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
-from app.services.jm_job import jm_job_create_task
+from app.services.jm_job import jm_job_create_task, jm_job_update_task
 
 
 def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
@@ -25,10 +25,10 @@ def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
         'update_time': create_time,
         'status': 1
     })
-    jm_job_create_task(db_item, db)
     db.add(db_item)
     db.commit()
     db.refresh(db_item)
+    jm_job_create_task(db_item, db)
     if jm_homework_create['type'] == 'Dag' and relation_list is not None:
         for relation in relation_list:
             create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
@@ -68,6 +68,7 @@ def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpda
     for k, v in jm_homework_update.items():
         setattr(db_item, k, v)
     db_item.update_time = int(time.time())
+    jm_job_update_task(db_item, db)
     db.commit()
     db.flush()
     db.refresh(db_item)
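
Note on the reorder in create_jm_homework: with an autoincrement primary key, db_item.id is only populated once the INSERT is flushed, so calling jm_job_create_task before db.commit() handed Airflow a homework without an id. A minimal illustration, assuming the default autoincrement setup:

    db.add(db_item)
    print(db_item.id)    # None - the INSERT has not been flushed yet
    db.commit()
    db.refresh(db_item)
    print(db_item.id)    # populated - safe to call jm_job_create_task(db_item, db)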

+ 6 - 0
app/crud/jm_job_info.py

@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
 
 from app.crud.constant import find_and_update
 from app.utils.cron_utils import *
+from app.services.jm_job import jm_job_submit
 
 def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     jm_job_info_create = item.dict()
@@ -31,6 +32,7 @@ def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     db.add(jm_job_info)
     db.commit()
     db.refresh(jm_job_info)
+    jm_job_submit(jm_job_info, db)
     return jm_job_info,nodes,edges
 
 def get_jm_job_infos(db: Session):
@@ -73,6 +75,7 @@ def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
     find_and_update(db, '任务标签', tag)
     for k, v in jm_job_info_update.items():
         setattr(db_item, k, v)
+    jm_job_submit(db_item,db)
     db.commit()
     db.flush()
     db.refresh(db_item)
@@ -83,6 +86,8 @@ def delete_jm_job_info(db: Session, jm_job_id: int):
         .filter(models.JmJobInfo.id == jm_job_id).first()
     if not jm_job_info:
         raise Exception('未找到该定时任务')
+    if jm_job_info.status == 1:
+        raise Exception('该任务未停用,不能删除')
     jm_job_info.delete_status = 0
     db.commit()
     db.flush()
@@ -96,6 +101,7 @@ def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
     if not jm_job_info:
         raise Exception('未找到该定时任务')
     jm_job_info.status = item.status
+    jm_job_submit(jm_job_info,db)
     db.commit()
     db.flush()
     db.refresh(jm_job_info)

+ 9 - 5
app/crud/job_jdbc_datasource.py

@@ -71,15 +71,19 @@ def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreat
     return db_item
 
 
-def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
-    res: List[models.JobJdbcDatasource] = db.query(models.JobJdbcDatasource).filter(
-        models.JobJdbcDatasource.status == 1).all()
+def get_job_jdbc_datasources(db: Session, datasource_type: str = None, skip: int = 0, limit: int = 20):
+    res: List[models.JobJdbcDatasource] = []
+    if datasource_type is not None and datasource_type != '':
+        res = db.query(models.JobJdbcDatasource)\
+            .filter(models.JobJdbcDatasource.datasource == datasource_type)\
+            .filter(models.JobJdbcDatasource.status == 1).all()
+    else:
+        res = db.query(models.JobJdbcDatasource)\
+            .filter(models.JobJdbcDatasource.status == 1).all()
     for item in res:
         item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
     return res
 
-    # return db.query(models.JobJdbcDatasource).offset(skip).limit(limit).all()
-
 
 def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
     ds, update_item = _format_datasource(db, update_item)
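
Review note: the two branches repeat the base query. An equivalent sketch that chains the optional filter once (same behavior, including treating '' as "no filter"):

    def get_job_jdbc_datasources(db: Session, datasource_type: str = None, skip: int = 0, limit: int = 20):
        query = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.status == 1)
        if datasource_type:  # False for both None and ''
            query = query.filter(models.JobJdbcDatasource.datasource == datasource_type)
        res = query.all()
        for item in res:
            item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
        return res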

+ 6 - 0
app/crud/relation.py

@@ -18,6 +18,12 @@ def get_af_id(db: Session, se_id: int, type: str):
         .filter(models.Relation.type == type).first()
     return res
 
+def get_af_ids(db: Session, se_ids: List[int], type: str):
+    res: List[models.Relation] = db.query(models.Relation)\
+        .filter(models.Relation.se_id.in_(se_ids))\
+        .filter(models.Relation.type == type).all()
+    return res
+
 def delete_relation(db: Session, se_id: int, type: str):
     res: models.Relation = db.query(models.Relation)\
         .filter(models.Relation.se_id == se_id)\

+ 9 - 0
app/routers/dag.py

@@ -1,3 +1,4 @@
+import json
 import os
 from fastapi import APIRouter
 
@@ -53,3 +55,10 @@ def get_file_byte(filename, chunk_size=1024):
             else:
                 break
 
+@router.post("/execute")
+@web_try()
+@sxtimeit
+def execute_dag(dag_script: str):
+    print(dag_script)
+    return ""
+
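
Review note: /execute is currently a stub that only echoes the script. Presumably it will hand the payload to the job builder added in app/services/dag.py; a sketch of that wiring:

    from app.services.dag import dag_create_job

    @router.post("/execute")
    @web_try()
    @sxtimeit
    def execute_dag(dag_script: str):
        # Build the single-task AF job and submit it through the Airflow gateway.
        dag_create_job(dag_script)
        return ""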

+ 2 - 1
app/routers/files.py

@@ -66,4 +66,5 @@ def get_file( uri: str):
     response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
     # 在请求头进行配置
     response.headers["Content-Disposition"] = "attachment; filename="+uri+".log"
-    return response
+    return response
+

+ 9 - 1
app/routers/jm_homework.py

@@ -8,6 +8,7 @@ from app import schemas
 
 import app.crud as crud
 from app.crud import jm_homework
+from app.services.jm_job import red_dag_and_format
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -57,4 +58,11 @@ def delete_jm_homework(jm_id: int, db: Session = Depends(get_db)):
     res = crud.find_by_homework_and_job(db, jm_job_ids,jm_id)
     if len(res) > 0:
         raise Exception("该作业正在被定时任务使用,不可删除")
-    return crud.delete_jm_homework(db, jm_id)
+    return crud.delete_jm_homework(db, jm_id)
+
+
+@router.get("/test")
+def get_test_dag(db: Session = Depends(get_db)):
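+    # NOTE: debug endpoint - homework id 83 is hardcoded and the usual
+    # @web_try()/@sxtimeit decorators are missing; presumably temporary.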
+    jm_homework = crud.get_jm_homework_info(db, 83)
+    res = red_dag_and_format(jm_homework, db)
+    return res

+ 2 - 2
app/routers/job_jdbc_datasource.py

@@ -59,8 +59,8 @@ def create_datasource(ds: schemas.JobJdbcDatasourceCreate, db: Session = Depends
 @router.get("/")
 @web_try()
 @sxtimeit
-def get_datasources(params: Params=Depends(), db: Session = Depends(get_db)):
-    return paginate(crud.get_job_jdbc_datasources(db), params)
+def get_datasources(datasource_type: Optional[str] = None, params: Params=Depends(), db: Session = Depends(get_db)):
+    return paginate(crud.get_job_jdbc_datasources(db, datasource_type), params)
 
 @router.put("/{ds_id}")
 @web_try()
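
Review note: the new signature references Optional, which must be in scope; assuming the module does not already import it, this needs one more line at the top:

    from typing import Optional

With the filter in place, e.g. GET /?datasource_type=mysql returns only active MySQL sources, while a bare GET / keeps the old behavior.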

+ 40 - 0
app/services/dag.py

@@ -0,0 +1,40 @@
+from app import crud, models
+from app.utils.send_util import *
+from app.utils.utils import get_cmd_parameter
+from sqlalchemy.orm import Session
+
+def dag_create_job(dag_script: str):
+    af_task = dag_create_task(dag_script)
+    af_job = {
+        "tasks": [af_task],
+        "name": "123",
+        "dependence": [],
+        "cron": "once",
+        "desc": "123",
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": "",
+        "executor_fail_retry_count": "",
+        "trigger_status": 1,
+        "job_type": 0,
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_job', af_job)
+    af_job = res['data']
+    send_submit(af_job['id'])
+
+def dag_create_task(dag_script: str):
+    af_task = {
+        "name": "123",
+        "file_urls": [],
+        "script": dag_script,
+        "cmd": "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": "",
+        "task_type": "sparks",
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_task', af_task)
+    af_task = res['data']
+    return af_task

+ 172 - 7
app/services/jm_job.py

@@ -1,7 +1,10 @@
+import json
 from app import crud, models
+from app.crud.jm_homework_datasource_relation import get_jm_relations
 from app.utils.send_util import *
 from app.utils.utils import get_cmd_parameter
 from sqlalchemy.orm import Session
+from app.common.minio import FileHandler
 
 type_dict = {
     "Java": "java",
@@ -10,10 +14,15 @@ type_dict = {
 }
 
 def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
+    content = ''
+    if jm_homework.type == "Dag":
+        content = red_dag_and_format(jm_homework, db)
+    elif jm_homework.type == "Python":
+        content = red_python_and_format(jm_homework)
     af_task = {
         "name": jm_homework.name,
-        "file_urls": [jm_homework.dag_url] if jm_homework.type == "Dag" else [jm_homework.script_file],
-        "script": "",
+        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
+        "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
         "envs": {},
@@ -23,15 +32,20 @@ def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
     }
     res = send_post('/jpt/af_task', af_task)
     af_task = res['data']
-    crud.create_relation(db ,jm_homework.id, jm_homework.type, af_task['id'])
+    crud.create_relation(db, jm_homework.id, 'task', af_task['id'])
     return af_task
 
 def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
-    relation = crud.get_af_id(db, jm_homework.id, jm_homework.type)
+    relation = crud.get_af_id(db, jm_homework.id, 'task')
+    content = ''
+    if jm_homework.type == "Dag":
+        content = red_dag_and_format(jm_homework, db)
+    elif jm_homework.type == "Python":
+        content = red_python_and_format(jm_homework)
     af_task = {
         "name": jm_homework.name,
-        "file_urls": [jm_homework.dag_url] if jm_homework.type == "Dag" else [jm_homework.script_file],
-        "script": "",
+        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
+        "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
         "envs": {},
@@ -41,5 +55,156 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
     }
     res = send_put('/jpt/af_task', relation.af_id, af_task)
     af_task = res['data']
+    return af_task
+
+def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
+    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
+    homework_ids = [node.homework_id for node in nodes]
+    relations = crud.get_af_ids(db, homework_ids, 'task')
+    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
+    tasks = [send_get("/jpt/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
+    edges = crud.get_one_job_edges(db, jm_job_info.id)
+    dependence = [[se_id_to_af_id_dict[edge['in_node_id']], se_id_to_af_id_dict[edge['out_node_id']]] for edge in edges]
+    af_job = {
+        "tasks": tasks,
+        "name": jm_job_info.name,
+        "dependence": dependence,
+        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
+        "desc": jm_job_info.name,
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": 0,
+        "executor_fail_retry_count": 0,
+        "trigger_status": jm_job_info.status,
+        "job_type": 0,
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_job', af_job)
+    af_job = res['data']
+    crud.create_relation(db, jm_job_info.id, 'job', af_job['id'])
+    send_submit(af_job['id'])
+
+
+def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
+    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
+    homework_ids = [node.homework_id for node in nodes]
+    relations = crud.get_af_ids(db, homework_ids, 'task')
+    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
+    tasks = [send_get("/jpt/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
+    edges = crud.get_one_job_edges(db, jm_job_info.id)
+    dependence = [[se_id_to_af_id_dict[edge['in_node_id']], se_id_to_af_id_dict[edge['out_node_id']]] for edge in edges]
+    af_job = {
+        "tasks": tasks,
+        "name": jm_job_info.name,
+        "dependence": dependence,
+        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
+        "desc": jm_job_info.name,
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": 0,
+        "executor_fail_retry_count": 0,
+        "trigger_status": jm_job_info.status,
+    }
+    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
+    res = send_put('/jpt/af_job', job_relation.af_id, af_job)
+    af_job = res['data']
+    send_submit(af_job['id'])
+
+def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
+    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
+    if job_relation is None:
+        jm_job_create_job(jm_job_info, db)
+    else:
+        jm_job_update_job(jm_job_info, db)
+
+
+def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
+    relations = get_jm_relations(db, jm_homework.id)
+    node_relation_dict = {relation.node_uuid: relation for relation in relations}
+
+    with open('./dag' + jm_homework.dag_url) as f:
+        result = json.load(f)
+    edges = result['edges']
+    t_s = {}         # target node id -> list of its source node ids
+    input_num = {}   # per-source counter used to assign output slot indexes
+    for edge in edges:
+        if edge['target'] in t_s.keys():
+            t_s[edge['target']].append(edge['source'])
+        else:
+            t_s.update({edge['target']:[edge['source']]})
+    nodes = result['nodes']
+    sub_nodes = []
+    for node in nodes:
+        if node['op'] == 'datasource':
+            fields = node['data']['input_source']
+            script = 'select '
+            for field in fields:
+                script += field['dataField'] + ','
+            script = script.rstrip(',')
+            script += ' from ' + node_relation_dict[node['id']].table
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": 'sql',
+                "script":script
+            }
+            sub_nodes.append(sub_node)
+        elif node['op'] == 'outputsource':
+            fields = node['data']['output_source']
+            script = 'select '
+            for field in fields:
+                script += field['dataField'] + ','
+            script = script.rstrip(',')
+            script += ' from ' + node_relation_dict[node['id']].table
+            inputs = {}
+            index = 0
+            input_list = t_s[node['id']]
+            for source in input_list:
+                if source in input_num:
+                    input_num[source] += 1
+                else:
+                    input_num[source] = 0
+                inputs['input' + str(index)] = [source, input_num[source]]
+                index += 1
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": 'sql',
+                "inputs": inputs,
+                "script":script
+            }
+            sub_nodes.append(sub_node)
+        else:
+            inputs = {}
+            index = 0
+            input_list = t_s[node['id']]
+            for source in input_list:
+                if source in input_num:
+                    input_num[source] += 1
+                else:
+                    input_num[source] = 0
+                inputs['input' + str(index)] = [source, input_num[source]]
+                index += 1
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": node['op'],
+                "inputs": inputs,
+                "script": node['data']['script'],
+            }
+            sub_nodes.append(sub_node)
+    res = {
+        'dag_script': {
+            'sub_nodes': sub_nodes,
+            'edges': [(edge['source'],edge['target']) for edge in edges]
+        }
+    }
+
+    return json.dumps(res)
 
-    return af_task
+def red_python_and_format(jm_homework):
+    file_handler = FileHandler("datax")
+    file = file_handler.get_file("/python/test.py")
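+    # NOTE: the path is hardcoded to the demo script for now; presumably it
+    # should come from jm_homework.script_file.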
+    return file.decode("utf-8")
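
For reference, red_dag_and_format condenses the designer's node/edge JSON into a dag_script payload shaped roughly like this (ids, names, and scripts illustrative):

    {
        "dag_script": {
            "sub_nodes": [
                {"id": "n1", "name": "source", "op": "sql",
                 "script": "select col_a,col_b from demo_table"},
                {"id": "n2", "name": "transform", "op": "pyspark",
                 "inputs": {"input0": ["n1", 0]},
                 "script": "..."}
            ],
            "edges": [["n1", "n2"]]
        }
    }

Datasource and outputsource nodes are rewritten into sql selects against the tables recorded in the relation table; inputs maps each upstream node to an output-slot index.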

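Review note: jm_job_create_job and jm_job_update_job assemble the same payload; a shared builder would remove the duplication. A sketch, with the helper name hypothetical:

    def _build_af_job(jm_job_info, db):
        # Resolve this job's homework nodes to their Airflow task ids.
        nodes = crud.get_one_job_nodes(db, jm_job_info.id)
        homework_ids = [node.homework_id for node in nodes]
        relations = crud.get_af_ids(db, homework_ids, 'task')
        se_to_af = {r.se_id: r.af_id for r in relations}
        tasks = [send_get("/jpt/af_task/getOnce", af_id)['data'] for af_id in se_to_af.values()]
        edges = crud.get_one_job_edges(db, jm_job_info.id)
        return {
            "tasks": tasks,
            "name": jm_job_info.name,
            "dependence": [[se_to_af[e['in_node_id']], se_to_af[e['out_node_id']]] for e in edges],
            "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'once',
            "desc": jm_job_info.name,
            "route_strategy": "",
            "block_strategy": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "trigger_status": jm_job_info.status,
        }

The create path then adds "job_type": 0 and "user_id": 0 before POSTing, and the update path PUTs the dict as-is.
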
+ 5 - 4
app/utils/send_util.py

@@ -11,7 +11,8 @@ def send_post(uri,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        print(result)
+        raise Exception('请求airflow失败-->'+result['msg'])
 
 def send_submit(af_job_id):
     res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
@@ -20,7 +21,7 @@ def send_submit(af_job_id):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception('请求airflow失败-->'+result['msg'])
 
 
 def send_put(uri,path_data,data):
@@ -29,7 +30,7 @@ def send_put(uri,path_data,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception('请求airflow失败-->'+result['msg'])
 
 def send_get(uri,path_data):
     res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
@@ -37,4 +38,4 @@ def send_get(uri,path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception('请求airflow失败-->'+result['msg'])
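
Review note: all four senders repeat the same status check, and result['msg'] would itself raise KeyError if the gateway answers without a msg field. A shared guard, sketched with .get() to stay safe:

    def _check_response(res):
        result = res.json()
        if result.get('code') == 200:
            return result
        # .get() avoids a second exception when 'msg' is absent
        raise Exception('请求airflow失败-->' + str(result.get('msg')))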

+ 1 - 1
app/utils/utils.py

@@ -43,6 +43,6 @@ def get_cmd_parameter(jvm_param, inc_start_time, replace_param, partition_info):
         partition_time_format = partition_list[2].replace('yyyy','%Y').replace('MM','%m').replace('dd','%d')
         partition_time_str = time.strftime(partition_time_format,time.localtime(partition_time))
         cmd_parameter += ' -Dpartition=' + str(partition_list[0]) + '=' + partition_time_str
-    if replace_param is not None:
+    if replace_param is not None and replace_param != '':
         cmd_parameter += '" '
     return cmd_parameter
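
Equivalent and slightly tighter, since an empty string is already falsy in Python:

    if replace_param:  # covers both None and ''
        cmd_parameter += '" '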

File diff suppressed because it is too large
+ 179 - 0
dag/demo.dag


+ 1 - 0
python/test.py

@@ -0,0 +1 @@
+print("hello world!")

Some files were not shown because too many files changed in this diff