spark_script_1012.py

from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col
from pyspark.sql import DataFrame


def to_array(column):
    # Convert an ML Vector column into an array<double> column; the parameter is
    # named `column` to avoid shadowing pyspark.sql.functions.col.
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(column)
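
# Note: on Spark 3.0+, the built-in pyspark.ml.functions.vector_to_array performs
# the same conversion without a Python UDF and is generally faster. A sketch,
# assuming Spark >= 3.0 (the to_array UDF above remains the portable fallback):
#
#   from pyspark.ml.functions import vector_to_array
#   test_df.withColumn("probability_arr", vector_to_array(col("probability")))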


def main_func(train_df: DataFrame, test_df: DataFrame, spark):
    feat_cols = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                 'feature6', 'feature7', 'feature8', 'feature9']
    # Assemble the raw feature columns into a single "features" vector column.
    vector_assembler = VectorAssembler().setInputCols(feat_cols).setOutputCol("features")
    #### Training ####
    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vector_assembler, lr])
    model = pipeline.fit(train_df)
    # Print the estimator's parameters.
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")
    #### Predict and save results ####
    print("step 2")
    # probability_xj is the predicted probability of the positive class
    # (index 1 of the probability vector).
    labels_and_preds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uuid", "label", "prediction", "probability_xj")
    return [labels_and_preds]
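

# A minimal local driver to exercise main_func. The feature1..feature9, uuid and
# label column names come from the function above; the SparkSession settings and
# synthetic rows here are illustrative assumptions, not part of the original job.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("spark_script_1012").getOrCreate()
    cols = ["uuid", "label"] + ["feature%d" % i for i in range(1, 10)]
    rows = [("id-%d" % i, float(i % 2)) + tuple(float(i * j % 7) for j in range(1, 10))
            for i in range(40)]
    df = spark.createDataFrame(rows, cols)
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
    result_df, = main_func(train_df, test_df, spark)
    result_df.show(truncate=False)
    spark.stop()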