```python
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col


def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    # Parse a raw field into a float, falling back to 0.0 on bad input.
    try:
        rx = float(x)
    except (TypeError, ValueError):
        rx = 0.0
    return rx


def getDict(dictDataLst, colLst):
    # Zip a list of raw field values with the column names into a dict of floats.
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
    return dictData


def to_array(vec_col):
    # Convert an ML Vector column (e.g. "probability") into an array<double> column
    # so that individual elements can be selected by index.
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(vec_col)


def main_func(train_df, test_df):
    feature_lst, col_lst = getFeatureName()
    vectorAssembler = VectorAssembler().setInputCols(feature_lst).setOutputCol("features")

    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(train_df)

    # Print the model parameters
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

    print("step 2")
    # "probability" is a Vector [P(label=0), P(label=1)]; keep the positive-class score.
    labelsAndPreds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    print(f'labelsAndPreds type is {type(labelsAndPreds)}')
    return [labelsAndPreds]
```
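The listing defines the helpers and `main_func` but does not show how the input DataFrames are built or how the function is invoked. Below is a minimal driver sketch, assuming tab-separated text files whose columns follow `colLst` (`uid`, `label`, then the nine features); the file paths, app name, and `to_row` helper are placeholders introduced here, not part of the original code.

```python
from pyspark.sql import SparkSession, Row

if __name__ == "__main__":
    spark = SparkSession.builder.appName("lr_pipeline_demo").getOrCreate()

    _, col_lst = getFeatureName()

    def to_row(line):
        # One tab-separated line -> Row matching colLst (uid, label, feature1..feature9).
        return Row(**getDict(line.strip().split('\t'), col_lst))

    # Hypothetical input paths; replace with the real train/test files.
    train_df = spark.sparkContext.textFile("train.txt").map(to_row).toDF()
    test_df = spark.sparkContext.textFile("test.txt").map(to_row).toDF()

    result, = main_func(train_df, test_df)
    result.show(5)

    spark.stop()
```

Because `main_func` returns a list with a single DataFrame, the driver unpacks it with `result, = ...`; the returned DataFrame still carries the `uid`, `label`, `prediction`, and `probability_xj` columns and can be written out or evaluated further.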