Procházet zdrojové kódy

1. 更新配置格式

luoyulong před 2 roky
rodič
revize
15f10bf65d

+ 1 - 2
app/core/airflow/job.py

@@ -35,8 +35,7 @@ class AirflowJobSubmitter:
         parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                       'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                       'interval': item.cron if item.cron != 'None' else None,
-                      'backend_host': config.get('BACKEND', 'host'),
-                      'backend_port': config.get('BACKEND', 'port'),
+                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                       }
 
         env = Environment(

+ 2 - 2
app/core/airflow/templates/dag_template.py.jinja2

@@ -10,7 +10,7 @@ job_id = {{ job_id }}
 
 def task_finish_alert(context):
     print('############### task begin callback!###################')
-    url = 'http://{{ backend_host }}:{{ backend_port }}/af/af_run/notification'
+    url = 'http://{{ af_backend_uri }}/af/af_run/notification'
     ti = context['ti']
     requests.post(url, json={"data": {
         "job_id": job_id,
@@ -26,7 +26,7 @@ def task_finish_alert(context):
 
 def dag_begin_alert(context):
     print('############### dag begin callback!###################')
-    url = 'http://{{ backend_host }}:{{ backend_port }}/af/af_run'
+    url = 'http://{{ af_backend_uri }}/af/af_run'
     ti = context['ti']
     requests.post(url, json={"data": {
         "job_id": job_id,

+ 3 - 3
app/core/airflow/uri.py

@@ -25,12 +25,12 @@ def upload2oss(content: bytes, uri: str, minio_bucket: str):
 
 
 def get_job_path(job_id):
-    dag_path = f'{config.get("AIRFLOW", "dag_files_dir")}'
+    dag_path = f'{config.get("AF_BACKEND", "dag_files_dir")}'
     return dag_path + f'dag_{job_id}.py'
 
 
 def get_airflow_api_info():
-    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
+    uri_prefix = f'http://{config.get("AIRFLOW", "uri")}/api/v1'
     headers = {
         'content-type': 'application/json',
         'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
@@ -40,7 +40,7 @@ def get_airflow_api_info():
 
 
 def call_airflow_api(method, uri, args_dict):
-    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
+    uri_prefix = f'http://{config.get("AIRFLOW", "uri")}/api/v1'
     headers = {
         'content-type': 'application/json',
         'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',

+ 5 - 2
app/routers/run.py

@@ -6,7 +6,7 @@ from fastapi_pagination import paginate, Params
 from pydantic import BaseModel
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
-from app.core.airflow.uri import get_airflow_api_info
+from app.core.airflow.uri import get_airflow_api_info, call_airflow_api
 from app.core.k8s.k8s_client import KubernetesTools
 from utils import web_try, sxtimeit
 
@@ -139,6 +139,9 @@ def add_notification(item: Item):
 @web_try()
 @sxtimeit
 def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
+    ret = call_airflow_api(method='get',uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances',args_dict={})
+    for task in ret.json()['task_instances']:
+        print(f"{task['task_id']}:{task['duration']}")
     # task_info =
-    pass
+    # pass
     # return paginate(crud.get_airflow_tasks(db), params)

+ 2 - 2
app/utils/send_util.py

@@ -2,8 +2,8 @@ from unittest import result
 import requests
 from configs.settings import config
 
-HOST = config.get('AIRFLOW', 'HOST')
-PORT = config.get('AIRFLOW', 'PORT')
+HOST = config.get('AF_BACKEND', 'host')
+PORT = config.get('AF_BACKEND', 'port')
 
 def send_post(uri,data):
     res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)

+ 7 - 11
development.ini

@@ -4,28 +4,24 @@ pwd = happylay
 db_name = datax_web_dev
 host = 192.168.199.107
 port = 10086
-; [DATABASE]
-; user = aihubtest
-; pwd = q9WBYDynEy@jh#5N
-; db_name = aihubtest_dag_admin_db
-; host = 10.254.12.7
-; port = 3306
+
 [MINIO]
 url = minio-api.sxkj.com
 access_key = admin
 secret_key = sxkjadmin
 k8s_url=minio.default:9000
-[BACKEND]
+
+[AF_BACKEND]
+uri=192.168.199.109:18082
 host=192.168.199.109
 port=18082
+dag_files_dir=/dags/
+
 
 [AIRFLOW]
-host = 192.168.199.109
-port = 18082
+uri=192.168.199.109
 host_in_header=airflow-web.sxkj.com
-ip_address=192.168.199.109
 api_token=YWRtaW46YWRtaW4=
-dag_files_dir=/dags/
 
 [HIVE]
 host = 192.168.199.27

+ 13 - 5
production.ini

@@ -9,13 +9,21 @@ k8s_url = aihub-minio-yili-test:9000
 url = aihub-minio-yili-test:9000
 access_key = minioadmin
 secret_key = minioadmin
+
+
+[AF_BACKEND]
+uri=192.168.199.109:18082
+host=192.168.199.109
+port=18082
+dag_files_dir=/dags/
+
+
 [AIRFLOW]
-host_in_header=airflow-web-test.digitalyili.com
-ip_address = aihub-backend-af-yili-test:8080
+uri=192.168.199.109
+host_in_header=airflow-web.sxkj.com
 api_token=YWRtaW46YWRtaW4=
-dag_files_dir=/dags/
-host = aihub-backend-af-yili-test
-port = 8080
+
+
 [HIVE]
 host = 10.254.20.22
 port = 7001