123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import re
- from app import schemas
def _cron_field(value, prefix=""):
    """Render one cron field: ``*`` when *value* is None, else prefix + str(value)."""
    if value is None:
        return "*"
    return prefix + str(value)


def joint_cron_expression(item: schemas.CronExpression):
    """Assemble a five-field cron string from the parts carried by *item*.

    The layout depends on ``item.cron_select_type``:

    * ``0`` -- step form: ``0/<minute> * * * *`` when ``minute`` is set,
      otherwise ``0 0/<hour> * * *`` when ``hour`` is set, otherwise
      ``* * * * *``.
    * ``1`` -- ``<minute> <hour> * * *``.
    * ``2`` -- ``<minute> <hour> ? * <week>`` when ``week`` is set,
      otherwise ``<minute> <hour> * * *``.
    * ``3`` -- ``<minute> <hour> <day> 1/<month> <week>`` where the last
      field is ``?`` when ``day`` is set and ``*`` otherwise.
    * anything else -- ``item.cron_expression`` is used verbatim.

    In types 1-3 every unset (``None``) part is rendered as ``*``.

    Args:
        item: Object exposing ``cron_select_type``, ``minute``, ``hour``,
            ``day``, ``month``, ``week`` and ``cron_expression``.

    Returns:
        The assembled cron string if ``check_cron_expression`` accepts it,
        otherwise the literal message ``"cron校验未通过"``.

    Raises:
        Exception: Propagated from ``check_cron_expression`` when a field
            fails validation.
    """
    select_type = item.cron_select_type

    if select_type == 0:
        if item.minute is not None:
            cron = '0/' + str(item.minute) + ' * * * *'
        elif item.hour is not None:
            cron = '0 0/' + str(item.hour) + ' * * *'
        else:
            # Previously this branch produced only four fields ('* * * *'),
            # which is not a valid five-field cron expression.
            cron = '* * * * *'
    elif select_type == 1:
        cron = ' '.join([
            _cron_field(item.minute),
            _cron_field(item.hour),
            '*', '*', '*',
        ])
    elif select_type == 2:
        if item.week is not None:
            # A concrete weekday leaves day-of-month unspecified ('?').
            tail = ['?', '*', str(item.week)]
        else:
            tail = ['*', '*', '*']
        cron = ' '.join(
            [_cron_field(item.minute), _cron_field(item.hour)] + tail
        )
    elif select_type == 3:
        cron = ' '.join([
            _cron_field(item.minute),
            _cron_field(item.hour),
            _cron_field(item.day),
            _cron_field(item.month, prefix='1/'),
            # A concrete day-of-month leaves the weekday unspecified ('?').
            '?' if item.day is not None else '*',
        ])
    else:
        # Unknown selection type: trust the caller-supplied expression.
        cron = item.cron_expression

    if check_cron_expression(cron):
        return cron
    return "cron校验未通过"
# (unit name, compiled validation pattern) for each of the five cron fields,
# in positional order. Each pattern accepts a single in-range number, a
# two-number compound joined by ',', '-' or '/', or '*' (plus '?' for the
# day and week fields).
_CRON_FIELD_RULES = (
    ('minute', re.compile(
        r"^((([0-9]|[1-5][0-9])(\,|\-|\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\*))$")),
    ('hour', re.compile(
        r"^((([0-9]|[1][0-9]|2[0-3])(\,|\-|\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\*))$")),
    ('day', re.compile(
        r"^((([1-9]|[1-2][0-9]|3[01])(\,|\-|\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\*)|(\?))$")),
    ('month', re.compile(
        r"^((([1-9]|[1][0-2])(\,|\-|\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\*))$")),
    ('week', re.compile(
        r"^((([1-7])(\,|\-|\/){1}([1-7]))|([1-7])|(\*)|(\?))$")),
)


def check_cron_expression(cron_expression):
    """Validate a space-separated, five-field cron expression.

    The fields are checked positionally as minute, hour, day, month and
    week against the patterns in ``_CRON_FIELD_RULES``.

    Args:
        cron_expression: The expression to validate, with fields separated
            by single spaces.

    Returns:
        ``True`` when every field is valid.

    Raises:
        Exception: If the input is not a string, does not contain exactly
            five fields, or any field fails its pattern. The per-field
            message names the offending unit.
    """
    if not isinstance(cron_expression, str):
        # Fail with a validation error rather than an AttributeError on
        # ``None.split``.
        raise Exception('cron表达式不能为空')

    cron_list = cron_expression.split(' ')
    # zip() would silently ignore missing or surplus fields, so check the
    # count explicitly before validating each one.
    if len(cron_list) != len(_CRON_FIELD_RULES):
        raise Exception(
            f'cron表达式应包含{len(_CRON_FIELD_RULES)}个字段,实际为{len(cron_list)}个'
        )

    for cron, (unit, pattern) in zip(cron_list, _CRON_FIELD_RULES):
        if pattern.match(cron) is None:
            raise Exception(f'在{unit}的位置上,数据校验错误')
    return True
def parsing_cron_expression(cron_expression):
    """Split a cron expression into a ``{unit: value}`` dict.

    Fields are read positionally as minute, hour, day, month and week.
    Wildcard fields (``*`` or ``?``) are omitted from the result. A leading
    ``0/`` or ``1/`` step prefix is stripped so that only the step value
    remains (e.g. ``0/5`` -> ``5``).

    Args:
        cron_expression: Cron string with fields separated by single spaces.

    Returns:
        A dict mapping each non-wildcard unit name to its field text.
    """
    unit_list = ['minute', 'hour', 'day', 'month', 'week']
    cron_dict = {}
    for cron, unit in zip(cron_expression.split(' '), unit_list):
        if re.match(r'^((\*)|(\?))$', cron):
            continue
        # Strip the prefix only at the start of the field. A blanket
        # str.replace would also eat the '0/' inside '10/5' (yielding '15')
        # or the '1/' inside '11/2' (yielding '12').
        if cron.startswith(('0/', '1/')):
            cron = cron[2:]
        cron_dict[unit] = cron
    return cron_dict
|