lr_df_demo_srcipt_define.py

from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col


def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    # Parse x as a float, falling back to 0.0 if it cannot be parsed.
    try:
        rx = float(x)
    except (TypeError, ValueError):
        rx = 0.0
    return rx


def getDict(dictDataLst, colLst):
    # Pair each column name with the parsed float value from a raw row.
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
    return dictData


def to_array(col):
    # Convert an ML Vector column (e.g. "probability") into an array<double>
    # column so individual class probabilities can be indexed with [].
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)


def main_func(train_df, test_df):
    feature_lst, col_lst = getFeatureName()
    vectorAssembler = VectorAssembler().setInputCols(feature_lst).setOutputCol("features")

    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(train_df)

    # Print the model parameters
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

    print("step 2")
    labelsAndPreds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    print(f'labelsAndPreds type is {type(labelsAndPreds)}')

    return [labelsAndPreds]
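

# --- Usage sketch (assumption, not part of the original script) ---
# A minimal driver showing one way main_func could be invoked. The SparkSession
# setup, the CSV input paths, and the cast of label/feature columns to double
# are illustrative assumptions; the script itself only defines main_func and
# its helpers.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("lr_df_demo").getOrCreate()
    feature_lst, col_lst = getFeatureName()

    # Hypothetical input locations; replace with the real training/test data paths.
    train_raw = spark.read.csv("path/to/train.csv", header=True)
    test_raw = spark.read.csv("path/to/test.csv", header=True)

    # Keep uid as-is and cast label/feature columns to double so VectorAssembler
    # and LogisticRegression accept them.
    numeric_cols = [c for c in col_lst if c != "uid"]
    train_df = train_raw.select(col("uid"), *[col(c).cast("double").alias(c) for c in numeric_cols])
    test_df = test_raw.select(col("uid"), *[col(c).cast("double").alias(c) for c in numeric_cols])

    results = main_func(train_df, test_df)
    results[0].show()

    spark.stop()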