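Below is a complete example of the DAG submission payload. Two `datasource` nodes (reading the `train` and `test` tables) feed one `pyspark` node, which trains a logistic-regression pipeline on the training data and reports per-threshold precision/recall on the test data. Note that the same PySpark script is repeated three times in the payload: in `nodes`, in `dag_script.sub_nodes`, and as `scriptText` inside the `graph` section, which appears to be serialized canvas state (AntV X6-style cells) used only to redraw the editor view.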
```json
{
  "dag_id": "N1wP690lywNj1xrkzP8qD",
  "user_name": "XXX",
  "user_id": 1,
  "nodes_task_name": "dfs",
  "nodes_task_id": 123,
  "itermidate_data": ["hdfs://host:port/uri"],
  "nodes": [{
    "id": "8CSbH3t1CTdxSKZ0xqHf3",
    "name": "datasourceNode1",
    "op": "datasource",
    "data": {
      "input_source": [
        { "dataSelect": true, "dataField": "uuid", "dataType": "string" },
        { "dataSelect": true, "dataField": "label", "dataType": "int" },
        { "dataSelect": true, "dataField": "feature1", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature2", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature3", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature4", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature5", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature6", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature7", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature8", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature9", "dataType": "double" }
      ],
      "input_table": 0
    }
  }, {
    "id": "GqZitaihZUjpafezqtLOf",
    "name": "test",
    "op": "datasource",
    "data": {
      "input_source": [
        { "dataSelect": true, "dataField": "uuid", "dataType": "string" },
        { "dataSelect": true, "dataField": "label", "dataType": "int" },
        { "dataSelect": true, "dataField": "feature1", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature2", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature3", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature4", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature5", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature6", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature7", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature8", "dataType": "double" },
        { "dataSelect": true, "dataField": "feature9", "dataType": "double" }
      ],
      "input_table": 1
    }
  }, {
    "id": "SHb3z_rKGtOvBFvcaKTKj",
    "name": "pysparkNode1",
    "op": "pyspark",
    "data": {
      "input_number": 2,
      "output": [{ "outputVar": "r0" }],
      "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n    colLst = ['uuid', 'label'] + featureLst\n    return featureLst, colLst\n\ndef parseFloat(x):\n    try:\n        rx = float(x)\n    except (TypeError, ValueError):\n        rx = 0.0\n    return rx\n\ndef getDict(dictDataLst, colLst):\n    dictData = {}\n    for i in range(len(colLst)):\n        dictData[colLst[i]] = parseFloat(dictDataLst[i])\n    return dictData\n\ndef to_array(col):\n    def to_array_(v):\n        return v.toArray().tolist()\n    # Important: asNondeterministic requires Spark 2.3 or later.\n    # It can be safely removed, i.e.\n    #     return udf(to_array_, ArrayType(DoubleType()))(col)\n    # but at the cost of decreased performance.\n    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n#     trainDF = ss.sql(f'select * from {t1}')\n#     testDF = ss.sql(f'select * from {t2}')\n#     return trainDF, testDF\n\n# def write_output(df1, m1):\n#     df1.write.mode(\"overwrite\").saveAsTable('prediction')\n#     m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\ndef main_func(input0, input1, spark, sc):\n    trainDF = input0\n    testDF = input1\n\n    featureLst, colLst = getFeatureName()\n\n    # assemble the training features listed in featureLst into one vector column\n    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n    #### train ####\n    print(\"step 1\")\n    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam is the regularization strength\n\n    pipeline = Pipeline(stages=[vectorAssembler, lr])\n    model = pipeline.fit(trainDF)\n    # print the model parameters\n    print(\"\\n-------------------------------------------------------------------------\")\n    print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n    print(\"-------------------------------------------------------------------------\\n\")\n\n    #### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at each threshold ####\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # label-1 rows at or below the threshold are false negatives\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # label-0 rows at or below the threshold are true negatives\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, float(tp) / max(tp + fp, 1), float(tp) / max(t_cnt, 1)))\n\n    r0 = labelsAndPreds\n    return [r0]"
    }
  }],
  "edges": [
    { "source": "8CSbH3t1CTdxSKZ0xqHf3", "target": "SHb3z_rKGtOvBFvcaKTKj" },
    { "source": "GqZitaihZUjpafezqtLOf", "target": "SHb3z_rKGtOvBFvcaKTKj" }
  ],
  "dag_script": {
    "sub_nodes": [{
      "id": "8CSbH3t1CTdxSKZ0xqHf3",
      "name": "datasourceNode1",
      "type": "datasource",
      "op": "sql",
      "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from train"
    }, {
      "id": "GqZitaihZUjpafezqtLOf",
      "name": "test",
      "type": "datasource",
      "op": "sql",
      "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from test"
    }, {
      "id": "SHb3z_rKGtOvBFvcaKTKj",
      "type": "script",
      "name": "pysparkNode1",
      "op": "pyspark",
      "inputs": {
        "input0": ["8CSbH3t1CTdxSKZ0xqHf3", 0],
        "input1": ["GqZitaihZUjpafezqtLOf", 0]
      },
      "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n    colLst = ['uuid', 'label'] + featureLst\n    return featureLst, colLst\n\ndef parseFloat(x):\n    try:\n        rx = float(x)\n    except (TypeError, ValueError):\n        rx = 0.0\n    return rx\n\ndef getDict(dictDataLst, colLst):\n    dictData = {}\n    for i in range(len(colLst)):\n        dictData[colLst[i]] = parseFloat(dictDataLst[i])\n    return dictData\n\ndef to_array(col):\n    def to_array_(v):\n        return v.toArray().tolist()\n    # Important: asNondeterministic requires Spark 2.3 or later.\n    # It can be safely removed, i.e.\n    #     return udf(to_array_, ArrayType(DoubleType()))(col)\n    # but at the cost of decreased performance.\n    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n#     trainDF = ss.sql(f'select * from {t1}')\n#     testDF = ss.sql(f'select * from {t2}')\n#     return trainDF, testDF\n\n# def write_output(df1, m1):\n#     df1.write.mode(\"overwrite\").saveAsTable('prediction')\n#     m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\ndef main_func(input0, input1, spark, sc):\n    trainDF = input0\n    testDF = input1\n\n    featureLst, colLst = getFeatureName()\n\n    # assemble the training features listed in featureLst into one vector column\n    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n    #### train ####\n    print(\"step 1\")\n    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam is the regularization strength\n\n    pipeline = Pipeline(stages=[vectorAssembler, lr])\n    model = pipeline.fit(trainDF)\n    # print the model parameters\n    print(\"\\n-------------------------------------------------------------------------\")\n    print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n    print(\"-------------------------------------------------------------------------\\n\")\n\n    #### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at each threshold ####\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # label-1 rows at or below the threshold are false negatives\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # label-0 rows at or below the threshold are true negatives\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, float(tp) / max(tp + fp, 1), float(tp) / max(t_cnt, 1)))\n\n    r0 = labelsAndPreds\n    return [r0]"
    }],
    "edges": [
      ["8CSbH3t1CTdxSKZ0xqHf3", "SHb3z_rKGtOvBFvcaKTKj"],
      ["GqZitaihZUjpafezqtLOf", "SHb3z_rKGtOvBFvcaKTKj"]
    ]
  },
  "graph": {
    "cells": [{
      "shape": "dag-edge",
      "attrs": { "line": { "strokeDasharray": "" } },
      "id": "73ef6d80-3587-4bd6-b5ac-e2a16e8bb3d2",
      "zIndex": -1,
      "source": { "cell": "d4fb565a-83b3-4831-8180-4b2043b620a8", "port": "bottomPort" },
      "target": { "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "port": "input0" }
    }, {
      "shape": "dag-edge",
      "attrs": { "line": { "strokeDasharray": "" } },
      "id": "bd1cf153-d0cb-48ff-a13d-0aca75295fc2",
      "zIndex": -1,
      "source": { "cell": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2", "port": "bottomPort" },
      "target": { "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "port": "input1" }
    }, {
      "position": { "x": 170, "y": 120 },
      "size": { "width": 180, "height": 80 },
      "view": "react-shape-view",
      "shape": "dag-node",
      "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} },
      "portMarkup": [{
        "tagName": "foreignObject",
        "selector": "fo",
        "children": [{
          "ns": "http://www.w3.org/1999/xhtml",
          "tagName": "body",
          "selector": "foBody",
          "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" },
          "style": { "width": "100%", "height": "100%", "background": "transparent" },
          "children": [{
            "tagName": "div",
            "selector": "foContent",
            "style": { "width": "100%", "height": "100%" }
          }]
        }]
      }],
      "ports": {
        "groups": {
          "top": {
            "position": "top",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          },
          "bottom": {
            "position": "bottom",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          }
        },
        "items": [{ "id": "bottomPort", "group": "bottom" }]
      },
      "id": "d4fb565a-83b3-4831-8180-4b2043b620a8",
      "data": {
        "label": "InputSource",
        "status": "default",
        "type": "datasource",
        "id": "d4fb565a-83b3-4831-8180-4b2043b620a8",
        "nodeId": "8CSbH3t1CTdxSKZ0xqHf3",
        "inputSource": [
          { "dataSelect": true, "dataField": "uuid", "dataType": "string" },
          { "dataSelect": true, "dataField": "label", "dataType": "int" },
          { "dataSelect": true, "dataField": "feature1", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature2", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature3", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature4", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature5", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature6", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature7", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature8", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature9", "dataType": "double" }
        ],
        "dataTableId": 0,
        "dataTableName": "train",
        "nodeName": "datasourceNode1"
      },
      "zIndex": 1
    }, {
      "position": { "x": 540, "y": 110 },
      "size": { "width": 180, "height": 80 },
      "view": "react-shape-view",
      "shape": "dag-node",
      "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} },
      "portMarkup": [{
        "tagName": "foreignObject",
        "selector": "fo",
        "children": [{
          "ns": "http://www.w3.org/1999/xhtml",
          "tagName": "body",
          "selector": "foBody",
          "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" },
          "style": { "width": "100%", "height": "100%", "background": "transparent" },
          "children": [{
            "tagName": "div",
            "selector": "foContent",
            "style": { "width": "100%", "height": "100%" }
          }]
        }]
      }],
      "ports": {
        "groups": {
          "top": {
            "position": "top",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          },
          "bottom": {
            "position": "bottom",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          }
        },
        "items": [{ "id": "bottomPort", "group": "bottom" }]
      },
      "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2",
      "data": {
        "label": "InputSource",
        "status": "default",
        "type": "datasource",
        "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2",
        "nodeId": "GqZitaihZUjpafezqtLOf",
        "inputSource": [
          { "dataSelect": true, "dataField": "uuid", "dataType": "string" },
          { "dataSelect": true, "dataField": "label", "dataType": "int" },
          { "dataSelect": true, "dataField": "feature1", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature2", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature3", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature4", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature5", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature6", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature7", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature8", "dataType": "double" },
          { "dataSelect": true, "dataField": "feature9", "dataType": "double" }
        ],
        "dataTableId": 1,
        "dataTableName": "test",
        "nodeName": "test"
      },
      "zIndex": 2
    }, {
      "position": { "x": 360, "y": 440 },
      "size": { "width": 180, "height": 36 },
      "view": "react-shape-view",
      "shape": "dag-node",
      "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} },
      "portMarkup": [{
        "tagName": "foreignObject",
        "selector": "fo",
        "children": [{
          "ns": "http://www.w3.org/1999/xhtml",
          "tagName": "body",
          "selector": "foBody",
          "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" },
          "style": { "width": "100%", "height": "100%", "background": "transparent" },
          "children": [{
            "tagName": "div",
            "selector": "foContent",
            "style": { "width": "100%", "height": "100%" }
          }]
        }]
      }],
      "ports": {
        "groups": {
          "top": {
            "position": "top",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          },
          "bottom": {
            "position": "bottom",
            "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } }
          }
        },
        "items": [
          { "group": "top", "id": "input0" },
          { "group": "top", "id": "input1" },
          { "group": "bottom", "id": "r0" }
        ]
      },
      "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
      "data": {
        "label": "pyspark",
        "status": "undone",
        "type": "script",
        "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
        "nodeId": "SHb3z_rKGtOvBFvcaKTKj",
        "nodeName": "pysparkNode1",
        "scriptText": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n    colLst = ['uuid', 'label'] + featureLst\n    return featureLst, colLst\n\ndef parseFloat(x):\n    try:\n        rx = float(x)\n    except (TypeError, ValueError):\n        rx = 0.0\n    return rx\n\ndef getDict(dictDataLst, colLst):\n    dictData = {}\n    for i in range(len(colLst)):\n        dictData[colLst[i]] = parseFloat(dictDataLst[i])\n    return dictData\n\ndef to_array(col):\n    def to_array_(v):\n        return v.toArray().tolist()\n    # Important: asNondeterministic requires Spark 2.3 or later.\n    # It can be safely removed, i.e.\n    #     return udf(to_array_, ArrayType(DoubleType()))(col)\n    # but at the cost of decreased performance.\n    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n#     trainDF = ss.sql(f'select * from {t1}')\n#     testDF = ss.sql(f'select * from {t2}')\n#     return trainDF, testDF\n\n# def write_output(df1, m1):\n#     df1.write.mode(\"overwrite\").saveAsTable('prediction')\n#     m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\ndef main_func(input0, input1, spark, sc):\n    trainDF = input0\n    testDF = input1\n\n    featureLst, colLst = getFeatureName()\n\n    # assemble the training features listed in featureLst into one vector column\n    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n    #### train ####\n    print(\"step 1\")\n    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam is the regularization strength\n\n    pipeline = Pipeline(stages=[vectorAssembler, lr])\n    model = pipeline.fit(trainDF)\n    # print the model parameters\n    print(\"\\n-------------------------------------------------------------------------\")\n    print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n    print(\"-------------------------------------------------------------------------\\n\")\n\n    #### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at each threshold ####\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # label-1 rows at or below the threshold are false negatives\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # label-0 rows at or below the threshold are true negatives\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, float(tp) / max(tp + fp, 1), float(tp) / max(t_cnt, 1)))\n\n    r0 = labelsAndPreds\n    return [r0]",
        "outputData": [{ "outputVar": "r0" }],
        "inputNumber": 2
      },
      "zIndex": 5
    }]
  }
}
```
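For reference, here is a minimal sketch of how an executor could run the `pyspark` sub-node in `dag_script.sub_nodes`. The runner itself is hypothetical (the payload does not specify the executor); only the contract it relies on is taken from the JSON above: each `"inputN"` entry maps to `["<upstream node id>", <output index>]`, the `script` text defines `main_func(input0, input1, spark, sc)`, and the returned list lines up with the node's declared output vars (here just `r0`).

```python
def run_script_node(node, upstream_outputs, spark):
    """Hypothetical runner for one "script" sub-node of dag_script.

    node             -- the pyspark entry from dag_script.sub_nodes above
    upstream_outputs -- {node_id: [DataFrame, ...]} for nodes already run
    spark            -- an active SparkSession
    """
    # Resolve each "inputN": ["<upstream node id>", <output index>] pair to
    # the DataFrame that the upstream node produced.
    inputs = {
        name: upstream_outputs[src_id][out_idx]
        for name, (src_id, out_idx) in node["inputs"].items()
    }
    # Execute the script text; it defines main_func(input0, input1, spark, sc)
    # and returns a list aligned with the node's declared outputs (here: r0).
    namespace = {}
    exec(node["script"], namespace)
    return namespace["main_func"](spark=spark, sc=spark.sparkContext, **inputs)

# Usage sketch (assumes the 'train' and 'test' tables exist in the session):
#   train_df = spark.sql(sub_nodes[0]["script"])
#   test_df = spark.sql(sub_nodes[1]["script"])
#   [r0] = run_script_node(sub_nodes[2],
#                          {"8CSbH3t1CTdxSKZ0xqHf3": [train_df],
#                           "GqZitaihZUjpafezqtLOf": [test_df]}, spark)
```

Execution order would follow `dag_script.edges` topologically: both SQL datasource nodes run first, and their DataFrames seed `upstream_outputs` for the script node.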