#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Life's short, Python more.
"""
import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser

##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage=usage)
    #rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader',
                      help='trace datasource read performance with the specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer',
                      help='trace datasource write performance with the specified !json! string')
    parser.add_option('-c', '--channel', action='store', dest='channel', default='1',
                      help='the number of concurrent sync threads, the default is 1')
    parser.add_option('-f', '--file', action='store',
                      help='existing datax configuration file, including reader and writer params')
    parser.add_option('-t', '--type', action='store', default='reader',
                      help='trace which side\'s performance, cooperates with the -f --file param, must be reader or writer')
    parser.add_option('-d', '--delete', action='store', default='true',
                      help='delete temporary files, the default value is true')
    #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
    return parser

def getUsage():
    return '''
The following params are available for -r --reader:
    [these params are for the rdbms reader, used to trace rdbms read performance; they follow datax's keys]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl: datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username: username for the datasource
    *password: password for the datasource
    *table: table name to read data from
    column: columns to be read, the default value is ['*']
    splitPk: the splitPk column of the rdbms table
    where: limit the scope of the performance data set
    fetchSize: how many rows to fetch in each round trip
    [these params are for the stream writer, used when tracing rdbms read performance]
    writer-print: true means print the data read from the source datasource, the default value is false

The following params are available for -w --writer:
    [these params are for the rdbms writer, used to trace rdbms write performance; they follow datax's keys]
    datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl: datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username: username for the datasource
    *password: password for the datasource
    *table: table name to write data into
    column: columns to be written, the default value is ['*']
    batchSize: how many rows to store in each round trip, the default value is 512
    preSql: sql executed before writing data, the default value is ''
    postSql: sql executed after writing data, the default value is ''
    url: required for ads, the pattern is ip:port
    schema: required for ads, the ads database name
    [these params are for the stream reader, used when tracing rdbms write performance]
    reader-sliceRecordCount: how many test records to mock (per channel), the default value is 10000
    reader-column: columns the stream reader generates as test data (type supports: string|long|date|double|bool|bytes; constant values and random functions are supported), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for global control:
    -c --channel: the number of concurrent tasks, the default value is 1
    -f --file: path of an existing, complete datax configuration file
    -t --type: test read or write performance for a datasource, could be reader or writer, in collaboration with -f --file
    -h --help: print this help message

some demos:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url patterns that may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database
warn: the ads url pattern is ip:port
warn: testing write performance will write data into your table, you can use a temporary table just for the test.
'''

def printCopyright():
    print('Yili AI Data transfer')
    sys.stdout.flush()

def yesNoChoice():
    yes = set(['yes', 'y', 'ye', ''])
    no = set(['no', 'n'])
    choice = input().lower()
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'\n")
        return False
##end cli & help logic
##begin process logic
def suicide(signum, e):
    global childProcess
    print("[Error] Received unexpected signal %d, shutting down." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("The DataX process was killed!", file=sys.stderr)
    sys.exit(-1)

def registerSignal():
    # relay SIGINT(2), SIGQUIT(3) and SIGTERM(15) to the child DataX process
    signal.signal(signal.SIGINT, suicide)
    signal.signal(signal.SIGQUIT, suicide)
    signal.signal(signal.SIGTERM, suicide)

def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell=isShell)
    registerSignal()
    # communicate() blocks until the child process exits
    (stdout, stderr) = childProcess.communicate()
    childProcess.wait()
    return childProcess.returncode
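
# For example (hypothetical invocation): rc = fork("python bin/datax.py /tmp/job.json", True)
# launches DataX through a shell, relays SIGINT/SIGQUIT/SIGTERM to it via suicide(),
# and returns the child's exit code once it finishes.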
##end process logic
##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)

def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False
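
# e.g. isBlank('   ') -> True, isBlank(None) -> True, isBlank('x') -> False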

def parsePluginName(jdbcUrl, pluginType):
    #warn: drds urls also use the jdbc:mysql:// pattern, so drds resolves to mysql here
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if mysqlRegex.match(jdbcUrl):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if postgresqlRegex.match(jdbcUrl):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if oracleRegex.match(jdbcUrl):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if sqlserverRegex.match(jdbcUrl):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if db2Regex.match(jdbcUrl):
        name = 'db2'
    return "%s%s" % (name, pluginType)
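
# e.g. parsePluginName('jdbc:mysql://127.0.0.1:3306/db', 'reader') -> 'mysqlreader';
# an unrecognized url falls through to the placeholder 'pluginName' + pluginType.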

def renderDataXJson(paramsDict, readerOrWriter='reader', channel=1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]
    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'
    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)
    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')
    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']
    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])
    traceJobJson = json.dumps(dataxTemplate, indent=4)
    return traceJobJson
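
# For example, renderDataXJson({"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/db", "username": "u",
#     "password": "p", "table": "t"}, 'reader', 4) produces a job whose reader name is
# "mysqlreader", whose writer is the "streamwriter", and whose speed.channel is 4.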

def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('The configuration file path must be a string, what you configured is: %s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False
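
# e.g. isUrl('http://example.com/job.json') -> True, isUrl('/tmp/datax.job.json') -> False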

def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Reading your job configuration file returned an empty result, please check that the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent

def readJobJsonFromRemote(jobConfigPath):
    from urllib.request import urlopen
    conn = urlopen(jobConfigPath)
    # urlopen returns bytes, decode before handing the text to the json parser
    jobJson = conn.read().decode('utf-8')
    return jobJson

def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print("%s %s needs to be in line with json syntax" % (context, strConfig), file=sys.stderr)
        sys.exit(-1)
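
# e.g. parseJson('{"table": "t"}', 'reader config') -> {'table': 't'};
# malformed json prints a traceback plus the context message and exits.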

def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                # no --writer json given: mock one random long column per writer column
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent=4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    #    dataxParams[opt] = value
##end datax json generate logic

if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    #print(options, args)
    dataxTraceJobJson = convert(options, args)

    # uuid1 is derived from the MAC address, the current timestamp and a random
    # number, so the generated job file name is unique across machines
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    fileWriter = open(dataxJobPath, 'w')
    fileWriter.write(dataxTraceJobJson)
    fileWriter.close()

    print("trace environments:")
    print("dataxJobPath: %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)
    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand: %s" % dataxCommand)
    returncode = fork(dataxCommand, True)

    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)