
Data source: Kerberos authentication

liweiquan, 2 years ago
Commit 4b6c89857b

+ 12 - 4
app/common/hive.py

@@ -1,6 +1,6 @@
 from app.core.datasource.datasource import DataSourceBase
 from app.core.datasource.hive import HiveDS
-from configs.settings import config
+from configs.settings import DefaultOption, config
 from utils import flat_map
 
 HOST = config.get('HIVE', 'HOST')
@@ -8,12 +8,20 @@ PORT = config.get('HIVE', 'PORT')
 USERNAME = config.get('HIVE', 'USERNAME')
 PASSWORD = config.get('HIVE', 'PASSWORD')
 DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
-
-
+KERBEROS = config.get('HIVE', 'KERBEROS')
+KEYTAB = config.get('HIVE', 'KEYTAB', vars=DefaultOption(config, 'HIVE', KEYTAB=None))
+KRB5CONFIG = config.get('HIVE', 'KRB5CONFIG', vars=DefaultOption(config, 'HIVE', KRB5CONFIG=None))
+KERBEROS_SERVICE_NAME = config.get('HIVE', 'KERBEROS_SERVICE_NAME', vars=DefaultOption(config, 'HIVE', KERBEROS_SERVICE_NAME=None))
+PRINCIPAL = config.get('HIVE', 'PRINCIPAL', vars=DefaultOption(config, 'HIVE', PRINCIPAL=None))
 
 hiveDs = HiveDS(**{'type': 'hive',
                     'host': HOST,
                     'port': PORT,
                     'username': USERNAME,
                     'password': PASSWORD,
-                    'database_name': DATABASE_NAME})
+                    'database_name': DATABASE_NAME,
+                    'kerberos': KERBEROS,
+                    'keytab': KEYTAB,
+                    'krb5config': KRB5CONFIG,
+                    'kerberos_service_name': KERBEROS_SERVICE_NAME,
+                    'principal': PRINCIPAL})
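
Note: with the development.ini shown later in this commit (which only adds kerberos = 0 to the section), each of the optional lookups above falls back to the None default injected through DefaultOption. A minimal sketch of the behavior:

    # Option missing from the [HIVE] section -> the DefaultOption fallback (None) is returned
    KEYTAB = config.get('HIVE', 'KEYTAB', vars=DefaultOption(config, 'HIVE', KEYTAB=None))
    assert KEYTAB is None  # holds when development.ini defines no KEYTAB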

+ 2 - 3
app/common/minio.py

@@ -78,12 +78,11 @@ class FileHandler(object):
             print("Get minio pic failed:{}".format(e))
         return bytes()
 
-    def del_image(self, img_name):
-        file_name = img_name
+    def del_file(self, file_name):
         try:
             self.minio_client.remove_object(self.bucket_name, file_name)
         except ResponseError:
-            raise Exception("Failed to delete minio image, filename: %s" % file_name)
+            raise Exception("Failed to delete minio file, filename: %s" % file_name)
 
     def ls_file(self, filename):
         objects = []

+ 19 - 4
app/core/datasource/hive.py

@@ -1,3 +1,4 @@
+import os
 from app.core.datasource.datasource import DataSourceBase
 from pyhive import hive
 from pyhive.exc import DatabaseError
@@ -7,13 +8,22 @@ from utils import flat_map
 class HiveDS(DataSourceBase):
     type = 'hive'
 
-    def __init__(self, host, port, username, password, database_name, type='hive'):
+    def __init__(self, host, port, database_name,
+                 username=None, password=None, kerberos=0,
+                 keytab=None, krb5config=None, kerberos_service_name=None,
+                 principal=None, type='hive'):
         DataSourceBase.__init__(self, host, port, username, password, database_name, type)
         self.host = host
         self.port = port
         self.username = username
         self.password = password
         self.database_name = 'default' if not database_name else database_name
+        self.kerberos = int(kerberos)
+        self.keytab = keytab
+        self.krb5config = krb5config
+        self.kerberos_service_name = kerberos_service_name
+        self.principal = principal
+
 
     @property
     def jdbc_url(self):
@@ -31,7 +41,12 @@ class HiveDS(DataSourceBase):
         conn = None
         res = []
         try:
-            conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
+            if self.kerberos == 0:
+                conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
+            else:
+                os.system(f'kinit -kt {self.keytab} {self.principal}')
+                conn = hive.Connection(host=self.host, database=self.database_name, port=self.port, auth="KERBEROS", kerberos_service_name=self.kerberos_service_name)
+
 
             cursor = conn.cursor()
             for sql in sqls:
@@ -83,7 +98,7 @@ class HiveDS(DataSourceBase):
         res = self._execute_sql([sql1])
         if res:
             columns = list(map(lambda x: x[0],res[0]))
-            logger.info(columns)
+            # logger.info(columns)
         else:
             raise Exception(f'{table_name} no columns')
         ans = []
@@ -92,7 +107,7 @@ class HiveDS(DataSourceBase):
             try:
                 res = self._execute_sql([sql])
                 if res:
-                        print(res[0])
+                        # print(res[0])
                         res = [[str(i), *x] for x in filter(lambda x: x[0] != '', res[0])]
                         ans.append(''.join(flat_map(lambda x: ':'.join(x[:3]), res)))
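
A note on the Kerberos branch above: the original hunk shelled out with os.system (and misspelled kinit as "kiinit", fixed here), which silently ignores a failing login, and the stored krb5config path is never handed to the Kerberos libraries. A safer variant — a sketch under the same assumptions, not the committed code:

    import os
    import subprocess

    def kerberos_login(keytab: str, principal: str, krb5config: str = None):
        # Point libkrb5 at the bundled config instead of /etc/krb5.conf
        if krb5config:
            os.environ['KRB5_CONFIG'] = krb5config
        # check=True raises CalledProcessError on a non-zero exit,
        # unlike os.system, which just returns the status code
        subprocess.run(['kinit', '-kt', keytab, principal], check=True)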
 

+ 6 - 0
app/crud/jm_homework.py

@@ -91,4 +91,10 @@ def delete_jm_homework(db: Session, id: int):
 def get_jm_homeworks_by_ids(db: Session, ids: List[int]):
     res: List[models.JmHomework] = db.query(models.JmHomework)\
             .filter(models.JmHomework.id.in_(ids)).all()
+    return res
+
+def get_jm_homework_by_dag_url(db: Session, dag_url: str):
+    res: List[models.JmHomework] = db.query(models.JmHomework)\
+        .filter(models.JmHomework.dag_url == dag_url)\
+        .filter(models.JmHomework.status == 1).all()
     return res
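
This query backs the new file-deletion endpoint in app/routers/files.py below: a DAG file is only removed from minio when no homework with status == 1 still references it as its dag_url.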

+ 10 - 0
app/models/job_jdbc_datasource.py

@@ -35,4 +35,14 @@ class JobJdbcDatasource(BaseModel):
     comments = Column(String)
     # tag
     tag = Column(String)
+    # Kerberos authentication (0: disabled, 1: enabled)
+    kerberos = Column(Integer)
+    # keytab file
+    keytab = Column(String)
+    # krb5 config file
+    krb5config = Column(String)
+    # Kerberos service name
+    kerberos_service_name = Column(String)
+    # principal
+    principal = Column(String)
 

+ 13 - 1
app/routers/data_management.py

@@ -1,6 +1,6 @@
 from asyncio import current_task
 from re import A
-from time import time
+import time
 from typing import Optional
 from fastapi import APIRouter
 
@@ -40,6 +40,18 @@ def create_data_management(item: schemas.DataManagementCreate, db: Session = Dep
 def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
     return crud.get_data_managements(db, user_id, project_id)
 
+@router.get("/local")
+@web_try()
+@sxtimeit
+def get_local_data_managements(db: Session = Depends(get_db)):
+    return hiveDs.list_tables()
+
+@router.get("/table_schema")
+@web_try()
+@sxtimeit
+def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
+    return hiveDs.get_table_schema(table_name)
+
 @router.delete("/")
 @web_try()
 @sxtimeit

+ 19 - 5
app/routers/files.py

@@ -2,7 +2,9 @@ import io
 import json
 import time
 import uuid
-from fastapi import APIRouter, File, UploadFile, Form
+import app.crud as crud
+from fastapi import APIRouter, File, UploadFile, Form, Depends
+from sqlalchemy.orm import Session
 from fastapi.responses import StreamingResponse
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -24,12 +26,21 @@ def get_file(uri: str):
     code = 200
     if len(file) == 0:
         code = 404
-    print(file)
     response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
     # set the download filename via the response headers
     response.headers["Content-Disposition"] = "attachment; filename="+uri
     return response
 
+@router.delete("/")
+@web_try()
+@sxtimeit
+def delete_file(uri: str, db: Session = Depends(get_db)):
+    res = crud.get_jm_homework_by_dag_url(db, uri)
+    if len(res) == 0:
+        minio_client.del_file(uri)
+    else:
+        raise Exception("This operator is still in use by a job and cannot be deleted")
+
 @router.post("/upload_file")
 @web_try()
 @sxtimeit
@@ -43,18 +54,21 @@ def upload_file(file: UploadFile = File(...), project_id: str=Form(...), file_ty
 @router.get("/directory")
 @web_try()
 @sxtimeit
-def get_dags(project_id: str, user_id: str, file_type: str):
+def get_directory(project_id: str, user_id: str, file_type: str):
     files = minio_client.ls_file(f'{project_id}/{file_type}/')
     res = []
     for file in files:
+        timestamp = file.object_name.split('_', 1)[0].split('/')[-1]
+        time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(timestamp)))
         dag_name = file.object_name.split('_',1)[1]
-        res.append({'name':dag_name, 'uri':file.object_name})
+        res.append({'name': time_str + '-' + dag_name, 'uri': file.object_name, 'timestamp': timestamp})
+    res.sort(key=lambda x: x['timestamp'], reverse=True)
     return res
 
 @router.get("/dag_content")
 @sxtimeit
 @web_try()
-def get_file(uri: str):
+def get_dag_content(uri: str):
     file = minio_client.get_file(uri)
     if len(file) == 0:
         raise Exception('No file found')
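
The directory listing assumes object names of the form {project_id}/{file_type}/{unix_timestamp}_{filename}, with the timestamp presumably prepended by the upload handler (not fully shown here). A sketch of the parsing with a hypothetical name — note that split('_', 1) breaks if project_id or file_type themselves contain an underscore:

    import time

    object_name = 'p1/dag/1667347200_my_dag.json'  # hypothetical example
    timestamp = object_name.split('_', 1)[0].split('/')[-1]   # '1667347200'
    dag_name = object_name.split('_', 1)[1]                   # 'my_dag.json'
    time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(timestamp)))
    # -> e.g. '2022-11-02 08:00:00-my_dag.json' in the directory listing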

+ 0 - 5
app/routers/job_info.py

@@ -75,11 +75,6 @@ def get_job_info(job_id: int, db: Session = Depends(get_db)):
 @sxtimeit
 def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
     job_info = crud.update_job_info(db, id, update_item)
-    relation = crud.get_af_id(db, job_info.id, 'datax')
-    if not relation:
-        datax_create_job(job_info,db)
-    else:
-        datax_update_job(job_info,db)
     return job_info
 
 @router.put("/update_trigger_status/")

+ 23 - 3
app/schemas/job_jdbc_datasouce.py

@@ -1,4 +1,4 @@
-from typing import List
+from typing import List, Optional
 
 from pydantic import BaseModel
 
@@ -11,15 +11,25 @@ class JobJdbcDatasourceBase(BaseModel):
     # database name
     database_name: str
     # database username
-    jdbc_username: str
+    jdbc_username: Optional[str]
     # database password
-    jdbc_password: str
+    jdbc_password: Optional[str]
     # jdbc url
     jdbc_url: str
     # remarks
     comments: str
     # tag
     tag: str
+    # Kerberos authentication (0: disabled, 1: enabled)
+    kerberos: int
+    # keytab file
+    keytab: Optional[str]
+    # krb5 config file
+    krb5config: Optional[str]
+    # Kerberos service name
+    kerberos_service_name: Optional[str]
+    # principal
+    principal: Optional[str]
     class Config:
         schema_extra = {
             # "example": {
@@ -41,6 +51,11 @@ class JobJdbcDatasourceBase(BaseModel):
                 "jdbc_url": '192.168.199.107:10000',
                 "comments": 'This is a very nice Item',
                 "tag": '线下',
+                "kerberos": 0,
+                "keytab": "test/kerberos/user.keytab",
+                "krb5config": "test/kerberos/user.conf",
+                "kerberos_service_name": "hadoop",
+                "principal": "ailab@EMR-5XJSY31F",
             }
         }
 
@@ -73,6 +88,11 @@ class JobJdbcDatasourceUpdate(JobJdbcDatasourceBase):
                 "jdbc_url": '192.168.199.107:10000',
                 "comments": 'This is a very nice Item',
                 "tag": '线下',
+                "kerberos": 0,
+                "keytab": "test/kerberos/user.keytab",
+                "krb5config": "test/kerberos/user.conf",
+                "kerberos_service_name": "hadoop",
+                "principal": "ailab@EMR-5XJSY31F",
                 "status": 1,
             }
         }

+ 26 - 3
app/services/datax.py

@@ -1,3 +1,4 @@
+import time
 from app import crud, models
 from app.utils.send_util import *
 from app.utils.utils import get_cmd_parameter
@@ -26,7 +27,8 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
-    send_pause(af_job['id'], job_info.trigger_status)
+    get_job_last_parsed_time(af_job['id'])
+    # on_off_control(af_job['id'], job_info.trigger_status)
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)
@@ -84,7 +86,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/jpt/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    send_pause(af_job['id'], job_info.trigger_status)
+    on_off_control(af_job['id'], job_info.trigger_status)
 
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):
@@ -114,4 +116,25 @@ def datax_put_task(job_info: models.JobInfo,old_af_task):
     }
     res = send_put('/jpt/af_task', old_af_task['id'],af_task)
     af_task = res['data']
-    return af_task
+    return af_task
+
+
+def datax_job_submit(job_info: models.JobInfo, db: Session):
+    relation = crud.get_af_id(db, job_info.id, 'datax')
+    if not relation:
+        datax_create_job(job_info,db)
+    else:
+        datax_update_job(job_info,db)
+
+
+def on_off_control(af_job_id: int,status: int):
+    for i in range(0,11):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time:
+            send_pause(af_job_id,status)
+            print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
+            break
+        if i >= 10:
+            raise Exception(f"{af_job_id}==>状态修改失败")
+        time.sleep(2)
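
The polling loop exists because pausing or unpausing apparently only takes effect once the scheduler has parsed the submitted job: range(0, 11) together with the i >= 10 guard gives eleven attempts two seconds apart, so a job that never reports a last_parsed_time fails after roughly twenty seconds.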

+ 25 - 11
app/services/jm_job.py

@@ -1,10 +1,13 @@
 import json
+import time
 from turtle import update
 from app import crud, models
+from app.common import minio
 from app.crud.jm_homework_datasource_relation import get_jm_relations
 from app.utils.send_util import *
 from sqlalchemy.orm import Session
-from app.common.minio import FileHandler
+from app.common.minio import minio_client
 
 type_dict = {
     "Java": "java",
@@ -92,7 +95,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
-    send_pause(af_job['id'], jm_job_info.status)
+    # on_off_control(af_job['id'],jm_job_info.status)
 
 
 def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
@@ -122,7 +125,8 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     res = send_put('/jpt/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    send_pause(af_job['id'],jm_job_info.status)
+    on_off_control(af_job['id'],jm_job_info.status)
+
 
 def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
     job_relation = crud.get_af_id(db,jm_job_info.id,'job')
@@ -135,10 +139,8 @@ def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
 def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
     relations = get_jm_relations(db,jm_homework.id)
     node_relation_dict = { relation.node_uuid:relation for relation in relations}
-    f = open('./dag' + jm_homework.dag_url)
-    lines = f.read()
-    result = json.loads(lines)
-    f.close()
+    file = minio_client.get_file(jm_homework.dag_url)
+    result = json.loads(file)
     edges = result['edges']
     t_s = {}
     input_num = {}
@@ -167,7 +169,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
         elif node['op'] == 'outputsource':
             fileds = node['data']['output_source']
             script = '''def main_func (input0, spark,sc):
-    input0.write.mode("overwrite").saveAsTable('''+node_relation_dict[node['id']].table+''')'''
+    input0.write.mode("overwrite").saveAsTable("'''+node_relation_dict[node['id']].table+'''")'''
             inputs = {}
             index = 0
             input_list = t_s[node['id']]
@@ -212,6 +214,18 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
     return json.dumps(res)
 
 def red_python_and_format(jm_homework):
-    file_handler = FileHandler("datax")
-    file = file_handler.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
-    return file.decode("utf-8")
+    file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
+    return file.decode("utf-8")
+
+
+def on_off_control(af_job_id: int,status: int):
+    for i in range(0,11):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time:
+            send_pause(af_job_id,status)
+            print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
+            break
+        if i >= 10:
+            raise Exception(f"{af_job_id}==>执行失败")
+        time.sleep(2)
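
on_off_control is now defined verbatim in both app/services/datax.py and app/services/jm_job.py (only the exception text differs). A possible follow-up, sketched here with a hypothetical module path and assuming both helpers live in app.utils.send_util as the wildcard imports suggest, is to hoist it into one shared function:

    # app/utils/af_control.py -- hypothetical shared location
    import time
    from app.utils.send_util import get_job_last_parsed_time, send_pause

    def on_off_control(af_job_id: int, status: int, retries: int = 10, delay: float = 2.0):
        # Wait until the scheduler has parsed the job, then toggle its status
        for _ in range(retries + 1):
            parsed_res = get_job_last_parsed_time(af_job_id)
            if parsed_res['data']['last_parsed_time']:
                send_pause(af_job_id, status)
                return
            time.sleep(delay)
        raise Exception(f"{af_job_id} ==> status update failed")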

+ 2 - 0
app/utils/get_kerberos.py

@@ -0,0 +1,2 @@
+def get_kerberos_to_local():
+    print("====")

+ 27 - 0
configs/krb5.conf

@@ -0,0 +1,27 @@
+[libdefaults]
+    dns_lookup_realm = false
+    dns_lookup_kdc = false
+    ticket_lifetime = 24h
+    renew_lifetime = 7d
+    forwardable = true
+    rdns = false
+    default_realm = EMR-5XJSY31F
+    default_tgs_enctypes = des3-cbc-sha1
+    default_tkt_enctypes = des3-cbc-sha1
+    permitted_enctypes = des3-cbc-sha1
+    kdc_timeout = 3000
+    max_retries = 3
+[realms]
+    EMR-5XJSY31F = {
+        kdc = 10.254.20.18:88
+        admin_server = 10.254.20.18
+        kdc = 10.254.20.22:88
+        admin_server = 10.254.20.22
+    }
+
+[domain_realm]
+# .example.com = EXAMPLE.COM

+ 15 - 0
configs/settings.py

@@ -2,6 +2,21 @@
 import configparser
 import os
 
+class DefaultOption(dict):
+    def __init__(self, config, section, **kv):
+        self._config = config
+        self._section = section
+        dict.__init__(self, **kv)
+
+    def items(self):
+        _items = []
+        for option in self:
+            if not self._config.has_option(self._section, option):
+                _items.append((option, self[option]))
+            else:
+                value_in_config = self._config.get(self._section, option)
+                _items.append((option, value_in_config))
+        return _items
+
 config = configparser.ConfigParser()
 if os.environ.get('APP_ENV', 'development') == 'development':
     config.readfp(open('development.ini'))
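
Why DefaultOption works: configparser gives the vars= mapping the highest lookup precedence, ahead of the section and the DEFAULT block, and it builds that mapping by calling vars.items(). DefaultOption exploits this by handing back the section's own value whenever the option actually exists and injecting the fallback only when it does not, effectively turning vars= into a per-call default. A usage sketch:

    import configparser

    config = configparser.ConfigParser()
    config.read_string('[HIVE]\nhost = 192.168.199.27\n')

    # Present in the section: the real value wins over the injected default
    host = config.get('HIVE', 'HOST', vars=DefaultOption(config, 'HIVE', HOST='localhost'))
    assert host == '192.168.199.27'

    # Absent from the section: the injected default (None) is returned as-is
    keytab = config.get('HIVE', 'KEYTAB', vars=DefaultOption(config, 'HIVE', KEYTAB=None))
    assert keytab is None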

BIN
configs/user.keytab


+ 5 - 0
data/data.sql

@@ -23,6 +23,11 @@ CREATE TABLE `job_jdbc_datasource` (
   `update_time` int(20) DEFAULT NULL COMMENT 'update time',
   `comments` varchar(1000) DEFAULT NULL COMMENT 'remarks',
   `tag` varchar(20) DEFAULT NULL COMMENT 'tag: 0 dev, 1 test, 2 dev',
+  `kerberos` tinyint(1) NOT NULL COMMENT 'Kerberos authentication enabled (0: disabled, 1: enabled)',
+  `keytab` varchar(200) DEFAULT NULL COMMENT 'keytab file',
+  `krb5config` varchar(200) DEFAULT NULL COMMENT 'krb5 config file',
+  `kerberos_service_name` varchar(100) DEFAULT NULL COMMENT 'kerberos_service_name',
+  `principal` varchar(100) DEFAULT NULL COMMENT 'principal',
   PRIMARY KEY (`id`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='JDBC data source configuration';
 

+ 1 - 0
development.ini

@@ -17,3 +17,4 @@ port = 10000
 username = hive
 password = hive
 database_name = default
+kerberos = 0

+ 7 - 1
production.ini

@@ -16,4 +16,10 @@ host = 192.168.199.27
 port = 10000
 username = hive
 password = hive
-database_name = default
+database_name = default
+kerberos = 0
+keytab = configs/user.keytab
+krb5config = configs/krb5.conf
+kerberos_service_name = hadoop
+principal = ailab@EMR-5XJSY31F
+

+ 2 - 0
server.py

@@ -17,9 +17,11 @@ import app.routers.jm_job_log as router_jm_job_log
 from app.routers.run import router_af_run
 from app.routers.job import router_af_job
 from app.routers.task import router_af_task
+from app.utils.get_kerberos import get_kerberos_to_local
 from utils.sx_log import format_print
 
 format_print()
+get_kerberos_to_local()
 
 Base.metadata.create_all(bind=engine)
 app = FastAPI(docs_url='/jpt/docs', redoc_url='/jpt/redoc', title="DAG Management System")