test_datax_task.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. from airflow import DAG
  2. from datetime import datetime
  3. from airflow.operators.bash import BashOperator
  4. from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
  5. from airflow.configuration import conf
  6. namespace = conf.get("kubernetes", "NAMESPACE")
  7. # set the name that will be printed
  8. name = "luoyulong"
  9. # instantiate the DAG
  10. with DAG(
  11. start_date=datetime(2022,6,1),
  12. catchup=False,
  13. schedule_interval='@daily',
  14. dag_id="test_auto_dag_datax"
  15. ) as dag:
  16. op_xxx_yyy_zzz = KubernetesPodOperator(
  17. task_id="xxx_yyy_zzz",
  18. image="SXKJ:32775/pod_datax:0.9",
  19. in_cluster=True,
  20. namespace=namespace,
  21. name="data_sync",
  22. random_name_suffix=True,
  23. labels={'app':'backend', 'env':'dev'},
  24. reattach_on_restart=True,
  25. is_delete_operator_pod=False,
  26. get_logs=True,
  27. log_events_on_failure=True,
  28. cmds=['/bin/bash', '-c', 'cd datax/bin && echo $SCRIPT > config.json && echo "\'"$HOME/conda/envs/py27/bin/python datax.py $CMD_PARM config.json"\'" |xargs bash -c '],
  29. env_vars={'SCRIPT': '{\n "job": {\n "setting": {\n "speed": {\n "channel": 3\n },\n "errorLimit": {\n "record": 0,\n "percentage": 0.02\n }\n },\n "content": [\n {\n "reader": {\n "name": "mysqlreader",\n "parameter": {\n "username": "root",\n "password": "happylay",\n "column": [\n "id",\n "name"\n ],\n "where": "ct=\'shanghai\'",\n "splitPk": "",\n "connection": [\n {\n "table": [\n "test_3"\n ],\n "jdbcUrl": [\n "jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false"\n ]\n }\n ]\n }\n },\n "writer": {\n "name": "hdfswriter",\n "parameter": {\n "defaultFS": "hdfs://192.168.199.27:9000",\n "fileType": "text",\n "path": "/home/sxkj/bigdata/apache-hive-2.3.9-bin/warehouse/test_p/ct=shanghai",\n "fileName": "test_p",\n "writeMode": "append",\n "fieldDelimiter": ",",\n "column": [\n {\n "name": "id",\n "type": "int"\n },\n {\n "name": "name",\n "type": "string"\n }\n ]\n }\n }\n }\n ]\n }\n}', 'CMD_PARM': '-j "-Xms2G -Xmx2G" -p "-Dct=hangzhou" '}
  30. )