package_requirements.py

from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")

# packages to install inside the Pod, passed to the container as a single space-separated string
python_requirements = [
    'sphinx>=1.8',
    'sphinx_rtd_theme',
    'recommonmark>=0.6.0',
    'markdown>=3.4.1',
]

# instantiate the DAG
with DAG(
    dag_id='lyl_package_test',
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    package_python_libs1 = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id='package-task-01',
        # the Docker image to launch
        image='SXKJ:32775/jupyter:1.1',
        image_pull_policy='Always',
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # name the Pod (must be a valid DNS-1123 name: lowercase letters, digits, '-')
        name='package-pod-01',
        # give the Pod name a random suffix to ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod, can be used for grouping
        labels={'app': 'backend', 'env': 'dev'},
        # if the worker restarts, reattach to the existing Pod instead of creating a new one
        reattach_on_restart=True,
        # keep the Pod after the task finishes (set to True to delete it automatically)
        is_delete_operator_pod=False,
        # stream the container's stdout as task logs
        get_logs=True,
        # log Kubernetes events in case of Pod failure
        log_events_on_failure=True,
        # run the packaging script inside the container
        cmds=["/bin/bash", "-c", 'sh /home/sxkj/bigdata/install.sh'],
        # pass the requirements list and upload path to the container as environment variables
        env_vars={
            "PYTHON_REQUIREMENTS": ' '.join(python_requirements),
            "UPLOAD_PATH": '/tmp/x.zip',
        },
    )

    package_python_libs2 = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id='package-task-02',
        # the Docker image to launch
        image='SXKJ:32775/jupyter:1.1',
        image_pull_policy='Always',
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # name the Pod (must be a valid DNS-1123 name: lowercase letters, digits, '-')
        name='package-pod-02',
        # give the Pod name a random suffix to ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod, can be used for grouping
        labels={'app': 'backend', 'env': 'dev'},
        # if the worker restarts, reattach to the existing Pod instead of creating a new one
        reattach_on_restart=True,
        # keep the Pod after the task finishes (set to True to delete it automatically)
        is_delete_operator_pod=False,
        # stream the container's stdout as task logs
        get_logs=True,
        # log Kubernetes events in case of Pod failure
        log_events_on_failure=True,
        # simple echo command as a placeholder workload
        cmds=["/bin/bash", "-c", 'echo 12345'],
        # pass the requirements list and upload path to the container as environment variables
        env_vars={
            "PYTHON_REQUIREMENTS": ' '.join(python_requirements),
            "UPLOAD_PATH": '/tmp/x',
        },
    )

    # run the packaging task first, then the echo task
    package_python_libs1 >> package_python_libs2
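
For reference, a minimal sketch of how the DAG above could be sanity-checked before deployment. It assumes a separate test file (the name test_package_requirements_dag.py is hypothetical, not part of this repository) that loads the DAG folder with Airflow's DagBag and verifies that both Pod tasks exist and are wired package-task-01 >> package-task-02.

# test_package_requirements_dag.py -- hypothetical sketch, not part of the DAG file above
from airflow.models import DagBag


def test_dag_loads_and_orders_tasks():
    # parse the DAG folder without Airflow's bundled example DAGs
    dagbag = DagBag(include_examples=False)

    # the DAG file should import without errors
    assert dagbag.import_errors == {}

    # the DAG should be registered under its dag_id
    dag = dagbag.get_dag("lyl_package_test")
    assert dag is not None

    # both Pod tasks should be present under their task_ids
    task1 = dag.get_task("package-task-01")
    task2 = dag.get_task("package-task-02")

    # package-task-02 should run downstream of package-task-01
    assert task2.task_id in task1.downstream_task_ids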