Commit a9933f90 authored by Cao Duc Anh's avatar Cao Duc Anh

add export onnx

parent e61ef9b9
.vscode
__pycache__
# ConvertVietOcr2Onnx
Tutorial: [Converting a deep learning model to ONNX](https://viblo.asia/p/chuyen-doi-mo-hinh-hoc-sau-ve-onnx-bWrZnz4vZxw)
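For orientation, converting a PyTorch module to ONNX boils down to a single `torch.onnx.export` call on a traced forward pass. The sketch below is illustrative only: the toy `TinyCNN` module, the output file name, and the axis names are placeholders, not part of this repo; the actual export of the VietOCR CNN/encoder/decoder is done in `export.py`.

```
import torch
from torch import nn

# A toy stand-in for the OCR backbone, just to show the export call.
class TinyCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 8, kernel_size=3, padding=1)

    def forward(self, x):
        return self.conv(x)

model = TinyCNN().eval()
dummy = torch.rand(1, 3, 32, 128)  # (N, C, H, W)

# Trace the forward pass and write the ONNX graph; the width axis is marked
# dynamic so the exported model accepts images of arbitrary width.
torch.onnx.export(
    model, dummy, 'tiny_cnn.onnx',
    export_params=True, opset_version=12, do_constant_folding=True,
    input_names=['img'], output_names=['output'],
    dynamic_axes={'img': {3: 'width'}},
)
```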
# change to the list of chars in your dataset or use the default Vietnamese chars
vocab: 'aAàÀảẢãÃáÁạẠăĂằẰẳẲẵẴắẮặẶâÂầẦẩẨẫẪấẤậẬbBcCdDđĐeEèÈẻẺẽẼéÉẹẸêÊềỀểỂễỄếẾệỆfFgGhHiIìÌỉỈĩĨíÍịỊjJkKlLmMnNoOòÒỏỎõÕóÓọỌôÔồỒổỔỗỖốỐộỘơƠờỜởỞỡỠớỚợỢpPqQrRsStTuUùÙủỦũŨúÚụỤưƯừỪửỬữỮứỨựỰvVwWxXyYỳỲỷỶỹỸýÝỵỴzZ0123456789!"#$%&''()*+,-./:;<=>?@[\]^_`{|}~ '
# cpu, cuda, cuda:0
device: cuda:0
seq_modeling: transformer
transformer:
d_model: 256
nhead: 8
num_encoder_layers: 6
num_decoder_layers: 6
dim_feedforward: 2048
max_seq_length: 1024
pos_dropout: 0.1
trans_dropout: 0.1
optimizer:
max_lr: 0.0003
pct_start: 0.1
trainer:
batch_size: 32
print_every: 200
valid_every: 4000
iters: 100000
# where to save our model for prediction
export: ./weights/transformerocr.pth
checkpoint: ./checkpoint/transformerocr_checkpoint.pth
log: ./train.log
# null to disable computing accuracy, or set to the number of samples to enable validation while training
metrics: null
dataset:
# name of your dataset
name: data
# path to annotation and image
data_root: ./img/
train_annotation: annotation_train.txt
valid_annotation: annotation_val_small.txt
# resize image to a height of 32; a larger height will increase accuracy
image_height: 32
image_min_width: 32
image_max_width: 512
dataloader:
num_workers: 3
pin_memory: True
aug:
image_aug: true
masked_language_model: true
predictor:
# enable or disable beam search during prediction; beam search is slower
beamsearch: False
quiet: False
import torch
from torch import nn
import model.backbone.vgg as vgg
class CNN(nn.Module):
def __init__(self, backbone, **kwargs):
super(CNN, self).__init__()
if backbone == 'vgg11_bn':
self.model = vgg.vgg11_bn(**kwargs)
elif backbone == 'vgg19_bn':
self.model = vgg.vgg19_bn(**kwargs)
def forward(self, x):
return self.model(x)
def freeze(self):
for name, param in self.model.features.named_parameters():
if name != 'last_conv_1x1':
param.requires_grad = False
def unfreeze(self):
for param in self.model.features.parameters():
param.requires_grad = True
import torch
from torch import nn
from torchvision import models
from einops import rearrange
from torchvision.models._utils import IntermediateLayerGetter
class Vgg(nn.Module):
def __init__(self, name, ss, ks, hidden, pretrained=True, dropout=0.5):
super(Vgg, self).__init__()
if name == 'vgg11_bn':
cnn = models.vgg11_bn(pretrained=pretrained)
elif name == 'vgg19_bn':
cnn = models.vgg19_bn(pretrained=pretrained)
pool_idx = 0
for i, layer in enumerate(cnn.features):
if isinstance(layer, torch.nn.MaxPool2d):
cnn.features[i] = torch.nn.AvgPool2d(kernel_size=ks[pool_idx], stride=ss[pool_idx], padding=0)
pool_idx += 1
self.features = cnn.features
self.dropout = nn.Dropout(dropout)
self.last_conv_1x1 = nn.Conv2d(512, hidden, 1)
def forward(self, x):
"""
Shape:
- x: (N, C, H, W)
- output: (W, N, C)
"""
conv = self.features(x)
conv = self.dropout(conv)
conv = self.last_conv_1x1(conv)
# conv = rearrange(conv, 'b d h w -> b d (w h)')
conv = conv.permute(0, 1, 3, 2)
conv = conv.flatten(2)
conv = conv.permute(2, 0, 1)
return conv
def vgg11_bn(ss, ks, hidden, pretrained=True, dropout=0.5):
return Vgg('vgg11_bn', ss, ks, hidden, pretrained, dropout)
def vgg19_bn(ss, ks, hidden, pretrained=True, dropout=0.5):
return Vgg('vgg19_bn', ss, ks, hidden, pretrained, dropout)
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class Encoder(nn.Module):
def __init__(self, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
super().__init__()
self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)
self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, src):
"""
src: src_len x batch_size x img_channel
outputs: src_len x batch_size x hid_dim
hidden: batch_size x hid_dim
"""
embedded = self.dropout(src)
outputs, hidden = self.rnn(embedded)
hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))
return outputs, hidden
class Attention(nn.Module):
def __init__(self, enc_hid_dim, dec_hid_dim):
super().__init__()
self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
self.v = nn.Linear(dec_hid_dim, 1, bias = False)
def forward(self, hidden, encoder_outputs):
"""
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim,
outputs: batch_size x src_len
"""
batch_size = encoder_outputs.shape[1]
src_len = encoder_outputs.shape[0]
hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
encoder_outputs = encoder_outputs.permute(1, 0, 2)
energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim = 2)))
attention = self.v(energy).squeeze(2)
return F.softmax(attention, dim = 1)
class Decoder(nn.Module):
def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
super().__init__()
self.output_dim = output_dim
self.attention = attention
self.embedding = nn.Embedding(output_dim, emb_dim)
self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
self.fc_out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, encoder_outputs):
"""
inputs: batch_size
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
"""
input = input.unsqueeze(0)
embedded = self.dropout(self.embedding(input))
a = self.attention(hidden, encoder_outputs)
a = a.unsqueeze(1)
encoder_outputs = encoder_outputs.permute(1, 0, 2)
weighted = torch.bmm(a, encoder_outputs)
weighted = weighted.permute(1, 0, 2)
rnn_input = torch.cat((embedded, weighted), dim = 2)
output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
assert (output == hidden).all()
embedded = embedded.squeeze(0)
output = output.squeeze(0)
weighted = weighted.squeeze(0)
prediction = self.fc_out(torch.cat((output, weighted, embedded), dim = 1))
return prediction, hidden.squeeze(0), a.squeeze(1)
class Seq2Seq(nn.Module):
def __init__(self, vocab_size, encoder_hidden, decoder_hidden, img_channel, decoder_embedded, dropout=0.1):
super().__init__()
attn = Attention(encoder_hidden, decoder_hidden)
self.encoder = Encoder(img_channel, encoder_hidden, decoder_hidden, dropout)
self.decoder = Decoder(vocab_size, decoder_embedded, encoder_hidden, decoder_hidden, dropout, attn)
def forward_encoder(self, src):
"""
src: timestep x batch_size x channel
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
"""
encoder_outputs, hidden = self.encoder(src)
return (hidden, encoder_outputs)
def forward_decoder(self, tgt, memory):
"""
tgt: timestep x batch_size
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
output: batch_size x 1 x vocab_size
"""
tgt = tgt[-1]
hidden, encoder_outputs = memory
output, hidden, _ = self.decoder(tgt, hidden, encoder_outputs)
output = output.unsqueeze(1)
return output, (hidden, encoder_outputs)
def forward(self, src, trg):
"""
src: time_step x batch_size
trg: time_step x batch_size
outputs: batch_size x time_step x vocab_size
"""
batch_size = src.shape[1]
trg_len = trg.shape[0]
trg_vocab_size = self.decoder.output_dim
device = src.device
outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(device)
encoder_outputs, hidden = self.encoder(src)
for t in range(trg_len):
input = trg[t]
output, hidden, _ = self.decoder(input, hidden, encoder_outputs)
outputs[t] = output
outputs = outputs.transpose(0, 1).contiguous()
return outputs
def expand_memory(self, memory, beam_size):
hidden, encoder_outputs = memory
hidden = hidden.repeat(beam_size, 1)
encoder_outputs = encoder_outputs.repeat(1, beam_size, 1)
return (hidden, encoder_outputs)
def get_memory(self, memory, i):
hidden, encoder_outputs = memory
hidden = hidden[[i]]
encoder_outputs = encoder_outputs[:, [i],:]
return (hidden, encoder_outputs)
from model.backbone.cnn import CNN
from model.seqmodel.seq2seq import Seq2Seq
from torch import nn
class VietOCR(nn.Module):
def __init__(self, vocab_size,
backbone,
cnn_args,
transformer_args, seq_modeling='transformer'):
super(VietOCR, self).__init__()
self.cnn = CNN(backbone, **cnn_args)
self.seq_modeling = seq_modeling
self.transformer = Seq2Seq(vocab_size, **transformer_args)
def forward(self, img, tgt_input, tgt_key_padding_mask):
"""
Shape:
- img: (N, C, H, W)
- tgt_input: (T, N)
- tgt_key_padding_mask: (N, T)
- output: b t v
"""
src = self.cnn(img)
outputs = self.transformer(src, tgt_input)
return outputs
class Vocab():
def __init__(self, chars):
self.pad = 0
self.go = 1
self.eos = 2
self.mask_token = 3
self.chars = chars
self.c2i = {c:i+4 for i, c in enumerate(chars)}
self.i2c = {i+4:c for i, c in enumerate(chars)}
self.i2c[0] = '<pad>'
self.i2c[1] = '<sos>'
self.i2c[2] = '<eos>'
self.i2c[3] = '*'
def encode(self, chars):
return [self.go] + [self.c2i[c] for c in chars] + [self.eos]
def decode(self, ids):
first = 1 if self.go in ids else 0
last = ids.index(self.eos) if self.eos in ids else None
sent = ''.join([self.i2c[i] for i in ids[first:last]])
return sent
def __len__(self):
return len(self.c2i) + 4
def batch_decode(self, arr):
texts = [self.decode(ids) for ids in arr]
return texts
def __str__(self):
return self.chars
import yaml
def load_config(config_file):
with open(config_file, encoding='utf-8') as f:
config = yaml.safe_load(f)
return config
class Cfg(dict):
def __init__(self, config_dict):
super(Cfg, self).__init__(**config_dict)
self.__dict__ = self
@staticmethod
def load_config_from_file(fname, base_file='./config/base.yml'):
base_config = load_config(base_file)
with open(fname, encoding='utf-8') as f:
config = yaml.safe_load(f)
base_config.update(config)
return Cfg(base_config)
def save(self, fname):
with open(fname, 'w') as outfile:
yaml.dump(dict(self), outfile, default_flow_style=False, allow_unicode=True)
import torch
import numpy as np
import cv2
from model.vocab import Vocab
from model.transformerocr import VietOCR
import math
from PIL import Image
def translate(img, model, max_seq_length=128, sos_token=1, eos_token=2):
"""data: BxCxHxW"""
model.eval()
device = img.device
with torch.no_grad():
src = model.cnn(img)
memory = model.transformer.forward_encoder(src)
translated_sentence = [[sos_token] * len(img)]
max_length = 0
while max_length <= max_seq_length and not all(np.any(np.asarray(translated_sentence).T == eos_token, axis=1)):
tgt_inp = torch.LongTensor(translated_sentence).to(device)
output, memory = model.transformer.forward_decoder(tgt_inp, memory)
output = output.to('cpu')
values, indices = torch.topk(output, 1)
indices = indices[:, -1, 0]
indices = indices.tolist()
translated_sentence.append(indices)
max_length += 1
del output
translated_sentence = np.asarray(translated_sentence).T
return translated_sentence
def build_model(config):
vocab = Vocab(config['vocab'])
device = config['device']
model = VietOCR(len(vocab),
config['backbone'],
config['cnn'],
config['transformer'],
config['seq_modeling'])
model = model.to(device)
return model, vocab
def resize(w, h, expected_height, image_min_width, image_max_width):
new_w = int(expected_height * float(w) / float(h))
round_to = 10
new_w = math.ceil(new_w/round_to)*round_to
new_w = max(new_w, image_min_width)
new_w = min(new_w, image_max_width)
return new_w, expected_height
def process_image(image, image_height, image_min_width, image_max_width):
img = image.convert('RGB')
w, h = img.size
new_w, image_height = resize(w, h, image_height, image_min_width, image_max_width)
img = img.resize((new_w, image_height), Image.LANCZOS)  # Image.ANTIALIAS was removed in Pillow 10
img = np.asarray(img).transpose(2,0, 1)
img = img/255
return img
def process_input(image, image_height, image_min_width, image_max_width):
img = process_image(image, image_height, image_min_width, image_max_width)
img = img[np.newaxis, ...]
img = torch.FloatTensor(img)
return img
@@ -41,4 +41,26 @@ test_annotation is used to compute the validation loss.
docker-compose -f training.docker-compose.yml up --build
```
Monitor the training progress in the terminal. <br>
The trained model is saved in **"./weights"**
After training: `docker system prune`
## Export the PyTorch model to ONNX
### 1. Config
The trained .pth model is saved in **"./weights"** <br>
Edit the **vgg-seq2seq.yml** file. The settings to pay attention to are:
```
device: cuda:0
transformer:
encoder_hidden: 256
decoder_hidden: 256
img_channel: 256
decoder_embedded: 256
dropout: 0.1
```
### 2. Run
```
docker-compose -f export.docker-compose.yml up --build
```
The exported ONNX models are saved in **"./weights"**
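After the export finishes, the ONNX graphs can be loaded back with `onnxruntime` for a quick smoke test. The sketch below is illustrative only: it assumes the compose run produced `cnn.onnx`, `encoder.onnx` and `decoder.onnx` in `./weights` with the input/output names used in `export.py`, and it feeds a random dummy image instead of a real text line.

```
import numpy as np
import onnxruntime as ort

# Sessions for the three exported graphs (paths assume the compose run above).
cnn_sess = ort.InferenceSession('./weights/cnn.onnx')
encoder_sess = ort.InferenceSession('./weights/encoder.onnx')
decoder_sess = ort.InferenceSession('./weights/decoder.onnx')

# Dummy normalized image, shape (N, C, H, W); the width axis was exported as dynamic.
img = np.random.rand(1, 3, 32, 160).astype(np.float32)

# CNN features -> encoder memory -> one greedy decoder step from the <sos> token (id 1).
src = cnn_sess.run(None, {'img': img})[0]
encoder_outputs, hidden = encoder_sess.run(None, {'src': src})
tgt = np.array([1], dtype=np.int64)
logits, hidden, _ = decoder_sess.run(
    None, {'tgt': tgt, 'hidden': hidden, 'encoder_outputs': encoder_outputs})
print('first predicted token id:', int(logits[0].argmax()))
```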
@@ -40,17 +40,17 @@ dataset:
valid_annotation: test_annotation.txt
device: cuda:0
optimizer:
-max_lr: 0.0003
+max_lr: 0.001
pct_start: 0.1
predictor:
beamsearch: false
-pretrain: https://vocr.vn/data/vietocr/vgg_transformer.pth
+pretrain: https://vocr.vn/data/vietocr/vgg_seq2seq.pth
quiet: false
-seq_modeling: transformer
+seq_modeling: seq2seq
trainer:
batch_size: 8
checkpoint: ./checkpoint/transformerocr_checkpoint.pth
-export: ./weights/transformerocr.pth
+export: ./weights/vetocr.pth
iters: 20000
log: ./train.log
metrics: 2
@@ -66,4 +66,4 @@ transformer:
pos_dropout: 0.1
trans_dropout: 0.1
vocab: 'aAàÀảẢãÃáÁạẠăĂằẰẳẲẵẴắẮặẶâÂầẦẩẨẫẪấẤậẬbBcCdDđĐeEèÈẻẺẽẼéÉẹẸêÊềỀểỂễỄếẾệỆfFgGhHiIìÌỉỈĩĨíÍịỊjJkKlLmMnNoOòÒỏỎõÕóÓọỌôÔồỒổỔỗỖốỐộỘơƠờỜởỞỡỠớỚợỢpPqQrRsStTuUùÙủỦũŨúÚụỤưƯừỪửỬữỮứỨựỰvVwWxXyYỳỲỷỶỹỸýÝỵỴzZ0123456789!"#$%&''()*+,-./:;<=>?@[\]^_`{|}~ '
-weights: https://vocr.vn/data/vietocr/vgg_transformer.pth
+weights: https://vocr.vn/data/vietocr/vgg_seq2seq.pth
FROM pytorch/pytorch:2.3.1-cuda11.8-cudnn8-runtime
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=True \
PORT=9090
# Install dependencies
RUN apt-get update \
&& apt-get install -y wget libgl1-mesa-glx libglib2.0-0
WORKDIR /src
COPY ./export_requirements.txt /src/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY ./ConvertVietOcr2Onnx/ /src/
COPY ./export.py /src/
ENV PYTHONPATH=/src
version: '3.9'
services:
export-vietocr:
build:
context: ./
dockerfile: export.Dockerfile
volumes:
- ./vgg-seq2seq.yml:/src/config/export_config.yml
- ./weights:/src/weights/
deploy:
resources:
reservations:
devices:
- driver: nvidia
device_ids: ['0']
capabilities: [gpu]
command: python export.py
# tty: True
import matplotlib.pyplot as plt
from tool.config import Cfg
from tool.translate import build_model, process_input, translate
import torch
import onnxruntime
import numpy as np
config = Cfg.load_config_from_file('/src/config/export_config.yml')
config['cnn']['pretrained']=False
weight_path = '/src/weights/vetocr.pth'
# build model
model, vocab = build_model(config)
# load weight
model.load_state_dict(torch.load(weight_path, map_location=torch.device(config['device'])))
model = model.eval()
# Export the CNN model
def convert_cnn_part(img, save_path, model):
with torch.no_grad():
src = model.cnn(img)
torch.onnx.export(model.cnn, img, save_path, export_params=True, opset_version=12, do_constant_folding=True, verbose=True, input_names=['img'], output_names=['output'], dynamic_axes={'img': {3: 'length'}, 'output': {0: 'channel'}})
return src
img = torch.rand(1, 3, 32, 475).cuda()
src = convert_cnn_part(img, '/src/weights/cnn.onnx', model)
# Export the encoder model
def convert_encoder_part(model, src, save_path):
encoder_outputs, hidden = model.transformer.encoder(src)
torch.onnx.export(model.transformer.encoder, src, save_path, export_params=True, opset_version=11, do_constant_folding=True, input_names=['src'], output_names=['encoder_outputs', 'hidden'], dynamic_axes={'src':{0: "channel_input"}, 'encoder_outputs': {0: 'channel_output'}})
return hidden, encoder_outputs
hidden, encoder_outputs = convert_encoder_part(model, src, '/src/weights/encoder.onnx')
# Export the decoder model
def convert_decoder_part(model, tgt, hidden, encoder_outputs, save_path):
tgt = tgt[-1]
torch.onnx.export(model.transformer.decoder,
(tgt, hidden, encoder_outputs),
save_path,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['tgt', 'hidden', 'encoder_outputs'],
output_names=['output', 'hidden_out', 'last'],
dynamic_axes={'encoder_outputs':{0: "channel_input"},
'last': {0: 'channel_output'}})
device = img.device
tgt = torch.LongTensor([[1] * len(img)]).to(device)
convert_decoder_part(model, tgt, hidden, encoder_outputs, '/src/weights/decoder.onnx')
# Verify the converted models
import onnx
# load the exported models from onnx
cnn = onnx.load('/src/weights/cnn.onnx')
encoder = onnx.load('/src/weights/encoder.onnx')
decoder = onnx.load('/src/weights/decoder.onnx')
# confirm each model has a valid schema
onnx.checker.check_model(cnn)
onnx.checker.check_model(encoder)
onnx.checker.check_model(decoder)
# Print a human readable representation of the decoder graph
print(onnx.helper.printable_graph(decoder.graph))
# Based on the pytorch/pytorch:2.3.1-cuda11.8-cudnn8-runtime image
vietocr
albumentations
matplotlib
onnxruntime
pretrain:
id_or_url: 1nTKlEog9YFK74kPyX0qLwCWi60_YHHk4
md5: efcabaa6d3adfca8e52bda2fd7d2ee04
cached: /tmp/tranformerorc.pth
device: cuda:0
# url or local path
weights: https://drive.google.com/uc?id=1nTKlEog9YFK74kPyX0qLwCWi60_YHHk4
backbone: vgg19_bn
cnn:
# pooling stride size
ss:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# pooling kernel size
ks:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# dim of output feature map
hidden: 256
seq_modeling: seq2seq
transformer:
encoder_hidden: 256
decoder_hidden: 256
img_channel: 256
decoder_embedded: 256
dropout: 0.1
optimizer:
max_lr: 0.001
pct_start: 0.1
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# VietOCR
**Please update to the latest version to avoid errors.**
<p align="center">
<img src="https://github.com/pbcquoc/vietocr/raw/master/image/sample.png" width="1000" height="300">
</p>
In this project, I implement a Transformer OCR model for recognizing Vietnamese handwritten and printed text. The architecture is a combination of a CNN and a Transformer (the foundation of the well-known BERT model). The TransformerOCR model has many advantages over the CRNN architecture that I implemented previously. You can read about the architecture and how to train the model on different datasets [here](https://pbcquoc.github.io/vietocr).
The VietOCR model generalizes very well, and even achieves fairly high accuracy on new datasets it has never been trained on.
<p align="center">
<img src="https://raw.githubusercontent.com/pbcquoc/vietocr/master/image/vietocr.jpg" width="512" height="614">
</p>
# Installation
To install, run the following command:
```
pip install vietocr
```
# Quick Start
See [this](https://github.com/pbcquoc/vietocr/blob/master/vietocr_gettingstart.ipynb) notebook to learn how to use the library.
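If you only want to run a pretrained model, the notebook essentially boils down to the snippet below. This is a minimal sketch: the image path is a placeholder, and it assumes the packaged `Cfg.load_config_from_name` helper for fetching a default config; adjust `device` to your hardware.

```
from PIL import Image
from vietocr.tool.config import Cfg
from vietocr.tool.predictor import Predictor

# 'vgg_seq2seq' is fast; 'vgg_transformer' is slightly more accurate but slower.
config = Cfg.load_config_from_name('vgg_seq2seq')
config['device'] = 'cpu'  # or 'cuda:0'

detector = Predictor(config)
img = Image.open('./image/sample.png')  # replace with your own text-line image
print(detector.predict(img))
```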
# Creating the train/test files
The train/test file has 2 columns: the first is the file name, the second is the label (which must not contain the \t character); the two columns are separated by \t.
```
20160518_0151_25432_1_tg_3_5.png để nghe phổ biến chủ trương của UBND tỉnh Phú Yên
20160421_0102_25464_2_tg_0_4.png môi trường lại đều đồng thanh
```
A sample file is available [here](https://vocr.vn/data/vietocr/data_line.zip)
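As a quick sanity check of your own annotation files, a small helper like the one below (not part of the library; the path in the comment is hypothetical) can parse and validate the tab-separated format:

```
def load_annotation(path):
    """Parse an annotation file where each line is '<image file>\t<label>'."""
    samples = []
    with open(path, encoding='utf-8') as f:
        for line_no, line in enumerate(f, 1):
            parts = line.rstrip('\n').split('\t')
            if len(parts) != 2 or not parts[1]:
                raise ValueError('line {}: expected "<file>\\t<label>"'.format(line_no))
            samples.append((parts[0], parts[1]))
    return samples

# e.g. samples = load_annotation('./data_line/train_annotation.txt')
```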
# Model Zoo
This library implements both kinds of sequence models: attention-based seq2seq and transformer. Seq2seq predicts very fast and is widely used in industry, while transformer is more accurate but noticeably slower at prediction time. Both are provided so you can choose.
The models were trained on a dataset of 10M images, including many kinds of images such as synthetic text, handwriting, and real scanned documents.
Pretrained models are provided.
# Benchmark results on the 10M dataset
| Backbone | Config | Precision full sequence | time |
| ------------- |:-------------:| ---:|---:|
| VGG19-bn - Transformer | vgg_transformer | 0.8800 | 86ms @ 1080ti |
| VGG19-bn - Seq2Seq | vgg_seq2seq | 0.8701 | 12ms @ 1080ti |
The prediction time of the vgg-transformer model is much longer than that of the seq2seq model, while there is no clear difference in accuracy between the two architectures.
# Dataset
Only a sample dataset of about 1M synthetic images is provided; you can download it [here](https://drive.google.com/file/d/1T0cmkhTgu3ahyMIwGZeby612RpVdDxOR/view).
# License
This library is released under the terms of the [Apache 2.0 license]().
# Contact
If you run into any problems, please open an issue or contact me at pbcquoc@gmail.com
# change to the list of chars in your dataset or use the default Vietnamese chars
vocab: 'aAàÀảẢãÃáÁạẠăĂằẰẳẲẵẴắẮặẶâÂầẦẩẨẫẪấẤậẬbBcCdDđĐeEèÈẻẺẽẼéÉẹẸêÊềỀểỂễỄếẾệỆfFgGhHiIìÌỉỈĩĨíÍịỊjJkKlLmMnNoOòÒỏỎõÕóÓọỌôÔồỒổỔỗỖốỐộỘơƠờỜởỞỡỠớỚợỢpPqQrRsStTuUùÙủỦũŨúÚụỤưƯừỪửỬữỮứỨựỰvVwWxXyYỳỲỷỶỹỸýÝỵỴzZ0123456789!"#$%&''()*+,-./:;<=>?@[\]^_`{|}~ '
# cpu, cuda, cuda:0
device: cuda:0
seq_modeling: transformer
transformer:
d_model: 256
nhead: 8
num_encoder_layers: 6
num_decoder_layers: 6
dim_feedforward: 2048
max_seq_length: 1024
pos_dropout: 0.1
trans_dropout: 0.1
optimizer:
max_lr: 0.0003
pct_start: 0.1
trainer:
batch_size: 32
print_every: 200
valid_every: 4000
iters: 100000
# where to save our model for prediction
export: ./weights/transformerocr.pth
checkpoint: ./checkpoint/transformerocr_checkpoint.pth
log: ./train.log
# null to disable computing accuracy, or set to the number of samples to enable validation while training
metrics: null
dataset:
# name of your dataset
name: data
# path to annotation and image
data_root: ./img/
train_annotation: annotation_train.txt
valid_annotation: annotation_val_small.txt
# resize image to a height of 32; a larger height will increase accuracy
image_height: 32
image_min_width: 32
image_max_width: 512
dataloader:
num_workers: 3
pin_memory: True
aug:
image_aug: true
masked_language_model: true
predictor:
# enable or disable beam search during prediction; beam search is slower
beamsearch: False
quiet: False
pretrain:
id_or_url: 13327Y1tz1ohsm5YZMyXVMPIOjoOA0OaA
md5: 7068030afe2e8fc639d0e1e2c25612b3
cached: /tmp/tranformerorc.pth
weights: https://drive.google.com/uc?id=12dTOZ9VP7ZVzwQgVvqBWz5JO5RXXW5NY
backbone: resnet50
cnn:
ss:
- [2, 2]
- [2, 1]
- [2, 1]
- [2, 1]
- [1, 1]
hidden: 256
pretrain:
id_or_url: 13327Y1tz1ohsm5YZMyXVMPIOjoOA0OaA
md5: 7068030afe2e8fc639d0e1e2c25612b3
cached: /tmp/tranformerorc.pth
weights: https://drive.google.com/uc?id=12dTOZ9VP7ZVzwQgVvqBWz5JO5RXXW5NY
backbone: resnet50_fpn
cnn: {}
pretrain:
id_or_url: 13327Y1tz1ohsm5YZMyXVMPIOjoOA0OaA
md5: fbefa85079ad9001a71eb1bf47a93785
cached: /tmp/tranformerorc.pth
# url or local path
weights: https://drive.google.com/uc?id=13327Y1tz1ohsm5YZMyXVMPIOjoOA0OaA
backbone: vgg19_bn
cnn:
# pooling stride size
ss:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# pooling kernel size
ks:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# dim of output feature map
hidden: 256
seq_modeling: convseq2seq
transformer:
emb_dim: 256
hid_dim: 512
enc_layers: 10
dec_layers: 10
enc_kernel_size: 3
dec_kernel_size: 3
dropout: 0.1
pad_idx: 0
device: cuda:0
enc_max_length: 512
dec_max_length: 512
# for train
pretrain: https://vocr.vn/data/vietocr/vgg_seq2seq.pth
# url or local path (for predict)
weights: https://vocr.vn/data/vietocr/vgg_seq2seq.pth
backbone: vgg19_bn
cnn:
# pooling stride size
ss:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# pooling kernel size
ks:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# dim of output feature map
hidden: 256
seq_modeling: seq2seq
transformer:
encoder_hidden: 256
decoder_hidden: 256
img_channel: 256
decoder_embedded: 256
dropout: 0.1
optimizer:
max_lr: 0.001
pct_start: 0.1
# for training
pretrain: https://vocr.vn/data/vietocr/vgg_transformer.pth
# url or local path (predict)
weights: https://vocr.vn/data/vietocr/vgg_transformer.pth
backbone: vgg19_bn
cnn:
pretrained: True
# pooling stride size
ss:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# pooling kernel size
ks:
- [2, 2]
- [2, 2]
- [2, 1]
- [2, 1]
- [1, 1]
# dim of output feature map
hidden: 256
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="vietocr",
version="0.3.13",
author="pbcquoc",
author_email="pbcquoc@gmail.com",
description="Transformer base text detection",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pbcquoc/vietocr",
packages=setuptools.find_packages(),
install_requires=[
'einops==0.2.0',
'gdown==4.4.0',
'prefetch_generator==1.0.1',
'imgaug==0.4.0',
'albumentations==1.4.2',
'lmdb>=1.0.0',
'scikit-image>=0.21.0',
'pillow==10.2.0'
],
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires='>=3.6',
)
from PIL import Image
import numpy as np
from imgaug import augmenters as iaa
import imgaug as ia
import albumentations as A
class ImgAugTransform:
def __init__(self):
sometimes = lambda aug: iaa.Sometimes(0.3, aug)
self.aug = iaa.Sequential(iaa.SomeOf((1, 5),
[
# blur
sometimes(iaa.OneOf([iaa.GaussianBlur(sigma=(0, 1.0)),
iaa.MotionBlur(k=3)])),
# color
sometimes(iaa.AddToHueAndSaturation(value=(-10, 10), per_channel=True)),
sometimes(iaa.SigmoidContrast(gain=(3, 10), cutoff=(0.4, 0.6), per_channel=True)),
sometimes(iaa.Invert(0.25, per_channel=0.5)),
sometimes(iaa.Solarize(0.5, threshold=(32, 128))),
sometimes(iaa.Dropout2d(p=0.5)),
sometimes(iaa.Multiply((0.5, 1.5), per_channel=0.5)),
sometimes(iaa.Add((-40, 40), per_channel=0.5)),
sometimes(iaa.JpegCompression(compression=(5, 80))),
# distort
sometimes(iaa.Crop(percent=(0.01, 0.05), sample_independently=True)),
sometimes(iaa.PerspectiveTransform(scale=(0.01, 0.01))),
sometimes(iaa.Affine(scale=(0.7, 1.3), translate_percent=(-0.1, 0.1),
# rotate=(-5, 5), shear=(-5, 5),
order=[0, 1], cval=(0, 255),
mode=ia.ALL)),
sometimes(iaa.PiecewiseAffine(scale=(0.01, 0.01))),
sometimes(iaa.OneOf([iaa.Dropout(p=(0, 0.1)),
iaa.CoarseDropout(p=(0, 0.1), size_percent=(0.02, 0.25))])),
],
random_order=True),
random_order=True)
def __call__(self, img):
img = np.array(img)
img = self.aug.augment_image(img)
img = Image.fromarray(img)
return img
class ImgAugTransformV2:
def __init__(self):
self.aug = A.Compose([
A.InvertImg(p=0.2),
A.ColorJitter(p=0.2),
A.MotionBlur(blur_limit=3, p=0.2),
A.RandomBrightnessContrast(p=0.2),
A.Perspective(scale=(0.01, 0.05))
])
def __call__(self, img):
img = np.array(img)
transformed = self.aug(image=img)
img = transformed["image"]
img = Image.fromarray(img)
return img
import sys
import os
import random
from PIL import Image
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from collections import defaultdict
import numpy as np
import torch
import lmdb
import six
import time
from tqdm import tqdm
from torch.utils.data import Dataset
from torch.utils.data.sampler import Sampler
from vietocr.tool.translate import process_image
from vietocr.tool.create_dataset import createDataset
from vietocr.tool.translate import resize
class OCRDataset(Dataset):
def __init__(self, lmdb_path, root_dir, annotation_path, vocab, image_height=32, image_min_width=32, image_max_width=512, transform=None):
self.root_dir = root_dir
self.annotation_path = os.path.join(root_dir, annotation_path)
self.vocab = vocab
self.transform = transform
self.image_height = image_height
self.image_min_width = image_min_width
self.image_max_width = image_max_width
self.lmdb_path = lmdb_path
if os.path.isdir(self.lmdb_path):
print('{} exists. Remove folder if you want to create new dataset'.format(self.lmdb_path))
sys.stdout.flush()
else:
createDataset(self.lmdb_path, root_dir, annotation_path)
self.env = lmdb.open(
self.lmdb_path,
max_readers=8,
readonly=True,
lock=False,
readahead=False,
meminit=False)
self.txn = self.env.begin(write=False)
nSamples = int(self.txn.get('num-samples'.encode()))
self.nSamples = nSamples
self.build_cluster_indices()
def build_cluster_indices(self):
self.cluster_indices = defaultdict(list)
pbar = tqdm(range(self.__len__()),
desc='{} build cluster'.format(self.lmdb_path),
ncols = 100, position=0, leave=True)
for i in pbar:
bucket = self.get_bucket(i)
self.cluster_indices[bucket].append(i)
def get_bucket(self, idx):
key = 'dim-%09d'%idx
dim_img = self.txn.get(key.encode())
dim_img = np.frombuffer(dim_img, dtype=np.int32)
imgH, imgW = dim_img
new_w, image_height = resize(imgW, imgH, self.image_height, self.image_min_width, self.image_max_width)
return new_w
def read_buffer(self, idx):
img_file = 'image-%09d'%idx
label_file = 'label-%09d'%idx
path_file = 'path-%09d'%idx
imgbuf = self.txn.get(img_file.encode())
label = self.txn.get(label_file.encode()).decode()
img_path = self.txn.get(path_file.encode()).decode()
buf = six.BytesIO()
buf.write(imgbuf)
buf.seek(0)
return buf, label, img_path
def read_data(self, idx):
buf, label, img_path = self.read_buffer(idx)
img = Image.open(buf).convert('RGB')
if self.transform:
img = self.transform(img)
img_bw = process_image(img, self.image_height, self.image_min_width, self.image_max_width)
word = self.vocab.encode(label)
return img_bw, word, img_path
def __getitem__(self, idx):
img, word, img_path = self.read_data(idx)
img_path = os.path.join(self.root_dir, img_path)
sample = {'img': img, 'word': word, 'img_path': img_path}
return sample
def __len__(self):
return self.nSamples
class ClusterRandomSampler(Sampler):
def __init__(self, data_source, batch_size, shuffle=True):
self.data_source = data_source
self.batch_size = batch_size
self.shuffle = shuffle
def flatten_list(self, lst):
return [item for sublist in lst for item in sublist]
def __iter__(self):
batch_lists = []
for cluster, cluster_indices in self.data_source.cluster_indices.items():
if self.shuffle:
random.shuffle(cluster_indices)
batches = [cluster_indices[i:i + self.batch_size] for i in range(0, len(cluster_indices), self.batch_size)]
batches = [_ for _ in batches if len(_) == self.batch_size]
if self.shuffle:
random.shuffle(batches)
batch_lists.append(batches)
lst = self.flatten_list(batch_lists)
if self.shuffle:
random.shuffle(lst)
lst = self.flatten_list(lst)
return iter(lst)
def __len__(self):
return len(self.data_source)
class Collator(object):
def __init__(self, masked_language_model=True):
self.masked_language_model = masked_language_model
def __call__(self, batch):
filenames = []
img = []
target_weights = []
tgt_input = []
max_label_len = max(len(sample['word']) for sample in batch)
for sample in batch:
img.append(sample['img'])
filenames.append(sample['img_path'])
label = sample['word']
label_len = len(label)
tgt = np.concatenate((
label,
np.zeros(max_label_len - label_len, dtype=np.int32)))
tgt_input.append(tgt)
one_mask_len = label_len - 1
target_weights.append(np.concatenate((
np.ones(one_mask_len, dtype=np.float32),
np.zeros(max_label_len - one_mask_len,dtype=np.float32))))
img = np.array(img, dtype=np.float32)
tgt_input = np.array(tgt_input, dtype=np.int64).T
tgt_output = np.roll(tgt_input, -1, 0).T
tgt_output[:, -1]=0
# random mask token
if self.masked_language_model:
mask = np.random.random(size=tgt_input.shape) < 0.05
mask = mask & (tgt_input != 0) & (tgt_input != 1) & (tgt_input != 2)
tgt_input[mask] = 3
tgt_padding_mask = np.array(target_weights)==0
rs = {
'img': torch.FloatTensor(img),
'tgt_input': torch.LongTensor(tgt_input),
'tgt_output': torch.LongTensor(tgt_output),
'tgt_padding_mask': torch.BoolTensor(tgt_padding_mask),
'filenames': filenames
}
return rs
import torch
import numpy as np
from PIL import Image
import random
from vietocr.model.vocab import Vocab
from vietocr.tool.translate import process_image
import os
from collections import defaultdict
import math
from prefetch_generator import background
class BucketData(object):
def __init__(self, device):
self.max_label_len = 0
self.data_list = []
self.label_list = []
self.file_list = []
self.device = device
def append(self, datum, label, filename):
self.data_list.append(datum)
self.label_list.append(label)
self.file_list.append(filename)
self.max_label_len = max(len(label), self.max_label_len)
return len(self.data_list)
def flush_out(self):
"""
Shape:
- img: (N, C, H, W)
- tgt_input: (T, N)
- tgt_output: (N, T)
- tgt_padding_mask: (N, T)
"""
# encoder part
img = np.array(self.data_list, dtype=np.float32)
# decoder part
target_weights = []
tgt_input = []
for label in self.label_list:
label_len = len(label)
tgt = np.concatenate((
label,
np.zeros(self.max_label_len - label_len, dtype=np.int32)))
tgt_input.append(tgt)
one_mask_len = label_len - 1
target_weights.append(np.concatenate((
np.ones(one_mask_len, dtype=np.float32),
np.zeros(self.max_label_len - one_mask_len,dtype=np.float32))))
# reshape to fit input shape
tgt_input = np.array(tgt_input, dtype=np.int64).T
tgt_output = np.roll(tgt_input, -1, 0).T
tgt_output[:, -1]=0
tgt_padding_mask = np.array(target_weights)==0
filenames = self.file_list
self.data_list, self.label_list, self.file_list = [], [], []
self.max_label_len = 0
rs = {
'img': torch.FloatTensor(img).to(self.device),
'tgt_input': torch.LongTensor(tgt_input).to(self.device),
'tgt_output': torch.LongTensor(tgt_output).to(self.device),
'tgt_padding_mask':torch.BoolTensor(tgt_padding_mask).to(self.device),
'filenames': filenames
}
return rs
def __len__(self):
return len(self.data_list)
def __iadd__(self, other):
self.data_list += other.data_list
self.label_list += other.label_list
self.max_label_len = max(self.max_label_len, other.max_label_len)
self.max_width = max(self.max_width, other.max_width)
def __add__(self, other):
res = BucketData()
res.data_list = self.data_list + other.data_list
res.label_list = self.label_list + other.label_list
res.max_width = max(self.max_width, other.max_width)
res.max_label_len = max((self.max_label_len, other.max_label_len))
return res
class DataGen(object):
def __init__(self,data_root, annotation_fn, vocab, device, image_height=32, image_min_width=32, image_max_width=512):
self.image_height = image_height
self.image_min_width = image_min_width
self.image_max_width = image_max_width
self.data_root = data_root
self.annotation_path = os.path.join(data_root, annotation_fn)
self.vocab = vocab
self.device = device
self.clear()
def clear(self):
self.bucket_data = defaultdict(lambda: BucketData(self.device))
@background(max_prefetch=1)
def gen(self, batch_size, last_batch=True):
with open(self.annotation_path, 'r') as ann_file:
lines = ann_file.readlines()
np.random.shuffle(lines)
for l in lines:
img_path, lex = l.strip().split('\t')
img_path = os.path.join(self.data_root, img_path)
try:
img_bw, word = self.read_data(img_path, lex)
except IOError:
print('ioread image:{}'.format(img_path))
continue  # skip unreadable images instead of reusing stale data
width = img_bw.shape[-1]
bs = self.bucket_data[width].append(img_bw, word, img_path)
if bs >= batch_size:
b = self.bucket_data[width].flush_out()
yield b
if last_batch:
for bucket in self.bucket_data.values():
if len(bucket) > 0:
b = bucket.flush_out()
yield b
self.clear()
def read_data(self, img_path, lex):
with open(img_path, 'rb') as img_file:
img = Image.open(img_file).convert('RGB')
img_bw = process_image(img, self.image_height, self.image_min_width, self.image_max_width)
word = self.vocab.encode(lex)
return img_bw, word
import torch
from torch import nn
import vietocr.model.backbone.vgg as vgg
from vietocr.model.backbone.resnet import Resnet50
class CNN(nn.Module):
def __init__(self, backbone, **kwargs):
super(CNN, self).__init__()
if backbone == 'vgg11_bn':
self.model = vgg.vgg11_bn(**kwargs)
elif backbone == 'vgg19_bn':
self.model = vgg.vgg19_bn(**kwargs)
elif backbone == 'resnet50':
self.model = Resnet50(**kwargs)
def forward(self, x):
return self.model(x)
def freeze(self):
for name, param in self.model.features.named_parameters():
if name != 'last_conv_1x1':
param.requires_grad = False
def unfreeze(self):
for param in self.model.features.parameters():
param.requires_grad = True
import torch
from torch import nn
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, inplanes, planes, stride=1, downsample=None):
super(BasicBlock, self).__init__()
self.conv1 = self._conv3x3(inplanes, planes)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = self._conv3x3(planes, planes)
self.bn2 = nn.BatchNorm2d(planes)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def _conv3x3(self, in_planes, out_planes, stride=1):
"3x3 convolution with padding"
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
padding=1, bias=False)
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, input_channel, output_channel, block, layers):
super(ResNet, self).__init__()
self.output_channel_block = [int(output_channel / 4), int(output_channel / 2), output_channel, output_channel]
self.inplanes = int(output_channel / 8)
self.conv0_1 = nn.Conv2d(input_channel, int(output_channel / 16),
kernel_size=3, stride=1, padding=1, bias=False)
self.bn0_1 = nn.BatchNorm2d(int(output_channel / 16))
self.conv0_2 = nn.Conv2d(int(output_channel / 16), self.inplanes,
kernel_size=3, stride=1, padding=1, bias=False)
self.bn0_2 = nn.BatchNorm2d(self.inplanes)
self.relu = nn.ReLU(inplace=True)
self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.layer1 = self._make_layer(block, self.output_channel_block[0], layers[0])
self.conv1 = nn.Conv2d(self.output_channel_block[0], self.output_channel_block[
0], kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(self.output_channel_block[0])
self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.layer2 = self._make_layer(block, self.output_channel_block[1], layers[1], stride=1)
self.conv2 = nn.Conv2d(self.output_channel_block[1], self.output_channel_block[
1], kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(self.output_channel_block[1])
self.maxpool3 = nn.MaxPool2d(kernel_size=2, stride=(2, 1), padding=(0, 1))
self.layer3 = self._make_layer(block, self.output_channel_block[2], layers[2], stride=1)
self.conv3 = nn.Conv2d(self.output_channel_block[2], self.output_channel_block[
2], kernel_size=3, stride=1, padding=1, bias=False)
self.bn3 = nn.BatchNorm2d(self.output_channel_block[2])
self.layer4 = self._make_layer(block, self.output_channel_block[3], layers[3], stride=1)
self.conv4_1 = nn.Conv2d(self.output_channel_block[3], self.output_channel_block[
3], kernel_size=2, stride=(2, 1), padding=(0, 1), bias=False)
self.bn4_1 = nn.BatchNorm2d(self.output_channel_block[3])
self.conv4_2 = nn.Conv2d(self.output_channel_block[3], self.output_channel_block[
3], kernel_size=2, stride=1, padding=0, bias=False)
self.bn4_2 = nn.BatchNorm2d(self.output_channel_block[3])
def _make_layer(self, block, planes, blocks, stride=1):
downsample = None
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.inplanes, planes * block.expansion,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(planes * block.expansion),
)
layers = []
layers.append(block(self.inplanes, planes, stride, downsample))
self.inplanes = planes * block.expansion
for i in range(1, blocks):
layers.append(block(self.inplanes, planes))
return nn.Sequential(*layers)
def forward(self, x):
x = self.conv0_1(x)
x = self.bn0_1(x)
x = self.relu(x)
x = self.conv0_2(x)
x = self.bn0_2(x)
x = self.relu(x)
x = self.maxpool1(x)
x = self.layer1(x)
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool2(x)
x = self.layer2(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.maxpool3(x)
x = self.layer3(x)
x = self.conv3(x)
x = self.bn3(x)
x = self.relu(x)
x = self.layer4(x)
x = self.conv4_1(x)
x = self.bn4_1(x)
x = self.relu(x)
x = self.conv4_2(x)
x = self.bn4_2(x)
conv = self.relu(x)
conv = conv.transpose(-1, -2)
conv = conv.flatten(2)
conv = conv.permute(-1, 0, 1)
return conv
def Resnet50(ss, hidden):
return ResNet(3, hidden, BasicBlock, [1, 2, 5, 3])
import torch
from torch import nn
from torchvision import models
from einops import rearrange
from torchvision.models._utils import IntermediateLayerGetter
class Vgg(nn.Module):
def __init__(self, name, ss, ks, hidden, pretrained=True, dropout=0.5):
super(Vgg, self).__init__()
if pretrained:
weights = 'DEFAULT'
else:
weights = None
if name == 'vgg11_bn':
cnn = models.vgg11_bn(weights=weights)
elif name == 'vgg19_bn':
cnn = models.vgg19_bn(weights=weights)
pool_idx = 0
for i, layer in enumerate(cnn.features):
if isinstance(layer, torch.nn.MaxPool2d):
cnn.features[i] = torch.nn.AvgPool2d(kernel_size=ks[pool_idx], stride=ss[pool_idx], padding=0)
pool_idx += 1
self.features = cnn.features
self.dropout = nn.Dropout(dropout)
self.last_conv_1x1 = nn.Conv2d(512, hidden, 1)
def forward(self, x):
"""
Shape:
- x: (N, C, H, W)
- output: (W, N, C)
"""
conv = self.features(x)
conv = self.dropout(conv)
conv = self.last_conv_1x1(conv)
# conv = rearrange(conv, 'b d h w -> b d (w h)')
conv = conv.transpose(-1, -2)
conv = conv.flatten(2)
conv = conv.permute(-1, 0, 1)
return conv
def vgg11_bn(ss, ks, hidden, pretrained=True, dropout=0.5):
return Vgg('vgg11_bn', ss, ks, hidden, pretrained, dropout)
def vgg19_bn(ss, ks, hidden, pretrained=True, dropout=0.5):
return Vgg('vgg19_bn', ss, ks, hidden, pretrained, dropout)
import torch
class Beam:
def __init__(self, beam_size=8, min_length=0, n_top=1, ranker=None,
start_token_id=1, end_token_id=2):
self.beam_size = beam_size
self.min_length = min_length
self.ranker = ranker
self.end_token_id = end_token_id
self.top_sentence_ended = False
self.prev_ks = []
self.next_ys = [torch.LongTensor(beam_size).fill_(start_token_id)] # remove padding
self.current_scores = torch.FloatTensor(beam_size).zero_()
self.all_scores = []
# Time and k pair for finished.
self.finished = []
self.n_top = n_top
self.ranker = ranker
def advance(self, next_log_probs):
# next_probs : beam_size X vocab_size
vocabulary_size = next_log_probs.size(1)
# current_beam_size = next_log_probs.size(0)
current_length = len(self.next_ys)
if current_length < self.min_length:
for beam_index in range(len(next_log_probs)):
next_log_probs[beam_index][self.end_token_id] = -1e10
if len(self.prev_ks) > 0:
beam_scores = next_log_probs + self.current_scores.unsqueeze(1).expand_as(next_log_probs)
# Don't let EOS have children.
last_y = self.next_ys[-1]
for beam_index in range(last_y.size(0)):
if last_y[beam_index] == self.end_token_id:
beam_scores[beam_index] = -1e10 # -1e20 raises error when executing
else:
beam_scores = next_log_probs[0]
flat_beam_scores = beam_scores.view(-1)
top_scores, top_score_ids = flat_beam_scores.topk(k=self.beam_size, dim=0, largest=True, sorted=True)
self.current_scores = top_scores
self.all_scores.append(self.current_scores)
prev_k = top_score_ids // vocabulary_size # (beam_size, )
next_y = top_score_ids - prev_k * vocabulary_size # (beam_size, )
self.prev_ks.append(prev_k)
self.next_ys.append(next_y)
for beam_index, last_token_id in enumerate(next_y):
if last_token_id == self.end_token_id:
# skip scoring
self.finished.append((self.current_scores[beam_index], len(self.next_ys) - 1, beam_index))
if next_y[0] == self.end_token_id:
self.top_sentence_ended = True
def get_current_state(self):
"Get the outputs for the current timestep."
return torch.stack(self.next_ys, dim=1)
def get_current_origin(self):
"Get the backpointers for the current timestep."
return self.prev_ks[-1]
def done(self):
return self.top_sentence_ended and len(self.finished) >= self.n_top
def get_hypothesis(self, timestep, k):
hypothesis = []
for j in range(len(self.prev_ks[:timestep]) - 1, -1, -1):
hypothesis.append(self.next_ys[j + 1][k])
# for RNN, [:, k, :], and for transformer, [k, :, :]
k = self.prev_ks[j][k]
return hypothesis[::-1]
def sort_finished(self, minimum=None):
if minimum is not None:
i = 0
# Add from beam until we have minimum outputs.
while len(self.finished) < minimum:
# global_scores = self.global_scorer.score(self, self.scores)
# s = global_scores[i]
s = self.current_scores[i]
self.finished.append((s, len(self.next_ys) - 1, i))
i += 1
self.finished = sorted(self.finished, key=lambda a: a[0], reverse=True)
scores = [sc for sc, _, _ in self.finished]
ks = [(t, k) for _, t, k in self.finished]
return scores, ks
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class Encoder(nn.Module):
def __init__(self, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
super().__init__()
self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)
self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, src):
"""
src: src_len x batch_size x img_channel
outputs: src_len x batch_size x hid_dim
hidden: batch_size x hid_dim
"""
embedded = self.dropout(src)
outputs, hidden = self.rnn(embedded)
hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))
return outputs, hidden
class Attention(nn.Module):
def __init__(self, enc_hid_dim, dec_hid_dim):
super().__init__()
self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
self.v = nn.Linear(dec_hid_dim, 1, bias = False)
def forward(self, hidden, encoder_outputs):
"""
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim,
outputs: batch_size x src_len
"""
batch_size = encoder_outputs.shape[1]
src_len = encoder_outputs.shape[0]
hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
encoder_outputs = encoder_outputs.permute(1, 0, 2)
energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim = 2)))
attention = self.v(energy).squeeze(2)
return F.softmax(attention, dim = 1)
class Decoder(nn.Module):
def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
super().__init__()
self.output_dim = output_dim
self.attention = attention
self.embedding = nn.Embedding(output_dim, emb_dim)
self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
self.fc_out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, encoder_outputs):
"""
inputs: batch_size
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
"""
input = input.unsqueeze(0)
embedded = self.dropout(self.embedding(input))
a = self.attention(hidden, encoder_outputs)
a = a.unsqueeze(1)
encoder_outputs = encoder_outputs.permute(1, 0, 2)
weighted = torch.bmm(a, encoder_outputs)
weighted = weighted.permute(1, 0, 2)
rnn_input = torch.cat((embedded, weighted), dim = 2)
output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
assert (output == hidden).all()
embedded = embedded.squeeze(0)
output = output.squeeze(0)
weighted = weighted.squeeze(0)
prediction = self.fc_out(torch.cat((output, weighted, embedded), dim = 1))
return prediction, hidden.squeeze(0), a.squeeze(1)
class Seq2Seq(nn.Module):
def __init__(self, vocab_size, encoder_hidden, decoder_hidden, img_channel, decoder_embedded, dropout=0.1):
super().__init__()
attn = Attention(encoder_hidden, decoder_hidden)
self.encoder = Encoder(img_channel, encoder_hidden, decoder_hidden, dropout)
self.decoder = Decoder(vocab_size, decoder_embedded, encoder_hidden, decoder_hidden, dropout, attn)
def forward_encoder(self, src):
"""
src: timestep x batch_size x channel
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
"""
encoder_outputs, hidden = self.encoder(src)
return (hidden, encoder_outputs)
def forward_decoder(self, tgt, memory):
"""
tgt: timestep x batch_size
hidden: batch_size x hid_dim
encoder_outputs: src_len x batch_size x hid_dim
output: batch_size x 1 x vocab_size
"""
tgt = tgt[-1]
hidden, encoder_outputs = memory
output, hidden, _ = self.decoder(tgt, hidden, encoder_outputs)
output = output.unsqueeze(1)
return output, (hidden, encoder_outputs)
def forward(self, src, trg):
"""
src: time_step x batch_size
trg: time_step x batch_size
outputs: batch_size x time_step x vocab_size
"""
batch_size = src.shape[1]
trg_len = trg.shape[0]
trg_vocab_size = self.decoder.output_dim
device = src.device
outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(device)
encoder_outputs, hidden = self.encoder(src)
for t in range(trg_len):
input = trg[t]
output, hidden, _ = self.decoder(input, hidden, encoder_outputs)
outputs[t] = output
outputs = outputs.transpose(0, 1).contiguous()
return outputs
def expand_memory(self, memory, beam_size):
hidden, encoder_outputs = memory
hidden = hidden.repeat(beam_size, 1)
encoder_outputs = encoder_outputs.repeat(1, beam_size, 1)
return (hidden, encoder_outputs)
def get_memory(self, memory, i):
hidden, encoder_outputs = memory
hidden = hidden[[i]]
encoder_outputs = encoder_outputs[:, [i],:]
return (hidden, encoder_outputs)
from einops import rearrange
from torchvision import models
import math
import torch
from torch import nn
class LanguageTransformer(nn.Module):
def __init__(self, vocab_size,
d_model, nhead,
num_encoder_layers, num_decoder_layers,
dim_feedforward, max_seq_length,
pos_dropout, trans_dropout):
super().__init__()
self.d_model = d_model
self.embed_tgt = nn.Embedding(vocab_size, d_model)
self.pos_enc = PositionalEncoding(d_model, pos_dropout, max_seq_length)
# self.learned_pos_enc = LearnedPositionalEncoding(d_model, pos_dropout, max_seq_length)
self.transformer = nn.Transformer(d_model, nhead,
num_encoder_layers, num_decoder_layers,
dim_feedforward, trans_dropout)
self.fc = nn.Linear(d_model, vocab_size)
def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
"""
Shape:
- src: (W, N, C)
- tgt: (T, N)
- src_key_padding_mask: (N, S)
- tgt_key_padding_mask: (N, T)
- memory_key_padding_mask: (N, S)
- output: (N, T, E)
"""
tgt_mask = self.gen_nopeek_mask(tgt.shape[0]).to(src.device)
src = self.pos_enc(src*math.sqrt(self.d_model))
# src = self.learned_pos_enc(src*math.sqrt(self.d_model))
tgt = self.pos_enc(self.embed_tgt(tgt) * math.sqrt(self.d_model))
output = self.transformer(src, tgt, tgt_mask=tgt_mask, src_key_padding_mask=src_key_padding_mask,
tgt_key_padding_mask=tgt_key_padding_mask.float(), memory_key_padding_mask=memory_key_padding_mask)
# output = rearrange(output, 't n e -> n t e')
output = output.transpose(0, 1)
return self.fc(output)
def gen_nopeek_mask(self, length):
mask = (torch.triu(torch.ones(length, length)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
def forward_encoder(self, src):
src = self.pos_enc(src*math.sqrt(self.d_model))
memory = self.transformer.encoder(src)
return memory
def forward_decoder(self, tgt, memory):
tgt_mask = self.gen_nopeek_mask(tgt.shape[0]).to(tgt.device)
tgt = self.pos_enc(self.embed_tgt(tgt) * math.sqrt(self.d_model))
output = self.transformer.decoder(tgt, memory, tgt_mask=tgt_mask)
# output = rearrange(output, 't n e -> n t e')
output = output.transpose(0, 1)
return self.fc(output), memory
def expand_memory(self, memory, beam_size):
memory = memory.repeat(1, beam_size, 1)
return memory
def get_memory(self, memory, i):
memory = memory[:, [i], :]
return memory
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=100):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
class LearnedPositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=100):
super(LearnedPositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
self.pos_embed = nn.Embedding(max_len, d_model)
self.layernorm = LayerNorm(d_model)
def forward(self, x):
seq_len = x.size(0)
pos = torch.arange(seq_len, dtype=torch.long, device=x.device)
pos = pos.unsqueeze(-1).expand(x.size()[:2])
x = x + self.pos_embed(pos)
return self.dropout(self.layernorm(x))
class LayerNorm(nn.Module):
"A layernorm module in the TF style (epsilon inside the square root)."
def __init__(self, d_model, variance_epsilon=1e-12):
super().__init__()
self.gamma = nn.Parameter(torch.ones(d_model))
self.beta = nn.Parameter(torch.zeros(d_model))
self.variance_epsilon = variance_epsilon
def forward(self, x):
u = x.mean(-1, keepdim=True)
s = (x - u).pow(2).mean(-1, keepdim=True)
x = (x - u) / torch.sqrt(s + self.variance_epsilon)
return self.gamma * x + self.beta
from vietocr.model.backbone.cnn import CNN
from vietocr.model.seqmodel.transformer import LanguageTransformer
from vietocr.model.seqmodel.seq2seq import Seq2Seq
from vietocr.model.seqmodel.convseq2seq import ConvSeq2Seq
from torch import nn
class VietOCR(nn.Module):
def __init__(self, vocab_size,
backbone,
cnn_args,
transformer_args, seq_modeling='transformer'):
super(VietOCR, self).__init__()
self.cnn = CNN(backbone, **cnn_args)
self.seq_modeling = seq_modeling
if seq_modeling == 'transformer':
self.transformer = LanguageTransformer(vocab_size, **transformer_args)
elif seq_modeling == 'seq2seq':
self.transformer = Seq2Seq(vocab_size, **transformer_args)
elif seq_modeling == 'convseq2seq':
self.transformer = ConvSeq2Seq(vocab_size, **transformer_args)
else:
raise ValueError('Unsupported seq_modeling: {}'.format(seq_modeling))
def forward(self, img, tgt_input, tgt_key_padding_mask):
"""
Shape:
- img: (N, C, H, W)
- tgt_input: (T, N)
- tgt_key_padding_mask: (N, T)
- output: b t v
"""
src = self.cnn(img)
if self.seq_modeling == 'transformer':
outputs = self.transformer(src, tgt_input, tgt_key_padding_mask=tgt_key_padding_mask)
elif self.seq_modeling == 'seq2seq':
outputs = self.transformer(src, tgt_input)
elif self.seq_modeling == 'convseq2seq':
outputs = self.transformer(src, tgt_input)
return outputs
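# Data flow, for the transformer branch: the CNN flattens feature-map columns into a
# time-major sequence of shape (T, N, E); the sequence model decodes it against
# tgt_input and returns logits of shape (N, T, vocab_size), matching the docstring above.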
class Vocab():
def __init__(self, chars):
self.pad = 0
self.go = 1
self.eos = 2
self.mask_token = 3
self.chars = chars
self.c2i = {c:i+4 for i, c in enumerate(chars)}
self.i2c = {i+4:c for i, c in enumerate(chars)}
self.i2c[0] = '<pad>'
self.i2c[1] = '<sos>'
self.i2c[2] = '<eos>'
self.i2c[3] = '*'
def encode(self, chars):
return [self.go] + [self.c2i[c] for c in chars] + [self.eos]
def decode(self, ids):
first = 1 if self.go in ids else 0
last = ids.index(self.eos) if self.eos in ids else None
sent = ''.join([self.i2c[i] for i in ids[first:last]])
return sent
def __len__(self):
return len(self.c2i) + 4
def batch_decode(self, arr):
texts = [self.decode(ids) for ids in arr]
return texts
def __str__(self):
return self.chars
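# A minimal round-trip sketch, based on the mapping above (special tokens occupy
# indices 0-3, characters start at 4):
#   v = Vocab('ab')
#   v.encode('ab')          # -> [1, 4, 5, 2]   (<sos> a b <eos>)
#   v.decode([1, 4, 5, 2])  # -> 'ab'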
import torch
from torch import nn
class LabelSmoothingLoss(nn.Module):
def __init__(self, classes, padding_idx, smoothing=0.0, dim=-1):
super(LabelSmoothingLoss, self).__init__()
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
self.cls = classes
self.dim = dim
self.padding_idx = padding_idx
def forward(self, pred, target):
pred = pred.log_softmax(dim=self.dim)
with torch.no_grad():
# true_dist = pred.data.clone()
true_dist = torch.zeros_like(pred)
true_dist.fill_(self.smoothing / (self.cls - 2))
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
true_dist[:, self.padding_idx] = 0
mask = torch.nonzero(target.data == self.padding_idx, as_tuple=False)
if mask.dim() > 0:
true_dist.index_fill_(0, mask.squeeze(), 0.0)
return torch.mean(torch.sum(-true_dist * pred, dim=self.dim))
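# A hedged usage sketch (the shapes are assumptions about how the loss is called):
# pred holds (N*T, vocab_size) logits and target holds (N*T,) class indices, e.g.
#   criterion = LabelSmoothingLoss(len(vocab), padding_idx=vocab.pad, smoothing=0.1)
#   loss = criterion(outputs.view(-1, outputs.size(-1)), tgt_output.view(-1))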
class ScheduledOptim():
'''A simple wrapper class for learning rate scheduling'''
def __init__(self, optimizer, d_model, init_lr, n_warmup_steps):
assert n_warmup_steps > 0, 'must be greater than 0'
self._optimizer = optimizer
self.init_lr = init_lr
self.d_model = d_model
self.n_warmup_steps = n_warmup_steps
self.n_steps = 0
def step(self):
"Step with the inner optimizer"
self._update_learning_rate()
self._optimizer.step()
def zero_grad(self):
"Zero out the gradients with the inner optimizer"
self._optimizer.zero_grad()
def _get_lr_scale(self):
d_model = self.d_model
n_steps, n_warmup_steps = self.n_steps, self.n_warmup_steps
return (d_model ** -0.5) * min(n_steps ** (-0.5), n_steps * n_warmup_steps ** (-1.5))
def state_dict(self):
optimizer_state_dict = {
'init_lr':self.init_lr,
'd_model':self.d_model,
'n_warmup_steps':self.n_warmup_steps,
'n_steps':self.n_steps,
'_optimizer':self._optimizer.state_dict(),
}
return optimizer_state_dict
def load_state_dict(self, state_dict):
self.init_lr = state_dict['init_lr']
self.d_model = state_dict['d_model']
self.n_warmup_steps = state_dict['n_warmup_steps']
self.n_steps = state_dict['n_steps']
self._optimizer.load_state_dict(state_dict['_optimizer'])
def _update_learning_rate(self):
''' Learning rate scheduling per step '''
self.n_steps += 1
for param_group in self._optimizer.param_groups:
lr = self.init_lr*self._get_lr_scale()
self.lr = lr
param_group['lr'] = lr
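# The schedule above follows the Transformer warmup rule:
#   lr = init_lr * d_model^(-0.5) * min(step^(-0.5), step * n_warmup_steps^(-1.5))
# A minimal wrapping sketch (the hyper-parameter values are assumptions):
#   optimizer = ScheduledOptim(
#       torch.optim.Adam(model.parameters(), betas=(0.9, 0.98), eps=1e-9),
#       d_model=256, init_lr=2.0, n_warmup_steps=4000)
#   optimizer.zero_grad(); loss.backward(); optimizer.step()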
import argparse
from PIL import Image
from vietocr.tool.predictor import Predictor
from vietocr.tool.config import Cfg
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--img', required=True, help='path to the input image')
parser.add_argument('--config', required=True, help='path to the config file')
args = parser.parse_args()
config = Cfg.load_config_from_file(args.config)
detector = Predictor(config)
img = Image.open(args.img)
s = detector.predict(img)
print(s)
if __name__ == '__main__':
main()
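# Example invocation (the script and file names are placeholders):
#   python predict.py --img ./sample/line.png --config ./config/my_config.yml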
./image/036170002830.jpeg HOÀNG THỊ THOI
./image/079193002341.jpeg TRỊNH THỊ THÚY HẰNG
./image/001099025107.jpeg NGUYỄN VĂN BÌNH
./image/060085000115.jpeg NGUYỄN MINH TOÀN
./image/026301003919.jpeg NGUYỄN THỊ KIỀU TRANG
./image/079084000809.jpeg LÊ NGỌC PHƯƠNG KHANH
./image/038144000109.jpeg ĐÀO THỊ TƠ
./image/072183002222.jpeg NGUYỄN THANH PHƯỚC
./image/038078002355.jpeg HÀ ĐÌNH LỢI
./image/038089010274.jpeg HÀ VĂN LUÂN
from vietocr.loader.dataloader_v1 import DataGen
from vietocr.model.vocab import Vocab
def test_loader():
chars = 'aAàÀảẢãÃáÁạẠăĂằẰẳẲẵẴắẮặẶâÂầẦẩẨẫẪấẤậẬbBcCdDđĐeEèÈẻẺẽẼéÉẹẸêÊềỀểỂễỄếẾệỆfFgGhHiIìÌỉỈĩĨíÍịỊjJkKlLmMnNoOòÒỏỎõÕóÓọỌôÔồỒổỔỗỖốỐộỘơƠờỜởỞỡỠớỚợỢpPqQrRsStTuUùÙủỦũŨúÚụỤưƯừỪửỬữỮứỨựỰvVwWxXyYỳỲỷỶỹỸýÝỵỴzZ0123456789!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
vocab = Vocab(chars)
s_gen = DataGen('./vietocr/tests/', 'sample.txt', vocab, 'cpu', 32, 512)
iterator = s_gen.gen(30)
for batch in iterator:
assert batch['img'].shape[1]==3, 'image must have 3 channels'
assert batch['img'].shape[2]==32, 'the height must be 32'
print(batch['img'].shape, batch['tgt_input'].shape, batch['tgt_output'].shape, batch['tgt_padding_mask'].shape)
if __name__ == '__main__':
test_loader()
import yaml
from vietocr.tool.utils import download_config
url_config = {
'vgg_transformer':'vgg-transformer.yml',
'resnet_transformer':'resnet_transformer.yml',
'resnet_fpn_transformer':'resnet_fpn_transformer.yml',
'vgg_seq2seq':'vgg-seq2seq.yml',
'vgg_convseq2seq':'vgg_convseq2seq.yml',
'vgg_decoderseq2seq':'vgg_decoderseq2seq.yml',
'base':'base.yml',
}
class Cfg(dict):
def __init__(self, config_dict):
super(Cfg, self).__init__(**config_dict)
self.__dict__ = self
@staticmethod
def load_config_from_file(fname):
#base_config = download_config(url_config['base'])
base_config = {}
with open(fname, encoding='utf-8') as f:
config = yaml.safe_load(f)
base_config.update(config)
return Cfg(base_config)
@staticmethod
def load_config_from_name(name):
base_config = download_config(url_config['base'])
config = download_config(url_config[name])
base_config.update(config)
return Cfg(base_config)
def save(self, fname):
with open(fname, 'w', encoding='utf-8') as outfile:
yaml.dump(dict(self), outfile, default_flow_style=False, allow_unicode=True)
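# Typical usage (the config file name is a placeholder):
#   config = Cfg.load_config_from_file('./config/my_config.yml')
#   print(config['device'], config.device)  # dict-style and attribute-style access both work
#   config.save('./config/backup.yml')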
import sys
import os
import lmdb # install lmdb by "pip install lmdb"
import cv2
import numpy as np
from tqdm import tqdm
def checkImageIsValid(imageBin):
isvalid = True
imgH = None
imgW = None
imageBuf = np.frombuffer(imageBin, dtype=np.uint8)
try:
img = cv2.imdecode(imageBuf, cv2.IMREAD_GRAYSCALE)
imgH, imgW = img.shape[0], img.shape[1]
if imgH * imgW == 0:
isvalid = False
except Exception as e:
isvalid = False
return isvalid, imgH, imgW
def writeCache(env, cache):
with env.begin(write=True) as txn:
for k, v in cache.items():
txn.put(k.encode(), v)
def createDataset(outputPath, root_dir, annotation_path):
"""
Create LMDB dataset for CRNN training.
ARGS:
outputPath : LMDB output path
imagePathList : list of image path
labelList : list of corresponding groundtruth texts
lexiconList : (optional) list of lexicon lists
checkValid : if true, check the validity of every image
"""
annotation_path = os.path.join(root_dir, annotation_path)
with open(annotation_path, 'r', encoding='utf-8') as ann_file:
lines = ann_file.readlines()
annotations = [l.strip().split('\t') for l in lines]
nSamples = len(annotations)
env = lmdb.open(outputPath, map_size=1099511627776)
cache = {}
cnt = 0
error = 0
pbar = tqdm(range(nSamples), ncols = 100, desc='Create {}'.format(outputPath))
for i in pbar:
imageFile, label = annotations[i]
imagePath = os.path.join(root_dir, imageFile)
if not os.path.exists(imagePath):
error += 1
continue
with open(imagePath, 'rb') as f:
imageBin = f.read()
isvalid, imgH, imgW = checkImageIsValid(imageBin)
if not isvalid:
error += 1
continue
imageKey = 'image-%09d' % cnt
labelKey = 'label-%09d' % cnt
pathKey = 'path-%09d' % cnt
dimKey = 'dim-%09d' % cnt
cache[imageKey] = imageBin
cache[labelKey] = label.encode()
cache[pathKey] = imageFile.encode()
cache[dimKey] = np.array([imgH, imgW], dtype=np.int32).tobytes()
cnt += 1
if cnt % 1000 == 0:
writeCache(env, cache)
cache = {}
nSamples = cnt  # keys are 0-indexed, so cnt equals the number of stored samples
cache['num-samples'] = str(nSamples).encode()
writeCache(env, cache)
if error > 0:
print('Skipped {} invalid or missing images'.format(error))
print('Created dataset with %d samples' % nSamples)
sys.stdout.flush()
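# A hedged usage sketch (paths are placeholders). The annotation file is expected to
# contain one "<image path>\t<label>" pair per line, with image paths relative to root_dir:
#   createDataset('./train_lmdb', './dataset/', 'train_annotation.txt')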
import os
class Logger():
def __init__(self, fname):
path, _ = os.path.split(fname)
os.makedirs(path, exist_ok=True)
self.logger = open(fname, 'w')
def log(self, string):
self.logger.write(string+'\n')
self.logger.flush()
def close(self):
self.logger.close()
from vietocr.tool.translate import build_model, translate, translate_beam_search, process_input, predict
from vietocr.tool.utils import download_weights
import torch
from collections import defaultdict
class Predictor():
def __init__(self, config):
device = config['device']
model, vocab = build_model(config)
weights = '/tmp/weights.pth'
if config['weights'].startswith('http'):
weights = download_weights(config['weights'])
else:
weights = config['weights']
model.load_state_dict(torch.load(weights, map_location=torch.device(device)))
self.config = config
self.model = model
self.vocab = vocab
self.device = device
def predict(self, img, return_prob=False):
img = process_input(img, self.config['dataset']['image_height'],
self.config['dataset']['image_min_width'], self.config['dataset']['image_max_width'])
img = img.to(self.config['device'])
if self.config['predictor']['beamsearch']:
sent = translate_beam_search(img, self.model)
s = sent
prob = None
else:
s, prob = translate(img, self.model)
s = s[0].tolist()
prob = prob[0]
s = self.vocab.decode(s)
if return_prob:
return s, prob
else:
return s
def predict_batch(self, imgs, return_prob=False):
bucket = defaultdict(list)
bucket_idx = defaultdict(list)
bucket_pred = {}
sents, probs = [0]*len(imgs), [0]*len(imgs)
for i, img in enumerate(imgs):
img = process_input(img, self.config['dataset']['image_height'],
self.config['dataset']['image_min_width'], self.config['dataset']['image_max_width'])
bucket[img.shape[-1]].append(img)
bucket_idx[img.shape[-1]].append(i)
for k, batch in bucket.items():
batch = torch.cat(batch, 0).to(self.device)
s, prob = translate(batch, self.model)
prob = prob.tolist()
s = s.tolist()
s = self.vocab.batch_decode(s)
bucket_pred[k] = (s, prob)
for k in bucket_pred:
idx = bucket_idx[k]
sent, prob = bucket_pred[k]
for i, j in enumerate(idx):
sents[j] = sent[i]
probs[j] = prob[i]
if return_prob:
return sents, probs
else:
return sents
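# A minimal prediction sketch (config name and image paths are placeholders; the
# config must provide a 'weights' entry pointing to a local .pth file or an http URL):
#   config = Cfg.load_config_from_file('./config/my_config.yml')
#   detector = Predictor(config)
#   text = detector.predict(Image.open('./sample/line.png'))
#   texts = detector.predict_batch([img1, img2, img3])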
import torch
import numpy as np
import math
from PIL import Image
from torch.nn.functional import log_softmax, softmax
from vietocr.model.transformerocr import VietOCR
from vietocr.model.vocab import Vocab
from vietocr.model.beam import Beam
def batch_translate_beam_search(img, model, beam_size=4, candidates=1, max_seq_length=128, sos_token=1, eos_token=2):
# img: NxCxHxW
model.eval()
device = img.device
sents = []
with torch.no_grad():
src = model.cnn(img)
# print(src.shape)
memories = model.transformer.forward_encoder(src)
for i in range(src.size(0)):
# memory = memories[:,i,:].repeat(1, beam_size, 1) # TxNxE
memory = model.transformer.get_memory(memories, i)
sent = beamsearch(memory, model, device, beam_size, candidates, max_seq_length, sos_token, eos_token)
sents.append(sent)
sents = np.asarray(sents)
return sents
def translate_beam_search(img, model, beam_size=4, candidates=1, max_seq_length=128, sos_token=1, eos_token=2):
# img: 1xCxHxW
model.eval()
device = img.device
with torch.no_grad():
src = model.cnn(img)
memory = model.transformer.forward_encoder(src) #TxNxE
sent = beamsearch(memory, model, device, beam_size, candidates, max_seq_length, sos_token, eos_token)
return sent
def beamsearch(memory, model, device, beam_size=4, candidates=1, max_seq_length=128, sos_token=1, eos_token=2):
# memory: Tx1xE
model.eval()
beam = Beam(beam_size=beam_size, min_length=0, n_top=candidates, ranker=None, start_token_id=sos_token, end_token_id=eos_token)
with torch.no_grad():
# memory = memory.repeat(1, beam_size, 1) # TxNxE
memory = model.transformer.expand_memory(memory, beam_size)
for _ in range(max_seq_length):
tgt_inp = beam.get_current_state().transpose(0,1).to(device) # TxN
decoder_outputs, memory = model.transformer.forward_decoder(tgt_inp, memory)
log_prob = log_softmax(decoder_outputs[:,-1, :].squeeze(0), dim=-1)
beam.advance(log_prob.cpu())
if beam.done():
break
scores, ks = beam.sort_finished(minimum=1)
hypothesises = []
for i, (times, k) in enumerate(ks[:candidates]):
hypothesis = beam.get_hypothesis(times, k)
hypothesises.append(hypothesis)
return [1] + [int(i) for i in hypothesises[0][:-1]]
def translate(img, model, max_seq_length=128, sos_token=1, eos_token=2):
"data: BxCXHxW"
model.eval()
device = img.device
with torch.no_grad():
src = model.cnn(img)
memory = model.transformer.forward_encoder(src)
translated_sentence = [[sos_token]*len(img)]
char_probs = [[1]*len(img)]
max_length = 0
while max_length <= max_seq_length and not all(np.any(np.asarray(translated_sentence).T==eos_token, axis=1)):
tgt_inp = torch.LongTensor(translated_sentence).to(device)
# output = model(img, tgt_inp, tgt_key_padding_mask=None)
# output = model.transformer(src, tgt_inp, tgt_key_padding_mask=None)
output, memory = model.transformer.forward_decoder(tgt_inp, memory)
output = softmax(output, dim=-1)
output = output.to('cpu')
values, indices = torch.topk(output, 5)
indices = indices[:, -1, 0]
indices = indices.tolist()
values = values[:, -1, 0]
values = values.tolist()
char_probs.append(values)
translated_sentence.append(indices)
max_length += 1
del output
translated_sentence = np.asarray(translated_sentence).T
char_probs = np.asarray(char_probs).T
char_probs = np.multiply(char_probs, translated_sentence>3)
char_probs = np.sum(char_probs, axis=-1)/(char_probs>0).sum(-1)
return translated_sentence, char_probs
def build_model(config):
vocab = Vocab(config['vocab'])
device = config['device']
model = VietOCR(len(vocab),
config['backbone'],
config['cnn'],
config['transformer'],
config['seq_modeling'])
model = model.to(device)
return model, vocab
def resize(w, h, expected_height, image_min_width, image_max_width):
new_w = int(expected_height * float(w) / float(h))
round_to = 10
new_w = math.ceil(new_w/round_to)*round_to
new_w = max(new_w, image_min_width)
new_w = min(new_w, image_max_width)
return new_w, expected_height
def process_image(image, image_height, image_min_width, image_max_width):
img = image.convert('RGB')
w, h = img.size
new_w, image_height = resize(w, h, image_height, image_min_width, image_max_width)
img = img.resize((new_w, image_height), Image.LANCZOS)
img = np.asarray(img).transpose(2,0, 1)
img = img/255
return img
def process_input(image, image_height, image_min_width, image_max_width):
img = process_image(image, image_height, image_min_width, image_max_width)
img = img[np.newaxis, ...]
img = torch.FloatTensor(img)
return img
def predict(filename, config):
img = Image.open(filename)
img = process_input(img, config['dataset']['image_height'],
config['dataset']['image_min_width'], config['dataset']['image_max_width'])
img = img.to(config['device'])
model, vocab = build_model(config)
s = translate(img, model)[0][0].tolist()
s = vocab.decode(s)
return s
import os
import gdown
import yaml
import numpy as np
import uuid
import requests
import tempfile
from tqdm import tqdm
def download_weights(uri, cached=None, md5=None, quiet=False):
if uri.startswith('http'):
return download(url=uri, quiet=quiet)
return uri
def download(url, quiet=False):
tmp_dir = tempfile.gettempdir()
filename = url.split('/')[-1]
full_path = os.path.join(tmp_dir, filename)
if os.path.exists(full_path):
print('Model weights {} already exist. Skipping download.'.format(full_path))
return full_path
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(full_path, 'wb') as f:
for chunk in tqdm(r.iter_content(chunk_size=8192)):
# If you have chunk encoded response uncomment if
# and set chunk_size parameter to None.
#if chunk:
f.write(chunk)
return full_path
def download_config(id):
url = 'https://vocr.vn/data/vietocr/config/{}'.format(id)
r = requests.get(url)
config = yaml.safe_load(r.text)
return config
def compute_accuracy(ground_truth, predictions, mode='full_sequence'):
"""
Computes accuracy
:param ground_truth: list of ground-truth label strings
:param predictions: list of predicted strings, aligned with ground_truth
:param mode: if 'per_char' is selected then
single_label_accuracy = correct_predicted_char_nums_of_single_sample / single_label_char_nums
avg_label_accuracy = sum(single_label_accuracy) / label_nums
if 'full_sequence' is selected then
single_label_accuracy = 1 if the prediction result is exactly the same as label else 0
avg_label_accuracy = sum(single_label_accuracy) / label_nums
:return: avg_label_accuracy
"""
if mode == 'per_char':
accuracy = []
for index, label in enumerate(ground_truth):
prediction = predictions[index]
total_count = len(label)
correct_count = 0
try:
for i, tmp in enumerate(label):
if tmp == prediction[i]:
correct_count += 1
except IndexError:
continue
finally:
try:
accuracy.append(correct_count / total_count)
except ZeroDivisionError:
if len(prediction) == 0:
accuracy.append(1)
else:
accuracy.append(0)
avg_accuracy = np.mean(np.array(accuracy).astype(np.float32), axis=0)
elif mode == 'full_sequence':
try:
correct_count = 0
for index, label in enumerate(ground_truth):
prediction = predictions[index]
if prediction == label:
correct_count += 1
avg_accuracy = correct_count / len(ground_truth)
except ZeroDivisionError:
if not predictions:
avg_accuracy = 1
else:
avg_accuracy = 0
else:
raise NotImplementedError("Accuracy mode '{}' has not been implemented".format(mode))
return avg_accuracy
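# Worked example: with ground_truth = ['abc', 'ab'] and predictions = ['abc', 'ax'],
# mode='full_sequence' gives (1 + 0) / 2 = 0.5, while mode='per_char' gives
# (3/3 + 1/2) / 2 = 0.75.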
import argparse
from vietocr.model.trainer import Trainer
from vietocr.tool.config import Cfg
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--config', required=True, help='see example at ')
parser.add_argument('--checkpoint', required=False, help='your checkpoint')
args = parser.parse_args()
config = Cfg.load_config_from_file(args.config)
trainer = Trainer(config)
if args.checkpoint:
trainer.load_checkpoint(args.checkpoint)
trainer.train()
if __name__ == '__main__':
main()
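# Example invocation (the script and file names are placeholders):
#   python train.py --config ./config/my_config.yml --checkpoint ./checkpoint/last.pth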