123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import re
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- import app.utils.send_util as send_util
- from configs.globals import g
- from configs.settings import DefaultOption, config
- namespace = config.get('PROGRAMME', 'namespace')
- super_image = config.get('PROGRAMME', 'super_image')
- ordinary_image = config.get('PROGRAMME', 'ordinary_image')
- tag = config.get('PROGRAMME', 'tag')
- host = config.get('PROGRAMME', 'host')
- chart = config.get('PROGRAMME', 'chart')
- path_type = config.get('PROGRAMME', 'path_type')
- ingress_class = config.get('PROGRAMME', 'ingress_class', vars=DefaultOption(config, 'PROGRAMME', INGRESS_CLASS = None))
- def check_password(password: str):
- p_bool = bool(re.search(r"(?:,|/|\$)", password))
- if p_bool:
- raise Exception("密码中存在特殊符号(,或/或$)")
- def create_programme(db: Session, item: schemas.ProgrammeCreate):
- db_item = db.query(models.Programme).filter(models.Programme.project_id == g.project_id).first()
- if db_item:
- raise Exception("该项目已存在编程,不可重复创建")
- check_password(item.password)
- p_res = send_util.get_jupyter_password({"password": item.password})
- password = p_res['data']
- db_item = models.Programme(**{
- 'name': item.name,
- 'password': password,
- 'workspace': f"workspace_{g.project_id}",
- 'base_url': f"/nbss_{g.project_id}",
- 'image': super_image if g.project_id == 1 else ordinary_image,
- 'path': f"/nbss_{g.project_id}",
- 'release_name': f"aihub-dag-jpt-{g.project_id}",
- 'status': 0,
- 'user_id': g.user_id,
- 'user_name': g.user_name,
- 'project_id': g.project_id,
- 'create_time': int(time.time())
- })
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- def start_jupyter(db: Session, item: schemas.ProgrammeId):
- db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
- if not db_item:
- raise Exception("未找到该编程")
- jupyter_create_data = {
- 'password': db_item.password,
- 'namespace': namespace,
- 'workspace': db_item.workspace,
- 'base_url': db_item.base_url,
- 'image': db_item.image,
- 'tag': tag,
- 'path': db_item.path,
- 'host': host,
- 'release_name': db_item.release_name,
- 'chart': chart,
- 'path_type': path_type,
- 'ingress_class': ingress_class if ingress_class else "",
- 'image_pull_secret': config.get('PROGRAMME', 'image_pull_secret', fallback=""),
- 'node_selector': config.get('PROGRAMME', 'node_selector', fallback="")
- }
- c_res = send_util.create_jupyter(jupyter_create_data)
- j_data = c_res['data'] if 'data' in c_res.keys() else None
- if not j_data:
- raise Exception("启动结构化编码失败")
- db_item.status = 0
- db.commit()
- db.flush()
- db.refresh(db_item)
- return db_item
- def stop_jupyter(db: Session, item: schemas.ProgrammeId):
- db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
- if not db_item:
- raise Exception("未找到该编程")
- send_util.stop_jupyter({'namespace': namespace,'release_name': db_item.release_name})
- db_item.status = 0
- db.commit()
- db.flush()
- db.refresh(db_item)
- return db_item
- def update_jupyter_password(db: Session, item: schemas.ProgrammeUpdate):
- db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
- if not db_item:
- raise Exception("未找到该编程")
- if db_item.status == 1:
- raise Exception("程序正在运行,请先停止再修改密码")
- check_password(item.password)
- p_res = send_util.get_jupyter_password({"password": item.password})
- password = p_res['data']
- db_item.password = password
- db.commit()
- db.flush()
- db.refresh(db_item)
- return db_item
- def get_programme(db: Session):
- db_items: List[models.Programme] = db.query(models.Programme).filter(models.Programme.project_id == g.project_id).all()
- res = []
- for item in db_items:
- # 前端与本地访问使用
- complete_url = f'http://{host}{item.base_url}/lab'
- # 集群内部访问使用
- request_url = f'http://{item.release_name}-jupyterlab:8888{item.base_url}/lab'
- jupyter_status = get_programme_status(request_url)
- item.status = jupyter_status
- db.commit()
- db.flush()
- db.refresh(item)
- i_dict = item.to_dict()
- i_dict.update({'complete_url': complete_url})
- res.append(i_dict)
- return res
- def get_programme_status(url: str):
- try:
- res_header= send_util.get_jupyter_html(url)
- # 若为启动状态,则Server中包含Tornado,否则没有启动
- jupyter_status = 0 if res_header['Server'].find('Tornado') < 0 else 1
- except Exception as e:
- jupyter_status = 0
- return jupyter_status
|