123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import requests as R
- import json
- from utils import ConfigOnline, ConfigOffline
- class Agent(object):
- def __init__(self):
- self.type = input("请输入平台:")
- if self.type == "offline":
- self.add_job_url = ConfigOffline.add_job_url
- self.update_url = ConfigOffline.update_url
- self.execute_url = ConfigOffline.execute_url
- self.test_url = ConfigOffline.test_url
- self.add_datasource_url = ConfigOffline.add_datasource_url
- print("加载url成功")
- elif self.type == "online":
- self.add_job_url = ConfigOnline.add_job_url
- self.update_url = ConfigOnline.update_url
- self.execute_url = ConfigOnline.execute_url
- self.test_url = ConfigOnline.test_url
- self.add_datasource_url = ConfigOnline.add_datasource_url
- print("加载url成功")
- self.job_id = 0
- def test_test(self, name):
- data = {}
- if self.type == "online":
- data = {
- "datasource_name": name,
- "datasource": "hive",
- "database_name": "ailab",
- "jdbc_url": "10.254.20.22:7001",
- "comments": "",
- "tag": "线上",
- "use_ssl": 0,
- "kerberos": 1,
- "keytab": "/Users/btobab/Downloads/user.keytab",
- "krb5config": "/Users/btobab/Downloads/krb5.conf",
- "kerberos_service_name": "hadoop",
- "principal": "ailab@EMR-5XJSY31F"
- }
- elif self.type == "offline":
- data = {
- "datasource_name": name,
- "datasource": ConfigOffline.datasource,
- "database_name": ConfigOffline.database_name,
- "jdbc_username": ConfigOffline.jdbc_username,
- "jdbc_password": ConfigOffline.jdbc_password,
- "jdbc_url": ConfigOffline.jdbc_url,
- "comments": "",
- "tag": "测试",
- "use_ssl": ConfigOffline.use_ssl,
- "kerberos": ConfigOffline.kerberos
- }
- r = R.post(url=self.test_url, json=data)
- js = json.loads(r.text)
- if js["data"]["code"] == 200:
- print("测试连接成功")
- else:
- print("测试连接失败")
- print(r.text)
- def test_add_datasource(self, name):
- data = {}
- if self.type == "online":
- data = {
- "datasource_name": name,
- "datasource": ConfigOnline.datasource,
- "database_name": ConfigOnline.database_name,
- "jdbc_url": ConfigOnline.jdbc_url,
- "comments": "",
- "tag": "线上",
- "use_ssl": ConfigOnline.use_ssl,
- "kerberos": ConfigOnline.kerberos,
- "keytab": ConfigOnline.keytab,
- "krb5config": ConfigOnline.krb5config,
- "kerberos_service_name": ConfigOnline.kerberos_service_name,
- "principal": ConfigOnline.principal
- }
- elif self.type == "offline":
- data = {
- "datasource_name": name,
- "datasource": ConfigOffline.datasource,
- "database_name": ConfigOffline.database_name,
- "jdbc_username": ConfigOffline.jdbc_username,
- "jdbc_password": ConfigOffline.jdbc_password,
- "jdbc_url": ConfigOffline.jdbc_url,
- "comments": "",
- "tag": "测试",
- "use_ssl": ConfigOffline.use_ssl,
- "kerberos": ConfigOffline.kerberos
- }
- r = R.post(url=self.add_datasource_url, json=data)
- print(r.text)
- def test_add_job_sql2sql(self, name):
- data = {
- "cron_expression":
- {
- "cron_select_type": 1, "hour": 17, "minute": 1
- },
- "executor_block_strategy": "SERIAL_EXECUTION",
- "executor_fail_retry_count": 0,
- "executor_route_strategy": "FIRST",
- "executor_timeout": 0,
- "inc_start_time": None,
- "job_desc": name,
- "job_json":
- "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\"],\"table\":[\"test_liyuqi\"]}],\"username\":\"root\",\"password\":\"happylay\"}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\",\"table\":[\"test_liyuqi\"]}],\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"username\":\"root\",\"password\":\"happylay\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"1\"}}}}",
- "partition_info": "",
- "partition_num": 0,
- "user_id": "test"
- }
- self.test(data)
- def test_add_job_sql2hive(self, name):
- data = {
- "partition_info": "",
- "executor_timeout": 0,
- "executor_fail_retry_count": 0,
- "inc_start_time": None,
- "job_desc": name,
- "executor_route_strategy": "FIRST",
- "executor_block_strategy": "SERIAL_EXECUTION",
- "cron_expression": {
- "cron_select_type": 0,
- "hour": 1
- },
- "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"name\",\"ct\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://58.87.83.174:3306/test\"],\"table\":[\"test_1\"]}],\"username\":\"root\",\"password\":\"root\"}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
- "user_id": "test",
- "partition_num": 0
- }
- self.test(data)
- def test_add_job_hive2sql(self, name):
- data = {
- "partition_info": "",
- "executor_timeout": 0,
- "executor_fail_retry_count": 0,
- "inc_start_time": None,
- "job_desc": name,
- "executor_route_strategy": "FIRST",
- "executor_block_strategy": "SERIAL_EXECUTION",
- "cron_expression": {
- "cron_select_type": 0,
- "hour": 1
- },
- "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://58.87.83.174:3306/test\",\"table\":[\"test_1\"]}],\"column\":[\"id\",\"name\",\"ct\"],\"username\":\"root\",\"password\":\"root\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
- "user_id": "test",
- "partition_num": 0
- }
- self.test(data)
- def test_add_job_hive2hive(self, name):
- data = {
- "partition_info": "",
- "executor_timeout": 0,
- "executor_fail_retry_count": 0,
- "inc_start_time": None,
- "job_desc": name,
- "executor_route_strategy": "FIRST",
- "executor_block_strategy": "SERIAL_EXECUTION",
- "cron_expression": {
- "cron_select_type": 0,
- "hour": 1
- },
- "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_p\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
- "user_id": "test",
- "partition_num": 0
- }
- self.test(data)
- def test(self, json_data):
- r = R.post(url=self.add_job_url, json=json_data)
- print("输出添加任务日志")
- js = json.loads(r.text)
- print(js)
- self.job_id = js["data"]["id"]
- data = {
- "id": self.job_id,
- "trigger_status": 1
- }
- r = R.put(url=self.update_url, json=data)
- print(r.text)
- print("更改状态成功")
- r = R.post(url="{}?job_id={}".format(self.execute_url, self.job_id))
- print(r.text)
- print("启动任务成功")
- if __name__ == "__main__":
- agent = Agent()
- agent.test_add_datasource("test_1003844955")
|