"""Demo script: upload a DAG description and submit it as a ``sparks`` task.

Steps, in order:

1. Build ``dag_script``: two SQL nodes whose outputs feed one PySpark node.
2. Read the PySpark node's source from ``./spark_script_1009.py``.
3. Upload the JSON-encoded DAG through ``upload2oss``.
4. POST a task description that references the uploaded file to
   ``http://{host}/jpt/jpt_task``.

All work happens at import time; the script is meant to be run directly.
"""
import json
import time

import requests

from app.core.airflow.af_util import upload2oss
from auo_tests.tasks.config import host
from auo_tests.tasks.minio_handler import FileHandler, URL as MinioURL

# Upper bound (seconds) for the task-submission request so an unresponsive
# host fails fast instead of blocking the script forever.
REQUEST_TIMEOUT_SECONDS = 30

dag_script = {
    "sub_nodes": [
        {
            "id": "1",
            "name": "SqlNode1",
            "op": "sql",
            "script": "select * from train",
        },
        {
            "id": "2",
            # NOTE(review): same name as node "1" -- looks like a copy/paste
            # leftover; confirm whether node names must be unique.
            "name": "SqlNode1",
            "op": "sql",
            "script": "select * from test",
        },
        {
            "id": "3",
            "name": "PysparkNode1",
            "op": "pyspark",  # or python
            # Presumably maps an input name to (upstream node id, output
            # index) -- TODO confirm against the DAG runner.
            "inputs": {'train': ("1", 0), 'test': ("2", 0)},
            "script": "",  # filled in below from the script file
        },
    ],
    "edges": [
        ("1", "3"),
        ("2", "3"),
    ],
}

# Python source is UTF-8 by default, so read it as such rather than relying
# on the platform's locale encoding.
with open('./spark_script_1009.py', 'r', encoding='utf-8') as f:
    pyspark_script = f.read()
dag_script["sub_nodes"][2]["script"] = pyspark_script

filename = 'dag_demo_desc.dag'
minio_bucket = 'mytest'
minio_path = f'/xxx/tmp/{filename}'
# json.dumps serialises the tuples above as JSON arrays.
upload2oss(json.dumps(dag_script).encode(), minio_path, minio_bucket)
# NOTE(review): minio_path starts with '/', so this yields 'mytest//xxx/...'
# (double slash). Left unchanged because the server may rely on it -- confirm.
file_path = f'{minio_bucket}/{minio_path}'

# Timestamp suffix gives each submission a distinct task name.
name = f'sparks_dag_demo_{int(time.time())}'
cluster_minio_url = 'minio.default'
data = {
    "name": name,
    "file_urls": [f'http://{cluster_minio_url}/{file_path}'],
    "script": '',
    "cmd": "",
    "cmd_parameters": "",
    "envs": {},
    "run_image": "SXKJ:32775/jupyter:dag",
    "task_type": "sparks",
    "user_id": 33,
}
# print(data)
print(f'http://{host}/jpt/jpt_task')
ret = requests.post(
    url=f'http://{host}/jpt/jpt_task',
    json=data,
    timeout=REQUEST_TIMEOUT_SECONDS,
)
# print(ret.json())