# main.py
import json
import time
import uuid

import requests as R
  5. class AI(object):
  6. def __init__(self):
  7. self.start_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/execute"
  8. self.log_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_log"
  9. self.status_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/debug_status"
  10. self.result_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_result"
  11. self.lst = []
  12. self.dag_id = self.generate_uuid()
  13. def generate_uuid(self):
  14. return str(uuid.uuid4()).replace("-", "")
  15. def test_start(self):
  16. id1 = self.generate_uuid()
  17. id2 = self.generate_uuid()
  18. lst = [id1, id2]
  19. dag_script = {
  20. "sub_nodes":
  21. [
  22. {
  23. "id": id1,
  24. "name": "is1",
  25. "op": "sql",
  26. "script": "select feature1, feature10, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9, label, uuid from ailab.train1",
  27. "skip": False
  28. },
  29. {
  30. "id": id2,
  31. "name": "sql1",
  32. "op": "sql",
  33. "inputs":
  34. {
  35. "input0": ["5LH4YQVAtSrWj3cHhFOPu", 0]
  36. },
  37. "script": "select * from input0",
  38. "skip": False
  39. }
  40. ],
  41. "edges": [[id1, id2]]
  42. }
  43. data = {
  44. "dag_script": json.dumps(dag_script),
  45. "dag_uuid": self.dag_id
  46. }
  47. # dag_uuid如果存在,则更新任务,否则新建任务
  48. r = R.post(url=self.start_url, json=data)
  49. print("testing execute process completed")
  50. self.lst = lst
  51. def test_log(self):
  52. for id in self.lst:
  53. url = "{}?dag_uuid={}&node_id={}".format(self.log_url, self.dag_id, id)
  54. r = R.get(url)
  55. js = json.loads(r.text)
  56. if js["code"] == 200 and js["data"] is not None:
  57. print("testing log {} process completed".format(id))
  58. elif js["code"] == 200:
  59. print("data in log is empty")
  60. else:
  61. print("status code in log is not 200")
  62. def test_status(self):
  63. url = "{}?dag_uuid={}".format(self.status_url, self.dag_id)
  64. r = R.get(url)
  65. js = json.loads(r.text)
  66. while js["code"] != 200:
  67. r = R.get(url)
  68. js = json.loads(r.text)
  69. res = js["data"]["job"]
  70. print("now finished jobs num is {}".format(res))
  71. return res
  72. def test_result(self, index):
  73. url = "{}?dag_uuid={}&node_id={}&out_pin=0".format(self.result_url, self.dag_id, self.lst[index])
  74. r = R.get(url=url)
  75. print("result json is {}".format(r.text))
  76. js = json.loads(r.text)
  77. if js["data"]["content"] is not None:
  78. print("getting result successfully")
  79. elif js["code"] != 200:
  80. print("status code is not 200")
  81. else:
  82. print("fail to get result")
  83. def test(self):
  84. self.test_start()
  85. num = 0
  86. while self.test_status() <= 1:
  87. time.sleep(5)
  88. res = self.test_status()
  89. if num != res:
  90. print("[now]node {}".format(res))
  91. num = res
  92. print("testing status process completed")
  93. self.test_log()
  94. self.test_result(0)
  95. print("testing result process completed")
  96. if __name__ == "__main__":
  97. s = AI()
  98. s.test()