
Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

Zhang Li, 2 years ago
parent
commit
16f63473ee
40 files changed, with 1346 additions and 103 deletions
  1. app/common/minio.py (+1 -1)
  2. app/core/airflow/job.py (+26 -7)
  3. app/core/airflow/task.py (+54 -17)
  4. app/core/airflow/templates/dag_template.py.jinja2 (+3 -2)
  5. app/core/airflow/templates/pyspark_script_template.py.jinja2 (+1 -1)
  6. app/core/airflow/templates/transform_datax.py.jinja2 (+29 -0)
  7. app/core/airflow/templates/transform_datax_demo.py (+27 -0)
  8. app/core/datax/hdfs.py (+1 -1)
  9. app/crud/af_job.py (+2 -2)
  10. app/crud/af_run.py (+5 -0)
  11. app/crud/jm_homework.py (+3 -0)
  12. app/crud/jm_job_info.py (+6 -0)
  13. app/crud/job_jdbc_datasource.py (+9 -5)
  14. app/crud/relation.py (+6 -0)
  15. app/models/af_job.py (+2 -0)
  16. app/routers/dag.py (+10 -0)
  17. app/routers/files.py (+38 -1)
  18. app/routers/jm_homework.py (+12 -2)
  19. app/routers/jm_job_info.py (+33 -3)
  20. app/routers/jm_job_log.py (+47 -38)
  21. app/routers/job.py (+26 -1)
  22. app/routers/job_info.py (+13 -0)
  23. app/routers/job_jdbc_datasource.py (+2 -2)
  24. app/routers/job_log.py (+46 -5)
  25. app/routers/run.py (+1 -1)
  26. app/schemas/af_job.py (+3 -0)
  27. app/schemas/jm_homework.py (+1 -1)
  28. app/services/dag.py (+40 -0)
  29. app/services/datax.py (+9 -4)
  30. app/services/jm_job.py (+221 -0)
  31. app/utils/send_util.py (+34 -4)
  32. app/utils/utils.py (+4 -4)
  33. auo_tests/jupyters/02-edit-datax-json.ipynb (+172 -0)
  34. auo_tests/jupyters/datax-config.json (+70 -0)
  35. auo_tests/tasks/datax/config.json (+70 -0)
  36. auo_tests/tasks/datax/config2.json (+60 -0)
  37. auo_tests/tasks/datax/datax_debug.py.ipynb (+134 -0)
  38. dag/demo.dag (+120 -0)
  39. production.ini (+4 -1)
  40. python/test.py (+1 -0)

+ 1 - 1
app/common/minio.py

@@ -93,6 +93,6 @@ class FileHandler(object):
             return objects
         except ResponseError:
             raise Exception("列出文件失败")
-        return objects
 
 
+datax_client = FileHandler('datax')

+ 26 - 7
app/core/airflow/job.py

@@ -1,4 +1,5 @@
-from jinja2 import Environment, PackageLoader, select_autoescape
+import os
+import stat
 from app.core.airflow.task import *
 from app.schemas import AirflowJob
 
@@ -7,17 +8,34 @@ class AirflowJobSubmitter:
 
     @staticmethod
     def submit_dag(item: AirflowJob):
-        m_compilers = {'python': PythonTaskCompiler, 'datax': DataXTaskCompiler, 'sparks': SparksTaskCompiler}
-        nodes = [m_compilers[task.task_type](item=task).translate() for task in item.tasks if
+        m_compilers = {'python': PythonTaskCompiler,
+                       'datax': DataXTaskCompiler,
+                       'sparks': SparksTaskCompiler,
+                       'java': JavaTaskCompiler}
+
+        nodes = [m_compilers[task.task_type](item=task).translate(item.job_mode or 1) for task in item.tasks if
                  task.task_type != 'sparks']
-        spark_nodes = [SparksTaskCompiler(item=task).translate() for task in item.tasks if task.task_type == 'sparks']
+        spark_nodes = [SparksTaskCompiler(item=task).translate(item.job_mode or 1) for task in item.tasks if
+                       task.task_type == 'sparks']
         edges = []
         for edge in item.dependence:
             edges.append({"source_operator_name": f'op_{edge[0]}',
                           "target_operator_name": f'op_{edge[1]}'})
+        #
+        # m_interval = {
+        #     "None": "None",
+        #     "@once": "@once",
+        #     "0 * * * *": "@hourly",
+        #     "0 0 * * *": "@daily",
+        #     "0 0 * * 0": "@weekly",
+        #     "0 0 1 * *": "@monthly",
+        #     "0 0 1 1 *": "@yearly"
+        # }
+        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
+                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
+                      'interval': item.cron
+                      }
 
-        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': item.name,
-                      'user_name': item.user_id, 'job_id': item.id}
         env = Environment(
             loader=PackageLoader('app.core.airflow'),
             autoescape=select_autoescape()
@@ -27,7 +45,8 @@ class AirflowJobSubmitter:
         print(f'finish build:{dag_content}')
 
         dag_path = '/dags/'
-        output_path = dag_path + f'{item.name}_{item.id}.py'
+        output_path = dag_path + f'dag_{item.id}.py'
         with open(output_path, "w") as fh:
             fh.write(dag_content)
+        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
         print(f'write dag to {output_path}')
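A minimal sketch (not part of the commit) of what the new os.chmod call grants: the three stat flags OR together to mode 0o777, i.e. read/write/execute for user, group and other, so the generated DAG file stays readable by the Airflow scheduler regardless of which user wrote it. The path below is hypothetical.

    import os
    import stat

    # S_IRWXU (0o700) | S_IRWXG (0o070) | S_IRWXO (0o007) == 0o777
    assert stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO == 0o777
    os.chmod('/dags/dag_1.py', 0o777)  # hypothetical path, mirrors output_path above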

+ 54 - 17
app/core/airflow/task.py

@@ -1,5 +1,4 @@
 import json
-
 from app.core.airflow.uri import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
@@ -12,7 +11,16 @@ class TaskCompiler:
         self.default_image = None
         self.default_cmd = None
 
-    def translate(self):
+    @staticmethod
+    def render_spark_script(parameters, template_file):
+        env = Environment(
+            loader=PackageLoader('app.core.airflow'),
+            autoescape=select_autoescape()
+        )
+        template = env.get_template(template_file)
+        return template.render(parameters)
+
+    def translate(self, task_mode=1):
         return {'image': self.task.run_image or self.default_image,
                 'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
                 'script': self.task.script,
@@ -31,6 +39,16 @@ class TaskCompiler:
         return minio_handler.put_byte_file(file_name=oss_path, file_content=context)
 
 
+class JavaTaskCompiler(TaskCompiler):
+    def __init__(self, item: AirflowTask):
+        super(JavaTaskCompiler, self).__init__(item)
+        self.default_image = 'SXKJ:32775/java:1.0'
+        self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
+        self.task.cmd = self.task.cmd or self.default_cmd
+        tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
+        self.task.cmd = f'curl {"http://minio.default:9000"}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
+
+
 class PythonTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(PythonTaskCompiler, self).__init__(item)
@@ -42,9 +60,37 @@ class DataXTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(DataXTaskCompiler, self).__init__(item)
         self.default_image = 'SXKJ:32775/pod_datax:0.9'
-        self.default_cmd = f"cd datax/bin && echo $SCRIPT > config.json && echo " \
-                           f"\"\'\"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json" \
-                           f"\"\'\" |xargs bash -c "
+        # self.default_cmd = f"cd datax/bin && echo $SCRIPT > config.json && echo " \
+        #                    f"\"\'\"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json" \
+        #                    f"\"\'\" |xargs bash -c "
+        self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json"
+
+    def translate(self, task_mode=1):
+        print(f'{self.task.envs}')
+        script_str = self.render_spark_script(
+            parameters={'script': self.task.script,
+                        'first_begin_time': self.task.envs.get('first_begin_time', None),
+                        'last_key': self.task.envs.get('last_key', None),
+                        'current_key': self.task.envs.get('current_key', None),
+                        'partition_key': self.task.envs.get('partition_key', None),
+                        'partition_word': self.task.envs.get('partition_word', None),
+                        'partition_format': self.task.envs.get('partition_format', None),
+                        'partition_diff': self.task.envs.get('partition_diff', None),
+                        },
+            template_file="transform_datax.py.jinja2")
+        # with open('./auto_generate_demo.py','w') as f:
+        #     f.write(script_str)
+        res = {'image': self.task.run_image or self.default_image,
+               'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
+               'script': script_str,
+               'id': f'{self.task.id}',
+               'env': {**{"SCRIPT": script_str}, **self.task.envs},
+               'operator_name': f'op_{self.task.id}',
+               'name': self.task.name,
+               'desc': ""
+               }
+
+        return res
 
 
 class SparksTaskCompiler(TaskCompiler):
@@ -74,7 +120,7 @@ class SparksTaskCompiler(TaskCompiler):
         basic_cmds = "cd /home/sxkj/bigdata && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
         self.cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"
 
-    def translate(self):
+    def translate(self, task_mode=1):
         # dag_script = {
         #     "sub_nodes": [
         #         {
@@ -111,11 +157,11 @@ class SparksTaskCompiler(TaskCompiler):
                 inputs = {}
                 template_file = 'sql_script_template.py.jinja2'
             elif info['op'] == 'pyspark':
-                inputs = {k: spark_result_tb_name(self.task.id, *v) for k, v in info['inputs'].items()}
+                inputs = {k: spark_result_tb_name(self.task.id, *v, task_mode) for k, v in info['inputs'].items()}
                 template_file = 'pyspark_script_template.py.jinja2'
             else:
                 continue
-            outputs = [spark_result_tb_name(self.task.id, info['id'], 0)]
+            outputs = [spark_result_tb_name(self.task.id, info['id'], 0, task_mode)]
             sub_node = {
                 'id': f'{self.task.id}_{info["id"]}',
                 'name': info['name'],
@@ -137,12 +183,3 @@ class SparksTaskCompiler(TaskCompiler):
             'name': self.task.name,
             'desc': "first spark dag task"
         }
-
-    @staticmethod
-    def render_spark_script(parameters, template_file):
-        env = Environment(
-            loader=PackageLoader('app.core.airflow'),
-            autoescape=select_autoescape()
-        )
-        template = env.get_template(template_file)
-        return template.render(parameters)

+ 3 - 2
app/core/airflow/templates/dag_template.py.jinja2

@@ -45,7 +45,7 @@ namespace = conf.get("kubernetes", "NAMESPACE")
 name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
-with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}") as dag:
+with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
     op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
 
     {% for spark_node in spark_nodes %}
@@ -66,7 +66,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
                                                                     cmds={{ spark_sub_node['cmds'] }},
                                                                     env_vars={{ spark_sub_node['env'] }},
                                                                     on_success_callback=task_finish_alert,
-{#                                                                    on_failure_callback=task_finish_alert#}
+                                                                    on_failure_callback=task_finish_alert
                                                                     )
     {% endfor %}
         {% for edge in spark_node['edges'] %}
@@ -92,6 +92,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
         cmds={{ node['cmds'] }},
         env_vars={{ node['env'] }},
         on_success_callback=task_finish_alert,
+        on_failure_callback=task_finish_alert
 {#        on_failure_callback=task_finish_alert#}
         )
     op_start >> {{ node['operator_name'] }}
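A minimal sketch (not part of the commit) of how the new is_paused_upon_creation argument renders: with a truthy trigger_status the DAG is created unpaused, otherwise it starts paused. The values below are assumed for illustration.

    from jinja2 import Template

    fragment = Template("is_paused_upon_creation= not {{ trigger_status }}")
    print(fragment.render(trigger_status=True))   # -> is_paused_upon_creation= not True  (unpaused)
    print(fragment.render(trigger_status=False))  # -> is_paused_upon_creation= not False (paused)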

+ 1 - 1
app/core/airflow/templates/pyspark_script_template.py.jinja2

@@ -13,7 +13,7 @@ def run(inputs: dict, outputs: list):
     spark = SparkSession.builder.config('hive.metastore.uris',
                                         'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
     param_dict = preprocess(input_infos=inputs, ss=spark)
-    rets = main_func(**param_dict)
+    rets = main_func(**param_dict,spark=spark,sc=spark.sparkContext)
     postprocess(rets=rets, outputs=outputs)
 
 

+ 29 - 0
app/core/airflow/templates/transform_datax.py.jinja2

@@ -0,0 +1,29 @@
+import json
+from datetime import datetime, timedelta
+
+first_begin_time =  {{ first_begin_time }}   # 100
+last_time = {{ "{{ prev_execution_date.timestamp() }}" }}
+current_time = {{ "{{ execution_date.timestamp() }}" }}
+
+partition_word = '{{ partition_word }}' #'datety'
+partition_format = '{{ partition_format }}'  #'%Y-%m-%d'
+partition_diff = {{ partition_diff }} #1
+
+last_key = '${'+'{{last_key}}'+'}'
+current_key = '${'+'{{current_key}}'+'}'
+partition_key = '${'+'{{partition_key}}'+'}'
+
+
+
+datax_config = {{ script }}
+datax_config_str = json.dumps(datax_config)
+if last_key is not None:
+    now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff or 1)
+    partition_value = f'{partition_word}={now.strftime(partition_format)}'
+    last_time = last_time or first_begin_time
+    datax_config_str = datax_config_str.replace(last_key, f'{last_time}')
+    datax_config_str = datax_config_str.replace(current_key, f'{current_time}')
+    datax_config_str = datax_config_str.replace(partition_key, partition_value)
+
+with open('config.json', 'w') as f:
+    f.write(datax_config_str)
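A minimal sketch (not part of the commit) of the brace-escaping trick used above: wrapping the Airflow macro in a quoted Jinja expression makes the first render emit the macro text literally, so prev_execution_date/execution_date are resolved later by Airflow at run time rather than at template-render time.

    from jinja2 import Template

    line = Template('last_time = {{ "{{ prev_execution_date.timestamp() }}" }}')
    print(line.render())  # -> last_time = {{ prev_execution_date.timestamp() }}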

+ 27 - 0
app/core/airflow/templates/transform_datax_demo.py

@@ -0,0 +1,27 @@
+import json
+from datetime import datetime, timedelta
+
+first_begin_time = 100
+last_time = 200
+current_time = 300
+
+partition_word = 'datety'
+partition_format = '%Y-%m-%d'
+partition_diff = 1
+
+last_key = '${ {lastTime} }'
+current_key = '${currentTime})'
+partition_key = '${partition}'
+
+now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff)
+partition_value = f'partition_word={now.strftime(partition_format)}'
+
+datax_config_str = ""
+
+last_time = last_time or first_begin_time
+datax_config_str = datax_config_str.replace(last_key, f'{last_time}')
+datax_config_str = datax_config_str.replace(current_key, f'{current_time}')
+datax_config_str = datax_config_str.replace(partition_key, partition_value)
+
+with open('config.json', 'wb') as f:
+    f.write(datax_config_str)

+ 1 - 1
app/core/datax/hdfs.py

@@ -134,7 +134,7 @@ class HdfsWriter(WriterBase):
         parameter['fileType'] = param.hive_writer.writer_file_type
         parameter['path'] = param.hive_writer.writer_path
         parameter['fileName'] = param.hive_writer.writer_filename
-        parameter['writerMode'] = param.hive_writer.writer_mode
+        parameter['writeMode'] = param.hive_writer.writer_mode
         parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
         parameter['column'] = self._build_column(param.writer_columns)
         return parameter

+ 2 - 2
app/crud/af_job.py

@@ -20,8 +20,8 @@ def get_airflow_jobs(db: Session):
     return res
 
 
-def get_airflow_job_once(db: Session, id: int):
-    res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == id).first()
+def get_airflow_job_once(db: Session, item_id: int):
+    res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == item_id).first()
     return res
 
 

+ 5 - 0
app/crud/af_run.py

@@ -22,6 +22,11 @@ def get_airflow_runs(db: Session):
     res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
     return res
 
+def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int]):
+    res: List[models.AirflowRun] = db.query(models.AirflowRun)\
+        .filter(models.AirflowRun.job_id.in_(job_ids)).all()
+    return res
+
 
 def get_airflow_run_once(db: Session, run_id: str, job_id: int):
     res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.run_id == run_id,

+ 3 - 0
app/crud/jm_homework.py

@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
 from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
+from app.services.jm_job import jm_homework_submit
 
 
 def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
@@ -30,6 +31,7 @@ def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
     if jm_homework_create['type'] == 'Dag' and relation_list is not None:
         for relation in relation_list:
             create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    jm_homework_submit(db_item, db)
     return db_item.to_dict()
 
 def get_jm_homeworks(db: Session, project_id: str):
@@ -73,6 +75,7 @@ def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpda
     if jm_homework_update['type'] == 'Dag' and relation_list is not None:
         for relation in relation_list:
             create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    jm_homework_submit(db_item, db)
     return db_item.to_dict()
 
 def delete_jm_homework(db: Session, id: int):

+ 6 - 0
app/crud/jm_job_info.py

@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
 
 from app.crud.constant import find_and_update
 from app.utils.cron_utils import *
+from app.services.jm_job import jm_job_submit
 
 def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     jm_job_info_create = item.dict()
@@ -31,6 +32,7 @@ def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
     db.add(jm_job_info)
     db.commit()
     db.refresh(jm_job_info)
+    jm_job_submit(jm_job_info, db)
     return jm_job_info,nodes,edges
 
 def get_jm_job_infos(db: Session):
@@ -76,6 +78,7 @@ def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
     db.commit()
     db.flush()
     db.refresh(db_item)
+    jm_job_submit(db_item,db)
     return db_item,nodes,edges
 
 def delete_jm_job_info(db: Session, jm_job_id: int):
@@ -83,6 +86,8 @@ def delete_jm_job_info(db: Session, jm_job_id: int):
         .filter(models.JmJobInfo.id == jm_job_id).first()
     if not jm_job_info:
         raise Exception('未找到该定时任务')
+    if jm_job_info.status == 1:
+        raise Exception('该任务未停用,不能删除')
     jm_job_info.delete_status = 0
     db.commit()
     db.flush()
@@ -96,6 +101,7 @@ def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
     if not jm_job_info:
         raise Exception('未找到该定时任务')
     jm_job_info.status = item.status
+    jm_job_submit(jm_job_info,db)
     db.commit()
     db.flush()
     db.refresh(jm_job_info)

+ 9 - 5
app/crud/job_jdbc_datasource.py

@@ -71,15 +71,19 @@ def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreat
     return db_item
 
 
-def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
-    res: List[models.JobJdbcDatasource] = db.query(models.JobJdbcDatasource).filter(
-        models.JobJdbcDatasource.status == 1).all()
+def get_job_jdbc_datasources(db: Session, datasource_type: str = None, skip: int = 0, limit: int = 20):
+    res: List[models.JobJdbcDatasource] = []
+    if datasource_type is not None and datasource_type != '':
+        res = db.query(models.JobJdbcDatasource)\
+            .filter(models.JobJdbcDatasource.datasource == datasource_type)\
+            .filter(models.JobJdbcDatasource.status == 1).all()
+    else:
+        res = db.query(models.JobJdbcDatasource)\
+            .filter(models.JobJdbcDatasource.status == 1).all()
     for item in res:
         item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
     return res
 
-    # return db.query(models.JobJdbcDatasource).offset(skip).limit(limit).all()
-
 
 def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
     ds, update_item = _format_datasource(db, update_item)

+ 6 - 0
app/crud/relation.py

@@ -18,6 +18,12 @@ def get_af_id(db: Session, se_id: int, type: str):
         .filter(models.Relation.type == type).first()
     return res
 
+def get_af_ids(db: Session, se_ids: List[int], type: str):
+    res: List[models.Relation] = db.query(models.Relation)\
+        .filter(models.Relation.se_id.in_(se_ids))\
+        .filter(models.Relation.type == type).all()
+    return res
+
 def delete_relation(db: Session, se_id: int, type: str):
     res: models.Relation = db.query(models.Relation)\
         .filter(models.Relation.se_id == se_id)\

+ 2 - 0
app/models/af_job.py

@@ -7,6 +7,8 @@ class AirflowJob(BaseModel):
     id = Column(Integer, primary_key=True, index=True)
     name = Column(Text)
     job_type = Column(Integer)  # 任务类型:可取 1=单作业任务 ,2=多作业任务
+    job_mode = Column(Integer) # 任务模式:1= 常规模式 2=调试模式
+
 
     tasks = Column(JSON)  # 任务列表
     dependence = Column(JSON)  # 作业间的依赖

+ 10 - 0
app/routers/dag.py

@@ -1,3 +1,5 @@
+from importlib.resources import contents
+import json
 import os
 from fastapi import APIRouter
 
@@ -52,3 +54,11 @@ def get_file_byte(filename, chunk_size=1024):
                 yield content
             else:
                 break
+
+@router.post("/execute")
+@web_try()
+@sxtimeit
+def execute_dag(dag_script: str):
+    print(dag_script)
+    return ""
+

+ 38 - 1
app/routers/files.py

@@ -1,4 +1,5 @@
 import io
+import time
 import uuid
 from fastapi import APIRouter
 
@@ -42,6 +43,41 @@ def get_file( uri: str):
     response.headers["Content-Disposition"] = "attachment; filename="+uri+".table"
     return response
 
+@router.get("/java/")
+@sxtimeit
+def get_file( uri: str):
+    file_handler = FileHandler("datax")
+    file = file_handler.get_file(uri)
+    code = 200
+    if len(file) == 0:
+        code = 404
+
+    response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
+    # 在请求头进行配置
+    response.headers["Content-Disposition"] = "attachment; filename="+uri
+    return response
+
+@router.post("/java/")
+@web_try()
+@sxtimeit
+def put_java_jar( file: UploadFile = File(...),):
+    print("UploadFile-->",file.filename)
+    file_handler = FileHandler("datax")
+    file_name = str(int(time.time()))+'_'+file.filename
+    url = file_handler.put_byte_file("java/"+file_name, file.file.read())
+    return url
+
+
+@router.post("/python/")
+@web_try()
+@sxtimeit
+def put_java_jar( file: UploadFile = File(...),):
+    print("UploadFile-->",file.filename)
+    file_handler = FileHandler("datax")
+    file_name = str(int(time.time()))+'_'+file.filename
+    url = file_handler.put_byte_file("python/"+file_name, file.file.read())
+    return url
+
 
 @router.post("/jm_job_log/")
 @web_try()
@@ -66,4 +102,5 @@ def get_file( uri: str):
     response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
     # 在请求头进行配置
     response.headers["Content-Disposition"] = "attachment; filename="+uri+".log"
-    return response
+    return response
+

+ 12 - 2
app/routers/jm_homework.py

@@ -7,6 +7,8 @@ from sqlalchemy.orm import Session
 from app import schemas
 
 import app.crud as crud
+from app.crud import jm_homework
+from app.services.jm_job import red_dag_and_format
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -25,7 +27,8 @@ router = APIRouter(
 @sxtimeit
 def create_jm_homework(item: schemas.JmHomeworkCreate, db: Session = Depends(get_db)):
     # 根据获取到的文件路径另存一份并改变
-    return crud.create_jm_homework(db, item)
+    jm_homework = crud.create_jm_homework(db, item)
+    return jm_homework
 
 @router.get("/")
 @web_try()
@@ -55,4 +58,11 @@ def delete_jm_homework(jm_id: int, db: Session = Depends(get_db)):
     res = crud.find_by_homework_and_job(db, jm_job_ids,jm_id)
     if len(res) > 0:
         raise Exception("该作业正在被定时任务使用,不可删除")
-    return crud.delete_jm_homework(db, jm_id)
+    return crud.delete_jm_homework(db, jm_id)
+
+
+@router.get("/test")
+def get_test_dag(db: Session = Depends(get_db)):
+    jm_homework = crud.get_jm_homework_info(db, 83)
+    res = red_dag_and_format(jm_homework, db)
+    return res

+ 33 - 3
app/routers/jm_job_info.py

@@ -10,6 +10,7 @@ from app import models, schemas
 import app.crud as crud
 from app.schemas import cron_expression
 from app.utils.cron_utils import *
+from app.utils.send_util import send_delete, send_execute
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -65,8 +66,34 @@ def create_jm_job_node(db: Session, nodes, edges, job_id):
 def get_jm_job_infos(db: Session = Depends(get_db)):
     res_list = []
     jm_job_list = crud.get_jm_job_infos(db)
+    jm_job_ids = [job.id for job in jm_job_list]
+    relations = crud.get_af_ids(db,jm_job_ids, 'job')
+    af_to_datax = {relation.af_id:relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    res = {}
+    for af_job_run in af_job_runs:
+        tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
+        if len(tasks) > 0:
+            task = tasks[-1]
+            task.pop('log',None)
+            job_id = af_to_datax[int(af_job_run.job_id)]
+            log = {
+                "id": af_job_run.id,
+                "job_id": job_id,
+                "af_job_id": int(af_job_run.job_id),
+                "run_id": af_job_run.run_id,
+                "trigger_time": af_job_run.start_time,
+                "trigger_result": 1 if task else 0,
+                "execute_time": task['start_time'] if task else 0,
+                "execute_result": 1 if task and task['status'] == 'success' else 0,
+                "end_time": task['end_time'] if task else 0,
+            }
+            if job_id in res.keys():
+                res[job_id].append(log)
+            else:
+                res.update({job_id: [log]})
     for jm_job in jm_job_list:
-        history = crud.get_one_job_historys(db, jm_job.id)
+        history = res[jm_job.id] if jm_job.id in res.keys() else []
         jm_job_dict = jm_job.to_dict()
         jm_job_dict.update({'history':history[0:10]})
         res_list.append(jm_job_dict)
@@ -109,6 +136,8 @@ def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_
 @web_try()
 @sxtimeit
 def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    send_delete('/jpt/af_job', relation.af_id)
     return crud.delete_jm_job_info(db,jm_job_id)
 
 @router.put("/status")
@@ -124,8 +153,9 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_jm_job_info(db,jm_job_id)
     if jm_job.status == 0:
         raise Exception('任务已被停用')
-    # 进行api调用
-    return jm_job
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    res = send_execute(relation.af_id)
+    return res['data']
 
 
 @router.post("/cron_expression")

+ 47 - 38
app/routers/jm_job_log.py

@@ -24,50 +24,59 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def get_job_logs(job_id: int = None, db: Session = Depends(get_db)):
-    jm_job_infos = []
+    jm_job_list = []
     if job_id is not None:
-        jm_job_infos = [crud.get_jm_job_info(db, job_id)]
+        jm_job_list = [crud.get_jm_job_info(db, job_id)]
     else:
-        jm_job_infos = crud.get_jm_job_infos(db)
-    job_id_to_job = {jm_job.id:jm_job for jm_job in jm_job_infos}
-    jm_job_id_list = job_id_to_job.keys()
-    job_history_list = crud.get_historys_by_job_ids(db,jm_job_id_list)
+        jm_job_list = crud.get_jm_job_infos(db)
+    id_to_job = {job.id:job for job in jm_job_list}
+    relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
+    af_to_datax = {relation.af_id:relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
     res = []
-    for job_history in job_history_list:
-        jm_job = job_id_to_job[job_history.job_id]
-        job_history_dict = job_history.__dict__
-        job_history_dict.update({"job_name":jm_job.name})
-        job_history_dict.update({"job_type":jm_job.type})
-        job_history_dict.update({"job_tag":jm_job.tag})
-        res.append(job_history_dict)
+    for af_job_run in af_job_runs:
+        tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
+        if len(tasks) > 0:
+            task = tasks[-1]
+            task.pop('log',None)
+            job_id = af_to_datax[int(af_job_run.job_id)]
+            log = {
+                "id": af_job_run.id,
+                "job_id": job_id,
+                "job_name": id_to_job[job_id].name,
+                "job_type": id_to_job[job_id].type,
+                "job_tag": id_to_job[job_id].tag,
+                "af_job_id": int(af_job_run.job_id),
+                "run_id": af_job_run.run_id,
+                "trigger_time": af_job_run.start_time,
+                "trigger_result": 1 if task else 0,
+                "execute_time": task['start_time'] if task else 0,
+                "execute_result": 1 if task and task['status'] == 'success' else 0,
+                "end_time": task['end_time'] if task else 0,
+            }
+            res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return res
 
 @router.get("/logs")
 @web_try()
 @sxtimeit
-def get_job_logs(job_history_id: int,db: Session = Depends(get_db)):
-    job_history_info = crud.get_jm_job_history_info(db,job_history_id)
-    job_info = crud.get_jm_job_info(db,job_history_info.job_id)
-    job_logs = crud.get_jm_job_logs_by_history_id(db,job_history_id)
-    if len(job_logs) <= 0:
-        raise Exception("未找到该任务此次运行的日志")
-    if job_info.type == '单作业离线任务':
-        return {
-            'job_type': job_info.type,
-            'logs': job_logs,
+def get_job_logs(run_id: str, job_id: int, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
+    tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
+    res = []
+    for task in tasks:
+        log = {
+            "id": af_job_run.id,
+            "af_job_id": int(af_job_run.job_id),
+            "run_id": af_job_run.run_id,
+            "trigger_time": af_job_run.start_time,
+            "trigger_result": 1 if task else 0,
+            "execute_time": task['start_time'] if task else 0,
+            "execute_result": 1 if task and task['status'] == 'success' else 0,
+            "end_time": task['end_time'] if task else 0,
+            "log": task['log'] if task else None
         }
-    res = {}
-    for job_log in job_logs:
-        if job_log.homework_id in res.keys():
-            res[job_log.homework_id]['nodes'].append(job_log)
-        else:
-            res.update({job_log.homework_id:{
-                "homework_name":job_log.homework_name,
-                "nodes": [job_log]
-            }})
-
-    logs = [v for k, v in res.items()]
-    return {
-        'job_type': job_info.type,
-        'logs': logs,
-    }
+        res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
+    return res

+ 26 - 1
app/routers/job.py

@@ -1,3 +1,6 @@
+import json
+
+import requests
 from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
@@ -26,7 +29,6 @@ def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
     return crud.get_airflow_job_once(db, item_id)
 
 
-
 @router_af_job.post("/")
 @web_try()
 @sxtimeit
@@ -48,3 +50,26 @@ def add_dag_submit(id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_job_once(db, id)
     create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
     # return crud.create_airflow_job(item)
+
+
+@router_af_job.post("/{item_id}/run")
+@web_try()
+@sxtimeit
+def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
+    job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
+    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': 'basic YWRtaW46YWRtaW4=',
+        'Host':'airflow-web.sxkj.com'
+    }
+
+    response = requests.post(uri, headers=headers, data=json.dumps({}))
+    return response.json()
+
+#
+# @router_af_job.post("/run")
+# @web_try()
+# @sxtimeit
+# def trigger_af_job_run(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
+#     return crud.create_airflow_job(db, item)

+ 13 - 0
app/routers/job_info.py

@@ -100,6 +100,19 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 @web_try()
 @sxtimeit
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
+    relation = crud.get_af_id(db, job_id, 'datax')
+    send_delete('/jpt/af_job', relation.af_id)
     return crud.delete_job_info(db, job_id)
 
+@router.post("/execute")
+@web_try()
+@sxtimeit
+def execute_job_info(job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_job_info(db, job_id)
+    if jm_job.trigger_status == 0:
+        raise Exception('任务已被停用')
+    relation = crud.get_af_id(db, job_id, 'datax')
+    res = send_execute(relation.af_id)
+    return res['data']
+
 add_pagination(router)

+ 2 - 2
app/routers/job_jdbc_datasource.py

@@ -59,8 +59,8 @@ def create_datasource(ds: schemas.JobJdbcDatasourceCreate, db: Session = Depends
 @router.get("/")
 @web_try()
 @sxtimeit
-def get_datasources(params: Params=Depends(), db: Session = Depends(get_db)):
-    return paginate(crud.get_job_jdbc_datasources(db), params)
+def get_datasources(datasource_type: Optional[str] = None, params: Params=Depends(), db: Session = Depends(get_db)):
+    return paginate(crud.get_job_jdbc_datasources(db, datasource_type), params)
 
 @router.put("/{ds_id}")
 @web_try()

+ 46 - 5
app/routers/job_log.py

@@ -28,12 +28,53 @@ def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
 @router.get("/")
 @web_try()
 @sxtimeit
-def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
-    return paginate(crud.get_job_logs(db), params)
+def get_job_logs(job_id: Optional[int] = None, params: Params = Depends(), db: Session = Depends(get_db)):
+    job_infos = []
+    if job_id is None:
+        job_infos = crud.get_job_infos(db)
+    else:
+        job_infos = [crud.get_job_info(db,job_id)]
+    id_to_job = {job.id:job for job in job_infos}
+    relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
+    af_to_datax = {relation.af_id:relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    res = []
+    for af_job_run in af_job_runs:
+        task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
+        job_id = af_to_datax[int(af_job_run.job_id)]
+        log = {
+            "id": af_job_run.id,
+            "job_id": job_id,
+            "job_desc": id_to_job[job_id].job_desc,
+            "af_job_id": int(af_job_run.job_id),
+            "run_id": af_job_run.run_id,
+            "trigger_time": af_job_run.start_time,
+            "trigger_result": 1 if task else 0,
+            "execute_time": task['start_time'] if task else 0,
+            "execute_result": 1 if task and task['status'] == 'success' else 0,
+            "end_time": task['end_time'] if task else 0,
+            "log": task['log'] if task else None
+        }
+        res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
+    return paginate(res, params)
 
 
-@router.get("/getOnce/{id}")
+@router.get("/getOnce")
 @web_try()
 @sxtimeit
-def get_job_logs_once(id: int, db: Session = Depends(get_db)):
-    return crud.get_job_log_once(db, id)
+def get_job_logs_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
+    task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
+    log = {
+        "id": af_job_run.id,
+        "af_job_id": int(af_job_run.job_id),
+        "run_id": af_job_run.run_id,
+        "trigger_time": af_job_run.start_time,
+        "trigger_result": 1 if task else 0,
+        "execute_time": task['start_time'] if task else 0,
+        "execute_result": 1 if task and task['status'] == 'success' else 0,
+        "end_time": task['end_time'] if task else 0,
+        "log": task['log'] if task else None
+    }
+    return log

+ 1 - 1
app/routers/run.py

@@ -30,7 +30,7 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
 @sxtimeit
 def add_airflow_run(item: Item, db: Session = Depends(get_db)):
     print(item.data)
-    job_item = crud.get_airflow_job_once(db=db, id=item.data["job_id"])
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
     sparks_dependence = {}
     if job_item is not None:
         for task in schemas.AirflowJob(**job_item.to_dict()).tasks:

+ 3 - 0
app/schemas/af_job.py

@@ -18,6 +18,7 @@ class AirflowJobBase(BaseModel):
 
 class AirflowJobCreate(AirflowJobBase):
     job_type: int
+    job_mode: int
     user_id: int
 
 
@@ -32,6 +33,8 @@ class AirflowJob(AirflowJobBase):
     create_time: int
     update_time: int
     user_id: int
+    job_type: int
+    job_mode: int
 
     class Config:
         orm_mode = True

+ 1 - 1
app/schemas/jm_homework.py

@@ -20,7 +20,7 @@ class JmHomeworkBase(BaseModel):
     # 脚本文件
     script_file: str
     # 执行命令
-    execute_command: str
+    execute_command: Optional[str]
     # 用户ID
     user_id: str
     # 项目ID

+ 40 - 0
app/services/dag.py

@@ -0,0 +1,40 @@
+from app import crud, models
+from app.utils.send_util import *
+from app.utils.utils import get_cmd_parameter
+from sqlalchemy.orm import Session
+
+def dag_create_job(dag_script: str):
+    af_task = dag_create_task(dag_script)
+    af_job = {
+        "tasks": [af_task],
+        "name": "123",
+        "dependence": [],
+        "cron": "once",
+        "desc": "123",
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": "",
+        "executor_fail_retry_count": "",
+        "trigger_status": 1,
+        "job_type": 0,
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_job', af_job)
+    af_job = res['data']
+    send_submit(af_job['id'])
+
+def dag_create_task(dag_script: str):
+    af_task = {
+        "name": "123",
+        "file_urls": [],
+        "script": dag_script,
+        "cmd": "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": "",
+        "task_type": "sparks",
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_task', af_task)
+    af_task = res['data']
+    return af_task

+ 9 - 4
app/services/datax.py

@@ -1,5 +1,3 @@
-
-
 from app import crud, models
 from app.utils.send_util import *
 from app.utils.utils import get_cmd_parameter
@@ -7,9 +5,11 @@ from sqlalchemy.orm import Session
 
 def datax_create_job(job_info: models.JobInfo, db: Session):
     af_task = datax_create_task(job_info)
+    cron: str = job_info.job_cron
+    cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
-        "name": job_info.job_desc,
+        "name": cron,
         "dependence": [],
         "cron": job_info.job_cron,
         "desc": job_info.job_desc,
@@ -18,6 +18,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
         "executor_timeout": job_info.executor_timeout,
         "executor_fail_retry_count": job_info.executor_fail_retry_count,
         "trigger_status": job_info.trigger_status,
+        "job_mode":1,
         "job_type": 0,
         "user_id": 0,
     }
@@ -25,6 +26,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
+    send_pause(af_job['id'], job_info.trigger_status)
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
@@ -51,11 +53,13 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     old_af_job = res['data']
     old_af_task = old_af_job['tasks'][0]
     af_task = datax_put_task(job_info,old_af_task)
+    cron: str = job_info.job_cron
+    cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,
         "dependence": [],
-        "cron": job_info.job_cron,
+        "cron": cron,
         "desc": job_info.job_desc,
         "route_strategy": job_info.executor_route_strategy,
         "block_strategy": job_info.executor_block_strategy,
@@ -66,6 +70,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/jpt/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
+    send_pause(af_job['id'], job_info.trigger_status)
 
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):
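Review note with a minimal sketch (not part of the commit): str.replace returns a new string, so the bare cron.replace('?','*') calls in this file (and again in app/services/jm_job.py below) leave cron unchanged; the result has to be reassigned for the Quartz-style '?' to actually become '*'.

    cron = "0 0 12 * * ?"          # hypothetical Quartz-style expression
    cron = cron.replace('?', '*')  # reassignment required; Python strings are immutable
    assert cron == "0 0 12 * * *"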

+ 221 - 0
app/services/jm_job.py

@@ -0,0 +1,221 @@
+import json
+from turtle import update
+from app import crud, models
+from app.crud.jm_homework_datasource_relation import get_jm_relations
+from app.utils.send_util import *
+from app.utils.utils import get_cmd_parameter
+from sqlalchemy.orm import Session
+from app.common.minio import FileHandler
+
+type_dict = {
+    "Java": "java",
+    "Python": "python",
+    "Dag": "sparks"
+}
+
+def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
+    content = ''
+    if jm_homework.type == "Dag":
+        content = red_dag_and_format(jm_homework, db)
+    elif jm_homework.type == "Python":
+        content = red_python_and_format(jm_homework)
+    af_task = {
+        "name": jm_homework.name,
+        "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
+        "script": content if jm_homework.type != "Java" else "",
+        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
+        "task_type": type_dict[jm_homework.type],
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_task', af_task)
+    af_task = res['data']
+    crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
+    return af_task
+
+def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
+    relation = crud.get_af_id(db, jm_homework.id, 'task')
+    content = ''
+    if jm_homework.type == "Dag":
+        content = content = red_dag_and_format(jm_homework, db)
+    elif jm_homework.type == "Python":
+        content = red_python_and_format(jm_homework)
+    af_task = {
+        "name": jm_homework.name,
+        "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
+        "script": content if jm_homework.type != "Java" else "",
+        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
+        "task_type": type_dict[jm_homework.type],
+        "user_id": 0,
+    }
+    res = send_put('/jpt/af_task', relation.af_id, af_task)
+    af_task = res['data']
+    return af_task
+
+def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
+    task_relation = crud.get_af_id(db,jm_homework.id,'task')
+    if task_relation is None:
+        jm_job_create_task(jm_homework, db)
+    else:
+        jm_job_update_task(jm_homework, db)
+
+def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
+    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
+    homework_ids = [node.homework_id for node in nodes]
+    relations = crud.get_af_ids(db,homework_ids, 'task')
+    se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
+    tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
+    edges = crud.get_one_job_edges(db, jm_job_info.id)
+    dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
+    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
+    cron.replace('?','*')
+    af_job = {
+        "tasks": tasks,
+        "name": jm_job_info.name,
+        "dependence": dependence,
+        "cron": cron,
+        "desc": jm_job_info.name,
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": 0,
+        "executor_fail_retry_count": 0,
+        "trigger_status": jm_job_info.status,
+        "job_mode":1,
+        "job_type": 0,
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_job', af_job)
+    af_job = res['data']
+    crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
+    send_submit(af_job['id'])
+    send_pause(af_job['id'], jm_job_info.status)
+
+
+def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
+    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
+    homework_ids = [node.homework_id for node in nodes]
+    node_id_to_h_id = {node.id:node.homework_id for node in nodes}
+    relations = crud.get_af_ids(db,homework_ids, 'task')
+    se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
+    tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
+    edges = crud.get_one_job_edges(db, jm_job_info.id)
+    dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
+    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
+    cron.replace('?','*')
+    af_job = {
+        "tasks": tasks,
+        "name": jm_job_info.name,
+        "dependence": dependence,
+        "cron": cron,
+        "desc": jm_job_info.name,
+        "route_strategy": "",
+        "block_strategy": "",
+        "executor_timeout": 0,
+        "executor_fail_retry_count": 0,
+        "trigger_status": jm_job_info.status,
+    }
+    job_relation = crud.get_af_id(db,jm_job_info.id,'job')
+    res = send_put('/jpt/af_job', job_relation.af_id, af_job)
+    af_job = res['data']
+    send_submit(af_job['id'])
+    send_pause(af_job['id'],jm_job_info.status)
+
+def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
+    job_relation = crud.get_af_id(db,jm_job_info.id,'job')
+    if job_relation is None:
+        jm_job_create_job(jm_job_info, db)
+    else:
+        jm_job_update_job(jm_job_info, db)
+
+
+def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
+    relations = get_jm_relations(db,jm_homework.id)
+    node_relation_dict = { relation.node_uuid:relation for relation in relations}
+    f = open('./dag' + jm_homework.dag_url)
+    lines = f.read()
+    result = json.loads(lines)
+    f.close()
+    edges = result['edges']
+    t_s = {}
+    input_num = {}
+    for edge in edges:
+        if edge['target'] in t_s.keys():
+            t_s[edge['target']].append(edge['source'])
+        else:
+            t_s.update({edge['target']:[edge['source']]})
+    nodes = result['nodes']
+    sub_nodes = []
+    for node in nodes:
+        if node['op'] == 'datasource':
+            fileds = node['data']['input_source']
+            script = 'select '
+            for filed in fileds:
+                script += filed['dataField'] + ','
+            script = script.strip(',')
+            script += ' from ' + node_relation_dict[node['id']].table
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": 'sql',
+                "script":script
+            }
+            sub_nodes.append(sub_node)
+        elif node['op'] == 'outputsource':
+            fileds = node['data']['output_source']
+            script = 'select '
+            for filed in fileds:
+                script += filed['dataField'] + ','
+            script = script.strip(',')
+            script += ' from ' + node_relation_dict[node['id']].table
+            inputs = {}
+            index = 0
+            input_list = t_s[node['id']]
+            for input in input_list:
+                if input in input_num.keys():
+                    input_num[input]+=1
+                else:
+                    input_num.update({input:0})
+                inputs.update({'input'+str(index):[input,input_num[input]]})
+                index+=1
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": 'sql',
+                "inputs": inputs,
+                "script":script
+            }
+            sub_nodes.append(sub_node)
+        else:
+            inputs = {}
+            index = 0
+            input_list = t_s[node['id']]
+            for input in input_list:
+                if input in input_num.keys():
+                    input_num[input]+=1
+                else:
+                    input_num.update({input:0})
+                inputs.update({'input'+str(index):[input,input_num[input]]})
+                index+=1
+            sub_node = {
+                "id": node['id'],
+                "name": node['name'],
+                "op": node['op'],
+                "inputs": inputs,
+                "script": node['data']['script'],
+            }
+            sub_nodes.append(sub_node)
+    res = {
+        'sub_nodes': sub_nodes,
+        'edges': [(edge['source'],edge['target']) for edge in edges]
+    }
+    return json.dumps(res)
+
+def red_python_and_format(jm_homework):
+    file_handler = FileHandler("datax")
+    file = file_handler.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
+    return file.decode("utf-8")

+ 34 - 4
app/utils/send_util.py

@@ -11,7 +11,8 @@ def send_post(uri,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        print(result)
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
 
 def send_submit(af_job_id):
     res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
@@ -20,7 +21,7 @@ def send_submit(af_job_id):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception('提交任务,请求airflow失败-->'+result['msg'])
 
 
 def send_put(uri,path_data,data):
@@ -29,7 +30,7 @@ def send_put(uri,path_data,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
 
 def send_get(uri,path_data):
     res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
@@ -37,4 +38,33 @@ def send_get(uri,path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败')
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
+
+
+def send_execute(path_data):
+    res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
+
+# 起停任务
+def send_pause(af_job_id, status):
+    flag = True if status == 0 else False
+    res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(flag)}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
+
+# 删除任务
+def send_delete(uri, path_data):
+    res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        print(result)
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
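A minimal usage sketch (not part of the commit) of the new send_pause helper: the pause flag is the inverse of the job's trigger status, so an enabled job (status 1) is left unpaused and a disabled job (status 0) is paused. The job id below is hypothetical.

    send_pause(af_job_id=42, status=1)  # status 1 -> flag False -> DAG left unpaused
    send_pause(af_job_id=42, status=0)  # status 0 -> flag True  -> DAG paused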

+ 4 - 4
app/utils/utils.py

@@ -33,16 +33,16 @@ def byte_conversion(size):
 def get_cmd_parameter(jvm_param, inc_start_time, replace_param, partition_info):
     cmd_parameter = ''
     current_time = int(time.time())
-    if jvm_param is not None:
+    if jvm_param is not None and jvm_param != '':
         cmd_parameter += '-j "' + jvm_param + '" '
-    if replace_param is not None:
+    if replace_param is not None and replace_param != '':
         cmd_parameter += '-p "' + (replace_param % (inc_start_time,current_time))
-    if partition_info is not None:
+    if partition_info is not None and partition_info != '':
         partition_list = partition_info.split(',')
         partition_time = current_time - 86400*int(partition_list[1])
         partition_time_format = partition_list[2].replace('yyyy','%Y').replace('MM','%m').replace('dd','%d')
         partition_time_str = time.strftime(partition_time_format,time.localtime(partition_time))
         cmd_parameter += ' -Dpartition=' + str(partition_list[0]) + '=' + partition_time_str
-    if replace_param is not None:
+    if replace_param is not None and replace_param != '':
         cmd_parameter += '" '
     return cmd_parameter
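A worked example (not part of the commit) of what get_cmd_parameter produces, with hypothetical inputs; the empty-string guards added above keep blank form fields from emitting dangling -j/-p fragments.

    from app.utils.utils import get_cmd_parameter  # import path as in this repo

    # Hypothetical job fields: jvm_param, inc_start_time, replace_param, partition_info
    print(get_cmd_parameter('-Xms1G', 0, '-DlastTime=%s -DcurrentTime=%s', 'datety,1,yyyy-MM-dd'))
    # -> -j "-Xms1G" -p "-DlastTime=0 -DcurrentTime=<now> -Dpartition=datety=<yesterday, %Y-%m-%d>"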

+ 172 - 0
auo_tests/jupyters/02-edit-datax-json.ipynb

@@ -0,0 +1,172 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import datetime"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [],
+   "source": [
+    "datax_config = json.load(open('./datax-config.json','r'))"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "outputs": [],
+   "source": [
+    "datax_config_str = json.dumps(datax_config)"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "outputs": [
+    {
+     "data": {
+      "text/plain": "'{\"job\": {\"content\": [{\"reader\": {\"name\": \"mysqlreader\", \"parameter\": {\"column\": [\"*\"], \"connection\": [{\"jdbcUrl\": [\"jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true\"], \"querySql\": [\"select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})\"]}], \"password\": \"root\", \"username\": \"root\"}}, \"writer\": {\"name\": \"hdfswriter\", \"parameter\": {\"defaultFS\": \"hdfs://localhost:9000\", \"fileType\": \"text\", \"path\": \"/user/hive/warehouse/offline.db/test_order/${partition}\", \"fileName\": \"test_order\", \"column\": [{\"name\": \"keyno\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"code\", \"type\": \"string\"}, {\"name\": \"status\", \"type\": \"string\"}, {\"name\": \"province\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}], \"writeMode\": \"append\", \"fieldDelimiter\": \",\"}}}], \"setting\": {\"speed\": {\"channel\": 2}}}}'"
+     },
+     "execution_count": 10,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "datax_config_str"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "outputs": [],
+   "source": [
+    "first_begin_time = 100\n",
+    "last_time = 200\n",
+    "current_time = 300\n",
+    "\n",
+    "partition_work = 'datety'\n",
+    "partition_format = '%Y-%m-%d'\n",
+    "partition_diff = 1\n",
+    "\n",
+    "\n",
+    "\n",
+    "last_key = '${lastTime}'\n",
+    "current_key = '${currentTime})'\n",
+    "partition_key = '${partition}'"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "{\"job\": {\"content\": [{\"reader\": {\"name\": \"mysqlreader\", \"parameter\": {\"column\": [\"*\"], \"connection\": [{\"jdbcUrl\": [\"jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true\"], \"querySql\": [\"select * from test_order where updateTime >= FROM_UNIXTIME(200) and operationDate < FROM_UNIXTIME(300\"]}], \"password\": \"root\", \"username\": \"root\"}}, \"writer\": {\"name\": \"hdfswriter\", \"parameter\": {\"defaultFS\": \"hdfs://localhost:9000\", \"fileType\": \"text\", \"path\": \"/user/hive/warehouse/offline.db/test_order/${partition}\", \"fileName\": \"test_order\", \"column\": [{\"name\": \"keyno\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"code\", \"type\": \"string\"}, {\"name\": \"status\", \"type\": \"string\"}, {\"name\": \"province\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}], \"writeMode\": \"append\", \"fieldDelimiter\": \",\"}}}], \"setting\": {\"speed\": {\"channel\": 2}}}}\n"
+     ]
+    }
+   ],
+   "source": [
+    "last_time = last_time or first_begin_time\n",
+    "print(datax_config_str.replace(last_key,f'{last_time}').replace(current_key,f'{current_time}'))"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "outputs": [
+    {
+     "data": {
+      "text/plain": "'2022-10-12'"
+     },
+     "execution_count": 30,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "\n",
+    "from datetime import timezone\n",
+    "\n",
+    "now = datetime.datetime.utcfromtimestamp(datetime.datetime.timestamp(datetime.datetime.now())) + datetime.timedelta(hours=8,days=partition_diff)\n",
+    "now.strftime(partition_format)\n",
+    "# DateTime temp = DateTime.ParseExact(sourceDate, \"dd-MM-yyyy\", CultureInfo.InvariantCulture);\n",
+    "# string str = temp.ToString(\"yyyy-MM-dd\");\n",
+    "\n",
+    "now.strftime(partition_format)\n",
+    "# datetime.datetime.now(tz=timezone.)"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false,
+    "pycharm": {
+     "is_executing": true
+    }
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false
+   }
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
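
The notebook above experiments with rendering the DataX job JSON by plain string replacement of the ${lastTime}, ${currentTime} and ${partition} placeholders (note that its current_key carries a stray trailing ')', which is why the closing parenthesis is missing from the rendered SQL in the recorded output). A condensed sketch of the same idea, with example timestamps and an assumed partition field name:

# Condensed from the notebook; the timestamps and the 'datety' partition field
# are example assumptions, not values taken from the repository.
import json
import datetime

config_str = json.dumps(json.load(open('./datax-config.json', 'r')))

last_time, current_time = 200, 300                 # previous watermark / "now" (examples)
partition_field, partition_diff, fmt = 'datety', 1, '%Y-%m-%d'

# Hive-style partition value such as datety=2022-10-12 (UTC+8, shifted by partition_diff days)
partition_day = (datetime.datetime.utcnow()
                 + datetime.timedelta(hours=8, days=partition_diff)).strftime(fmt)

rendered = (config_str
            .replace('${lastTime}', str(last_time))
            .replace('${currentTime}', str(current_time))
            .replace('${partition}', f'{partition_field}={partition_day}'))
print(rendered)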

+ 70 - 0
auo_tests/jupyters/datax-config.json

@@ -0,0 +1,70 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": [
+                  "jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true"
+                ],
+                "querySql": [
+                  "select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
+                ]
+              }
+            ],
+            "password": "root",
+            "username": "root"
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://localhost:9000",
+            "fileType": "text",
+            "path": "/user/hive/warehouse/offline.db/test_order/${partition}",
+            "fileName": "test_order",
+            "column": [
+              {
+                "name": "keyno",
+                "type": "string"
+              },
+              {
+                "name": "name",
+                "type": "string"
+              },
+              {
+                "name": "code",
+                "type": "string"
+              },
+              {
+                "name": "status",
+                "type": "string"
+              },
+              {
+                "name": "province",
+                "type": "string"
+              },
+              {
+                "name": "city",
+                "type": "string"
+              }
+            ],
+            "writeMode": "append",
+            "fieldDelimiter": ","
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 2
+      }
+    }
+  }
+}
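
A usage sketch for the job file above: the ${...} placeholders are supplied at launch time through DataX's -p switch, the same switch get_cmd_parameter builds. The DataX install path and the parameter values are assumptions for illustration:

# Hypothetical launch of the job above; DATAX_HOME and the parameter values are assumed.
import subprocess
import time

DATAX_HOME = '/opt/datax'                          # assumed install location
job_file = 'auo_tests/jupyters/datax-config.json'

last_time = 1665417600                             # previous incremental watermark (example)
current_time = int(time.time())
params = f'-DlastTime={last_time} -DcurrentTime={current_time} -Dpartition=datety=2022-10-12'

# DataX substitutes ${lastTime}, ${currentTime} and ${partition} with the -D values above.
subprocess.run(['python', f'{DATAX_HOME}/bin/datax.py', '-p', params, job_file], check=True)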

+ 70 - 0
auo_tests/tasks/datax/config.json

@@ -0,0 +1,70 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": [
+                  "jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true"
+                ],
+                "querySql": [
+                  "select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
+                ]
+              }
+            ],
+            "password": "root",
+            "username": "root"
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://localhost:9000",
+            "fileType": "text",
+            "path": "/user/hive/warehouse/offline.db/test_order/${partition}",
+            "fileName": "test_order",
+            "column": [
+              {
+                "name": "keyno",
+                "type": "string"
+              },
+              {
+                "name": "name",
+                "type": "string"
+              },
+              {
+                "name": "code",
+                "type": "string"
+              },
+              {
+                "name": "status",
+                "type": "string"
+              },
+              {
+                "name": "province",
+                "type": "string"
+              },
+              {
+                "name": "city",
+                "type": "string"
+              }
+            ],
+            "writeMode": "append",
+            "fieldDelimiter": ","
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 2
+      }
+    }
+  }
+}
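
This file duplicates the jupyter test config above; a small inspection snippet like the following (plain stdlib, using the path added in this commit) can help check that the hdfswriter column list stays in step with the target Hive table:

# Quick look at the writer schema defined in this job file.
import json

with open('auo_tests/tasks/datax/config.json') as f:
    job = json.load(f)

writer = job['job']['content'][0]['writer']['parameter']
print(writer['path'])                              # HDFS target path, still containing ${partition}
for col in writer['column']:
    print(f"{col['name']}: {col['type']}")         # should mirror the Hive table's column order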

+ 60 - 0
auo_tests/tasks/datax/config2.json

@@ -0,0 +1,60 @@
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 3
+      },
+      "errorLimit": {
+        "record": 0,
+        "percentage": 0.02
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "root",
+            "password": "happylay",
+            "splitPk": "",
+            "connection": [
+              {
+                "querySql": [
+                  "select id, txn_amount, txn_type from test_4 where txn_date >= FROM_UNIXTIME(${lastTime}) and txn_date < FROM_UNIXTIME(${currentTime})"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://192.168.199.27:9000",
+            "fileType": "text",
+            "path": "/home/sxkj/bigdata/apache-hive-2.3.9-bin/warehouse/test_h4/${partition}",
+            "fileName": "000000",
+            "writeMode": "append",
+            "fieldDelimiter": ",",
+            "column": [
+              {
+                "name": "id",
+                "type": "int"
+              },
+              {
+                "name": "txn_amount",
+                "type": "double"
+              },
+              {
+                "name": "txn_type",
+                "type": "string"
+              }
+            ]
+          }
+        }
+      }
+    ]
+  }
+}
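
The querySql in this job reads a half-open window [lastTime, currentTime), so each run's currentTime becomes the next run's lastTime. A sketch of that handover follows; the watermark store and launcher are stand-ins, not code from this repository:

# Stand-in bookkeeping for the incremental window used by this job's querySql.
import time

_state = {'last_time': None}                       # stand-in for persisted job state

def run_datax(last_time: int, current_time: int) -> None:
    # Stand-in for rendering -DlastTime/-DcurrentTime and launching DataX.
    print(f'-p "-DlastTime={last_time} -DcurrentTime={current_time}"')

first_begin_time = 0                               # fallback for the very first run
last_time = _state['last_time'] or first_begin_time
current_time = int(time.time())

# Rows with txn_date in [last_time, current_time) are extracted, so consecutive
# runs neither overlap nor skip records.
run_datax(last_time, current_time)
_state['last_time'] = current_time                 # the next run starts where this one ended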

The file diff was suppressed because it is too large
+ 134 - 0
auo_tests/tasks/datax/datax_debug.py.ipynb


The file diff was suppressed because it is too large
+ 120 - 0
dag/demo.dag


+ 4 - 1
production.ini

@@ -13,4 +13,7 @@ port = 10086
 [MINIO]
 url = minio-api.sxkj.com
 access_key = admin
-secret_key = sxkjadmin
+secret_key = sxkjadmin
+[AIRFLOW]
+host = 192.168.199.109
+port = 18082
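
The new [AIRFLOW] block can be read like the existing sections; a minimal configparser sketch (the application's own settings loader may wrap this differently):

# Minimal sketch of reading the new [AIRFLOW] settings from production.ini.
import configparser

config = configparser.ConfigParser()
config.read('production.ini')

airflow_host = config.get('AIRFLOW', 'host')       # 192.168.199.109
airflow_port = config.getint('AIRFLOW', 'port')    # 18082
airflow_base_url = f'http://{airflow_host}:{airflow_port}'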

+ 1 - 0
python/test.py

@@ -0,0 +1 @@
+print("hello world!")

Some files were not shown because too many files changed in this diff