sql.dag 19 KB

1
  1. {"dag_id":"N1wP690lywNj1xrkzP8qD","user_name":"XXX","user_id":1,"nodes_task_name":"dfs","nodes_task_id":123,"itermidate_data":["hdfs://host:port/uri"],"nodes":[{"id":"8CSbH3t1CTdxSKZ0xqHf3","name":"datasourceNode1","op":"datasource","data":{"input_source":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"input_table":0}},{"id":"GqZitaihZUjpafezqtLOf","name":"test","op":"datasource","data":{"input_source":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"input_table":1}},{"id":"ScMnUyt9UL8XwnE3dP7gO","name":"sqlNode1","op":"sql","data":{"input_number":1,"output":[{"outputVar":"r1"}],"script":"select * from input0"}},{"id":"sTHVzpQ48qx62BdxjuHhs","name":"sqlNode2","op":"sql","data":{"input_number":1,"output":[{"outputVar":"r1"}],"script":"select * from input0"}},{"id":"SHb3zarKGtOvBFvcaKTKj","name":"pysparkNode1","op":"pyspark","data":{"input_number":2,"output":[{"outputVar":"r0"}],"script":"# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]"}},{"id":"8H1NofQJ1ahe0NnKzMbDs","name":"test","op":"outputsource","data":{"output_source":[{"dataType":"int","dataField":"123"},{"dataField":"ads","dataType":"text"},{"dataType":"int","dataField":"132"}]}}],"edges":[{"source":"8CSbH3t1CTdxSKZ0xqHf3","target":"ScMnUyt9UL8XwnE3dP7gO"},{"source":"GqZitaihZUjpafezqtLOf","target":"sTHVzpQ48qx62BdxjuHhs"},{"source":"ScMnUyt9UL8XwnE3dP7gO","target":"SHb3zarKGtOvBFvcaKTKj"},{"source":"sTHVzpQ48qx62BdxjuHhs","target":"SHb3zarKGtOvBFvcaKTKj"},{"source":"SHb3zarKGtOvBFvcaKTKj","target":"8H1NofQJ1ahe0NnKzMbDs"}],"graph":{"cells":[{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"f8dc1a4a-947d-4ddb-af4f-0cf758aa00ec","zIndex":-1,"source":{"cell":"d4fb565a-83b3-4831-8180-4b2043b620a8","port":"bottomPort"},"target":{"cell":"0043e7a9-b029-48b5-8204-dce43da02210","port":"input0"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"f773189e-2fcc-46ba-86e3-f986a09405ee","zIndex":-1,"source":{"cell":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","port":"bottomPort"},"target":{"cell":"09e274c0-f0ed-4fa5-a235-e58bc93eb3b2","port":"input0"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"ac5a4d7e-831f-4764-89d9-bcba4b69465c","zIndex":-1,"source":{"cell":"0043e7a9-b029-48b5-8204-dce43da02210","port":"r1"},"target":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"input0"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":"","style":{"animation":""}}},"id":"e0b0993f-dc3c-458d-9fd9-ce67d2b0c273","zIndex":-1,"source":{"cell":"09e274c0-f0ed-4fa5-a235-e58bc93eb3b2","port":"r1"},"target":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"input1"}},{"shape":"dag-edge","attrs":{"line":{"strokeDasharray":""}},"id":"4a066e3c-15d2-4e13-917c-af8c11e3fa64","zIndex":-1,"source":{"cell":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","port":"r0"},"target":{"cell":"b637c603-9fde-4d22-8044-5fff7547adcf","port":"topPort"}},{"position":{"x":210,"y":110},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"bottomPort","group":"bottom"}]},"id":"d4fb565a-83b3-4831-8180-4b2043b620a8","data":{"label":"InputSource","status":"default","type":"datasource","id":"d4fb565a-83b3-4831-8180-4b2043b620a8","nodeId":"8CSbH3t1CTdxSKZ0xqHf3","inputSource":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"dataTableId":0,"dataTableName":"train","nodeName":"datasourceNode1"},"zIndex":1},{"position":{"x":540,"y":110},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"bottomPort","group":"bottom"}]},"id":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","data":{"label":"InputSource","status":"default","type":"datasource","id":"55b8e5d3-ba4b-4ebc-b670-59951103e1e2","nodeId":"GqZitaihZUjpafezqtLOf","inputSource":[{"dataSelect":true,"dataField":"uuid","dataType":"string"},{"dataSelect":true,"dataField":"label","dataType":"int"},{"dataSelect":true,"dataField":"feature1","dataType":"double"},{"dataSelect":true,"dataField":"feature2","dataType":"double"},{"dataSelect":true,"dataField":"feature3","dataType":"double"},{"dataSelect":true,"dataField":"feature4","dataType":"double"},{"dataSelect":true,"dataField":"feature5","dataType":"double"},{"dataSelect":true,"dataField":"feature6","dataType":"double"},{"dataSelect":true,"dataField":"feature7","dataType":"double"},{"dataSelect":true,"dataField":"feature8","dataType":"double"},{"dataSelect":true,"dataField":"feature9","dataType":"double"}],"dataTableId":1,"dataTableName":"test","nodeName":"test"},"zIndex":2},{"position":{"x":210,"y":300},"size":{"width":180,"height":36},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"group":"top","id":"input0"},{"group":"bottom","id":"r1"}]},"id":"0043e7a9-b029-48b5-8204-dce43da02210","data":{"label":"sql","status":"success","type":"script","id":"0043e7a9-b029-48b5-8204-dce43da02210","nodeId":"ScMnUyt9UL8XwnE3dP7gO","nodeName":"sqlNode1","scriptText":"select * from input0","outputData":[{"outputVar":"r1"}],"inputNumber":1},"zIndex":3},{"position":{"x":540,"y":300},"size":{"width":180,"height":36},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"group":"top","id":"input0"},{"group":"bottom","id":"r1"}]},"id":"09e274c0-f0ed-4fa5-a235-e58bc93eb3b2","data":{"label":"sql","status":"success","type":"script","id":"09e274c0-f0ed-4fa5-a235-e58bc93eb3b2","nodeId":"sTHVzpQ48qx62BdxjuHhs","nodeName":"sqlNode2","scriptText":"select * from input0","outputData":[{"outputVar":"r1"}],"inputNumber":1},"zIndex":4},{"position":{"x":360,"y":440},"size":{"width":180,"height":36},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"group":"top","id":"input0"},{"group":"top","id":"input1"},{"group":"bottom","id":"r0"}]},"id":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","data":{"label":"pyspark","status":"success","type":"script","id":"1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68","nodeId":"SHb3zarKGtOvBFvcaKTKj","nodeName":"pysparkNode1","scriptText":"# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]","outputData":[{"outputVar":"r0"}],"inputNumber":2},"zIndex":5},{"position":{"x":360,"y":550},"size":{"width":180,"height":80},"view":"react-shape-view","shape":"dag-node","component":{"key":null,"ref":null,"props":{},"_owner":null,"_store":{}},"portMarkup":[{"tagName":"foreignObject","selector":"fo","children":[{"ns":"http://www.w3.org/1999/xhtml","tagName":"body","selector":"foBody","attrs":{"xmlns":"http://www.w3.org/1999/xhtml"},"style":{"width":"100%","height":"100%","background":"transparent"},"children":[{"tagName":"div","selector":"foContent","style":{"width":"100%","height":"100%"}}]}]}],"ports":{"groups":{"top":{"position":"top","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}},"bottom":{"position":"bottom","attrs":{"fo":{"width":10,"height":10,"x":-5,"y":-5,"magnet":"true"}}}},"items":[{"id":"topPort","group":"top"}]},"id":"b637c603-9fde-4d22-8044-5fff7547adcf","data":{"label":"OutputSource","status":"default","type":"outputsource","id":"b637c603-9fde-4d22-8044-5fff7547adcf","nodeId":"8H1NofQJ1ahe0NnKzMbDs","outputSource":[{"dataType":"int","dataField":"123"},{"dataField":"ads","dataType":"text"},{"dataType":"int","dataField":"132"}],"nodeName":"test"},"zIndex":6}]}}