
1. Completed job triggering
2. Wired up the Airflow API (see the sketch below)
3. Refactored the logging module
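For reference, a minimal sketch of the Airflow REST calls this commit wires up, reusing the get_airflow_api_info() helper added in app/core/airflow/uri.py; the job_id and af_run_id values are illustrative placeholders, and the endpoints mirror app/routers/job.py and app/routers/run.py:

    import json
    import requests
    from app.core.airflow.uri import get_airflow_api_info

    job_id = 123                    # hypothetical job id
    af_run_id = 'example-run-id'    # hypothetical; real value comes from context["run_id"] in the DAG callbacks

    uri_prefix, headers = get_airflow_api_info()

    # trigger a run of the generated DAG (POST /dags/dag_{id}/dagRuns)
    requests.post(f'{uri_prefix}/dags/dag_{job_id}/dagRuns',
                  headers=headers, data=json.dumps({}))

    # pause or unpause the DAG (PATCH /dags/dag_{id})
    requests.patch(f'{uri_prefix}/dags/dag_{job_id}',
                   headers=headers, data=json.dumps({"is_paused": True}))

    # query the state of a previously triggered run
    state = requests.get(f'{uri_prefix}/dags/dag_{job_id}/dagRuns/{af_run_id}',
                         headers=headers).json().get('state', None)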

luoyulong 2 years ago
parent
commit
6cc188a193

+ 6 - 5
app/core/airflow/job.py

@@ -1,6 +1,7 @@
 import os
 import stat
 from app.core.airflow.task import *
+from app.core.airflow.uri import get_job_path
 from app.schemas import AirflowJob
 
 
@@ -13,9 +14,9 @@ class AirflowJobSubmitter:
                        'sparks': SparksTaskCompiler,
                        'java': JavaTaskCompiler}
 
-        nodes = [m_compilers[task.task_type](item=task).translate(item.job_mode or 1) for task in item.tasks if
+        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id,task_mode=item.job_mode or 1) for task in item.tasks if
                  task.task_type != 'sparks']
-        spark_nodes = [SparksTaskCompiler(item=task).translate(item.job_mode or 1) for task in item.tasks if
+        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id,task_mode=item.job_mode or 1) for task in item.tasks if
                        task.task_type == 'sparks']
         edges = []
         for edge in item.dependence:
@@ -33,7 +34,7 @@ class AirflowJobSubmitter:
         # }
         parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                       'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
-                      'interval': item.cron
+                      'interval': item.cron if item.cron != 'None' else None
                       }
 
         env = Environment(
@@ -44,9 +45,9 @@ class AirflowJobSubmitter:
         dag_content = template.render(parameters)
         print(f'finish build:{dag_content}')
 
-        dag_path = '/dags/'
-        output_path = dag_path + f'dag_{item.id}.py'
+        output_path = get_job_path(job_id=item.id)
         with open(output_path, "w") as fh:
             fh.write(dag_content)
+
         os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
         print(f'write dag to {output_path}')

+ 19 - 12
app/core/airflow/task.py

@@ -20,7 +20,7 @@ class TaskCompiler:
         template = env.get_template(template_file)
         return template.render(parameters)
 
-    def translate(self, task_mode=1):
+    def translate(self, job_id, task_mode=1):
         return {'image': self.task.run_image or self.default_image,
                 'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
                 'script': self.task.script,
@@ -60,12 +60,11 @@ class DataXTaskCompiler(TaskCompiler):
     def __init__(self, item: AirflowTask):
         super(DataXTaskCompiler, self).__init__(item)
         self.default_image = 'SXKJ:32775/pod_datax:0.9'
-        # self.default_cmd = f"cd datax/bin && echo $SCRIPT > config.json && echo " \
-        #                    f"\"\'\"$HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json" \
-        #                    f"\"\'\" |xargs bash -c "
-        self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/bin/python datax.py {self.task.cmd_parameters} config.json"
+        self.default_cmd = f"cd datax/bin && echo \"$SCRIPT\" > transform_datax.py &&cat  transform_datax.py  && " \
+                           f"python3 transform_datax.py && cat config.json && $HOME/conda/envs/py27/b" \
+                           f"in/python datax.py {self.task.cmd_parameters} config.json "
 
-    def translate(self, task_mode=1):
+    def translate(self, job_id, task_mode=1):
         print(f'{self.task.envs}')
         script_str = self.render_spark_script(
             parameters={'script': self.task.script,
@@ -78,8 +77,7 @@ class DataXTaskCompiler(TaskCompiler):
                         'partition_diff': self.task.envs.get('partition_diff', None),
                         },
             template_file="transform_datax.py.jinja2")
-        # with open('./auto_generate_demo.py','w') as f:
-        #     f.write(script_str)
+
         res = {'image': self.task.run_image or self.default_image,
                'cmds': ["/bin/bash", "-c", f"{self.task.cmd or self.default_cmd} "],
                'script': script_str,
@@ -120,7 +118,7 @@ class SparksTaskCompiler(TaskCompiler):
         basic_cmds = "cd /home/sxkj/bigdata && echo \"$SCRIPT\" > run.py && ${SPARK_HOME}/bin/spark-submit"
         self.cmd_str = lambda name: f"{basic_cmds} --name {name} {param_str} run.py"
 
-    def translate(self, task_mode=1):
+    def translate(self, job_id, task_mode=1):
         # dag_script = {
         #     "sub_nodes": [
         #         {
@@ -152,16 +150,22 @@ class SparksTaskCompiler(TaskCompiler):
         # }
         infos = json.loads(self.task.script)
         sub_nodes = []
+        skip_nodes = []
         for info in infos['sub_nodes']:
+            if info.get('skip', False):
+                skip_nodes.append(info["id"])
+                continue
             if info['op'] == 'sql':
                 inputs = {}
                 template_file = 'sql_script_template.py.jinja2'
             elif info['op'] == 'pyspark':
-                inputs = {k: spark_result_tb_name(self.task.id, *v, task_mode) for k, v in info['inputs'].items()}
+                inputs = {k: spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=v[0],
+                                                  out_pin=v[1], is_tmp=task_mode) for k, v in info['inputs'].items()}
                 template_file = 'pyspark_script_template.py.jinja2'
             else:
                 continue
-            outputs = [spark_result_tb_name(self.task.id, info['id'], 0, task_mode)]
+            outputs = [spark_result_tb_name(job_id=job_id, task_id=self.task.id, spark_node_id=info['id'],
+                                            out_pin=0, is_tmp=task_mode)]
             sub_node = {
                 'id': f'{self.task.id}_{info["id"]}',
                 'name': info['name'],
@@ -175,7 +179,10 @@ class SparksTaskCompiler(TaskCompiler):
             }
             sub_nodes.append(sub_node)
 
-        edges = [(f'{self.task.id}_{source}', f'{self.task.id}_{sink}') for (source, sink) in infos['edges']]
+        edges = []
+        for (source, sink) in infos['edges']:
+            if source not in skip_nodes and sink not in skip_nodes:
+                edges.append((f'{self.task.id}_{source}', f'{self.task.id}_{sink}'))
         return {
             "id": self.task.id,
             "sub_nodes": sub_nodes,

+ 3 - 2
app/core/airflow/templates/dag_template.py.jinja2

@@ -17,6 +17,7 @@ def task_finish_alert(context):
         "dag_id": ti.dag_id,
         "task_id": ti.task.task_id,
         "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
+        "af_run_id": context["run_id"],
         "start_time": ti.start_date.timestamp(),
         "end_time": ti.end_date.timestamp(),
         "status": ti.current_state()
@@ -32,6 +33,7 @@ def dag_begin_alert(context):
         "dag_id": ti.dag_id,
         "task_id": ti.task.task_id,
         "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
+        "af_run_id": context["run_id"],
         "start_time": ti.start_date.timestamp(),
         "end_time": ti.end_date.timestamp(),
         "status": ti.current_state()
@@ -45,7 +47,7 @@ namespace = conf.get("kubernetes", "NAMESPACE")
 name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
-with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
+with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if {{ interval }} is None else '{{ interval }}',dag_id="{{ dag_id }}",is_paused_upon_creation= not {{ trigger_status }}) as dag:
     op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
 
     {% for spark_node in spark_nodes %}
@@ -93,7 +95,6 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
         env_vars={{ node['env'] }},
         on_success_callback=task_finish_alert,
         on_failure_callback=task_finish_alert
-{#        on_failure_callback=task_finish_alert#}
         )
     op_start >> {{ node['operator_name'] }}
     {% endfor %}

+ 17 - 3
app/core/airflow/templates/transform_datax.py.jinja2

@@ -2,12 +2,26 @@ import json
 from datetime import datetime, timedelta
 
 first_begin_time =  {{ first_begin_time }}   # 100
-last_time = {{ "{{ prev_execution_date.timestamp() }}" }}
+
+{{ '
+print("{{ prev_execution_date_success }}")
+' }}
+
+{{ '
+{% if prev_execution_date_success is defined %}
+last_time = {{  prev_execution_date_success.timestamp() }}
+{% else %}
+last_time = None
+{% endif %}
+'}}
+
+
 current_time = {{ "{{ execution_date.timestamp() }}" }}
 
 partition_word = '{{ partition_word }}' #'datety'
 partition_format = '{{ partition_format }}'  #'%Y-%m-%d'
 partition_diff = {{ partition_diff }} #1
+partition_diff = partition_diff if partition_diff is not None else  1
 
 last_key = '${'+'{{last_key}}'+'}'
 current_key = '${'+'{{current_key}}'+'}'
@@ -17,8 +31,8 @@ partition_key = '${'+'{{partition_key}}'+'}'
 
 datax_config = {{ script }}
 datax_config_str = json.dumps(datax_config)
-if last_key is not None:
-    now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff or 1)
+if last_key is not None or last_key !='${None}':
+    now = datetime.utcfromtimestamp(datetime.timestamp(datetime.now())) + timedelta(hours=8, days=partition_diff)
     partition_value = f'{partition_word}={now.strftime(partition_format)}'
     last_time = last_time or first_begin_time
     datax_config_str = datax_config_str.replace(last_key, f'{last_time}')

+ 17 - 10
app/core/airflow/uri.py

@@ -5,22 +5,14 @@ import requests
 from app.common.minio import FileHandler
 
 
-def spark_result_tb_name(task_id, spark_node_id, out_pin, is_tmp=False):
-    return f'task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
+def spark_result_tb_name(job_id,task_id, spark_node_id, out_pin, is_tmp=False):
+    return f'job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
 
 
 def get_sub_task_script_uri(task_id, sub_node_id):
     return f'/xxx/task_{task_id}/sub_{sub_node_id}.py'
 
 
-# def get_spark_sub_task_inputs(task_id, sub_node_id, ):
-#     return f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/'
-#
-#
-# def get_spark_sub_task_outputs(task_id, sub_node_id, number):
-#     return [f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/result_{i}' for i in range(number)]
-#
-
 def get_remote_file(uri, return_json=False):
     data = requests.get(uri).content
     return json.loads(data) if return_json else data
@@ -29,3 +21,18 @@ def get_remote_file(uri, return_json=False):
 def upload2oss(content: bytes, uri: str, minio_bucket: str):
     minio_handler = FileHandler(bucket_name=minio_bucket)
     minio_handler.put_byte_file(file_name=uri, file_content=content)
+
+
+def get_job_path(job_id):
+    dag_path = '/dags/'
+    return dag_path + f'dag_{job_id}.py'
+
+
+def get_airflow_api_info():
+    uri_prefix = f'http://192.168.199.109/api/v1'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': 'basic YWRtaW46YWRtaW4=',
+        'Host': 'airflow-web.sxkj.com'
+    }
+    return uri_prefix, headers

+ 1 - 1
app/core/k8s/k8s_client.py

@@ -76,7 +76,7 @@ class KubernetesTools(object):
     #     log_content = api.read_namespaced_pod_log(pod_name, namespaces, pretty=True, tail_lines=200)
     #     return log_content
 
-    def get_pod_logs(self, namespaces, labels: dict, tail_lines=200):
+    def get_pod_logs(self, namespaces, labels: dict, tail_lines=10000):
         """
         View pod logs
         :param namespaces: namespace, e.g. default

+ 9 - 0
app/crud/af_job.py

@@ -29,5 +29,14 @@ def update_airflow_job(db: Session, item_id: int, update_item: schemas.AirflowJo
     return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowJob)
 
 
+def delete_airflow_job(db: Session, item_id: int):
+    item = get_airflow_job_once(item_id=item_id, db=db)
+    if not item:
+        raise Exception(f"delete failed, job {item.id} not found")
+    db.delete(item)
+    db.commit()
+    db.flush()
+
+
 def create_airflow_job_submit(item: schemas.AirflowJob):
     AirflowJobSubmitter.submit_dag(item)

+ 14 - 3
app/crud/af_run.py

@@ -8,9 +8,11 @@ from app.crud import update_to_db
 
 def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
     db_item = models.AirflowRun(**item.dict())
+    print(f'create_airflow_run-before:{db_item.to_dict()}')
     db.add(db_item)
     db.commit()
     db.refresh(db_item)
+    print(f'create_airflow_run-after:{db_item.to_dict()}')
     return db_item
 
 
@@ -23,7 +25,16 @@ def get_airflow_runs(db: Session):
     return res
 
 
-def get_airflow_run_once(db: Session, run_id: str, job_id: int):
-    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.run_id == run_id,
-                                                                models.AirflowRun.job_id == job_id).first()
+def get_airflow_run_once_normal_mode(db: Session, af_run_id: str):
+    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id).first()
+    return res
+
+
+def get_airflow_run_once_debug_mode(db: Session, job_id: int):
+    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.job_id == job_id).first()
+    return res
+
+
+def get_airflow_run_once(db: Session, item_id: int):
+    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.id == item_id).first()
     return res

+ 4 - 0
app/models/af_job.py

@@ -41,3 +41,7 @@ class AirflowJob(BaseModel):
 
     def __int__(self):
         pass
+
+    def get_job_path(self):
+        dag_path = '/dags/'
+        return dag_path + f'dag_{self.id}.py'

+ 10 - 7
app/models/af_run.py

@@ -1,15 +1,18 @@
-from sqlalchemy import Column, Integer, String, Text, JSON
+from sqlalchemy import Column, Integer, String,  JSON, Float
+from sqlalchemy.dialects.mysql import DOUBLE
+
 from app.models.database import BaseModel
 
 
 class AirflowRun(BaseModel):
     __tablename__ = "airflow_run"
     id = Column(Integer, primary_key=True, index=True)
-    start_time = Column(Integer)  # start time
-    end_time = Column(Integer)  # end time
-    job_id = Column(String(60))  # owning job
-    run_id = Column(String(60))  # run id
+    start_time = Column(DOUBLE)  # start time
+    end_time = Column(DOUBLE)  # end time
+    job_id = Column(Integer)  # owning job
+    af_run_id = Column(String(60))  # run id in airflow
+    run_ts = Column(String(60))  # run timestamp
     status = Column(Integer) # 0 not started, 1 running, 2 succeeded, 3 failed
     details = Column(JSON)  # task run details
-    # details= { tasks:{task_id:{"logs":, "status": }}, dependence:[(t1->t2,t2->t3)]
-    # }
+
+

+ 54 - 7
app/routers/job.py

@@ -1,11 +1,12 @@
+import datetime
 import json
-
+import os
 import requests
 from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from sqlalchemy.orm import Session
-
 from app import schemas, get_db, crud
+from app.core.airflow.uri import get_job_path, get_airflow_api_info
 from app.crud import create_airflow_job_submit
 from utils import web_try, sxtimeit
 
@@ -22,6 +23,19 @@ def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_jobs(db), params)
 
 
+@router_af_job.get("/{item_id}/last_parsed_time")
+@web_try()
+@sxtimeit
+def get_af_jobs(item_id: int):
+    uri_prefix, headers = get_airflow_api_info()
+    url = f'{uri_prefix}/dags/dag_{item_id}'
+    rets = requests.get(url, headers=headers).json()
+    if 'last_parsed_time' in rets:
+        last_parsed_time_str = rets['last_parsed_time']
+        last_parsed_time = datetime.datetime.strptime(last_parsed_time_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
+    else:
+        last_parsed_time=  None
+    return {"last_parsed_time": last_parsed_time}
 @router_af_job.get("/getOnce/{item_id}")
 @web_try()
 @sxtimeit
@@ -49,7 +63,6 @@ def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Sessi
 def add_dag_submit(id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_job_once(db, id)
     create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
-    # return crud.create_airflow_job(item)
 
 
 @router_af_job.post("/{item_id}/run")
@@ -61,15 +74,49 @@ def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
     headers = {
         'content-type': 'application/json',
         'Authorization': 'basic YWRtaW46YWRtaW4=',
-        'Host':'airflow-web.sxkj.com'
+        'Host': 'airflow-web.sxkj.com'
     }
 
     response = requests.post(uri, headers=headers, data=json.dumps({}))
     return response.json()
 
+
 #
-# @router_af_job.post("/run")
+# @router_af_job.post("/{item_id}/update_and_run")
 # @web_try()
 # @sxtimeit
-# def trigger_af_job_run(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
-#     return crud.create_airflow_job(db, item)
+# def update_and_trigger_job_run(item_id: int, db: Session = Depends(get_db)):
+#     job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
+#     uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
+#     headers = {
+#         'content-type': 'application/json',
+#         'Authorization': 'basic YWRtaW46YWRtaW4=',
+#         'Host': 'airflow-web.sxkj.com'
+#     }
+#
+#     response = requests.post(uri, headers=headers, data=json.dumps({}))
+#     return response.json()
+
+
+@router_af_job.delete("/{item_id}")
+@web_try()
+@sxtimeit
+def delete_af_job(item_id: int, db: Session = Depends(get_db)):
+    uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}'
+    headers = {
+        'content-type': 'application/json',
+        'Authorization': 'basic YWRtaW46YWRtaW4=',
+        'Host': 'airflow-web.sxkj.com'
+    }
+    requests.delete(uri, headers=headers)
+    os.remove(get_job_path(job_id=item_id))
+    crud.delete_airflow_job(db, item_id)
+
+
+@router_af_job.patch("/{item_id}/pause/{pause}")
+@web_try()
+@sxtimeit
+def pause_af_job(pause: bool, item_id: int):
+    uri_prefix, headers = get_airflow_api_info()
+    url = f'{uri_prefix}/dags/dag_{item_id}'
+    return requests.patch(url, headers=headers, data=json.dumps({"is_paused": pause})).json()

+ 46 - 14
app/routers/run.py

@@ -1,9 +1,12 @@
 import json
+
+import requests
 from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from pydantic import BaseModel
 from sqlalchemy.orm import Session
 from app import schemas, get_db, crud
+from app.core.airflow.uri import get_airflow_api_info
 from app.core.k8s.k8s_client import KubernetesTools
 from utils import web_try, sxtimeit
 
@@ -25,6 +28,16 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_tasks(db), params)
 
 
+@router_af_run.get("/{run_id}/state")
+@web_try()
+@sxtimeit
+def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
+    item = crud.get_airflow_run_once(item_id=run_id, db=db)
+    uri_prefix, headers = get_airflow_api_info()
+    url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
+    return {"state": requests.get(url, headers=headers).json().get('state', None)}
+
+
 @router_af_run.post("/")
 @web_try()
 @sxtimeit
@@ -32,36 +45,56 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
     print(item.data)
     job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
     sparks_dependence = {}
-    if job_item is not None:
+    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
+
+    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
         for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
             if task.task_type == 'sparks':
-                sparks_info = json.loads(task.script)
+                sparks = json.loads(task.script)
                 dependence = []
-                for (source, sink) in sparks_info['edges']:
+                for (source, sink) in sparks['edges']:
                     dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                 sparks_dependence[task.id] = dependence
 
-        item = schemas.AirflowRunCreate(**{"start_time": int(item.data["start_time"]),
-                                           "job_id": item.data["job_id"],
-                                           "run_id": item.data['run_ts'],
+        item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
+                                           "job_id": int(item.data["job_id"]),
+                                           "run_ts": item.data['run_ts'],
+                                           "af_run_id": item.data['af_run_id'],
                                            "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
                                                                                    "sparks": sparks_dependence}},
-                                           "status": "1"},
+                                           "status": 1},
                                         )
         crud.create_airflow_run(db, item)
 
+    elif job_item.job_mode == 2:  # debug mode
+        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
+        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
+        assert sparks_task.task_type == 'sparks'
+        sparks = json.loads(sparks_task.script)
+        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}'] for (s, k) in
+                                             sparks['edges']]
+        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
+        run.details['dependence']['sparks'] = sparks_dependence
+        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
+        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": 1})
+        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
+
 
 @router_af_run.post("/notification")
 @web_try()
 @sxtimeit
 def add_notification(item: Item, db: Session = Depends(get_db)):
-    print(f'input : {item.data} ')
     k8s_tool = KubernetesTools()
-    labels = {"dag_id": item.data['dag_id'],
-              "task_id": item.data['task_id'],
-              "run_ts": item.data['run_ts']}
+    labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
     logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
-    run = crud.get_airflow_run_once(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
+
+    if job_item.job_mode == 1:  # normal mode, one job -> many runs
+        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
+    elif job_item.job_mode == 2:  # debug mode, one job -> one run
+        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
+    else:
+        run = None
 
     if run is not None:
         update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
@@ -70,11 +103,10 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
                                                              "end_time": item.data["end_time"],
                                                              "status": item.data['status']}
         crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
-    return
 
 
 @router_af_run.post("/sigal")
 @web_try()
 @sxtimeit
-def add_notification(item: Item, db: Session = Depends(get_db)):
+def add_notification(item: Item):
     print(f'receive sigal: {item.data} ')

+ 80 - 0
app/routers/task_run.py

@@ -0,0 +1,80 @@
+import json
+from fastapi import APIRouter, Depends
+from fastapi_pagination import paginate, Params
+from pydantic import BaseModel
+from sqlalchemy.orm import Session
+from app import schemas, get_db, crud
+from app.core.k8s.k8s_client import KubernetesTools
+from utils import web_try, sxtimeit
+
+
+class Item(BaseModel):
+    data: dict
+
+
+router_af_run = APIRouter(
+    prefix="/jpt/af_run",
+    tags=["airflow_run-运行管理"],
+)
+
+
+@router_af_run.get("/")
+@web_try()
+@sxtimeit
+def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
+    return paginate(crud.get_airflow_tasks(db), params)
+
+
+@router_af_run.post("/")
+@web_try()
+@sxtimeit
+def add_airflow_run(item: Item, db: Session = Depends(get_db)):
+    print(item.data)
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
+    sparks_dependence = {}
+    if job_item is not None:
+        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
+            if task.task_type == 'sparks':
+                sparks_info = json.loads(task.script)
+                dependence = []
+                for (source, sink) in sparks_info['edges']:
+                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
+                sparks_dependence[task.id] = dependence
+
+        item = schemas.AirflowRunCreate(**{"start_time": int(item.data["start_time"]),
+                                           "job_id": item.data["job_id"],
+                                           "run_id": item.data['run_ts'],
+                                           "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
+                                                                                   "sparks": sparks_dependence}},
+                                           "status": "1"},
+                                        )
+        crud.create_airflow_run(db, item)
+
+
+@router_af_run.post("/notification")
+@web_try()
+@sxtimeit
+def add_notification(item: Item, db: Session = Depends(get_db)):
+    print(f'input : {item.data} ')
+    k8s_tool = KubernetesTools()
+    labels = {"dag_id": item.data['dag_id'],
+              "task_id": item.data['task_id'],
+              "run_ts": item.data['run_ts']}
+    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
+    run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
+
+    if run is not None:
+        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
+        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
+                                                             "start_time": item.data["start_time"],
+                                                             "end_time": item.data["end_time"],
+                                                             "status": item.data['status']}
+        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
+    return
+
+
+@router_af_run.post("/sigal")
+@web_try()
+@sxtimeit
+def add_notification(item: Item, db: Session = Depends(get_db)):
+    print(f'receive sigal: {item.data} ')

+ 1 - 0
app/schemas/af_job.py

@@ -38,3 +38,4 @@ class AirflowJob(AirflowJobBase):
 
     class Config:
         orm_mode = True
+

+ 9 - 7
app/schemas/af_run.py

@@ -6,9 +6,10 @@ class AirflowRunBase(BaseModel):
 
 
 class AirflowRunCreate(AirflowRunBase):
-    start_time: int
-    job_id: str
-    run_id: str
+    start_time: float
+    job_id: int
+    af_run_id: str
+    run_ts: str
 
 
 class AirflowRunUpdate(AirflowRunBase):
@@ -18,10 +19,11 @@ class AirflowRunUpdate(AirflowRunBase):
 
 class AirflowRun(AirflowRunBase):
     id: int
-    start_time: int
-    end_time: int
-    job_id: str
-    run_id: str
+    start_time: float
+    end_time: float
+    job_id: int
+    af_run_id: str
+    run_ts: str
     status: int
 
     class Config:

File diff suppressed because it is too large
+ 27 - 0
auo_tests/jupyters/01-update-job.ipynb


File diff suppressed because it is too large
+ 75 - 0
auo_tests/jupyters/03-edit-sparks.ipynb


+ 38 - 0
auo_tests/jupyters/spark_script_1012.py

@@ -0,0 +1,38 @@
+from pyspark.sql.types import *
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.feature import VectorAssembler
+from pyspark.ml import Pipeline
+from pyspark.sql.functions import udf, col
+from pyspark.sql import DataFrame
+
+
+def to_array(col):
+    def to_array_(v):
+        return v.toArray().tolist()
+
+    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
+
+
+def main_func(train_df: DataFrame, test_df: DataFrame, spark):
+    feat_cols = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8',
+                 'feature9']
+
+    vector_assembler = VectorAssembler().setInputCols(feat_cols).setOutputCol("features")
+
+    #### training ####
+    print("step 1")
+    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization parameter
+    pipeline = Pipeline(stages=[vector_assembler, lr])
+    model = pipeline.fit(train_df)
+
+    # print model parameters
+    print("\n-------------------------------------------------------------------------")
+    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
+    print("-------------------------------------------------------------------------\n")
+
+    #### predict and save the results ####
+    print("step 2")
+    labels_and_preds = model.transform(test_df).withColumn("probability_xj", to_array(col("probability"))[1]) \
+        .select("uuid", "label", "prediction", "probability_xj")
+
+    return [labels_and_preds]

Some files were not shown because too many files changed in this diff