import json

import requests as R

from utils import ConfigOnline, ConfigOffline


class Agent(object):
    """Interactive client that exercises the datasource and job HTTP endpoints.

    The platform name ("online" or "offline") is read from stdin when the
    object is created and selects which config class supplies the endpoint
    URLs. Every ``test_*`` method sends one or more HTTP requests and prints
    the raw responses; nothing is returned.
    """

    def __init__(self):
        """Prompt for the platform and load the matching endpoint URLs.

        Raises:
            ValueError: If the entered platform is neither "online" nor
                "offline". Failing here avoids a confusing AttributeError
                on the first request made with missing URL attributes.
        """
        # Strip so that accidental surrounding whitespace does not cause a mismatch.
        self.type = input("请输入平台:").strip()
        configs = {"offline": ConfigOffline, "online": ConfigOnline}
        config = configs.get(self.type)
        if config is None:
            raise ValueError(
                "unknown platform {!r}; expected 'online' or 'offline'".format(self.type)
            )
        self.add_job_url = config.add_job_url
        self.update_url = config.update_url
        self.execute_url = config.execute_url
        self.test_url = config.test_url
        self.add_datasource_url = config.add_datasource_url
        print("加载url成功")
        # Id of the most recently added job; overwritten by test().
        self.job_id = 0

    def _offline_datasource(self, name):
        """Return the offline datasource payload named *name*, built from ConfigOffline."""
        return {
            "datasource_name": name,
            "datasource": ConfigOffline.datasource,
            "database_name": ConfigOffline.database_name,
            "jdbc_username": ConfigOffline.jdbc_username,
            "jdbc_password": ConfigOffline.jdbc_password,
            "jdbc_url": ConfigOffline.jdbc_url,
            "comments": "",
            "tag": "测试",
            "use_ssl": ConfigOffline.use_ssl,
            "kerberos": ConfigOffline.kerberos
        }

    def test_test(self, name):
        """POST a datasource definition to ``self.test_url`` and print the outcome.

        Args:
            name: Value sent as ``datasource_name``.

        The response body is parsed as JSON; ``data.code == 200`` is reported
        as a successful connection test, anything else as a failure. The raw
        response text is printed afterwards.
        """
        data = {}
        if self.type == "online":
            # NOTE(review): these values are hard-coded here, whereas
            # test_add_datasource reads the online settings from ConfigOnline.
            # Confirm whether the difference is intentional.
            data = {
                "datasource_name": name,
                "datasource": "hive",
                "database_name": "ailab",
                "jdbc_url": "10.254.20.22:7001",
                "comments": "",
                "tag": "线上",
                "use_ssl": 0,
                "kerberos": 1,
                "keytab": "/Users/btobab/Downloads/user.keytab",
                "krb5config": "/Users/btobab/Downloads/krb5.conf",
                "kerberos_service_name": "hadoop",
                "principal": "ailab@EMR-5XJSY31F"
            }
        elif self.type == "offline":
            data = self._offline_datasource(name)
        r = R.post(url=self.test_url, json=data)
        js = json.loads(r.text)
        if js["data"]["code"] == 200:
            print("测试连接成功")
        else:
            print("测试连接失败")
        print(r.text)

    def test_add_datasource(self, name):
        """POST a datasource definition to ``self.add_datasource_url`` and print the response.

        Args:
            name: Value sent as ``datasource_name``.
        """
        data = {}
        if self.type == "online":
            data = {
                "datasource_name": name,
                "datasource": ConfigOnline.datasource,
                "database_name": ConfigOnline.database_name,
                "jdbc_url": ConfigOnline.jdbc_url,
                "comments": "",
                "tag": "线上",
                "use_ssl": ConfigOnline.use_ssl,
                "kerberos": ConfigOnline.kerberos,
                "keytab": ConfigOnline.keytab,
                "krb5config": ConfigOnline.krb5config,
                "kerberos_service_name": ConfigOnline.kerberos_service_name,
                "principal": ConfigOnline.principal
            }
        elif self.type == "offline":
            data = self._offline_datasource(name)
        r = R.post(url=self.add_datasource_url, json=data)
        print(r.text)

    def test_add_job_sql2sql(self, name):
        """Add, enable and trigger a job whose reader is mysqlreader and writer is mysqlwriter.

        Args:
            name: Value sent as ``job_desc``.
        """
        # NOTE(review): job_json embeds fixed hosts and database credentials.
        data = {
            "cron_expression": {
                "cron_select_type": 1,
                "hour": 17,
                "minute": 1
            },
            "executor_block_strategy": "SERIAL_EXECUTION",
            "executor_fail_retry_count": 0,
            "executor_route_strategy": "FIRST",
            "executor_timeout": 0,
            "inc_start_time": None,
            "job_desc": name,
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\"],\"table\":[\"test_liyuqi\"]}],\"username\":\"root\",\"password\":\"happylay\"}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\",\"table\":[\"test_liyuqi\"]}],\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"username\":\"root\",\"password\":\"happylay\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"1\"}}}}",
            "partition_info": "",
            "partition_num": 0,
            "user_id": "test"
        }
        self.test(data)

    def test_add_job_sql2hive(self, name):
        """Add, enable and trigger a job whose reader is mysqlreader and writer is hdfswriter.

        Args:
            name: Value sent as ``job_desc``.
        """
        # NOTE(review): job_json embeds fixed hosts and database credentials.
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"name\",\"ct\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://58.87.83.174:3306/test\"],\"table\":[\"test_1\"]}],\"username\":\"root\",\"password\":\"root\"}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)

    def test_add_job_hive2sql(self, name):
        """Add, enable and trigger a job whose reader is hdfsreader and writer is mysqlwriter.

        Args:
            name: Value sent as ``job_desc``.
        """
        # NOTE(review): job_json embeds fixed hosts and database credentials.
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://58.87.83.174:3306/test\",\"table\":[\"test_1\"]}],\"column\":[\"id\",\"name\",\"ct\"],\"username\":\"root\",\"password\":\"root\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)

    def test_add_job_hive2hive(self, name):
        """Add, enable and trigger a job whose reader is hdfsreader and writer is hdfswriter.

        Args:
            name: Value sent as ``job_desc``.
        """
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_p\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)

    def test(self, json_data):
        """Add a job, switch its trigger status to 1, then execute it.

        Args:
            json_data: Job definition sent as the JSON body of the add-job
                request.

        Three requests are issued in order: POST to ``self.add_job_url``,
        PUT to ``self.update_url`` and POST to ``self.execute_url``. The id
        found at ``data.id`` in the first response is stored in
        ``self.job_id`` and reused by the following two requests.
        """
        r = R.post(url=self.add_job_url, json=json_data)
        print("输出添加任务日志")
        js = json.loads(r.text)
        print(js)
        self.job_id = js["data"]["id"]

        data = {
            "id": self.job_id,
            "trigger_status": 1
        }
        r = R.put(url=self.update_url, json=data)
        print(r.text)
        print("更改状态成功")

        r = R.post(url="{}?job_id={}".format(self.execute_url, self.job_id))
        print(r.text)
        print("启动任务成功")


if __name__ == "__main__":
    agent = Agent()
    agent.test_add_datasource("test_1003844955")