import os
import stat
import time

from jinja2 import Environment, PackageLoader, select_autoescape

# the wildcard import is expected to expose the task compilers (PythonTaskCompiler,
# DataXTaskCompiler, SparksTaskCompiler, JavaTaskCompiler) and the shared `config` parser
from app.core.airflow.task import *
from app.core.airflow.af_util import get_job_path
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        """Translate the job's tasks and dependencies into template parameters and render its DAG file."""
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}

        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']

        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        # common cron expressions and their Airflow preset aliases, kept for reference:
        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }

        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                      'interval': item.cron if item.cron != 'None' else None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': True
                      }
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=item.id))

    @staticmethod
    def generate_dag_on_airflow(parameters, save_path):
        """Render the packaged jinja2 DAG template with `parameters` and write the result to `save_path`."""
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finished building dag: {dag_content}')
        with open(save_path, "w") as fh:
            fh.write(dag_content)
        # 0o777 so the Airflow scheduler (typically running as a different user) can pick up the file
        os.chmod(save_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        print(f'wrote dag to {save_path}')
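
    # Illustrative only (not part of the original code): a minimal, hypothetical call to
    # generate_dag_on_airflow. The keys mirror those assembled in submit_dag above; the
    # concrete values (job id 42, the daily cron) are made up for the example.
    #
    #   AirflowJobSubmitter.generate_dag_on_airflow(
    #       parameters={'nodes': [], 'spark_nodes': [], 'edges': [],
    #                   'dag_id': 'dag_42', 'user_name': 1, 'job_id': 42,
    #                   'trigger_status': False, 'interval': '0 0 * * *',
    #                   'af_backend_uri': config.get('AF_BACKEND', 'uri'),
    #                   'image_pull_key': None, 'enable_notify': True},
    #       save_path=get_job_path(job_id=42))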

    @staticmethod
    def auto_submit_data_transfer():
        """Generate the built-in data-transfer DAG (job id 0): a single Spark task rendered from the data-transfer template."""
        job_id = 0
        user_id = 0
        spark_task_demo = SparksTaskCompiler(item=None)
        spark_nodes = [
            {
                "sub_nodes": [{
                    "name": 'read_and_save',
                    "id": 0,
                    "image": spark_task_demo.default_image,
                    "cmds": ['/bin/bash', '-c', spark_task_demo.cmd_str(name='spark_data_transfer')],
                    "env": {"SCRIPT": spark_task_demo.render_spark_script(
                        parameters={"hive_metastore_uris": config.get('HIVE_METASTORE', 'uris')},
                        template_file="data_transfer_dag_template.py.jinja2")
                    },
                }],
                "edges": [],
                "name": 'data_save',
                "desc": 'task for data saving',
                "id": 0,
            }
        ]
        print(spark_nodes[0]['sub_nodes'][0]['env']['SCRIPT'])

        parameters = {'nodes': [], 'spark_nodes': spark_nodes, 'edges': [], 'dag_id': f'dag_{job_id}',
                      'user_name': user_id, 'job_id': job_id, 'trigger_status': False,
                      'interval': None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': False
                      }
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters, save_path=get_job_path(job_id=job_id))
        print('created the data transfer job successfully')

    # @staticmethod
    # def auto_submit_data_transfer2():
    #     # AirflowTask fields:
    #     #   name: str
    #     #   file_urls: Optional[List[str]] = []
    #     #   script: str
    #     #   cmd: Optional[str] = ""
    #     #   cmd_parameters: str
    #     #   envs: Optional[Dict[str, str]] = {}
    #     #   run_image: str
    #     #   task_type: str
    #     df_task = AirflowTask(name='data_save', task_type='sparks', file_urls=[], script='', cmd='', env={})
    #
    #     # AirflowJob fields:
    #     #   id: int
    #     #   job_type: int
    #     #   create_time: int
    #     #   update_time: int
    #     #   user_id: int
    #     #   job_mode: int
    #     #   tasks: List[AirflowTask]
    #     #   name: str
    #     #   dependence: List = []
    #     #   cron: str
    #     #   desc: str
    #     #   route_strategy: str
    #     #   block_strategy: str
    #     #   executor_timeout: int
    #     #   executor_fail_retry_count: int
    #     #   trigger_status: int
    #     job_item = AirflowJob(id=0, job_type=1, tasks=[df_task], create_time=int(time.time()), user_id=0, job_mode=1,
    #                           name='data_transfer', dependence=[], cron="None", trigger_status=0)
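
    # Illustrative only: how a job shaped like the commented-out example above could be
    # submitted. Field names follow the schema comments; every concrete value here is
    # hypothetical, and the exact required/optional fields depend on app.schemas.
    #
    #   task = AirflowTask(name='etl_step', task_type='python', file_urls=[], script='print("hi")', cmd='', env={})
    #   job = AirflowJob(id=1, job_type=1, tasks=[task], create_time=int(time.time()), user_id=0, job_mode=1,
    #                    name='demo_job', dependence=[], cron='0 0 * * *', trigger_status=1)
    #   AirflowJobSubmitter.submit_dag(job)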