add_sparks_task.py

import json
import time

import requests

from auo_tests.tasks.config import host
from app.core.airflow.af_util import upload2oss
# DAG description: two SQL nodes feeding one PySpark node.
dag_script = {
    "sub_nodes": [
        {
            "id": "1",
            "name": "SqlNode1",
            "op": "sql",
            "script": "select * from train",
        },
        {
            "id": "2",
            "name": "SqlNode2",
            "op": "sql",
            "script": "select * from test",
        },
        {
            "id": "3",
            "name": "PysparkNode1",
            "op": "pyspark",  # or "python"
            # each input maps a name to (upstream node id, output slot)
            "inputs": {
                "train": ("1", 0),
                "test": ("2", 0),
            },
            "script": "",  # filled in below from spark_script_1009.py
        },
    ],
    "edges": [
        ("1", "3"),
        ("2", "3"),
    ],
}
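# Illustrative addition (not in the original script): sanity-check that every
# edge endpoint refers to a declared sub_node id before uploading the DAG.
node_ids = {node["id"] for node in dag_script["sub_nodes"]}
assert all(src in node_ids and dst in node_ids for src, dst in dag_script["edges"])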
# Read the PySpark script from disk and attach it to PysparkNode1.
with open('./spark_script_1009.py', 'r') as f:
    pyspark_script = f.read()
dag_script["sub_nodes"][2]["script"] = pyspark_script
# Serialize the DAG description and upload it to MinIO.
filename = 'dag_demo_desc.dag'
minio_bucket = 'mytest'
minio_path = f'/xxx/tmp/{filename}'
upload2oss(json.dumps(dag_script).encode(), minio_path, minio_bucket)
file_path = f'{minio_bucket}/{minio_path}'
# Build the task-creation payload; file_urls points at the in-cluster MinIO service.
name = f'sparks_dag_demo_{int(time.time())}'
cluster_minio_url = 'minio.default'
data = {
    "name": name,
    "file_urls": [f'http://{cluster_minio_url}/{file_path}'],
    "script": '',
    "cmd": "",
    "cmd_parameters": "",
    "envs": {},
    "run_image": "SXKJ:32775/jupyter:dag",
    "task_type": "sparks",
    "user_id": 33,
}
# print(data)
print(f'http://{host}/jpt/jpt_task')
ret = requests.post(url=f'http://{host}/jpt/jpt_task', json=data)
# print(ret.json())
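# Minimal response check (a sketch, not part of the original script): assumes
# the endpoint returns JSON on success; raise_for_status() surfaces 4xx/5xx.
ret.raise_for_status()
print(ret.status_code, ret.json())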