Browse Source

修改同步配置增加默认值传到airflow

liweiquan 2 years ago
parent
commit
aefd728ea0
1 changed files with 16 additions and 10 deletions
  1. 16 10
      app/services/datax.py

+ 16 - 10
app/services/datax.py

@@ -33,7 +33,6 @@ def datax_create_task(job_info: models.JobInfo):
     partition_list = []
     if job_info.partition_info is not None and job_info.partition_info != '':
         partition_list = job_info.partition_info.split(',')
-    envs = {}
     first_begin_time = int(time.time())
     if job_info.inc_start_time is not None and job_info.inc_start_time != '':
         first_begin_time = job_info.inc_start_time
@@ -98,16 +97,23 @@ def datax_put_task(job_info: models.JobInfo,old_af_task):
     partition_list = []
     if job_info.partition_info is not None and job_info.partition_info != '':
         partition_list = job_info.partition_info.split(',')
-    envs = {}
-    if job_info.inc_start_time and job_info.last_time and len(partition_list) > 0 and job_info.current_time:
-        envs = {
-            "first_begin_time": job_info.inc_start_time,
-            "last_key": job_info.last_time,
-            "current_key": job_info.current_time,
+    first_begin_time = int(time.time())
+    if job_info.inc_start_time is not None and job_info.inc_start_time != '':
+        first_begin_time = job_info.inc_start_time
+    last_key = 'lastTime'
+    if job_info.last_time is not None and job_info.last_time != '':
+        last_key = job_info.last_time
+    current_key = 'currentTime'
+    if job_info.current_time is not None and job_info.current_time != '':
+        current_key = job_info.current_time
+    envs = {
+            "first_begin_time": first_begin_time,
+            "last_key": last_key,
+            "current_key": current_key,
             "partition_key": "partition",
-            "partition_word": partition_list[0] if len(partition_list) > 0 else '',
-            "partition_format": partition_list[2]  if len(partition_list) > 0 else '',
-            "partition_diff": partition_list[1]  if len(partition_list) > 0 else ''
+            "partition_word": partition_list[0] if len(partition_list) > 0 else 'xujiayue',
+            "partition_format": partition_list[2]  if len(partition_list) > 0 else 'xujiayue',
+            "partition_diff": partition_list[1]  if len(partition_list) > 0 else 'xujiayue'
         }
     af_task = {
         "name": job_info.job_desc,