123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- #! /usr/bin/env python
- # vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
- import re
- import sys
- import time
- REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
- REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
- REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
- REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
- REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
- REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
- REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
- REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
- # {{{ function parse_timestamp() #
def parse_timestamp(line):
    """Convert the leading ``YYYY-MM-DD HH:MM:SS`` stamp of a line to epoch seconds.

    Only the first 19 characters of ``line`` are examined; they are
    interpreted as local time because the conversion goes through
    ``time.mktime``.

    Args:
        line: A log line expected to start with a timestamp.

    Returns:
        The timestamp as an ``int``, or ``0`` when the prefix cannot be
        parsed as a timestamp.
    """
    try:
        parsed = time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')
        return int(time.mktime(parsed))
    except (TypeError, ValueError, OverflowError):
        # Best effort: lines without a usable timestamp map to 0.  Only the
        # failures of slicing/parsing/conversion are caught, so interrupts
        # and unrelated errors are no longer silently swallowed.
        return 0
- # }}} #
- # {{{ function parse_query_host() #
def parse_query_host(line):
    """Extract the lower-cased server part of a ``jdbcUrl: [...]`` fragment.

    The bracketed URL is cut at the first ``?``.  If it contains an ``@``,
    everything after the first ``@`` is returned; otherwise, if it contains
    ``//``, everything after the first ``//`` is returned; otherwise the
    truncated URL itself is returned.

    Args:
        line: A log line that may contain a ``jdbcUrl: [...]`` fragment.

    Returns:
        The extracted text in lower case, or ``''`` when ``line`` holds no
        ``jdbcUrl`` fragment.
    """
    match = REG_SQL_JDBC.search(line)
    if match is None:
        return ''

    # Strip any query-string parameters before looking for the separators.
    url = match.group(1).partition('?')[0]

    _, at_sign, after_at = url.partition('@')
    if at_sign:
        return after_at.lower()

    _, slashes, after_slashes = url.partition('//')
    if slashes:
        return after_slashes.lower()

    return url.lower()
- # }}} #
- # {{{ function parse_query_table() #
def parse_query_table(line):
    """Return the lower-cased table name following ``from`` in a SQL log line.

    Args:
        line: A log line that may contain ``from <table>`` followed by
            ``where`` or the end of the line.

    Returns:
        The matched table name in lower case, or ``''`` when the pattern
        does not match.
    """
    match = REG_SQL_PATH.search(line)
    if match is None:
        return ''
    return match.group(1).lower()
- # }}} #
- # {{{ function parse_task() #
def parse_task(fname):
    """Scan a log file and collect reader and writer-commit task records.

    Resets the module-level state (``LAST_SQL_UUID``, ``LAST_COMMIT_UUID``,
    ``DATAX_JOBDICT``, ``DATAX_JOBDICT_COMMIT``, ``UNIXTIME``) and then feeds
    every line containing ``CommonRdbmsReader$Task`` to ``parse_read_task``
    and every other line containing ``commit blocks`` to
    ``parse_write_task``.

    Args:
        fname: Path of the log file to read.
    """
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME

    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}
    # Captured once; used as the provisional end time of unfinished tasks.
    UNIXTIME = int(time.time())

    with open(fname, 'r') as f:
        # Iterate lazily instead of materialising the whole log in memory.
        for line in f:
            line = line.strip()

            # A reader task flagged on the previous line takes its host from
            # the line that follows it.
            if LAST_SQL_UUID and LAST_SQL_UUID in DATAX_JOBDICT:
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''

            # Substring tests (rather than ``find() > 0``) so a marker at the
            # very start of a line is not skipped.
            if 'CommonRdbmsReader$Task' in line:
                parse_read_task(line)
            elif 'commit blocks' in line:
                parse_write_task(line)
- # }}} #
- # {{{ function parse_read_task() #
def parse_read_task(line):
    """Record the start or completion of a reader task found on ``line``.

    The task is identified by the ``<digits>-...-reader`` token matched by
    ``REG_SQL_UUID``; lines without such a token are ignored.  A "begin to
    read" line creates a record in ``DATAX_JOBDICT`` with state ``'R'`` and
    a provisional end time of ``UNIXTIME``; a "finished read" line for an
    already known task switches the state to ``'D'`` and stores the real
    end time.

    Args:
        line: A stripped log line.
    """
    # Must be declared global: a plain assignment would create a local
    # variable, and parse_task -- which re-parses the host from the
    # following line whenever LAST_SQL_UUID is set -- would never see it.
    global LAST_SQL_UUID

    found = REG_SQL_UUID.search(line)
    if not found:
        return
    task_uuid = found.group()

    if REG_SQL_WAKE.search(line):
        host = parse_query_host(line)
        DATAX_JOBDICT[task_uuid] = {
            'stat': 'R',
            'wake': parse_timestamp(line),
            'done': UNIXTIME,
            'host': host,
            'path': parse_query_table(line),
        }
        if not host:
            # No jdbcUrl on this line: ask parse_task to look for it on the
            # next one.  Only armed when the host is still unknown, so a
            # host parsed here is never overwritten.
            LAST_SQL_UUID = task_uuid
    elif task_uuid in DATAX_JOBDICT and REG_SQL_DONE.search(line):
        DATAX_JOBDICT[task_uuid]['stat'] = 'D'
        DATAX_JOBDICT[task_uuid]['done'] = parse_timestamp(line)
- # }}} #
- # {{{ function parse_write_task() #
def parse_write_task(line):
    """Record the start or completion of a writer commit found on ``line``.

    The task is identified by the ``<digits>-...-writer`` token matched by
    ``REG_COMMIT_UUID``; lines without such a token are ignored.  A "begin
    to commit blocks" line creates a record in ``DATAX_JOBDICT_COMMIT`` with
    state ``'R'`` and a provisional end time of ``UNIXTIME``; a "commit
    blocks ok" line for an already known task switches the state to ``'D'``
    and stores the real end time.

    Args:
        line: A stripped log line.
    """
    match = REG_COMMIT_UUID.search(line)
    if match is None:
        return
    writer_id = match.group()

    if REG_COMMIT_WAKE.search(line):
        record = {}
        record['stat'] = 'R'
        record['wake'] = parse_timestamp(line)
        record['done'] = UNIXTIME
        DATAX_JOBDICT_COMMIT[writer_id] = record
        return

    if writer_id in DATAX_JOBDICT_COMMIT and REG_COMMIT_DONE.search(line):
        record = DATAX_JOBDICT_COMMIT[writer_id]
        record['stat'] = 'D'
        record['done'] = parse_timestamp(line)
- # }}} #
- # {{{ function result_analyse() #
def result_analyse():
    """Print a per-task cost table and summary statistics to stdout.

    Reads the module-level ``DATAX_JOBDICT`` (reader tasks) and
    ``DATAX_JOBDICT_COMMIT`` (writer commit tasks).  As a side effect every
    record in both dictionaries gains a ``uuid`` key and a ``cost`` key
    (``done - wake`` in seconds).

    Output, in order:

    * one tab-separated row per reader task, most expensive first;
    * a reader summary, printed only if at least one reader task has a cost
      inside the accepted range;
    * a commit summary, printed only if the reader summary was printed and
      at least one commit task has a cost inside the accepted range.
    """
    def by_cost(task):
        return task['cost']

    tasklist = []
    hosts = set()
    statvars = {'sum': 0, 'cnt': 0, 'max': 0, 'min': int(time.time())}

    tasklist_commit = []
    statvars_commit = {'sum': 0, 'cnt': 0}

    for uuid, item in DATAX_JOBDICT.items():
        item['uuid'] = uuid
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)
        hosts.add(item['host'])
        # Only costs in [0, 864000) seconds (864000 s = 10 days) count
        # towards the statistics.
        if -1 < item['cost'] < 864000:
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for uuid, itemc in DATAX_JOBDICT_COMMIT.items():
        itemc['uuid'] = uuid
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)
        if -1 < itemc['cost'] < 864000:
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    # Wall-clock span of the counted reader tasks; never 0 so it can be used
    # as a divisor.
    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)

    # key/reverse sorting works on both Python 2 and 3 and, being stable,
    # keeps equal-cost tasks in their original relative order.
    tasklist.sort(key=by_cost, reverse=True)
    for item in tasklist:
        if item['stat'] == 'D':
            done = time.strftime('%H:%M:%S', time.localtime(item['done']))
        else:
            done = '--'
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' % (
            item['stat'], item['host'], item['path'],
            time.strftime('%H:%M:%S', time.localtime(item['wake'])),
            done,
            item['cost'],
            # 100.0 forces float division so the %.1f column is not
            # truncated to whole percents under Python 2.
            100.0 * item['cost'] / ttl,
            idx * item['cost']))

    if not tasklist or not statvars['cnt']:
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' % (
        statvars['cnt'], len(hosts), statvars['sum'],
        float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' % (
        ttl,
        time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
        time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
        float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

    # Without any counted commit task the average below would divide by
    # zero (and tasklist_commit[0] may not exist), so skip the commit summary.
    if not statvars_commit['cnt']:
        return

    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    tasklist_commit.sort(key=by_cost, reverse=True)
    print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
        statvars_commit['cnt'],
        statvars_commit['sum'],
        float(statvars_commit['sum']) / statvars_commit['cnt'],
        idx_commit * tasklist_commit[0]['cost']))
- # }}} #
# Script entry point: expects the log file path as the first argument.
# Guarded so that importing this module does not trigger any parsing.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: %s filename" % (sys.argv[0]))
        # sys.exit instead of quit(): quit is injected by the ``site``
        # module for interactive use and is not guaranteed to exist.
        sys.exit(1)
    parse_task(sys.argv[1])
    result_analyse()
|