task.py

import json

from jinja2 import Environment, PackageLoader, select_autoescape

from app.common.minio import FileHandler
from app.core.airflow.uri import spark_result_tb_name
from app.schemas import AirflowTask


class TaskCompiler:
    """Base compiler that turns an AirflowTask into a container task spec."""

    def __init__(self, item: AirflowTask):
        self.task = item
        self.default_image = None
        self.default_cmd = None

    @staticmethod
    def render_spark_script(parameters, template_file):
        # Render a script from the Jinja2 templates packaged with app.core.airflow.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template(template_file)
        return template.render(parameters)

    def translate(self, task_mode=1):
        return {'image': self.task.run_image or self.default_image,
                'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
                'script': self.task.script,
                'id': f'{self.task.id}',
                'env': {**{"SCRIPT": self.task.script}, **self.task.envs},
                'operator_name': f'op_{self.task.id}',
                'name': self.task.name,
                'desc': ""
                }

    @staticmethod
    def write_to_oss(oss_path, context, bucket='mytest'):
        # Upload string or bytes content to the given MinIO bucket.
        if isinstance(context, str):
            context = bytes(context, 'utf-8')
        minio_handler = FileHandler(bucket_name=bucket)
        return minio_handler.put_byte_file(file_name=oss_path, file_content=context)


class JavaTaskCompiler(TaskCompiler):
    def __init__(self, item: AirflowTask):
        super(JavaTaskCompiler, self).__init__(item)
        self.default_image = 'SXKJ:32775/java:1.0'
        self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
        self.task.cmd = self.task.cmd or self.default_cmd
        # Download the packaged artifact from MinIO before running the task command.
        tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
        self.task.cmd = f'curl http://minio.default:9000/{self.task.file_urls[0]} --output {tar_name} && {self.task.cmd}'


class PythonTaskCompiler(TaskCompiler):
    def __init__(self, item: AirflowTask):
        super(PythonTaskCompiler, self).__init__(item)
        self.default_image = 'SXKJ:32775/pod_python:1.1'
        self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"


class DataXTaskCompiler(TaskCompiler):
    def __init__(self, item: AirflowTask):
        super(DataXTaskCompiler, self).__init__(item)
        self.default_image = 'SXKJ:32775/pod_datax:0.9'
        # Generate config.json via transform_datax.py, then run DataX under the py27 env.
        self.default_cmd = ("cd datax/bin && echo \"$SCRIPT\" > transform_datax.py && "
                            "cat transform_datax.py && python3 transform_datax.py && cat config.json && "
                            f"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json")

    def translate(self, task_mode=1):
        print(f'{self.task.envs}')
        # Render the transform_datax template with the task script and its env-provided parameters.
        script_str = self.render_spark_script(
            parameters={'script': self.task.script,
                        'first_begin_time': self.task.envs.get('first_begin_time', None),
                        'last_key': self.task.envs.get('last_key', None),
                        'current_key': self.task.envs.get('current_key', None),
                        'partition_key': self.task.envs.get('partition_key', None),
                        'partition_word': self.task.envs.get('partition_word', None),
                        'partition_format': self.task.envs.get('partition_format', None),
                        'partition_diff': self.task.envs.get('partition_diff', None),
                        },
            template_file="transform_datax.py.jinja2")
        res = {'image': self.task.run_image or self.default_image,
               'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
               'script': script_str,
               'id': f'{self.task.id}',
               'env': {**{"SCRIPT": script_str}, **self.task.envs},
               'operator_name': f'op_{self.task.id}',
               'name': self.task.name,
               'desc': ""
               }
        return res


class SparksTaskCompiler(TaskCompiler):
    def __init__(self, item: AirflowTask):
        super(SparksTaskCompiler, self).__init__(item)
        self.default_image = 'SXKJ:32775/jupyter:0.96'
        # spark-submit command-line options
        parameters = {"master": "yarn",
                      "deploy-mode": "cluster",
                      "driver-memory": "2g",
                      "driver-cores": 1,
                      "executor-memory": "2g",
                      "executor-cores": 4,
                      "num-executors": 1,
                      "archives": "/home/sxkj/bigdata/py37.zip#python3env"
                      }
        # extra --conf settings, pointing PySpark at the Python env shipped via the archive
        spark_config = {'spark.default.parallelism': 2,
                        "spark.executor.memoryOverhead": "4g",
                        "spark.driver.memoryOverhead": "2g",
                        "spark.yarn.maxAppAttempts": 3,
                        "spark.yarn.submit.waitAppCompletion": "true",
                        "spark.pyspark.driver.python": "python3env/py37/bin/python",
                        "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "python3env/py37/bin/python",
                        "spark.pyspark.python": "python3env/py37/bin/python"
                        }
        param_str = ' '.join([f'--{k} {v}' for k, v in parameters.items()])
        param_str += ''.join([f' --conf {k}={v}' for k, v in spark_config.items()])
        basic_cmds = "cd /home/sxkj/bigdata && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
        self.cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"

    def translate(self, task_mode=1):
        # self.task.script is a JSON DAG description, e.g.:
        # {
        #     "sub_nodes": [
        #         {"id": "1", "name": "SqlNode1", "op": "sql", "script": "select * from train"},
        #         {"id": "2", "name": "SqlNode1", "op": "sql", "script": "select * from test"},
        #         {"id": "3", "name": "PysparkNode1", "op": "pyspark",  # or python
        #          "inputs": {"train": ("1", 0), "test": ("2", 0)},
        #          "script": "import os\n ..."},
        #     ],
        #     "edges": [("1", "3"), ("2", "3")]
        # }
        infos = json.loads(self.task.script)
        sub_nodes = []
        for info in infos['sub_nodes']:
            if info['op'] == 'sql':
                inputs = {}
                template_file = 'sql_script_template.py.jinja2'
            elif info['op'] == 'pyspark':
                inputs = {k: spark_result_tb_name(self.task.id, *v, task_mode) for k, v in info['inputs'].items()}
                template_file = 'pyspark_script_template.py.jinja2'
            else:
                continue
            outputs = [spark_result_tb_name(self.task.id, info['id'], 0, task_mode)]
            sub_node = {
                'id': f'{self.task.id}_{info["id"]}',
                'name': info['name'],
                'env': {
                    'SCRIPT': self.render_spark_script(
                        parameters={'script': info['script'], 'inputs': inputs, 'outputs': outputs},
                        template_file=template_file),
                },
                'cmds': ['/bin/bash', '-c', self.cmd_str(name=f'spark_{self.task.id}_{info["id"]}')],
                'image': "SXKJ:32775/jupyter:0.96",
            }
            sub_nodes.append(sub_node)
        edges = [(f'{self.task.id}_{source}', f'{self.task.id}_{sink}') for (source, sink) in infos['edges']]
        return {
            "id": self.task.id,
            "sub_nodes": sub_nodes,
            "edges": edges,
            'name': self.task.name,
            'desc': "first spark dag task"
        }
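

# Usage sketch (not part of the module's public API): a minimal example of how a
# compiler's translate() output is expected to look. It assumes AirflowTask is a
# pydantic-style model that accepts these fields as keyword arguments; the real
# schema lives in app.schemas and may require additional fields.
if __name__ == '__main__':
    demo_task = AirflowTask(id=1, name='demo', run_image='', cmd='',
                            script='print("hello")', envs={})
    pod_spec = PythonTaskCompiler(item=demo_task).translate(task_mode=1)
    # Expected shape: {'image': 'SXKJ:32775/pod_python:1.1', 'cmds': [...], 'script': ...,
    #                  'id': '1', 'env': {'SCRIPT': ...}, 'operator_name': 'op_1',
    #                  'name': 'demo', 'desc': ''}
    print(json.dumps(pod_spec, indent=2))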