# coding=utf-8
import json
import sys

from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

# argv[1] inputs: {"input1_key": "input1_path", "input2_key": "input2_path", ...}
# argv[2] outputs: [result_path1, result_path2, ...]
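# Illustrative invocation (script name and paths are hypothetical placeholders;
# the keys of the inputs dict must match the parameter names of main_func):
#   spark-submit job.py '{"train_df": "/data/train.txt", "test_df": "/data/test.txt"}' '["/data/output/preds"]'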


def run(inputs: dict, outputs: list):
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    inputs = preprocess(inputs, spark=spark, sc=sc)
    rets = main_func(**inputs)
    postprocess(rets=rets, outputs=outputs)


def read_input_file(spark: SparkSession, sc: SparkContext, uri) -> DataFrame:  # TODO: replace with your own reader
    rdd_train = sc.textFile(uri)
    col_lst = ['uid', 'label', 'feature1', 'feature2', 'feature3', 'feature4', 'feature5',
               'feature6', 'feature7', 'feature8', 'feature9']
    rdd_data = rdd_train.map(lambda x: getDict(x.split('\t'), col_lst))
    return spark.createDataFrame(rdd_data)
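
# Note: read_input_file assumes one tab-separated record per line, laid out as
#   uid<TAB>label<TAB>feature1 ... feature9
# (a hypothetical example line: "10001\t1\t0.1\t0.2\t0.3\t0.4\t0.5\t0.6\t0.7\t0.8\t0.9").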


def write_output_file(result: DataFrame, result_path: str):  # TODO: replace with your own writer
    result.write.mode("overwrite").options(header="true").csv(result_path)


def preprocess(input_infos: dict, spark: SparkSession, sc: SparkContext) -> dict:
    return {k: read_input_file(spark=spark, sc=sc, uri=v) for k, v in input_infos.items()}


def postprocess(rets, outputs):
    for ret, result_path in zip(rets, outputs):
        write_output_file(result=ret, result_path=result_path)


def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    # Fall back to 0.0 for malformed values instead of failing the whole job.
    try:
        return float(x)
    except (TypeError, ValueError):
        return 0.0


def getDict(dictDataLst, colLst):
    # Map each column name to the parsed float value from one input record.
    return {name: parseFloat(value) for name, value in zip(colLst, dictDataLst)}


def to_array(vector_col):
    # Convert an ML Vector column into array<double> so its elements can be indexed.
    def to_array_(v):
        return v.toArray().tolist()

    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(vector_col)
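
# Note: on Spark 3.0+, pyspark.ml.functions.vector_to_array performs the same
# Vector -> array<double> conversion natively, without a Python UDF.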


def main_func(train_df, test_df):
    feature_lst, _ = getFeatureName()
    vectorAssembler = VectorAssembler().setInputCols(feature_lst).setOutputCol("features")
    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(train_df)

    # Print the LogisticRegression parameters.
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")
    print("step 2")
    # "probability" is a 2-element vector [P(label=0), P(label=1)]; keep the positive-class score.
    labelsAndPreds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    print(f'labelsAndPreds type is {type(labelsAndPreds)}')
    return [labelsAndPreds]
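

# Optional evaluation helper: an illustrative sketch, not part of the original
# flow (nothing calls it). It assumes the DataFrame passed in still carries the
# "rawPrediction" column emitted by model.transform(); labelsAndPreds above
# drops it, so you would pass the un-selected transform output instead.
def evaluate_auc(predictions_df):
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    evaluator = BinaryClassificationEvaluator(labelCol="label",
                                              rawPredictionCol="rawPrediction",
                                              metricName="areaUnderROC")
    return evaluator.evaluate(predictions_df)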


if __name__ == '__main__':
    inputs_str = sys.argv[1]
    outputs_str = sys.argv[2]
    run(inputs=json.loads(inputs_str), outputs=json.loads(outputs_str))