123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- # #!coding=utf8
- # '''
- # author: huangxiaojuan
- # '''
- # # import sys
- # # reload(sys)
- # # sys.setdefaultencoding('utf8')
- # from pyspark.sql import SparkSession
- # from pyspark.sql.types import *
- # from pyspark.ml.classification import LogisticRegression
- # from pyspark.ml.feature import VectorAssembler
- # from pyspark.ml import Pipeline
- # from pyspark.sql.functions import udf, col
- #
- # input_path_1 = "hdfs://192.168.199.27:9000/user/sxkj/train.txt"
- # input_path_2 = "hdfs://192.168.199.27:9000/user/sxkj/test.txt"
- # output_path = "hdfs://192.168.199.27:9000/tmp/sparkDemo/${ModelType}"
- #
- # def getFeatureName():
- # featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8',
- # 'feature9']
- # colLst = ['uid', 'label'] + featureLst
- # return featureLst, colLst
- #
- #
- # def parseFloat(x):
- # try:
- # rx = float(x)
- # except:
- # rx = 0.0
- # return rx
- #
- #
- # def getDict(dictDataLst, colLst):
- # dictData = {}
- # for i in range(len(colLst)):
- # dictData[colLst[i]] = parseFloat(dictDataLst[i])
- # return dictData
- #
- #
- # def to_array(col):
- # def to_array_(v):
- # return v.toArray().tolist()
- #
- # # Important: asNondeterministic requires Spark 2.3 or later
- # # It can be safely removed i.e.
- # # return udf(to_array_, ArrayType(DoubleType()))(col)
- # # but at the cost of decreased performance
- # return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
- #
- #
- #
- # def run(t1, context_string):
- # """
- # Define main line of script (two input for instance). Given input data (Dataframes) and configuration output data will be returned (list of Dataframes)
- # Params:
- # t1 Dataframe, upstream data, whose name should be consistent with first slot definition
- # context_strinig String, task config whose name should be "context_string"
- # Return:
- # Wrap one or more output data as list of dataframes
- # """
- # sc = SparkContext._active_spark_context
- # sqlContext = SQLContext(sc)
- # t1 = do_something(t1) # data processing
- #
- # # Input Source handler for Prophet Platform
- # r = data_handle(t1)
- # return [r]
- #
- #
- #
- # def do_s
- #
- # def main():
- # # spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
- # spark = SparkSession.builder.getOrCreate()
- # print("Session created!")
- # sc = spark.sparkContext
- #
- # sampleHDFS_train = input_path_1 # sys.argv[1]
- # sampleHDFS_test = input_path_2 # sys.argv[2]
- # outputHDFS = output_path # sys.argv[3]
- #
- # featureLst, colLst = getFeatureName()
- #
- # # 读取hdfs上数据,将RDD转为DataFrame
- # # 训练数据
- # rdd_train = sc.textFile(sampleHDFS_train)
- # rowRDD_train = rdd_train.map(lambda x: getDict(x.split('\t'), colLst))
- # trainDF = spark.createDataFrame(rowRDD_train)
- #
- # # 测试数据
- # rdd_test = sc.textFile(sampleHDFS_test)
- # rowRDD_test = rdd_test.map(lambda x: getDict(x.split('\t'), colLst))
- # testDF = spark.createDataFrame(rowRDD_test)
- #
- # # 用于训练的特征featureLst
- # vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol("features")
- #
- # #### 训练 ####
- # print("step 1")
- # lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数
- #
- # pipeline = Pipeline(stages=[vectorAssembler, lr])
- # model = pipeline.fit(trainDF)
- # # 打印参数
- # print("\n-------------------------------------------------------------------------")
- # print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
- # print("-------------------------------------------------------------------------\n")
- #
- # #### 预测, 保存结果 ####
- # print("step 2")
- # labelsAndPreds = model.transform(testDF).withColumn("probability_xj", to_array(col("probability"))[1]) \
- # .select("uid", "label", "prediction", "probability_xj")
- # labelsAndPreds.show()
- # labelsAndPreds.write.mode("overwrite").options(header="true").csv(outputHDFS + "/target/output")
- #
- # #### 评估不同阈值下的准确率、召回率
- # print("step 3")
- # labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)
- # labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)
- # labelsAndPreds_label_1.show(3)
- # labelsAndPreds_label_0.show(3)
- # t_cnt = labelsAndPreds_label_1.count()
- # f_cnt = labelsAndPreds_label_0.count()
- # print("thre\ttp\ttn\tfp\tfn\taccuracy\trecall")
- # for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:
- # tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()
- # tn = t_cnt - tp
- # fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()
- # fn = f_cnt - fp
- # print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, tn, fp, fn, float(tp) / (tp + fp), float(tp) / (t_cnt)))
- #
- # # 保存模型
- # model.write().overwrite().save(outputHDFS + "/target/model/lrModel")
- #
- # # 加载模型
- # # model.load(outputHDFS + "/target/model/lrModel")
- #
- # print("output:", outputHDFS)
- #
- #
- # if __name__ == '__main__':
- # main()
|