ranks_parse.py 15 KB


  1. import copy
  2. import math
  3. import pickle
  4. import re
  5. from dataclasses import dataclass
  6. from typing import List
  7. import numpy as np
  8. # result 对象
  9. # box: np.ndarray
  10. # txt: str
  11. # conf: float
  12. @dataclass
  13. class OcrResult(object):
  14. box: np.ndarray
  15. txt: str
  16. conf: float
  17. def __hash__(self):
  18. return hash(repr(self))
  19. def __repr__(self):
  20. return f'txt: {self.txt}, box: {self.box.tolist()}, conf: {self.conf}'
  21. @property
  22. def ltrb(self):
  23. l, t = np.min(self.box, 0)
  24. r, b = np.max(self.box, 0)
  25. return [l, t, r, b]
  26. def lt(self):
  27. l, t = np.min(self.box, 0)
  28. return [l, t]
  29. @property
  30. def wh(self):
  31. l, t = self.ltrb[:2]
  32. r, b = self.ltrb[2:]
  33. return [r - l, b - t]
  34. @property
  35. def center(self):
  36. l, t = self.ltrb[:2]
  37. r, b = self.ltrb[2:]
  38. return [(r + l) / 2, (b + t) / 2]
  39. # 共需要识别6个字段
  40. # 4: 姓名 出生地 籍贯 身份证号(性别)
  41. # 1: 血型 信仰
  42. class RanksParser(object):
  43. def __init__(self, res: List):
  44. self.col_fields = None
  45. self.row_fields = None
  46. Or = []
  47. for r in res:
  48. # box: np.ndarray | txt: str | conf: float
  49. _r = OcrResult(np.array(r[0]), r[1][0], r[1][1])
  50. Or.append(_r)
  51. self.ocr_res = Or
  52. del_index = []
  53. for _r in Or:
  54. if '常驻' in _r.txt or '常住' in _r.txt or '人口' in _r.txt or '口登' in _r.txt or '记卡' in _r.txt:
  55. title_t = _r.ltrb[1] - self.mean_h_esp * 0.3
  56. title_b = _r.ltrb[3] + self.mean_h_esp * 0.3
  57. for _r_k, _r_v in enumerate(Or):
  58. if len(_r_v.txt) == 1 and _r_v == '美':
  59. del_index.append(_r_k)
  60. if title_t < _r_v.center[1] < title_b:
  61. del_index.append(_r_k)
  62. break
  63. for k, i in enumerate(del_index):
  64. del Or[i - k]
  65. self.ocr_res = Or
  66. @property
  67. def confidence(self):
  68. return np.mean([r.conf for r in self.ocr_res])
  69. @property
  70. def center_five_row(self):
  71. boxs = []
  72. for row in self.ocr_res:
  73. boxs.extend(row.box)
  74. boxs = np.stack(boxs)
  75. l, t = np.min(boxs, 0)
  76. r, b = np.max(boxs, 0)
  77. five = (b - t) / 5
  78. return [t+five, b-five]
  79. @property
  80. def mean_h_esp(self):
  81. """
  82. 由框的平均高度 获取esp
  83. """
  84. esps = [r.wh[1] for r in self.ocr_res]
  85. return np.mean(esps)
  86. @property
  87. def anchor(self) -> OcrResult:
  88. """
  89. 获取锚点:身份证号
  90. """
  91. pass
  92. @staticmethod
  93. def merger_or(or_a, or_b):
  94. """
  95. 合并两个框
  96. """
  97. or_a.txt += or_b.txt
  98. l, t = np.min(np.min([or_a.box, or_b.box], 0), 0)
  99. r, b = np.max(np.max([or_a.box, or_b.box], 0), 0)
  100. return OcrResult(np.array([[l, t], [r, t], [r, b], [l, b]]), or_a.txt, or_a.conf)
  101. # 获得 r 左上角的xy坐标
  102. def get_xy(self, r: OcrResult):
  103. return [r.ltrb[0], r.ltrb[1]]
  104. def all_required_fields(self, eps):
  105. """
  106. 获取所有需要的字段
  107. :return:
  108. """
  109. # 添加 位于 anc 右侧的 字段
  110. # 简单的合并两个OcrResult
  111. def get_one_line(anc, field):
  112. # 获取处于anc行 且 在field右侧 的字段
  113. result = []
  114. anc_field = None
  115. en = re.compile(u'[\u0041-\u005a+\u0061-\u007a]')
  116. for r in self.ocr_res:
  117. if '型' in r.txt and ''.join(re.findall(en, r.txt)):
  118. r.txt = '血型' + ''.join(re.findall(en, r.txt))[0] + '型'
  119. return r
  120. if '不' in r.txt and '不便' not in r.txt:
  121. r.txt = '血型不明'
  122. return r
  123. if anc_field is None and field in r.txt and r.ltrb[0] - anc.ltrb[2] > 0:
  124. # 在anc 右侧找到 标志txt
  125. anc_field = r
  126. result.append(anc_field)
  127. for j in self.ocr_res:
  128. if anc_field and abs(j.ltrb[1] - anc.ltrb[1]) < self.mean_h_esp * 0.8 and \
  129. (j.ltrb[0] - anc_field.ltrb[2] > 0 or abs(j.ltrb[0] - anc_field.ltrb[2]) < 15):
  130. # anc_field 找到后 & r不在result & r在anc同一行 & r在anc_field右侧
  131. result.append(j)
  132. if len(result) == 0:
  133. return
  134. for res in result[1:]:
  135. if res.txt in result[0].txt: continue
  136. result[0] = self.merger_or(result[0], res)
  137. box = result[0].box
  138. box[0][1] = anc.ltrb[1]
  139. box[1][1] = anc.ltrb[1]
  140. return OcrResult(box, result[0].txt, result[0].conf)
  141. def grt_row_up(anc: OcrResult):
  142. result = []
  143. for r in self.ocr_res:
  144. if "天主教" in r.txt:
  145. r.txt = '宗教信仰是天主教'
  146. return r
  147. if '无宗教' in r.txt or '无亲教' in r.txt:
  148. r.txt = '宗教信仰是无宗教信仰'
  149. return r
  150. if "无" in r.txt and len(r.txt) < 3 and anc.center[1] - r.center[1] > 0:
  151. r.txt = '宗教信仰是无'
  152. return r
  153. if anc and(abs(r.ltrb[0] - anc.ltrb[0]) < self.mean_h_esp * 0.8 and r.ltrb[1] - anc.ltrb[1] < 0):
  154. # 宗教信仰在上一行 在血型不能太
  155. result.append(r)
  156. if anc:
  157. if len(result) == 0:
  158. txt = '宗教信仰是'
  159. return OcrResult(anc.box, txt, anc.conf)
  160. for _r in result:
  161. if '宗' in _r.txt or '教' in _r.txt or '信' in _r.txt or '仰' in _r.txt:
  162. _r.txt = '宗教信仰是' + _r.txt
  163. return _r
  164. txt = '宗教信仰是'
  165. return OcrResult(anc.box, txt, anc.conf)
  166. # 剔除曾用名
  167. def del_former_name(fields):
  168. del fields[1]
  169. return fields[1]
  170. # 剔除常住人口登记卡附近火星文
  171. anchor_xy = self.get_xy(self.anchor)
  172. # 行
  173. row_fields = [self.anchor]
  174. for row in self.ocr_res:
  175. if abs(row.ltrb[1] - anchor_xy[1]) < eps and row.ltrb[0] - anchor_xy[0] > 0:
  176. row_fields.append(row)
  177. anchor_xy = self.get_xy(row)
  178. continue
  179. row_fields = sorted(row_fields, key=lambda x: [x.ltrb[1], x.ltrb[0]])
  180. # 列 - 姓名、出生地、籍贯
  181. col_fields = [self.anchor]
  182. for col in self.ocr_res:
  183. if '常' in col.txt or '住' in col.txt or '人' in col.txt or '口' in col.txt: continue
  184. if '农业家' in col.txt or '户' in col.txt or '姓名' in col.txt or '出生地' in col.txt or '居民' in col.txt: continue
  185. if ('户' in col.txt or '性' in col.txt or '民' in col.txt or '出' in col.txt) and abs(col.center[0] - self.anchor.ltrb[2]) < 20: continue
  186. # 处理 名xxx 错误
  187. if (self.anchor.ltrb[0] < col.center[0] < self.anchor.ltrb[2] or
  188. self.anchor.ltrb[0] < col.ltrb[2] < self.anchor.ltrb[2]) and \
  189. self.mean_h_esp * 12.6 > self.anchor.center[1] - col.center[1] > self.mean_h_esp * 3:
  190. col_fields.append(col)
  191. self.get_xy(col)
  192. if len(col.txt) < 5 and col.txt[0] == '名':
  193. col.txt = col.txt.split('名')[-1]
  194. col_fields.append(col)
  195. continue
  196. col_fields = sorted(col_fields, key=lambda x: [x.ltrb[1], x.ltrb[0]])
  197. # 检测曾用名
  198. if len(col_fields[1].txt) < 5:
  199. del_former_name(col_fields)
  200. # 整合住址
  201. col_fields = self.merge_address(col_fields)
  202. # 添加 血型(anchor ->身份证号)
  203. if get_one_line(self.anchor, '血'):
  204. col_fields.append(get_one_line(self.anchor, '血'))
  205. # 添加 宗教(anchor -> 身份证号)
  206. if grt_row_up(get_one_line(self.anchor, '血')):
  207. col_fields.append(grt_row_up(get_one_line(self.anchor, '血')))
  208. return [row_fields, col_fields]
  209. # 整合地址
  210. def merge_address(self, fields: List[OcrResult]):
  211. id_index = 0
  212. fields_on_id = []
  213. for r in range(len(fields)):
  214. code_val = re.findall("\d{10,18}", fields[r].txt)
  215. if len(code_val):
  216. id_index = r
  217. fields_on_id = fields[:id_index]
  218. break
  219. # 对兴安盟科尔做特殊处理
  220. for add in fields_on_id:
  221. if '兴安盟科尔' in add.txt:
  222. num = 1
  223. for add_k, add_v in enumerate(self.ocr_res):
  224. if '前' in add_v.txt:
  225. fields_on_id[num].txt = '内蒙古兴安盟科尔沁右翼前旗' + add_v.txt.split('旗')[-1]
  226. num += 1
  227. if '中' in add_v.txt:
  228. fields_on_id[num].txt = '内蒙古兴安盟科尔沁右翼中旗' + add_v.txt.split('旗')[-1]
  229. num += 1
  230. if '后' in add_v.txt:
  231. fields_on_id[num].txt = '内蒙古兴安盟科尔沁右翼后旗' + add_v.txt.split('旗')[-1]
  232. num += 1
  233. if num > 2: return fields_on_id + fields[id_index:]
  234. # 地址多行三种情况
  235. if len(fields_on_id) == 2:
  236. fields_on_id.append(fields_on_id[1])
  237. return fields_on_id + fields[id_index:]
  238. if len(fields_on_id) == 3:
  239. # 正常地址
  240. return fields_on_id + fields[id_index:]
  241. if len(fields_on_id) == 4:
  242. # 出生地多行 或者 籍贯多行
  243. if len(fields_on_id[-1].txt) < 7:
  244. # 籍贯多行
  245. fields_on_id[-2] = self.merger_or(fields_on_id[-2], fields_on_id[-1])
  246. del fields_on_id[-1]
  247. else:
  248. # 出生地多行
  249. fields_on_id[1] = self.merger_or(fields_on_id[1], fields_on_id[2])
  250. del fields_on_id[2]
  251. return fields_on_id + fields[id_index:]
  252. if len(fields_on_id) == 5:
  253. # 出生地 籍贯 都多行
  254. fields_on_id[1] = self.merger_or(fields_on_id[1], fields_on_id[2])
  255. fields_on_id[-2] = self.merger_or(fields_on_id[-2], fields_on_id[-1])
  256. del fields_on_id[2]
  257. del fields_on_id[-1]
  258. return fields_on_id + fields[id_index:]
  259. def parse(self, eps=5.0):
  260. """
  261. 解析所有字段
  262. :return:
  263. """
  264. ranks_fields: List = self.all_required_fields(eps)
  265. # ranks_fields = [sorted(ranks_fields[0], key=lambda x: [x.ltrb[1], x.ltrb[0]]),
  266. # sorted(ranks_fields[1], key=lambda x: [x.ltrb[1], x.ltrb[0]])]
  267. return ranks_fields
  268. def all_required_fields_f(self, eps):
  269. def merge_address(fields: List[OcrResult]):
  270. if len(fields) == 1:
  271. # 正常地址
  272. return fields
  273. if len(fields) == 2:
  274. # 两行地址
  275. box = fields[0].box
  276. txt = fields[0].txt + fields[1].txt
  277. conf = fields[0].conf
  278. fields[0] = (OcrResult(box, txt, conf))
  279. return fields
  280. def zero_ocr():
  281. return OcrResult(np.zeros((4, 2)), '', 0.)
  282. anchor_xy = self.get_xy(self.anchor)
  283. # 行 -> 住址
  284. row_fields = []
  285. for row in self.ocr_res:
  286. if len(row.txt) == 1 or ('住址' in row.txt and len(row.txt) < 7): continue
  287. if len(row_fields) == 0 and len(row.txt) < 6: continue
  288. if abs(row.ltrb[1] - anchor_xy[1]) < self.mean_h_esp * 1.5 \
  289. and row.center[0] - self.anchor.center[0] > self.anchor.wh[0] * 0.8:
  290. row_fields.append(row)
  291. # if self.anchor.ltrb[1] < row.center[1] < self.anchor.ltrb[3] \
  292. # and row.center[0] - self.anchor.center[0] > 0:
  293. # row_fields.append(row)
  294. if len(row_fields) == 0: row_fields.append(zero_ocr())
  295. merge_address(row_fields)
  296. # 列 -> 户别
  297. col_fields = []
  298. for col in self.ocr_res:
  299. if not col_fields and len(col.txt) == 1: continue
  300. # if abs(col.ltrb[0] - anchor_xy[0]) < eps + 40. and col.ltrb[1] - anchor_xy[1] < 0 \
  301. # and col.ltrb[2] > self.anchor.ltrb[0]:
  302. if self.anchor.ltrb[0] < col.center[0] < self.anchor.ltrb[2] \
  303. and self.anchor.ltrb[1] > col.center[1] > self.mean_h_esp * 1.5:
  304. col_fields.append(col)
  305. if not col_fields: col_fields.append(zero_ocr())
  306. return [row_fields, col_fields]
  307. def parse_f(self, eps=5.0):
  308. """
  309. 解析所有字段
  310. :return:
  311. """
  312. return False if self.anchor is False else self.all_required_fields_f(eps)
  313. # 子类 常驻人口页0
  314. class PeopleParser(RanksParser):
  315. def __init__(self, res: OcrResult):
  316. super(PeopleParser, self).__init__(res)
  317. @property
  318. def anchor(self):
  319. for r in self.ocr_res:
  320. txt = r.txt
  321. if "X" in txt or "x" in txt:
  322. code_val = re.findall("\d*[X|x]", txt)
  323. else:
  324. code_val = re.findall("\d{10,18}", txt)
  325. if len(code_val) > 0 and (len(code_val[0]) == 18 or len(code_val[0]) > 10) and \
  326. self.center_five_row[0] < r.center[1] < self.center_five_row[1]:
  327. return OcrResult(r.box, code_val[0], r.conf)
  328. raise Exception("没有找到身份证号")
  329. def parse(self, eps=5.0):
  330. return super(PeopleParser, self).parse(eps)
  331. # 子类 户口本首页1
  332. class FrontParser(RanksParser):
  333. def __init__(self, res: OcrResult):
  334. super(FrontParser, self).__init__(res)
  335. @property
  336. def anchor(self): # sourcery skip: merge-nested-ifs, reintroduce-else, remove-redundant-continue
  337. res = self.ocr_res
  338. code_val = []
  339. for r in res:
  340. txt = r.txt.replace('-', '')
  341. if bool(re.findall("\d{5,12}", txt)):
  342. # if bool(re.findall(u"[\u4e00-\u9fa5]", txt)): continue
  343. code_val.append(r)
  344. if len(code_val) == 1:
  345. return code_val[0]
  346. elif len(code_val) > 1:
  347. return code_val[1]
  348. # 空间解析失败 换字符串解析
  349. else:
  350. return False
  351. def parse_f(self, eps=5.0):
  352. return super(FrontParser, self).parse_f(eps)