
Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

liweiquan 2 years ago
commit 1ff76b306f

+ 2 - 2
app/core/airflow/task.py

@@ -103,7 +103,7 @@ class SparksTaskCompiler(TaskCompiler):
                       "executor-memory": "1g",
                       "executor-cores": 1,
                       "num-executors": 1,
-                      "archives": "/home/sxkj/bigdata/py37.zip#python3env"
+                      "archives": "/workspace/py37.zip#python3env"
                       }
         spark_config = {'spark.default.parallelism': 1,
                         "spark.executor.memoryOverhead": "1g",
@@ -117,7 +117,7 @@ class SparksTaskCompiler(TaskCompiler):
                         }
         param_str = ' '.join([f'--{k} {v}' for k, v in parameters.items()])
         param_str += ''.join([f' --conf {k}={v}' for k, v in spark_config.items()])
-        basic_cmds = "cd /home/sxkj/bigdata && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
+        basic_cmds = "kinit -kt /workspace/conf/user.keytab ailab && cd /workspace && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
         self.cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"
 
     def translate(self, job_id, task_mode=1):

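For reference, a minimal standalone sketch of how the updated pieces above combine into the final submit command. The values mirror what is visible in the hunks; options not shown there are omitted, and "example_task" is only an illustrative name.

# Sketch only: reproduces the command assembly from SparksTaskCompiler above.
parameters = {
    "executor-memory": "1g",
    "executor-cores": 1,
    "num-executors": 1,
    "archives": "/workspace/py37.zip#python3env",
}
spark_config = {
    "spark.default.parallelism": 1,
    "spark.executor.memoryOverhead": "1g",
}

param_str = ' '.join([f'--{k} {v}' for k, v in parameters.items()])
param_str += ''.join([f' --conf {k}={v}' for k, v in spark_config.items()])

# kinit runs first so spark-submit is issued with a valid Kerberos ticket,
# and the working directory moves from /home/sxkj/bigdata to /workspace.
basic_cmds = ("kinit -kt /workspace/conf/user.keytab ailab && cd /workspace && "
              "echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit")
cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"

print(cmd_str("example_task"))
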
+ 1 - 1
app/core/airflow/templates/data_transfer/data_transfer.py

@@ -29,7 +29,7 @@ with DAG(
         get_logs=True,
         log_events_on_failure=True,
         cmds=['/bin/bash', '-c',
-              'cd /home/sxkj/bigdata && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /home/sxkj/bigdata/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python --conf spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true run.py'],
+              'kinit -kt /workspace/conf/user.keytab ailab && cd /workspace && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /workspace/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python --conf spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true run.py'],
         env_vars={{env}}
     )
 

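For orientation, a hedged sketch of the template around the changed cmds line. Only get_logs, log_events_on_failure, cmds, and env_vars (inside with DAG(...)) are visible in the hunk; the import path, dag_id, task_id, namespace, and image below are assumptions, and most spark-submit options are elided.

import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(dag_id="dag_data_transfer",                 # assumed dag_id
         start_date=datetime.datetime(2022, 1, 1),
         schedule_interval=None) as dag:
    KubernetesPodOperator(
        task_id="data_transfer",                     # assumed task_id
        name="data_transfer",
        namespace="airflow",                         # assumed namespace
        image="spark-client:latest",                 # assumed image
        get_logs=True,
        log_events_on_failure=True,
        cmds=['/bin/bash', '-c',
              # obtain a Kerberos ticket, then submit; most options elided for brevity
              'kinit -kt /workspace/conf/user.keytab ailab && cd /workspace && '
              'echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit '
              '--name spark_data_transfer --master yarn --deploy-mode cluster '
              '--archives /workspace/py37.zip#python3env run.py'],
        env_vars={},                                 # rendered from the {{env}} placeholder
    )
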
+ 3 - 1
app/core/airflow/uri.py

@@ -47,5 +47,7 @@ def call_airflow_api(method, uri, args_dict):
         'Host': f'{config.get("AIRFLOW", "host_in_header")}'
     }
     if method == 'post':
-        print('enter post')
         return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
+    if method == 'get':
+        print(f"enter get, uri is {uri_prefix + '/' + uri}")
+        return requests.get(uri_prefix + '/' + uri, headers=headers, **args_dict)

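A self-contained sketch of the GET branch added above. In the real call_airflow_api the URI prefix and the Host header come from config (e.g. config.get("AIRFLOW", "host_in_header")); the defaults below are placeholders.

import requests

def call_airflow_api(method, uri, args_dict,
                     uri_prefix="http://airflow-web:8080/api/v1",   # placeholder base URL
                     host_in_header="airflow.example.com"):         # placeholder Host value
    headers = {'Host': host_in_header}
    if method == 'post':
        return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
    if method == 'get':
        # new branch: same pattern as POST, forwarding args_dict (params, auth, ...)
        return requests.get(uri_prefix + '/' + uri, headers=headers, **args_dict)

# Example (as used by app/routers/run.py below): list task instances of a dag run.
# call_airflow_api(method='get',
#                  uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances',
#                  args_dict={})
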
+ 42 - 6
app/routers/run.py

@@ -1,4 +1,7 @@
+import datetime
 import json
+import time
+from collections import defaultdict
 
 import requests
 from fastapi import APIRouter, Depends
@@ -135,13 +138,46 @@ def add_notification(item: Item):
     print(f'receive signal: {item.data} ')
 
 
-@router_af_run.get("/{job_id}/{af_run_id}")
+@router_af_run.get("/tasks_status/{job_id}/{af_run_id}")
 @web_try()
 @sxtimeit
 def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
-    ret = call_airflow_api(method='get',uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances',args_dict={})
+    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
+    details = defaultdict(dict)
+
     for task in ret.json()['task_instances']:
-        print(f"{task['task_id']}:{task['duration']}")
-    # task_info =
-    # pass
-    # return paginate(crud.get_airflow_tasks(db), params)
+        details['tasks'][task['task_id']] = {
+            # "log": logs,
+            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+            "status": task['state']
+        }
+        # print(f"{task['task_id']}:{task['duration']}")
+    return details
+
+
+@router_af_run.get("/job_status/{job_id}/{af_run_id}")
+@web_try()
+@sxtimeit
+def get_airflow_dagrun_job_info(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
+    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
+    details = defaultdict(dict)
+
+    for task in ret.json()['task_instances']:
+        details['tasks'][task['task_id']] = {
+            # "log": logs,
+            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+            "status": task['state']
+        }
+        # print(f"{task['task_id']}:{task['duration']}")
+    return details
+
+
+@router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
+@web_try()
+@sxtimeit
+def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str):
+    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}/logs/1',
+                           args_dict={})
+    return {"log": ret.text}
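
A hedged client-side example of the three new endpoints. Only the path suffixes come from the decorators above; the host, port, and router prefix are deployment assumptions, and @web_try() may wrap the payload in its own envelope.

import requests

BASE = "http://localhost:8000/af_run"   # assumed host/port and router prefix
job_id, af_run_id = 1, "manual__2022-01-01T00:00:00+00:00"   # illustrative identifiers

# Per-task status, built by get_airflow_dagrun(): the payload carries
# {"tasks": {<task_id>: {"start_time": <epoch>, "end_time": <epoch>, "status": <state>}}}
print(requests.get(f"{BASE}/tasks_status/{job_id}/{af_run_id}").json())

# Same shape from the job_status endpoint (the two handlers are currently identical).
print(requests.get(f"{BASE}/job_status/{job_id}/{af_run_id}").json())

# First log page of one task (the handler always asks Airflow for try number 1);
# "task_example" is an illustrative task id.
print(requests.get(f"{BASE}/task_log/{job_id}/{af_run_id}/task_example").json())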