dxprof.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. #! /usr/bin/env python
  2. # vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
  3. import re
  4. import sys
  5. import time
# --- compiled log-line patterns (hoisted to module level, built once) ---
# Reader (SQL) phase markers emitted by CommonRdbmsReader$Task.
REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)  # reader task starts
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)    # reader task finishes
# Captures the table name following FROM in the logged SQL text.
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
# Captures the bracketed payload of a "jdbcUrl: [...]" connection line.
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
# Task identifiers: runs of digit-dash groups ending in reader/writer
# (e.g. a string like "0-0-1-reader" — TODO confirm exact shape against a real log).
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
# Writer commit phase markers.
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
  14. # {{{ function parse_timestamp() #
  15. def parse_timestamp(line):
  16. try:
  17. ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
  18. except:
  19. ts = 0
  20. return ts
  21. # }}} #
  22. # {{{ function parse_query_host() #
  23. def parse_query_host(line):
  24. ori = REG_SQL_JDBC.search(line)
  25. if (not ori):
  26. return ''
  27. ori = ori.group(1).split('?')[0]
  28. off = ori.find('@')
  29. if (off > -1):
  30. ori = ori[off+1:len(ori)]
  31. else:
  32. off = ori.find('//')
  33. if (off > -1):
  34. ori = ori[off+2:len(ori)]
  35. return ori.lower()
  36. # }}} #
  37. # {{{ function parse_query_table() #
  38. def parse_query_table(line):
  39. ori = REG_SQL_PATH.search(line)
  40. return (ori and ori.group(1).lower()) or ''
  41. # }}} #
  42. # {{{ function parse_reader_task() #
  43. def parse_task(fname):
  44. global LAST_SQL_UUID
  45. global LAST_COMMIT_UUID
  46. global DATAX_JOBDICT
  47. global DATAX_JOBDICT_COMMIT
  48. global UNIXTIME
  49. LAST_SQL_UUID = ''
  50. DATAX_JOBDICT = {}
  51. LAST_COMMIT_UUID = ''
  52. DATAX_JOBDICT_COMMIT = {}
  53. UNIXTIME = int(time.time())
  54. with open(fname, 'r') as f:
  55. for line in f.readlines():
  56. line = line.strip()
  57. if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
  58. DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
  59. LAST_SQL_UUID = ''
  60. if line.find('CommonRdbmsReader$Task') > 0:
  61. parse_read_task(line)
  62. elif line.find('commit blocks') > 0:
  63. parse_write_task(line)
  64. else:
  65. continue
  66. # }}} #
  67. # {{{ function parse_read_task() #
  68. def parse_read_task(line):
  69. ser = REG_SQL_UUID.search(line)
  70. if not ser:
  71. return
  72. LAST_SQL_UUID = ser.group()
  73. if REG_SQL_WAKE.search(line):
  74. DATAX_JOBDICT[LAST_SQL_UUID] = {
  75. 'stat' : 'R',
  76. 'wake' : parse_timestamp(line),
  77. 'done' : UNIXTIME,
  78. 'host' : parse_query_host(line),
  79. 'path' : parse_query_table(line)
  80. }
  81. elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
  82. DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
  83. DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
  84. # }}} #
  85. # {{{ function parse_write_task() #
  86. def parse_write_task(line):
  87. ser = REG_COMMIT_UUID.search(line)
  88. if not ser:
  89. return
  90. LAST_COMMIT_UUID = ser.group()
  91. if REG_COMMIT_WAKE.search(line):
  92. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
  93. 'stat' : 'R',
  94. 'wake' : parse_timestamp(line),
  95. 'done' : UNIXTIME,
  96. }
  97. elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
  98. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
  99. DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
  100. # }}} #
  101. # {{{ function result_analyse() #
  102. def result_analyse():
  103. def compare(a, b):
  104. return b['cost'] - a['cost']
  105. tasklist = []
  106. hostsmap = {}
  107. statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
  108. tasklist_commit = []
  109. statvars_commit = {'sum' : 0, 'cnt' : 0}
  110. for idx in DATAX_JOBDICT:
  111. item = DATAX_JOBDICT[idx]
  112. item['uuid'] = idx;
  113. item['cost'] = item['done'] - item['wake']
  114. tasklist.append(item);
  115. if (not (item['host'] in hostsmap)):
  116. hostsmap[item['host']] = 1
  117. statvars['svr'] += 1
  118. if (item['cost'] > -1 and item['cost'] < 864000):
  119. statvars['sum'] += item['cost']
  120. statvars['cnt'] += 1
  121. statvars['max'] = max(statvars['max'], item['done'])
  122. statvars['min'] = min(statvars['min'], item['wake'])
  123. for idx in DATAX_JOBDICT_COMMIT:
  124. itemc = DATAX_JOBDICT_COMMIT[idx]
  125. itemc['uuid'] = idx
  126. itemc['cost'] = itemc['done'] - itemc['wake']
  127. tasklist_commit.append(itemc)
  128. if (itemc['cost'] > -1 and itemc['cost'] < 864000):
  129. statvars_commit['sum'] += itemc['cost']
  130. statvars_commit['cnt'] += 1
  131. ttl = (statvars['max'] - statvars['min']) or 1
  132. idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
  133. tasklist.sort(compare)
  134. for item in tasklist:
  135. print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
  136. time.strftime('%H:%M:%S', time.localtime(item['wake'])),
  137. (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
  138. item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))
  139. if (not len(tasklist) or not statvars['cnt']):
  140. return
  141. print('\n--- DataX Profiling Statistics ---')
  142. print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
  143. statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
  144. print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
  145. time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
  146. time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
  147. float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))
  148. idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
  149. tasklist_commit.sort(compare)
  150. print '%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
  151. statvars_commit['cnt'],
  152. statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
  153. idx_commit * tasklist_commit[0]['cost'])
  154. # }}} #
  155. if (len(sys.argv) < 2):
  156. print("Usage: %s filename" %(sys.argv[0]))
  157. quit(1)
  158. else:
  159. parse_task(sys.argv[1])
  160. result_analyse()