1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import datetime
- import os
- import shutil
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from apscheduler.schedulers.background import BackgroundScheduler
- app = FastAPI(title="定时删除airflow日志")
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- def delete_log():
- now = datetime.datetime.now()
- ts = now.strftime('%Y-%m-%d %H:%M:%S')
- scheduler_path = '/logs/scheduler'
- logs_scheduler_exists = os.path.exists(scheduler_path)
- if not logs_scheduler_exists:
- print("不存在存放日志的文件夹")
- else:
- dt_today = datetime.date.today()
- dt_delete = dt_today - datetime.timedelta(5)
- file_name = dt_delete.strftime('%Y-%m-%d')
- dir_path = scheduler_path + '/' + file_name
- while os.path.exists(dir_path):
- print(dir_path)
- try:
- shutil.rmtree(dir_path)
- except OSError as e:
- print("Error: %s : %s" % (dir_path, e.strerror))
- dt_delete = dt_delete - datetime.timedelta(1)
- file_name = dt_delete.strftime('%Y-%m-%d')
- dir_path = scheduler_path + '/' + file_name
- print(f'The scheduled task is successfully executed at time {ts}')
- #创建调度器:BackgroundScheduler
- scheduler = BackgroundScheduler()
- #添加任务,时间间隔2S
- scheduler.add_job(delete_log, 'cron', hour='1', id='delete_job')
- scheduler.start()
- print('任务创建成功')
- # Get 健康检查
- @app.get("/cron/ping", description="健康检查")
- def ping():
- return "pong!!"
- @app.post("/cron/execute", description="执行一次删除日志")
- def cron_execute():
- delete_log()
- return "success"
- @app.post("/cron/pause", description="停止定时任务")
- def cron_execute():
- scheduler.pause_job(job_id='delete_job')
- return "success"
- @app.post("/cron/resume", description="恢复定时任务")
- def cron_execute():
- scheduler.resume_job(job_id='delete_job')
- return "success"
|