1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- #!/usr/bin/python3
- # -*- coding:utf-8 -*-
- import datetime
- import pyspark.sql.functions as func
- from pyspark.sql import DataFrame
- from yibu.core import Filter, PinTypeEnum, Conf, YibuException, Col, SparkDataFrame
- from yibu.enum.col_type import ColTypeEnum
class OrderCoreData(SparkDataFrame):
    """Core order data class for Jinghui 3.0.

    Declares the columns of the order table as ``Col`` descriptors, each
    tagged with its ``ColTypeEnum`` type.
    """
    order_no: Col = Col(ColTypeEnum.Str)
    fkey: Col = Col(ColTypeEnum.Str)  # primary key used for forecasting
    order_date: Col = Col(ColTypeEnum.Date)
    qty: Col = Col(ColTypeEnum.Float)
    datetime_type: Col = Col(ColTypeEnum.Int)  # dimension of the current data
class NonSaleConf(Conf):
    """Configuration values for the sales-activity (non-sale) filter."""

    # Length of the look-back window, in days; NonSaleFilter.handle subtracts
    # this many days from the run time to find the cutoff date.
    num_day: int = 90

    # Whether the sales-activity filter should be executed.
    able: bool = True

    # Output rule.
    default_out: list = [0]
class NonSaleError(YibuException):
    """Error raised by the sales-activity filter.

    The message is forwarded to ``YibuException`` and also stored on the
    instance as ``message``.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        # Keep the text directly accessible on the exception instance.
        self.message = message
class NonSaleFilter(Filter):
    """Sales-activity filter.

    Splits the input order rows into two outputs by ``fkey``:

    * ``out_data_use``     - rows whose ``fkey`` has at least one record with
      ``qty > 0`` dated after the cutoff (run time minus ``conf.num_day`` days).
    * ``out_data_not_use`` - rows whose ``fkey`` has no such record.
    """
    # Main input data
    in_data_main: OrderCoreData
    # Main output data
    out_data_use: OrderCoreData
    out_data_not_use: OrderCoreData

    def __init__(self):
        """Create the filter with its configuration and input/output pins."""
        super().__init__()
        self.require_spark = True
        self.conf: NonSaleConf = NonSaleConf()
        # Main input data, explicitly declared as OrderCoreData
        self.in_data_main: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Input)
        # Main output data: pin 0 (default index) carries the rows that are
        # kept, pin 1 the rows that are filtered out.
        self.out_data_use: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Output)
        self.out_data_not_use: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Output, pin_index=1)

    def handle(self) -> None:
        """Partition the main table by whether each fkey sold recently.

        Raises:
            NonSaleError: if the main table is empty, or if it contains no
                row with ``qty > 0`` dated after the cutoff.
        """
        # NOTE(review): conf.able and conf.default_out are never read in this
        # class - confirm whether the Filter base class honours them.

        # Replace nulls in numeric columns with 0 so the qty comparison below
        # treats missing quantities as "no sale".
        main_df: DataFrame = self.in_data_main.data.fillna(0)
        if main_df.rdd.isEmpty():
            raise NonSaleError('动销处理:主数据表为空')

        # conf.run_time is not declared on NonSaleConf; presumably it is a
        # datetime supplied by the Conf base class - TODO confirm.
        cutoff = (self.conf.run_time
                  - datetime.timedelta(days=round(self.conf.num_day))).date()

        # Rows with a positive quantity strictly after the cutoff date.
        recent_sales: DataFrame = main_df.where(main_df.qty > 0)
        recent_sales = recent_sales.where(recent_sales.order_date > cutoff)
        if recent_sales.rdd.isEmpty():
            raise NonSaleError('动销处理:主数据表中动销时间内数据为空')

        # One row per fkey that sold inside the window; 'flag' is only a
        # non-null marker (its value is a positive qty of that fkey).
        active_keys = recent_sales.groupby(OrderCoreData.fkey.name).agg(
            func.first('qty').alias('flag'))

        # Keep EVERY row of the main table and attach the marker where the
        # fkey is active. The previous left join from the marker table could
        # never produce a null flag, so the "not use" output was always empty.
        # A right join keeps the column order fkey, flag, remaining columns.
        flagged: DataFrame = active_keys.join(
            main_df, how='right', on=OrderCoreData.fkey.name)

        in_data_main_use: DataFrame = flagged.filter(flagged.flag.isNotNull())
        in_data_main_not_use: DataFrame = flagged.filter(flagged.flag.isNull())

        self.out_data_use.copy_from(data=in_data_main_use, conf=self.conf)
        self.out_data_not_use.copy_from(data=in_data_main_not_use, conf=self.conf)
|