{ "cells": [ { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import json\n", "import requests\n", "from auo_tests.tasks.config import host" ] }, { "cell_type": "code", "execution_count": 9, "outputs": [], "source": [ "dag_script = {\n", " \"sub_nodes\": [\n", " {\n", " \"id\": \"1\",\n", " \"name\": \"SqlNode1\",\n", " \"op\": \"sql\",\n", " \"script\": \"select * from train\",\n", " },\n", " {\n", " \"id\": \"2\",\n", " \"name\": \"SqlNode1\",\n", " \"op\": \"sql\",\n", " \"script\": \"select * from test\",\n", " },\n", " {\n", " \"id\": \"3\",\n", " \"name\": \"PysparkNode1\",\n", " \"op\": \"pyspark\", # or python\n", " \"inputs\": {'train': (\"1\", 0),\n", " 'test': (\"2\", 0)\n", " },\n", " \"script\": \"\",\n", " },\n", " ],\n", " \"edges\": [\n", " (\"1\", \"2\"),\n", " (\"2\", \"3\")\n", " ]\n", "}\n", "with open('./spark_script_1012.py', 'r') as f:\n", " pyspark_script = f.read()\n", "dag_script[\"sub_nodes\"][2][\"script\"] = pyspark_script\n" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "##### 创建sparks作业" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 10, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://192.168.199.109:18082/jpt/af_task\n", "{'code': 200, 'data': {'name': 'sparks_test_group1012', 'file_urls': [], 'cmd': '', 'envs': {}, 'create_time': 1665561957, 'user_id': 33, 'task_type': 'sparks', 'id': 49, 'script': '{\"sub_nodes\": [{\"id\": \"1\", \"name\": \"SqlNode1\", \"op\": \"sql\", \"script\": \"select * from train\"}, {\"id\": \"2\", \"name\": \"SqlNode1\", \"op\": \"sql\", \"script\": \"select * from test\"}, {\"id\": \"3\", \"name\": \"PysparkNode1\", \"op\": \"pyspark\", \"inputs\": {\"train\": [\"1\", 0], \"test\": [\"2\", 0]}, \"script\": \"from pyspark.sql.types import *\\\\nfrom pyspark.ml.classification import LogisticRegression\\\\nfrom pyspark.ml.feature import VectorAssembler\\\\nfrom pyspark.ml import Pipeline\\\\nfrom pyspark.sql.functions import udf, col\\\\nfrom pyspark.sql import DataFrame\\\\n\\\\n\\\\ndef to_array(col):\\\\n def to_array_(v):\\\\n return v.toArray().tolist()\\\\n\\\\n return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)\\\\n\\\\n\\\\ndef main_func(train_df: DataFrame, test_df: DataFrame, spark):\\\\n feat_cols = [\\'feature1\\', \\'feature2\\', \\'feature3\\', \\'feature4\\', \\'feature5\\', \\'feature6\\', \\'feature7\\', \\'feature8\\',\\\\n \\'feature9\\']\\\\n\\\\n vector_assembler = VectorAssembler().setInputCols(feat_cols).setOutputCol(\\\\\"features\\\\\")\\\\n\\\\n #### \\\\u8bad\\\\u7ec3 ####\\\\n print(\\\\\"step 1\\\\\")\\\\n lr = LogisticRegression(regParam=0.01, maxIter=100) # regParam \\\\u6b63\\\\u5219\\\\u9879\\\\u53c2\\\\u6570\\\\n pipeline = Pipeline(stages=[vector_assembler, lr])\\\\n model = pipeline.fit(train_df)\\\\n\\\\n # \\\\u6253\\\\u5370\\\\u53c2\\\\u6570\\\\n print(\\\\\"\\\\\\\\n-------------------------------------------------------------------------\\\\\")\\\\n print(\\\\\"LogisticRegression parameters:\\\\\\\\n\\\\\" + lr.explainParams() + \\\\\"\\\\\\\\n\\\\\")\\\\n print(\\\\\"-------------------------------------------------------------------------\\\\\\\\n\\\\\")\\\\n\\\\n #### \\\\u9884\\\\u6d4b, \\\\u4fdd\\\\u5b58\\\\u7ed3\\\\u679c ####\\\\n print(\\\\\"step 2\\\\\")\\\\n labels_and_preds = model.transform(test_df).withColumn(\\\\\"probability_xj\\\\\", to_array(col(\\\\\"probability\\\\\"))[1]) \\\\\\\\\\\\n 
.select(\\\\\"uuid\\\\\", \\\\\"label\\\\\", \\\\\"prediction\\\\\", \\\\\"probability_xj\\\\\")\\\\n\\\\n return [labels_and_preds]\\\\n\"}], \"edges\": [[\"1\", \"2\"], [\"2\", \"3\"]]}', 'cmd_parameters': '', 'run_image': 'SXKJ:32775/jupyter:dag', 'update_time': 1665561957}, 'msg': ''}\n" ] } ], "source": [ "data = {\n", "    \"name\": \"sparks_test_group1012\",\n", "    \"file_urls\": [],\n", "    \"script\": json.dumps(dag_script),\n", "    \"cmd\": \"\",\n", "    \"cmd_parameters\": \"\",\n", "    \"envs\": {},\n", "    \"run_image\": \"SXKJ:32775/jupyter:dag\",\n", "    \"task_type\": \"sparks\",\n", "    \"user_id\": 33\n", "}\n", "# the returned task id (49 in this run) is reused below; a sanity-check cell at the end of the notebook re-fetches it\n", "print(f'http://{host}/jpt/af_task')\n", "ret = requests.post(url=f'http://{host}/jpt/af_task', json=data)\n", "print(ret.json())" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "##### Create/update a job from the task" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 13, "outputs": [], "source": [ "# fetch the task created above by its id\n", "ret = requests.get(url=f'http://{host}/jpt/af_task/getOnce/49')" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 17, "outputs": [], "source": [ "task_info = ret.json()['data']" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 21, "outputs": [], "source": [ "# job payload wrapping the task above; cron \"0 0 * * 0\" = weekly, Sunday at midnight\n", "data_item = {\n", "    \"tasks\": [\n", "        task_info\n", "    ],\n", "    \"name\": \"test_sparks_job\",\n", "    \"dependence\": [],\n", "    \"cron\": \"0 0 * * 0\",\n", "    \"desc\": \"string\",\n", "    \"route_strategy\": \"string\",\n", "    \"block_strategy\": \"string\",\n", "    \"executor_timeout\": 0,\n", "    \"executor_fail_retry_count\": 0,\n", "    \"trigger_status\": 0,\n", "    \"job_mode\": 2,\n", "}" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 22, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'code': 200, 'data': None, 'msg': ''}\n" ] } ], "source": [ "# True: create a new job; False: update existing job 35, then submit it\n", "# (a helper cell at the end of the notebook wraps this create/update + submit flow)\n", "is_create = False\n", "if is_create:\n", "    ret = requests.post(url=f'http://{host}/jpt/af_job', json=data_item)\n", "    print(ret.json())\n", "else:\n", "    ret = requests.put(url=f'http://{host}/jpt/af_job/35', json=data_item)\n", "    ret = requests.post(url=f'http://{host}/jpt/af_job/submit?id=35')\n", "    print(ret.json())" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# submit with the job payload in the request body\n", "ret = requests.post(url=f'http://{host}/jpt/af_job/submit', json=data_item)\n", "print(ret.json())" ], "metadata": { "collapsed": false } }
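, { "cell_type": "markdown", "source": [ "##### Sanity check (optional)\n", "A minimal sketch: re-read the task created above with the same `getOnce` endpoint and check that the serialized DAG script parses back. It assumes the `getOnce` response carries the same `code`/`data`/`script` fields as the create response printed earlier; the task id 49 also comes from that response." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# sketch: re-fetch the sparks task created above (id 49, taken from the create response)\n", "# and confirm the DAG script round-trips through json; assumes getOnce returns the same\n", "# fields as the create call\n", "check = requests.get(url=f'http://{host}/jpt/af_task/getOnce/49')\n", "resp = check.json()\n", "assert resp['code'] == 200\n", "fetched_dag = json.loads(resp['data']['script'])\n", "print(len(fetched_dag['sub_nodes']), 'nodes,', len(fetched_dag['edges']), 'edges')" ], "metadata": { "collapsed": false } }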
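, { "cell_type": "markdown", "source": [ "##### Create-or-update helper (sketch)\n", "A small wrapper around the job calls used above (POST `/jpt/af_job`, PUT `/jpt/af_job/{id}`, POST `/jpt/af_job/submit?id=...`). The function and argument names are illustrative, not part of the API; only endpoints already exercised in this notebook are reused." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# sketch of a create-or-update + submit helper; function/parameter names are illustrative\n", "def upsert_and_submit(job, job_id=None):\n", "    base = f'http://{host}/jpt/af_job'\n", "    if job_id is None:\n", "        # create a new job; its id would come back in the response\n", "        r = requests.post(url=base, json=job)\n", "    else:\n", "        # update the existing job, then submit it (as done above for job 35)\n", "        requests.put(url=f'{base}/{job_id}', json=job)\n", "        r = requests.post(url=f'{base}/submit?id={job_id}')\n", "    return r.json()\n", "\n", "# example, equivalent to the update branch above:\n", "# print(upsert_and_submit(data_item, job_id=35))" ], "metadata": { "collapsed": false } }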
"name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 0 }