bash_operator.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Operator source: https://raw.githubusercontent.com/apache/airflow/1.10.15/airflow/operators/bash_operator.py
# This file has been modified from its original source to ensure it passes
# Elyra linting. No functionality changes have been made.
#
  20. from builtins import bytes
  21. import os
  22. import signal
  23. from subprocess import PIPE
  24. from subprocess import Popen
  25. from subprocess import STDOUT
  26. from tempfile import gettempdir
  27. from tempfile import NamedTemporaryFile
  28. from airflow.exceptions import AirflowException
  29. from airflow.models import BaseOperator
  30. from airflow.utils.decorators import apply_defaults
  31. from airflow.utils.file import TemporaryDirectory
  32. from airflow.utils.operator_helpers import context_to_airflow_vars
  33. class BashOperator(BaseOperator):
  34. r"""
  35. Execute a Bash script, command or set of commands.
  36. .. seealso::
  37. For more information on how to use this operator, take a look at the guide:
  38. :ref:`howto/operator:BashOperator`
  39. :param bash_command: The command, set of commands or reference to a
  40. bash script (must be '.sh') to be executed. (templated)
  41. :type bash_command: str
  42. :param xcom_push: If xcom_push is True, the last line written to stdout
  43. will also be pushed to an XCom when the bash command completes.
  44. :type xcom_push: bool
  45. :param env: If env is not None, it must be a mapping that defines the
  46. environment variables for the new process; these are used instead
  47. of inheriting the current process environment, which is the default
  48. behavior. (templated)
  49. :type env: dict
  50. :param output_encoding: Output encoding of bash command
  51. :type output_encoding: str
  52. .. warning::
  53. Care should be taken with "user" input or when using Jinja templates in the
  54. ``bash_command``, as this bash operator does not perform any escaping or
  55. sanitization of the command.
  56. This applies mostly to using "dag_run" conf, as that can be submitted via
  57. users in the Web UI. Most of the default template variables are not at
  58. risk.
  59. For example, do **not** do this:
  60. .. code-block:: python
  61. bash_task = BashOperator(
  62. task_id="bash_task",
  63. bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
  64. )
  65. Instead, you should pass this via the ``env`` kwarg and use double-quotes
  66. inside the bash_command, as below:
  67. .. code-block:: python
  68. bash_task = BashOperator(
  69. task_id="bash_task",
  70. bash_command='echo "here is the message: \'$message\'"',
  71. env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
  72. )
  73. """
  74. template_fields = ("bash_command", "env")
  75. template_ext = (
  76. ".sh",
  77. ".bash",
  78. )
  79. ui_color = "#f0ede4"
  80. @apply_defaults
  81. def __init__(self, bash_command, xcom_push=False, env=None, output_encoding="utf-8", *args, **kwargs):
  82. super(BashOperator, self).__init__(*args, **kwargs)
  83. self.bash_command = bash_command
  84. self.env = env
  85. self.xcom_push_flag = xcom_push
  86. self.output_encoding = output_encoding
  87. self.sub_process = None
  88. def execute(self, context):
  89. """
  90. Execute the bash command in a temporary directory
  91. which will be cleaned afterwards
  92. """
  93. self.log.info(f"Tmp dir root location: \n {gettempdir()}")
  94. # Prepare env for child process.
  95. env = self.env
  96. if env is None:
  97. env = os.environ.copy()
  98. airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
  99. acv_log = "\n".join([f"{k}={v}" for k, v in airflow_context_vars.items()])
  100. self.log.debug("Exporting the following env vars:\n" f"{acv_log}")
  101. env.update(airflow_context_vars)
  102. self.lineage_data = self.bash_command
  103. with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
  104. with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
  105. f.write(bytes(self.bash_command, "utf_8"))
  106. f.flush()
  107. fname = f.name
  108. script_location = os.path.abspath(fname)
  109. self.log.info(f"Temporary script location: {script_location}")
  110. def pre_exec():
  111. # Restore default signal disposition and invoke setsid
  112. for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
  113. if hasattr(signal, sig):
  114. signal.signal(getattr(signal, sig), signal.SIG_DFL)
  115. os.setsid()
  116. self.log.info(f"Running command: {self.bash_command}")
  117. self.sub_process = Popen(
  118. ["bash", fname], stdout=PIPE, stderr=STDOUT, cwd=tmp_dir, env=env, preexec_fn=pre_exec
  119. )
  120. self.log.info("Output:")
  121. line = ""
  122. for line in iter(self.sub_process.stdout.readline, b""):
  123. line = line.decode(self.output_encoding).rstrip()
  124. self.log.info(line)
  125. self.sub_process.wait()
  126. self.log.info(f"Command exited with return code {self.sub_process.returncode}")
  127. if self.sub_process.returncode:
  128. raise AirflowException("Bash command failed")
  129. if self.xcom_push_flag:
  130. return line
  131. def on_kill(self):
  132. self.log.info("Sending SIGTERM signal to bash process group")
  133. if self.sub_process and hasattr(self.sub_process, "pid"):
  134. os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)