delete.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import datetime
  2. import os
  3. import shutil
  4. from fastapi import FastAPI
  5. from fastapi.middleware.cors import CORSMiddleware
  6. from apscheduler.schedulers.background import BackgroundScheduler
  7. app = FastAPI(title="定时删除airflow日志")
  8. app.add_middleware(
  9. CORSMiddleware,
  10. allow_origins=["*"],
  11. allow_credentials=True,
  12. allow_methods=["*"],
  13. allow_headers=["*"],
  14. )
  15. def delete_log():
  16. now = datetime.datetime.now()
  17. ts = now.strftime('%Y-%m-%d %H:%M:%S')
  18. scheduler_path = '/logs/scheduler'
  19. logs_scheduler_exists = os.path.exists(scheduler_path)
  20. if not logs_scheduler_exists:
  21. print("不存在存放日志的文件夹")
  22. else:
  23. dt_today = datetime.date.today()
  24. dt_delete = dt_today - datetime.timedelta(5)
  25. file_name = dt_delete.strftime('%Y-%m-%d')
  26. dir_path = scheduler_path + '/' + file_name
  27. while os.path.exists(dir_path):
  28. print(dir_path)
  29. try:
  30. shutil.rmtree(dir_path)
  31. except OSError as e:
  32. print("Error: %s : %s" % (dir_path, e.strerror))
  33. dt_delete = dt_delete - datetime.timedelta(1)
  34. file_name = dt_delete.strftime('%Y-%m-%d')
  35. dir_path = scheduler_path + '/' + file_name
  36. print(f'The scheduled task is successfully executed at time {ts}')
  37. #创建调度器:BackgroundScheduler
  38. scheduler = BackgroundScheduler()
  39. #添加任务,时间间隔2S
  40. scheduler.add_job(delete_log, 'cron', hour='1', id='delete_job')
  41. scheduler.start()
  42. print('任务创建成功')
  43. # Get 健康检查
  44. @app.get("/cron/ping", description="健康检查")
  45. def ping():
  46. return "pong!!"
  47. @app.post("/cron/execute", description="执行一次删除日志")
  48. def cron_execute():
  49. delete_log()
  50. return "success"
  51. @app.post("/cron/pause", description="停止定时任务")
  52. def cron_execute():
  53. scheduler.pause_job(job_id='delete_job')
  54. return "success"
  55. @app.post("/cron/resume", description="恢复定时任务")
  56. def cron_execute():
  57. scheduler.resume_job(job_id='delete_job')
  58. return "success"