import json
import time

import requests

from auo_tests.tasks.config import host
from app.core.airflow.af_util import upload2oss
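
# DAG description: two SQL nodes ("1", "2") each produce one output, and a
# PySpark node ("3") consumes both. Each entry in "inputs" maps a local name
# to an (upstream node id, output index) pair, matching the "edges" list.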
dag_script = {
    "sub_nodes": [
        {
            "id": "1",
            "name": "SqlNode1",
            "op": "sql",
            "script": "select * from train",
        },
        {
            "id": "2",
            "name": "SqlNode2",
            "op": "sql",
            "script": "select * from test",
        },
        {
            "id": "3",
            "name": "PysparkNode1",
            "op": "pyspark",  # or "python"
            "inputs": {
                "train": ("1", 0),
                "test": ("2", 0),
            },
            "script": "",  # filled in below from spark_script_1009.py
        },
    ],
    "edges": [
        ("1", "3"),
        ("2", "3"),
    ],
}
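
# For reference, a minimal, hypothetical sketch of what spark_script_1009.py
# might contain; the actual contract between the task runner and the node
# script is not shown in this file. The assumption is that the runner hands
# the script the DataFrames named in "inputs":
#
#     def main(train, test):
#         # 'train' and 'test' are the outputs of SQL nodes "1" and "2"
#         train.show()
#         test.show()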

# Load the PySpark job body from disk and attach it to node "3".
with open('./spark_script_1009.py', 'r') as f:
    pyspark_script = f.read()
dag_script["sub_nodes"][2]["script"] = pyspark_script
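
# Serialize the DAG description and upload it to MinIO/OSS so the task
# runner can fetch it by URL.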
filename = 'dag_demo_desc.dag'
minio_bucket = 'mytest'
minio_path = f'/xxx/tmp/{filename}'  # note: the leading slash makes file_path 'mytest//xxx/tmp/...'
upload2oss(json.dumps(dag_script).encode(), minio_path, minio_bucket)
file_path = f'{minio_bucket}/{minio_path}'
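
# file_path is the bucket-qualified object key; the task payload below points
# the runner at http://{cluster_minio_url}/{file_path} to download the DAG.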

# Build the task payload and submit it to the jpt_task endpoint.
name = f'sparks_dag_demo_{int(time.time())}'
cluster_minio_url = 'minio.default'  # MinIO service address as resolved inside the cluster

data = {
    "name": name,
    "file_urls": [f'http://{cluster_minio_url}/{file_path}'],
    "script": '',
    "cmd": "",
    "cmd_parameters": "",
    "envs": {},
    "run_image": "SXKJ:32775/jupyter:dag",
    "task_type": "sparks",
    "user_id": 33,
}

# print(data)
url = f'http://{host}/jpt/jpt_task'
print(url)
ret = requests.post(url=url, json=data)
print(ret.json())  # surface the API response