
datax json mysql2hive hive2mysql

Zhang Li, 2 years ago
Parent
Current commit
61561dab84
9 files changed, with 293 additions and 30 deletions
  1. Makefile (+8, -2)
  2. app/core/datax/engine.py (+16, -5)
  3. app/core/datax/hdfs.py (+149, -0)
  4. app/core/ds/hive.py (+7, -2)
  5. app/core/ds/mysql.py (+8, -2)
  6. app/crud/datax_json.py (+1, -1)
  7. app/routers/datax_json.py (+4, -2)
  8. app/schemas/datax_json.py (+100, -16)
  9. data/data.sql (+0, -0)

+ 8 - 2
Makefile

@@ -3,7 +3,7 @@ VERSION=latest
 BUILD_TIME      := $(shell date "+%F %T")
 COMMIT_SHA1     := $(shell git rev-parse HEAD)
 AUTHOR          := $(shell git show -s --format='%an')
-
+REMOTE_WORKSPACE=/home/sxwl1070/zhangli/bigdata/datax-admin
 
 .PHONY: image publish
 
@@ -15,4 +15,10 @@ publish:
 	@docker push registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION)
 
 pull:
-	@docker pull registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION)
+	@docker pull registry.cn-hangzhou.aliyuncs.com/sxtest/$(NAME):$(VERSION)
+
+deploy: pull
+	@docker-compose down  && docker-compose up -d
+
+remote:
+	@ssh -t sxwl1070@192.168.199.107 "cd $(REMOTE_WORKSPACE); make deploy"

+ 16 - 5
app/core/datax/engine.py

@@ -2,6 +2,8 @@ from typing import Any, List
 from app import models
 from app import schemas
 from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
+from app.core.datax.hdfs import HdfsReader, HdfsWriter
+from app.models import database
 
 
 
@@ -10,6 +12,8 @@ class ReaderFactory:
     def get_reader(ds: models.JobJdbcDatasource):
         if ds.datasource == 'mysql':
             return RdbmsReader(ds)
+        elif ds.datasource == 'hive':
+            return HdfsReader(ds)
         else:
             raise Exception('Unimplemented Reader')
 
@@ -19,14 +23,18 @@ class WriterFactory:
     def get_writer(ds: models.JobJdbcDatasource):
         if ds.datasource == 'mysql':
             return RdbmsWriter(ds)
+        elif ds.datasource == 'hive':
+            return HdfsWriter(ds)
         else:
             raise Exception('Unimplemented Writer')
 
 class DataXEngine:
 
-    def build_job(self, ds: models.JobJdbcDatasource, param: schemas.DataXJsonParam, is_show=True) -> dict:
+    def build_job(self, ds_reader: models.JobJdbcDatasource,
+                        ds_writer: models.JobJdbcDatasource,
+                        param: schemas.DataXJsonParam, is_show=True) -> dict:
         res = dict()
-        content = self.build_content(ds, param, is_show)
+        content = self.build_content(ds_reader, ds_writer, param, is_show)
         setting = self.build_setting()
         res['job'] = {
             'content': content,
@@ -34,9 +42,12 @@ class DataXEngine:
         }
         return res
 
-    def build_content(self, ds: models.JobJdbcDatasource, param: schemas.DataXJsonParam, is_show) -> List[Any]:
-        reader = ReaderFactory.get_reader(ds)
-        writer = WriterFactory.get_writer(ds)
+    def build_content(self, ds_reader: models.JobJdbcDatasource,
+                            ds_writer: models.JobJdbcDatasource,
+                            param: schemas.DataXJsonParam, is_show) -> List[Any]:
+        reader = ReaderFactory.get_reader(ds_reader)
+        writer = WriterFactory.get_writer(ds_writer)
+
         res = dict()
         res['reader'] = reader.build(param, is_show)
         res['writer'] = writer.build(param, is_show)

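build_job now takes the reader and writer datasources separately, so a job can read from one backend and write to another. A minimal usage sketch, assuming reader_ds and writer_ds are JobJdbcDatasource rows already loaded by the crud layer and param is a validated DataXJsonParam (the wrapper function below is illustrative, not part of this commit):

    from app.core.datax.engine import DataXEngine

    def build_mysql_to_hive_job(reader_ds, writer_ds, param):
        # reader_ds.datasource == 'mysql' -> RdbmsReader
        # writer_ds.datasource == 'hive'  -> HdfsWriter
        engine = DataXEngine()
        # is_show=False mirrors how crud.generate_datax_json calls it
        return engine.build_job(reader_ds, writer_ds, param, is_show=False)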
+ 149 - 0
app/core/datax/hdfs.py

@@ -0,0 +1,149 @@
+from typing import List
+from app.core.datax.base import ReaderBase, WriterBase
+from app.models import JobJdbcDatasource
+from app.schemas.datax_json import DataXJsonParam
+from app.utils import *
+
+
+
+# Reference DataX hdfswriter / hdfsreader configurations that the classes below generate:
+#
+# "writer": {
+#     "name": "hdfswriter",
+#     "parameter": {
+#         "defaultFS": "hdfs://192.168.199.107:9000",
+#         "fileType": "text",
+#         "path": "/usr/hive/warehouse/test_1",
+#         "fileName": "test_1",
+#         "writeMode": "append",
+#         "fieldDelimiter": "|",
+#         "column": [
+#             {"name": "id", "type": "int"},
+#             {"name": "ssn", "type": "varchar"},
+#             {"name": "test2", "type": "int"}
+#         ]
+#     }
+# }
+#
+# "reader": {
+#     "name": "hdfsreader",
+#     "parameter": {
+#         "path": "/usr/hive/warehouse/grades/*",
+#         "defaultFS": "hdfs://192.168.199.107:9000",
+#         "fileType": "csv",
+#         "fieldDelimiter": ",",
+#         "column": [
+#             {"index": 0, "type": "long"},
+#             {"index": 3, "type": "string"},
+#             {"index": 5, "type": "long"}
+#         ]
+#     }
+# }
+
+class HdfsReader(ReaderBase):
+    def __init__(self, ds: JobJdbcDatasource):
+        ReaderBase.__init__(self, ds)
+        if ds.datasource == 'hive':
+            self.name = 'hdfsreader'
+        else:
+            raise Exception('Unimplemented HdfsReader')
+
+    def _build_column(self, columns: List[str]):
+        res = []
+        for column in columns:
+            tmp = dict()
+            # entries look like "0:id:int"; hdfsreader wants a numeric index and a DataX type
+            index, _name, type = column.split(':')
+            tmp['index'] = int(index)
+            tmp['type'] = self._convert_type(type)
+            res.append(tmp)
+        if not res:
+            raise Exception('No column found')
+        return res
+
+    def _convert_type(self, type):
+        if type.lower() == 'int':
+            return 'long'
+        elif type.lower() == 'varchar':
+            return 'string'
+        # pass any other type through lower-cased instead of silently returning None
+        return type.lower()
+
+    def build_parameter(self, param: DataXJsonParam, is_show=True):
+        parameter = dict()
+        parameter['path'] = param.hive_reader.reader_path
+        parameter['defaultFS'] = param.hive_reader.reader_default_fs
+        parameter['fileType'] = param.hive_reader.reader_file_type
+        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter
+        parameter['column'] = self._build_column(param.reader_columns)
+        return parameter
+
+
+    def build(self, param: DataXJsonParam, is_show=True):
+        reader = dict()
+        parameter = self.build_parameter(param, is_show)
+        reader['name'] = self.name
+        reader['parameter'] = parameter
+        return reader
+
+
+
+
+class HdfsWriter(WriterBase):
+    def __init__(self, ds: JobJdbcDatasource):
+        WriterBase.__init__(self, ds)
+        if ds.datasource == 'hive':
+            self.name = 'hdfswriter'
+        else:
+            raise Exception('Unimplemented HdfsWriter')
+
+    def _build_column(self, columns: List[str]):
+        res = []
+        for column in columns:
+            tmp = dict()
+            _, name, type = column.split(':')
+            tmp['name'] = name
+            tmp['type'] = type
+            res.append(tmp)
+        if not res:
+            raise Exception('No column found')
+        return res
+
+
+    def build_parameter(self, param: DataXJsonParam, is_show=True):
+        parameter = dict()
+        parameter['defaultFS'] = param.hive_writer.writer_default_fs
+        parameter['fileType'] = param.hive_writer.writer_file_type
+        parameter['path'] = param.hive_writer.writer_path
+        parameter['fileName'] = param.hive_writer.writer_filename
+        parameter['writeMode'] = param.hive_writer.write_mode
+        parameter['fieldDelimiter'] = param.hive_writer.write_field_delimiter
+        parameter['column'] = self._build_column(param.writer_columns)
+        return parameter
+
+    def build(self, param: DataXJsonParam, is_show=True):
+        writer = dict()
+        parameter = self.build_parameter(param, is_show)
+        writer['name'] = self.name
+        writer['parameter'] = parameter
+        return writer
+
+

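Both classes expect column entries in the "index:name:type" format that get_table_schema now produces (e.g. "0:id:int"): HdfsReader keeps the index and maps the type to a DataX type, while HdfsWriter keeps the name and raw type. A standalone sketch of that parsing, for illustration only (the helper name is hypothetical):

    def parse_schema_column(column: str) -> dict:
        """Split an 'index:name:type' entry into the pieces HdfsReader/HdfsWriter use."""
        index, name, col_type = column.split(':')
        return {'index': int(index), 'name': name, 'type': col_type}

    # parse_schema_column("0:id:int")      -> {'index': 0, 'name': 'id', 'type': 'int'}
    # parse_schema_column("1:ssn:varchar") -> {'index': 1, 'name': 'ssn', 'type': 'varchar'}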
+ 7 - 2
app/core/ds/hive.py

@@ -76,7 +76,12 @@ class HiveDS(DataSourceBase):
         logger.info(self.database_name)
         sql = f'describe {self.database_name}.{table_name}'
         res = self._execute_sql([sql])
-        logger.info(res)
-        return flat_map(lambda x: [':'.join(x[:2])], res[0])
+        if res:
+            res = [[str(i), *x] for i, x in enumerate(res[0])]
+            logger.info(res)
+
+            return flat_map(lambda x: [':'.join(x[:3])], res)
+        else:
+            raise Exception('table not found')
 
 

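With this change (mirrored in MysqlDS below), get_table_schema prefixes each column with its position, so `describe` output that used to become "id:int" now becomes "0:id:int". A minimal sketch of the same transformation, with a list comprehension standing in for the project's flat_map utility:

    def describe_to_schema(rows):
        # rows: (column_name, column_type, ...) tuples from `describe db.table`
        indexed = [[str(i), *row] for i, row in enumerate(rows)]
        return [':'.join(row[:3]) for row in indexed]

    # describe_to_schema([('id', 'int', ''), ('ssn', 'varchar(255)', '')])
    # -> ['0:id:int', '1:ssn:varchar(255)']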
+ 8 - 2
app/core/ds/mysql.py

@@ -94,7 +94,13 @@ class MysqlDS(DataSourceBase):
     def get_table_schema(self, table_name):
         sql = f'describe {self.database_name}.{table_name}'
         res = self._execute_sql([sql])
-        logger.info(res)
-        return flat_map(lambda x: [':'.join(x[:2])], res[0])
+        if res:
+            res = [[str(i), *x] for i, x in enumerate(res[0])]
+            logger.info(res)
+
+            return flat_map(lambda x: [':'.join(x[:3])], res)
+        else:
+            raise Exception('table not found')
+
 
 

+ 1 - 1
app/crud/datax_json.py

@@ -16,7 +16,7 @@ def generate_datax_json(db: Session, param: schemas.DataXJsonParam):
         raise Exception('Writer datasource not found')
 
     engine = DataXEngine()
-    job = engine.build_job(reader_ds, param, is_show=False)
+    job = engine.build_job(reader_ds, writer_ds, param, is_show=False)
     logger.info(job)
     return {'json': job}
 

+ 4 - 2
app/routers/datax_json.py

@@ -1,6 +1,6 @@
 from fastapi import APIRouter
 
-from fastapi import Depends
+from fastapi import Depends, Body
 from sqlalchemy.orm import Session
 from app import schemas
 
@@ -21,7 +21,9 @@ router = APIRouter(
 @router.post("/")
 @web_try()
 @sxtimeit
-def build_datax_json(param: schemas.DataXJsonParam, db: Session = Depends(get_db)):
+def build_datax_json(param: schemas.DataXJsonParam = Body(
+    examples=schemas.DataXJsonParam.Config.schema_extra['examples']
+), db: Session = Depends(get_db)):
     print(param)
     return crud.generate_datax_json(db, param)
 

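A hedged sketch of exercising the endpoint with one of the new schema examples; the host, port, and route path below are placeholders, since the router prefix is not shown in this diff:

    import requests
    from app import schemas

    payload = schemas.DataXJsonParam.Config.schema_extra['examples']['mysql2hive']
    # Adjust the URL to wherever this router is actually mounted.
    resp = requests.post('http://localhost:8000/datax_json/', json=payload)
    print(resp.json())  # crud.generate_datax_json returns {'json': <generated DataX job>}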
+ 100 - 16
app/schemas/datax_json.py

@@ -16,7 +16,7 @@ class HiveWriterParam(BaseModel):
     writer_file_type: str
     writer_path: str
     writer_filename: str
-    write_mode: Optional[str]
+    write_mode: Optional[str]='append'
     write_field_delimiter: Optional[str]
 
 
@@ -42,26 +42,110 @@ class DataXJsonParam(BaseModel):
     rdbms_reader: Optional[RdbmsReaderParam]
     rdbms_writer: Optional[RdbmsWriterParam]
 
-
     class Config:
         schema_extra = {
-            "example": {
-                "reader_datasource_id": 18,
-                "reader_tables": ["job_group_copy1"],
-                "reader_columns": ["id", "app_name", "title", "address_type"],
-                "writer_datasource_id": 18,
-                "writer_tables": ["job_group_copy2"],
-                "writer_columns": ["id", "app_name", "title", "address_type"],
-                "rdbms_reader": {
-                    "reader_split_pk": "",
-                    "where_param": "",
-                    "query_sql": ""
+            'examples': {
+                'mysql2mysql': {
+                    "reader_datasource_id": 18,
+                    "reader_tables": ["job_group_copy1"],
+                    "reader_columns": ["id", "app_name", "title", "address_type"],
+                    "writer_datasource_id": 18,
+                    "writer_tables": ["job_group_copy2"],
+                    "writer_columns": ["id", "app_name", "title", "address_type"],
+                    "rdbms_reader": {
+                        "reader_split_pk": "",
+                        "where_param": "",
+                        "query_sql": ""
+                    },
+                    "rdbms_writer": {
+                        "pre_sql": "delete from job_group_copy2",
+                        "post_sql": ""
+                    }
+                },
+                'mysql2hive': {
+                    "reader_datasource_id": 18,
+                    "reader_tables": ["grades"],
+                    "reader_columns": ["id", "ssn", "test2"],
+                    "writer_datasource_id": 17,
+                    "writer_columns": ["0:id:int", "1:ssn:varchar", "2:test2:int"],
+                    "writer_tables": [],
+                    "rdbms_reader": {
+                        "reader_split_pk": "",
+                        "where_param": "",
+                        "query_sql": ""
+                    },
+                    "hive_writer": {
+                        "writer_default_fs": "hdfs://192.168.199.107:9000",
+                        "writer_file_type": "text",
+                        "writer_path": "/usr/hive/warehouse/test_1",
+                        "writer_filename": "test_1",
+                        "write_mode": "append",
+                        "write_field_delimiter": "|"
+                    }
                 },
-                "rdbms_writer": {
-                    "pre_sql": "delete from job_group_copy2",
-                    "post_sql": ""
+                'hive2mysql': {
+                    "reader_datasource_id": 17,
+                    "reader_tables": ["grades"],
+                    "reader_columns": ["0:id:int", "3:ssn:varchar", "5:test2:int"],
+                    "writer_datasource_id": 18,
+                    "writer_tables": ["grades"],
+                    "writer_columns": ["id", "ssn", "test2"],
+                    "hive_reader": {
+                        "reader_default_fs": "hdfs://192.168.199.107:9000",
+                        "reader_file_type": "csv",
+                        "reader_path": "/usr/hive/warehouse/grades/*",
+                        "reader_field_delimiter": ",",
+                        "reader_skip_header": "true"
+                    },
+                    "rdbms_writer": {
+                        "pre_sql": "delete from grades;",
+                        "post_sql": ""
+                    }
                 }
             }
         }
 
 
+    # class Config:
+    #     schema_extra = {
+    #         "example": {
+    #             "reader_datasource_id": 18,
+    #             "reader_tables": ["job_group_copy1"],
+    #             "reader_columns": ["id", "app_name", "title", "address_type"],
+    #             "writer_datasource_id": 18,
+    #             "writer_tables": ["job_group_copy2"],
+    #             "writer_columns": ["id", "app_name", "title", "address_type"],
+    #             "rdbms_reader": {
+    #                 "reader_split_pk": "",
+    #                 "where_param": "",
+    #                 "query_sql": ""
+    #             },
+    #             "rdbms_writer": {
+    #                 "pre_sql": "delete from job_group_copy2",
+    #                 "post_sql": ""
+    #             }
+    #         }
+
+    #         "example": {
+    #             "reader_datasource_id": 18,
+    #             "reader_tables": ["grades"],
+    #             "reader_columns": ["id", "ssn", "test2"],
+    #             "writer_datasource_id": 17,
+    #             "writer_columns": ["id:int", "ssn:string", "test2:int"],
+    #             "writer_tables": ["grades"],
+    #             "rdbms_reader": {
+    #                 "reader_split_pk": "",
+    #                 "where_param": "",
+    #             },
+    #               "hive_writer": {
+    #                 "writer_default_fs": "hdfs://192.168.199.107:9000",
+    #                 "writer_file_type": "text",
+    #                 "writer_path": "/usr/hive/warehouse/test_1",
+    #                 "writer_filename": "test_1",
+    #                 "write_mode": "append",
+    #                 "write_field_delimiter": "|"
+    #             }
+    #         }
+    #     }
+
+

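Because each scenario needs a different optional sub-model (rdbms_reader/rdbms_writer versus hive_reader/hive_writer), a quick sanity check is to run the new examples back through the model; a minimal sketch using pydantic's parse_obj:

    from app.schemas.datax_json import DataXJsonParam

    for name, example in DataXJsonParam.Config.schema_extra['examples'].items():
        param = DataXJsonParam.parse_obj(example)
        # hive_reader / hive_writer stay None for the pure-mysql example
        print(name, param.reader_datasource_id, '->', param.writer_datasource_id)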
+ 0 - 0
data/data.sql