1234567891011121314151617181920212223242526272829303132333435363738 |
- from pyspark.sql.types import *
- from pyspark.ml.classification import LogisticRegression
- from pyspark.ml.feature import VectorAssembler
- from pyspark.ml import Pipeline
- from pyspark.sql.functions import udf, col
- from pyspark.sql import DataFrame
def to_array(col):
    """Convert an ML vector column into an array-of-doubles column.

    Wraps a Python UDF that calls ``toArray().tolist()`` on each cell so
    that individual elements can be picked out with ``[...]`` indexing on
    the resulting column.

    Args:
        col: Column whose cells expose ``toArray()`` (presumably
            ``pyspark.ml.linalg`` vectors -- confirm against callers).

    Returns:
        A Column of type ``ArrayType(DoubleType())``. Cells that are NULL
        in the input stay NULL in the output.
    """
    # NOTE: the inner function keeps its original name because the UDF name
    # is derived from it and can surface in auto-generated column names.
    def to_array_(v):
        # A NULL cell reaches the Python UDF as None; pass it through
        # instead of failing the whole job with AttributeError.
        if v is None:
            return None
        return v.toArray().tolist()

    vector_to_list = udf(to_array_, ArrayType(DoubleType()))
    # NOTE(review): the UDF is flagged nondeterministic, presumably to keep
    # the optimizer from duplicating or eliminating its invocations -- confirm.
    return vector_to_list.asNondeterministic()(col)
def main_func(train_df: DataFrame, test_df: DataFrame, spark):
    """Fit a logistic-regression pipeline and score the test set.

    The nine columns ``feature1`` .. ``feature9`` are assembled into a
    ``features`` vector, a ``LogisticRegression`` is fitted on ``train_df``
    and the fitted pipeline is applied to ``test_df``.

    Args:
        train_df: DataFrame used to fit the pipeline.
        test_df: DataFrame to score; ``uuid`` and ``label`` are selected
            from the scored output.
        spark: Not used in this function; kept as part of the signature.

    Returns:
        A one-element list holding a DataFrame with the columns ``uuid``,
        ``label``, ``prediction`` and ``probability_xj``.
    """
    # feature1 .. feature9
    feat_cols = ['feature{}'.format(idx) for idx in range(1, 10)]
    vector_assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")

    #### Training ####
    print("step 1")
    # regParam is the regularization parameter.
    lr = LogisticRegression(regParam=0.01, maxIter=100)
    stages = [vector_assembler, lr]
    model = Pipeline(stages=stages).fit(train_df)

    # Print the estimator's parameter documentation and values.
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

    #### Prediction ####
    print("step 2")
    scored_df = model.transform(test_df)
    # Element 1 of the probability vector (presumably the probability of
    # label 1 -- confirm against the label encoding).
    prob_at_index_1 = to_array(col("probability"))[1]
    with_prob_df = scored_df.withColumn("probability_xj", prob_at_index_1)
    labels_and_preds = with_prob_df.select(
        "uuid", "label", "prediction", "probability_xj"
    )
    return [labels_and_preds]
|