import re from app import schemas def joint_cron_expression(item: schemas.CronExpression): cron = "" if item.cron_select_type == 0: if item.minute is not None: cron += '0/'+str(item.minute) + ' *' elif item.hour is not None: cron += '0 4-22/'+str(item.hour) else: cron += '*' cron += ' * * *' elif item.cron_select_type == 1: if item.minute is not None: cron += str(item.minute) else: cron += '*' cron += ' ' if item.hour is not None: cron += str(item.hour) else: cron += '*' cron += ' * * *' elif item.cron_select_type == 2: if item.minute is not None: cron += str(item.minute) else: cron += '*' cron += ' ' if item.hour is not None: cron += str(item.hour) else: cron += '*' if item.week is not None: cron += ' ? * ' cron += str(item.week) else: cron += ' * * *' elif item.cron_select_type == 3: if item.minute is not None: cron += str(item.minute) else: cron += '*' cron += ' ' if item.hour is not None: cron += str(item.hour) else: cron += '*' cron += ' ' if item.day is not None: cron += str(item.day) else: cron += '*' cron += ' ' if item.month is not None: cron += '1/'+str(item.month) else: cron += '*' if item.day is not None: cron += ' ?' else: cron += ' *' else: cron = item.cron_expression check_cron_expression(cron) check_cron_hour(cron) return cron # reg_list = [ # "^((([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$", # "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$", # "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$", # "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$", # "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$" # ] def check_cron_expression(cron_expression): cron_list = cron_expression.split(' ') unit_list = ['minute', 'hour', 'day', 'month', 'week'] reg_list = [ "^((([0-9]|[1-5][0-9])(\\,|\\-){1}([1-9]|[1-5][0-9])(\\/){1}([1-9]|[1-5][0-9]))|(([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$", "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-){1}([1-9]|[1][0-9]|2[0-3])(\\/){1}([1-9]|[1][0-9]|2[0-3]))|(([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$", "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-){1}([1-9]|[1-2][0-9]|3[01])(\\/){1}([1-9]|[1-2][0-9]|3[01]))|(([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$", "^((([1-9]|[1][0-2])(\\,|\\-){1}([1-9]|[1][0-2])(\\/){1}([1-9]|[1][0-2]))|(([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$", "^((([1-7])(\\,|\\-){1}([1-7])(\\/){1}([1-7]))|(([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$" ] for cron, unit, reg in zip(cron_list, unit_list, reg_list): match_obj = re.match(reg, cron) if match_obj is None: raise Exception(f'在{unit}的位置上,数据校验错误') def parsing_cron_expression(cron_expression): cron_list = cron_expression.split(' ') unit_list = ['minute', 'hour', 'day', 'month', 'week'] cron_dict = {} for cron, unit in zip(cron_list, unit_list): if not bool(re.match('^((\\*)|(\\?))$',cron)): cron_dict.update({unit: cron.replace('0/','').replace('1/','')}) return cron_dict def check_cron_hour(cron_expression): cron_list = cron_expression.split(' ') unit_list = ['minute', 'hour', 'day', 'month', 'week'] for cron, unit in zip(cron_list, unit_list): if unit == 'hour': print(cron) match_obj = re.match("^((([4-9]|[1][0-9]|2[0-2])(\\,|\\-){1}([4-9]|[1][0-9]|2[0-2])(\\/){1}([1-9]|[1][0-9]|2[0-3]))|(([4-9]|[1][0-9]|2[0-2])(\\,|\\-){1}([4-9]|[1][0-9]|2[0-2]))|(([4-9]|[1][0-9]|2[0-2])(\\/){1}([1-9]|[1][0-9]|2[0-3]))|([4-9]|[1][0-9]|2[0-2])|(\\*))$", cron) if match_obj is None: raise Exception('执行时间必须在每日4~22点内')