  1. {
  2. "dag_id": "N1wP690lywNj1xrkzP8qD",
  3. "user_name": "XXX",
  4. "user_id": 1,
  5. "nodes_task_name": "dfs",
  6. "nodes_task_id": 123,
  7. "itermidate_data": ["hdfs://host:port/uri"],
  8. "nodes": [{
  9. "id": "8CSbH3t1CTdxSKZ0xqHf3",
  10. "name": "datasourceNode1",
  11. "op": "datasource",
  12. "data": {
  13. "input_source": [{
  14. "dataSelect": true,
  15. "dataField": "uuid",
  16. "dataType": "string"
  17. }, {
  18. "dataSelect": true,
  19. "dataField": "label",
  20. "dataType": "int"
  21. }, {
  22. "dataSelect": true,
  23. "dataField": "feature1",
  24. "dataType": "double"
  25. }, {
  26. "dataSelect": true,
  27. "dataField": "feature2",
  28. "dataType": "double"
  29. }, {
  30. "dataSelect": true,
  31. "dataField": "feature3",
  32. "dataType": "double"
  33. }, {
  34. "dataSelect": true,
  35. "dataField": "feature4",
  36. "dataType": "double"
  37. }, {
  38. "dataSelect": true,
  39. "dataField": "feature5",
  40. "dataType": "double"
  41. }, {
  42. "dataSelect": true,
  43. "dataField": "feature6",
  44. "dataType": "double"
  45. }, {
  46. "dataSelect": true,
  47. "dataField": "feature7",
  48. "dataType": "double"
  49. }, {
  50. "dataSelect": true,
  51. "dataField": "feature8",
  52. "dataType": "double"
  53. }, {
  54. "dataSelect": true,
  55. "dataField": "feature9",
  56. "dataType": "double"
  57. }],
  58. "input_table": 0
  59. }
  60. }, {
  61. "id": "GqZitaihZUjpafezqtLOf",
  62. "name": "test",
  63. "op": "datasource",
  64. "data": {
  65. "input_source": [{
  66. "dataSelect": true,
  67. "dataField": "uuid",
  68. "dataType": "string"
  69. }, {
  70. "dataSelect": true,
  71. "dataField": "label",
  72. "dataType": "int"
  73. }, {
  74. "dataSelect": true,
  75. "dataField": "feature1",
  76. "dataType": "double"
  77. }, {
  78. "dataSelect": true,
  79. "dataField": "feature2",
  80. "dataType": "double"
  81. }, {
  82. "dataSelect": true,
  83. "dataField": "feature3",
  84. "dataType": "double"
  85. }, {
  86. "dataSelect": true,
  87. "dataField": "feature4",
  88. "dataType": "double"
  89. }, {
  90. "dataSelect": true,
  91. "dataField": "feature5",
  92. "dataType": "double"
  93. }, {
  94. "dataSelect": true,
  95. "dataField": "feature6",
  96. "dataType": "double"
  97. }, {
  98. "dataSelect": true,
  99. "dataField": "feature7",
  100. "dataType": "double"
  101. }, {
  102. "dataSelect": true,
  103. "dataField": "feature8",
  104. "dataType": "double"
  105. }, {
  106. "dataSelect": true,
  107. "dataField": "feature9",
  108. "dataType": "double"
  109. }],
  110. "input_table": 1
  111. }
  112. }, {
  113. "id": "SHb3z_rKGtOvBFvcaKTKj",
  114. "name": "pysparkNode1",
  115. "op": "pyspark",
  116. "data": {
  117. "input_number": 2,
  118. "output": [{
  119. "outputVar": "r0"
  120. }],
  121. "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]"
  122. }
  123. }],
  124. "edges": [{
  125. "source": "8CSbH3t1CTdxSKZ0xqHf3",
  126. "target": "SHb3z_rKGtOvBFvcaKTKj"
  127. }, {
  128. "source": "GqZitaihZUjpafezqtLOf",
  129. "target": "SHb3z_rKGtOvBFvcaKTKj"
  130. }],
  131. "dag_script": {
  132. "sub_nodes": [{
  133. "id": "8CSbH3t1CTdxSKZ0xqHf3",
  134. "name": "datasourceNode1",
  135. "type": "datasource",
  136. "op": "sql",
  137. "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from train"
  138. }, {
  139. "id": "GqZitaihZUjpafezqtLOf",
  140. "name": "test",
  141. "type": "datasource",
  142. "op": "sql",
  143. "script": "select uuid, label, feature1, feature2, feature3, feature4, feature5, feature6, feature7, feature8, feature9 from test"
  144. }, {
  145. "id": "SHb3z_rKGtOvBFvcaKTKj",
  146. "type": "script",
  147. "name": "pysparkNode1",
  148. "op": "pyspark",
  149. "inputs": {
  150. "input0": ["8CSbH3t1CTdxSKZ0xqHf3", 0],
  151. "input1": ["GqZitaihZUjpafezqtLOf", 0]
  152. },
  153. "script": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]"
  154. }],
  155. "edges": [
  156. ["8CSbH3t1CTdxSKZ0xqHf3", "SHb3z_rKGtOvBFvcaKTKj"],
  157. ["GqZitaihZUjpafezqtLOf", "SHb3z_rKGtOvBFvcaKTKj"]
  158. ]
  159. },
  160. "graph": {
  161. "cells": [{
  162. "shape": "dag-edge",
  163. "attrs": {
  164. "line": {
  165. "strokeDasharray": ""
  166. }
  167. },
  168. "id": "73ef6d80-3587-4bd6-b5ac-e2a16e8bb3d2",
  169. "zIndex": -1,
  170. "source": {
  171. "cell": "d4fb565a-83b3-4831-8180-4b2043b620a8",
  172. "port": "bottomPort"
  173. },
  174. "target": {
  175. "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
  176. "port": "input0"
  177. }
  178. }, {
  179. "shape": "dag-edge",
  180. "attrs": {
  181. "line": {
  182. "strokeDasharray": ""
  183. }
  184. },
  185. "id": "bd1cf153-d0cb-48ff-a13d-0aca75295fc2",
  186. "zIndex": -1,
  187. "source": {
  188. "cell": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2",
  189. "port": "bottomPort"
  190. },
  191. "target": {
  192. "cell": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
  193. "port": "input1"
  194. }
  195. }, {
  196. "position": {
  197. "x": 170,
  198. "y": 120
  199. },
  200. "size": {
  201. "width": 180,
  202. "height": 80
  203. },
  204. "view": "react-shape-view",
  205. "shape": "dag-node",
  206. "component": {
  207. "key": null,
  208. "ref": null,
  209. "props": {},
  210. "_owner": null,
  211. "_store": {}
  212. },
  213. "portMarkup": [{
  214. "tagName": "foreignObject",
  215. "selector": "fo",
  216. "children": [{
  217. "ns": "http://www.w3.org/1999/xhtml",
  218. "tagName": "body",
  219. "selector": "foBody",
  220. "attrs": {
  221. "xmlns": "http://www.w3.org/1999/xhtml"
  222. },
  223. "style": {
  224. "width": "100%",
  225. "height": "100%",
  226. "background": "transparent"
  227. },
  228. "children": [{
  229. "tagName": "div",
  230. "selector": "foContent",
  231. "style": {
  232. "width": "100%",
  233. "height": "100%"
  234. }
  235. }]
  236. }]
  237. }],
  238. "ports": {
  239. "groups": {
  240. "top": {
  241. "position": "top",
  242. "attrs": {
  243. "fo": {
  244. "width": 10,
  245. "height": 10,
  246. "x": -5,
  247. "y": -5,
  248. "magnet": "true"
  249. }
  250. }
  251. },
  252. "bottom": {
  253. "position": "bottom",
  254. "attrs": {
  255. "fo": {
  256. "width": 10,
  257. "height": 10,
  258. "x": -5,
  259. "y": -5,
  260. "magnet": "true"
  261. }
  262. }
  263. }
  264. },
  265. "items": [{
  266. "id": "bottomPort",
  267. "group": "bottom"
  268. }]
  269. },
  270. "id": "d4fb565a-83b3-4831-8180-4b2043b620a8",
  271. "data": {
  272. "label": "InputSource",
  273. "status": "default",
  274. "type": "datasource",
  275. "id": "d4fb565a-83b3-4831-8180-4b2043b620a8",
  276. "nodeId": "8CSbH3t1CTdxSKZ0xqHf3",
  277. "inputSource": [{
  278. "dataSelect": true,
  279. "dataField": "uuid",
  280. "dataType": "string"
  281. }, {
  282. "dataSelect": true,
  283. "dataField": "label",
  284. "dataType": "int"
  285. }, {
  286. "dataSelect": true,
  287. "dataField": "feature1",
  288. "dataType": "double"
  289. }, {
  290. "dataSelect": true,
  291. "dataField": "feature2",
  292. "dataType": "double"
  293. }, {
  294. "dataSelect": true,
  295. "dataField": "feature3",
  296. "dataType": "double"
  297. }, {
  298. "dataSelect": true,
  299. "dataField": "feature4",
  300. "dataType": "double"
  301. }, {
  302. "dataSelect": true,
  303. "dataField": "feature5",
  304. "dataType": "double"
  305. }, {
  306. "dataSelect": true,
  307. "dataField": "feature6",
  308. "dataType": "double"
  309. }, {
  310. "dataSelect": true,
  311. "dataField": "feature7",
  312. "dataType": "double"
  313. }, {
  314. "dataSelect": true,
  315. "dataField": "feature8",
  316. "dataType": "double"
  317. }, {
  318. "dataSelect": true,
  319. "dataField": "feature9",
  320. "dataType": "double"
  321. }],
  322. "dataTableId": 0,
  323. "dataTableName": "train",
  324. "nodeName": "datasourceNode1"
  325. },
  326. "zIndex": 1
  327. }, {
  328. "position": {
  329. "x": 540,
  330. "y": 110
  331. },
  332. "size": {
  333. "width": 180,
  334. "height": 80
  335. },
  336. "view": "react-shape-view",
  337. "shape": "dag-node",
  338. "component": {
  339. "key": null,
  340. "ref": null,
  341. "props": {},
  342. "_owner": null,
  343. "_store": {}
  344. },
  345. "portMarkup": [{
  346. "tagName": "foreignObject",
  347. "selector": "fo",
  348. "children": [{
  349. "ns": "http://www.w3.org/1999/xhtml",
  350. "tagName": "body",
  351. "selector": "foBody",
  352. "attrs": {
  353. "xmlns": "http://www.w3.org/1999/xhtml"
  354. },
  355. "style": {
  356. "width": "100%",
  357. "height": "100%",
  358. "background": "transparent"
  359. },
  360. "children": [{
  361. "tagName": "div",
  362. "selector": "foContent",
  363. "style": {
  364. "width": "100%",
  365. "height": "100%"
  366. }
  367. }]
  368. }]
  369. }],
  370. "ports": {
  371. "groups": {
  372. "top": {
  373. "position": "top",
  374. "attrs": {
  375. "fo": {
  376. "width": 10,
  377. "height": 10,
  378. "x": -5,
  379. "y": -5,
  380. "magnet": "true"
  381. }
  382. }
  383. },
  384. "bottom": {
  385. "position": "bottom",
  386. "attrs": {
  387. "fo": {
  388. "width": 10,
  389. "height": 10,
  390. "x": -5,
  391. "y": -5,
  392. "magnet": "true"
  393. }
  394. }
  395. }
  396. },
  397. "items": [{
  398. "id": "bottomPort",
  399. "group": "bottom"
  400. }]
  401. },
  402. "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2",
  403. "data": {
  404. "label": "InputSource",
  405. "status": "default",
  406. "type": "datasource",
  407. "id": "55b8e5d3-ba4b-4ebc-b670-59951103e1e2",
  408. "nodeId": "GqZitaihZUjpafezqtLOf",
  409. "inputSource": [{
  410. "dataSelect": true,
  411. "dataField": "uuid",
  412. "dataType": "string"
  413. }, {
  414. "dataSelect": true,
  415. "dataField": "label",
  416. "dataType": "int"
  417. }, {
  418. "dataSelect": true,
  419. "dataField": "feature1",
  420. "dataType": "double"
  421. }, {
  422. "dataSelect": true,
  423. "dataField": "feature2",
  424. "dataType": "double"
  425. }, {
  426. "dataSelect": true,
  427. "dataField": "feature3",
  428. "dataType": "double"
  429. }, {
  430. "dataSelect": true,
  431. "dataField": "feature4",
  432. "dataType": "double"
  433. }, {
  434. "dataSelect": true,
  435. "dataField": "feature5",
  436. "dataType": "double"
  437. }, {
  438. "dataSelect": true,
  439. "dataField": "feature6",
  440. "dataType": "double"
  441. }, {
  442. "dataSelect": true,
  443. "dataField": "feature7",
  444. "dataType": "double"
  445. }, {
  446. "dataSelect": true,
  447. "dataField": "feature8",
  448. "dataType": "double"
  449. }, {
  450. "dataSelect": true,
  451. "dataField": "feature9",
  452. "dataType": "double"
  453. }],
  454. "dataTableId": 1,
  455. "dataTableName": "test",
  456. "nodeName": "test"
  457. },
  458. "zIndex": 2
  459. }, {
  460. "position": {
  461. "x": 360,
  462. "y": 440
  463. },
  464. "size": {
  465. "width": 180,
  466. "height": 36
  467. },
  468. "view": "react-shape-view",
  469. "shape": "dag-node",
  470. "component": {
  471. "key": null,
  472. "ref": null,
  473. "props": {},
  474. "_owner": null,
  475. "_store": {}
  476. },
  477. "portMarkup": [{
  478. "tagName": "foreignObject",
  479. "selector": "fo",
  480. "children": [{
  481. "ns": "http://www.w3.org/1999/xhtml",
  482. "tagName": "body",
  483. "selector": "foBody",
  484. "attrs": {
  485. "xmlns": "http://www.w3.org/1999/xhtml"
  486. },
  487. "style": {
  488. "width": "100%",
  489. "height": "100%",
  490. "background": "transparent"
  491. },
  492. "children": [{
  493. "tagName": "div",
  494. "selector": "foContent",
  495. "style": {
  496. "width": "100%",
  497. "height": "100%"
  498. }
  499. }]
  500. }]
  501. }],
  502. "ports": {
  503. "groups": {
  504. "top": {
  505. "position": "top",
  506. "attrs": {
  507. "fo": {
  508. "width": 10,
  509. "height": 10,
  510. "x": -5,
  511. "y": -5,
  512. "magnet": "true"
  513. }
  514. }
  515. },
  516. "bottom": {
  517. "position": "bottom",
  518. "attrs": {
  519. "fo": {
  520. "width": 10,
  521. "height": 10,
  522. "x": -5,
  523. "y": -5,
  524. "magnet": "true"
  525. }
  526. }
  527. }
  528. },
  529. "items": [{
  530. "group": "top",
  531. "id": "input0"
  532. }, {
  533. "group": "top",
  534. "id": "input1"
  535. }, {
  536. "group": "bottom",
  537. "id": "r0"
  538. }]
  539. },
  540. "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
  541. "data": {
  542. "label": "pyspark",
  543. "status": "undone",
  544. "type": "script",
  545. "id": "1aa2f6c2-fdb3-4241-ae06-8f8f71d84e68",
  546. "nodeId": "SHb3z_rKGtOvBFvcaKTKj",
  547. "nodeName": "pysparkNode1",
  548. "scriptText": "# import sys\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import *\nfrom time import *\n\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml import Pipeline\nfrom pyspark.sql.functions import udf, col\n\ndef getFeatureName():\n featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']\n colLst = ['uuid', 'label'] + featureLst\n return featureLst, colLst\n\ndef parseFloat(x):\n try:\n rx = float(x)\n except:\n rx = 0.0\n return rx\n\n\ndef getDict(dictDataLst, colLst):\n dictData = {}\n for i in range(len(colLst)):\n dictData[colLst[i]] = parseFloat(dictDataLst[i])\n return dictData\n\ndef to_array(col):\n def to_array_(v):\n return v.toArray().tolist()\n # Important: asNondeterministic requires Spark 2.3 or later\n # It can be safely removed i.e.\n # return udf(to_array_, ArrayType(DoubleType()))(col)\n # but at the cost of decreased performance\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\n\n# def read_input(ss: SparkSession, t1, t2):\n# trainDF = ss.sql(f'select * from {t1}')\n# testDF = ss.sql(f'select * from {t2}')\n# return trainDF, testDF\n\n# def write_output(df1, m1):\n# df1.write.mode(\"overwrite\").saveAsTable('prediction')\n# m1.write().overwrite().save(\"hdfs://192.168.199.27:9000/tmp/dag/target/model/lrModel\")\n\n\n\n\ndef main_func(input0, input1, spark, sc):\n trainDF = input0\n testDF = input1\n \n\n featureLst, colLst = getFeatureName()\n\n\n #用于训练的特征featureLst\n vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol(\"features\")\n\n #### 训练 ####\n print(\"step 1\")\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam 正则项参数\n\n pipeline = Pipeline(stages=[vectorAssembler, lr])\n model = pipeline.fit(trainDF)\n #打印参数\n print(\"\\n-------------------------------------------------------------------------\")\n print(\"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\")\n print(\"-------------------------------------------------------------------------\\n\")\n\n #### 预测, 保存结果 ####\n print(\"step 2\")\n labelsAndPreds = model.transform(testDF).withColumn(\"probability_xj\", to_array(col(\"probability\"))[1])\\\n .select(\"uuid\", \"label\", \"prediction\", \"probability_xj\")\n labelsAndPreds.show()\n # labelsAndPreds.write.mode(\"overwrite\").options(header=\"true\").csv(outputHDFS + \"/target/output\")\n\n\n #### 评估不同阈值下的准确率、召回率\n print(\"step 3\")\n labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)\n labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)\n labelsAndPreds_label_1.show(3)\n labelsAndPreds_label_0.show(3)\n t_cnt = labelsAndPreds_label_1.count()\n f_cnt = labelsAndPreds_label_0.count()\n print(\"thre\\ttp\\ttn\\tfp\\tfn\\taccuracy\\trecall\")\n for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]:\n tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()\n tn = t_cnt - tp\n fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()\n fn = f_cnt - fp\n print(\"%.1f\\t%d\\t%d\\t%d\\t%d\\t%.4f\\t%.4f\"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))\n \n r0 = labelsAndPreds\n return [r0]",
  549. "outputData": [{
  550. "outputVar": "r0"
  551. }],
  552. "inputNumber": 2
  553. },
  554. "zIndex": 5
  555. }]
  556. }
  557. }