# coding=utf-8
import json
import sys

from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType


# argv[1] inputs: {"input1_key": "input1_path", "input2_key": "input2_path", ...}
# argv[2] outputs: [result_path1, result_path2, ...]
def run(inputs: dict, outputs: list):
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    inputs = preprocess(inputs, spark=spark, sc=sc)
    rets = main_func(**inputs)
    postprocess(rets=rets, outputs=outputs)


def read_input_file(spark: SparkSession, sc: SparkContext, uri) -> DataFrame:
    # todo: to be replaced
    # Each input line is tab-separated: uid, label, feature1..feature9.
    rdd_train = sc.textFile(uri)
    col_lst = ['uid', 'label', 'feature1', 'feature2', 'feature3', 'feature4',
               'feature5', 'feature6', 'feature7', 'feature8', 'feature9']
    rdd_data = rdd_train.map(lambda x: getDict(x.split('\t'), col_lst))
    return spark.createDataFrame(rdd_data)


def write_output_file(result: DataFrame, result_path: str):
    # todo: to be replaced
    result.write.mode("overwrite").options(header="true").csv(result_path)


def preprocess(input_infos: dict, spark: SparkSession, sc: SparkContext) -> dict:
    return {k: read_input_file(spark=spark, sc=sc, uri=v) for k, v in input_infos.items()}


def postprocess(rets, outputs):
    for idx, ret in enumerate(rets):
        write_output_file(result=ret, result_path=outputs[idx])


def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    try:
        rx = float(x)
    except (TypeError, ValueError):
        rx = 0.0
    return rx


def getDict(dictDataLst, colLst):
    # Pair one split line with the column names, coercing every field to float.
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
    return dictData


def to_array(column):
    # UDF that converts an ML Vector column into an array column so that
    # individual class probabilities can be selected by index.
    def to_array_(v):
        return v.toArray().tolist()

    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(column)


def main_func(train_df, test_df):
    feature_lst, col_lst = getFeatureName()
    vectorAssembler = VectorAssembler().setInputCols(feature_lst).setOutputCol("features")

    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(train_df)

    # Print the model parameters.
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

    print("step 2")
    # probability_xj holds the probability of the positive class (index 1).
    labelsAndPreds = model.transform(test_df) \
        .withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    print(f'labelsAndPreds type is {type(labelsAndPreds)}')
    return [labelsAndPreds]


if __name__ == '__main__':
    inputs_str = sys.argv[1]
    outputs_str = sys.argv[2]
    run(inputs=json.loads(inputs_str), outputs=json.loads(outputs_str))
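
# Example invocation (a sketch; the script name and all paths below are
# hypothetical placeholders, not taken from this project):
#
#   spark-submit lr_job.py \
#       '{"train_df": "hdfs:///path/to/train.tsv", "test_df": "hdfs:///path/to/test.tsv"}' \
#       '["hdfs:///path/to/output/predictions"]'
#
# The keys of the inputs JSON must match main_func's parameter names
# (train_df, test_df), because run() expands them with main_func(**inputs);
# the outputs list is consumed positionally by postprocess().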