{"dag_id":"N1wP690lywNj1xrkzaghe","user_name":"XXX","user_id":1,"nodes_task_name":"dfs","nodes_task_id":123,"itermidate_data":["hdfs://host:port/uri"],"nodes":[{"id":"8CSbH3t1CTdxSKZ0xqHf3","name":"datasourceNode1","op":"datasource","data":{"input_source":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"input_table":0}},{"id":"GqZitaihZUjpafezqtLOf","name":"test","op":"datasource","data":{"input_source":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"input_table":1}},{"id":"SHb3zarKGtOvBFvcaKTKj","name":"pysparkNode1","op":"pyspark","data":{"input_number":2,"output":[{"outputVar":"r0"}],"script":"# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]"}},{"id":"dkbV9oK3AZuaIljq_C9w3","name":"test1","op":"outputsource","data":{"output_source":[{"dataField":"abc","dataType":"int"}]}}],"edges":[{"source":"8CSbH3t1CTdxSKZ0xqHf3","target":"SHb3zarKGtOvBFvcaKTKj"},{"source":"GqZitaihZUjpafezqtLOf","target":"SHb3zarKGtOvBFvcaKTKj"},{"source":"SHb3zarKGtOvBFvcaKTKj","target":"dkbV9oK3AZuaIljq_C9w3"}],"graph":{"cells":[{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"73ef6d80-3587-4bd6-b5ac-e2a16e8bb3d2","zIndex":-1,"source":{"cell":"d4fb565a-83b3-4831-8180-4b2043b620a8","port":"bottomPort"},"target":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"input0"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"bd1cf153-d0cb-48ff-a13d-0aca75295fc2","zIndex":-1,"source":{"cell":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","port":"bottomPort"},"target":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"input1"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":""}},"id":"35a1272b-cb76-4bef-86b4-e87ab418329c","zIndex":-1,"source":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"r0"},"target":{"cell":"c6d091b7-95a2-4195-8cf0-201e2f0e4db6","port":"topPort"}},{"position":{"x":170,"y":120},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"bottomPort","group":"bottom"}]},"id":"d4fb565a-83b3-4831-8180-4b2043b620a8","data":{"label":"InputSource","status":"default","type":"datasource","id":"d4fb565a-83b3-4831-8180-4b2043b620a8","nodeId":"8CSbH3t1CTdxSKZ0xqHf3","inputSource":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"dataTableId":0,"dataTableName":"train","nodeName":"datasourceNode1"},"zIndex":1},{"position":{"x":530,"y":120},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"bottomPort","group":"bottom"}]},"id":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","data":{"label":"InputSource","status":"default","type":"datasource","id":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","nodeId":"GqZitaihZUjpafezqtLOf","inputSource":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"dataTableId":1,"dataTableName":"test","nodeName":"test"},"zIndex":2},{"position":{"x":360,"y":440},"size":{"width":180,"height":36},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"group":"top","id":"input0"},{"group":"top","id":"input1"},{"group":"bottom","id":"r0"}]},"id":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","data":{"label":"pyspark","status":"success","type":"script","id":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","nodeId":"SHb3zarKGtOvBFvcaKTKj","nodeName":"pysparkNode1","scriptText":"# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]","outputData":[{"outputVar":"r0"}],"inputNumber":2},"zIndex":5},{"position":{"x":360,"y":550},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"topPort","group":"top"}]},"id":"c6d091b7-95a2-4195-8cf0-201e2f0e4db6","data":{"label":"OutputSource","status":"default","type":"outputsource","id":"c6d091b7-95a2-4195-8cf0-201e2f0e4db6","nodeId":"dkbV9oK3AZuaIljq_C9w3","nodeName":"test1","outputSource":[{"dataField":"abc","dataType":"int"}]},"zIndex":6}]}}