{ "dag_id": "N1wP690lywNj1xrkzP8qD", "user_name": "XXX", "user_id": 1, "nodes_task_name": "dfs", "nodes_task_id": 123, "itermidate_data": ["hdfs://host:port/uri"], "nodes": [{ "id": "8CSbH3t1CTdxSKZ0xqHf3", "name": "datasourceNode1", "op": "datasource", "data": { "input_source": [{ "dataSelect": true, "dataField": "uuid", "dataType": "string" }, { "dataSelect": true, "dataField": "label", "dataType": "int" }, { "dataSelect": true, "dataField": "feature1", "dataType": "double" }, { "dataSelect": true, "dataField": "feature2", "dataType": "double" }, { "dataSelect": true, "dataField": "feature3", "dataType": "double" }, { "dataSelect": true, "dataField": "feature4", "dataType": "double" }, { "dataSelect": true, "dataField": "feature5", "dataType": "double" }, { "dataSelect": true, "dataField": "feature6", "dataType": "double" }, { "dataSelect": true, "dataField": "feature7", "dataType": "double" }, { "dataSelect": true, "dataField": "feature8", "dataType": "double" }, { "dataSelect": true, "dataField": "feature9", "dataType": "double" }], "input_table": 0 } }, { "id": "GqZitaihZUjpafezqtLOf", "name": "test", "op": "datasource", "data": { "input_source": [{ "dataSelect": true, "dataField": "uuid", "dataType": "string" }, { "dataSelect": true, "dataField": "label", "dataType": "int" }, { "dataSelect": true, "dataField": "feature1", "dataType": "double" }, { "dataSelect": true, "dataField": "feature2", "dataType": "double" }, { "dataSelect": true, "dataField": "feature3", "dataType": "double" }, { "dataSelect": true, "dataField": "feature4", "dataType": "double" }, { "dataSelect": true, "dataField": "feature5", "dataType": "double" }, { "dataSelect": true, "dataField": "feature6", "dataType": "double" }, { "dataSelect": true, "dataField": "feature7", "dataType": "double" }, { "dataSelect": true, "dataField": "feature8", "dataType": "double" }, { "dataSelect": true, "dataField": "feature9", "dataType": "double" }], "input_table": 1 } }, { "id": "SHb3z_rKGtOvBFvcaKTKj", "name": "pysparkNode1", "op": "pyspark", "data": { "input_number": 2, "output": [{ "outputVar": "r0" }], "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef 
main_func(input0, input1, spark, sc):\n    trainDF = input0\n    testDF = input1\n\n    featureLst, colLst = getFeatureName()\n\n    # featureLst: the features used for training\n    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n    #### training ####\n    print(\"step 1\")\n    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization parameter\n\n    pipeline = Pipeline(stages=[vectorAssembler, lr])\n    model = pipeline.fit(trainDF)\n    # print the model parameters\n    print(\"\\n-------------------------------------------------------------------------\")\n    print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n    print(\"-------------------------------------------------------------------------\\n\")\n\n    #### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at different thresholds\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # positives scored at or below the threshold\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # negatives scored at or below the threshold\n        precision = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0  # avoid division by zero when nothing is predicted positive\n        recall = float(tp) / t_cnt if t_cnt > 0 else 0.0\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, precision, recall))\n\n    r0 = labelsAndPreds\n    return [r0]" } }], "edges": [{ "source": "8CSbH3t1CTdxSKZ0xqHf3", "target": "SHb3z_rKGtOvBFvcaKTKj" }, { "source": "GqZitaihZUjpafezqtLOf", "target": "SHb3z_rKGtOvBFvcaKTKj" }], "dag_script": { "sub_nodes": [{ "id": "8CSbH3t1CTdxSKZ0xqHf3", "name": "datasourceNode1", "type": "datasource", "op": "sql", "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from train" }, { "id": "GqZitaihZUjpafezqtLOf", "name": "test", "type": "datasource", "op": "sql", "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from test" }, { "id": "SHb3z_rKGtOvBFvcaKTKj", "type": "script", "name": "pysparkNode1", "op": "pyspark", "inputs": { "input0": ["8CSbH3t1CTdxSKZ0xqHf3", 0], "input1": ["GqZitaihZUjpafezqtLOf", 0] }, "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import ArrayType, DoubleType\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n    colLst = ['uuid', 'label'] + featureLst\n    return featureLst, colLst\n\ndef parseFloat(x):\n    # fall back to 0.0 for values that cannot be parsed\n    try:\n        rx = float(x)\n    except (TypeError, ValueError):\n        rx = 0.0\n    return rx\n\n\ndef getDict(dictDataLst, colLst):\n    dictData = {}\n    for i in range(len(colLst)):\n        dictData[colLst[i]] = parseFloat(dictDataLst[i])\n    return dictData\n\ndef to_array(column):\n    def to_array_(v):\n        return v.toArray().tolist()\n    # Important: 
asNondeterministic requires Spark 2.3 or later.\n    # It can be safely removed, i.e.\n    # return udf(to_array_, ArrayType(DoubleType()))(column)\n    # but at the cost of decreased performance.\n    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(column)\n\n# def read_input(ss: SparkSession, t1, t2):\n#     trainDF = ss.sql(f'select * from {t1}')\n#     testDF = ss.sql(f'select * from {t2}')\n#     return trainDF, testDF\n\n# def write_output(df1, m1):\n#     df1.write.mode(\"overwrite\").saveAsTable('prediction')\n#     m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\ndef main_func(input0, input1, spark, sc):\n    trainDF = input0\n    testDF = input1\n\n    featureLst, colLst = getFeatureName()\n\n    # featureLst: the features used for training\n    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n    #### training ####\n    print(\"step 1\")\n    lr = LogisticRegression(regParam=0.01, maxIter=100)  # regParam: regularization parameter\n\n    pipeline = Pipeline(stages=[vectorAssembler, lr])\n    model = pipeline.fit(trainDF)\n    # print the model parameters\n    print(\"\\n-------------------------------------------------------------------------\")\n    print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n    print(\"-------------------------------------------------------------------------\\n\")\n\n    #### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at different thresholds\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # positives scored at or below the threshold\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # negatives scored at or below the threshold\n        precision = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0  # avoid division by zero when nothing is predicted positive\n        recall = float(tp) / t_cnt if t_cnt > 0 else 0.0\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, precision, recall))\n\n    r0 = labelsAndPreds\n    return [r0]" }], "edges": [ ["8CSbH3t1CTdxSKZ0xqHf3", "SHb3z_rKGtOvBFvcaKTKj"], ["GqZitaihZUjpafezqtLOf", "SHb3z_rKGtOvBFvcaKTKj"] ] }, "graph": { "cells": [{ "shape": "dag-edge", "attrs": { "line": { "strokeDasharray": "" } }, "id": "73ef6d80-3587-4bd6-b5ac-e2a16e8bb3d2", "zIndex": -1, "source": { "cell": "d4fb565a-83b3-4831-8180-4b2043b620a8", "port": "bottomPort" }, "target": { "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "port": "input0" } }, { "shape": "dag-edge", "attrs": { "line": { "strokeDasharray": "" } }, "id": "bd1cf153-d0cb-48ff-a13d-0aca75295fc2", "zIndex": -1, "source": { "cell": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2", "port": "bottomPort" }, "target": { "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "port": "input1" } }, { "position": { "x": 170, "y": 120 }, "size": { "width": 180, "height": 80 }, "view": "react-shape-view", "shape": "dag-node", "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} }, "portMarkup": [{ "tagName": "foreignObject", "selector": "fo", "children": [{ "ns": 
"http://www.w3.org/1999/xhtml", "tagName": "body", "selector": "foBody", "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" }, "style": { "width": "100%", "height": "100%", "background": "transparent" }, "children": [{ "tagName": "div", "selector": "foContent", "style": { "width": "100%", "height": "100%" } }] }] }], "ports": { "groups": { "top": { "position": "top", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } }, "bottom": { "position": "bottom", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } } }, "items": [{ "id": "bottomPort", "group": "bottom" }] }, "id": "d4fb565a-83b3-4831-8180-4b2043b620a8", "data": { "label": "InputSource", "status": "default", "type": "datasource", "id": "d4fb565a-83b3-4831-8180-4b2043b620a8", "nodeId": "8CSbH3t1CTdxSKZ0xqHf3", "inputSource": [{ "dataSelect": true, "dataField": "uuid", "dataType": "string" }, { "dataSelect": true, "dataField": "label", "dataType": "int" }, { "dataSelect": true, "dataField": "feature1", "dataType": "double" }, { "dataSelect": true, "dataField": "feature2", "dataType": "double" }, { "dataSelect": true, "dataField": "feature3", "dataType": "double" }, { "dataSelect": true, "dataField": "feature4", "dataType": "double" }, { "dataSelect": true, "dataField": "feature5", "dataType": "double" }, { "dataSelect": true, "dataField": "feature6", "dataType": "double" }, { "dataSelect": true, "dataField": "feature7", "dataType": "double" }, { "dataSelect": true, "dataField": "feature8", "dataType": "double" }, { "dataSelect": true, "dataField": "feature9", "dataType": "double" }], "dataTableId": 0, "dataTableName": "train", "nodeName": "datasourceNode1" }, "zIndex": 1 }, { "position": { "x": 540, "y": 110 }, "size": { "width": 180, "height": 80 }, "view": "react-shape-view", "shape": "dag-node", "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} }, "portMarkup": [{ "tagName": "foreignObject", "selector": "fo", "children": [{ "ns": "http://www.w3.org/1999/xhtml", "tagName": "body", "selector": "foBody", "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" }, "style": { "width": "100%", "height": "100%", "background": "transparent" }, "children": [{ "tagName": "div", "selector": "foContent", "style": { "width": "100%", "height": "100%" } }] }] }], "ports": { "groups": { "top": { "position": "top", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } }, "bottom": { "position": "bottom", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } } }, "items": [{ "id": "bottomPort", "group": "bottom" }] }, "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2", "data": { "label": "InputSource", "status": "default", "type": "datasource", "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2", "nodeId": "GqZitaihZUjpafezqtLOf", "inputSource": [{ "dataSelect": true, "dataField": "uuid", "dataType": "string" }, { "dataSelect": true, "dataField": "label", "dataType": "int" }, { "dataSelect": true, "dataField": "feature1", "dataType": "double" }, { "dataSelect": true, "dataField": "feature2", "dataType": "double" }, { "dataSelect": true, "dataField": "feature3", "dataType": "double" }, { "dataSelect": true, "dataField": "feature4", "dataType": "double" }, { "dataSelect": true, "dataField": "feature5", "dataType": "double" }, { "dataSelect": true, "dataField": "feature6", "dataType": "double" }, { "dataSelect": true, "dataField": "feature7", "dataType": "double" }, { "dataSelect": true, "dataField": "feature8", 
"dataType": "double" }, { "dataSelect": true, "dataField": "feature9", "dataType": "double" }], "dataTableId": 1, "dataTableName": "test", "nodeName": "test" }, "zIndex": 2 }, { "position": { "x": 360, "y": 440 }, "size": { "width": 180, "height": 36 }, "view": "react-shape-view", "shape": "dag-node", "component": { "key": null, "ref": null, "props": {}, "_owner": null, "_store": {} }, "portMarkup": [{ "tagName": "foreignObject", "selector": "fo", "children": [{ "ns": "http://www.w3.org/1999/xhtml", "tagName": "body", "selector": "foBody", "attrs": { "xmlns": "http://www.w3.org/1999/xhtml" }, "style": { "width": "100%", "height": "100%", "background": "transparent" }, "children": [{ "tagName": "div", "selector": "foContent", "style": { "width": "100%", "height": "100%" } }] }] }], "ports": { "groups": { "top": { "position": "top", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } }, "bottom": { "position": "bottom", "attrs": { "fo": { "width": 10, "height": 10, "x": -5, "y": -5, "magnet": "true" } } } }, "items": [{ "group": "top", "id": "input0" }, { "group": "top", "id": "input1" }, { "group": "bottom", "id": "r0" }] }, "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "data": { "label": "pyspark", "status": "undone", "type": "script", "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68", "nodeId": "SHb3z_rKGtOvBFvcaKTKj", "nodeName": "pysparkNode1", "scriptText": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n 
#### predict and save the results ####\n    print(\"step 2\")\n    labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n        .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n    labelsAndPreds.show()\n    # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n    #### evaluate precision and recall at different thresholds\n    print(\"step 3\")\n    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n    labelsAndPreds_label_1.show(3)\n    labelsAndPreds_label_0.show(3)\n    t_cnt = labelsAndPreds_label_1.count()\n    f_cnt = labelsAndPreds_label_0.count()\n    print(\"thre\\ttp\\tfn\\tfp\\ttn\\tprecision\\trecall\")\n    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n        fn = t_cnt - tp  # positives scored at or below the threshold\n        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n        tn = f_cnt - fp  # negatives scored at or below the threshold\n        precision = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0  # avoid division by zero when nothing is predicted positive\n        recall = float(tp) / t_cnt if t_cnt > 0 else 0.0\n        print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\" % (thre, tp, fn, fp, tn, precision, recall))\n\n    r0 = labelsAndPreds\n    return [r0]", "outputData": [{ "outputVar": "r0" }], "inputNumber": 2 }, "zIndex": 5 }] } }