123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import json
- import requests as R
- import uuid
- import time
- class AI(object):
- def __init__(self):
- self.start_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/execute"
- self.log_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_log"
- self.status_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/debug_status"
- self.result_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_result"
- self.lst = []
- self.dag_id = self.generate_uuid()
- def generate_uuid(self):
- return str(uuid.uuid4()).replace("-", "")
- def test_start(self):
- id1 = self.generate_uuid()
- id2 = self.generate_uuid()
- lst = [id1, id2]
- dag_script = {
- "sub_nodes":
- [
- {
- "id": id1,
- "name": "is1",
- "op": "sql",
- "script": "select feature1, feature10, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9, label, uuid from ailab.train1",
- "skip": False
- },
- {
- "id": id2,
- "name": "sql1",
- "op": "sql",
- "inputs":
- {
- "input0": ["5LH4YQVAtSrWj3cHhFOPu", 0]
- },
- "script": "select * from input0",
- "skip": False
- }
- ],
- "edges": [[id1, id2]]
- }
- data = {
- "dag_script": json.dumps(dag_script),
- "dag_uuid": self.dag_id
- }
- # dag_uuid如果存在,则更新任务,否则新建任务
- r = R.post(url=self.start_url, json=data)
- print("testing execute process completed")
- self.lst = lst
- def test_log(self):
- for id in self.lst:
- url = "{}?dag_uuid={}&node_id={}".format(self.log_url, self.dag_id, id)
- r = R.get(url)
- js = json.loads(r.text)
- if js["code"] == 200 and js["data"] is not None:
- print("testing log {} process completed".format(id))
- elif js["code"] == 200:
- print("data in log is empty")
- else:
- print("status code in log is not 200")
- def test_status(self):
- url = "{}?dag_uuid={}".format(self.status_url, self.dag_id)
- r = R.get(url)
- js = json.loads(r.text)
- while js["code"] != 200:
- r = R.get(url)
- js = json.loads(r.text)
- res = js["data"]["job"]
- print("now finished jobs num is {}".format(res))
- return res
- def test_result(self, index):
- url = "{}?dag_uuid={}&node_id={}&out_pin=0".format(self.result_url, self.dag_id, self.lst[index])
- r = R.get(url=url)
- print("result json is {}".format(r.text))
- js = json.loads(r.text)
- if js["data"]["content"] is not None:
- print("getting result successfully")
- elif js["code"] != 200:
- print("status code is not 200")
- else:
- print("fail to get result")
- def test(self):
- self.test_start()
- num = 0
- while self.test_status() <= 1:
- time.sleep(5)
- res = self.test_status()
- if num != res:
- print("[now]node {}".format(res))
- num = res
- print("testing status process completed")
- self.test_log()
- self.test_result(0)
- print("testing result process completed")
- if __name__ == "__main__":
- s = AI()
- s.test()
|