
Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

liweiquan 2 years ago
parent
commit
23b3184a92

+ 7 - 2
app/core/airflow/job.py

@@ -14,9 +14,11 @@ class AirflowJobSubmitter:
                        'sparks': SparksTaskCompiler,
                        'java': JavaTaskCompiler}
 
-        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id,task_mode=item.job_mode or 1) for task in item.tasks if
+        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1) for task
+                 in item.tasks if
                  task.task_type != 'sparks']
-        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id,task_mode=item.job_mode or 1) for task in item.tasks if
+        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1) for task in
+                       item.tasks if
                        task.task_type == 'sparks']
         edges = []
         for edge in item.dependence:
@@ -32,10 +34,13 @@ class AirflowJobSubmitter:
         #     "0 0 1 * *": "@monthly",
         #     "0 0 1 1 *": "@yearly"
         # }
+
+        # print(f" image pull key is : {config.get('K8S', 'image_pull_key')}")
         parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                       'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                       'interval': item.cron if item.cron != 'None' else None,
                       'af_backend_uri': config.get('AF_BACKEND', 'uri'),
+                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None)
                       }
 
         env = Environment(

+ 1 - 1
app/core/airflow/task.py

@@ -47,7 +47,7 @@ class JavaTaskCompiler(TaskCompiler):
         self.default_cmd = "echo \"$SCRIPT\" > run.py && python run.py"
         self.task.cmd = self.task.cmd or self.default_cmd
         tar_name = self.task.file_urls[0].split('/')[-1].split('_')[-1]
-        self.task.cmd = f'curl {config.get("MINIO", "k8s_url")}/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
+        self.task.cmd = f'curl http://{config.get("BACKEND", "url")}/jpt/files/{self.task.file_urls[0]}  --output {tar_name} && {self.task.cmd}'
 
 
 class PythonTaskCompiler(TaskCompiler):
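
The Java task no longer pulls its tarball from MinIO directly; it now routes through the backend's `/jpt/files/` endpoint. A minimal sketch of the command this f-string produces, with hypothetical values standing in for `config.get("BACKEND", "url")` and the task fields:

```python
# All three values are hypothetical stand-ins for illustration only.
backend_url = "aihub-backend-yili-test:8080"   # config.get("BACKEND", "url")
file_url = "java/20220729_myjob.tar.gz"        # task.file_urls[0]
user_cmd = "java -jar myjob.jar"               # task.cmd

tar_name = file_url.split('/')[-1].split('_')[-1]  # -> "myjob.tar.gz"
cmd = f'curl http://{backend_url}/jpt/files/{file_url}  --output {tar_name} && {user_cmd}'
print(cmd)
# curl http://aihub-backend-yili-test:8080/jpt/files/java/20220729_myjob.tar.gz  --output myjob.tar.gz && java -jar myjob.jar
```
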

+ 2 - 0
app/core/airflow/templates/dag_template.py.jinja2

@@ -64,6 +64,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if '
                                                                     reattach_on_restart=True,
                                                                     is_delete_operator_pod=False,
                                                                     get_logs=True,
+                                                                    {% if image_pull_key != None and image_pull_key != "" %}image_pull_secrets='{{ image_pull_key }}',{% endif %}
                                                                     log_events_on_failure=True,
                                                                     cmds={{ spark_sub_node['cmds'] }},
                                                                     env_vars={{ spark_sub_node['env'] }},
@@ -91,6 +92,7 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if '
         is_delete_operator_pod=False,
         get_logs=True,
         log_events_on_failure=True,
+        {% if image_pull_key not in ['None',"",None]%}image_pull_secrets='{{ image_pull_key }}',{% endif %}
         cmds={{ node['cmds'] }},
         env_vars={{ node['env'] }},
         on_success_callback=task_finish_alert,
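
Both operator blocks now guard `image_pull_secrets` so the kwarg is only emitted into the generated DAG when a pull key is actually configured. A minimal sketch of how the second guard renders, using jinja2 directly with the surrounding KubernetesPodOperator arguments omitted:

```python
from jinja2 import Template

snippet = Template(
    "{% if image_pull_key not in ['None', '', None] %}"
    "image_pull_secrets='{{ image_pull_key }}',"
    "{% endif %}"
)

print(snippet.render(image_pull_key='codingregistrykey'))
# image_pull_secrets='codingregistrykey',
print(repr(snippet.render(image_pull_key=None)))
# '' -- the kwarg is omitted from the generated DAG entirely
```
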

+ 6 - 2
app/core/airflow/uri.py

@@ -34,8 +34,10 @@ def get_airflow_api_info():
     headers = {
         'content-type': 'application/json',
         'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
-        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
     }
+    host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
+    if host_in_header not in ['', None]:
+        headers['Host'] = host_in_header
     return uri_prefix, headers
 
 
@@ -44,8 +46,10 @@ def call_airflow_api(method, uri, args_dict):
     headers = {
         'content-type': 'application/json',
         'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
-        'Host': f'{config.get("AIRFLOW", "host_in_header")}'
     }
+    host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
+    if host_in_header not in ['', None]:
+        headers['Host'] = host_in_header
     if method == 'post':
         return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
     if method == 'get':
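
Previously the Host header was always set; both helpers now add it only when `host_in_header` is configured (production.ini drops the key in this same commit). A minimal sketch of the pattern, with an inline ini standing in for the project's config:

```python
import configparser

config = configparser.ConfigParser()
config.read_string("""
[AIRFLOW]
api_token=YWRtaW46YWRtaW4=
""")  # host_in_header deliberately absent

headers = {
    'content-type': 'application/json',
    'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
}
# Pin the Host header only when the ini provides one (fallback=None otherwise),
# e.g. when Airflow sits behind an ingress that routes on the Host field.
host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
if host_in_header not in ['', None]:
    headers['Host'] = host_in_header

print(headers)  # no 'Host' key in this configuration
```
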

+ 14 - 12
app/routers/run.py

@@ -156,22 +156,24 @@ def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db
     return details
 
 
-@router_af_run.get("/job_status/{job_id}/{af_run_id}")
+@router_af_run.get("/running_status/{job_id}/{af_run_id}")
 @web_try()
 @sxtimeit
-def get_airflow_dagrun_job_info(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
+def get_airflow_dagrun_running_status(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
+    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}', args_dict={})
     ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
-    details = defaultdict(dict)
 
-    for task in ret.json()['task_instances']:
-        details['tasks'][task['task_id']] = {
-            # "log": logs,
-            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
-            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
-            "status": task['state']
-        }
-        # print(f"{task['task_id']}:{task['duration']}")
-    return details
+    # details = defaultdict(dict)
+
+    # for task in ret.json()['task_instances']:
+    #     details['tasks'][task['task_id']] = {
+    #         # "log": logs,
+    #         "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+    #         "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
+    #         "status": task['state']
+    #     }
+    #     # print(f"{task['task_id']}:{task['duration']}")
+    return ret.json()
 
 
 @router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
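
The route is renamed from `/job_status/{job_id}/{af_run_id}` to `/running_status/{job_id}/{af_run_id}`, and instead of digesting task instances into a `details` dict it now returns Airflow's `/taskInstances` payload verbatim (note the result of the preceding `dagRuns` call is immediately overwritten). A hypothetical client call; the service base URL and the `router_af_run` path prefix are assumptions, not taken from this diff:

```python
import requests

# Assumed deployment address and router prefix -- adjust to your environment.
base = "http://192.168.199.109:18082/af_run"

resp = requests.get(f"{base}/running_status/123/manual__2022-07-29T00:00:00+00:00")
resp.raise_for_status()
print(resp.json())  # Airflow's raw task-instance JSON, wrapped by @web_try()
```
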

+ 2 - 0
configs/settings.py

@@ -20,6 +20,8 @@ class DefaultOption(dict):
 config = configparser.ConfigParser()
 if os.environ.get('APP_ENV', 'development') == 'development':
     config.readfp(open('development.ini'))
+    with open('development.ini') as f:
+        print(f.read())
 elif os.environ.get('APP_ENV') == 'production':
     config.readfp(open('production.ini'))
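
Config selection keys off `APP_ENV` (defaulting to development), and the added `print` dumps development.ini to stdout at import time, evidently as a debug aid. A minimal sketch of the selection logic, assuming both ini files exist in the working directory:

```python
import configparser
import os

config = configparser.ConfigParser()
env = os.environ.get('APP_ENV', 'development')  # 'development' or 'production'
# read_file() is the current spelling of the readfp() used above,
# which is deprecated and removed in Python 3.12.
with open(f'{env}.ini') as f:
    config.read_file(f)
```
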
 

+ 142 - 0
debug.ipynb

@@ -0,0 +1,142 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "outputs": [],
+   "source": [
+    "import os"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[DATABASE]\n",
+      "user = root\n",
+      "pwd = happylay\n",
+      "db_name = datax_web_dev\n",
+      "host = 192.168.199.107\n",
+      "port = 10086\n",
+      "\n",
+      "[MINIO]\n",
+      "url = minio-api.sxkj.com\n",
+      "k8s_url=minio.default:9000\n",
+      "access_key = admin\n",
+      "secret_key = sxkjadmin\n",
+      "\n",
+      "\n",
+      "[AF_BACKEND]\n",
+      "uri=192.168.199.109:18082\n",
+      "host=192.168.199.109\n",
+      "port=18082\n",
+      "dag_files_dir=/dags/\n",
+      "\n",
+      "[K8S]\n",
+      ";image_pull_key=\n",
+      "\n",
+      "[AIRFLOW]\n",
+      "uri=192.168.199.109\n",
+      "host_in_header=airflow-web.sxkj.com\n",
+      "api_token=YWRtaW46YWRtaW4=\n",
+      "\n",
+      "[HIVE]\n",
+      "host = 192.168.199.27\n",
+      "port = 10000\n",
+      "username = hive\n",
+      "password = hive\n",
+      "database_name = default\n",
+      "kerberos=0\n",
+      "\n",
+      "[HIVE_METASTORE]\n",
+      "uris=thrift://192.168.199.27:9083\n",
+      "\n",
+      "\n",
+      "[TASK_IMAGES]\n",
+      "datax=SXKJ:32775/pod_datax:0.9\n",
+      "python=SXKJ:32775/pod_python:1.1\n",
+      "java=SXKJ:32775/java:1.0\n",
+      "sparks=SXKJ:32775/jupyter:0.96\n",
+      "\n",
+      "get config of development\n",
+      "192.168.199.107\n"
+     ]
+    }
+   ],
+   "source": [
+    "os.environ['APP_ENV']='development'\n",
+    "from configs.settings import config\n"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "outputs": [],
+   "source": [
+    "config.get('K8S', 'image_pull_key', fallback=None)"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "1\n"
+     ]
+    }
+   ],
+   "source": [
+    "print (config.get('K8S', 'image_pull_key',fallback=1))"
+   ],
+   "metadata": {
+    "collapsed": false
+   }
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "outputs": [],
+   "source": [],
+   "metadata": {
+    "collapsed": false
+   }
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 2
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython2",
+   "version": "2.7.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}

+ 5 - 0
development.ini

@@ -18,6 +18,11 @@ host=192.168.199.109
 port=18082
 dag_files_dir=/dags/
 
+[BACKEND]
+url=192.168.199.107:18082
+
+[K8S]
+;image_pull_key=codingregistrykey
 
 [AIRFLOW]
 uri=192.168.199.109

+ 4 - 1
production.ini

@@ -16,11 +16,14 @@ uri=aihub-backend-af-yili-test:8080
 host=aihub-backend-af-yili-test
 port=8080
 dag_files_dir=/dags/
+[K8S]
+image_pull_key=codingregistrykey
 
+[BACKEND]
+url=aihub-backend-yili-test:8080
 
 [AIRFLOW]
 uri=aihub-backend-af-yili-test:8080
-host_in_header=airflow-web.sxkj.com
 api_token=YWRtaW46YWRtaW4=