"""Small FastAPI service that periodically purges old Airflow scheduler logs.

A background APScheduler job calls :func:`delete_log` once a day, and a few
HTTP endpoints allow the job to be triggered, paused and resumed manually.
"""
import datetime
import os
import shutil

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.background import BackgroundScheduler

# Root directory holding one sub-directory per day, named YYYY-MM-DD.
SCHEDULER_LOG_DIR = '/logs/scheduler'
# Directories dated this many days ago (or earlier) are removed.
RETENTION_DAYS = 5
# Naming scheme of the per-day log directories.
DATE_DIR_FORMAT = '%Y-%m-%d'
# Identifier shared by the scheduled job and the pause/resume endpoints.
DELETE_JOB_ID = 'delete_job'

app = FastAPI(title="定时删除airflow日志")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _parse_date_dir(name):
    """Return the date encoded in *name*, or ``None`` if it is not YYYY-MM-DD."""
    try:
        day = datetime.datetime.strptime(name, DATE_DIR_FORMAT).date()
    except ValueError:
        return None
    # strptime also accepts non-padded forms such as "2024-1-5"; only treat
    # canonical zero-padded names as log directories.
    return day if day.strftime(DATE_DIR_FORMAT) == name else None


def _purge_expired_dirs(root, cutoff):
    """Remove every date-named directory under *root* dated on or before *cutoff*."""
    try:
        entries = sorted(os.listdir(root))
    except OSError as e:
        print("Error: %s : %s" % (root, e.strerror))
        return

    for entry in entries:
        day = _parse_date_dir(entry)
        if day is None or day > cutoff:
            continue
        dir_path = os.path.join(root, entry)
        if not os.path.isdir(dir_path):
            continue
        print(dir_path)
        try:
            shutil.rmtree(dir_path)
        except OSError as e:
            # Best effort: report the failure and keep going with the rest.
            print("Error: %s : %s" % (dir_path, e.strerror))


def delete_log(scheduler_path=SCHEDULER_LOG_DIR, keep_days=RETENTION_DAYS):
    """Delete expired per-day log directories below *scheduler_path*.

    Every sub-directory whose name is a ``YYYY-MM-DD`` date at least
    *keep_days* days in the past is removed. The whole directory is scanned,
    so a missing day does not stop older directories from being cleaned up.

    Args:
        scheduler_path: Directory containing the date-named log directories.
        keep_days: Age in days from which a directory is deleted.

    Raises:
        ValueError: If *keep_days* is negative.
    """
    if keep_days < 0:
        raise ValueError("keep_days must be non-negative")

    ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    if not os.path.isdir(scheduler_path):
        print("不存在存放日志的文件夹")
    else:
        cutoff = datetime.date.today() - datetime.timedelta(days=keep_days)
        _purge_expired_dirs(scheduler_path, cutoff)

    print(f'The scheduled task is successfully executed at time {ts}')


# Create the scheduler; it runs jobs in a background thread.
scheduler = BackgroundScheduler()
# Register the cleanup job with a cron trigger (hour=1), i.e. once a day.
scheduler.add_job(delete_log, 'cron', hour='1', id=DELETE_JOB_ID)
scheduler.start()
print('任务创建成功')


# GET health check
@app.get("/cron/ping", description="健康检查")
def ping():
    """Liveness probe; always answers with a fixed string."""
    return "pong!!"


@app.post("/cron/execute", description="执行一次删除日志")
def cron_execute():
    """Run the log cleanup once, immediately."""
    delete_log()
    return "success"


@app.post("/cron/pause", description="停止定时任务")
def cron_pause():
    """Pause the scheduled cleanup job."""
    scheduler.pause_job(job_id=DELETE_JOB_ID)
    return "success"


@app.post("/cron/resume", description="恢复定时任务")
def cron_resume():
    """Resume the scheduled cleanup job."""
    scheduler.resume_job(job_id=DELETE_JOB_ID)
    return "success"