job.py

import os
import stat

# Environment / PackageLoader / select_autoescape come from Jinja2; import them
# explicitly rather than relying on the wildcard import below.
from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *
from app.core.airflow.uri import get_job_path
from app.schemas import AirflowJob


class AirflowJobSubmitter:
    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to the compiler that turns it into an Airflow operator.
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}

        # Compile non-Spark and Spark tasks separately; both produce the operator
        # definitions consumed by the DAG template.
        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']

        # Build operator dependency edges from the job's dependence list
        # (each edge is a [source_task_id, target_task_id] pair).
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }

        # Parameters rendered into the Jinja2 DAG template.
        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                      'interval': item.cron if item.cron != 'None' else None}

        # Render dag_template.py.jinja2 from the app.core.airflow package templates.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')

        # Write the generated DAG file and make it readable, writable and executable
        # by owner, group and others so the Airflow scheduler can load it.
        output_path = get_job_path(job_id=item.id)
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
        print(f'write dag to {output_path}')
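

# --- Hypothetical usage sketch (not part of the original module) ---
# Assuming AirflowJob is the Pydantic schema imported above, with the fields
# submit_dag actually reads (id, user_id, job_mode, cron, trigger_status,
# tasks, dependence), a caller might submit a job roughly like this:
#
#   job = AirflowJob(id=42, user_id='alice', job_mode=1, cron='0 0 * * *',
#                    trigger_status=1, tasks=[...], dependence=[[1, 2]])
#   AirflowJobSubmitter.submit_dag(job)
#
# This renders dag_template.py.jinja2 into a dag_42 DAG and writes the result
# to get_job_path(job_id=42), where the Airflow scheduler can pick it up.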