123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import os
- import re
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- import app.utils.send_util as send_util
- from configs.settings import DefaultOption, config
# Default task images, read once when this module is imported.
# Both values come from `config.get('TASK_IMAGES', <language>)`; presumably a
# section/option lookup on the project config object -- confirm against
# configs.settings. get_task_images() returns them when called with type == 0.
- python_image = config.get('TASK_IMAGES', 'python')
- java_image = config.get('TASK_IMAGES', 'java')
def create_constant(db: Session, item: schemas.ConstantCreate):
    """Persist a new ``Constant`` row built from *item* and return it.

    The row is constructed from ``item.dict()``, added to the session,
    committed, and refreshed from the session before being returned.
    """
    fields = item.dict()
    record = models.Constant(**fields)
    db.add(record)
    db.commit()
    # Reload the instance after the commit so the caller gets current state.
    db.refresh(record)
    return record
def get_constant_list(db: Session, type: str):
    """Return the ``value`` of every ``Constant`` row whose ``type`` equals *type*.

    The result is a plain list of values (not model instances), in the order
    the query yields the rows.
    """
    matching_rows = db.query(models.Constant).filter(
        models.Constant.type == type
    )
    values = []
    for row in matching_rows.all():
        values.append(row.value)
    return values
def find_and_update(db: Session, type: str, value: str):
    """Insert a ``Constant`` with the given *type* and *value* if none exists.

    Despite the name, an existing row is never modified: when a row with the
    same ``type`` and ``value`` is already present the function does nothing.

    Args:
        db: Session used for the lookup and the insert.
        type: Constant type to look up / store.
        value: Constant value to look up / store.

    Returns:
        None in every case.
    """
    # Only existence matters, so fetch at most one row instead of loading
    # every match with .all() and measuring the list.
    existing = (
        db.query(models.Constant)
        .filter(models.Constant.type == type)
        .filter(models.Constant.value == value)
        .first()
    )
    if existing is not None:
        return

    db_item = models.Constant(type=type, value=value)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
def delete_constant(db: Session, type: str, value: str):
    """Delete the ``Constant`` rows matching *type* and *value*.

    Returns the first matching instance, as loaded before the delete was
    issued.

    Raises:
        Exception: when no row matches *type* and *value*.
    """
    matching = (
        db.query(models.Constant)
        .filter(models.Constant.type == type)
        .filter(models.Constant.value == value)
    )
    target = matching.first()
    if not target:
        raise Exception('未找到该常量')

    # Bulk delete over the same filter, so every matching row is removed,
    # not only the one loaded above.
    matching.delete()
    db.commit()
    db.flush()
    return target
def _strip_image_tag(name) -> str:
    """Return *name* as a string with everything from the first ':' removed."""
    return str(name).split(':')[0]


def _resolve_image_url(image, app_env) -> str:
    """Build the image URL for one platform image record under *app_env*."""
    # Per the original comments, the 'idc*' environments are the Telecom Cloud
    # deployments and everything else is treated as Tencent Cloud.
    if app_env == 'idcprod':
        return _strip_image_tag(image['mirrorContainerName'])

    if app_env == 'idctest':
        # Prefer the repository URL as-is; fall back to the container name
        # when the URL is missing or empty.
        repository_url = image.get('mirrorRepistoryUrl')
        if repository_url:
            return str(repository_url)
        return _strip_image_tag(image['mirrorContainerName'])

    # Tencent Cloud: only these two source types need special handling.
    if image['sourceType'] not in ('IMPORT', 'ONLINEBUILD'):
        return _strip_image_tag(image['mirrorContainerName'])

    repository_url = image.get('mirrorRepistoryUrl')
    if repository_url is not None:
        # Drop an explicit ':443' port first so the tag split below does not
        # cut the URL at the port separator.
        return _strip_image_tag(str(repository_url).replace(':443', ''))

    # No repository URL: rebuild the path from the first three '/'-separated
    # parts of the container name.
    parts = _strip_image_tag(image['mirrorContainerName']).split('/')
    return f'{parts[0]}/aiplatform/docker/{parts[1]}__{parts[2]}'


def get_task_images(language: str, type: int, auth_token: str):
    """Return the images available for *language*.

    NOTE(review): the original indentation was lost in this file; this
    implementation assumes the ``else`` branch pairs with ``if type == 0``
    (built-in images vs. platform lookup). Confirm against the repository.

    Args:
        language: Language name. With ``type == 0`` it must be exactly
            ``'python'`` or ``'java'``; otherwise it is matched
            case-insensitively as a literal substring of the language values
            returned by ``send_util.get_ai_mode_language``.
        type: ``0`` selects the configured default images; any other value
            queries the platform through ``send_util``.
        auth_token: Token forwarded to the ``send_util`` calls.

    Returns:
        For ``type == 0``: a one-element list holding the configured image,
        or ``None`` when *language* is neither ``'python'`` nor ``'java'``.
        Otherwise: a list of dicts with the keys ``image_id``, ``image_url``
        and ``image_name``, one per image of every matching language.
    """
    if type == 0:
        if language == 'python':
            return [python_image]
        if language == 'java':
            return [java_image]
        # Unsupported language: keep the historical implicit None result.
        return None

    language_res = send_util.get_ai_mode_language(auth_token)
    # Escape the caller-supplied text so names with regex metacharacters
    # (e.g. 'c++') are matched literally instead of raising re.error.
    language_pattern = re.compile(re.escape(language), re.IGNORECASE)
    matched_languages = [
        entry['value']
        for entry in language_res['data']
        if language_pattern.search(str(entry['value']))
    ]

    # The deployment environment does not change between images; read it once.
    app_env = os.environ.get('APP_ENV')

    res_list = []
    for language_value in matched_languages:
        image_res = send_util.get_ai_images(language_value, auth_token)
        for image in image_res['data']:
            image_url = _resolve_image_url(image, app_env)
            res_list.append(
                {
                    'image_id': image['mirrorId'],
                    'image_url': image_url,
                    'image_name': image['mirrorShowName'],
                }
            )
    return res_list
def get_image_version(image_id: str, auth_token: str):
    """Return the ``versionName`` of every entry listed for *image_id*.

    The entries come from the ``data`` field of the response returned by
    ``send_util.get_ai_image_version``; their order is preserved.
    """
    response = send_util.get_ai_image_version(image_id, auth_token)
    return [entry['versionName'] for entry in response['data']]
|