
Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

Zhang Li 2 years ago
parent
commit
8a8499c21c

+ 0 - 2
app/crud/af_run.py

@@ -1,8 +1,6 @@
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
-import time
-
 from app.crud import update_to_db
 
 

+ 1 - 1
app/models/af_run.py

@@ -12,7 +12,7 @@ class AirflowRun(BaseModel):
     job_id = Column(Integer)  # id of the owning job
     af_run_id = Column(String(60))  # run id in airflow
     run_ts = Column(String(60))  # run timestamp
-    status = Column(Integer) # 0 not started  1 running  2 success  3 failed
+    status = Column(Integer) # 0 queued  1 running  2 success  3 failed
     details = Column(JSON)  # task execution details
 
 
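For reference, the run status codes after this change are: 0 queued (previously "not started"), 1 running, 2 success, 3 failed. A minimal sketch of the mapping as an IntEnum (illustrative only; the model keeps a plain Integer column):

from enum import IntEnum

class RunStatus(IntEnum):
    QUEUED = 0   # renamed from "not started" in this commit
    RUNNING = 1
    SUCCESS = 2
    FAILED = 3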

+ 18 - 4
app/models/database.py

@@ -1,17 +1,21 @@
- # database.py
+import time  # database.py
+from typing import Optional
+
+from sqlalchemy.orm import Session
 from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm.attributes import flag_modified
+
 from configs.logging import logger
 from configs.settings import config
 
 USER = config.get('DATABASE', 'USER')
 PWD = config.get('DATABASE', 'pwd')
 DB_NAME = config.get('DATABASE', 'DB_NAME')
-HOST =  config.get('DATABASE', 'HOST')
+HOST = config.get('DATABASE', 'HOST')
 PORT = config.get('DATABASE', 'PORT')
 
-
 SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?charset=utf8&auth_plugin=mysql_native_password'
 engine = create_engine(
     SQLALCHEMY_DATABASE_URL, pool_pre_ping=True
@@ -21,11 +25,21 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
 logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
 
-
 Base = declarative_base()
 
+
 class BaseModel(Base):
     __abstract__ = True
+
     def to_dict(self):
         return {c.name: getattr(self, c.name) for c in self.__table__.columns}
 
+    def update(self, db: Session, enforce_update: Optional[dict] = None):
+        for k in enforce_update or {}:
+            flag_modified(self, k)
+        if hasattr(self, "update_time"):
+            self.update_time = int(time.time())
+        # db.add(self)
+        db.commit()
+        db.flush()
+        db.refresh(self)
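
The new BaseModel.update exists mainly for JSON columns: SQLAlchemy's dirty-tracking does not see in-place mutations of a JSON value, so flag_modified is used to force the listed attributes into the commit. A minimal usage sketch, assuming an AirflowRun row whose JSON details field was mutated in place (session setup omitted; run_id is a placeholder):

run = db.query(models.AirflowRun).get(run_id)
run.details["tasks"]["t1"] = {"state": "success"}   # in-place JSON mutation
# a plain db.commit() would silently drop this change; enforce_update
# names the attributes that must be flagged as modified:
run.update(db, enforce_update={"details": run.details})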

+ 2 - 2
app/routers/dag.py

@@ -126,8 +126,8 @@ def get_dag_debug_result(dag_uuid: str,node_id: str,out_pin: int ,db: Session =
             break
     result = None
     if task_id:
-        table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}'
+        table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
         t_list = hiveDs.list_tables()
-        result = hiveDs.get_preview_data(table_name,500) if table_name in t_list else None
+        result = hiveDs.get_preview_data(table_name,500) if table_name.lower() in t_list else None
     return result
 
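The .lower() guard matters because Hive stores and reports table names in lower case, while node ids can carry mixed case into the generated name. A sketch with hypothetical values:

table_name = "job1_task9_subnodeAbC_output0_tmp"   # generated from a mixed-case node id
t_list = ["job1_task9_subnodeabc_output0_tmp"]     # as reported by hiveDs.list_tables()
table_name in t_list           # False: preview would wrongly return None
table_name.lower() in t_list   # True: matches after normalization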

+ 2 - 2
app/routers/jm_job_log.py

@@ -58,7 +58,7 @@ def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Sessi
                 "af_job_id": int(af_job_run.job_id),
                 "run_id": af_job_run.af_run_id,
                 "trigger_time": af_job_run.start_time,
-                "trigger_result": 1 if task else 0,
+                "trigger_result": 1,
                 "execute_time": task['start_time'] if task else 0,
                 "execute_result": execute_result if execute_result else af_job_run.status,
                 "end_time": task['end_time'] if task else 0,
@@ -79,7 +79,7 @@ def get_job_log_once(run_id: str, db: Session = Depends(get_db)):
             "af_job_id": int(af_job_run.job_id),
             "run_id": af_job_run.af_run_id,
             "trigger_time": af_job_run.start_time,
-            "trigger_result": 1 if task else 0,
+            "trigger_result": 1,
             "execute_time": task['start_time'] if task else 0,
             "execute_result": af_job_run.status,
             "end_time": task['end_time'] if task else 0,

+ 2 - 2
app/routers/job_log.py

@@ -58,7 +58,7 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
             "af_job_id": int(af_job_run.job_id),
             "run_id": af_job_run.af_run_id,
             "trigger_time": af_job_run.start_time,
-            "trigger_result": 1 if task else 0,
+            "trigger_result": 1,
             "execute_time": task['start_time'] if task else 0,
             "execute_result": execute_result if execute_result else af_job_run.status,
             "end_time": task['end_time'] if task else 0,
@@ -79,7 +79,7 @@ def get_job_logs_once(run_id: int, db: Session = Depends(get_db)):
         "af_job_id": int(af_job_run.job_id),
         "run_id": af_job_run.af_run_id,
         "trigger_time": af_job_run.start_time,
-        "trigger_result": 1 if task else 0,
+        "trigger_result": 1,
         "execute_time": task['start_time'] if task else 0,
         "execute_result": af_job_run.status,
         "end_time": task['end_time'] if task else 0,

+ 25 - 5
app/routers/run.py

@@ -28,14 +28,34 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_tasks(db), params)
 
 
-@router_af_run.get("/{run_id}/state")
+@router_af_run.get("/{run_id}/status")
 @web_try()
 @sxtimeit
 def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_run_once(item_id=run_id, db=db)
-    uri_prefix, headers = get_airflow_api_info()
-    url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
-    return {"state": requests.get(url, headers=headers).json().get('state', None)}
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
+    if job_item.job_mode == 1:  # normal mode
+        if item.status in [2, 3]:
+            return {"status": item.status}
+        else:
+            uri_prefix, headers = get_airflow_api_info()
+            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
+            state = requests.get(url, headers=headers).json().get('state', None)
+            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
+            print(f'status is {status}, with state {state}')
+            if item.status != status:  # queue or running
+                item.status = status
+                item.update(db)
+                print(f'after update {item.status}')
+            return {"status": status}
+    else:
+        uri_prefix, headers = get_airflow_api_info()
+        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
+        state = requests.get(url, headers=headers).json().get('state', None)
+        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
+        print(f'status is {status}, with state {state}')
+
+    return {"status": status}
 
 
 @router_af_run.post("/")
@@ -62,7 +82,7 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
                                            "af_run_id": item.data['af_run_id'],
                                            "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
                                                                                    "sparks": sparks_dependence}},
-                                           "status": 1},
+                                           "status": 0},
                                         )
         crud.create_airflow_run(db, item)
 
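With the route renamed from /state to /status and terminal states now served from the cached row, a client can poll cheaply; a minimal sketch (the base URL, run id, and the @web_try response envelope are assumptions):

import time
import requests

BASE = "http://localhost:8000/af_run"   # hypothetical mount point of router_af_run

def wait_for_run(run_id: int) -> int:
    while True:
        body = requests.get(f"{BASE}/{run_id}/status").json()
        status = body["data"]["status"]   # assuming web_try wraps payloads in "data"
        if status in (2, 3):              # 2 success / 3 failed are terminal
            return status
        time.sleep(5)                     # 0 queued / 1 running: keep polling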

+ 3 - 6
app/services/jm_job.py

@@ -166,11 +166,8 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             sub_nodes.append(sub_node)
         elif node['op'] == 'outputsource':
             fileds = node['data']['output_source']
-            script = 'select '
-            for filed in fileds:
-                script += filed['dataField'] + ','
-            script = script.strip(',')
-            script += ' from ' + node_relation_dict[node['id']].table
+            script = '''def main_func (input0, spark,sc):
+    input0.write.mode("overwrite").saveAsTable('''+node_relation_dict[node['id']].table+''')'''
             inputs = {}
             index = 0
             input_list = t_s[node['id']]
@@ -184,7 +181,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             sub_node = {
                 "id": node['id'],
                 "name": node['name'],
-                "op": 'sql',
+                "op": 'pyspark',
                 "inputs": inputs,
                 "script":script
             }
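
For an outputsource node the generated script is thus no longer a SELECT statement but a pyspark entry point. With a hypothetical target table ods.result, the string concatenation above produces:

def main_func (input0, spark,sc):
    input0.write.mode("overwrite").saveAsTable(ods.result)

Note that the table name lands unquoted in the generated source, so this only executes if the runner quotes it or the stored table value already carries quotes.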