cron_utils.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import re
  2. from app import schemas
  3. def joint_cron_expression(item: schemas.CronExpression):
  4. cron = ""
  5. if item.cron_select_type == 0:
  6. if item.minute is not None:
  7. cron += '0/'+str(item.minute) + ' *'
  8. elif item.hour is not None:
  9. cron += '0 0/'+str(item.hour)
  10. else: cron += '*'
  11. cron += ' * * *'
  12. elif item.cron_select_type == 1:
  13. if item.minute is not None:
  14. cron += str(item.minute)
  15. else: cron += '*'
  16. cron += ' '
  17. if item.hour is not None:
  18. cron += str(item.hour)
  19. else: cron += '*'
  20. cron += ' * * *'
  21. elif item.cron_select_type == 2:
  22. if item.minute is not None:
  23. cron += str(item.minute)
  24. else: cron += '*'
  25. cron += ' '
  26. if item.hour is not None:
  27. cron += str(item.hour)
  28. else: cron += '*'
  29. if item.week is not None:
  30. cron += ' ? * '
  31. cron += str(item.week)
  32. else: cron += ' * * *'
  33. elif item.cron_select_type == 3:
  34. if item.minute is not None:
  35. cron += str(item.minute)
  36. else: cron += '*'
  37. cron += ' '
  38. if item.hour is not None:
  39. cron += str(item.hour)
  40. else: cron += '*'
  41. cron += ' '
  42. if item.day is not None:
  43. cron += str(item.day)
  44. else: cron += '*'
  45. cron += ' '
  46. if item.month is not None:
  47. cron += '1/'+str(item.month)
  48. else: cron += '*'
  49. if item.day is not None:
  50. cron += ' ?'
  51. else: cron += ' *'
  52. else:
  53. cron = item.cron_expression
  54. match_obj = check_cron_expression(cron)
  55. if match_obj:
  56. return cron
  57. return "cron校验未通过"
  58. def check_cron_expression(cron_expression):
  59. print(cron_expression)
  60. cron_list = cron_expression.split(' ')
  61. unit_list = ['minute', 'hour', 'day', 'month', 'week']
  62. reg_list = [
  63. "^((([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
  64. "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
  65. "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
  66. "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
  67. "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
  68. ]
  69. for cron, unit, reg in zip(cron_list, unit_list, reg_list):
  70. match_obj = re.match(reg, cron)
  71. if match_obj is None:
  72. raise Exception(f'在{unit}的位置上,数据校验错误')
  73. return True
  74. def parsing_cron_expression(cron_expression):
  75. cron_list = cron_expression.split(' ')
  76. unit_list = ['minute', 'hour', 'day', 'month', 'week']
  77. cron_dict = {}
  78. for cron, unit in zip(cron_list, unit_list):
  79. if not bool(re.match('^((\\*)|(\\?))$',cron)):
  80. cron_dict.update({unit: cron.replace('0/','').replace('1/','')})
  81. return cron_dict