non_sale_jh3.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #!/usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. import datetime
  4. import pyspark.sql.functions as func
  5. from pyspark.sql import DataFrame
  6. from yibu.core import Filter, PinTypeEnum, Conf, YibuException, Col, SparkDataFrame
  7. from yibu.enum.col_type import ColTypeEnum
  8. class OrderCoreData(SparkDataFrame):
  9. """京慧3.0订单核心类"""
  10. order_no: Col = Col(ColTypeEnum.Str)
  11. fkey: Col = Col(ColTypeEnum.Str) # 预测的主关键字
  12. order_date: Col = Col(ColTypeEnum.Date)
  13. qty: Col = Col(ColTypeEnum.Float)
  14. datetime_type: Col = Col(ColTypeEnum.Int) # 当前数据的维度
  15. class NonSaleConf(Conf):
  16. num_day: int = 90
  17. # 是否执行动销过滤
  18. able: bool = True
  19. # 输出规则
  20. default_out: list = [0]
  21. pass
  22. class NonSaleError(YibuException):
  23. def __init__(self, message):
  24. super().__init__(message)
  25. self.message = message
  26. class NonSaleFilter(Filter):
  27. """
  28. 动销过滤
  29. """
  30. # 输入主数据
  31. in_data_main: OrderCoreData
  32. # 输出主数据
  33. out_data_use: OrderCoreData
  34. out_data_not_use: OrderCoreData
  35. def __init__(self):
  36. super().__init__()
  37. self.require_spark = True
  38. self.conf: NonSaleConf = NonSaleConf()
  39. # 输入主数据,强制类声明为OrderCoreData
  40. self.in_data_main: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Input)
  41. # 输出主数据
  42. self.out_data_use: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Output)
  43. self.out_data_not_use: OrderCoreData = OrderCoreData(pin_type=PinTypeEnum.Output, pin_index=1)
  44. pass
  45. def handle(self) -> None:
  46. in_data_main = self.in_data_main.data
  47. in_data_main = in_data_main.fillna(0)
  48. if in_data_main.rdd.isEmpty():
  49. raise NonSaleError(f'动销处理:主数据表为空')
  50. in_data_main_valid = in_data_main.where(in_data_main.qty > 0)
  51. # 筛选出main表中num_day内对应的数据
  52. in_data_main_valid: DataFrame = in_data_main_valid.where(in_data_main_valid.order_date > (
  53. self.conf.run_time - datetime.timedelta(days=round(self.conf.num_day))).date())
  54. if in_data_main_valid.rdd.isEmpty():
  55. raise NonSaleError('动销处理:主数据表中动销时间内数据为空')
  56. # 聚合去重,获得num_day之内 有销量的fkey维度
  57. in_data_main_valid = in_data_main_valid.groupby(OrderCoreData.fkey.name).agg(func.first('qty').alias('flag'))
  58. in_data_main_all: DataFrame = in_data_main_valid.join(in_data_main, how='left', on=OrderCoreData.fkey.name)
  59. in_data_main_use: DataFrame = in_data_main_all.filter(in_data_main_all.flag.isNotNull())
  60. in_data_main_not_use: DataFrame = in_data_main_all.filter(in_data_main_all.flag.isNull())
  61. self.out_data_use.copy_from(data=in_data_main_use, conf=self.conf)
  62. self.out_data_not_use.copy_from(data=in_data_main_not_use, conf=self.conf)