datax.py

import json

import requests as R

from utils import ConfigOnline, ConfigOffline


class Agent(object):
    def __init__(self):
        # Choose which environment's endpoints to target.
        self.type = input("Enter the platform (online/offline): ")
        if self.type == "offline":
            self.add_job_url = ConfigOffline.add_job_url
            self.update_url = ConfigOffline.update_url
            self.execute_url = ConfigOffline.execute_url
            self.test_url = ConfigOffline.test_url
            self.add_datasource_url = ConfigOffline.add_datasource_url
            print("URLs loaded")
        elif self.type == "online":
            self.add_job_url = ConfigOnline.add_job_url
            self.update_url = ConfigOnline.update_url
            self.execute_url = ConfigOnline.execute_url
            self.test_url = ConfigOnline.test_url
            self.add_datasource_url = ConfigOnline.add_datasource_url
            print("URLs loaded")
        else:
            # Fail fast instead of leaving the URL attributes unset.
            raise ValueError("unknown platform: {!r}".format(self.type))
        self.job_id = 0
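    # Endpoints loaded above (names from utils.Config*): add_job_url creates a
    # job, update_url toggles its trigger status, execute_url fires a run,
    # test_url checks datasource connectivity, and add_datasource_url
    # registers a datasource.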

    def test_test(self, name):
        # Test connectivity to a datasource before registering it.
        data = {}
        if self.type == "online":
            # NOTE: the online payload is hardcoded here instead of read from
            # ConfigOnline (unlike test_add_datasource below).
            data = {
                "datasource_name": name,
                "datasource": "hive",
                "database_name": "ailab",
                "jdbc_url": "10.254.20.22:7001",
                "comments": "",
                "tag": "线上",  # "online"
                "use_ssl": 0,
                "kerberos": 1,
                "keytab": "/Users/btobab/Downloads/user.keytab",
                "krb5config": "/Users/btobab/Downloads/krb5.conf",
                "kerberos_service_name": "hadoop",
                "principal": "ailab@EMR-5XJSY31F"
            }
        elif self.type == "offline":
            data = {
                "datasource_name": name,
                "datasource": ConfigOffline.datasource,
                "database_name": ConfigOffline.database_name,
                "jdbc_username": ConfigOffline.jdbc_username,
                "jdbc_password": ConfigOffline.jdbc_password,
                "jdbc_url": ConfigOffline.jdbc_url,
                "comments": "",
                "tag": "测试",  # "test"
                "use_ssl": ConfigOffline.use_ssl,
                "kerberos": ConfigOffline.kerberos
            }
        r = R.post(url=self.test_url, json=data)
        js = json.loads(r.text)
        if js["data"]["code"] == 200:
            print("connection test succeeded")
        else:
            print("connection test failed")
            print(r.text)
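    # A more defensive variant of the check above could fail fast on HTTP
    # errors before parsing the body, using requests' built-in helpers:
    #     r.raise_for_status()
    #     js = r.json()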

    def test_add_datasource(self, name):
        # Register a datasource using values from the environment config.
        data = {}
        if self.type == "online":
            data = {
                "datasource_name": name,
                "datasource": ConfigOnline.datasource,
                "database_name": ConfigOnline.database_name,
                "jdbc_url": ConfigOnline.jdbc_url,
                "comments": "",
                "tag": "线上",  # "online"
                "use_ssl": ConfigOnline.use_ssl,
                "kerberos": ConfigOnline.kerberos,
                "keytab": ConfigOnline.keytab,
                "krb5config": ConfigOnline.krb5config,
                "kerberos_service_name": ConfigOnline.kerberos_service_name,
                "principal": ConfigOnline.principal
            }
        elif self.type == "offline":
            data = {
                "datasource_name": name,
                "datasource": ConfigOffline.datasource,
                "database_name": ConfigOffline.database_name,
                "jdbc_username": ConfigOffline.jdbc_username,
                "jdbc_password": ConfigOffline.jdbc_password,
                "jdbc_url": ConfigOffline.jdbc_url,
                "comments": "",
                "tag": "测试",  # "test"
                "use_ssl": ConfigOffline.use_ssl,
                "kerberos": ConfigOffline.kerberos
            }
        r = R.post(url=self.add_datasource_url, json=data)
        print(r.text)
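    # The test_add_job_* methods below submit hand-escaped job_json strings
    # copied from captured requests. A sketch of a less error-prone
    # alternative, assuming the backend accepts the same structure, is this
    # hypothetical helper that builds the DataX job description as a dict and
    # serializes it:
    @staticmethod
    def build_job_json(reader, writer, channel="1"):
        # reader/writer are DataX plugin dicts such as
        # {"name": "mysqlreader", "parameter": {...}}; returns the string
        # expected in the "job_json" field.
        return json.dumps({
            "job": {
                "content": [{"reader": reader, "writer": writer}],
                "setting": {"speed": {"channel": channel}}
            }
        })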

    def test_add_job_sql2sql(self, name):
        # Create and trigger a MySQL-to-MySQL sync job.
        data = {
            "cron_expression": {
                "cron_select_type": 1, "hour": 17, "minute": 1
            },
            "executor_block_strategy": "SERIAL_EXECUTION",
            "executor_fail_retry_count": 0,
            "executor_route_strategy": "FIRST",
            "executor_timeout": 0,
            "inc_start_time": None,
            "job_desc": name,
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\"],\"table\":[\"test_liyuqi\"]}],\"username\":\"root\",\"password\":\"happylay\"}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://192.168.199.107:10086/test-db?useSSL=false\",\"table\":[\"test_liyuqi\"]}],\"column\":[\"id\",\"txn_amount\",\"txn_type\",\"txn_date\"],\"username\":\"root\",\"password\":\"happylay\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"1\"}}}}",
            "partition_info": "",
            "partition_num": 0,
            "user_id": "test"
        }
        self.test(data)

    def test_add_job_sql2hive(self, name):
        # Create and trigger a MySQL-to-Hive (HDFS) sync job.
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"mysqlreader\",\"parameter\":{\"column\":[\"id\",\"name\",\"ct\"],\"where\":\"\",\"splitPk\":\"\",\"connection\":[{\"jdbcUrl\":[\"jdbc:mysql://58.87.83.174:3306/test\"],\"table\":[\"test_1\"]}],\"username\":\"root\",\"password\":\"root\"}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)

    def test_add_job_hive2sql(self, name):
        # Create and trigger a Hive (HDFS)-to-MySQL sync job.
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"mysqlwriter\",\"parameter\":{\"connection\":[{\"jdbcUrl\":\"jdbc:mysql://58.87.83.174:3306/test\",\"table\":[\"test_1\"]}],\"column\":[\"id\",\"name\",\"ct\"],\"username\":\"root\",\"password\":\"root\",\"preSql\":[\"\"],\"postSql\":[\"\"]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)

    def test_add_job_hive2hive(self, name):
        # Create and trigger a Hive-to-Hive (HDFS-to-HDFS) sync job.
        data = {
            "partition_info": "",
            "executor_timeout": 0,
            "executor_fail_retry_count": 0,
            "inc_start_time": None,
            "job_desc": name,
            "executor_route_strategy": "FIRST",
            "executor_block_strategy": "SERIAL_EXECUTION",
            "cron_expression": {
                "cron_select_type": 0,
                "hour": 1
            },
            "job_json": "{\"job\":{\"content\":[{\"reader\":{\"name\":\"hdfsreader\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_2\",\"fileType\":\"text\",\"fieldDelimiter\":\",\",\"column\":[{\"index\":\"0\",\"type\":\"long\"},{\"index\":\"1\",\"type\":\"string\"},{\"index\":\"2\",\"type\":\"string\"}]}},\"writer\":{\"name\":\"hdfswriter\",\"parameter\":{\"defaultFS\":\"hdfs://HDFS8000912\",\"hadoopConfig\":{\"dfs.nameservices\":\"HDFS8000912\",\"dfs.ha.namenodes.HDFS8000912\":\"nn1,nn2\",\"dfs.namenode.rpc-address.HDFS8000912.nn1\":\"10.254.20.18:4007\",\"dfs.namenode.rpc-address.HDFS8000912.nn2\":\"10.254.20.22:4007\",\"dfs.client.failover.proxy.provider.HDFS8000912\":\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"},\"haveKerberos\":\"true\",\"kerberosKeytabFilePath\":\"/workspace/confs/test/user.keytab\",\"kerberosPrincipal\":\"ailab@EMR-5XJSY31F\",\"fileType\":\"text\",\"path\":\"/usr/hive/warehouse/ailab.db/my_test_p\",\"fileName\":\"test\",\"writeMode\":\"append\",\"fieldDelimiter\":\",\",\"column\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ct\",\"type\":\"string\"}]}}}],\"setting\":{\"speed\":{\"channel\":\"5\"}}}}",
            "user_id": "test",
            "partition_num": 0
        }
        self.test(data)
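    # Shared driver for the test_add_job_* methods above: create the job,
    # set trigger_status to 1 to enable it, then fire an immediate run.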
    def test(self, json_data):
        # Step 1: create the job.
        r = R.post(url=self.add_job_url, json=json_data)
        print("add-job response:")
        js = json.loads(r.text)
        print(js)
        self.job_id = js["data"]["id"]
        # Step 2: enable the job.
        data = {
            "id": self.job_id,
            "trigger_status": 1
        }
        r = R.put(url=self.update_url, json=data)
        print(r.text)
        print("job status updated")
        # Step 3: trigger an immediate run.
        r = R.post(url="{}?job_id={}".format(self.execute_url, self.job_id))
        print(r.text)
        print("job triggered")


if __name__ == "__main__":
    agent = Agent()
    agent.test_add_datasource("test_1003844955")
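    # A fuller smoke test might chain the calls above (names are arbitrary):
    #     agent.test_test("test_1003844955")          # connectivity check
    #     agent.test_add_job_sql2sql("sql2sql demo")  # create + trigger a job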