add_python_task.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import time
  2. import requests
  3. from auo_tests.tasks.config import host
  4. from auo_tests.tasks.minio_handler import FileHandler, URL as MinioURL
  5. filename = 'hello_world.py'
  6. minio_bucket = 'mytest'
  7. minio_path = f'xxx/{int(time.time())}_' + filename
  8. minio_handler = FileHandler(bucket_name=minio_bucket)
  9. with open(filename, 'rb') as f:
  10. script_str = f.read()
  11. minio_handler.put_byte_file(file_name=minio_path, file_content=script_str)
  12. file_path = f'{minio_bucket}/{minio_path}'
  13. cmd_parameter = '-b '
  14. name = f'python_task_{int(time.time())}'
  15. cluster_minio_url='minio.default:9000'
  16. data = {
  17. "name": name,
  18. "file_urls": [file_path],
  19. "script": '',
  20. "cmd": f"curl $FILE_TO_DOWNLOAD > run.py && python {cmd_parameter} run.py",
  21. "cmd_parameters": cmd_parameter,
  22. "envs": {"FILE_TO_DOWNLOAD": f'http://{cluster_minio_url}/{file_path}'},
  23. "run_image": "SXKJ:32775/pod_python:1.1",
  24. "task_type": "python",
  25. "user_id": 33
  26. }
  27. print(data)
  28. print(f'http://{host}/jpt/jpt_task')
  29. ret = requests.post(url=f'http://{host}/jpt/jpt_task', json=data)
  30. print(ret.json())