1. Updated the data transfer task

luoyulong, 2 years ago
Parent commit 7e0960f0a3

+ 11 - 4
app/core/airflow/af_util.py

@@ -1,14 +1,12 @@
 import datetime
 import json
-
 import requests
-
 from app.common.minio import FileHandler
 from configs.settings import config
 
 
 def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
-    return f'{config.get("HIVE","database_name")}.job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
+    return f'{config.get("HIVE", "database_name")}.job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
 
 
 def get_sub_task_script_uri(task_id, sub_node_id):
@@ -57,5 +55,14 @@ def call_airflow_api(method, uri, args_dict):
         print(f"enter get, uri is {uri_prefix + '/' + uri}")
         return requests.get(uri_prefix + '/' + uri, headers=headers, **args_dict)
 
+
 def datetime2timestamp(datetime_str):
-    return datetime.datetime.strptime(datetime_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
+    if datetime_str is None:
+        return None
+    ts = None
+    try:
+        ts = datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S%z').timestamp()
+    except Exception:
+        ts = datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
+    finally:
+        return ts
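
Note on the fallback above: the Airflow REST API returns ISO timestamps both with and without fractional seconds, and null for runs that have not finished yet. A minimal usage sketch (the first sample value is the one exercised in debug.ipynb below; the second is an assumed variant with microseconds):

    from app.core.airflow.af_util import datetime2timestamp

    print(datetime2timestamp('2022-09-02T00:00:00+00:00'))         # parsed by '%Y-%m-%dT%H:%M:%S%z' -> 1662076800.0
    print(datetime2timestamp('2022-09-02T00:00:00.123456+00:00'))  # falls back to the '.%f' format
    print(datetime2timestamp(None))                                 # None, e.g. end_date of a still-running dag run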

+ 91 - 6
app/core/airflow/job.py

@@ -1,5 +1,7 @@
 import os
 import stat
+import time
+
 from app.core.airflow.task import *
 from app.core.airflow.af_util import get_job_path
 from app.schemas import AirflowJob
@@ -40,9 +42,28 @@ class AirflowJobSubmitter:
                       'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                       'interval': item.cron if item.cron != 'None' else None,
                       'af_backend_uri': config.get('AF_BACKEND', 'uri'),
-                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None)
+                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
+                      'enable_notify': True
                       }
 
+        # env = Environment(
+        #     loader=PackageLoader('app.core.airflow'),
+        #     autoescape=select_autoescape()
+        # )
+        # template = env.get_template("dag_template.py.jinja2")
+        # dag_content = template.render(parameters)
+        # print(f'finish build:{dag_content}')
+        #
+        # output_path = get_job_path(job_id=item.id)
+        # with open(output_path, "w") as fh:
+        #     fh.write(dag_content)
+        #
+        # os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
+        # print(f'write dag to {output_path}')
+        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=item.id))
+
+    @staticmethod
+    def generate_dag_on_airflow(parameters, save_path):
         env = Environment(
             loader=PackageLoader('app.core.airflow'),
             autoescape=select_autoescape()
@@ -50,10 +71,74 @@ class AirflowJobSubmitter:
         template = env.get_template("dag_template.py.jinja2")
         dag_content = template.render(parameters)
         print(f'finish build:{dag_content}')
-
-        output_path = get_job_path(job_id=item.id)
-        with open(output_path, "w") as fh:
+        with open(save_path, "w") as fh:
             fh.write(dag_content)
+        os.chmod(save_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
+        print(f'write dag to {save_path}')
+
+    @staticmethod
+    def auto_submit_data_transfer():
+        job_id = 0
+        user_id = 0
+        spark_task_demo = SparksTaskCompiler(item=None)
+        spark_nodes = [
+            {
+                "sub_nodes": [{
+                    "name": 'read_and_save',
+                    "id": 0,
+                    "image": spark_task_demo.default_image,
+                    "cmds": ['/bin/bash', '-c',  spark_task_demo.cmd_str(name='spark_data_transfer')],
+                    "env": {"SCRIPT": spark_task_demo.render_spark_script(
+                        parameters={"hive_metastore_uris": config.get('HIVE_METASTORE', 'uris')},
+                        template_file="data_transfer_dag_template.py.jinja2")
+                    },
+                }],
+                "edges": [],
+                "name": 'data_save',
+                "desc": 'task for data saving',
+                "id": 0,
+            }
+
+        ]
+        print(spark_nodes[0]['sub_nodes'][0]['env']['SCRIPT'])
+        parameters = {'nodes': [], 'spark_nodes': spark_nodes, 'edges': [], 'dag_id': f'dag_{job_id}',
+                      'user_name': user_id, 'job_id': job_id, 'trigger_status': False,
+                      'interval': None,
+                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
+                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
+                      'enable_notify':False
+                      }
 
-        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
-        print(f'write dag to {output_path}')
+        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=job_id))
+        print('data transfer job created successfully!')
+    # @staticmethod
+    # def auto_submit_data_transfer2():
+    #     # name: str
+    #     # file_urls: Optional[List[str]] = []
+    #     # script: str
+    #     # cmd: Optional[str] = ""
+    #     # cmd_parameters: str
+    #     # envs: Optional[Dict[str, str]] = {}
+    #     # run_image: str
+    #     # task_type: str
+    #
+    #     df_task = AirflowTask(name='data_save', task_type='sparks', file_urls=[], script='', cmd='', env={})
+    #     # id: int
+    #     # job_type: int
+    #     # create_time: int
+    #     # update_time: int
+    #     # user_id: int
+    #     # job_mode: int
+    #     # tasks: List[AirflowTask]
+    #     # name: str
+    #     # dependence: List = []
+    #     # cron: str
+    #     # desc: str
+    #     # route_strategy: str
+    #     # block_strategy: str
+    #     # executor_timeout: int
+    #     # executor_fail_retry_count: int
+    #     # trigger_status: int
+    #
+    #     job_item = AirflowJob(id=0, job_type=1, tasks=[df_task], create_time=int(time.time()), user_id=0, job_mode=1,
+    #                           name='data_transfer', dependence=[], cron="None", trigger_status=0)
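
The DAG rendering now lives in the reusable generate_dag_on_airflow helper, and auto_submit_data_transfer uses it to publish a built-in single-node Spark DAG (dag_0) whose SCRIPT env var carries the rendered PySpark transfer script. A condensed sketch of the two entry points (the commented call mirrors the parameters built in submit_dag above):

    from app.core.airflow.job import AirflowJobSubmitter
    from app.core.airflow.af_util import get_job_path

    # Publish the built-in data-transfer DAG (job_id 0); called once at server start-up (see server.py below).
    AirflowJobSubmitter.auto_submit_data_transfer()

    # The same helper renders any parameter set produced by submit_dag:
    # AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=item.id))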

+ 3 - 1
app/core/airflow/task.py

@@ -1,4 +1,6 @@
 import json
+from typing import Optional
+
 from app.core.airflow.af_util import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
@@ -94,7 +96,7 @@ class DataXTaskCompiler(TaskCompiler):
 
 
 class SparksTaskCompiler(TaskCompiler):
-    def __init__(self, item: AirflowTask):
+    def __init__(self, item: Optional[AirflowTask]):
         super(SparksTaskCompiler, self).__init__(item)
         self.default_image = config.get('TASK_IMAGES', 'sparks')
         parameters = {"master": "yarn",
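
item becomes Optional because auto_submit_data_transfer instantiates the compiler without a task, only to borrow its default image and spark-submit command wrapper. A small sketch of that pattern, as used in job.py above:

    from app.core.airflow.task import SparksTaskCompiler

    demo = SparksTaskCompiler(item=None)                # no AirflowTask attached
    image = demo.default_image                          # config value TASK_IMAGES / sparks
    cmd = demo.cmd_str(name='spark_data_transfer')      # the '/bin/bash -c ...' payload for the transfer node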

+ 15 - 0
app/core/airflow/templates/dag_template.py.jinja2

@@ -8,6 +8,8 @@ import requests
 
 job_id = {{ job_id }}
 
+{% if enable_notify ==true %}
+
 def task_finish_alert(context):
     print('############### task begin callback!###################')
     url = 'http://{{ af_backend_uri }}/af/af_run/notification'
@@ -39,6 +41,19 @@ def dag_begin_alert(context):
         "status": ti.current_state()
     }})
 
+{% else %}
+
+def task_finish_alert(context):
+    pass
+
+def dag_begin_alert(context):
+    pass
+
+{% endif %}
+
+
+
+
 print('enter dag run!')
 namespace = conf.get("kubernetes", "NAMESPACE")
 

+ 1 - 1
app/core/airflow/templates/data_transfer/data_transfer.py

@@ -29,7 +29,7 @@ with DAG(
         get_logs=True,
         log_events_on_failure=True,
         cmds=['/bin/bash', '-c',
-              'kinit -kt /workspace/conf/user.keytab ailab & cd /workspace && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /workspace/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python --conf spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true run.py'],
+              'kinit -kt /workspace/conf/user.keytab ailab & cd /workspace && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /workspace/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python run.py'],
         env_vars={{env}}
     )
 

+ 5 - 6
app/core/airflow/templates/data_transfer/data_transfer_basic.py

@@ -4,13 +4,12 @@ from pyspark.sql import SparkSession, DataFrame
 
 def main_func(input0, spark, sc):
     output_tb = "{{ dag_run.conf.output}}"
-    print(f"read table {output_tb}")
+    print(f"write t0 table {output_tb}")
     input0.write.mode("overwrite").saveAsTable(output_tb)
 
 
 def run(inputs: dict):
-    spark = SparkSession.builder.config('hive.metastore.uris',
-                                        'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    spark = SparkSession.builder.config('hive.metastore.uris',{{metastore_uris}}).enableHiveSupport().getOrCreate()
     param_dict = preprocess(input_infos=inputs, ss=spark)
     main_func(**param_dict, spark=spark, sc=spark.sparkContext)
 
@@ -19,9 +18,9 @@ def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
     print(f"read table {tb_name}")
     return ss.sql(f'select * from {tb_name}')
 
-
-def write_table(df: DataFrame, tb_name: str):
-    df.write.mode("overwrite").saveAsTable(tb_name)
+#
+# def write_table(df: DataFrame, tb_name: str):
+#     df.write.mode("overwrite").saveAsTable(tb_name)
 
 
 def preprocess(input_infos: dict, ss: SparkSession) -> dict:
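
The hard-coded thrift address gives way to a template variable, presumably filled from configuration the same way the new jinja2 template is in auto_submit_data_transfer. A sketch of that lookup (section and option names as used in this commit; the resulting URI is deployment-specific):

    from configs.settings import config

    hive_metastore_uris = config.get('HIVE_METASTORE', 'uris')   # e.g. 'thrift://<metastore-host>:9083'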

+ 27 - 0
app/core/airflow/templates/data_transfer_dag_template.py.jinja2

@@ -0,0 +1,27 @@
+#!coding=utf8
+from pyspark.sql import SparkSession, DataFrame
+
+
+def main_func(input0, spark, sc):
+    output_tb ={{   "'{{ dag_run.conf.output}}'" }}
+    print(f"write t0 table {output_tb}")
+    input0.write.mode("overwrite").saveAsTable(output_tb)
+
+
+def run(inputs: dict):
+    spark = SparkSession.builder.config('hive.metastore.uris','{{hive_metastore_uris}}').enableHiveSupport().getOrCreate()
+    param_dict = preprocess(input_infos=inputs, ss=spark)
+    main_func(**param_dict, spark=spark, sc=spark.sparkContext)
+
+
+def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
+    print(f"read table {tb_name}")
+    return ss.sql(f'select * from {tb_name}')
+
+
+def preprocess(input_infos: dict, ss: SparkSession) -> dict:
+    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
+
+
+if __name__ == '__main__':
+    run(inputs={'input0': {{ "'{{ dag_run.conf.input}}'" }}  })
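
Note the nested templating in the new script: the outer render (when the DAG file is generated via render_spark_script) emits the inner string literally, so the generated PySpark code still contains the Airflow macro, which Airflow resolves per run from dag_run.conf when it templates the operator's env. A two-stage sketch, assuming a run triggered with conf={'input': 'db.src_tb', 'output': 'db.dst_tb'} (placeholder table names):

    # Stage 1 - DAG generation (render_spark_script / Jinja):
    #   {{ "'{{ dag_run.conf.output}}'" }}   ->   '{{ dag_run.conf.output}}'
    # Stage 2 - Airflow template rendering at run time:
    #   '{{ dag_run.conf.output}}'           ->   'db.dst_tb'
    output_tb = 'db.dst_tb'   # what main_func finally receives in this example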

+ 3 - 2
app/routers/job.py

@@ -94,9 +94,10 @@ def trigger_af_job_adv_run(item_id: int, parameters: AirflowTrigger):
 @web_try()
 @sxtimeit
 def trigger_af_data_transfer_job_run(source_tb: str, target_tb: str):
-    rets = call_airflow_api(method='post', uri=f'dags/dag_data_transfer/dagRuns',
+    rets = call_airflow_api(method='post', uri=f'dags/dag_0/dagRuns',
                             args_dict={"json": {"conf": {"input": source_tb, "output": target_tb}}}).json()
-    return {"af_run_id": rets.get('dag_run_id', None)}
+    return {"af_run_id": rets.get('dag_run_id', None),
+            "dag_id": "dag_0"}
 
 
 @router_af_job.delete("/{item_id}")
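
The route now triggers the auto-submitted dag_0 instead of a separately named transfer DAG, and echoes the dag_id back to the caller. A sketch of the underlying Airflow call it wraps (table names are placeholders):

    from app.core.airflow.af_util import call_airflow_api

    rets = call_airflow_api(method='post', uri='dags/dag_0/dagRuns',
                            args_dict={"json": {"conf": {"input": "db.source_tb", "output": "db.target_tb"}}}).json()
    print(rets.get('dag_run_id'))   # returned to the caller as af_run_id, together with dag_id='dag_0'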

+ 36 - 23
app/routers/run.py

@@ -10,7 +10,6 @@ from pydantic import BaseModel
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
 from app.core.airflow.af_util import get_airflow_api_info, call_airflow_api, datetime2timestamp
-from app.core.k8s.k8s_client import KubernetesTools
 from utils import web_try, sxtimeit
 
 
@@ -133,10 +132,10 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
     if run is not None:
         update_run = schemas.AirflowRunUpdate(
             **{"details": run.details, "status": run.status, "af_run_id": item.data['af_run_id']})
-        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
-                                                             "start_time": item.data["start_time"],
-                                                             "end_time": item.data["end_time"],
-                                                             "status": item.data['status']}
+        update_run.details['tasks'][item.data['task_id']].update({"log": logs,
+                                                                  "start_time": item.data["start_time"],
+                                                                  "end_time": item.data["end_time"],
+                                                                  "status": item.data['status']})
         crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
 
 
@@ -168,22 +167,16 @@ def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db
 @router_af_run.get("/running_status/{job_id}/{af_run_id}")
 @web_try()
 @sxtimeit
-def get_airflow_dagrun_running_status(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
-    job_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}', args_dict={})
-    tasks_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances',
-                                  args_dict={})
-
-    details = defaultdict(dict)
-    for task in tasks_info.json()['task_instances']:
-        details['tasks'][task['task_id']] = {
-            # "log": logs,
-            "start_time": datetime2timestamp(task['start_date']),
-            "end_time": datetime2timestamp(task['end_date']),
-            "status": task['state']
-        }
-        # print(f"{task['task_id']}:{task['duration']}")
-
-
+def get_airflow_dagrun_running_status(job_id: int, af_run_id: str):
+    job_uri = f'dags/dag_{job_id}/dagRuns/{af_run_id}'
+    job_ret = call_airflow_api(method='get', uri=job_uri, args_dict={})
+    if job_ret.status_code != 200:
+        raise Exception(f'cannot find information for this job run, please check your input: job uri is {job_uri}')
+    return {
+        "start_time": datetime2timestamp(job_ret.json()['start_date']),
+        "end_time": datetime2timestamp(job_ret.json()['end_date']),
+        "status": job_ret.json()['state']
+    }
 
 
 @router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@@ -206,11 +199,14 @@ def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: S
             state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
             log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
             if state_ret.status_code != 200 or log_ret.status_code != 200:
-                raise Exception(f'cant found the information of this task,please check your input.log uri is {log_uri} ')
+                return None
             update_run = schemas.AirflowRunUpdate(
                 **{"details": run.details, "status": run.status, "af_run_id": af_run_id})
+            print(f'stat is {state_ret.json()}')
             task_info = {
-                "log": log_ret.text, "status": state_ret.json()['state'],
+                "log": log_ret.text,
+                "status": state_ret.json()['state'],
+                "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
                 "start_time": datetime2timestamp(state_ret.json()['start_date']),
                 "end_time": datetime2timestamp(state_ret.json()['end_date']),
             }
@@ -219,3 +215,20 @@ def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: S
             return task_info
         else:
             return run.details['tasks'][task_id]
+
+
+@router_af_run.get("/data_transfer_log/{af_run_id}")
+@web_try()
+@sxtimeit
+def get_airflow_dagrun_task_log(af_run_id: str):
+    state_uri = f"dags/dag_0/dagRuns/{af_run_id}/taskInstances/0"
+    log_uri = f"{state_uri}/logs/1"
+    state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
+    log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
+    return {
+        "log": log_ret.text,
+        "status": state_ret.json()['state'],
+        "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
+        "start_time": datetime2timestamp(state_ret.json()['start_date']),
+        "end_time": datetime2timestamp(state_ret.json()['end_date']),
+    }
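
The new endpoint reads task instance 0 (log try 1) of dag_0 straight from Airflow, without touching the local run records. The response it produces looks roughly like this (values are illustrative; end_time is None while the run is still going, because Airflow returns a null end_date):

    example_response = {
        "log": "...",                    # raw Airflow task log text
        "status": "success",             # Airflow task state
        "execution_time": 1662076800.0,  # timestamps converted by datetime2timestamp
        "start_time": 1662076801.0,
        "end_time": None,
    }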

+ 0 - 1
app/schemas/af_job.py

@@ -29,7 +29,6 @@ class AirflowJobUpdate(AirflowJobBase):
 
 class AirflowJob(AirflowJobBase):
     id: int
-    job_type: int
     create_time: int
     update_time: int
     user_id: int

+ 34 - 0
debug.ipynb

@@ -139,6 +139,40 @@
     "collapsed": false
    }
   },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [],
+   "source": [
+    "datetime_str = '2022-09-02T00:00:00+00:00'"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "1662076800.0\n"
+     ]
+    }
+   ],
+   "source": [
+    "import datetime\n",
+    "try:\n",
+    "    print(datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S%z').timestamp() if datetime_str else datetime_str)\n",
+    "except Exception:\n",
+    "    print(datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S.%f%z').timestamp() if datetime_str else datetime_str)\n"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
   {
    "cell_type": "code",
    "execution_count": null,

+ 2 - 0
server.py

@@ -1,5 +1,6 @@
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
+from app.core.airflow.job import AirflowJobSubmitter
 from app.models.database import engine, Base
 import app.routers.job_jdbc_datasource as router_jjds
 import app.routers.constants as router_constants
@@ -50,6 +51,7 @@ app.include_router(router_jm_job_info.router)
 app.include_router(router_jm_job_log.router)
 app.include_router(router_code_check.router)
 
+AirflowJobSubmitter.auto_submit_data_transfer()
 print('server init finish:)!!!')