# lr_df_demo2.py
# -*- coding: utf-8 -*-
'''
author: huangxiaojuan
'''
# Python 2 only, not needed on Python 3:
# import sys
# reload(sys)
# sys.setdefaultencoding('utf8')
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col

input_path_1 = "hdfs://192.168.199.27:9000/user/sxkj/train.txt"
input_path_2 = "hdfs://192.168.199.27:9000/user/sxkj/test.txt"
# ${ModelType} is left as-is; it appears to be a placeholder substituted by the scheduler.
output_path = "hdfs://192.168.199.27:9000/tmp/sparkDemo/${ModelType}"

def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    # Parse a string field as float, falling back to 0.0 on malformed input.
    try:
        rx = float(x)
    except (ValueError, TypeError):
        rx = 0.0
    return rx


def getDict(dictDataLst, colLst):
    # Map one split input line to a {column name: float value} dict.
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
    return dictData


def to_array(col):
    # Convert an ML Vector column (e.g. "probability") to a plain array column.
    def to_array_(v):
        return v.toArray().tolist()

    # Important: asNondeterministic requires Spark 2.3 or later.
    # It can be safely removed, i.e.
    #     return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance.
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

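
# Note: on Spark 3.0+, pyspark.ml.functions.vector_to_array performs the same
# conversion without a Python UDF; a minimal sketch, assuming Spark 3.0+:
#
#     from pyspark.ml.functions import vector_to_array
#     df = df.withColumn("probability_xj", vector_to_array(col("probability"))[1])
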
def run(t1, context_string):
    """
    Define the main line of the script (two inputs in this example). Given input
    data (DataFrames) and a configuration, output data is returned (a list of
    DataFrames).
    Params:
        t1              DataFrame, upstream data, whose name should be consistent
                        with the first slot definition
        context_string  String, task config, whose name should be "context_string"
    Return:
        Wrap one or more output DataFrames as a list
    """
    sc = SparkContext._active_spark_context
    sqlContext = SQLContext(sc)
    t1 = do_something(t1)  # data processing

    # Input source handler for the Prophet Platform (assumed to be provided by
    # the platform runtime; it is not defined in this script).
    r = data_handle(t1)
    return [r]


def do_something(df):
    # The original snippet is truncated here ("def do_s"); this placeholder is
    # assumed to hold the per-task data processing and simply returns its input.
    return df


def main():
    # spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print("Session created!")
    sc = spark.sparkContext

    sampleHDFS_train = input_path_1  # sys.argv[1]
    sampleHDFS_test = input_path_2   # sys.argv[2]
    outputHDFS = output_path         # sys.argv[3]

    featureLst, colLst = getFeatureName()

    # Read the data from HDFS and convert each RDD to a DataFrame.
    # Training data
    rdd_train = sc.textFile(sampleHDFS_train)
    rowRDD_train = rdd_train.map(lambda x: getDict(x.split('\t'), colLst))
    trainDF = spark.createDataFrame(rowRDD_train)

    # Test data
    rdd_test = sc.textFile(sampleHDFS_test)
    rowRDD_test = rdd_test.map(lambda x: getDict(x.split('\t'), colLst))
    testDF = spark.createDataFrame(rowRDD_test)

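    # The same tab-separated files could also be read directly as DataFrames,
    # skipping the RDD round-trip; a minimal sketch, assuming every column
    # (including uid and label) can be read as a double, as getDict does:
    #
    #     from pyspark.sql.types import StructType, StructField
    #     schema = StructType([StructField(c, DoubleType(), True) for c in colLst])
    #     trainDF = spark.read.csv(sampleHDFS_train, sep='\t', schema=schema)
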
    # Assemble the training features (featureLst) into a single vector column.
    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol("features")

    #### Training ####
    print("step 1")
    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization strength

    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(trainDF)
    # Print the model parameters
    print("\n-------------------------------------------------------------------------")
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    print("-------------------------------------------------------------------------\n")

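    # The fitted LogisticRegressionModel is the last stage of the trained
    # pipeline; a minimal sketch of inspecting its learned weights:
    #
    #     lrModel = model.stages[-1]
    #     print(lrModel.coefficients, lrModel.intercept)
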
    #### Predict and save the results ####
    print("step 2")
    labelsAndPreds = model.transform(testDF).withColumn("probability_xj", to_array(col("probability"))[1]) \
        .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    labelsAndPreds.write.mode("overwrite").options(header="true").csv(outputHDFS + "/target/output")

    #### Evaluate precision and recall at different thresholds ####
    print("step 3")
    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)
    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)
    labelsAndPreds_label_1.show(3)
    labelsAndPreds_label_0.show(3)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print("thre\ttp\tfn\tfp\ttn\tprecision\trecall")
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:
        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()
        fn = t_cnt - tp  # positives scored at or below the threshold
        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()
        tn = f_cnt - fp  # negatives scored at or below the threshold
        precision = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = float(tp) / t_cnt if t_cnt > 0 else 0.0
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, fn, fp, tn, precision, recall))

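    # A threshold-free summary metric can also be computed with Spark's built-in
    # evaluator; a minimal sketch computing areaUnderROC on the test predictions:
    #
    #     from pyspark.ml.evaluation import BinaryClassificationEvaluator
    #     evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    #     print("AUC =", evaluator.evaluate(model.transform(testDF)))
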
    # Save the model
    model.write().overwrite().save(outputHDFS + "/target/model/lrModel")

    # Load the model (a saved pipeline is reloaded with PipelineModel.load)
    # from pyspark.ml import PipelineModel
    # model = PipelineModel.load(outputHDFS + "/target/model/lrModel")

    print("output:", outputHDFS)


if __name__ == '__main__':
    main()
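
# Typical submission (the input/output paths could also be passed via sys.argv,
# as the comments above hint); a sketch assuming a YARN cluster:
#
#     spark-submit --master yarn lr_df_demo2.py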