yuqili 2 anni fa
parent
commit
739567b5d9
2 ha cambiato i file con 143 aggiunte e 30 eliminazioni
  1. 31 30
      README.md
  2. 112 0
      main.py

+ 31 - 30
README.md

@@ -9,50 +9,51 @@
 #### 项目结构
 
 ```shell
-├── datax.py
 ├── Dockerfile
-├── Dockerfile_krb5
+├── README.md
+├── datax.py
 ├── environment.yml
 ├── krb5.conf
 ├── main.py
-├── test.json
 ├── user.keytab
 └── utils.py
 ```
 
-#### 构建镜像
-
-`docker build -t test_agent:v1 .`
-
-#### 启动容器
-
-`docker run -it test_agent:v1 zsh`
-
-#### 开启VPN
-
-重新启动一个终端,`ssh sxwl1070@192.168.199.107`,密码为`sx`
-
-输入指令`sudo openvpn --config ./openvpn-client.ovpn`
+#### 流程
 
-密码为`sx`
+##### aihub平台
 
-然后输入用户名`liangzhongquan`
-
-最后输入密码`Hello123!`,即可启动VPN
+```shell
+# 构建镜像
+docker build -t test_agent:v1 .
+# 启动容器
+docker run -it test_agent:v1 zsh
+
+# 另起一个终端,开启VPN
+ssh sxwl1070@192.168.199.107
+# 密码:sx
+sudo openvpn --config ./openvpn-client.ovpn
+# 用户名
+liangzhongquan
+# 密码
+Hello123!
+
+# 返回容器,运行代码
+python datax.py
+# 输入所选的平台
+online / offline
+```
 
-#### 运行容器中的代码
+##### jupyter平台
 
-然后返回刚才的镜像终端中,
+```shell
+```
 
-运行`python datax.py`,即可进入选择阶段:
 
-`请输入平台:`
 
-输入`online`即为线上环境,程序会自动加载线上AI平台的各个URL,从而应对后续的各个测试流程
+### 函数介绍
 
-输入`offline`即为线下环境,程序会自动加载线上AI平台的各个URL,从而应对后续的各个测试流程
 
-### 函数介绍
 
 ```python
 class Agent(object):
@@ -81,11 +82,11 @@ if __name__ == "__main__":
 
 `test_add_job_sql2sql`:测试AI平台的添加任务,启动任务,执行任务功能,数据迁移模式为从mysql数据源到mysql数据源
 
-`test_add_job_sql2hive`:测试AI平台的添加任务,启动任务,执行任务功能,数据迁移模式为从mysql数据源到hive数据源
+`test_add_job_sql2hive`:数据迁移模式为从mysql数据源到hive数据源
 
-`test_add_job_hive2sql`:测试AI平台的添加任务,启动任务,执行任务功能,数据迁移模式为从hive数据源到mysql数据源
+`test_add_job_hive2sql`:数据迁移模式为从hive数据源到mysql数据源
 
-`test_add_hive2hive`:测试AI平台的添加任务,启动任务,执行任务功能,数据迁移模式为从hive数据源到hive数据源
+`test_add_hive2hive`:数据迁移模式为从hive数据源到hive数据源
 
 用户可以根据自身需要在`__main__`方法中添加相应的指令,从而对AI平台进行测试。
 

+ 112 - 0
main.py

@@ -0,0 +1,112 @@
+import json
+import requests as R
+import uuid
+import time
+
+
+class AI(object):
+    def __init__(self):
+        self.start_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/execute"
+        self.log_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_log"
+        self.status_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/debug_status"
+        self.result_url = "http://aihub-dag-test.digitalyili.com/jpt/dag/node_result"
+        self.lst = []
+        self.dag_id = self.generate_uuid()
+
+    def generate_uuid(self):
+        return str(uuid.uuid4()).replace("-", "")
+
+    def test_start(self):
+        id1 = self.generate_uuid()
+        id2 = self.generate_uuid()
+        lst = [id1, id2]
+
+        dag_script = {
+            "sub_nodes":
+                [
+                    {
+                        "id": id1,
+                        "name": "is1",
+                        "op": "sql",
+                        "script": "select feature1, feature10, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9, label, uuid from ailab.train1",
+                        "skip": False
+                    },
+
+                    {
+                        "id": id2,
+                        "name": "sql1",
+                        "op": "sql",
+                        "inputs":
+                            {
+                                "input0": ["5LH4YQVAtSrWj3cHhFOPu", 0]
+                            },
+                        "script": "select * from input0",
+                        "skip": False
+                    }
+                ],
+            "edges": [[id1, id2]]
+        }
+        data = {
+            "dag_script": json.dumps(dag_script),
+            "dag_uuid": self.dag_id
+        }
+        # dag_uuid如果存在,则更新任务,否则新建任务
+        r = R.post(url=self.start_url, json=data)
+        with open("test.json", "w", encoding="utf8") as f:
+            f.write(r.text)
+        print("testing execute process completed")
+        self.lst = lst
+
+    def test_log(self):
+        for id in self.lst:
+            url = "{}?dag_uuid={}&node_id={}".format(self.log_url, self.dag_id, id)
+            r = R.get(url)
+            js = json.loads(r.text)
+            if js["code"] == 200 and js["data"] is not None:
+                print("testing log {} process completed".format(id))
+            elif js["code"] == 200:
+                print("data in log is empty")
+            else:
+                print("status code in log is not 200")
+
+    def test_status(self):
+        url = "{}?dag_uuid={}".format(self.status_url, self.dag_id)
+        r = R.get(url)
+        js = json.loads(r.text)
+        while js["code"] != 200:
+            r = R.get(url)
+            js = json.loads(r.text)
+        res = js["data"]["job"]
+        print("now finished jobs num is {}".format(res))
+        return res
+
+    def test_result(self, index):
+        url = "{}?dag_uuid={}&node_id={}&out_pin=0".format(self.result_url, self.dag_id, self.lst[index])
+        r = R.get(url=url)
+        print("result json is {}".format(r.text))
+        js = json.loads(r.text)
+        if js["data"]["content"] is not None:
+            print("getting result successfully")
+        elif js["code"] != 200:
+            print("status code is not 200")
+        else:
+            print("fail to get result")
+
+    def test(self):
+        self.test_start()
+        num = 0
+        while self.test_status() <= 1:
+            time.sleep(5)
+            res = self.test_status()
+            if num != res:
+                print("[now]node {}".format(res))
+                num = res
+        print("testing status process completed")
+        self.test_log()
+        self.test_result(0)
+        print("testing result process completed")
+
+
+if __name__ == "__main__":
+    s = AI()
+    s.test()