
1. hive.metastore.uris template issue

luoyulong · 2 years ago
commit 6bc850c10e

+ 4 - 0
app/core/airflow/uri.py → app/core/airflow/af_util.py

@@ -1,3 +1,4 @@
+import datetime
 import json
 
 import requests
@@ -55,3 +56,6 @@ def call_airflow_api(method, uri, args_dict):
     if method == 'get':
         print(f"enter get, uri is {uri_prefix + '/' + uri}")
         return requests.get(uri_prefix + '/' + uri, headers=headers, **args_dict)
+
+def datetime2timestamp(datetime_str):
+    return datetime.datetime.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
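A quick usage sketch for the new helper (the timestamp string below is a hypothetical Airflow-style value, not taken from this commit):

    # hypothetical ISO-8601 value in the format Airflow's REST API returns for start_date/end_date
    ts = datetime2timestamp('2022-10-09T03:15:00.000000+00:00')
    print(ts)  # 1665285300.0, i.e. float seconds since the Unix epoch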

+ 1 - 1
app/core/airflow/job.py

@@ -1,7 +1,7 @@
 import os
 import stat
 from app.core.airflow.task import *
-from app.core.airflow.uri import get_job_path
+from app.core.airflow.af_util import get_job_path
 from app.schemas import AirflowJob
 
 

+ 1 - 1
app/core/airflow/task.py

@@ -1,5 +1,5 @@
 import json
-from app.core.airflow.uri import spark_result_tb_name
+from app.core.airflow.af_util import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
 from app.common.minio import FileHandler

+ 2 - 2
app/core/airflow/templates/pyspark_script_template.py.jinja2

@@ -10,8 +10,7 @@ from pyspark.sql import SparkSession, DataFrame
 # argv[0] inputs:{"input1_key":"input1_path","input2_key":"input2_path",..}
 # argv[1] outputs: [result_path1,result_path2...]
 def run(inputs: dict, outputs: list):
-    spark = SparkSession.builder.config('hive.metastore.uris',
-                                        '{{ hive_metastore_uris }}').enableHiveSupport().getOrCreate()
+    spark = SparkSession.builder.config('hive.metastore.uris','{{ hive_metastore_uris }}').enableHiveSupport().getOrCreate()
     param_dict = preprocess(input_infos=inputs, ss=spark)
     rets = main_func(**param_dict,spark=spark,sc=spark.sparkContext)
     postprocess(rets=rets, outputs=outputs)
@@ -29,6 +28,7 @@ def preprocess(input_infos: dict, ss: SparkSession) -> dict:
     return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
 
 
+
 def postprocess(rets, outputs):
     if isinstance(rets,list):
         for idx, df in enumerate(rets):
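Since the commit message points at the hive.metastore.uris template, here is a rough sketch of how the reworked line could render, assuming the package layout implied by task.py's jinja2 imports and a made-up metastore address:

    from jinja2 import Environment, PackageLoader, select_autoescape

    # 'app.core.airflow' / 'templates' is an assumed package layout, not confirmed by the diff
    env = Environment(loader=PackageLoader('app.core.airflow', 'templates'),
                      autoescape=select_autoescape())
    tpl = env.get_template('pyspark_script_template.py.jinja2')
    script = tpl.render(hive_metastore_uris='thrift://hive-metastore:9083')  # hypothetical URI
    # the rendered script now holds the builder call on a single line:
    # SparkSession.builder.config('hive.metastore.uris','thrift://hive-metastore:9083').enableHiveSupport().getOrCreate()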

+ 1 - 1
app/routers/job.py

@@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
-from app.core.airflow.uri import get_job_path, get_airflow_api_info, call_airflow_api
+from app.core.airflow.af_util import get_job_path, get_airflow_api_info, call_airflow_api
 from app.crud import create_airflow_job_submit
 from app.schemas import AirflowTrigger
 from utils import web_try, sxtimeit
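The imported helpers keep the signatures shown in af_util.py above; a minimal call sketch with a made-up job id:

    # 'dag_1' is a hypothetical dag id following the dag_{job_id} naming used in run.py below
    resp = call_airflow_api(method='get', uri='dags/dag_1/dagRuns', args_dict={})
    if resp is not None:  # call_airflow_api only returns a response for method == 'get'
        print(resp.status_code, resp.json())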

+ 33 - 15
app/routers/run.py

@@ -9,7 +9,7 @@ from fastapi_pagination import paginate, Params
 from pydantic import BaseModel
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
-from app.core.airflow.uri import get_airflow_api_info, call_airflow_api
+from app.core.airflow.af_util import get_airflow_api_info, call_airflow_api, datetime2timestamp
 from app.core.k8s.k8s_client import KubernetesTools
 from utils import web_try, sxtimeit
 
@@ -148,7 +148,7 @@ def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db
     for task in ret.json()['task_instances']:
         details['tasks'][task['task_id']] = {
             # "log": logs,
-            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+            "start_time":  datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
             "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
             "status": task['state']
         }
@@ -160,20 +160,38 @@ def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db
 @web_try()
 @sxtimeit
 def get_airflow_dagrun_running_status(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
-    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}', args_dict={})
-    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
+    job_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}', args_dict={})
+    tasks_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
+
+
+    details = defaultdict(dict)
+    for task in tasks_info.json()['task_instances']:
+        details['tasks'][task['task_id']] = {
+            # "log": logs,
+            "start_time": datetime2timestamp(task['start_date']),
+            "end_time": datetime2timestamp(task['end_date']),
+            "status": task['state']
+        }
+        # print(f"{task['task_id']}:{task['duration']}")
 
-    # details = defaultdict(dict)
-
-    # for task in ret.json()['task_instances']:
-    #     details['tasks'][task['task_id']] = {
-    #         # "log": logs,
-    #         "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
-    #         "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
-    #         "status": task['state']
-    #     }
-    #     # print(f"{task['task_id']}:{task['duration']}")
-    return ret.json()
+    # item = schemas.AirflowRunUpdate(**{#"start_time": item.data["start_time"],
+    #                                    #"job_id": int(job_id),
+    #                                    # "run_ts": item.data['run_ts'],
+    #                                    # "af_run_id": item.data['af_run_id'],
+    #                                     "end_time":datetime2timestamp()
+    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
+    #                                                                            "sparks": sparks_dependence}},
+    #                                    "status": 0},
+
+    # item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
+    #                                    "job_id": int(job_id),
+    #                                    # "run_ts": item.data['run_ts'],
+    #                                    # "af_run_id": item.data['af_run_id'],
+    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
+    #                                                                            "sparks": sparks_dependence}},
+    #                                    "status": 0},
+
+    # return ret.json()
 
 
 @router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
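As a side note on the pattern used in the new endpoint body: defaultdict(dict) is what lets the loop assign into details['tasks'][task['task_id']] without creating the inner mapping first. A self-contained sketch with made-up task ids and states:

    from collections import defaultdict

    details = defaultdict(dict)
    for task_id, state in [('spark_node_1', 'success'), ('spark_node_2', 'running')]:
        # first access to details['tasks'] creates the inner dict automatically
        details['tasks'][task_id] = {'status': state}
    print(dict(details))  # {'tasks': {'spark_node_1': {'status': 'success'}, 'spark_node_2': {'status': 'running'}}}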

+ 1 - 1
auo_tests/tasks/add_sparks_task.py

@@ -5,7 +5,7 @@ import requests
 
 from auo_tests.tasks.config import host
 from auo_tests.tasks.minio_handler import FileHandler, URL as MinioURL
-from app.core.airflow.uri import upload2oss
+from app.core.airflow.af_util import upload2oss
 
 dag_script = {
     "sub_nodes": [

+ 1 - 0
production.ini

@@ -16,6 +16,7 @@ uri=aihub-backend-af-yili-test:8080
 host=aihub-backend-af-yili-test
 port=8080
 dag_files_dir=/dags/
+
 [K8S]
 image_pull_key=codingregistrykey