Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

liweiquan 2 years ago
parent
commit
d4d3788c95
40 changed files with 445 additions and 101 deletions
  1. +17 -1    Dockerfile
  2. +1 -1     Makefile
  3. +19 -17   app/core/airflow/task.py
  4. +1 -1     app/core/airflow/templates/dag_template.py.jinja2
  5. +0 -0     app/core/airflow/templates/data_transfer/__init__.py
  6. +35 -0    app/core/airflow/templates/data_transfer/data_transfer.py
  7. +32 -0    app/core/airflow/templates/data_transfer/data_transfer_basic.py
  8. +6 -1     app/core/airflow/templates/pyspark_script_template.py.jinja2
  9. +4 -2     app/core/airflow/templates/sql_script_template.py.jinja2
  10. +18 -5   app/core/airflow/uri.py
  11. +1 -1    app/routers/code_check.py
  12. +1 -1    app/routers/constants.py
  13. +1 -1    app/routers/dag.py
  14. +1 -1    app/routers/dag_file.py
  15. +1 -1    app/routers/data_management.py
  16. +1 -1    app/routers/datax_json.py
  17. +1 -1    app/routers/files.py
  18. +1 -1    app/routers/intermediate.py
  19. +1 -1    app/routers/jm_homework.py
  20. +1 -1    app/routers/jm_job_info.py
  21. +1 -1    app/routers/jm_job_log.py
  22. +33 -36  app/routers/job.py
  23. +1 -1    app/routers/job_info.py
  24. +1 -1    app/routers/job_jdbc_datasource.py
  25. +1 -1    app/routers/job_log.py
  26. +16 -4   app/routers/run.py
  27. +1 -1    app/routers/task.py
  28. +3 -3    app/routers/task_run.py
  29. +6 -1    app/schemas/af_run.py
  30. +32 -0   auo_tests/spark_submit/data_transfer.py
  31. +4 -3    auo_tests/spark_submit/docker-compose.yml
  32. +15 -0   auo_tests/spark_submit/sparksql_simple.py
  33. +1 -0    configs/settings.py
  34. +19 -1   development.ini
  35. +2 -2    docker-compose.yml
  36. +27 -0   docker/dev/krb5.conf
  37. +0 -5    docker/java/Dockerfile.java
  38. +27 -0   docker/test/krb5.conf
  39. +17 -3   production.ini
  40. +95 -0   render_script_to_dag_manual.ipynb

+ 17 - 1
Dockerfile

@@ -74,9 +74,25 @@ stdout_logfile_maxbytes=50MB\n\
 environment=PYTHONUNBUFFERED=1\n\
 " > /etc/supervisor/conf.d/be.conf
 
-ADD . ${WORKDIR}
 EXPOSE 8080
 
+FROM builder2 as image-dev
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+     krb5-user
+
+
+ADD . ${WORKDIR}
+RUN mv docker/dev/krb5.conf /etc/
+
+
+
+FROM builder2 as image-test
+RUN apt-get update && apt-get install -y --no-install-recommends krb5-user
+
+
+ADD . ${WORKDIR}
+RUN mv docker/test/krb5.conf /etc/
 
 
 # FROM builder2 as builder3

+ 1 - 1
Makefile

@@ -11,7 +11,7 @@ HOST=192.168.199.107
 
 all: image publish
 image:
-	@docker build -t registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION) .
+	@docker build --target image-dev -t registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION) .
 
 publish:
 	@docker push registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION)

+ 19 - 17
app/core/airflow/task.py

@@ -3,6 +3,7 @@ from app.core.airflow.uri import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
 from app.common.minio import FileHandler
+from configs.settings import config
 
 
 class TaskCompiler:
@@ -42,24 +43,24 @@ class TaskCompiler:
 class JavaTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(JavaTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/java:1.0'
+        self.default_image = config.get('TASK_IMAGES', 'java')  # 'SXKJ:32775/java:1.0'
         self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
         self.task.cmd = self.task.cmd or self.default_cmd
         tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
-        self.task.cmd = f'curl {"http://minio.default:9000"}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
+        self.task.cmd = f'curl {config.get("MINIO", "k8s_url")}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
 
 
 class PythonTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(PythonTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/pod_python:1.1'
+        self.default_image = config.get('TASK_IMAGES', 'python')  # 'SXKJ:32775/pod_python:1.1'
         self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
 
 
 class DataXTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(DataXTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/pod_datax:0.9'
+        self.default_image = config.get('TASK_IMAGES', 'datax')  # 'SXKJ:32775/pod_datax:0.9'
         self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && " \
                            f"python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/b" \
                            f"in/python datax.py {self.task.cmd_parameters} config.json "
@@ -94,24 +95,25 @@ class DataXTaskCompiler(TaskCompiler):
 class SparksTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(SparksTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/jupyter:0.96'
+        self.default_image = config.get('TASK_IMAGES', 'sparks')
         parameters = {"master": "yarn",
                       "deploy-mode": "cluster",
-                      "driver-memory": "2g",
+                      "driver-memory": "1g",
                       "driver-cores ": 1,
-                      "executor-memory": "2g",
-                      "executor-cores": 4,
+                      "executor-memory": "1g",
+                      "executor-cores": 1,
                       "num-executors": 1,
                       "archives": "/home/sxkj/bigdata/py37.zip#python3env"
                       }
-        spark_config = {'spark.default.parallelism': 2,
-                        "spark.executor.memoryOverhead": "4g",
-                        "spark.driver.memoryOverhead": "2g",
-                        "spark.yarn.maxAppAttempts": 3,
+        spark_config = {'spark.default.parallelism': 1,
+                        "spark.executor.memoryOverhead": "1g",
+                        "spark.driver.memoryOverhead": "1g",
+                        "spark.yarn.maxAppAttempts": 1,
                         "spark.yarn.submit.waitAppCompletion": "true",
                         "spark.pyspark.driver.python": "python3env/py37/bin/python",
                         "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "python3env/py37/bin/python",
-                        "spark.pyspark.python": "python3env/py37/bin/python"
+                        "spark.pyspark.python": "python3env/py37/bin/python",
+                        "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation": "true"
                         }
         param_str = ' '.join([f'--{k} {v}' for k, v in parameters.items()])
         param_str += ''.join([f' --conf {k}={v}' for k, v in spark_config.items()])
@@ -156,14 +158,14 @@ class SparksTaskCompiler(TaskCompiler):
                 skip_nodes.append(info["id"])
                 continue
             if info['op'] == 'sql':
-                inputs = {}
                 template_file = 'sql_script_template.py.jinja2'
             elif info['op'] == 'pyspark':
-                inputs = {k: spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=v[0],
-                                                  out_pin=v[1], is_tmp=task_mode) for k, v in info['inputs'].items()}
                 template_file = 'pyspark_script_template.py.jinja2'
             else:
                 continue
+            inputs = {k: spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=v[0],
+                                              out_pin=v[1], is_tmp=task_mode) for k, v in
+                      info.get('inputs', {}).items()}
             outputs = [spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=info['id'],
                                             out_pin=0, is_tmp=task_mode)]
             sub_node = {
@@ -175,7 +177,7 @@ class SparksTaskCompiler(TaskCompiler):
                         template_file=template_file),
                 },
                 'cmds': ['/bin/bash', '-c', self.cmd_str(name=f'spark_{self.task.id}_{info["id"]}')],
-                'image': "SXKJ:32775/jupyter:0.96",
+                'image': config.get('TASK_IMAGES', 'sparks')
             }
             sub_nodes.append(sub_node)
 

+ 1 - 1
app/core/airflow/templates/dag_template.py.jinja2

@@ -47,7 +47,7 @@ namespace = conf.get("kubernetes", "NAMESPACE")
 name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
-with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if {{ interval }} is None else '{{ interval }}',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
+with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}',dag_id="{{ dag_id }}") as dag:
     op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
 
     {% for spark_node in spark_nodes %}
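
The schedule_interval expression now compares the rendered interval string against 'None', which keeps the generated DAG file valid Python whether or not an interval is set. A small sketch (plain jinja2, outside Airflow) of how the rewritten expression renders in both cases:

    # Sketch: how the new schedule_interval expression renders for a null and a cron interval.
    from jinja2 import Template

    expr = Template("None if '{{ interval }}' == 'None' else '{{ interval }}'")
    print(expr.render(interval=None))         # None if 'None' == 'None' else 'None'
    print(expr.render(interval='0 2 * * *'))  # None if '0 2 * * *' == 'None' else '0 2 * * *'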

+ 0 - 0
app/core/airflow/templates/data_transfer/__init__.py


+ 35 - 0
app/core/airflow/templates/data_transfer/data_transfer.py

@@ -0,0 +1,35 @@
+from airflow import DAG
+from datetime import datetime
+from airflow.operators.bash import BashOperator
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+from airflow.configuration import conf
+
+namespace = conf.get("kubernetes", "NAMESPACE")
+
+# set the name that will be printed
+name = "luoyulong"
+
+# instantiate the DAG
+with DAG(
+        start_date=datetime(2022, 6, 1),
+        catchup=False,
+        schedule_interval=None,
+        dag_id="dag_data_transfer"
+) as dag:
+    op_transfer_data = KubernetesPodOperator(
+        task_id="task_000",
+        image="SXKJ:32775/jupyter:0.96",
+        in_cluster=True,
+        namespace=namespace,
+        name="transfer_data",
+        random_name_suffix=True,
+        labels={'app': 'backend', 'env': 'dev'},
+        reattach_on_restart=True,
+        is_delete_operator_pod=False,
+        get_logs=True,
+        log_events_on_failure=True,
+        cmds=['/bin/bash', '-c',
+              'cd /home/sxkj/bigdata && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /home/sxkj/bigdata/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python --conf spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true run.py'],
+        env_vars={{env}}
+    )
+
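
This DAG reads its source and target tables from the dag-run conf (see data_transfer_basic.py below and the /000/data_transfer_run route in app/routers/job.py). A sketch of triggering it directly against the Airflow REST API; the host and token are the dev-config values and the table names are the ones used in auo_tests, not a prescription:

    # Sketch: start dag_data_transfer with an input/output table pair (example host and credentials).
    import requests

    resp = requests.post(
        'http://192.168.199.109/api/v1/dags/dag_data_transfer/dagRuns',
        headers={
            'content-type': 'application/json',
            'Authorization': 'basic YWRtaW46YWRtaW4=',
            'Host': 'airflow-web.sxkj.com',
        },
        json={'conf': {'input': 'my_test_p', 'output': 'your_test_x'}},
    )
    print(resp.json().get('dag_run_id'))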

+ 32 - 0
app/core/airflow/templates/data_transfer/data_transfer_basic.py

@@ -0,0 +1,32 @@
+#!coding=utf8
+from pyspark.sql import SparkSession, DataFrame
+
+
+def main_func(input0, spark, sc):
+    output_tb = "{{ dag_run.conf.output}}"
+    print(f"read table {output_tb}")
+    input0.write.mode("overwrite").saveAsTable(output_tb)
+
+
+def run(inputs: dict):
+    spark = SparkSession.builder.config('hive.metastore.uris',
+                                        'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    param_dict = preprocess(input_infos=inputs, ss=spark)
+    main_func(**param_dict, spark=spark, sc=spark.sparkContext)
+
+
+def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
+    print(f"read table {tb_name}")
+    return ss.sql(f'select * from {tb_name}')
+
+
+def write_table(df: DataFrame, tb_name: str):
+    df.write.mode("overwrite").saveAsTable(tb_name)
+
+
+def preprocess(input_infos: dict, ss: SparkSession) -> dict:
+    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
+
+
+if __name__ == '__main__':
+    run(inputs={'input0': '{{ dag_run.conf.input}}'})

+ 6 - 1
app/core/airflow/templates/pyspark_script_template.py.jinja2

@@ -30,7 +30,12 @@ def preprocess(input_infos: dict, ss: SparkSession) -> dict:
 
 
 def postprocess(rets, outputs):
-    [write_table(df=df, tb_name=outputs[idx]) for idx, df in enumerate(rets)]
+    if isinstance(rets,list):
+        for idx, df in enumerate(rets):
+            if idx==0:
+                write_table(df=df, tb_name=outputs[idx])
+            else:
+                write_table(df=df, tb_name=outputs[0].replace("_output0","_output"+str(idx)))
 
 
 if __name__ == '__main__':
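
postprocess() now writes every returned DataFrame: the first goes to outputs[0] and the rest reuse that name with the output index bumped. A dry-run sketch of the resulting table names (strings stand in for the real DataFrames):

    # Sketch: table names produced when a pyspark node returns three DataFrames.
    outputs = ['job1_task2_subnode3_output0']
    rets = ['df_a', 'df_b', 'df_c']   # placeholders for the returned DataFrames

    for idx, _df in enumerate(rets):
        tb = outputs[idx] if idx == 0 else outputs[0].replace('_output0', '_output' + str(idx))
        print(tb)
    # job1_task2_subnode3_output0
    # job1_task2_subnode3_output1
    # job1_task2_subnode3_output2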

+ 4 - 2
app/core/airflow/templates/sql_script_template.py.jinja2

@@ -3,8 +3,10 @@ import sys
 from pyspark.sql import SparkSession
 import os
 
-def run(outputs: list):
+def run(inputs: dict, outputs: list):
     script = "{{ script }}"
+    for k, tb_name in inputs.items():
+        script=script.replace(k,tb_name)
     ss = SparkSession.builder.config('hive.metastore.uris',
                                      'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
     ret_df = ss.sql(script)
@@ -12,4 +14,4 @@ def run(outputs: list):
 
 
 if __name__ == '__main__':
-    run(outputs={{ outputs }})
+    run(inputs={{ inputs }}, outputs={{ outputs }})
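
run() now rewrites the SQL before executing it, replacing each input key with the concrete result table computed by the compiler. A sketch of that substitution; the placeholder keys here ('input0', 'input1') are illustrative, the real keys come from the node's inputs mapping:

    # Sketch: substitute input placeholders with compiler-generated table names (keys are examples).
    script = 'select a.id, b.val from input0 a join input1 b on a.id = b.id'
    inputs = {'input0': 'job1_task2_subnode3_output0', 'input1': 'job1_task2_subnode4_output0'}

    for k, tb_name in inputs.items():
        script = script.replace(k, tb_name)
    print(script)
    # select a.id, b.val from job1_task2_subnode3_output0 a join job1_task2_subnode4_output0 b on a.id = b.id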

+ 18 - 5
app/core/airflow/uri.py

@@ -3,9 +3,10 @@ import json
 import requests
 
 from app.common.minio import FileHandler
+from configs.settings import config
 
 
-def spark_result_tb_name(job_id,task_id, spark_node_id, out_pin, is_tmp=False):
+def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
     return f'job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
 
 
@@ -24,15 +25,27 @@ def upload2oss(content: bytes, uri: str, minio_bucket: str):
 
 
 def get_job_path(job_id):
-    dag_path = '/dags/'
+    dag_path = f'{config.get("AIRFLOW", "dag_files_dir")}'
     return dag_path + f'dag_{job_id}.py'
 
 
 def get_airflow_api_info():
-    uri_prefix = f'http://192.168.199.109/api/v1'
+    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
     headers = {
         'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
+        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
+        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
     }
     return uri_prefix, headers
+
+
+def call_airflow_api(method, uri, args_dict):
+    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
+        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
+    }
+    if method == 'post':
+        print('enter post')
+        return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
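
call_airflow_api() builds the URI prefix and auth headers from the [AIRFLOW] config section and expands args_dict into keyword arguments for requests; only the 'post' branch exists in this commit, so other methods fall through and return None. A usage sketch mirroring the trigger call in app/routers/job.py:

    # Sketch: trigger a dag run through the helper (dag id 42 is an example value).
    import json
    from app.core.airflow.uri import call_airflow_api

    resp = call_airflow_api(method='post',
                            uri='dags/dag_42/dagRuns',
                            args_dict={'data': json.dumps({})})
    print(resp.status_code, resp.json())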

+ 1 - 1
app/routers/code_check.py

@@ -12,7 +12,7 @@ import sqlfluff
 
 
 router = APIRouter(
-    prefix="/jpt/code_check",
+    prefix="/af/code_check",
     tags=["code_check-代码校验"],
 )
 

+ 1 - 1
app/routers/constants.py

@@ -9,7 +9,7 @@ import app.crud as crud
 from app import get_db
 
 router = APIRouter(
-    prefix="/jpt/constants",
+    prefix="/af/constants",
     tags=["constants-常量管理"],
 )
 

+ 1 - 1
app/routers/dag.py

@@ -10,7 +10,7 @@ from app.common.hive import hiveDs
 from app import get_db
 
 router = APIRouter(
-    prefix="/jpt/dag",
+    prefix="/af/dag",
     tags=["dag-dag管理"],
 )
 

+ 1 - 1
app/routers/dag_file.py

@@ -8,7 +8,7 @@ from utils.sx_web import web_try
 from fastapi.responses import StreamingResponse
 
 router = APIRouter(
-    prefix="/jpt/dag_file",
+    prefix="/af/dag_file",
     tags=["dag-dag管理"],
 )
 

+ 1 - 1
app/routers/data_management.py

@@ -18,7 +18,7 @@ from app.common.hive import hiveDs
 from app import get_db
 
 router = APIRouter(
-    prefix="/jpt/datamanagement",
+    prefix="/af/datamanagement",
     tags=["datamanagement-数据管理"],
 )
 

+ 1 - 1
app/routers/datax_json.py

@@ -13,7 +13,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/datax",
+    prefix="/af/datax",
     tags=["datax构建JSON"],
 )
 

+ 1 - 1
app/routers/files.py

@@ -15,7 +15,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/files",
+    prefix="/af/files",
     tags=["files-文件管理"],
 )
 

+ 1 - 1
app/routers/intermediate.py

@@ -16,7 +16,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/intermediate",
+    prefix="/af/intermediate",
     tags=["intermediate-中间结果管理"],
 )
 

+ 1 - 1
app/routers/jm_homework.py

@@ -18,7 +18,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/jm_homework",
+    prefix="/af/jm_homework",
     tags=["jm_homework-作业管理"],
 )
 

+ 1 - 1
app/routers/jm_job_info.py

@@ -20,7 +20,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/jm_job_info",
+    prefix="/af/jm_job_info",
     tags=["jm_job_info-定时任务管理"],
 )
 

+ 1 - 1
app/routers/jm_job_log.py

@@ -17,7 +17,7 @@ from app import get_db, get_page
 
 
 router = APIRouter(
-    prefix="/jpt/jm_job_log",
+    prefix="/af/jm_job_log",
     tags=["jm_job_log-定时任务日志管理"],
 )
 

+ 33 - 36
app/routers/job.py

@@ -6,12 +6,13 @@ from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
-from app.core.airflow.uri import get_job_path, get_airflow_api_info
+from app.core.airflow.uri import get_job_path, get_airflow_api_info, call_airflow_api
 from app.crud import create_airflow_job_submit
+from app.schemas import AirflowTrigger
 from utils import web_try, sxtimeit
 
 router_af_job = APIRouter(
-    prefix="/jpt/af_job",
+    prefix="/af/af_job",
     tags=["airflow_job-任务管理"],
 )
 
@@ -34,8 +35,10 @@ def get_af_jobs(item_id: int):
         last_parsed_time_str = rets['last_parsed_time']
         last_parsed_time = datetime.datetime.strptime(last_parsed_time_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
     else:
-        last_parsed_time=  None
+        last_parsed_time = None
     return {"last_parsed_time": last_parsed_time}
+
+
 @router_af_job.get("/getOnce/{item_id}")
 @web_try()
 @sxtimeit
@@ -69,45 +72,39 @@ def add_dag_submit(id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
-    job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
-    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
-    headers = {
-        'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
-    }
-
-    response = requests.post(uri, headers=headers, data=json.dumps({}))
-    return response.json()
-
-
-#
-# @router_af_job.post("/{item_id}/update_and_run")
-# @web_try()
-# @sxtimeit
-# def update_and_trigger_job_run(item_id: int, db: Session = Depends(get_db)):
-#     job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
-#     uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
-#     headers = {
-#         'content-type': 'application/json',
-#         'Authorization': 'basic YWRtaW46YWRtaW4=',
-#         'Host': 'airflow-web.sxkj.com'
-#     }
-#
-#     response = requests.post(uri, headers=headers, data=json.dumps({}))
-#     return response.json()
+    # uri_prefix, headers = get_airflow_api_info()
+    # uri = f'{uri_prefix}/dags/dag_{item_id}/dagRuns'
+    # response = requests.post(uri, headers=headers, data=json.dumps({}))
+    # print(f'now we send trigger msg to dag_{item_id}')
+    #
+    # return response.json()
+
+    return call_airflow_api(method='post', uri=f'dags/dag_{item_id}/dagRuns', args_dict={"data": json.dumps({})}).json()
+
+
+@router_af_job.post("/{item_id}/adv_run")
+@web_try()
+@sxtimeit
+def trigger_af_job_adv_run(item_id: int, parameters: AirflowTrigger):
+    return call_airflow_api(method='post', uri=f'dags/dag_{item_id}/dagRuns',
+                            args_dict={"data": json.dumps(parameters.dict())}).json()
+
+
+@router_af_job.post("/000/data_transfer_run")
+@web_try()
+@sxtimeit
+def trigger_af_data_transfer_job_run(source_tb: str, target_tb: str):
+    rets = call_airflow_api(method='post', uri=f'dags/dag_data_transfer/dagRuns',
+                            args_dict={"json": {"conf": {"input": source_tb, "output": target_tb}}}).json()
+    return {"af_run_id": rets.get('dag_run_id', None)}
 
 
 @router_af_job.delete("/{item_id}")
 @web_try()
 @sxtimeit
 def delete_af_job(item_id: int, db: Session = Depends(get_db)):
-    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}'
-    headers = {
-        'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
-    }
+    uri_prefix, headers = get_airflow_api_info()
+    uri = f'{uri_prefix}/dags/dag_{item_id}'
     requests.delete(uri, headers=headers)
     os.remove(get_job_path(job_id=item_id))
     crud.delete_airflow_job(db, item_id)
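
The new /000/data_transfer_run route takes the source and target tables as query parameters, forwards them as the dag-run conf, and returns the resulting af_run_id. A sketch of calling it through the FastAPI service; the backend host is an assumption and the table names are the auo_tests values:

    # Sketch: call the data-transfer endpoint (host is a placeholder; tables are test values).
    import requests

    resp = requests.post('http://localhost:8080/af/af_job/000/data_transfer_run',
                         params={'source_tb': 'my_test_p', 'target_tb': 'your_test_x'})
    print(resp.json())   # response body carries the af_run_id returned by Airflow, wrapped by web_try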

+ 1 - 1
app/routers/job_info.py

@@ -23,7 +23,7 @@ from app import get_db, get_page
 
 
 router = APIRouter(
-    prefix="/jpt/jobinfo",
+    prefix="/af/jobinfo",
     tags=["jobinfo-任务管理"],
 )
 

+ 1 - 1
app/routers/job_jdbc_datasource.py

@@ -15,7 +15,7 @@ from app import get_db
 
 
 router = APIRouter(
-    prefix="/jpt/datasource",
+    prefix="/af/datasource",
     tags=["datasource-数据源管理"],
 )
 

+ 1 - 1
app/routers/job_log.py

@@ -15,7 +15,7 @@ from fastapi_pagination import Page, add_pagination, paginate, Params
 from app import get_db
 
 router = APIRouter(
-    prefix="/jpt/joblog",
+    prefix="/af/joblog",
     tags=["joblog-日志管理"],
 )
 

+ 16 - 4
app/routers/run.py

@@ -16,7 +16,7 @@ class Item(BaseModel):
 
 
 router_af_run = APIRouter(
-    prefix="/jpt/af_run",
+    prefix="/af/af_run",
     tags=["airflow_run-运行管理"],
 )
 
@@ -24,7 +24,7 @@ router_af_run = APIRouter(
 @router_af_run.get("/")
 @web_try()
 @sxtimeit
-def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
+def get_runs(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_tasks(db), params)
 
 
@@ -34,6 +34,7 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
 def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_run_once(item_id=run_id, db=db)
     job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
+    print(f'job_item.job_mode  is {job_item.job_mode},with af run id= {item.af_run_id}')
     if job_item.job_mode == 1:  # 常规模式
         if item.status in [2, 3]:
             return {"status": item.status}
@@ -96,7 +97,8 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
         spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
         run.details['dependence']['sparks'] = sparks_dependence
         run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
-        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": 1})
+        update_run = schemas.AirflowRunUpdate(
+            **{"details": run.details, "status": 1, "af_run_id": item.data['af_run_id']})
         crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
 
 
@@ -117,7 +119,8 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
         run = None
 
     if run is not None:
-        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
+        update_run = schemas.AirflowRunUpdate(
+            **{"details": run.details, "status": run.status, "af_run_id": item.data['af_run_id']})
         update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                              "start_time": item.data["start_time"],
                                                              "end_time": item.data["end_time"],
@@ -130,3 +133,12 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
 @sxtimeit
 def add_notification(item: Item):
     print(f'receive sigal: {item.data} ')
+
+
+@router_af_run.get("/{job_id}/{af_run_id}")
+@web_try()
+@sxtimeit
+def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
+    # task_info =
+    pass
+    # return paginate(crud.get_airflow_tasks(db), params)

+ 1 - 1
app/routers/task.py

@@ -6,7 +6,7 @@ from app import schemas, get_db, crud
 from utils import web_try, sxtimeit
 
 router_af_task = APIRouter(
-    prefix="/jpt/af_task",
+    prefix="/af/af_task",
     tags=["airflow_task-作业管理"],
 )
 

+ 3 - 3
app/routers/task_run.py

@@ -13,7 +13,7 @@ class Item(BaseModel):
 
 
 router_af_run = APIRouter(
-    prefix="/jpt/af_run",
+    prefix="/af/af_run",
     tags=["airflow_run-运行管理"],
 )
 
@@ -61,8 +61,8 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
               "task_id": item.data['task_id'],
               "run_ts": item.data['run_ts']}
     logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
-    run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
-
+    # run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
+    run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
     if run is not None:
         update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
         update_run.details['tasks'][item.data['task_id']] = {"log": logs,

+ 6 - 1
app/schemas/af_run.py

@@ -1,14 +1,15 @@
 from pydantic import BaseModel
 
+
 class AirflowRunBase(BaseModel):
     details: dict
     status: int
+    af_run_id: str
 
 
 class AirflowRunCreate(AirflowRunBase):
     start_time: float
     job_id: int
-    af_run_id: str
     run_ts: str
 
 
@@ -28,3 +29,7 @@ class AirflowRun(AirflowRunBase):
 
     class Config:
         orm_mode = True
+
+
+class AirflowTrigger(BaseModel):
+    parameters: dict
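
af_run_id moves up into AirflowRunBase so both create and update payloads carry it, and AirflowTrigger is the request body for the new adv_run route; its dict() is what gets serialized into the dagRuns call. A tiny sketch of that payload, with example parameter values:

    # Sketch: the adv_run request body and what json.dumps(parameters.dict()) produces.
    import json
    from app.schemas import AirflowTrigger

    body = AirflowTrigger(parameters={'conf': {'input': 'tb_a', 'output': 'tb_b'}})   # example values
    print(json.dumps(body.dict()))
    # {"parameters": {"conf": {"input": "tb_a", "output": "tb_b"}}}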

+ 32 - 0
auo_tests/spark_submit/data_transfer.py

@@ -0,0 +1,32 @@
+#!coding=utf8
+from pyspark.sql import SparkSession, DataFrame
+
+
+def main_func(input0, spark, sc):
+    output_tb = "your_test_x"
+    print(f"read table {output_tb}")
+    input0.write.mode("overwrite").saveAsTable(output_tb)
+
+
+def run(inputs: dict):
+    spark = SparkSession.builder.config('hive.metastore.uris',
+                                        'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    param_dict = preprocess(input_infos=inputs, ss=spark)
+    main_func(**param_dict, spark=spark, sc=spark.sparkContext)
+
+
+def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
+    print(f"read table {tb_name}")
+    return ss.sql(f'select * from {tb_name}')
+
+
+def write_table(df: DataFrame, tb_name: str):
+    df.write.mode("overwrite").saveAsTable(tb_name)
+
+
+def preprocess(input_infos: dict, ss: SparkSession) -> dict:
+    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
+
+
+if __name__ == '__main__':
+    run(inputs={'input0': "my_test_p"})

+ 4 - 3
auo_tests/spark_submit/docker-compose-1.yml → auo_tests/spark_submit/docker-compose.yml

@@ -4,7 +4,7 @@ services:
     hostname: ai2_spark_submit
     container_name: ai2_spark_submit
     restart: always
-    image: SXKJ:32775/jupyter:latest
+    image: SXKJ:32775/jupyter:0.96
     privileged: true
     ipc: host
     tty: true
@@ -15,9 +15,10 @@ services:
 #    ports:
 #      - '18082:8080'
 #      - '18224:22'
-    extra_hosts:
-      - 'minio-api.sxkj.com:192.168.199.109'
+#    extra_hosts:
+#      - 'minio-api.sxkj.com:192.168.199.109'
 #    environment:
 #      - APP_ENV=development
 
 
+

+ 15 - 0
auo_tests/spark_submit/sparksql_simple.py

@@ -0,0 +1,15 @@
+import json
+import sys
+from pyspark.sql import SparkSession
+
+
+def run(outputs: list):
+    script = 'select * from test'
+    ss = SparkSession.builder.config('hive.metastore.uris',
+                                     'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    ret_df = ss.sql(script)
+    # ret_df.write.mode("overwrite").saveAsTable(outputs[0])
+
+
+if __name__ == '__main__':
+    run(outputs=[])

+ 1 - 0
configs/settings.py

@@ -23,4 +23,5 @@ if os.environ.get('APP_ENV', 'development') == 'development':
 elif os.environ.get('APP_ENV') == 'production':
     config.readfp(open('production.ini'))
 
+print(f"get config of {os.environ.get('APP_ENV')}")
 print(config.get('DATABASE', 'host'))
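
configs/settings.py chooses its ini file from APP_ENV at import time, so the variable must be set before the module is first imported; docker-compose.yml now sets APP_ENV=development, and the notebook run below shows the unset fallback ("get config of None" while still reading development.ini). A minimal sketch, assuming production.ini is present in the working directory:

    # Sketch: APP_ENV selects the ini file at import time; unset falls back to development.ini.
    import os
    os.environ['APP_ENV'] = 'production'   # must be set before configs.settings is first imported

    from configs.settings import config
    print(config.get('AIRFLOW', 'ip_address'))   # aihub-backend-af-yili-test:8080 from production.ini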

+ 19 - 1
development.ini

@@ -4,17 +4,35 @@ pwd = happylay
 db_name = datax_web_dev
 host = 192.168.199.107
 port = 10086
+; [DATABASE]
+; user = aihubtest
+; pwd = q9WBYDynEy@jh#5N
+; db_name = aihubtest_dag_admin_db
+; host = 10.254.12.7
+; port = 3306
 [MINIO]
 url = minio-api.sxkj.com
 access_key = admin
 secret_key = sxkjadmin
+k8s_url=minio.default:9000
+
 [AIRFLOW]
 host = 192.168.199.109
 port = 18082
+host_in_header=airflow-web.sxkj.com
+ip_address=192.168.199.109
+api_token=YWRtaW46YWRtaW4=
+dag_files_dir=/dags/
 [HIVE]
 host = 192.168.199.27
 port = 10000
 username = hive
 password = hive
 database_name = default
-kerberos = 0
+kerberos=0
+
+[TASK_IMAGES]
+datax=SXKJ:32775/pod_datax:0.9
+python=SXKJ:32775/pod_python:1.1
+java=SXKJ:32775/java:1.0
+sparks=SXKJ:32775/jupyter:0.96

+ 2 - 2
docker-compose.yml

@@ -17,5 +17,5 @@ services:
       - '18224:22'
     extra_hosts:
       - 'minio-api.sxkj.com:192.168.199.109'
-    # environment:
-    #   - APP_ENV=development
+    environment:
+     - APP_ENV=development

+ 27 - 0
docker/dev/krb5.conf

@@ -0,0 +1,27 @@
+[libdefaults]
+    dns_lookup_realm = false
+    dns_lookup_kdc = false
+    ticket_lifetime = 24h
+    renew_lifetime = 7d
+    forwardable = true
+    rdns = false
+    default_realm = EMR-5XJSY31F
+    default_tgs_enctypes = des3-cbc-sha1
+    default_tkt_enctypes = des3-cbc-sha1
+    permitted_enctypes = des3-cbc-sha1
+    kdc_timeout = 3000
+    max_retries = 3
+[realms]
+    EMR-5XJSY31F = {
+
+        kdc = 10.254.20.18:88
+        admin_server = 10.254.20.18
+        kdc = 10.254.20.22:88
+		admin_server = 10.254.20.22
+
+    }
+
+[domain_realm]
+# .example.com = EXAMPLE.COM
+
+

+ 0 - 5
docker/java/Dockerfile.java

@@ -1,5 +0,0 @@
-FROM maven:3.8.6-openjdk-8 as builder
-
-WORKDIR /workspace
-RUN sed -i "s@http://\(deb\|security\).debian.org@https://mirrors.aliyun.com@g" /etc/apt/sources.list
-

+ 27 - 0
docker/test/krb5.conf

@@ -0,0 +1,27 @@
+[libdefaults]
+    dns_lookup_realm = false
+    dns_lookup_kdc = false
+    ticket_lifetime = 24h
+    renew_lifetime = 7d
+    forwardable = true
+    rdns = false
+    default_realm = EMR-5XJSY31F
+    default_tgs_enctypes = des3-cbc-sha1
+    default_tkt_enctypes = des3-cbc-sha1
+    permitted_enctypes = des3-cbc-sha1
+    kdc_timeout = 3000
+    max_retries = 3
+[realms]
+    EMR-5XJSY31F = {
+
+        kdc = 10.254.20.18:88
+        admin_server = 10.254.20.18
+        kdc = 10.254.20.22:88
+		admin_server = 10.254.20.22
+
+    }
+
+[domain_realm]
+# .example.com = EXAMPLE.COM
+
+

+ 17 - 3
production.ini

@@ -5,21 +5,35 @@ db_name = aihubtest_dag_admin_db
 host = 10.254.12.7
 port = 3306
 [MINIO]
+k8s_url = aihub-minio-yili-test:9000
 url = aihub-minio-yili-test:9000
 access_key = minioadmin
 secret_key = minioadmin
 [AIRFLOW]
+host_in_header=airflow-web-test.digitalyili.com
+ip_address = aihub-backend-af-yili-test:8080
+api_token=YWRtaW46YWRtaW4=
+dag_files_dir=/dags/
 host = aihub-backend-af-yili-test
 port = 8080
 [HIVE]
-host = 192.168.199.27
-port = 10000
+host = 10.254.20.22
+port = 7001
 username = hive
 password = hive
 database_name = default
-kerberos = 0
+kerberos = 1
 keytab = configs/user.keytab
 krb5config = configs/krb5.conf
 kerberos_service_name = hadoop
 principal = ailab@EMR-5XJSY31F
+[TASK_IMAGES]
+datax=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-datax-yili:latest
+python=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:python
+java=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:java
+sparks=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:spark
+
+
+
+
 

+ 95 - 0
render_script_to_dag_manual.ipynb

@@ -0,0 +1,95 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "from jinja2 import Environment, PackageLoader, select_autoescape\n",
+    "\n",
+    "\n",
+    "with open('./app/core/airflow/templates/data_transfer/data_transfer_basic.py') as f:\n",
+    "    script = f.read()\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "2022-10-16 01:09:43,455| \"/Users/luoyulong/PycharmProjects/ai2/datax-admin/app/models/database.py\", line 26,\n",
+      " INFO:  connect to mysql success: mysql+mysqlconnector://root:happylay@192.168.199.107:10086/datax_web_dev?charset=utf8&auth_plugin=mysql_native_password\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "get config of None\n",
+      "192.168.199.107\n"
+     ]
+    }
+   ],
+   "source": [
+    "env = Environment(\n",
+    "    loader=PackageLoader('app.core.airflow'),\n",
+    "    autoescape=select_autoescape()\n",
+    ")\n",
+    "template = env.get_template(\"data_transfer/data_transfer.py\")\n",
+    "temp_str = template.render(  {\"env\":  {\"SCRIPT\":script}} )"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "outputs": [],
+   "source": [
+    "with open('./app/core/airflow/templates/data_transfer/data_transfer_task.py','w') as f:\n",
+    "    f.write(temp_str)\n",
+    "    f.close()"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false
+   }
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}