  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import sys
  4. import os
  5. import signal
  6. import subprocess
  7. import time
  8. import re
  9. import socket
  10. import json
  11. from optparse import OptionParser
  12. from optparse import OptionGroup
  13. from string import Template
  14. import codecs
  15. import platform
  16. def isWindows():
  17. return platform.system() == 'Windows'
# --- Runtime layout & launch constants -------------------------------------
# DATAX_HOME is the parent of the directory holding this script (…/bin/datax.py).
DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    # Windows consoles may report code page "cp65001"; alias it to utf-8.
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
# JVM defaults: 1 GiB heap, dump heap into DATAX_HOME/log on OOM.
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
# string.Template command line; ${jvm}/${params}/${mode}/${jobid}/${job} are
# substituted in buildStartCommand().
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
# Process exit codes reported back to whoever launched this script.
RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}
  39. def getLocalIp():
  40. try:
  41. return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
  42. except:
  43. return "Unknown"
  44. def suicide(signum, e):
  45. global child_process
  46. print('>> sys.stderr', "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum))
  47. if child_process:
  48. child_process.send_signal(signal.SIGQUIT)
  49. time.sleep(1)
  50. child_process.kill()
  51. print('>> sys.stderr', "DataX Process was killed ! you did ?")
  52. sys.exit(RET_STATE["KILL"])
  53. def register_signal():
  54. if not isWindows():
  55. global child_process
  56. signal.signal(2, suicide)
  57. signal.signal(3, suicide)
  58. signal.signal(15, suicide)
  59. def getOptionParser():
  60. usage = "usage: %prog [options] job-url-or-path"
  61. parser = OptionParser(usage=usage)
  62. prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
  63. "Normal user use these options to set jvm parameters, job runtime mode etc. "
  64. "Make sure these options can be used in Product Env.")
  65. prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
  66. default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
  67. prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
  68. help="Set job unique id when running by Distribute/Local Mode.")
  69. prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
  70. action="store", default="standalone",
  71. help="Set job runtime mode such as: standalone, local, distribute. "
  72. "Default mode is standalone.")
  73. prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
  74. action="store", dest="params",
  75. help='Set job parameter, eg: the source tableName you want to set it by command, '
  76. 'then you can use like this: -p"-DtableName=your-table-name", '
  77. 'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
  78. 'Note: you should config in you job tableName with ${tableName}.')
  79. prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
  80. action="store", dest="reader",type="string",
  81. help='View job config[reader] template, eg: mysqlreader,streamreader')
  82. prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
  83. action="store", dest="writer",type="string",
  84. help='View job config[writer] template, eg: mysqlwriter,streamwriter')
  85. parser.add_option_group(prodEnvOptionGroup)
  86. devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
  87. "Developer use these options to trace more details of DataX.")
  88. devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
  89. help="Set to remote debug mode.")
  90. devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
  91. default="info", help="Set log level such as: debug, info, all etc.")
  92. parser.add_option_group(devEnvOptionGroup)
  93. return parser
  94. def generateJobConfigTemplate(reader, writer):
  95. readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
  96. writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
  97. print(readerRef)
  98. print(writerRef)
  99. jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
  100. print(jobGuid)
  101. jobTemplate={
  102. "job": {
  103. "setting": {
  104. "speed": {
  105. "channel": ""
  106. }
  107. },
  108. "content": [
  109. {
  110. "reader": {},
  111. "writer": {}
  112. }
  113. ]
  114. }
  115. }
  116. readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
  117. writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
  118. try:
  119. readerPar = readPluginTemplate(readerTemplatePath);
  120. except Exception as e:
  121. print("Read reader[%s] template error: cant find file %s".format(reader,readerTemplatePath))
  122. try:
  123. writerPar = readPluginTemplate(writerTemplatePath);
  124. except Exception as e:
  125. print("Read writer %s template error: cant find file %s" % (writer,writerTemplatePath))
  126. jobTemplate['job']['content'][0]['reader'] = readerPar;
  127. jobTemplate['job']['content'][0]['writer'] = writerPar;
  128. print(json.dumps(jobTemplate, indent=4, sort_keys=True))
  129. def readPluginTemplate(plugin):
  130. with open(plugin, 'r') as f:
  131. return json.load(f)
  132. def isUrl(path):
  133. if not path:
  134. return False
  135. assert (isinstance(path, str))
  136. m = re.match(r"^http[s]?://\S+\w*", path.lower())
  137. if m:
  138. return True
  139. else:
  140. return False
  141. def buildStartCommand(options, args):
  142. commandMap = {}
  143. tempJVMCommand = DEFAULT_JVM
  144. if options.jvmParameters:
  145. tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
  146. if options.remoteDebug:
  147. tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
  148. print('local ip: ', getLocalIp())
  149. if options.loglevel:
  150. tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
  151. if options.mode:
  152. commandMap["mode"] = options.mode
  153. # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
  154. jobResource = args[0]
  155. if not isUrl(jobResource):
  156. jobResource = os.path.abspath(jobResource)
  157. if jobResource.lower().startswith("file://"):
  158. jobResource = jobResource[len("file://"):]
  159. jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
  160. if options.params:
  161. jobParams = jobParams + " " + options.params
  162. if options.jobid:
  163. commandMap["jobid"] = options.jobid
  164. commandMap["jvm"] = tempJVMCommand
  165. commandMap["params"] = jobParams
  166. commandMap["job"] = jobResource
  167. return Template(ENGINE_COMMAND).substitute(**commandMap)
  168. def printCopyright():
  169. print("Yili AI Data transfer start:\n")
  170. #print(sys.stdout.flush())
  171. if __name__ == "__main__":
  172. printCopyright()
  173. parser = getOptionParser()
  174. options, args = parser.parse_args(sys.argv[1:])
  175. if options.reader is not None and options.writer is not None:
  176. generateJobConfigTemplate(options.reader,options.writer)
  177. sys.exit(RET_STATE['OK'])
  178. if len(args) != 1:
  179. parser.print_help()
  180. sys.exit(RET_STATE['FAIL'])
  181. startCommand = buildStartCommand(options, args)
  182. # print startCommand
  183. child_process = subprocess.Popen(startCommand, shell=True)
  184. register_signal()
  185. (stdout, stderr) = child_process.communicate()
  186. sys.exit(child_process.returncode)