{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Scheduler job-creation demo\n",
    "\n",
    "Fetches existing tasks from the `jpt` scheduler REST API, keeps at most two\n",
    "`python` tasks plus every `sparks` task, chains them into a linear dependence\n",
    "list, and assembles the payload for a job named `auto_gen_test_job02`.\n",
    "The calls that actually create/update the job are kept as a disabled\n",
    "reference block at the end of the section."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "# Base URL of the scheduler service -- adjust to your environment.\n",
    "url = 'http://192.168.199.109:18082'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Collect tasks and build the job payload"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ret = requests.get(f'{url}/jpt/af_task')\n",
    "ret.raise_for_status()  # fail fast on HTTP errors instead of a confusing KeyError below\n",
    "\n",
    "# Keep at most two 'python' tasks plus every 'sparks' task.\n",
    "tasks = []\n",
    "python_task_num = 0\n",
    "for task in ret.json()['data']['items']:\n",
    "    if task[\"task_type\"] == \"python\" and python_task_num < 2:\n",
    "        tasks.append(task)\n",
    "        python_task_num += 1\n",
    "    elif task[\"task_type\"] == \"sparks\":\n",
    "        tasks.append(task)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Chain the selected tasks into a linear DAG: (id_1, id_2), (id_2, id_3), ...\n",
    "dependence = []\n",
    "last_t = None\n",
    "for t in tasks:\n",
    "    if last_t:\n",
    "        dependence.append((last_t, t['id']))\n",
    "    last_t = t['id']\n",
    "\n",
    "print(dependence)\n",
    "\n",
    "data = {\n",
    "    \"tasks\": tasks,\n",
    "    \"dependence\": dependence,\n",
    "    \"name\": \"auto_gen_test_job02\",\n",
    "    \"desc\": \"my task include dag\",\n",
    "    # \"job_type\": 1,   # NOTE(review): previously hardcoded; presumably set server-side -- confirm\n",
    "    # \"user_id\": 33,\n",
    "    \"trigger_status\": 0,\n",
    "    \"cron\": \"string\",\n",
    "    \"route_strategy\": \"string\",\n",
    "    \"block_strategy\": \"string\",\n",
    "    \"executor_timeout\": 0,\n",
    "    \"executor_fail_retry_count\": 0,\n",
    "}\n",
    "\n",
    "print(data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The calls below actually create/update the job and upload the helper script.\n",
    "They are intentionally not executed; kept here for reference:\n",
    "\n",
    "```python\n",
    "ret = requests.put(f'{url}/jpt/af_job/10', json=data)   # update existing job 10\n",
    "ret = requests.post(f'{url}/jpt/af_job', json=data)      # create a new job\n",
    "\n",
    "# Upload a task script to MinIO (FileHandler is a project-local helper):\n",
    "filename = 'hello_world.py'\n",
    "minio_handler = FileHandler(bucket_name='mytest')\n",
    "with open(filename, 'rb') as f:\n",
    "    minio_handler.put_byte_file(file_name='xxx/' + filename, file_content=f.read())\n",
    "print(minio_handler.get_file(filename))\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 更新任务 (inspect / update a job)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fetch job 10 for inspection.\n",
    "ret = requests.get(f'{url}/jpt/af_job/getOnce/10')\n",
    "ret.raise_for_status()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ret.json()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}