123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- import numpy as np
- from dataclasses import dataclass
- # result 对象
- from utils.time import timeit
- @dataclass
- class OcrResult(object):
- box: np.ndarray
- txt: str
- conf: float
- def __hash__(self):
- return hash(repr(self))
- def __repr__(self):
- return f'txt: {self.txt}, box: {self.box.tolist()}, conf: {self.conf}'
- @property
- def lt(self):
- l, t = np.min(self.box, 0)
- return [l, t]
- @property
- def rb(self):
- r, b = np.max(self.box, 0)
- return [r, b]
- @property
- def wh(self):
- l, t = self.lt
- r, b = self.rb
- return [r - l, b - t]
- @property
- def area(self):
- w, h = self.wh
- return w * h
- @property
- def is_slope(self):
- p0 = self.box[0]
- p1 = self.box[1]
- if p0[0] == p1[0]:
- return False
- slope = abs(1. * (p0[1] - p1[1]) / (p0[0] - p1[0]))
- return 0.4 < slope < 2.5
- @property
- def center(self):
- l, t = self.lt
- r, b = self.rb
- return [(r + l) / 2, (b + t) / 2]
- def one_line(self, b, is_horizontal, eps: float = 20.0) -> bool:
- y_idx = 0 + is_horizontal
- x_idx = 1 - y_idx
- if b.lt[x_idx] < self.lt[x_idx] < self.rb[x_idx] < b.rb[x_idx]: return False
- if self.lt[x_idx] < b.lt[x_idx] < b.rb[x_idx] < self.rb[x_idx]: return False
- eps = 0.45 * (self.wh[y_idx] + b.wh[y_idx])
- dist = abs(self.center[y_idx] - b.center[y_idx])
- return dist < eps
- def one_row(self, b, spacing_num):
- # 进来图片已为正向
- eps = 10.
- if abs(self.lt[0] - b.lt[0]) > eps: return False
- if abs(self.lt[1] - b.lt[1]) > (eps + self.wh[1]) * spacing_num: return False
- return True
- # 行处理器
- class LineParser(object):
- def __init__(self, ocr_raw_result, filters=None):
- if filters is None:
- filters = [lambda x: x.is_slope]
- self.ocr_res = []
- for re in ocr_raw_result:
- o = OcrResult(np.array(re[0]), re[1][0], re[1][1])
- if any([f(o) for f in filters]): continue
- self.ocr_res.append(o)
- # for f in filters:
- # self.ocr_res = list(filter(f, self.ocr_res))
- self.ocr_res = sorted(self.ocr_res, key=lambda x: x.area, reverse=True)
- self.eps = self.avg_height * 0.86
- @property
- def is_horizontal(self):
- res = self.ocr_res
- wh = np.stack([np.abs(np.array(r.lt) - np.array(r.rb)) for r in res])
- return np.sum(wh[:, 0] > wh[:, 1]) > np.sum(wh[:, 0] < wh[:, 1])
- @property
- def avg_height(self):
- idx = self.is_horizontal + 0
- return np.mean(np.array([r.wh[idx] for r in self.ocr_res]))
- # 整体置信度
- @property
- def confidence(self):
- return np.mean([r.conf for r in self.ocr_res])
- # 处理器函数
- @timeit
- def parse(self, eps=40.0):
- # 存返回值
- res = []
- # 需要 处理的 OcrResult 对象 的长度
- length = len(self.ocr_res)
- # 如果字段数 小于等于1 就抛出异常
- if length <= 1:
- raise Exception('无法识别')
- # 遍历数组 并处理他
- for i in range(length):
- # 拿出 OcrResult对象的 第i值 -暂存-
- res_i = self.ocr_res[i]
- # 这次的 res_i 之前已经在结果集中,就继续下一个
- if any(map(lambda x: res_i in x, res)): continue
- # set() -> {}
- # 初始化一个集合 即-输出-
- res_row = set()
- for j in range(i, length):
- res_j = self.ocr_res[j]
- # 这次的 res_i 之前已经在结果集中,就继续下一个
- if any(map(lambda x: res_j in x, res)): continue
- if res_i.one_line(res_j, self.is_horizontal, self.eps):
- # LineParser 对象 不可以直接加入字典
- res_row.add(res_j)
- if j >= i and res_i.one_row(res_j, j):
- res_row.add(res_j)
- res.append(res_row)
- idx = self.is_horizontal + 0
- res = sorted([sorted(list(r), key=lambda x: x.lt[1 - idx]) for r in res], key=lambda x: x[0].lt[idx])
- for row in res:
- print('---')
- print(''.join([r.txt for r in row]))
- return res
|