job.py

import json
import os
import stat
import time
from typing import Dict

from jinja2 import Environment, PackageLoader, select_autoescape

# `config`, the *TaskCompiler classes and `AirflowTask` are assumed to come from this wildcard import.
from app.core.airflow.task import *
from app.core.airflow.af_util import get_job_path
from app.schemas import AirflowJob


class AirflowJobSubmitter:
    @staticmethod
    def submit_dag(item: AirflowJob):
        """Compile a job's tasks into DAG nodes/edges and render the DAG file for Airflow."""
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}
        # Non-spark tasks are compiled one by one; spark tasks get their own compiler pass.
        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})
        #
        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }
        # print(f" image pull key is : {config.get('K8S', 'image_pull_key')}")
        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                      'interval': item.cron if item.cron != 'None' else None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'node_selectors': config.get('K8S', 'node_selectors', fallback=None),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': True
                      }
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=item.id))

    @staticmethod
    def generate_dag_on_airflow(parameters, save_path):
        """Render the jinja2 DAG template with `parameters` and write the result to `save_path`."""
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')
        with open(save_path, "w") as fh:
            fh.write(dag_content)
        # Make the generated DAG file readable, writable and executable for everyone (0o777).
        os.chmod(save_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
        print(f'write dag to {save_path}')

    @staticmethod
    def auto_submit_data_transfer():
        """Create the built-in data-transfer DAG (job_id 0) with a single spark node."""
        job_id = 0
        user_id = 0
        spark_task_demo = SparksTaskCompiler(item=None)
        spark_nodes = [
            {
                "sub_nodes": [{
                    "name": 'read_and_save',
                    "id": 0,
                    "image": spark_task_demo.default_image,
                    "cmds": ['/bin/bash', '-c', spark_task_demo.cmd_str(name='spark_data_transfer')],
                    "env": {"SCRIPT": spark_task_demo.render_spark_script(
                        parameters={"hive_metastore_uris": config.get('HIVE_METASTORE', 'uris')},
                        template_file="data_transfer_dag_template.py.jinja2")
                    },
                }],
                "edges": [],
                "name": 'data_save',
                "desc": 'task for data saving',
                "id": 0,
            }
        ]
        parameters = {'nodes': [], 'spark_nodes': spark_nodes, 'edges': [], 'dag_id': f'dag_{job_id}',
                      'user_name': user_id, 'job_id': job_id, 'trigger_status': False,
                      'interval': None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'node_selectors': config.get('K8S', 'node_selectors', fallback=None),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': False
                      }
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=job_id))
        print('create data transfer job success!')

    @staticmethod
    def auto_submit_requirements_install():
        """Create the built-in DAG (job_id -1) that pip-installs requirements into the py37 env and pushes it to HDFS."""
        job_id = -1
        user_id = 0
        cmds = []
        if config.get('K8S', 'enable_kerberos', fallback=None) in ['true', "True", True]:
            principal = config.get('HIVE', 'principal', fallback=None)
            cmds.append(f"kinit -kt /workspace/conf/user.keytab {principal}")
        if config.get('HOST_ALIAS', 'enable', fallback=None) in ['true', "True", True]:
            host_alias: Dict = json.loads(config.get('HOST_ALIAS', 'host_alias'))
            for k, v in host_alias.items():
                cmds.append(f"echo '{k} {v}' >> /etc/hosts")
        cmds.append("cd /workspace")
        # The {{ dag_run.conf.get(...) }} placeholders are left for Airflow to fill from the dag_run config.
        cmds.append("./py37/bin/pip3 install {{ dag_run.conf.get('requirements_str','numpy') }} -i https://pypi.douban.com/simple/")
        cmds.append("zip -q -r /tmp/py37.zip py37")
        cmds.append("hdfs dfs -put -f /tmp/py37.zip {{dag_run.conf.get('target_path','/tmp/basic.zip')}} ")
        py_task_demo = PythonTaskCompiler(item=AirflowTask(id=0, task_type="python", user_id=user_id, cmd_parameters="",
                                                           create_time=0, update_time=0, name="requirements_install",
                                                           file_urls=[], script="", cmd=' && '.join(cmds), envs={},
                                                           run_image=SparksTaskCompiler(item=None).default_image))
        parameters = {'nodes': [py_task_demo.translate(job_id=job_id, task_mode=1)], 'spark_nodes': [], 'edges': [],
                      'dag_id': f'dag_{job_id}',
                      'user_name': user_id, 'job_id': job_id, 'trigger_status': False,
                      'interval': None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'node_selectors': config.get('K8S', 'node_selectors', fallback=None),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': False
                      }
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=job_id))
        print('create requirements_install success!')
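

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module): the AirflowJob
# field names below are inferred from the attribute accesses in submit_dag();
# the real schema in app.schemas is authoritative.
#
#   job = AirflowJob(id=42, user_id=1, job_mode=1, cron='0 0 * * *',
#                    trigger_status=1, dependence=[[1, 2]],
#                    tasks=[...])                            # AirflowTask items
#   AirflowJobSubmitter.submit_dag(job)                      # writes dag_42 to the DAG path
#
#   AirflowJobSubmitter.auto_submit_data_transfer()          # built-in dag_0
#   AirflowJobSubmitter.auto_submit_requirements_install()   # built-in dag_-1
# ---------------------------------------------------------------------------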