# spark_script_demo_1009.py

import json
import sys

import numpy

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
# sys.argv[1] inputs: {"input1_key": "input1_path", "input2_key": "input2_path", ...}
# sys.argv[2] outputs: ["result_path1", "result_path2", ...]
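# Example invocation (a sketch; the table names are hypothetical, and the
# JSON keys must match the parameter names of main_func below):
#   spark-submit spark_script_demo_1009.py \
#       '{"train_df": "demo_db.lr_train", "test_df": "demo_db.lr_test"}' \
#       '["demo_db.lr_predictions"]'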
def run(inputs: dict, outputs: list):
    # Connect to the Hive metastore so input/output tables can be resolved.
    spark = SparkSession.builder \
        .config('hive.metastore.uris', 'thrift://192.168.199.27:9083') \
        .enableHiveSupport() \
        .getOrCreate()
    param_dict = preprocess(input_infos=inputs, ss=spark)
    rets = main_func(**param_dict)
    postprocess(rets=rets, outputs=outputs)
def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
    return ss.sql(f'select * from {tb_name}')


def write_table(df: DataFrame, tb_name: str):
    df.write.mode("overwrite").saveAsTable(tb_name)


def preprocess(input_infos: dict, ss: SparkSession) -> dict:
    # Load each named input table into a DataFrame keyed by its argument name.
    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}


def postprocess(rets, outputs):
    # Persist each returned DataFrame to its corresponding output table.
    for idx, df in enumerate(rets):
        write_table(df=df, tb_name=outputs[idx])
def to_array(col):
    # Convert an ML vector column (e.g. "probability") into an array<double>
    # column so individual elements can be indexed.
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
def main_func(train_df: DataFrame, test_df: DataFrame):
    feat_cols = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                 'feature6', 'feature7', 'feature8', 'feature9']
    vector_assembler = VectorAssembler().setInputCols(feat_cols).setOutputCol("features")

    #### Training ####
    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vector_assembler, lr])
    model = pipeline.fit(train_df)

    # Print the estimator's parameters.
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

    #### Predict and save the results ####
    print("step 2")
    # "probability" is a vector column; index 1 is the positive-class probability.
    labels_and_preds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uuid", "label", "prediction", "probability_xj")
    return [labels_and_preds]
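# A minimal evaluation sketch (not part of this job's outputs): test-set AUC
# could be computed with Spark's built-in binary-classification evaluator, e.g.
#   from pyspark.ml.evaluation import BinaryClassificationEvaluator
#   auc = BinaryClassificationEvaluator(
#       rawPredictionCol="rawPrediction", labelCol="label"
#   ).evaluate(model.transform(test_df))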
if __name__ == '__main__':
    inputs_str = sys.argv[1]
    outputs_str = sys.argv[2]
    run(inputs=json.loads(inputs_str), outputs=json.loads(outputs_str))
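# A minimal sketch (hypothetical database/table names) of an input table this
# job expects: numeric columns feature1..feature9 plus "label", and "uuid"
# in the test table:
#   df = spark.createDataFrame(
#       [("id-1", 1.0) + tuple(float(i) for i in range(9))],
#       ["uuid", "label"] + [f"feature{i}" for i in range(1, 10)])
#   df.write.mode("overwrite").saveAsTable("demo_db.lr_test")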