Ver código fonte

可视化编程模块、删除多余的用户id、项目id

liweiquan 2 anos atrás
pai
commit
762a829d3f

+ 5 - 0
app/common/decorators.py

@@ -8,6 +8,7 @@ async def verify_user(request: Request, user_token: str = Header(), db: Session
     try:
         user = crud.verify_user_token(db, user_token)
         g.user_id = user.id
+        g.user_name = user.name
     except Exception as e:
         raise HTTPException(status_code=401, detail=str(e))
 
@@ -15,6 +16,7 @@ async def verify_super_admin(request: Request, user_token: str = Header(), db: S
     try:
         user = crud.verify_user_token(db, user_token)
         g.user_id = user.id
+        g.user_name = user.name
     except Exception as e:
         raise HTTPException(status_code=401, detail=str(e))
     if user.id != 1:
@@ -24,6 +26,7 @@ async def verify_all(request: Request, user_token: str = Header(), item_token: s
     try:
         user = crud.verify_user_token(db, user_token)
         g.user_id = user.id
+        g.user_name = user.name
     except Exception as e:
         raise HTTPException(status_code=401, detail=str(e))
     try:
@@ -41,6 +44,7 @@ async def verify_special(request: Request, user_token: str = Header(), item_toke
     try:
         user = crud.verify_user_token(db, user_token)
         g.user_id = user.id
+        g.user_name = user.name
     except Exception as e:
         raise HTTPException(status_code=401, detail=str(e))
     try:
@@ -60,6 +64,7 @@ async def verify_admin(request: Request, user_token: str = Header(), item_token:
     try:
         user = crud.verify_user_token(db, user_token)
         g.user_id = user.id
+        g.user_name = user.name
     except Exception as e:
         raise HTTPException(status_code=401, detail=str(e))
     try:

+ 1 - 0
app/crud/__init__.py

@@ -16,3 +16,4 @@ from app.crud.users import *
 from app.crud.project import *
 from app.crud.roles import *
 from app.crud.data_table import *
+from app.crud.programme import *

+ 4 - 3
app/crud/data_management.py

@@ -3,6 +3,7 @@ import time
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
+from configs.globals import g
 
 def create_data_management(db: Session, item: schemas.DataManagementCreate, table_name: str, af_run_id: str):
     create_time: int = int(time.time())
@@ -10,9 +11,9 @@ def create_data_management(db: Session, item: schemas.DataManagementCreate, tabl
         'name': item.name,
         'table_name': table_name,
         'create_time': create_time,
-        'user_name': item.user_name,
-        'user_id': item.user_id,
-        'project_id': item.project_id,
+        'user_name': g.user_name,
+        'user_id': g.user_id,
+        'project_id': g.project_id,
         'af_run_id': af_run_id,
         'status': 1
     })

+ 99 - 0
app/crud/programme.py

@@ -0,0 +1,99 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+import app.utils.send_util as send_util
+from configs.globals import g
+from configs.settings import DefaultOption, config
+
+namespace = config.get('PROGRAMME', 'namespace')
+super_image = config.get('PROGRAMME', 'super_image')
+ordinary_image = config.get('PROGRAMME', 'ordinary_image')
+tag = config.get('PROGRAMME', 'tag')
+host = config.get('PROGRAMME', 'host')
+chart = config.get('PROGRAMME', 'chart')
+path_type = config.get('PROGRAMME', 'path_type')
+
+def create_programme(db: Session, item: schemas.ProgrammeCreate):
+    db_item = db.query(models.Programme).filter(models.Programme.project_id == g.project_id).first()
+    if db_item:
+        raise Exception("该项目已存在编程,不可重复创建")
+    p_res = send_util.get_jupyter_password({"password": item.password})
+    password = p_res['data']
+    db_item = models.Programme(**{
+        'name': item.name,
+        'password': password,
+        'workspace': f"workspace_{g.project_id}",
+        'base_url': f"/nbss_{g.project_id}",
+        'image': super_image if g.project_id == 1 else ordinary_image,
+        'path': f"/nbss_{g.project_id}",
+        'release_name': f"aihub-dag-jpt-{g.project_id}",
+        'status': 0,
+        'user_id': g.user_id,
+        'user_name': item.user_name,
+        'project_id': g.project_id,
+        'create_time': int(time.time())
+    })
+    db.add(db_item)
+    db.commit()
+    db.refresh(db_item)
+    return db_item
+
+def start_jupyter(db: Session, item: schemas.ProgrammeId):
+    db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
+    if not db_item:
+        raise Exception("未找到该编程")
+    jupyter_create_data = {
+        'password': db_item.password,
+        'namespace': namespace,
+        'workspace': db_item.workspace,
+        'base_url': db_item.base_url,
+        'image': db_item.image,
+        'tag': tag,
+        'path': db_item.path,
+        'host': host,
+        'release_name': db_item.release_name,
+        'chart': chart,
+        'path_type': path_type
+    }
+    c_res = send_util.create_jupyter(jupyter_create_data)
+    j_data = c_res['data'] if 'data' in c_res.keys() else None
+    if not j_data:
+        send_util.stop_jupyter({'namespace': namespace,'release_name': db_item.release_name})
+        raise Exception("创建Jupyter失败")
+    url = f"http://{j_data['host']}{j_data['base_url']}/lab"
+    db_item.status = 1
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return url
+
+def stop_jupyter(db: Session, item: schemas.ProgrammeId):
+    db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
+    if not db_item:
+        raise Exception("未找到该编程")
+    send_util.stop_jupyter({'namespace': namespace,'release_name': db_item.release_name})
+    db_item.status = 0
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return db_item
+
+def update_jupyter_password(db: Session, item: schemas.ProgrammeUpdate):
+    db_item: models.Programme = db.query(models.Programme).filter(models.Programme.id == item.programme_id).first()
+    if not db_item:
+        raise Exception("未找到该编程")
+    print()
+    if db_item.status == 1:
+        raise Exception("程序正在运行,请先停止再修改密码")
+    p_res = send_util.get_jupyter_password({"password": item.password})
+    password = p_res['data']
+    db_item.password = password
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return db_item
+
+def get_programme(db: Session):
+    db_items: List[models.Programme] = db.query(models.Programme).filter(models.Programme.project_id == g.project_id).all()
+    return db_items

+ 3 - 2
app/crud/project.py

@@ -35,6 +35,7 @@ def create_project(db: Session, item: schemas.ProjectCreate):
         'name': item.name,
         'code': project_code,
         'type': 1,
+        'user_id': g.user_id,
         'create_time': int(time.time())
     })
     db.add(db_item)
@@ -48,7 +49,7 @@ def create_project(db: Session, item: schemas.ProjectCreate):
     project_dict = db_item.to_dict()
     # 创建超级管理员与项目的关系
     relation = models.ProjectUserRelation(**{
-        'user_id': item.user_id,
+        'user_id': g.user_id,
         'project_id': db_item.id,
         'role_id': 1,
     })
@@ -58,7 +59,7 @@ def create_project(db: Session, item: schemas.ProjectCreate):
     return project_dict
 
 def update_project(db: Session,item: schemas.ProjectUpdate):
-    db_item: models.Project = db.query(models.Project).filter(models.Project.id == item.project_id).first()
+    db_item: models.Project = db.query(models.Project).filter(models.Project.id == g.project_id).first()
     if not db_item:
         raise Exception('项目不存在')
     db_item.name = item.name

+ 2 - 1
app/models/__init__.py

@@ -16,4 +16,5 @@ from app.models.users import *
 from app.models.project import *
 from app.models.roles import *
 from app.models.project_user_relation import *
-from app.models.data_table import *
+from app.models.data_table import *
+from app.models.programme import *

+ 33 - 0
app/models/programme.py

@@ -0,0 +1,33 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class Programme(BaseModel):
+    __tablename__ = "programme"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 可视化编程名称
+    name = Column(String, nullable=False)
+    # 密码(暗文)
+    password = Column(String, nullable=False)
+    # 挂载目录
+    workspace = Column(String, nullable=False)
+    # api_url
+    base_url = Column(String, nullable=False)
+    # 使用镜像
+    image = Column(String, nullable=False)
+    # 路径
+    path = Column(String, nullable=False)
+    # xx名称
+    release_name = Column(String, nullable=False)
+    # 状态
+    status = Column(Integer, nullable=False)
+    # 创建时间
+    create_time = Column(Integer, nullable=False)
+    # 用户id
+    user_id = Column(String, nullable=False)
+    # 用户名称
+    user_name = Column(String, nullable=False)
+    # 项目id
+    project_id = Column(String, nullable=False)

+ 6 - 6
app/routers/dag.py

@@ -17,15 +17,15 @@ router = APIRouter(
     tags=["dag-dag管理"],
 )
 
-# , dependencies=[Depends(verify_all)]
-@router.post("/execute")
+
+@router.post("/execute", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def execute_dag(dag: schemas.Dag, db: Session = Depends(get_db)):
     af_job = dag_job_submit(dag.dag_uuid, dag.dag_script,db)
     return af_job
 
-@router.get("/debug_execute")
+@router.get("/debug_execute", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def debug_execute(dag_uuid: str, db: Session = Depends(get_db)):
@@ -34,7 +34,7 @@ def debug_execute(dag_uuid: str, db: Session = Depends(get_db)):
         return False
     return True
 
-@router.get("/debug_status")
+@router.get("/debug_status", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
@@ -59,7 +59,7 @@ def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
     }
     return res
 
-@router.get("/node_log")
+@router.get("/node_log", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def get_dag_debug_status(dag_uuid: str, node_id: str,db: Session = Depends(get_db)):
@@ -75,7 +75,7 @@ def get_dag_debug_status(dag_uuid: str, node_id: str,db: Session = Depends(get_d
             return task_log['log']
     return None
 
-@router.get("/node_result")
+@router.get("/node_result", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def get_dag_debug_result(dag_uuid: str,node_id: str,out_pin: int ,db: Session = Depends(get_db)):

+ 9 - 25
app/routers/data_management.py

@@ -16,6 +16,7 @@ from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from app.common.hive import hiveDs
+from configs.globals import g
 
 from app import get_db
 from configs.settings import DefaultOption, config
@@ -26,13 +27,12 @@ router = APIRouter(
     tags=["datamanagement-数据管理"],
 )
 
-# , dependencies=[Depends(verify_all)]
-@router.post("/")
+@router.post("/", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
     current_time = int(time.time())
-    table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
+    table_name = f'project{g.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
     tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
     af_run_res = data_transfer_run(database_name+'.'+tmp_table_name, database_name+'.'+table_name)
     af_run = af_run_res['data'] if 'data' in af_run_res.keys() else None
@@ -45,18 +45,18 @@ def create_data_management(item: schemas.DataManagementCreate, db: Session = Dep
         raise Exception('中间结果转存失败')
 
 
-@router.get("/")
+@router.get("/", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
-def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
-    res = crud.get_data_managements(db, user_id, project_id)
+def get_data_managements(db: Session = Depends(get_db)):
+    res = crud.get_data_managements(db, g.user_id, g.project_id)
     data_management_list = []
     for item in res:
         item.table_name = f'{database_name}.{item.table_name}'
         data_management_list.append(item)
     return data_management_list
 
-@router.get("/info")
+@router.get("/info", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def get_data_management_info(id: int, db: Session = Depends(get_db)):
@@ -70,30 +70,14 @@ def get_data_management_info(id: int, db: Session = Depends(get_db)):
     item.table_name = f'{database_name}.{item.table_name}'
     return item
 
-
-@router.get("/local")
-@web_try()
-@sxtimeit
-def get_local_data_managements(db: Session = Depends(get_db)):
-    t_list = hiveDs.list_tables()
-    res = [f'{database_name}.{t}' for t in t_list]
-    return res
-
-@router.get("/table_schema")
-@web_try()
-@sxtimeit
-def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
-    table_name = table_name.split('.')[-1]
-    return hiveDs.get_table_schema(table_name)
-
-@router.delete("/")
+@router.delete("/", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
     data_management = crud.delete_data_management(db, data_management_id)
     return data_management
 
-@router.get("/table_content")
+@router.get("/table_content", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
 def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):

+ 5 - 4
app/routers/files.py

@@ -10,6 +10,7 @@ from fastapi.responses import StreamingResponse
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from app.common.minio import minio_client
+from configs.globals import g
 
 from app import get_db
 
@@ -34,18 +35,18 @@ def delete_dag_file(uri: str,db: Session = Depends(get_db)):
 @router.post("/upload_file", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
-def upload_file(file: UploadFile = File(...), project_id: str=Form(...), file_type: str=Form(...)):
+def upload_file(file: UploadFile = File(...), file_type: str=Form(...)):
     print("UploadFile-->",file.filename)
     file_name = str(int(time.time()))+'_'+file.filename
-    url = minio_client.put_byte_file(f"{project_id}/{file_type}/"+file_name, file.file.read())
+    url = minio_client.put_byte_file(f"{g.project_id}/{file_type}/"+file_name, file.file.read())
     return url
 
 
 @router.get("/directory", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
-def get_directory(project_id: str, user_id: str, file_type: str):
-    files = minio_client.ls_file(f'{project_id}/{file_type}/')
+def get_directory(file_type: str):
+    files = minio_client.ls_file(f'{g.project_id}/{file_type}/')
     res = []
     td = timedelta(hours=8)
     tz = timezone(td)

+ 3 - 26
app/routers/jm_homework.py

@@ -15,6 +15,7 @@ from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
 import app.crud as crud
 from app import get_db
+from configs.globals import g
 from configs.settings import DefaultOption, config
 DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
@@ -34,8 +35,8 @@ def create_jm_homework(item: schemas.JmHomeworkCreate, db: Session = Depends(get
 @router.get("/", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
-def get_jm_homeworks(project_id: str, db: Session = Depends(get_db)):
-    return crud.get_jm_homeworks(db, project_id)
+def get_jm_homeworks(db: Session = Depends(get_db)):
+    return crud.get_jm_homeworks(db, g.project_id)
 
 @router.get("/info", dependencies=[Depends(verify_all)])
 @web_try()
@@ -66,27 +67,3 @@ def get_test_dag(db: Session = Depends(get_db)):
     jm_homework = crud.get_jm_homework_info(db, 83)
     res = red_dag_and_format(jm_homework, db)
     return res
-
-@router.get("/local_source", dependencies=[Depends(verify_all)])
-@web_try()
-@sxtimeit
-def get_local_source():
-    return [{
-            'database_name': DATABASE_NAME,
-            'datasource': "hive",
-            'datasource_name': DATABASE_NAME,
-            'id': -1
-    }]
-
-@router.get("/local_source_table", dependencies=[Depends(verify_all)])
-@web_try()
-@sxtimeit
-def get_local_source_table():
-    t_list = hiveDs.list_tables()
-    return t_list
-
-@router.get("/local_source_table_schema", dependencies=[Depends(verify_all)])
-@web_try()
-@sxtimeit
-def get_local_source_table_schema(table_name: str, db: Session = Depends(get_db)):
-    return hiveDs.get_table_schema(table_name)

+ 3 - 3
app/routers/job_info.py

@@ -77,7 +77,7 @@ def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session =
 @web_try()
 @sxtimeit
 def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
-    job_info = crud.get_job_info(db, item.id)
+    job_info: models.JobInfo = crud.get_job_info(db, item.id)
     relation = crud.get_af_id(db, job_info.id, 'datax')
     job_info.trigger_status = item.trigger_status
     on_off_control(relation.af_id, item.trigger_status)
@@ -88,7 +88,7 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 @web_try()
 @sxtimeit
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
-    jm_job = crud.get_job_info(db, job_id)
+    jm_job: models.JobInfo = crud.get_job_info(db, job_id)
     if jm_job.trigger_status == 1:
         raise Exception('任务未停用,不可删除')
     relation = crud.get_af_id(db, job_id, 'datax')
@@ -99,7 +99,7 @@ def delete_job_info(job_id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def execute_job_info(job_id: int, db: Session = Depends(get_db)):
-    jm_job = crud.get_job_info(db, job_id)
+    jm_job: models.JobInfo = crud.get_job_info(db, job_id)
     if jm_job.trigger_status == 0:
         raise Exception('任务已被停用')
     res = execute_job_services(db, job_id)

+ 17 - 5
app/routers/job_jdbc_datasource.py

@@ -12,7 +12,9 @@ from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
 
 from app import get_db
-
+from configs.globals import g
+from configs.settings import DefaultOption, config
+DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
 
 router = APIRouter(
@@ -107,12 +109,22 @@ def share_ailab(item: schemas.ShareAilab, db: Session = Depends(get_db)):
 def create_table(item: schemas.CreateAilab, db: Session = Depends(get_db)):
     return crud.create_table(db, item)
 
+@router.get("/ailab_source", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def get_ailab_source():
+    return [{
+            'database_name': DATABASE_NAME,
+            'datasource': "hive",
+            'datasource_name': DATABASE_NAME,
+            'id': -1
+    }]
 
 @router.get("/ailab_table", dependencies=[Depends(verify_all)])
 @web_try()
 @sxtimeit
-def get_ailab_table(project_id: int, db: Session = Depends(get_db)):
-    return crud.get_ailab_table(db, project_id)
+def get_ailab_table(db: Session = Depends(get_db)):
+    return crud.get_ailab_table(db, g.project_id)
 
 @router.get("/ailab_table_schema", dependencies=[Depends(verify_all)])
 @web_try()
@@ -129,8 +141,8 @@ def get_preview_ailab_table(table_name: str, db: Session = Depends(get_db)):
 @router.get("/lake_table", dependencies=[Depends(verify_special)])
 @web_try()
 @sxtimeit
-def get_lake_table(project_id: int, db: Session = Depends(get_db)):
-    return crud.get_lake_table(db, project_id)
+def get_lake_table(db: Session = Depends(get_db)):
+    return crud.get_lake_table(db, g.project_id)
 
 
 @router.get("/lake_table_info", dependencies=[Depends(verify_special)])

+ 46 - 0
app/routers/programme.py

@@ -0,0 +1,46 @@
+from fastapi import APIRouter, Depends
+from sqlalchemy.orm import Session
+from app.common.decorators import verify_all
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+import app.crud as crud
+from app import schemas, get_db
+
+
+
+
+router = APIRouter(
+    prefix="/jpt/programme",
+    tags=["programme-可视化编程管理"],
+)
+
+@router.post("", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def create_programme(item: schemas.ProgrammeCreate, db: Session = Depends(get_db)):
+    return crud.create_programme(db, item)
+
+@router.put("/start_jupyter", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def start_jupyter(item: schemas.ProgrammeId, db: Session = Depends(get_db)):
+    return crud.start_jupyter(db, item)
+
+@router.put("/stop_jupyter", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def stop_jupyter(item: schemas.ProgrammeId, db: Session = Depends(get_db)):
+    return crud.stop_jupyter(db, item)
+
+
+@router.put("/update_password", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def update_jupyter_password(item: schemas.ProgrammeUpdate, db: Session = Depends(get_db)):
+    return crud.update_jupyter_password(db, item)
+
+@router.get("", dependencies=[Depends(verify_all)])
+@web_try()
+@sxtimeit
+def get_programme(db: Session = Depends(get_db)):
+    return crud.get_programme(db)

+ 4 - 4
app/routers/project.py

@@ -32,8 +32,8 @@ def update_project(item: schemas.ProjectUpdate, db: Session = Depends(get_db)):
 @router.get("", dependencies=[Depends(verify_user)])
 @web_try()
 @sxtimeit
-def get_projects(user_id: int, db: Session = Depends(get_db)):
-    return crud.get_project_list(db,user_id)
+def get_projects(db: Session = Depends(get_db)):
+    return crud.get_project_list(db,g.user_id)
 
 @router.get("/share_projects", dependencies=[Depends(verify_special)])
 @web_try()
@@ -44,8 +44,8 @@ def get_share_projects(db: Session = Depends(get_db)):
 @router.get("/switch", dependencies=[Depends(verify_user)])
 @web_try()
 @sxtimeit
-def switch_project(project_id: int, user_id: int, db: Session = Depends(get_db)):
-    return crud.switch_project_by_user(db,project_id,user_id)
+def switch_project(project_id: int, db: Session = Depends(get_db)):
+    return crud.switch_project_by_user(db,project_id,g.user_id)
 
 
 @router.get("/info", dependencies=[Depends(verify_all)])

+ 2 - 1
app/schemas/__init__.py

@@ -15,4 +15,5 @@ from app.schemas.af_job import *
 from app.schemas.af_run import *
 from app.schemas.dag import *
 from app.schemas.users import *
-from app.schemas.project import *
+from app.schemas.project import *
+from app.schemas.programme import *

+ 6 - 9
app/schemas/data_management.py

@@ -5,12 +5,6 @@ from pydantic import BaseModel
 class DataManagementBase(BaseModel):
     # 数据名称
     name: str
-    # 创建人名称
-    user_name: str
-    # 创建人编号
-    user_id: str
-    # 项目编号
-    project_id: str
 
 class DataManagementCreate(DataManagementBase):
     dag_uuid: str
@@ -20,9 +14,6 @@ class DataManagementCreate(DataManagementBase):
         schema_extra = {
             "example": {
                 "name": "test",
-                "user_name": "test",
-                "user_id": "test",
-                "project_id": "test",
                 "dag_uuid": "test",
                 "node_id": "test",
                 "out_pin": "0",
@@ -33,6 +24,12 @@ class DataManagement(DataManagementBase):
     id: int
     # 表格名称
     table_name: str
+    # 创建人名称
+    user_name: str
+    # 创建人编号
+    user_id: str
+    # 项目编号
+    project_id: str
     # 创建时间
     create_time: int
     class Config:

+ 4 - 8
app/schemas/jm_homework.py

@@ -21,10 +21,6 @@ class JmHomeworkBase(BaseModel):
     script_file: str
     # 执行命令
     execute_command: Optional[str]
-    # 用户ID
-    user_id: str
-    # 项目ID
-    project_id: str
 
 
 class JmHomeworkCreate(JmHomeworkBase):
@@ -41,8 +37,6 @@ class JmHomeworkCreate(JmHomeworkBase):
                 "dag_url": "",
                 "script_file": "/test/scripts/example",
                 "execute_command": "ls",
-                "user_id": "test",
-                "project_id": "test",
                 "relation_list": [
                     {
                         "type": "input",
@@ -67,8 +61,6 @@ class JmHomeworkUpdate(JmHomeworkBase):
                 "dag_url": "",
                 "script_file": "/test/scripts/example",
                 "execute_command": "ls",
-                "user_id": "test",
-                "project_id": "test",
                 "relation_list": [
                     {
                         "type": "input",
@@ -82,6 +74,10 @@ class JmHomeworkUpdate(JmHomeworkBase):
 
 class JmHomework(JmHomeworkBase):
     id: int
+    # 用户ID
+    user_id: str
+    # 项目ID
+    project_id: str
     # 创建时间
     create_time: int
     # 更新时间

+ 6 - 12
app/schemas/jm_job_info.py

@@ -17,12 +17,6 @@ class JmJobInfoBase(BaseModel):
     cron_expression: Optional[CronExpression] = None
     # 图形信息
     json_str : Optional[str]
-    # 用户id
-    user_id: str
-    # 用户名称
-    user_name: str
-    # 项目id
-    project_id: str
 
 
 class JmJobInfoCreate(JmJobInfoBase):
@@ -45,9 +39,6 @@ class JmJobInfoCreate(JmJobInfoBase):
                     "month": 2,
                 },
                 "json_str": "\{图形信息\}",
-                "user_id": "test",
-                "user_name": "test",
-                "project_id": "test",
                 "nodes": [
                     {
                         "id": 1,
@@ -93,9 +84,6 @@ class JmJobInfoUpdate(JmJobInfoBase):
                     "month": 2,
                 },
                 "json_str": "\{图形信息\}",
-                "user_id": "test",
-                "user_name": "test",
-                "project_id": "test",
                 "nodes": [
                     {
                         "id": 1,
@@ -122,6 +110,12 @@ class JmJobInfoUpdate(JmJobInfoBase):
 class JmJobInfoStatusUpdate(BaseModel):
     id: int
     status: int
+    # 用户id
+    user_id: str
+    # 用户名称
+    user_name: str
+    # 项目id
+    project_id: str
     class Config:
         schema_extra = {
             "example": {

+ 2 - 3
app/schemas/job_info.py

@@ -37,8 +37,6 @@ class JobInfoBase(BaseModel):
     last_time: Optional[str]
     # 当前时间字段
     current_time: Optional[str]
-    # 创建人
-    user_id: str
 
 
 
@@ -73,7 +71,6 @@ class JobInfoCreate(JobInfoBase):
                 "jvm_param": "",
                 "last_time": "lastTime",
                 "current_time": "currentTime",
-                "user_id": "test",
             }
         }
 
@@ -135,6 +132,8 @@ class JobInfo(JobInfoBase):
     last_handle_code: int
     # 运行周期
     job_cron: str
+    # 创建人
+    user_id: str
 
     class Config:
         orm_mode = True

+ 31 - 0
app/schemas/programme.py

@@ -0,0 +1,31 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+class ProgrammeBase(BaseModel):
+    # 程序名称
+    name: str
+
+class ProgrammeCreate(ProgrammeBase):
+    # 密码
+    password: str
+    # 创建用户
+    user_name: str
+
+    class Config:
+        schema_extra = {
+            "example": {
+                "name": "test",
+                "password": "test",
+                "user_name": "test",
+            }
+        }
+
+class ProgrammeId(BaseModel):
+    # 程序id
+    programme_id: int
+
+
+class ProgrammeUpdate(ProgrammeId):
+    # 密码
+    password: str

+ 0 - 6
app/schemas/project.py

@@ -5,25 +5,19 @@ from pydantic import BaseModel
 class ProjectBase(BaseModel):
     # 项目名称
     name: str
-    # 用户id
-    user_id: int
 
 class ProjectCreate(ProjectBase):
     class Config:
         schema_extra = {
             "example": {
                 "name": "blue_sky",
-                "user_id": 1
             }
         }
 
 class ProjectUpdate(ProjectBase):
-    project_id: int
     class Config:
         schema_extra = {
             "example": {
                 "name": "blue_sky",
-                "user_id": 1,
-                "project_id": 3
             }
         }

+ 2 - 0
app/services/jm_job_info.py

@@ -37,6 +37,7 @@ def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
         'delete_status': 1,
         'create_time': create_time,
         'update_time': create_time,
+        'user_name': g.user_name,
         'user_id': g.user_id,
         'project_id': g.project_id
     })
@@ -82,6 +83,7 @@ def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
     crud.find_and_update(db, '任务标签', tag)
     jm_job_info_update.update({
         'update_time': int(time.time()),
+        'user_name': g.user_name,
         'user_id': g.user_id,
         'project_id': g.project_id
     })

+ 59 - 15
app/utils/send_util.py

@@ -2,11 +2,13 @@ from unittest import result
 import requests
 from configs.settings import config
 
-HOST = config.get('AF_BACKEND', 'host')
-PORT = config.get('AF_BACKEND', 'port')
+AF_HOST = config.get('AF_BACKEND', 'host')
+AF_PORT = config.get('AF_BACKEND', 'port')
+
+PROGRAMME_URL = config.get('PROGRAMME', 'url')
 
 def send_post(uri,data):
-    res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
+    res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}{uri}', json=data)
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -15,7 +17,7 @@ def send_post(uri,data):
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_submit(af_job_id):
-    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
+    res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/submit?id='+str(af_job_id))
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -25,7 +27,7 @@ def send_submit(af_job_id):
 
 
 def send_put(uri,path_data,data):
-    res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
+    res = requests.put(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}', json=data)
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -34,7 +36,7 @@ def send_put(uri,path_data,data):
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_get(uri,path_data):
-    res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -45,7 +47,7 @@ def send_get(uri,path_data):
 
 # 执行任务
 def send_execute(path_data):
-    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
+    res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(path_data)}/run')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -56,7 +58,7 @@ def send_execute(path_data):
 # 起停任务
 def send_pause(af_job_id, status):
     flag = True if status == 0 else False
-    res = requests.patch(url=f'http://{HOST}:{PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
+    res = requests.patch(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -66,7 +68,7 @@ def send_pause(af_job_id, status):
 
 # 删除任务
 def send_delete(uri, path_data):
-    res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
+    res = requests.delete(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -76,7 +78,7 @@ def send_delete(uri, path_data):
 
 # 获取airflow端dag文件生成时间
 def get_job_last_parsed_time(path_data):
-    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{path_data}/last_parsed_time')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -86,7 +88,7 @@ def get_job_last_parsed_time(path_data):
 
 # 获取job某次运行的状态
 def get_job_run_status(path_data):
-    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/{path_data}/status')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -96,7 +98,7 @@ def get_job_run_status(path_data):
 
 # 中间结果转存
 def data_transfer_run(source_tb: str, target_tb: str):
-    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
+    res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
     result = res.json()
     print(result)
     if 'code' in result.keys() and result['code'] == 200:
@@ -107,7 +109,7 @@ def data_transfer_run(source_tb: str, target_tb: str):
 
 # 获取task日志
 def get_task_log(job_id: str, af_run_id: str, task_id: str):
-    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -118,10 +120,52 @@ def get_task_log(job_id: str, af_run_id: str, task_id: str):
 
 # 获取中间结果转存状态
 def get_data_transfer_run_status(af_run_id: str):
-    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/data_transfer_log/{af_run_id}')
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/data_transfer_log/{af_run_id}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')
+
+# 创建jupyter
+def create_jupyter(data):
+    res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/start', json=data)
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        stop_jupyter({'namespace': data['namespace'],'release_name': data['release_name']})
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'创建jupyter,请求jupyter端失败-->{msg}')
+
+
+# 获取jupyter密码
+def get_jupyter_password(data):
+    res = requests.get(url=f'http://{PROGRAMME_URL}/helm/password', json=data)
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取jupyter密码,请求jupyter端失败-->{msg}')
+
+# 停止jupyter
+def stop_jupyter(data):
+    res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/stop', json=data)
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'停止jupyter,请求jupyter端失败-->{msg}')
+
+# 更新jupyter
+def update_jupyter(data):
+    res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/upgrade', json=data)
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
         msg = result['msg'] if 'msg' in result.keys() else result
-        raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')
+        raise Exception(f'更新jupyter,请求jupyter端失败-->{msg}')

+ 16 - 6
development.ini

@@ -1,10 +1,10 @@
 [DATABASE]
-user = root
-pwd = happylay
-db_name = datax_web_dev
-host = 192.168.199.107
-port = 10086
-ssl_disabled = true
+user = sxwl
+pwd = sxwldba
+db_name = aihub-dag
+host = 192.168.199.110
+port = 32306
+ssl_disabled = false
 
 [MINIO]
 url = minio-api.sxkj.com
@@ -71,3 +71,13 @@ kerberos_config = {
                     "kerberosKeytabFilePath": "/workspace/confs/test/user.keytab",
                     "kerberosPrincipal": "ailab@EMR-5XJSY31F"
                   }
+
+[PROGRAMME]
+url = aihub-dag.sxkj.com
+namespace = airflow
+super_image = SXKJ:32775/jupyterlab
+ordinary_image = SXKJ:32775/jupyterlab0
+tag = sxkj
+host = aihub-dag.sxkj.com
+chart = aihub-dag-jupyter.tgz
+path_type = ImplementationSpecific

+ 10 - 0
sxkj.ini

@@ -79,3 +79,13 @@ enable = true
 host_alias = {
        "192.168.199.31": "SXKJ"
     }
+
+[PROGRAMME]
+url = aihub-dag.sxkj.com
+namespace = airflow
+super_image = SXKJ:32775/jupyterlab
+ordinary_image = SXKJ:32775/jupyterlab0
+tag = sxkj
+host = aihub-dag.sxkj.com
+chart = aihub-dag-jupyter.tgz
+path_type = ImplementationSpecific