
Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

Zhang Li 2 years ago
parent
commit
dcc50c5ebd

+ 19 - 17
app/core/airflow/task.py

@@ -3,6 +3,7 @@ from app.core.airflow.uri import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
 from app.common.minio import FileHandler
+from configs.settings import config
 
 
 class TaskCompiler:
@@ -42,24 +43,24 @@ class TaskCompiler:
 class JavaTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(JavaTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/java:1.0'
+        self.default_image = config.get('TASK_IMAGES', 'java')  # 'SXKJ:32775/java:1.0'
         self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
         self.task.cmd = self.task.cmd or self.default_cmd
         tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
-        self.task.cmd = f'curl {"http://minio.default:9000"}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
+        self.task.cmd = f'curl {config.get("MINIO", "k8s_url")}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
 
 
 class PythonTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(PythonTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/pod_python:1.1'
+        self.default_image = config.get('TASK_IMAGES', 'python')  # 'SXKJ:32775/pod_python:1.1'
         self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
 
 
 class DataXTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(DataXTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/pod_datax:0.9'
+        self.default_image = config.get('TASK_IMAGES', 'datax')  # 'SXKJ:32775/pod_datax:0.9'
         self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && " \
                            f"python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/b" \
                            f"in/python datax.py {self.task.cmd_parameters} config.json "
@@ -94,24 +95,25 @@ class DataXTaskCompiler(TaskCompiler):
 class SparksTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(SparksTaskCompiler, self).__init__(item)
-        self.default_image = 'SXKJ:32775/jupyter:0.96'
+        self.default_image = config.get('TASK_IMAGES', 'sparks')
         parameters = {"master": "yarn",
                       "deploy-mode": "cluster",
-                      "driver-memory": "2g",
+                      "driver-memory": "1g",
                       "driver-cores ": 1,
-                      "executor-memory": "2g",
-                      "executor-cores": 4,
+                      "executor-memory": "1g",
+                      "executor-cores": 1,
                       "num-executors": 1,
                       "archives": "/home/sxkj/bigdata/py37.zip#python3env"
                       }
-        spark_config = {'spark.default.parallelism': 2,
-                        "spark.executor.memoryOverhead": "4g",
-                        "spark.driver.memoryOverhead": "2g",
-                        "spark.yarn.maxAppAttempts": 3,
+        spark_config = {'spark.default.parallelism': 1,
+                        "spark.executor.memoryOverhead": "1g",
+                        "spark.driver.memoryOverhead": "1g",
+                        "spark.yarn.maxAppAttempts": 1,
                         "spark.yarn.submit.waitAppCompletion": "true",
                         "spark.pyspark.driver.python": "python3env/py37/bin/python",
                         "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "python3env/py37/bin/python",
-                        "spark.pyspark.python": "python3env/py37/bin/python"
+                        "spark.pyspark.python": "python3env/py37/bin/python",
+                        "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation": "true"
                         }
         param_str = ' '.join([f'--{k} {v}' for k, v in parameters.items()])
         param_str += ''.join([f' --conf {k}={v}' for k, v in spark_config.items()])
@@ -156,14 +158,14 @@ class SparksTaskCompiler(TaskCompiler):
                 skip_nodes.append(info["id"])
                 continue
             if info['op'] == 'sql':
-                inputs = {}
                 template_file = 'sql_script_template.py.jinja2'
             elif info['op'] == 'pyspark':
-                inputs = {k: spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=v[0],
-                                                  out_pin=v[1], is_tmp=task_mode) for k, v in info['inputs'].items()}
                 template_file = 'pyspark_script_template.py.jinja2'
             else:
                 continue
+            inputs = {k: spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=v[0],
+                                              out_pin=v[1], is_tmp=task_mode) for k, v in
+                      info.get('inputs', {}).items()}
             outputs = [spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=info['id'],
                                             out_pin=0, is_tmp=task_mode)]
             sub_node = {
@@ -175,7 +177,7 @@ class SparksTaskCompiler(TaskCompiler):
                         template_file=template_file),
                 },
                 'cmds': ['/bin/bash', '-c', self.cmd_str(name=f'spark_{self.task.id}_{info["id"]}')],
-                'image': "SXKJ:32775/jupyter:0.96",
+                'image': config.get('TASK_IMAGES', 'sparks')
             }
             sub_nodes.append(sub_node)
 

+ 1 - 1
app/core/airflow/templates/dag_template.py.jinja2

@@ -47,7 +47,7 @@ namespace = conf.get("kubernetes", "NAMESPACE")
 name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
-with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if {{ interval }} is None else '{{ interval }}',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
+with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}',dag_id="{{ dag_id }}") as dag:
     op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
 
     {% for spark_node in spark_nodes %}
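
The one-line template change above fixes scheduling for jobs without an interval: once {{ interval }} is quoted, a Python None passed to the renderer arrives as the literal string 'None', and the generated DAG falls back to schedule_interval=None. A standalone illustration (simplified template string, not the full dag_template):

    from jinja2 import Template

    t = Template("schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}'")
    print(t.render(interval=None))         # schedule_interval=None if 'None' == 'None' else 'None'
    print(t.render(interval='0 * * * *'))  # schedule_interval=None if '0 * * * *' == 'None' else '0 * * * *'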

+ 0 - 0
app/core/airflow/templates/data_transfer/__init__.py


+ 35 - 0
app/core/airflow/templates/data_transfer/data_transfer.py

@@ -0,0 +1,35 @@
+from airflow import DAG
+from datetime import datetime
+from airflow.operators.bash import BashOperator
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+from airflow.configuration import conf
+
+namespace = conf.get("kubernetes", "NAMESPACE")
+
+# set the name that will be printed
+name = "luoyulong"
+
+# instantiate the DAG
+with DAG(
+        start_date=datetime(2022, 6, 1),
+        catchup=False,
+        schedule_interval=None,
+        dag_id="dag_data_transfer"
+) as dag:
+    op_transfer_data = KubernetesPodOperator(
+        task_id="task_000",
+        image="SXKJ:32775/jupyter:0.96",
+        in_cluster=True,
+        namespace=namespace,
+        name="transfer_data",
+        random_name_suffix=True,
+        labels={'app': 'backend', 'env': 'dev'},
+        reattach_on_restart=True,
+        is_delete_operator_pod=False,
+        get_logs=True,
+        log_events_on_failure=True,
+        cmds=['/bin/bash', '-c',
+              'cd /home/sxkj/bigdata && echo "$SCRIPT" > run.py && ${SPARK_HOME}/bin/spark-submit --name spark_data_transfer --master yarn --deploy-mode cluster --driver-memory 1g --driver-cores  1 --executor-memory 1g --executor-cores 1 --num-executors 1 --archives /home/sxkj/bigdata/py37.zip#python3env --conf spark.default.parallelism=1 --conf spark.executor.memoryOverhead=1g --conf spark.driver.memoryOverhead=1g --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.pyspark.driver.python=python3env/py37/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3env/py37/bin/python --conf spark.pyspark.python=python3env/py37/bin/python --conf spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true run.py'],
+        env_vars={{env}}
+    )
+
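
The env_vars={{env}} placeholder in this DAG is filled offline; condensed from render_script_to_dag_manual.ipynb at the end of this commit, the rendering step looks like this (a sketch run from the repo root, with jinja2 and the app package importable):

    from jinja2 import Environment, PackageLoader, select_autoescape

    jinja_env = Environment(loader=PackageLoader('app.core.airflow'),
                            autoescape=select_autoescape())
    with open('./app/core/airflow/templates/data_transfer/data_transfer_basic.py') as f:
        script = f.read()
    template = jinja_env.get_template('data_transfer/data_transfer.py')
    # env_vars={{env}} becomes env_vars={'SCRIPT': '<contents of data_transfer_basic.py>'}
    rendered = template.render({"env": {"SCRIPT": script}})
    with open('./app/core/airflow/templates/data_transfer/data_transfer_task.py', 'w') as f:
        f.write(rendered)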

+ 32 - 0
app/core/airflow/templates/data_transfer/data_transfer_basic.py

@@ -0,0 +1,32 @@
+#!coding=utf8
+from pyspark.sql import SparkSession, DataFrame
+
+
+def main_func(input0, spark, sc):
+    output_tb = "{{ dag_run.conf.output}}"
+    print(f"read table {output_tb}")
+    input0.write.mode("overwrite").saveAsTable(output_tb)
+
+
+def run(inputs: dict):
+    spark = SparkSession.builder.config('hive.metastore.uris',
+                                        'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    param_dict = preprocess(input_infos=inputs, ss=spark)
+    main_func(**param_dict, spark=spark, sc=spark.sparkContext)
+
+
+def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
+    print(f"read table {tb_name}")
+    return ss.sql(f'select * from {tb_name}')
+
+
+def write_table(df: DataFrame, tb_name: str):
+    df.write.mode("overwrite").saveAsTable(tb_name)
+
+
+def preprocess(input_infos: dict, ss: SparkSession) -> dict:
+    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
+
+
+if __name__ == '__main__':
+    run(inputs={'input0': '{{ dag_run.conf.input}}'})

+ 6 - 1
app/core/airflow/templates/pyspark_script_template.py.jinja2

@@ -30,7 +30,12 @@ def preprocess(input_infos: dict, ss: SparkSession) -> dict:
 
 
 def postprocess(rets, outputs):
-    [write_table(df=df, tb_name=outputs[idx]) for idx, df in enumerate(rets)]
+    if isinstance(rets,list):
+        for idx, df in enumerate(rets):
+            if idx==0:
+                write_table(df=df, tb_name=outputs[idx])
+            else:
+                write_table(df=df, tb_name=outputs[0].replace("_output0","_output"+str(idx)))
 
 
 if __name__ == '__main__':
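
The reworked postprocess() above no longer assumes one declared output per returned DataFrame: the first return value keeps the declared output name, and any extra DataFrames go to sibling tables derived from it. A name-only illustration (the table name is hypothetical, following the spark_result_tb_name() pattern):

    outputs = ['job1_task2_subnode3_output0']  # hypothetical first output table
    rets = ['df0', 'df1', 'df2']               # stand-ins for the returned DataFrames
    for idx, _df in enumerate(rets):
        name = outputs[idx] if idx == 0 else outputs[0].replace('_output0', '_output' + str(idx))
        print(name)
    # job1_task2_subnode3_output0
    # job1_task2_subnode3_output1
    # job1_task2_subnode3_output2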

+ 4 - 2
app/core/airflow/templates/sql_script_template.py.jinja2

@@ -3,8 +3,10 @@ import sys
 from pyspark.sql import SparkSession
 import os
 
-def run(outputs: list):
+def run(inputs: dict, outputs: list):
     script = "{{ script }}"
+    for k, tb_name in inputs.items():
+        script=script.replace(k,tb_name)
     ss = SparkSession.builder.config('hive.metastore.uris',
                                      'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
     ret_df = ss.sql(script)
@@ -12,4 +14,4 @@ def run(outputs: list):
 
 
 if __name__ == '__main__':
-    run(outputs={{ outputs }})
+    run(inputs={{ inputs }}, outputs={{ outputs }})
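
With the added inputs argument, a SQL node can reference its upstream pins by name, and run() swaps those placeholders for the concrete result-table names built by spark_result_tb_name(). A minimal sketch with hypothetical names:

    script = "select * from input0"                     # hypothetical node SQL
    inputs = {"input0": "job1_task2_subnode0_output0"}  # pin name -> upstream result table
    for k, tb_name in inputs.items():
        script = script.replace(k, tb_name)
    print(script)  # select * from job1_task2_subnode0_output0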

+ 18 - 5
app/core/airflow/uri.py

@@ -3,9 +3,10 @@ import json
 import requests
 
 from app.common.minio import FileHandler
+from configs.settings import config
 
 
-def spark_result_tb_name(job_id,task_id, spark_node_id, out_pin, is_tmp=False):
+def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
     return f'job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
 
 
@@ -24,15 +25,27 @@ def upload2oss(content: bytes, uri: str, minio_bucket: str):
 
 
 def get_job_path(job_id):
-    dag_path = '/dags/'
+    dag_path = f'{config.get("AIRFLOW", "dag_files_dir")}'
     return dag_path + f'dag_{job_id}.py'
 
 
 def get_airflow_api_info():
-    uri_prefix = f'http://192.168.199.109/api/v1'
+    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
     headers = {
         'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
+        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
+        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
     }
     return uri_prefix, headers
+
+
+def call_airflow_api(method, uri, args_dict):
+    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
+        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
+    }
+    if method == 'post':
+        print('enter post')
+        return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
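
Note that call_airflow_api only implements the 'post' branch so far; other methods fall through and return None. The routers below call it like this (a sketch; dag_42 is a made-up job id, and the AIRFLOW ip_address/api_token/host_in_header values come from the active .ini):

    import json

    from app.core.airflow.uri import call_airflow_api

    resp = call_airflow_api(method='post',
                            uri='dags/dag_42/dagRuns',           # dag_42: made-up job id
                            args_dict={"data": json.dumps({})})  # empty dagRun conf
    print(resp.status_code, resp.json())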

+ 32 - 35
app/routers/job.py

@@ -6,8 +6,9 @@ from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
-from app.core.airflow.uri import get_job_path, get_airflow_api_info
+from app.core.airflow.uri import get_job_path, get_airflow_api_info, call_airflow_api
 from app.crud import create_airflow_job_submit
+from app.schemas import AirflowTrigger
 from utils import web_try, sxtimeit
 
 router_af_job = APIRouter(
@@ -34,8 +35,10 @@ def get_af_jobs(item_id: int):
         last_parsed_time_str = rets['last_parsed_time']
         last_parsed_time = datetime.datetime.strptime(last_parsed_time_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
     else:
-        last_parsed_time=  None
+        last_parsed_time = None
     return {"last_parsed_time": last_parsed_time}
+
+
 @router_af_job.get("/getOnce/{item_id}")
 @web_try()
 @sxtimeit
@@ -69,45 +72,39 @@ def add_dag_submit(id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
-    job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
-    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
-    headers = {
-        'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
-    }
-
-    response = requests.post(uri, headers=headers, data=json.dumps({}))
-    return response.json()
-
-
-#
-# @router_af_job.post("/{item_id}/update_and_run")
-# @web_try()
-# @sxtimeit
-# def update_and_trigger_job_run(item_id: int, db: Session = Depends(get_db)):
-#     job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
-#     uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
-#     headers = {
-#         'content-type': 'application/json',
-#         'Authorization': 'basic YWRtaW46YWRtaW4=',
-#         'Host': 'airflow-web.sxkj.com'
-#     }
-#
-#     response = requests.post(uri, headers=headers, data=json.dumps({}))
-#     return response.json()
+    # uri_prefix, headers = get_airflow_api_info()
+    # uri = f'{uri_prefix}/dags/dag_{item_id}/dagRuns'
+    # response = requests.post(uri, headers=headers, data=json.dumps({}))
+    # print(f'now we send trigger msg to dag_{item_id}')
+    #
+    # return response.json()
+
+    return call_airflow_api(method='post', uri=f'dags/dag_{item_id}/dagRuns', args_dict={"data": json.dumps({})}).json()
+
+
+@router_af_job.post("/{item_id}/adv_run")
+@web_try()
+@sxtimeit
+def trigger_af_job_adv_run(item_id: int, parameters: AirflowTrigger):
+    return call_airflow_api(method='post', uri=f'dags/dag_{item_id}/dagRuns',
+                            args_dict={"data": json.dumps(parameters.dict())}).json()
+
+
+@router_af_job.post("/000/data_transfer_run")
+@web_try()
+@sxtimeit
+def trigger_af_data_transfer_job_run(source_tb: str, target_tb: str):
+    rets = call_airflow_api(method='post', uri=f'dags/dag_data_transfer/dagRuns',
+                            args_dict={"json": {"conf": {"input": source_tb, "output": target_tb}}}).json()
+    return {"af_run_id": rets.get('dag_run_id', None)}
 
 
 @router_af_job.delete("/{item_id}")
 @web_try()
 @sxtimeit
 def delete_af_job(item_id: int, db: Session = Depends(get_db)):
-    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}'
-    headers = {
-        'content-type': 'application/json',
-        'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host': 'airflow-web.sxkj.com'
-    }
+    uri_prefix, headers = get_airflow_api_info()
+    uri = f'{uri_prefix}/dags/dag_{item_id}'
     requests.delete(uri, headers=headers)
     os.remove(get_job_path(job_id=item_id))
     crud.delete_airflow_job(db, item_id)
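
For reference, the new /000/data_transfer_run route above amounts to the following Airflow REST call (a sketch spelled out with requests, using the development.ini values; the table names are examples):

    import requests

    resp = requests.post(
        'http://192.168.199.109/api/v1/dags/dag_data_transfer/dagRuns',
        headers={'content-type': 'application/json',
                 'Authorization': 'basic YWRtaW46YWRtaW4=',
                 'Host': 'airflow-web.sxkj.com'},
        json={'conf': {'input': 'my_source_tb', 'output': 'my_target_tb'}},  # example tables
    )
    print(resp.json().get('dag_run_id'))  # returned to the caller as af_run_id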

+ 4 - 2
app/routers/run.py

@@ -34,6 +34,7 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
 def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_run_once(item_id=run_id, db=db)
     job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
+    print(f'job_item.job_mode  is {job_item.job_mode},with af run id= {item.af_run_id}')
     if job_item.job_mode == 1:  # 常规模式
         if item.status in [2, 3]:
             return {"status": item.status}
@@ -96,7 +97,8 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
         spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
         run.details['dependence']['sparks'] = sparks_dependence
         run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
-        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": 1})
+        update_run = schemas.AirflowRunUpdate(
+            **{"details": run.details, "status": 1, "af_run_id": item.data['af_run_id']})
         crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
 
 
@@ -117,7 +119,7 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
         run = None
 
     if run is not None:
-        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
+        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status,"af_run_id":item.data['af_run_id']})
         update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                              "start_time": item.data["start_time"],
                                                              "end_time": item.data["end_time"],

+ 2 - 2
app/routers/task_run.py

@@ -61,8 +61,8 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
               "task_id": item.data['task_id'],
               "run_ts": item.data['run_ts']}
     logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
-    run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
-
+    # run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
+    run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
     if run is not None:
         update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
         update_run.details['tasks'][item.data['task_id']] = {"log": logs,

+ 6 - 1
app/schemas/af_run.py

@@ -1,14 +1,15 @@
 from pydantic import BaseModel
 
+
 class AirflowRunBase(BaseModel):
     details: dict
     status: int
+    af_run_id: str
 
 
 class AirflowRunCreate(AirflowRunBase):
     start_time: float
     job_id: int
-    af_run_id: str
     run_ts: str
 
 
@@ -28,3 +29,7 @@ class AirflowRun(AirflowRunBase):
 
     class Config:
         orm_mode = True
+
+
+class AirflowTrigger(BaseModel):
+    parameters: dict

+ 32 - 0
auo_tests/spark_submit/data_transfer.py

@@ -0,0 +1,32 @@
+#!coding=utf8
+from pyspark.sql import SparkSession, DataFrame
+
+
+def main_func(input0, spark, sc):
+    output_tb = "your_test_x"
+    print(f"read table {output_tb}")
+    input0.write.mode("overwrite").saveAsTable(output_tb)
+
+
+def run(inputs: dict):
+    spark = SparkSession.builder.config('hive.metastore.uris',
+                                        'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    param_dict = preprocess(input_infos=inputs, ss=spark)
+    main_func(**param_dict, spark=spark, sc=spark.sparkContext)
+
+
+def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
+    print(f"read table {tb_name}")
+    return ss.sql(f'select * from {tb_name}')
+
+
+def write_table(df: DataFrame, tb_name: str):
+    df.write.mode("overwrite").saveAsTable(tb_name)
+
+
+def preprocess(input_infos: dict, ss: SparkSession) -> dict:
+    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
+
+
+if __name__ == '__main__':
+    run(inputs={'input0': "my_test_p"})

+ 4 - 3
auo_tests/spark_submit/docker-compose-1.yml → auo_tests/spark_submit/docker-compose.yml

@@ -4,7 +4,7 @@ services:
     hostname: ai2_spark_submit
     container_name: ai2_spark_submit
     restart: always
-    image: SXKJ:32775/jupyter:latest
+    image: SXKJ:32775/jupyter:0.96
     privileged: true
     ipc: host
     tty: true
@@ -15,9 +15,10 @@ services:
 #    ports:
 #      - '18082:8080'
 #      - '18224:22'
-    extra_hosts:
-      - 'minio-api.sxkj.com:192.168.199.109'
+#    extra_hosts:
+#      - 'minio-api.sxkj.com:192.168.199.109'
 #    environment:
 #      - APP_ENV=development
 
 
+

+ 15 - 0
auo_tests/spark_submit/sparksql_simple.py

@@ -0,0 +1,15 @@
+import json
+import sys
+from pyspark.sql import SparkSession
+
+
+def run(outputs: list):
+    script = 'select * from test'
+    ss = SparkSession.builder.config('hive.metastore.uris',
+                                     'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
+    ret_df = ss.sql(script)
+    # ret_df.write.mode("overwrite").saveAsTable(outputs[0])
+
+
+if __name__ == '__main__':
+    run(outputs=[])

+ 1 - 0
configs/settings.py

@@ -23,4 +23,5 @@ if os.environ.get('APP_ENV', 'development') == 'development':
 elif os.environ.get('APP_ENV') == 'production':
     config.readfp(open('production.ini'))
 
+print(f"get config of {os.environ.get('APP_ENV')}")
 print(config.get('DATABASE', 'host'))

+ 19 - 1
development.ini

@@ -4,17 +4,35 @@ pwd = happylay
 db_name = datax_web_dev
 host = 192.168.199.107
 port = 10086
+; [DATABASE]
+; user = aihubtest
+; pwd = q9WBYDynEy@jh#5N
+; db_name = aihubtest_dag_admin_db
+; host = 10.254.12.7
+; port = 3306
 [MINIO]
 url = minio-api.sxkj.com
 access_key = admin
 secret_key = sxkjadmin
+k8s_url=minio.default:9000
+
 [AIRFLOW]
 host = 192.168.199.109
 port = 18082
+host_in_header=airflow-web.sxkj.com
+ip_address=192.168.199.109
+api_token=YWRtaW46YWRtaW4=
+dag_files_dir=/dags/
 [HIVE]
 host = 192.168.199.27
 port = 10000
 username = hive
 password = hive
 database_name = default
-kerberos = 0
+kerberos=0
+
+[TASK_IMAGES]
+datax=SXKJ:32775/pod_datax:0.9
+python=SXKJ:32775/pod_python:1.1
+java=SXKJ:32775/java:1.0
+sparks=SXKJ:32775/jupyter:0.96

+ 14 - 0
production.ini

@@ -5,10 +5,15 @@ db_name = aihubtest_dag_admin_db
 host = 10.254.12.7
 port = 3306
 [MINIO]
+k8s_url=minio.default:9000
 url = aihub-minio-yili-test:9000
 access_key = minioadmin
 secret_key = minioadmin
 [AIRFLOW]
+host_in_header=airflow-web.sxkj.com
+ip_address=192.168.199.109
+api_token=YWRtaW46YWRtaW4=
+dag_files_dir=/dags/
 host = aihub-backend-af-yili-test
 port = 8080
 [HIVE]
@@ -22,4 +27,13 @@ keytab = configs/user.keytab
 krb5config = configs/krb5.conf
 kerberos_service_name = hadoop
 principal = ailab@EMR-5XJSY31F
+[TASK_IMAGES]
+datax=SXKJ:32775/pod_datax:0.9
+python=SXKJ:32775/pod_python:1.1
+java=SXKJ:32775/java:1.0
+sparks=SXKJ:32775/jupyter:0.96
+
+
+
+
 

+ 95 - 0
render_script_to_dag_manual.ipynb

@@ -0,0 +1,95 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "from jinja2 import Environment, PackageLoader, select_autoescape\n",
+    "\n",
+    "\n",
+    "with open('./app/core/airflow/templates/data_transfer/data_transfer_basic.py') as f:\n",
+    "    script = f.read()\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "2022-10-16 01:09:43,455| \"/Users/luoyulong/PycharmProjects/ai2/datax-admin/app/models/database.py\", line 26,\n",
+      " INFO:  connect to mysql success: mysql+mysqlconnector://root:happylay@192.168.199.107:10086/datax_web_dev?charset=utf8&auth_plugin=mysql_native_password\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "get config of None\n",
+      "192.168.199.107\n"
+     ]
+    }
+   ],
+   "source": [
+    "env = Environment(\n",
+    "    loader=PackageLoader('app.core.airflow'),\n",
+    "    autoescape=select_autoescape()\n",
+    ")\n",
+    "template = env.get_template(\"data_transfer/data_transfer.py\")\n",
+    "temp_str = template.render(  {\"env\":  {\"SCRIPT\":script}} )"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "outputs": [],
+   "source": [
+    "with open('./app/core/airflow/templates/data_transfer/data_transfer_task.py','w') as f:\n",
+    "    f.write(temp_str)\n",
+    "    f.close()"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false
+   }
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}