"""Service helpers for the per-project Jupyter ``Programme`` record.

Covers creating the record, starting/stopping the Jupyter release through
``send_util``, changing its password, and listing the records of the current
project together with their probed running status.
"""
import re
import time
from typing import List

from sqlalchemy.orm import Session

from app import models, schemas
import app.utils.send_util as send_util
from configs.globals import g
from configs.settings import DefaultOption, config

# Deployment settings, read once at import time from the application config.
special_project_id = config.get('PERMISSIONS', 'special_project_id')
namespace = config.get('PROGRAMME', 'namespace')
super_image = config.get('PROGRAMME', 'super_image')
ordinary_image = config.get('PROGRAMME', 'ordinary_image')
tag = config.get('PROGRAMME', 'tag')
host = config.get('PROGRAMME', 'host')
chart = config.get('PROGRAMME', 'chart')
path_type = config.get('PROGRAMME', 'path_type')
ingress_class = config.get(
    'PROGRAMME',
    'ingress_class',
    vars=DefaultOption(config, 'PROGRAMME', INGRESS_CLASS=None),
)


def _get_programme_or_raise(db: Session, programme_id) -> models.Programme:
    """Return the Programme row with the given id, or raise if there is none."""
    db_item: models.Programme = (
        db.query(models.Programme)
        .filter(models.Programme.id == programme_id)
        .first()
    )
    if not db_item:
        raise Exception("未找到该编程")
    return db_item


def _commit_and_refresh(db: Session, db_item: models.Programme) -> models.Programme:
    """Commit the session and reload ``db_item`` from the database."""
    # commit() already flushes pending changes, so no separate flush() is needed.
    db.commit()
    db.refresh(db_item)
    return db_item


def check_password(password: str) -> None:
    """Reject passwords that contain ``,``, ``/`` or ``$``.

    Raises:
        Exception: if ``password`` contains any of the three characters.
    """
    if re.search(r"(?:,|/|\$)", password):
        raise Exception("密码中存在特殊符号(,或/或$)")


def create_programme(db: Session, item: schemas.ProgrammeCreate) -> models.Programme:
    """Create the Programme record for the current project (``g.project_id``).

    Only one record per project is allowed. The password is validated with
    ``check_password`` and then passed through
    ``send_util.get_jupyter_password``; the value returned under ``'data'`` is
    what gets stored (presumably the hashed form -- confirm against send_util).

    Raises:
        Exception: if the project already has a Programme, or the password
            contains a forbidden character.
    """
    existing = (
        db.query(models.Programme)
        .filter(models.Programme.project_id == g.project_id)
        .first()
    )
    if existing:
        raise Exception("该项目已存在编程,不可重复创建")

    check_password(item.password)
    p_res = send_util.get_jupyter_password({"password": item.password})
    password = p_res['data']

    # NOTE(review): special_project_id comes straight from config; confirm it
    # has the same type as g.project_id, otherwise this comparison never matches.
    image = super_image if g.project_id == special_project_id else ordinary_image

    db_item = models.Programme(
        name=item.name,
        password=password,
        workspace=f"workspace_{g.project_id}",
        base_url=f"/nbss_{g.project_id}",
        image=image,
        path=f"/nbss_{g.project_id}",
        release_name=f"aihub-dag-jpt-{g.project_id}",
        status=0,
        user_id=g.user_id,
        user_name=g.user_name,
        project_id=g.project_id,
        create_time=int(time.time()),
    )
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


def start_jupyter(db: Session, item: schemas.ProgrammeId) -> models.Programme:
    """Ask ``send_util.create_jupyter`` to start the release for a Programme.

    Raises:
        Exception: if the Programme does not exist, or the create call returns
            no (or an empty) ``'data'`` entry.
    """
    db_item = _get_programme_or_raise(db, item.programme_id)

    jupyter_create_data = {
        'password': db_item.password,
        'namespace': namespace,
        'workspace': db_item.workspace,
        'base_url': db_item.base_url,
        'image': db_item.image,
        'tag': tag,
        'path': db_item.path,
        'host': host,
        'release_name': db_item.release_name,
        'chart': chart,
        'path_type': path_type,
        'ingress_class': ingress_class or "",
        'image_pull_secret': config.get('PROGRAMME', 'image_pull_secret', fallback=""),
        'node_selector': config.get('PROGRAMME', 'node_selector', fallback=""),
    }
    c_res = send_util.create_jupyter(jupyter_create_data)
    if not c_res.get('data'):
        raise Exception("启动结构化编码失败")

    # NOTE(review): status stays 0 even after a successful start; the live
    # value is only written by get_programme() when it probes the server.
    # Confirm this is intentional and not a copy of stop_jupyter().
    db_item.status = 0
    return _commit_and_refresh(db, db_item)


def stop_jupyter(db: Session, item: schemas.ProgrammeId) -> models.Programme:
    """Stop the release of a Programme and record status 0.

    Raises:
        Exception: if the Programme does not exist.
    """
    db_item = _get_programme_or_raise(db, item.programme_id)
    send_util.stop_jupyter({
        'namespace': namespace,
        'release_name': db_item.release_name,
    })
    db_item.status = 0
    return _commit_and_refresh(db, db_item)


def update_jupyter_password(db: Session, item: schemas.ProgrammeUpdate) -> models.Programme:
    """Replace the stored password of a Programme whose status is not 1.

    Raises:
        Exception: if the Programme does not exist, its status is 1 (running),
            or the new password contains a forbidden character.
    """
    db_item = _get_programme_or_raise(db, item.programme_id)
    if db_item.status == 1:
        raise Exception("程序正在运行,请先停止再修改密码")

    check_password(item.password)
    p_res = send_util.get_jupyter_password({"password": item.password})
    db_item.password = p_res['data']
    return _commit_and_refresh(db, db_item)


def get_programme(db: Session) -> List[dict]:
    """List the current project's Programmes with a freshly probed status.

    Each record's ``status`` is overwritten with the result of
    ``get_programme_status`` and committed. The returned entries are the
    records' ``to_dict()`` output plus a ``complete_url`` key.
    """
    db_items: List[models.Programme] = (
        db.query(models.Programme)
        .filter(models.Programme.project_id == g.project_id)
        .all()
    )
    res = []
    for item in db_items:
        # URL used by the front end and for local access.
        complete_url = f'http://{host}{item.base_url}/lab'
        # URL used for access from inside the cluster.
        request_url = f'http://{item.release_name}-jupyterlab:8888{item.base_url}/lab'

        item.status = get_programme_status(request_url)
        _commit_and_refresh(db, item)

        i_dict = item.to_dict()
        i_dict.update({'complete_url': complete_url})
        res.append(i_dict)
    return res


def get_programme_status(url: str) -> int:
    """Probe ``url`` and return 1 if a Jupyter server answers, else 0.

    The server counts as running when the ``Server`` entry of what
    ``send_util.get_jupyter_html`` returns contains ``Tornado``.
    """
    try:
        res_header = send_util.get_jupyter_html(url)
        return 1 if 'Tornado' in res_header['Server'] else 0
    except Exception:
        # Best-effort probe: any failure (request error, missing header, ...)
        # is reported as "not running" rather than propagated.
        return 0