liweiquan пре 2 година
родитељ
комит
dcd8357e8a
4 измењених фајлова са 15 додато и 14 уклоњено
  1. 1 0
      app/core/datax/engine.py
  2. 2 2
      app/core/datax/hdfs.py
  3. 8 8
      app/crud/job_info.py
  4. 4 4
      app/schemas/datax_json.py

+ 1 - 0
app/core/datax/engine.py

@@ -48,6 +48,7 @@ class DataXEngine:
 
         res = dict()
         res['reader'] = reader.build(param, is_show)
+        print("===",param)
         res['writer'] = writer.build(param, is_show)
         return [res]
 

+ 2 - 2
app/core/datax/hdfs.py

@@ -134,8 +134,8 @@ class HdfsWriter(WriterBase):
         parameter['fileType'] = param.hive_writer.writer_file_type
         parameter['path'] = param.hive_writer.writer_path
         parameter['fileName'] = param.hive_writer.writer_filename
-        parameter['writeMode'] = param.hive_writer.write_mode
-        parameter['fieldDelimiter'] = param.hive_writer.write_field_delimiter
+        parameter['writerMode'] = param.hive_writer.writer_mode
+        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
         parameter['column'] = self._build_column(param.writer_columns)
         return parameter
 

+ 8 - 8
app/crud/job_info.py

@@ -16,13 +16,13 @@ def create_job_info(db: Session, item: schemas.JobInfoCreate):
         'cron_select_type': cron_select_type,
         'job_cron': cron_expression,
     })
-    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() else None
-    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() else None
-    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() else None
+    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
+    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
+    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
     partition_info_str = ''
     if partition_info is not None and partition_time is not None and partition_num is not None:
         partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
+    elif partition_info is not None and (partition_time is None or partition_num is None):
         raise Exception('分区信息不完善')
     item_dict.update({
         'partition_info': partition_info_str,
@@ -58,13 +58,13 @@ def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
         'cron_select_type': cron_select_type,
         'job_cron': cron_expression,
     })
-    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() else None
-    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() else None
-    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() else None
+    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
+    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys()  and update_dict['partition_time'] != '' else None
+    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys()  and update_dict['partition_num'] != '' else None
     partition_info_str = ''
     if partition_info is not None and partition_time is not None and partition_num is not None:
         partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
+    elif partition_info is not None and (partition_time is None or partition_num is None):
         raise Exception('分区信息不完善')
     update_dict.update({
         'partition_info': partition_info_str,

+ 4 - 4
app/schemas/datax_json.py

@@ -16,8 +16,8 @@ class HiveWriterParam(BaseModel):
     writer_file_type: str
     writer_path: str
     writer_filename: str
-    write_mode: Optional[str]='append'
-    write_field_delimiter: Optional[str]
+    writer_mode: Optional[str]='append'
+    writer_field_delimiter: Optional[str]
 
 
 class RdbmsReaderParam(BaseModel):
@@ -79,8 +79,8 @@ class DataXJsonParam(BaseModel):
                         "writer_file_type": "text",
                         "writer_path": "/usr/hive/warehouse/test_1",
                         "writer_filename": "test_1",
-                        "write_mode": "append",
-                        "write_field_delimiter": "|"
+                        "writer_mode": "append",
+                        "writer_field_delimiter": "|"
                     }
                 },
                 'hive2mysql': {