Explorar el Código

数据管理与中间结果管理

liweiquan hace 2 años
padre
commit
7f3f944dcc

+ 18 - 18
Dockerfile

@@ -79,24 +79,24 @@ EXPOSE 8080
 
 
 
-FROM builder2 as builder3
-
-RUN apt-get update && apt-get install -y --no-install-recommends openssh-server && rm -rf /var/lib/apt/lists/*
-RUN mkdir /var/run/sshd
-RUN echo 'root:root' | chpasswd
-RUN sed -i 's/.*PermitRootLogin .*/PermitRootLogin yes/' /etc/ssh/sshd_config
-# SSH login fix. Otherwise user is kicked off after login
-RUN sed -i 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' /etc/pam.d/sshd
-
-RUN echo "\
-[program:sshd] \n\
-command=/usr/sbin/sshd -D\n\
-autorestart=True\n\
-autostart=True\n\
-redirect_stderr = true\n\
-" > /etc/supervisor/conf.d/sshd.conf
-
-EXPOSE 22
+# FROM builder2 as builder3
+
+# RUN apt-get update && apt-get install -y --no-install-recommends openssh-server && rm -rf /var/lib/apt/lists/*
+# RUN mkdir /var/run/sshd
+# RUN echo 'root:root' | chpasswd
+# RUN sed -i 's/.*PermitRootLogin .*/PermitRootLogin yes/' /etc/ssh/sshd_config
+# # SSH login fix. Otherwise user is kicked off after login
+# RUN sed -i 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' /etc/pam.d/sshd
+
+# RUN echo "\
+# [program:sshd] \n\
+# command=/usr/sbin/sshd -D\n\
+# autorestart=True\n\
+# autostart=True\n\
+# redirect_stderr = true\n\
+# " > /etc/supervisor/conf.d/sshd.conf
+
+# EXPOSE 22
 
 
 # RUN mamba install -y jupyterlab -n base && mamba init zsh

+ 0 - 0
app/common/__init__.py


+ 98 - 0
app/common/minio.py

@@ -0,0 +1,98 @@
+# -*- coding:utf-8 -*-
+# powered by Carlos
+# October 23, 2019
+import base64
+import io
+from minio import Minio
+from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
+                         BucketAlreadyExists, NoSuchKey)
+from configs.settings import config
+from PIL import Image
+from io import BytesIO
+
+URL = config.get('MINIO', 'URL')
+ACCESS_KEY = config.get('MINIO', 'ACCESS_KEY')
+SECRET_KEY = config.get('MINIO', 'SECRET_KEY')
+
+class FileHandler(object):
+
+    def __init__(self, bucket_name):
+        self.bucket_name = bucket_name
+        self.minio_client = Minio(URL,
+                                  access_key=ACCESS_KEY,
+                                  secret_key=SECRET_KEY,
+                                  secure=False)
+        # 无脑创桶
+        try:
+            self.minio_client.make_bucket(self.bucket_name)
+        except BucketAlreadyExists:
+            pass
+        except BucketAlreadyOwnedByYou:
+            pass
+        except ResponseError:
+            # print('dssds')
+            raise Exception("minio连接失败")
+
+    # 上传图片
+    def put_image(self, img_name, image):
+        file_name = img_name
+        file_byte = base64.b64decode(image)
+        f = io.BytesIO(file_byte)
+        try:
+            self.minio_client.put_object(self.bucket_name, file_name, f, len(file_byte))
+            return file_name
+        except ResponseError:
+            raise Exception("minio创建图片失败 文件名:%s" % file_name)
+
+    def put_byte_file(self, file_name, file_content):
+        f = io.BytesIO(file_content)
+        try:
+            self.minio_client.put_object(self.bucket_name, file_name, f, len(file_content))
+            return file_name
+        except ResponseError:
+            raise Exception("minio创建文件失败 文件名:%s" % file_name)
+
+    def put_np_image(self, img_name, np_image):
+        file_name = img_name
+        img_crop_pil = Image.fromarray(np_image)
+        byte_io = BytesIO()
+        img_crop_pil.save(byte_io, format="jpeg")
+        jpg_buffer = byte_io.getvalue()
+        byte_io = BytesIO(jpg_buffer)
+
+        try:
+            self.minio_client.put_object(self.bucket_name, img_name, byte_io, len(jpg_buffer))
+            return file_name
+        except ResponseError:
+            raise Exception("minio创建图片失败 文件名:%s" % file_name)
+
+    # 获取照片
+    def get_file(self, img_name):
+        file_name = img_name
+        try:
+            d = self.minio_client.get_object(self.bucket_name, file_name)
+            return d.data
+        except NoSuchKey:
+            pass
+        except Exception as e:
+            print("Get minio pic failed:{}".format(e))
+        return bytes()
+
+    def del_image(self, img_name):
+        file_name = img_name
+        try:
+            self.minio_client.remove_object(self.bucket_name, file_name)
+        except ResponseError:
+            raise Exception("删除minio图片失败 文件名:%s" % file_name)
+
+    def ls_file(self, filename):
+        objects = []
+        try:
+            objects = self.minio_client.list_objects(self.bucket_name, prefix=filename,
+                              recursive=True)
+            return objects
+        except ResponseError:
+            raise Exception("列出文件失败")
+        return objects
+
+

+ 2 - 1
app/crud/__init__.py

@@ -1,4 +1,5 @@
 from app.crud.job_jdbc_datasource import *
 from app.crud.job_info import *
 from app.crud.datax_json import *
-from app.crud.job_log import *
+from app.crud.job_log import *
+from app.crud.data_management import *

+ 27 - 0
app/crud/data_management.py

@@ -0,0 +1,27 @@
+import time
+from typing import List
+from app import models, schemas
+from sqlalchemy.orm import Session
+
+def create_data_management(db: Session, item: schemas.DataManagementCreate):
+    create_time: int = int(time.time())
+    db_item = models.DataManagement(**item.dict(), **{
+        'create_time': create_time,
+    })
+    db.add(db_item)
+    db.commit()
+    db.refresh(db_item)
+    return db_item
+
+def get_data_managements(db: Session, user_id: str, project_id: str):
+    res: List[models.DataManagement] = db.query(models.DataManagement).filter(models.DataManagement.project_id == project_id,models.DataManagement.user_id == user_id).all()
+    return res
+
+def delete_data_management(db: Session, d_id: int):
+    dm_item = db.query(models.DataManagement).filter(models.DataManagement.id == d_id).first()
+    if not dm_item:
+        raise Exception("Data management not found")
+    db.delete(dm_item)
+    db.commit()
+    db.flush()
+    return dm_item

+ 2 - 1
app/models/__init__.py

@@ -1,3 +1,4 @@
 from app.models.job_jdbc_datasource import *
 from app.models.job_info import *
-from app.models.job_log import *
+from app.models.job_log import *
+from app.models.data_management import *

+ 31 - 0
app/models/data_management.py

@@ -0,0 +1,31 @@
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
+
+from app.models.database import BaseModel
+
+
+class DataManagement(BaseModel):
+    __tablename__ = "data_management"
+
+    id = Column(Integer, primary_key=True, index=True)
+    # 数据名称
+    name = Column(String)
+    # 数据类型
+    data_type = Column(String)
+    # 数据条数
+    data_num = Column(Integer)
+    # 存储位置
+    storage_location = Column(String)
+    # 占用存储
+    storage_usage = Column(String)
+    # 存储路径
+    storage_path = Column(String)
+    # 完整性
+    integrity = Column(String)
+    # 创建时间
+    create_time = Column(Integer, nullable=False)
+    # 创建人名称
+    user_name = Column(String, nullable=False)
+    # 创建人编号
+    user_id = Column(String, nullable=False)
+    # 项目编号
+    project_id = Column(String, nullable=False)

+ 43 - 0
app/routers/data_management.py

@@ -0,0 +1,43 @@
+from re import A
+from typing import Optional
+from fastapi import APIRouter
+
+from fastapi import Depends
+from sqlalchemy.orm import Session
+from app import routers, schemas
+from app.common.minio import FileHandler
+
+import app.crud as crud
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from fastapi_pagination import Page, add_pagination, paginate, Params
+
+from app import get_db
+
+router = APIRouter(
+    prefix="/jpt/datamanagement",
+    tags=["datamanagement-数据管理"],
+)
+
+
+@router.post("/")
+@web_try()
+@sxtimeit
+def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
+    return crud.create_data_management(db, item)
+
+
+@router.get("/")
+@web_try()
+@sxtimeit
+def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
+    return crud.get_data_managements(db, user_id, project_id)
+
+@router.delete("/")
+@web_try()
+@sxtimeit
+def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
+    data_management = crud.delete_data_management(db, data_management_id)
+    file_handler = FileHandler("datax")
+    file_handler.del_image(data_management.storage_path)
+    return data_management

+ 46 - 0
app/routers/files.py

@@ -0,0 +1,46 @@
+import io
+import uuid
+from fastapi import APIRouter
+
+from fastapi import File, UploadFile
+from fastapi.responses import StreamingResponse
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from utils.sx_image import get_b64
+from app.common.minio import FileHandler
+
+from app import get_db
+
+
+
+router = APIRouter(
+    prefix="/jpt/files",
+    tags=["files-文件管理"],
+)
+
+
+@router.post("/data_management/")
+@web_try()
+@sxtimeit
+def put_file( file: UploadFile = File(...),):
+    print("UploadFile-->",file.filename)
+    file_handler = FileHandler("datax")
+    file_name = str(uuid.uuid1())
+    url = file_handler.put_byte_file("data_management/"+file_name, file.file.read())
+    return url
+
+@router.get("/")
+# @web_try()
+@sxtimeit
+def get_file( uri: str):
+    file_handler = FileHandler("datax")
+    file = file_handler.get_file(uri)
+    print(type(file))
+    code = 200
+    if len(file) == 0:
+        code = 404
+
+    response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
+    # 在请求头进行配置
+    response.headers["Content-Disposition"] = "attachment; filename="+uri+".table"
+    return response

+ 62 - 0
app/routers/intermediate.py

@@ -0,0 +1,62 @@
+from ast import Raise
+import io
+import uuid
+from fastapi import APIRouter
+
+from fastapi import File, UploadFile
+from fastapi.responses import StreamingResponse
+from app.utils.utils import byte_conversion
+from utils.sx_time import sxtimeit
+from utils.sx_web import web_try
+from utils.sx_image import get_b64
+from app.common.minio import FileHandler
+
+from app import get_db
+
+
+
+router = APIRouter(
+    prefix="/jpt/intermediate",
+    tags=["intermediate-中间结果管理"],
+)
+
+
+@router.post("/")
+@web_try()
+@sxtimeit
+def put_intermediate(project_id: str, user_id: str, dag_uuid: str, node_uuid: str, file: UploadFile = File(...),):
+    print("UploadFile-->",file.filename)
+    file_name = file.filename.split('.', 1 )[0]
+    print(project_id, user_id, dag_uuid, node_uuid,file_name)
+    file_handler = FileHandler("datax")
+    file_exist = file_handler.get_file("intermediate/{}/{}/{}/{}/{}".format(project_id, user_id, dag_uuid, node_uuid, file_name))
+    if len(file_exist) > 0:
+        raise Exception("file is exists")
+    url = file_handler.put_byte_file("intermediate/{}/{}/{}/{}/{}".format(project_id, user_id, dag_uuid, node_uuid, file_name), file.file.read())
+    return url
+
+
+@router.get("/get_intermediate")
+@web_try()
+@sxtimeit
+def get_file(project_id: str, user_id: str, dag_uuid: str):
+    file_handler = FileHandler("datax")
+    prefix = "intermediate/{}/{}/{}".format(project_id, user_id, dag_uuid)
+    objects = file_handler.ls_file(prefix)
+    res = []
+    for obj in objects:
+        intermediate = {}
+        storage_path = obj.object_name
+        file_path = storage_path.replace(prefix+"/", "")
+        n_n = file_path.split("/",1)
+        node_uuid = n_n[0]
+        file_name = n_n[1]
+        create_time = obj.last_modified.strftime("%Y:%d:%m %H:%M")
+        size = byte_conversion(obj.size)
+        intermediate.update({'name': file_name,
+                            'node_uuid': node_uuid,
+                            'storage_path':storage_path,
+                            'create_time': create_time,
+                            'size': size})
+        res.append(intermediate)
+    return res

+ 2 - 1
app/schemas/__init__.py

@@ -1,4 +1,5 @@
 from app.schemas.job_jdbc_datasouce import *
 from app.schemas.job_info import *
 from app.schemas.datax_json import *
-from app.schemas.job_log import *
+from app.schemas.job_log import *
+from app.schemas.data_management import *

+ 61 - 0
app/schemas/data_management.py

@@ -0,0 +1,61 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+class DataManagementBase(BaseModel):
+    # 数据名称
+    name: str
+    # 数据类型
+    data_type: str
+    # 数据条数
+    data_num: int
+    # 存储位置
+    storage_location: str
+    # 占用存储
+    storage_usage: str
+    # 存储路径
+    storage_path: str
+    # 完整性
+    integrity: str
+    # 创建人名称
+    user_name: str
+    # 创建人编号
+    user_id: str
+    # 项目编号
+    project_id: str
+
+class DataManagementCreate(DataManagementBase):
+    class Config:
+        schema_extra = {
+            "example": {
+                "name": "test",
+                "data_type": "数据表",
+                "data_num": 25,
+                "storage_location": "minio",
+                "storage_usage": "23M",
+                "storage_path": "/datax/usgdcnkasojcxasuscbv",
+                "integrity": "-",
+                "user_name": "test",
+                "user_id": "test",
+                "project_id": "test",
+            }
+        }
+
+class DataManagement(DataManagementBase):
+    id: int
+    # 创建时间
+    create_time: int
+    class Config:
+        orm_mode = True
+
+
+class DataManagementSelect(BaseModel):
+    user_id: str
+    project_id: str
+    class Config:
+        schema_extra = {
+            "example": {
+                "user_id": "test",
+                "project_id": "test",
+            }
+        }

+ 11 - 0
app/utils/utils.py

@@ -16,3 +16,14 @@ def decode_user(username, password):
 
 def encode_base64(str):
     return  base64.encodebytes(str.encode('utf-8')).decode('utf-8')
+
+
+def byte_conversion(size):
+    if size < 1024:
+        return str("%.1f"%size) + 'B'
+    elif size < 1024 * 1024:
+        return str("%.1f"%(size/1024)) + 'KB'
+    elif size < 1024 * 1024 * 1024:
+        return str("%.1f"%(size/1024/1024)) + 'MB'
+    else:
+        return str("%.1f"%(size/1024/1024/1024)) + 'GB'

+ 12 - 0
constants/test.table

@@ -0,0 +1,12 @@
+{
+A: [1,2,3]
+},
+{
+B: [2,3,4]
+},
+{
+C: [4,5,6]
+},
+{
+D: [5,6,7]
+}

+ 22 - 1
data/data.sql

@@ -92,4 +92,25 @@ CREATE TABLE `job_log` (
   PRIMARY KEY (`id`) USING BTREE,
   KEY `I_trigger_time` (`trigger_time`) USING BTREE,
   KEY `I_handle_code` (`handle_code`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=1581 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
+) ENGINE=InnoDB AUTO_INCREMENT=1581 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
+
+
+-- ----------------------------
+-- Table structure for data_management
+-- ----------------------------
+DROP TABLE IF EXISTS `data_management`;
+CREATE TABLE `data_management` (
+  `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `name` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '数据名称',
+  `data_type` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '数据类型',
+  `data_num` int(20) DEFAULT NULL COMMENT '数据条数',
+  `storage_location` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '存储位置',
+  `storage_usage` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '占用存储',
+  `storage_path` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '存储路径',
+  `integrity` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '完整性',
+  `create_time` bigint(13) NOT NULL COMMENT '创建时间',
+  `user_name` varchar(20) COLLATE utf8_unicode_ci NOT NULL COMMENT '创建人名称',
+  `user_id` varchar(50) COLLATE utf8_unicode_ci NOT NULL COMMENT '创建人编号',
+  `project_id` varchar(50) COLLATE utf8_unicode_ci NOT NULL COMMENT '项目编号',
+  PRIMARY KEY (`id`)
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='数据管理';

+ 4 - 0
development.ini

@@ -10,3 +10,7 @@ port = 10086
 ; db_name = aihubtest_dag_admin_db
 ; host = 10.254.12.7
 ; port = 3306
+[MINIO]
+url = minio-api.sxkj.com
+access_key = admin
+secret_key = sxkjadmin

+ 2 - 0
docker-compose.yml

@@ -12,5 +12,7 @@ services:
     ports:
       - '18082:8080'
       - '18224:22'
+    extra_hosts:
+      - 'minio-api.sxkj.com:192.168.199.109'
     # volumes:
     #   - ./:/workspace

+ 3 - 0
environment.yml

@@ -10,6 +10,7 @@ dependencies:
   - pip
   - pip:
       - cmake
+      - opencv-python
       - cython
       - fastapi
       - uvicorn
@@ -30,5 +31,7 @@ dependencies:
       - SQLAlchemy==1.4.9
       - numpy
       - pandas
+      - minio==5.0.1
+      - Pillow==9.1.1
       - -i https://mirror.baidu.com/pypi/simple
 prefix: /opt/conda/envs/py38

+ 4 - 0
production.ini

@@ -10,3 +10,7 @@ port = 10086
 ; db_name = aihubtest_dag_admin_db
 ; host = 10.254.12.7
 ; port = 3306
+[MINIO]
+url = minio-api.sxkj.com
+access_key = admin
+secret_key = sxkjadmin

+ 7 - 0
server.py

@@ -9,6 +9,9 @@ import app.routers.constants as router_constants
 import app.routers.job_info as router_job_info
 import app.routers.job_log as router_job_log
 import app.routers.datax_json as router_datax
+import app.routers.data_management as router_data_management
+import app.routers.files as router_files
+import app.routers.intermediate as router_intermediate
 
 Base.metadata.create_all(bind=engine)
 app = FastAPI( docs_url='/jpt/docs', redoc_url='/jpt/redoc', title="DAG管理系统")
@@ -28,7 +31,11 @@ app.include_router(router_jjds.router)
 app.include_router(router_constants.router)
 app.include_router(router_job_info.router)
 app.include_router(router_job_log.router)
+app.include_router(router_data_management.router)
 app.include_router(router_datax.router)
+app.include_router(router_files.router)
+app.include_router(router_intermediate.router)
+
 
 
 # Get 健康检查

+ 1 - 0
utils/__init__.py

@@ -1,5 +1,6 @@
 from utils.sx_web import *
 from utils.sx_time import *
+from utils.sx_image import *
 
 def flat_map(f, xs):
         ys = []

+ 12 - 0
utils/sx_image.py

@@ -0,0 +1,12 @@
+from base64 import b64decode
+import base64
+import numpy as np
+import cv2
+
+def base64_to_np(img_data):
+    color_image_flag = 1
+    img_data = img_data.split(',',1)[-1]
+    return cv2.imdecode(np.fromstring(b64decode(img_data), dtype=np.uint8), color_image_flag)
+
+def get_b64(f):
+    return base64.encodebytes(f).decode('utf-8')