
1. Implemented log recording and querying
2. Implemented creation of DataX- and Java-type jobs

luoyulong committed 2 years ago
commit ef8c17becb

+ 26 - 7
app/core/airflow/job.py

@@ -1,4 +1,5 @@
-from jinja2 import Environment, PackageLoader, select_autoescape
+import os
+import stat
 from app.core.airflow.task import *
 from app.schemas import AirflowJob
 
@@ -7,17 +8,34 @@ class AirflowJobSubmitter:
 
     @staticmethod
     def submit_dag(item: AirflowJob):
-        m_compilers = {'python': PythonTaskCompiler, 'datax': DataXTaskCompiler, 'sparks': SparksTaskCompiler}
-        nodes = [m_compilers[task.task_type](item=task).translate() for task in item.tasks if
+        m_compilers = {'python': PythonTaskCompiler,
+                       'datax': DataXTaskCompiler,
+                       'sparks': SparksTaskCompiler,
+                       'java': JavaTaskCompiler}
+
+        nodes = [m_compilers[task.task_type](item=task).translate(item.job_mode or 1) for task in item.tasks if
                  task.task_type != 'sparks']
-        spark_nodes = [SparksTaskCompiler(item=task).translate() for task in item.tasks if task.task_type == 'sparks']
+        spark_nodes = [SparksTaskCompiler(item=task).translate(item.job_mode or 1) for task in item.tasks if
+                       task.task_type == 'sparks']
         edges = []
         for edge in item.dependence:
             edges.append({"source_operator_name": f'op_{edge[0]}',
                           "target_operator_name": f'op_{edge[1]}'})
+        #
+        # m_interval = {
+        #     "None": "None",
+        #     "@once": "@once",
+        #     "0 * * * *": "@hourly",
+        #     "0 0 * * *": "@daily",
+        #     "0 0 * * 0": "@weekly",
+        #     "0 0 1 * *": "@monthly",
+        #     "0 0 1 1 *": "@yearly"
+        # }
+        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
+                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
+                      'interval': item.cron
+                      }
 
-        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': item.name,
-                      'user_name': item.user_id, 'job_id': item.id}
         env = Environment(
             loader=PackageLoader('app.core.airflow'),
             autoescape=select_autoescape()
@@ -27,7 +45,8 @@ class AirflowJobSubmitter:
         print(f'finish build:{dag_content}')
 
         dag_path = '/dags/'
-        output_path = dag_path + f'{item.name}_{item.id}.py'
+        output_path = dag_path + f'dag_{item.id}.py'
         with open(output_path, "w") as fh:
             fh.write(dag_content)
+        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
         print(f'write dag to {output_path}')
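Note: after this change the render-and-write step pulls Environment/PackageLoader/select_autoescape through the star import from app.core.airflow.task, names the output file after the job id, and opens its permissions so the Airflow scheduler can read it. A minimal standalone sketch of that flow, assuming the template file is app/core/airflow/templates/dag_template.py.jinja2 (the template filename itself is not visible in this hunk):

import os
import stat
from jinja2 import Environment, PackageLoader, select_autoescape

def write_dag_file(parameters: dict, dag_path: str = '/dags/') -> str:
    # Render the DAG source from the packaged Jinja2 template.
    env = Environment(loader=PackageLoader('app.core.airflow'),
                      autoescape=select_autoescape())
    dag_content = env.get_template('dag_template.py.jinja2').render(parameters)
    # The file name now follows the dag_id convention dag_<job_id>.py.
    output_path = dag_path + f"dag_{parameters['job_id']}.py"
    with open(output_path, 'w') as fh:
        fh.write(dag_content)
    # 0o777 so a scheduler running as another user can pick the file up.
    os.chmod(output_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
    return output_path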

+ 54 - 17
app/core/airflow/task.py

@@ -1,5 +1,4 @@
 import json
-
 from app.core.airflow.uri import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
@@ -12,7 +11,16 @@ class TaskCompiler:
         self.default_image = None
         self.default_cmd = None
 
-    def translate(self):
+    @staticmethod
+    def render_spark_script(parameters, template_file):
+        env = Environment(
+            loader=PackageLoader('app.core.airflow'),
+            autoescape=select_autoescape()
+        )
+        template = env.get_template(template_file)
+        return template.render(parameters)
+
+    def translate(self, task_mode=1):
         return {'image': self.task.run_image or self.default_image,
                 'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
                 'script': self.task.script,
@@ -31,6 +39,16 @@ class TaskCompiler:
         return minio_handler.put_byte_file(file_name=oss_path, file_content=context)
 
 
+class JavaTaskCompiler(TaskCompiler):
+    def __init__(self, item: AirflowTask):
+        super(JavaTaskCompiler, self).__init__(item)
+        self.default_image = 'SXKJ:32775/java:1.0'
+        self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
+        self.task.cmd = self.task.cmd or self.default_cmd
+        tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
+        self.task.cmd = f'curl {"http://minio.default:9000"}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
+
+
 class PythonTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(PythonTaskCompiler, self).__init__(item)
@@ -42,9 +60,37 @@ class DataXTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(DataXTaskCompiler, self).__init__(item)
         self.default_image = 'SXKJ:32775/pod_datax:0.9'
-        self.default_cmd = f"cd datax/bin && echo $SCRIPT > config.json && echo " \
-                           f"\"\'\"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json" \
-                           f"\"\'\" |xargs bash -c "
+        # self.default_cmd = f"cd datax/bin && echo $SCRIPT > config.json && echo " \
+        #                    f"\"\'\"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json" \
+        #                    f"\"\'\" |xargs bash -c "
+        self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json"
+
+    def translate(self, task_mode=1):
+        print(f'{self.task.envs}')
+        script_str = self.render_spark_script(
+            parameters={'script': self.task.script,
+                        'first_begin_time': self.task.envs.get('first_begin_time', None),
+                        'last_key': self.task.envs.get('last_key', None),
+                        'current_key': self.task.envs.get('current_key', None),
+                        'partition_key': self.task.envs.get('partition_key', None),
+                        'partition_word': self.task.envs.get('partition_word', None),
+                        'partition_format': self.task.envs.get('partition_format', None),
+                        'partition_diff': self.task.envs.get('partition_diff', None),
+                        },
+            template_file="transform_datax.py.jinja2")
+        # with open('./auto_generate_demo.py','w') as f:
+        #     f.write(script_str)
+        res = {'image': self.task.run_image or self.default_image,
+               'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
+               'script': script_str,
+               'id': f'{self.task.id}',
+               'env': {**{"SCRIPT": script_str}, **self.task.envs},
+               'operator_name': f'op_{self.task.id}',
+               'name': self.task.name,
+               'desc': ""
+               }
+
+        return res
 
 
 class SparksTaskCompiler(TaskCompiler):
@@ -74,7 +120,7 @@ class SparksTaskCompiler(TaskCompiler):
         basic_cmds = "cd /home/sxkj/bigdata && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
         self.cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"
 
-    def translate(self):
+    def translate(self, task_mode=1):
         # dag_script = {
         #     "sub_nodes": [
         #         {
@@ -111,11 +157,11 @@ class SparksTaskCompiler(TaskCompiler):
                 inputs = {}
                 template_file = 'sql_script_template.py.jinja2'
             elif info['op'] == 'pyspark':
-                inputs = {k: spark_result_tb_name(self.task.id, *v) for k, v in info['inputs'].items()}
+                inputs = {k: spark_result_tb_name(self.task.id, *v, task_mode) for k, v in info['inputs'].items()}
                 template_file = 'pyspark_script_template.py.jinja2'
             else:
                 continue
-            outputs = [spark_result_tb_name(self.task.id, info['id'], 0)]
+            outputs = [spark_result_tb_name(self.task.id, info['id'], 0, task_mode)]
             sub_node = {
                 'id': f'{self.task.id}_{info["id"]}',
                 'name': info['name'],
@@ -137,12 +183,3 @@ class SparksTaskCompiler(TaskCompiler):
             'name': self.task.name,
             'desc': "first spark dag task"
         }
-
-    @staticmethod
-    def render_spark_script(parameters, template_file):
-        env = Environment(
-            loader=PackageLoader('app.core.airflow'),
-            autoescape=select_autoescape()
-        )
-        template = env.get_template(template_file)
-        return template.render(parameters)
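Taken together, the compilers now map task_type to the node dict consumed by the DAG template, and the new task_mode argument (1 = normal, 2 = debug) flows into the Spark result table names. A hypothetical usage sketch (compile_task is not part of this commit):

from app.core.airflow.task import (PythonTaskCompiler, DataXTaskCompiler,
                                   JavaTaskCompiler, SparksTaskCompiler)

COMPILERS = {'python': PythonTaskCompiler,
             'datax': DataXTaskCompiler,
             'java': JavaTaskCompiler,
             'sparks': SparksTaskCompiler}

def compile_task(task, job_mode: int = 1) -> dict:
    # Each translate() returns image/cmds/script/env/operator_name/name/desc.
    return COMPILERS[task.task_type](item=task).translate(job_mode)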

+ 3 - 2
app/core/airflow/templates/dag_template.py.jinja2

@@ -45,7 +45,7 @@ namespace = conf.get("kubernetes", "NAMESPACE")
 name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
-with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}") as dag:
+with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
     op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
 
     {% for spark_node in spark_nodes %}
@@ -66,7 +66,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
                                                                     cmds={{ spark_sub_node['cmds'] }},
                                                                     env_vars={{ spark_sub_node['env'] }},
                                                                     on_success_callback=task_finish_alert,
-{#                                                                    on_failure_callback=task_finish_alert#}
+                                                                    on_failure_callback=task_finish_alert
                                                                     )
     {% endfor %}
         {% for edge in spark_node['edges'] %}
@@ -92,6 +92,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
         cmds={{ node['cmds'] }},
         env_vars={{ node['env'] }},
         on_success_callback=task_finish_alert,
+        on_failure_callback=task_finish_alert
 {#        on_failure_callback=task_finish_alert#}
         )
     op_start >> {{ node['operator_name'] }}
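With trigger_status wired into is_paused_upon_creation, a job created with trigger_status=1 starts unpaused; the 'interval' parameter added in job.py is not used in this hunk, so schedule_interval stays '@daily'. Roughly what the rendered DAG header looks like for such a job (the dag_id value is illustrative):

from datetime import datetime
from airflow import DAG

with DAG(start_date=datetime(2022, 6, 1), catchup=False,
         schedule_interval='@daily', dag_id="dag_7",
         is_paused_upon_creation=not True) as dag:  # trigger_status rendered as True
    ...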

+ 1 - 1
app/core/airflow/templates/pyspark_script_template.py.jinja2

@@ -13,7 +13,7 @@ def run(inputs: dict, outputs: list):
     spark = SparkSession.builder.config('hive.metastore.uris',
                                         'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
     param_dict = preprocess(input_infos=inputs, ss=spark)
-    rets = main_func(**param_dict)
+    rets = main_func(**param_dict,spark=spark,sc=spark.sparkContext)
     postprocess(rets=rets, outputs=outputs)
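Because main_func now also receives the SparkSession and SparkContext, user scripts can use them directly. An illustrative user function compatible with the new call (everything except the spark/sc parameter names is made up):

def main_func(input0, spark=None, sc=None):
    # input0 is a DataFrame prepared by preprocess(); spark and sc come from
    # the keyword arguments added above.
    df = input0.where('1 = 1')
    return [df]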
 
 

+ 29 - 0
app/core/airflow/templates/transform_datax.py.jinja2

@@ -0,0 +1,29 @@
+import json
+from datetime import datetime, timedelta
+
+first_begin_time =  {{ first_begin_time }}   # 100
+last_time = {{ "{{ prev_execution_date.timestamp() }}" }}
+current_time = {{ "{{ execution_date.timestamp() }}" }}
+
+partition_word = '{{ partition_word }}' #'datety'
+partition_format = '{{ partition_format }}'  #'%Y-%m-%d'
+partition_diff = {{ partition_diff }} #1
+
+last_key = '${'+'{{last_key}}'+'}'
+current_key = '${'+'{{current_key}}'+'}'
+partition_key = '${'+'{{partition_key}}'+'}'
+
+
+
+datax_config = {{ script }}
+datax_config_str = json.dumps(datax_config)
+if last_key is not None:
+    now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff or 1)
+    partition_value = f'{partition_word}={now.strftime(partition_format)}'
+    last_time = last_time or first_begin_time
+    datax_config_str = datax_config_str.replace(last_key, f'{last_time}')
+    datax_config_str = datax_config_str.replace(current_key, f'{current_time}')
+    datax_config_str = datax_config_str.replace(partition_key, partition_value)
+
+with open('config.json', 'w') as f:
+    f.write(datax_config_str)
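The doubled braces around prev_execution_date/execution_date are escaped so this first render leaves them as literal Airflow macros; they are presumably expanded later, when Airflow templates the SCRIPT environment variable of the pod. With illustrative inputs (first_begin_time=100, last_key=lastTime, current_key=currentTime, partition_key=partition, partition_word=datety, partition_format=%Y-%m-%d, partition_diff=1, and made-up timestamps from Airflow), the script that finally runs in the DataX container looks roughly like this:

import json
from datetime import datetime, timedelta

first_begin_time = 100
last_time = 1665532800.0     # prev_execution_date.timestamp(), filled in by Airflow
current_time = 1665619200.0  # execution_date.timestamp(), filled in by Airflow

partition_word = 'datety'
partition_format = '%Y-%m-%d'
partition_diff = 1

last_key = '${' + 'lastTime' + '}'
current_key = '${' + 'currentTime' + '}'
partition_key = '${' + 'partition' + '}'

datax_config = {"job": {}}   # illustrative; normally the full DataX job JSON
datax_config_str = json.dumps(datax_config)
if last_key is not None:
    now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff or 1)
    partition_value = f'{partition_word}={now.strftime(partition_format)}'
    last_time = last_time or first_begin_time
    datax_config_str = datax_config_str.replace(last_key, f'{last_time}')
    datax_config_str = datax_config_str.replace(current_key, f'{current_time}')
    datax_config_str = datax_config_str.replace(partition_key, partition_value)

with open('config.json', 'w') as f:
    f.write(datax_config_str)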

+ 27 - 0
app/core/airflow/templates/transform_datax_demo.py

@@ -0,0 +1,27 @@
+import json
+from datetime import datetime, timedelta
+
+first_begin_time = 100
+last_time = 200
+current_time = 300
+
+partition_word = 'datety'
+partition_format = '%Y-%m-%d'
+partition_diff = 1
+
+last_key = '${lastTime}'
+current_key = '${currentTime}'
+partition_key = '${partition}'
+
+now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff)
+partition_value = f'{partition_word}={now.strftime(partition_format)}'
+
+datax_config_str = ""
+
+last_time = last_time or first_begin_time
+datax_config_str = datax_config_str.replace(last_key, f'{last_time}')
+datax_config_str = datax_config_str.replace(current_key, f'{current_time}')
+datax_config_str = datax_config_str.replace(partition_key, partition_value)
+
+with open('config.json', 'w') as f:
+    f.write(datax_config_str)

+ 2 - 2
app/crud/af_job.py

@@ -20,8 +20,8 @@ def get_airflow_jobs(db: Session):
     return res
 
 
-def get_airflow_job_once(db: Session, id: int):
-    res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == id).first()
+def get_airflow_job_once(db: Session, item_id: int):
+    res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == item_id).first()
     return res
 
 

+ 2 - 0
app/models/af_job.py

@@ -7,6 +7,8 @@ class AirflowJob(BaseModel):
     id = Column(Integer, primary_key=True, index=True)
     name = Column(Text)
     job_type = Column(Integer)  # job type: 1 = single-task job, 2 = multi-task job
+    job_mode = Column(Integer)  # job mode: 1 = normal mode, 2 = debug mode
+

     tasks = Column(JSON)  # list of tasks
     dependence = Column(JSON)  # dependencies between jobs

+ 26 - 1
app/routers/job.py

@@ -1,3 +1,6 @@
+import json
+
+import requests
 from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
@@ -26,7 +29,6 @@ def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
     return crud.get_airflow_job_once(db, item_id)
 
 
-
 @router_af_job.post("/")
 @web_try()
 @sxtimeit
@@ -48,3 +50,26 @@ def add_dag_submit(id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_job_once(db, id)
     create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
     # return crud.create_airflow_job(item)
+
+
+@router_af_job.post("/{item_id}/run")
+@web_try()
+@sxtimeit
+def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
+    job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
+    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': 'basic YWRtaW46YWRtaW4=',
+        'Host':'airflow-web.sxkj.com'
+    }
+
+    response = requests.post(uri, headers=headers, data=json.dumps({}))
+    return response.json()
+
+#
+# @router_af_job.post("/run")
+# @web_try()
+# @sxtimeit
+# def trigger_af_job_run(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
+#     return crud.create_airflow_job(db, item)
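The new run endpoint forwards to Airflow's stable REST API. A hypothetical standalone equivalent (host, credentials and the dag_<id> naming are the values hard-coded in this commit; dag_7 is illustrative):

import json
import requests

response = requests.post(
    'http://192.168.199.109/api/v1/dags/dag_7/dagRuns',
    headers={'content-type': 'application/json',
             'Authorization': 'basic YWRtaW46YWRtaW4=',  # admin:admin
             'Host': 'airflow-web.sxkj.com'},
    data=json.dumps({}),
)
print(response.json())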

+ 1 - 1
app/routers/run.py

@@ -30,7 +30,7 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
 @sxtimeit
 def add_airflow_run(item: Item, db: Session = Depends(get_db)):
     print(item.data)
-    job_item = crud.get_airflow_job_once(db=db, id=item.data["job_id"])
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
     sparks_dependence = {}
     if job_item is not None:
         for task in schemas.AirflowJob(**job_item.to_dict()).tasks:

+ 3 - 0
app/schemas/af_job.py

@@ -18,6 +18,7 @@ class AirflowJobBase(BaseModel):
 
 class AirflowJobCreate(AirflowJobBase):
     job_type: int
+    job_mode: int
     user_id: int
 
 
@@ -32,6 +33,8 @@ class AirflowJob(AirflowJobBase):
     create_time: int
     update_time: int
     user_id: int
+    job_type: int
+    job_mode: int
 
     class Config:
         orm_mode = True
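With job_mode added to AirflowJobCreate, clients must now send it when creating a job. A hypothetical request body (only job_type, job_mode and user_id are confirmed by this hunk; the remaining fields are assumed to come from AirflowJobBase, which is not shown):

payload = {
    "name": "order_sync",   # assumed AirflowJobBase field
    "job_type": 1,          # 1 = single-task job, 2 = multi-task job
    "job_mode": 1,          # 1 = normal mode, 2 = debug mode
    "user_id": 1,
    "tasks": [],            # assumed AirflowJobBase field
    "dependence": [],       # assumed AirflowJobBase field
}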

+ 172 - 0
auo_tests/jupyters/02-edit-datax-json.ipynb

@@ -0,0 +1,172 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import datetime"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [],
+   "source": [
+    "datax_config = json.load(open('./datax-config.json','r'))"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "outputs": [],
+   "source": [
+    "datax_config_str = json.dumps(datax_config)"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "outputs": [
+    {
+     "data": {
+      "text/plain": "'{\"job\": {\"content\": [{\"reader\": {\"name\": \"mysqlreader\", \"parameter\": {\"column\": [\"*\"], \"connection\": [{\"jdbcUrl\": [\"jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true\"], \"querySql\": [\"select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})\"]}], \"password\": \"root\", \"username\": \"root\"}}, \"writer\": {\"name\": \"hdfswriter\", \"parameter\": {\"defaultFS\": \"hdfs://localhost:9000\", \"fileType\": \"text\", \"path\": \"/user/hive/warehouse/offline.db/test_order/${partition}\", \"fileName\": \"test_order\", \"column\": [{\"name\": \"keyno\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"code\", \"type\": \"string\"}, {\"name\": \"status\", \"type\": \"string\"}, {\"name\": \"province\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}], \"writeMode\": \"append\", \"fieldDelimiter\": \",\"}}}], \"setting\": {\"speed\": {\"channel\": 2}}}}'"
+     },
+     "execution_count": 10,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "datax_config_str"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "outputs": [],
+   "source": [
+    "first_begin_time = 100\n",
+    "last_time = 200\n",
+    "current_time = 300\n",
+    "\n",
+    "partition_work = 'datety'\n",
+    "partition_format = '%Y-%m-%d'\n",
+    "partition_diff = 1\n",
+    "\n",
+    "\n",
+    "\n",
+    "last_key = '${lastTime}'\n",
+    "current_key = '${currentTime})'\n",
+    "partition_key = '${partition}'"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "{\"job\": {\"content\": [{\"reader\": {\"name\": \"mysqlreader\", \"parameter\": {\"column\": [\"*\"], \"connection\": [{\"jdbcUrl\": [\"jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true\"], \"querySql\": [\"select * from test_order where updateTime >= FROM_UNIXTIME(200) and operationDate < FROM_UNIXTIME(300\"]}], \"password\": \"root\", \"username\": \"root\"}}, \"writer\": {\"name\": \"hdfswriter\", \"parameter\": {\"defaultFS\": \"hdfs://localhost:9000\", \"fileType\": \"text\", \"path\": \"/user/hive/warehouse/offline.db/test_order/${partition}\", \"fileName\": \"test_order\", \"column\": [{\"name\": \"keyno\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"code\", \"type\": \"string\"}, {\"name\": \"status\", \"type\": \"string\"}, {\"name\": \"province\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}], \"writeMode\": \"append\", \"fieldDelimiter\": \",\"}}}], \"setting\": {\"speed\": {\"channel\": 2}}}}\n"
+     ]
+    }
+   ],
+   "source": [
+    "last_time = last_time or first_begin_time\n",
+    "print(datax_config_str.replace(last_key,f'{last_time}').replace(current_key,f'{current_time}'))"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "outputs": [
+    {
+     "data": {
+      "text/plain": "'2022-10-12'"
+     },
+     "execution_count": 30,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "\n",
+    "from datetime import timezone\n",
+    "\n",
+    "now = datetime.datetime.utcfromtimestamp(datetime.datetime.timestamp(datetime.datetime.now())) + datetime.timedelta(hours=8,days=partition_diff)\n",
+    "now.strftime(partition_format)\n",
+    "# DateTime temp = DateTime.ParseExact(sourceDate, \"dd-MM-yyyy\", CultureInfo.InvariantCulture);\n",
+    "# string str = temp.ToString(\"yyyy-MM-dd\");\n",
+    "\n",
+    "now.strftime(partition_format)\n",
+    "# datetime.datetime.now(tz=timezone.)"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false,
+    "pycharm": {
+     "is_executing": true
+    }
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false
+   }
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}

+ 70 - 0
auo_tests/jupyters/datax-config.json

@@ -0,0 +1,70 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": [
+                  "jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true"
+                ],
+                "querySql": [
+                  "select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
+                ]
+              }
+            ],
+            "password": "root",
+            "username": "root"
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://localhost:9000",
+            "fileType": "text",
+            "path": "/user/hive/warehouse/offline.db/test_order/${partition}",
+            "fileName": "test_order",
+            "column": [
+              {
+                "name": "keyno",
+                "type": "string"
+              },
+              {
+                "name": "name",
+                "type": "string"
+              },
+              {
+                "name": "code",
+                "type": "string"
+              },
+              {
+                "name": "status",
+                "type": "string"
+              },
+              {
+                "name": "province",
+                "type": "string"
+              },
+              {
+                "name": "city",
+                "type": "string"
+              }
+            ],
+            "writeMode": "append",
+            "fieldDelimiter": ","
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 2
+      }
+    }
+  }
+}

+ 70 - 0
auo_tests/tasks/datax/config.json

@@ -0,0 +1,70 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "column": [
+              "*"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": [
+                  "jdbc:mysql://localhost:3306/order?useUnicode=true&characterEncoding=utf-8&useSSL=false&rewriteBatchedStatements=true"
+                ],
+                "querySql": [
+                  "select * from test_order where updateTime >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
+                ]
+              }
+            ],
+            "password": "root",
+            "username": "root"
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://localhost:9000",
+            "fileType": "text",
+            "path": "/user/hive/warehouse/offline.db/test_order/${partition}",
+            "fileName": "test_order",
+            "column": [
+              {
+                "name": "keyno",
+                "type": "string"
+              },
+              {
+                "name": "name",
+                "type": "string"
+              },
+              {
+                "name": "code",
+                "type": "string"
+              },
+              {
+                "name": "status",
+                "type": "string"
+              },
+              {
+                "name": "province",
+                "type": "string"
+              },
+              {
+                "name": "city",
+                "type": "string"
+              }
+            ],
+            "writeMode": "append",
+            "fieldDelimiter": ","
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 2
+      }
+    }
+  }
+}

+ 60 - 0
auo_tests/tasks/datax/config2.json

@@ -0,0 +1,60 @@
+{
+  "job": {
+    "setting": {
+      "speed": {
+        "channel": 3
+      },
+      "errorLimit": {
+        "record": 0,
+        "percentage": 0.02
+      }
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "root",
+            "password": "happylay",
+            "splitPk": "",
+            "connection": [
+              {
+                "querySql": [
+                  "select id, txn_amount, txn_type from test_4 where txn_date >= FROM_UNIXTIME(${lastTime}) and txn_date < FROM_UNIXTIME(${currentTime})"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "hdfswriter",
+          "parameter": {
+            "defaultFS": "hdfs://192.168.199.27:9000",
+            "fileType": "text",
+            "path": "/home/sxkj/bigdata/apache-hive-2.3.9-bin/warehouse/test_h4/${partition}",
+            "fileName": "000000",
+            "writeMode": "append",
+            "fieldDelimiter": ",",
+            "column": [
+              {
+                "name": "id",
+                "type": "int"
+              },
+              {
+                "name": "txn_amount",
+                "type": "double"
+              },
+              {
+                "name": "txn_type",
+                "type": "string"
+              }
+            ]
+          }
+        }
+      }
+    ]
+  }
+}

File diff suppressed because it is too large
+ 134 - 0
auo_tests/tasks/datax/datax_debug.py.ipynb


Some files were not shown because too many files changed in this diff