Jelajahi Sumber

定时删除日志

liweiquan 2 tahun lalu
induk
melakukan
cd02cc7b6f
7 mengubah file dengan 97 tambahan dan 4 penghapusan
  1. 14 0
      Dockerfile
  2. 0 2
      app/routers/job_log.py
  3. 68 0
      delete.py
  4. 2 1
      docker-compose.yml
  5. 1 0
      environment.yml
  6. 12 0
      run_delete.py
  7. 0 1
      server.py

+ 14 - 0
Dockerfile

@@ -79,6 +79,20 @@ environment=PYTHONUNBUFFERED=1\n\
 
 EXPOSE 8080
 
+RUN echo "\
+[program:cron]\n\
+directory=%(ENV_WORKDIR)s\n\
+command= /opt/conda/envs/py38/bin/gunicorn  delete:app -b 0.0.0.0:8000 --reload  -k utils.r_uvicorn_worker.RestartableUvicornWorker \n\
+autorestart=true\n\
+startretries=1000\n\
+redirect_stderr=true\n\
+stdout_logfile=/var/log/cron.log\n\
+stdout_logfile_maxbytes=5MB\n\
+environment=PYTHONUNBUFFERED=1\n\
+" > /etc/supervisor/conf.d/cron.conf
+
+EXPOSE 8000
+
 FROM builder2 as image-dev
 
 RUN apt-get update && apt-get install -y --no-install-recommends krb5-user

+ 0 - 2
app/routers/job_log.py

@@ -39,7 +39,6 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
     # 循环获取日志
     for af_job_run in af_job_runs:
         job_id = af_to_datax[int(af_job_run.job_id)]
-        print(f'job_id==>{job_id}')
         # 获取af_job
         job_log = None
         if len(af_job_run.details['tasks']) > 0:
@@ -47,7 +46,6 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
         else:
             af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
             task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
-            print(f"datax任务的作业{task['id']}")
             log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
             job_log = log_res['data'] if 'data' in log_res.keys() else None
         log = {

+ 68 - 0
delete.py

@@ -0,0 +1,68 @@
+import datetime
+import os
+import shutil
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+from apscheduler.schedulers.background import BackgroundScheduler
+
# FastAPI application exposing control endpoints for the scheduled
# airflow-log cleanup service (title string kept as-is for the UI).
app = FastAPI(title="定时删除airflow日志")

# Permit cross-origin requests from any origin so health probes and
# admin tooling can reach the service from other hosts.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
+
def delete_log(scheduler_path: str = '/logs/scheduler', keep_days: int = 5) -> None:
    """Delete airflow scheduler log directories older than ``keep_days`` days.

    Directories under ``scheduler_path`` are expected to be named by date
    (``YYYY-MM-DD``). Starting at ``today - keep_days``, consecutively older
    date directories are removed until the first missing one is reached
    (the first gap stops the sweep — same best-effort behaviour as before,
    now with the path and retention window parameterized; the defaults
    preserve the original hard-coded values).

    Args:
        scheduler_path: Root directory that holds the dated log folders.
        keep_days: Number of most-recent days of logs to keep.
    """
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    if not os.path.exists(scheduler_path):
        # Nothing to clean: the log root itself does not exist.
        print("不存在存放日志的文件夹")
    else:
        # First candidate for deletion: the folder dated keep_days ago.
        dt_delete = datetime.date.today() - datetime.timedelta(keep_days)
        dir_path = os.path.join(scheduler_path, dt_delete.strftime('%Y-%m-%d'))
        # Walk backwards one day at a time, deleting until a date folder
        # is missing (assumed to mean everything older is already gone).
        while os.path.exists(dir_path):
            print(dir_path)
            try:
                shutil.rmtree(dir_path)
            except OSError as e:
                # Best-effort: report and keep walking backwards.
                print("Error: %s : %s" % (dir_path, e.strerror))
            dt_delete -= datetime.timedelta(1)
            dir_path = os.path.join(scheduler_path, dt_delete.strftime('%Y-%m-%d'))
    print(f'The scheduled task is successfully executed at time {ts}')
+
# Background scheduler that runs the cleanup without blocking the app.
scheduler = BackgroundScheduler()
# Register the deletion job on a cron trigger: once per day at 01:00.
# (The job id is referenced by the pause/resume endpoints below.)
scheduler.add_job(delete_log, trigger='cron', hour='1', id='delete_job')
# Start at import time so cleanup begins as soon as the service boots.
scheduler.start()
print('任务创建成功')
+
# GET health check
@app.get("/cron/ping", description="健康检查")
def ping():
    """Liveness probe: always answers with the literal string "pong!!"."""
    return "pong!!"
+
@app.post("/cron/execute", description="执行一次删除日志")
def cron_execute():
    """Run the log-deletion job once, immediately, outside its schedule."""
    delete_log()
    return "success"
+
@app.post("/cron/pause", description="停止定时任务")
def cron_pause():
    """Pause the scheduled deletion job.

    Renamed from ``cron_execute``: three endpoint handlers shared that
    name, so each later ``def`` shadowed the previous one at module
    level. The HTTP route path is unchanged, so API callers see no
    difference.
    """
    scheduler.pause_job(job_id='delete_job')
    return "success"
+
@app.post("/cron/resume", description="恢复定时任务")
def cron_resume():
    """Resume the previously paused scheduled deletion job.

    Renamed from ``cron_execute``: three endpoint handlers shared that
    name, so each later ``def`` shadowed the previous one at module
    level. The HTTP route path is unchanged, so API callers see no
    difference.
    """
    scheduler.resume_job(job_id='delete_job')
    return "success"
+
+

+ 2 - 1
docker-compose.yml

@@ -14,8 +14,9 @@ services:
     #   - /mnt/nfs/airflow-airflow-dags-pvc-b2638332-6249-4a45-b99e-7a54dc63482f/fc309d7dd0f5c1de9299e5e9a222a098faec1de0:/dags
     ports:
       - '18082:8080'
+      - '18083:8000'
       - '18224:22'
     extra_hosts:
       - 'minio-api.sxkj.com:192.168.199.109'
     environment:
-     - APP_ENV=development
+      - APP_ENV=development

+ 1 - 0
environment.yml

@@ -40,5 +40,6 @@ dependencies:
       - uvloop
       - kubernetes
       - httptools
+      - apscheduler
       - -i https://mirror.baidu.com/pypi/simple
 prefix: /opt/conda/envs/py38

+ 12 - 0
run_delete.py

@@ -0,0 +1,12 @@
import uvicorn
import argparse

if __name__ == '__main__':
    # CLI entry point: serve the FastAPI app defined in delete.py.
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='0.0.0.0')
    # type=int makes argparse reject non-numeric ports at parse time
    # instead of failing later in an int() cast; default stays 8000.
    parser.add_argument('--port', type=int, default=8000)
    opt = parser.parse_args()

    # Module:attribute string must match the FastAPI instance in delete.py.
    app_str = 'delete:app'
    uvicorn.run(app_str, host=opt.host, port=opt.port, reload=True)

+ 0 - 1
server.py

@@ -18,7 +18,6 @@ import app.routers.jm_job_log as router_jm_job_log
 from app.routers.run import router_af_run
 from app.routers.job import router_af_job
 from app.routers.task import router_af_task
-from app.utils.get_kerberos import get_kerberos_to_local
 from utils.sx_log import format_print
 
 format_print()