
Merge remote-tracking branch 'origin/master'

luoyulong · 2 years ago · commit 2105c3e12e

+ 2 - 2
app/core/datasource/hive.py

@@ -77,9 +77,9 @@ class HiveDS(DataSourceBase):
         sql = f'describe {self.database_name}.{table_name}'
         res = self._execute_sql([sql])
         if res:
-            res = [[str(i) , *x]for i, x in enumerate(res[0])]
+            print(res[0])
+            res = [[str(i) , *x]for i, x in enumerate(filter(lambda x: x[0] != '', res[0]))]
             logger.info(res)
-
             return flat_map(lambda x: [':'.join(x[:3])], res)
         else:
             raise Exception('table not found')
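
The added filter drops rows whose first column is empty: Hive's `describe` output can contain blank separator rows (for example, before the partition metadata section), and indexing them would have produced entries like `2::`. A minimal sketch of the transformation, using illustrative rows that are not from the source:

    # Illustrative rows resembling Hive `describe db.table` output.
    rows = [('id', 'int', ''), ('name', 'string', ''), ('', '', '')]

    # Keep rows with a non-empty first field, then prefix each with its index.
    res = [[str(i), *x] for i, x in enumerate(filter(lambda x: x[0] != '', rows))]
    print([':'.join(r[:3]) for r in res])  # ['0:id:int', '1:name:string']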

+ 2 - 1
app/core/datasource/mysql.py

@@ -92,7 +92,8 @@ class MysqlDS(DataSourceBase):
         return flat_map(lambda x: x, res[0])
 
     def get_table_schema(self, table_name):
-        sql = f'describe {self.database_name}.{table_name}'
+        sql = f'describe `{self.database_name}`.{table_name}'
+        logger.info(sql)
         res = self._execute_sql([sql])
         if res:
             res = [[str(i) , *x]for i, x in enumerate(res[0])]
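
Backtick-quoting the database name lets `describe` succeed when the schema name is a reserved word or contains characters such as a hyphen. A tiny sketch of the generated statement, with hypothetical names:

    database_name, table_name = 'my-db', 'users'  # hypothetical names
    sql = f'describe `{database_name}`.{table_name}'
    print(sql)  # describe `my-db`.users

Note that the table name is still unquoted in this commit; quoting it the same way would be the natural follow-up.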

+ 11 - 1
app/crud/job_info.py

@@ -3,10 +3,20 @@ from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
+from app.utils.cron_utils import *
+
 
 def create_job_info(db: Session, item: schemas.JobInfoCreate):
     create_time: int = int(time.time())
-    db_item = models.JobInfo(**item.dict(), **{
+    item_dict = item.dict()
+    cron_expression_dict = item_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    item_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    db_item = models.JobInfo(**item_dict, **{
         'trigger_status': 0,
         'create_time': create_time,
         'update_time': create_time,
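
`joint_cron_expression` itself is not shown in this diff. A plausible sketch of what it might do, assuming standard five-field cron and the type codes from the model comment (0: hourly, 1: daily, 2: weekly, 3: monthly, 4: raw cron); the real helper in `app.utils.cron_utils` may differ, for instance by emitting Quartz-style expressions:

    # Hypothetical sketch, not the actual app.utils.cron_utils implementation.
    def joint_cron_expression(expr):
        t = expr.cron_select_type
        if t == 0:  # hourly, at the given minute
            return f'{expr.minute} * * * *'
        if t == 1:  # daily, at hour:minute
            return f'{expr.minute} {expr.hour} * * *'
        if t == 2:  # weekly, on the given weekday
            return f'{expr.minute} {expr.hour} * * {expr.week}'
        if t == 3:  # monthly, on the given day of month
            return f'{expr.minute} {expr.hour} {expr.day} * *'
        return expr.cron_expression  # 4: pass a raw cron string through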

+ 2 - 0
app/models/job_info.py

@@ -7,6 +7,8 @@ class JobInfo(BaseModel):
     __tablename__ = "job_info"
 
     id = Column(Integer, primary_key=True, index=True)
+    # Cycle select type (0: hour, 1: day, 2: week, 3: month, 4: cron)
+    cron_select_type = Column(Integer)
     # Job execution CRON
     job_cron = Column(String, nullable=False, index=True)
     # Job description

+ 18 - 4
app/schemas/job_info.py

@@ -2,9 +2,9 @@ from typing import List, Optional
 
 from pydantic import BaseModel
 
+from app.schemas.cron_expression import CronExpression
+
 class JobInfoBase(BaseModel):
-    # Job execution CRON
-    job_cron: str
     # Job description
     job_desc: str
     # Executor routing strategy
@@ -33,10 +33,20 @@ class JobInfoBase(BaseModel):
 
 
 class JobInfoCreate(JobInfoBase):
+    # Cycle expression
+    cron_expression: CronExpression
     class Config:
         schema_extra = {
             "example": {
-                "job_cron": "0 0/2 * * * ?",
+                "cron_expression": {
+                    "cron_select_type": 3,
+                    "cron_expression": "",
+                    "minute": 0,
+                    "hour": 0,
+                    "day": 1,
+                    "week": 3,
+                    "month": 2,
+                },
                 "job_desc": "mysql-mysql同步",
                 "executor_route_strategy": "FIRST",
                 "executor_handler": "",
@@ -53,12 +63,14 @@ class JobInfoCreate(JobInfoBase):
         }
 
 class JobInfoUpdate(JobInfoBase):
+    # Run schedule (cron)
+    job_cron: str
     # Schedule status: 0 - stopped, 1 - running
     trigger_status: int
     class Config:
         schema_extra = {
             "example": {
-                "job_cron": "0 0/2 * * * ?",
+                "job_cron": "0 0 1 1/2 ?",
                 "job_desc": "mysql-mysql同步",
                 "executor_route_strategy": "FIRST",
                 "executor_handler": "",
@@ -95,6 +107,8 @@ class JobInfo(JobInfoBase):
     job_json: str
     # Last execution status
     last_handle_code: int
+    # Run schedule (cron)
+    job_cron: str
 
     class Config:
         orm_mode = True
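
Tying the schema to the crud change: with the `JobInfoCreate` example above (`cron_select_type` 3, minute 0, hour 0, day 1), the hypothetical `joint_cron_expression` sketch from the crud section would store a monthly schedule:

    expr = schemas.CronExpression(cron_select_type=3, cron_expression='',
                                  minute=0, hour=0, day=1, week=3, month=2)
    joint_cron_expression(expr)  # -> '0 0 1 * *' under the sketch above

The `"0 0 1 1/2 ?"` example in `JobInfoUpdate` suggests the real helper may target a different cron dialect, so treat the output format as an assumption.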

+ 11 - 1
data/data.sql

@@ -251,4 +251,14 @@ CREATE TABLE `jm_job_log` (
   `executor_result` tinyint(4) DEFAULT NULL COMMENT 'execution result',
   `job_log_uri` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'log',
   PRIMARY KEY (`id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='scheduled job log';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8_unicode_ci COMMENT='scheduled job log';
+
+
+-- ----------------------------
+-- Alter Table structure for job_info
+-- ----------------------------
+ALTER TABLE `job_info`
+ADD COLUMN `cron_select_type` tinyint(4) NOT NULL COMMENT 'cycle select type' AFTER `id`,
+ADD COLUMN `replace_param` varchar(20) NULL COMMENT 'incremental time' AFTER `delete_status`,
+ADD COLUMN `partition_info` varchar(20) NULL COMMENT 'partition info' AFTER `replace_param`,
+ADD COLUMN `jvm_param` varchar(50) NULL COMMENT 'JVM parameters' AFTER `partition_info`;

+ 6 - 8
docker-compose.yml

@@ -2,22 +2,20 @@ version: '2'
 services:
   datax-admin:
     hostname: datax-admin
-    container_name: ai2_be
+    container_name: datax-admin
     restart: always
     image: registry.cn-hangzhou.aliyuncs.com/sxtest/datax-admin:latest
     privileged: true
     ipc: host
     tty: true
     working_dir: /workspace
-    volumes:
-      - /home/sxkj/luoyulong/ai2/ai2_be:/workspace
-      - /mnt/nfs/airflow-airflow-dags-pvc-b2638332-6249-4a45-b99e-7a54dc63482f/fc309d7dd0f5c1de9299e5e9a222a098faec1de0:/dags
+    # volumes:
+    #   - /home/sxkj/luoyulong/ai2/ai2_be:/workspace
+    #   - /mnt/nfs/airflow-airflow-dags-pvc-b2638332-6249-4a45-b99e-7a54dc63482f/fc309d7dd0f5c1de9299e5e9a222a098faec1de0:/dags
     ports:
       - '18082:8080'
       - '18224:22'
     extra_hosts:
       - 'minio-api.sxkj.com:192.168.199.109'
-    environment:
-      - APP_ENV=development
-
-
+    # environment:
+    #   - APP_ENV=development

+ 3 - 0
environment.yml

@@ -36,5 +36,8 @@ dependencies:
       - minio==5.0.1
       - Pillow==9.1.1
       - croniter==1.3.7
+      - uvloop
+      - kubernetes
+      - httptools
       - -i https://mirror.baidu.com/pypi/simple
 prefix: /opt/conda/envs/py38