import random
import re
import string
from collections import defaultdict
from dataclasses import dataclass
from typing import List

import cpca
import cv2
import numpy as np
from paddleocr import PaddleOCR
from zhon.hanzi import punctuation
import cn2an

from core.business_parse import BussinessParse0, BussinessParse1
from core.line_parser import OcrResult
from core.square_parser import parser_xy
from stamp.d_stamp import send_request

# Known OCR misreads mapped to their corrections. The replacements are applied
# in insertion order, so keep the order stable when adding entries.
# NOTE(review): the two parenthesis entries map a character to itself as they
# appear here; presumably one side was meant to be the full-width form --
# confirm against the original file before changing them.
_OCR_CORRECTIONS = {
    '伍任': '伍仟',
    '看翟永奇': '翟永奇',
    '马依伴中国玻璃网': '马依俤',
    '20144年': '2014年',
    '江苏永东方网络': '江苏隽永东方网络',
    '(': '(',
    ')': ')',
    '型型': '类型',
    '壹任': '壹仟',
    '查佰': '壹佰',
}

# Deletes every ASCII punctuation character and every character listed in
# zhon.hanzi.punctuation in a single pass.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation + punctuation)


def fix_text(text):
    """Return ``text`` with every known OCR misread replaced by its correction.

    Each key of the correction table is replaced, in table order, by its value.
    """
    for wrong, right in _OCR_CORRECTIONS.items():
        text = text.replace(wrong, right)
    return text


def clear_punctuation(txt):
    """Strip punctuation from the first three characters of ``txt`` only.

    ASCII punctuation and the characters in ``zhon.hanzi.punctuation`` are
    removed from ``txt[:3]``; everything from index 3 on is returned unchanged.
    """
    return txt[:3].translate(_PUNCTUATION_TABLE) + txt[3:]


@dataclass
class RecItem:
    """One recognised field: its text and the confidence attached to it."""

    text: str = ''
    confidence: float = 0.

    def to_dict(self):
        """Return the item as a dict with stripped text and a NaN-free confidence."""
        return {"text": self.text.strip(), "confidence": np.nan_to_num(self.confidence)}


class Parser(object):
    """Base parser that normalises OCR output and holds the per-field results.

    After construction:

    * every row of ``self.result`` has a trailing ``[joined_text, mean_conf]``
      summary appended (the rows passed in are mutated in place);
    * ``self.result`` is cut off after the first row whose text contains
      "登记机关";
    * ``self.raw_results`` holds ``OcrResult`` objects built from the raw
      entries, with spaces removed from their text;
    * ``self.res`` maps every name in ``self.keys`` to an empty ``RecItem``.
    """

    def __init__(self, ocr_results: List[List[OcrResult]], raw_results: List):
        """Prepare row summaries, truncate the rows and convert the raw results.

        Args:
            ocr_results: OCR rows; each row is a list of objects exposing
                ``txt`` and ``conf``.
            raw_results: raw entries indexed as ``entry[0]`` (passed to
                ``np.array``), ``entry[1][0]`` (text) and ``entry[1][1]``
                (confidence).
        """
        self.result = ocr_results
        self.res = defaultdict(RecItem)
        self.raw_results = raw_results
        self.keys = ["social_code", "company_name", "legal_person", "registered_capital", 'type',
                     'start_date', "business_scope", 'expire_date', 'address', 'stamp']
        for key in self.keys:
            self.res[key] = RecItem()

        for row in self.result:
            joined = ''.join(cell.txt for cell in row)
            # An empty row has no confidences to average; record 0.0 instead
            # of dividing by zero.
            mean_conf = sum((cell.conf for cell in row), 0.0) / len(row) if row else 0.0
            # Field extractors read this summary through row[-1][0] / row[-1][1].
            row.append([joined, mean_conf])

        # Rows after the one mentioning "登记机关" are dropped; that row is kept.
        for idx, row in enumerate(self.result):
            if "登记机关" in row[-1][0]:
                self.result = self.result[:idx + 1]
                break

        self.raw_results = [
            OcrResult(np.array(entry[0]), entry[1][0].replace(' ', ''), entry[1][1])
            for entry in self.raw_results
        ]

    def parse(self):
        """Return the mapping of field name to ``RecItem``."""
        return self.res
class _LicenseFieldParser(Parser):
    """Field extractors shared by both business-license parsers.

    Every extractor scans the row summaries prepared by ``Parser.__init__``
    (``row[-1][0]`` is the row text, ``row[-1][1]`` its confidence) and stores
    its finding in ``self.res`` under the matching key. Unless noted otherwise
    the first matching row wins.
    """

    # Same character class as before: digits, ASCII letters and a literal '+'.
    _SOCIAL_CODE_RE = re.compile(u'[\u0030-\u0039+\u0041-\u005a+\u0061-\u007a]{15,18}')
    _SOCIAL_CODE_MARKERS = ("统一社", "会信用", "用代码")
    # Removes ASCII and full-width parentheses from the company-type suffix.
    _PAREN_TABLE = str.maketrans('', '', '()\uff08\uff09')

    def __init__(self, ocr_results: List[List[OcrResult]], image, raw_results: List):
        """Run the base-class preparation and keep the source image."""
        Parser.__init__(self, ocr_results, raw_results)
        self.image = image

    def _rows(self):
        """Yield ``(text, confidence)`` from the summary of every OCR row."""
        for row in self.result:
            yield row[-1][0], row[-1][1]

    @staticmethod
    def _trim_company_name(raw):
        """Cut ``raw`` at the first of '注册', '组织', '组成' (applied in that order)."""
        return raw.split('注册')[0].split('组织')[0].split('组成')[0]

    def _keyword_address(self):
        """Look the address up by keyword.

        ``parser_xy`` is queried for '住所' and, if that yields nothing truthy,
        for '场所'. Returns ``(text, confidence)`` or ``None`` when both
        lookups fail. A '所' / '厂' label remnant within the first three
        characters is removed from the text.
        """
        hit = (parser_xy(self.result, self.raw_results, '住所')
               or parser_xy(self.result, self.raw_results, '场所'))
        if not hit:
            return None
        txt = hit.txt
        head = txt[:3]
        if '所' in head or '厂' in head:
            txt = head.split('所')[-1].replace('厂', '') + txt[3:]
        return txt, hit.conf

    def social_code(self):
        """Extract the unified social credit code.

        Only rows from the first one containing a code-label fragment onwards
        are searched; all 15-18 character matches in the first row that has
        any are joined and stored.
        """
        start = next(
            (idx for idx, row in enumerate(self.result)
             if any(marker in row[-1][0] for marker in self._SOCIAL_CODE_MARKERS)),
            None,
        )
        if start is None:
            return
        for row in self.result[start:]:
            code = ''.join(self._SOCIAL_CODE_RE.findall(row[-1][0]))
            if code:
                self.res['social_code'] = RecItem(code, row[-1][1])
                return

    def company_name(self):
        """Extract the company name from the first row carrying a '名称' label."""
        for txt, conf in self._rows():
            txt = txt.replace('称尔', '称')
            # Label characters separated by OCR noise: rebuild a clean label.
            if '名' in txt[:4] and '称' in txt[:4]:
                txt = '名称' + txt.split('称')[-1]
            if '名称' in txt:
                name = self._trim_company_name(txt.split('名称')[-1])
            elif txt.startswith('称') and len(txt) > 5:
                # Label lost its first character; the row starts at '称'.
                name = self._trim_company_name(txt.split('称')[-1])
            else:
                continue
            self.res['company_name'] = RecItem(fix_text(clear_punctuation(name)), conf)
            return

    def legal_person(self):
        """Extract the legal representative / operator / person in charge."""
        for txt, conf in self._rows():
            txt = txt.replace('市场监督', '')
            # '代表人' also covers '法定代表人'.
            if '代表人' in txt:
                person = txt.split('代表人')[-1].split('营业')[0]
            elif '经营者' in txt:
                person = txt.split('经营者')[-1].split('经营')[0]
            elif '负责人' in txt:
                person = txt.split('负责人')[-1].split('责人')[0]
            else:
                continue
            self.res['legal_person'] = RecItem(fix_text(clear_punctuation(person)), conf)
            return

    def registered_capital(self):
        """Extract the registered capital and normalise it via ``cn_to_an``.

        The currency word is kept as a prefix when it is within the first four
        characters of the row, as a suffix when '人民币' is within the last
        four, and omitted otherwise.
        """
        for txt, conf in self._rows():
            txt = fix_text(txt)
            if '注册资本' not in txt:
                continue
            if '人民币' in txt[:4]:
                amount = txt.split('人民币')[-1].split('万元')[0]
                txt = f'人民币{self.cn_to_an(fix_text(clear_punctuation(amount)))}'
            elif '美元' in txt[:4]:
                amount = txt.split('美元')[-1].split('万元')[0]
                txt = f'美元{self.cn_to_an(fix_text(clear_punctuation(amount)))}'
            elif '人民币' in txt[-4:]:
                amount = txt.split('资本')[-1].split('人民币')[0].split('万元')[0]
                txt = f'{self.cn_to_an(fix_text(clear_punctuation(amount)))}人民币'
            else:
                amount = txt.split('资本')[-1].split('币')[-1].split('万')[0]
                txt = self.cn_to_an(fix_text(clear_punctuation(amount)))
            self.res['registered_capital'] = RecItem(txt, conf)
            return

    def type(self):
        """Extract the company type from the first row containing '类型'.

        When the value contains '公司', whatever follows the last '公司' is
        re-attached in parentheses; branch offices ('分公司') keep that marker
        in front of the final '公司'.
        """
        for raw, conf in self._rows():
            txt = fix_text(clear_punctuation(raw))
            if '类型' not in txt:
                continue
            txt = txt.split('类型')[-1].split('成立')[0].split('注册')[0]
            if '公司' in txt:
                pieces = txt.split('公司')
                t_s = pieces[0]
                s_e = pieces[-1].translate(self._PAREN_TABLE)
                if '分公司' in txt:
                    t_s = f'{t_s}公司分'
                txt = f'{t_s}公司({s_e})' if s_e else f'{t_s}公司'
            # startswith() is safe when nothing follows the label; indexing
            # txt[0] raised IndexError on an empty value.
            if txt.startswith('型'):
                txt = txt[1:]
            self.res['type'] = RecItem(txt, conf)
            return

    def start_date(self):
        """Extract the establishment / registration date.

        Every row containing '日期' overwrites the result, so the last such
        row wins.
        """
        for txt, conf in self._rows():
            txt = fix_text(txt)
            if '日期' in txt:
                date = self.to_date(txt.split('日期')[-1])
                self.res['start_date'] = RecItem(date, conf)

    def expire_date(self):
        """Extract the validity period from the first row containing '期限'.

        Stores '<from> 至 <to>' when the row contains '至', '长期' when it
        contains that word, and an empty string otherwise.
        """
        for txt, conf in self._rows():
            if '期限' not in txt:
                continue
            if '至' in txt:
                span = ''.join(txt.split('期限')[1:]).replace('*', '')
                date_from = span.split('至')[0]
                date_to = span.split('至')[-1]
                date = f'{self.to_date(date_from)} 至 {self.to_date(date_to)}'
                self.res['expire_date'] = RecItem(date, conf)
            elif '长期' in txt:
                self.res['expire_date'] = RecItem('长期', conf)
            else:
                self.res['expire_date'] = RecItem('', conf)
            return

    def stamp(self):
        """Store the result of ``send_request(self.image)`` with confidence 1.0."""
        self.res['stamp'] = RecItem(send_request(self.image), 1.)

    @staticmethod
    def cn_to_an(num):
        """Normalise an amount expressed in units of 万 to '<number>万元'.

        Plain digits are parsed with ``int``. Anything else is handed to
        ``cn2an`` with a '万' suffix and the last four digits of the result
        are dropped. If that conversion fails too, the input is kept as-is.

        The earlier implementation reached the same outcome through a
        ``return`` inside ``finally``, which silently discarded every
        exception, including the one its own ``except`` branch raised.
        """
        try:
            num = int(num)
        except (TypeError, ValueError):
            try:
                num = str(cn2an.cn2an(f'{num}万'))[:-4]
            except Exception:
                # Best effort: OCR noise should not abort the whole parse, so
                # fall back to the unconverted text.
                pass
        return f'{num}万元'

    @staticmethod
    def to_date(txt):
        """Format the date found in ``txt`` as 'YYYY年M月D日'.

        '长期', '永久' and '不约定' short-circuit to '长期', '永久' and
        '不约定期限'. Otherwise exactly three digit groups are required (only
        the last four digits of the first group are used as the year); any
        other count yields an empty string.
        """
        if '长期' in txt:
            return '长期'
        if '永久' in txt:
            return '永久'
        if '不约定' in txt:
            return '不约定期限'
        groups = re.findall(r"\d+", txt)
        if len(groups) != 3:
            return ''
        year, month, day = groups
        return f'{year[-4:]}年{month}月{day}日'


class BusinessLicenseParser0(_LicenseFieldParser):
    """License parser whose business-scope step also fills in the address."""

    def business_scope(self):
        """Extract the business scope and, as a by-product, the address.

        ``BussinessParse0.detection`` returns both a scope and an address
        candidate. The address candidate is used when it is at least as long
        as the keyword-based one.
        NOTE(review): when the keyword lookup finds nothing the detected
        address is dropped as well -- confirm that this is intended.
        """
        ocr = PaddleOCR(use_gpu=True)
        bs_txt, bs_conf, ad_txt, ad_conf = BussinessParse0(ocr).detection(self.image, self.raw_results)
        if bool(bs_txt):
            self.res['business_scope'] = RecItem(bs_txt, bs_conf)

        found = self._keyword_address()
        if found is None:
            return
        txt, conf = found
        if len(ad_txt) >= len(txt):
            self.res['address'] = RecItem(ad_txt, ad_conf)
        else:
            self.res['address'] = RecItem(txt, conf)

    def address(self):
        """Extract the address by keyword lookup only (not called by ``parse``)."""
        found = self._keyword_address()
        if found is None:
            return
        txt, conf = found
        self.res['address'] = RecItem(txt, conf)

    def parse(self):
        """Run every extractor and return ``{key: RecItem.to_dict()}`` for all keys.

        ``address()`` is not called here because ``business_scope()`` already
        sets the address.
        """
        self.social_code()
        self.company_name()
        self.legal_person()
        self.registered_capital()
        self.type()
        self.start_date()
        self.expire_date()
        self.business_scope()
        self.stamp()
        return {key: self.res[key].to_dict() for key in self.keys}


class BusinessLicenseParser1(_LicenseFieldParser):
    """License parser that detects business scope and address separately."""

    def __init__(self, ocr_results: List[List[OcrResult]], image, raw_results: List):
        """Prepare the shared state and create one PaddleOCR instance for reuse."""
        _LicenseFieldParser.__init__(self, ocr_results, image, raw_results)
        self.ocr = PaddleOCR(use_gpu=True)

    def business_scope(self):
        """Extract the business scope via ``BussinessParse1.bs_detection``."""
        print('-------------经营范围处理开始--------------')
        bs_txt, bs_conf = BussinessParse1(self.ocr).bs_detection(self.image, self.raw_results)
        if bool(bs_txt):
            self.res['business_scope'] = RecItem(bs_txt, bs_conf)
        print('-------------经营范围处理结束--------------')

    def address(self):
        """Extract the address, preferring the longer of two candidates.

        The candidate from ``BussinessParse1.ad_detection`` is used when it is
        at least as long as the keyword-based one.
        NOTE(review): when the keyword lookup finds nothing the detected
        address is dropped as well -- confirm that this is intended.
        """
        ad_txt, ad_conf = BussinessParse1(self.ocr).ad_detection(self.image, self.raw_results)
        found = self._keyword_address()
        if found is None:
            return
        txt, conf = found
        if len(ad_txt) >= len(txt):
            self.res['address'] = RecItem(ad_txt, ad_conf)
        else:
            self.res['address'] = RecItem(txt, conf)

    def parse(self):
        """Run every extractor and return ``{key: RecItem.to_dict()}`` for all keys."""
        self.social_code()
        self.company_name()
        self.legal_person()
        self.registered_capital()
        self.type()
        self.start_date()
        self.expire_date()
        self.business_scope()
        self.address()
        self.stamp()
        return {key: self.res[key].to_dict() for key in self.keys}