123456789101112131415161718192021222324252627282930313233343536373839 |
- import time
- import requests
- from auo_tests.tasks.config import host
- from auo_tests.tasks.minio_handler import FileHandler, URL as MinioURL
- filename = 'hello_world.py'
- minio_bucket = 'mytest'
- minio_path = f'xxx/{int(time.time())}_' + filename
- minio_handler = FileHandler(bucket_name=minio_bucket)
- with open(filename, 'rb') as f:
- script_str = f.read()
- minio_handler.put_byte_file(file_name=minio_path, file_content=script_str)
- file_path = f'{minio_bucket}/{minio_path}'
- cmd_parameter = '-b '
- name = f'python_task_{int(time.time())}'
- cluster_minio_url='minio.default:9000'
- data = {
- "name": name,
- "file_urls": [file_path],
- "script": '',
- "cmd": f"curl $FILE_TO_DOWNLOAD > run.py && python {cmd_parameter} run.py",
- "cmd_parameters": cmd_parameter,
- "envs": {"FILE_TO_DOWNLOAD": f'http://{cluster_minio_url}/{file_path}'},
- "run_image": "SXKJ:32775/pod_python:1.1",
- "task_type": "python",
- "user_id": 33
- }
- print(data)
- print(f'http://{host}/jpt/jpt_task')
- ret = requests.post(url=f'http://{host}/jpt/jpt_task', json=data)
- print(ret.json())
|