Text Digitalization

Choices of OCR Engine

We perform OCR on the images of Zhongjing quan shu (仲景全書) from the CUHK Digital Repository. We tested free OCR engines such as Pytesseract and PaddleOCR, but they gave unsatisfactory results on vertical ancient Chinese text. We therefore chose two OCR engines: one from the Academia Sinica Center for Digital Cultures (ASCDC) [1] and one from GJ.cool [2].

ASCDC OCR

ASCDC provides a user-friendly interface (Figure 1) for OCR. Its OCR engine recognises most of the characters, with 91% accuracy. The engine also marks the characters it is uncertain about so that we can proofread them manually.

Figure 1: ASCDC OCR Interface

Apart from correcting the characters, we also need to correct the positions of the bounding boxes and reorder the text columns. If bounding boxes overlap, the recognized text becomes garbled. For example, the text recognized in Figure 2 changes from 「病脈」 to 「病不能」. For the oversized bounding box in Figure 3, the recognized text is 「厚朴胖」. In addition, the column numbers are sometimes assigned in the wrong order (Figure 4), which is easy to overlook: the correct column order is 5-6-7, not 5-7-6.

Figure 2: Overlapping bounding boxes
Figure 3: Oversized bounding box
Figure 4: Mismatched column numbers
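
The two checks described above (overlapping boxes and column order) could also be pre-screened in code before manual proofreading. The following is only a minimal sketch: the `(x, y, width, height)` box format, the function names, and the sample coordinates are all assumptions for illustration and are not taken from the ASCDC platform.

```python
from itertools import combinations

def boxes_overlap(a, b):
    """Return True if two (x, y, width, height) boxes share a region of positive area."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    return ax < bx + bw and bx < ax + aw and ay < by + bh and by < ay + ah

def find_overlaps(boxes):
    """Return the index pairs of all boxes that overlap each other."""
    indexed = list(enumerate(boxes))
    return [(i, j) for (i, a), (j, b) in combinations(indexed, 2) if boxes_overlap(a, b)]

def reading_order(boxes):
    """Return box indices sorted right to left, the column order of vertical Chinese text."""
    return sorted(range(len(boxes)), key=lambda i: -boxes[i][0])

# Hypothetical column boxes; the second and third overlap horizontally.
columns = [(100, 0, 90, 1000), (300, 0, 90, 1000), (200, 0, 110, 1000)]
print(find_overlaps(columns))   # index pairs that need manual correction
print(reading_order(columns))   # expected column order, rightmost first
```

Any pair reported by `find_overlaps` would still have to be fixed by hand in the interface; the sketch only points to where to look.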

The OCR result is shown in Figure 5. Note that the result starts with the image name, which contains the volume number and page number.

Figure 5: ASCDC OCR Result

Removal of Annotation

In the beginning, we passed the whole image to the ASCDC platform. Since the OCR engine sometimes mixes the text of the top and side annotations (Figure 6) into the main content, we decided to remove all the annotations programmatically using OpenCV.

Figure 6: Structure of the image
Figure 7: Cropped Image

The annotations are removed in three steps.

First, we apply the Canny edge detection algorithm to mark all the edges in grey. The algorithm looks for pixels around which the colour changes rapidly. For example, on the boundary of each character the colour changes from black to pale yellow, whereas deep inside a character all the surrounding pixels are black and the colour change is tiny. Therefore, only the edges of the characters are marked.
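
To illustrate the idea of "rapid colour change", here is a minimal NumPy sketch that computes only the gradient magnitude on a synthetic image. It is not the full Canny algorithm, which additionally smooths the image, thins the edges, and applies two thresholds; the image values and the threshold of 50 are assumptions chosen for the example.

```python
import numpy as np

def gradient_magnitude(gray):
    """Approximate the intensity change at each pixel with finite differences."""
    gray = gray.astype(float)
    # np.gradient returns the derivative along rows (y) first, then columns (x)
    gy, gx = np.gradient(gray)
    return np.hypot(gx, gy)

# Synthetic 7x7 "page": pale background (200) with a dark 3x3 "stroke" (0).
page = np.full((7, 7), 200, dtype=np.uint8)
page[2:5, 2:5] = 0

magnitude = gradient_magnitude(page)
edge_mask = magnitude > 50  # illustrative threshold, not a Canny parameter

print(edge_mask[3, 3])  # centre of the stroke: neighbours are all dark
print(edge_mask[3, 2])  # boundary of the stroke: dark next to pale
print(edge_mask[0, 0])  # plain background far from the stroke
```

Only the pixels near the boundary of the stroke exceed the threshold; the centre of the stroke and the plain background do not.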

After obtaining the edge points, we detect lines with the Hough line transform (cv2.HoughLinesP in the code below). For each edge point, the algorithm considers all possible lines that pass through it. If one line passes through many edge points, we conclude that this line exists in the image.
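
The voting idea can be sketched in a few lines of NumPy. This is a simplified version of the standard Hough transform written for illustration; the code in this report calls cv2.HoughLinesP instead, the probabilistic variant that returns line segments. The function name, the 1-degree angle step, and the sample points are assumptions.

```python
import numpy as np

def hough_votes(points, shape, n_theta=180):
    """Let every point vote for all lines rho = x*cos(theta) + y*sin(theta) through it."""
    height, width = shape
    diag = int(np.ceil(np.hypot(height, width)))   # largest possible |rho|
    thetas = np.deg2rad(np.arange(n_theta))        # 0, 1, ..., 179 degrees
    accumulator = np.zeros((2 * diag + 1, n_theta), dtype=int)
    for x, y in points:
        rhos = np.round(x * np.cos(thetas) + y * np.sin(thetas)).astype(int)
        accumulator[rhos + diag, np.arange(n_theta)] += 1  # one vote per angle
    return accumulator, diag

# 100 edge points on the horizontal line y = 50, plus two stray points.
points = [(x, 50) for x in range(100)] + [(20, 10), (70, 80)]
accumulator, diag = hough_votes(points, shape=(100, 100))

rho_idx, theta_idx = np.unravel_index(np.argmax(accumulator), accumulator.shape)
best_rho, best_theta_deg = int(rho_idx) - diag, int(theta_idx)
print(best_rho, best_theta_deg, accumulator.max())
```

The cell with the most votes corresponds to rho = 50 and theta = 90 degrees, which is the horizontal line y = 50; the two stray points contribute at most one vote to any cell.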

Finally, we choose the longest lines lying on the boundaries of the annotations, using the boundary coordinate ranges observed in 30 sample images. The image is then cropped along these lines.

Figure 8: Detected edges
Figure 9: Detected lines
Figure 10: Longest lines on the boundary

GJ.cool OCR

GJ.cool has provided us with a quota of 10,000 API calls. Its OCR engine performs well in recognising the parallel lines of Chinese annotations (Figure 11), so the OCR result (Figure 12) has more accurate line segmentation. The engine takes only about 20 seconds to process an image.

Figure 11: Parallel lines of annotations 雙行夾注
Figure 12: GJ.cool OCR Result

Python Code

It is highly recommended to view the code in a code editor.

Python Code for Calling GJ.cool API

First, we need to obtain the access token and refresh token at the login gateway. The lifetimes of the access token and refresh token are 1 hour and 1 week respectively.

# Get access token and refresh token

import requests, json

login_url = 'https://gj.cool/ocr_login'
apiid = ''
pw = ''
with open("../secrets.txt") as secret:
    contents = secret.readlines()
    apiid = contents[0].replace('\n', '')
    pw = contents[1].replace('\n', '')

if apiid and pw:
    print("start to login")
    with open("../gujicool_token.json", "wb+") as f:
        payload = {'apiid': apiid, 'password': pw}
        response = requests.post(login_url, data = payload).content
        f.write(response)

If the access token expires, we can get a new access token by using a valid refresh token. If both tokens expire, a new login is required.

import requests
import json
import certifi
import urllib3

tokenPath = "../gujicool_token.json"
refresh_url = "https://gj.cool/ocr_refresh"

def refresh():
    with open(tokenPath, "r") as f:
        tokens = json.load(f)
        refresh_token = tokens["refresh_token"]
        headers = {'Authorization': f"gjcool {refresh_token}", "Content-Type": "application/json"}
        http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where(),headers=headers)
        refreshResponse = http.request('POST', refresh_url, headers=headers).data
        resJSON = json.loads(refreshResponse)

    if "access_token" in resJSON:
        new_access_token = resJSON["access_token"]
        tokens["access_token"] = new_access_token
        with open(tokenPath, "w") as f:
            json.dump(tokens, f)
        print("New access token saved")

    elif "msg" in resJSON:
        print("Refresh fails.")
        print(resJSON["msg"])
        
refresh()
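
The token rules described above can be summarised as a small decision function. This is a sketch only: it assumes we record locally when each token was issued, which the code above does not do (it relies on the server's response instead), and the function name and timestamps are hypothetical.

```python
from datetime import datetime, timedelta

ACCESS_LIFETIME = timedelta(hours=1)    # access token lifetime stated in the text
REFRESH_LIFETIME = timedelta(weeks=1)   # refresh token lifetime stated in the text

def next_token_action(access_issued_at, refresh_issued_at, now):
    """Return 'use', 'refresh', or 'login' depending on which tokens are still valid."""
    if now - access_issued_at < ACCESS_LIFETIME:
        return "use"        # access token is still valid
    if now - refresh_issued_at < REFRESH_LIFETIME:
        return "refresh"    # only the refresh token is still valid
    return "login"          # both tokens have expired

login_time = datetime(2024, 1, 1, 9, 0)  # hypothetical login time
print(next_token_action(login_time, login_time, login_time + timedelta(minutes=30)))
print(next_token_action(login_time, login_time, login_time + timedelta(hours=5)))
print(next_token_action(login_time, login_time, login_time + timedelta(days=8)))
```

The three calls cover the three cases: use the current access token, call the refresh endpoint, or log in again.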

By including the access token in the request header, we can call the GJ.cool OCR API. To use the API quota wisely, we do not send the same image to OCR twice.

import json
import mimetypes
import os
import re
from pathlib import Path

import requests

# tokenPath and refresh() are defined in the previous code block

def extract_numbers(filename):
    pattern = r'\d+'  # Matches one or more digits
    numbers = re.findall(pattern, filename)
    return tuple(map(int, numbers))

# return a list of image names that are not yet processed
def find_task(src_dir, result_dir):
    completedTasks = set()
    print(result_dir)
    for resultName in os.listdir(result_dir):
        resultName = resultName.replace("gujicool_", "")
        if resultName.lower().endswith(".json"):
            completedTasks.add(resultName.replace(".json", ".jpg"))
    allTasks = set()
    for srcName in os.listdir(src_dir):
        if srcName.lower().endswith(".jpg"):
            allTasks.add(srcName)
    remainingTask = list(allTasks - completedTasks)
    remainingTask.sort(key=lambda name: extract_numbers(name))
    return remainingTask

ocr_url = 'https://api.jzd.cool:9013/ocr_pro'
targetDir = "../all_crop_result/v1"
resultDir = "../gujicool_result/ocr_json"

# overWrite is for fine-tuning some images
def call_gujicool_api(resultDir, overWrite=False):
    with open(tokenPath, "r") as f:
        tokens = json.load(f)
        access_token = tokens["access_token"]
        headers = {'Authorization': f"gjcool {access_token}"}

        # Check if the path is a directory
        if os.path.isdir(targetDir):
            print(f"Processing files in directory: {targetDir}")

            if not overWrite:
                tasks = find_task(targetDir, resultDir)
            else:
                tasks = os.listdir(targetDir)
                tasks.sort(key=lambda name: extract_numbers(name))
    
            # Iterate over each file within the directory
            for img_full_name in tasks:
                img_path = os.path.join(targetDir, img_full_name)
    
                # Check if the file is a JPG file
                # since there are files like .DS_store
                if img_full_name.lower().endswith(".jpg"):
                    mime, _ = mimetypes.guess_type(img_path)
                    img_name = Path(img_path).stem
                    with open(img_path, 'rb') as img_file:
                        files = [('img', (img_name, img_file, mime))]
                        payload = {'layout':0, 'area':'[]', 'compact':1}
                                            
                        try:
                            response = requests.post(ocr_url, headers=headers, data=payload, files=files)
                            content = json.loads(response.content)
                            
                            # access_token expired
                            # after getting new access_token, retry
                            if response.status_code == 422 or ('msg' in content and content['msg'] == "token wrong"):
                                return 'Retry'
                                
                            if response.status_code != 200 or 'msg' in content:
                                print(f"ERROR {response}")
                                return 'ERROR'
    
                            with open(f"{resultDir}/gujicool_{img_name}.json", "w+") as res:
                                json.dump(content, res)
                                print(f"Received response for {img_name}.jpg, Used: {response.elapsed.total_seconds()} seconds")
    
                        except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
                            print(f"Request failed: {e}")
                            return 'Exception'
            print("Finish")
            return "Finish"

def gujicool_ocr(resultDir, overWrite=False):
    resultResponse = call_gujicool_api(resultDir, overWrite)
    if resultResponse == 'Retry':
        refresh()
        call_gujicool_api(resultDir, overWrite)

# Pass overWrite=True only when re-running OCR on images that need fine-tuning
gujicool_ocr(resultDir)

The GJ.cool OCR result for each image is a JSON file. To format the results, we write the recognized characters from all the JSON files into a single text file.

import json
import os
import re

def extract_numbers(filename):
    pattern = r'\d+'  # Matches one or more digits
    numbers = re.findall(pattern, filename)
    return tuple(map(int, numbers))

jsonDir = "../gujicool_result/ocr_json"
gujicool_result_files = os.listdir(jsonDir)
gujicool_result_files.sort(key=lambda name: extract_numbers(name))

# utf-8 is set explicitly because the output contains Chinese characters
with open("../resultDir/gujicool_allResult.txt", "w+", encoding="utf-8") as f:
    for resultPath in gujicool_result_files:
        if resultPath.lower().endswith(".json"):
            # read each result from the same directory that was listed above
            resultFullPath = os.path.join(jsonDir, resultPath)
            with open(resultFullPath, 'r', encoding="utf-8") as sample:
                content = json.load(sample)
                chars = content['chars']
                f.write(f"======== {resultPath.replace('gujicool_', '')} ========\n\n")
                f.write("".join(chars) + "\n\n")
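
The extract_numbers helper is what keeps the pages in reading order. The short example below shows the difference from plain string sorting; the file names are hypothetical but follow the volume_page pattern used in this project.

```python
import re

def extract_numbers(filename):
    """Return all digit groups in the file name as a tuple of integers."""
    return tuple(map(int, re.findall(r'\d+', filename)))

# Hypothetical image names in the form volume_page.jpg
names = ["1_10.jpg", "1_2.jpg", "2_1.jpg", "1_1.jpg"]

lexicographic = sorted(names)                  # compares character by character
numeric = sorted(names, key=extract_numbers)   # compares (volume, page) as integers

print(lexicographic)
print(numeric)
```

With plain string sorting, page 10 comes before page 2; with the numeric key, the pages follow the order of the book.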

Python Code for Removing Annotation

import cv2
import numpy as np
import os
import re

# lines in type [[[x1, y1, x2, y2]], [[x1, y1, x2, y2]], ...]
# lines is a 3D array while line is a 2D array with only one element
def find_longest_horizontal_line(lines):
    longest_horizontal_line = None
    max_horizontal_length = 0
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if y1 > 1500 or y1 < 800 or y2 > 1500 or y2 < 800:
            continue
        length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)      
        # Check if the line is semi-horizontal and longer than the current longest line
        if abs(y2 - y1) < abs(x2 - x1) and length > max_horizontal_length:
            longest_horizontal_line = line[0]
            max_horizontal_length = length
    return longest_horizontal_line

def find_longest_vertical_line(lines, page):
    longest_vertical_line = None
    max_vertical_length = 0
    for line in lines:
        x1, y1, x2, y2 = line[0]
        if page % 2 == 0 and not (x1 > 2950 and x2 > 2950):
            continue
        elif page % 2 != 0 and not (100 <= x1 < 190 and 100 <= x2 < 190):
            continue
        length = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)      
        # Check if the line is semi-vertical and longer than the current longest line
        if abs(y2 - y1) > abs(x2 - x1) and length > max_vertical_length:
            longest_vertical_line = line[0]
            max_vertical_length = length
    return longest_vertical_line

def extractBody(image, longest_horizontal_line, longest_vertical_line, page, imagePath, errorLog):
    haveError = False
    if longest_horizontal_line is not None and longest_vertical_line is not None:
        cutY = longest_horizontal_line[1]
        cutX = longest_vertical_line[0]
        if page % 2 == 0:
            result = image[cutY:, :cutX] # note that y is put first in image slicing
        else:
            result = image[cutY:, cutX:]
        print(f"success")
    else:
        if longest_horizontal_line is not None:
            cutY = longest_horizontal_line[1]
            result = image[cutY:, :]
            print("semi-fail. No longest vertical line.")
            errorLog.write(f"{imagePath} semi-fail. Only found longest horizontal line.\n")
        elif longest_vertical_line is not None:
            cutX = longest_vertical_line[0]
            if page % 2 == 0:
                result = image[:, :cutX]
            else:
                result = image[:, cutX:]
            print("semi-fail. No longest horizontal line.")
            errorLog.write(f"{imagePath} semi-fail. Only found longest vertical line.\n")
        else:
            result = image
            print("fail. No longest line.")
            errorLog.write(f"{imagePath} fail. No longest line.\n")

        haveError = True
    return (result, haveError)

def extractAnnotation(image, longest_horizontal_line, page, imagePath, errorLog):
    haveError = False
    if longest_horizontal_line is not None:
        cutY = longest_horizontal_line[1]
        # the top annotation spans the full width on both even and odd pages
        result = image[:cutY, :]
        print(f"success")
    else:
        result = image
        print("fail. No longest line.")
        errorLog.write(f"{imagePath} fail. No longest horizontal line.\n")
        haveError = True
    return (result, haveError)

def ocr_crop(src_dir, dest_dir, getBody=True):
    if not (os.path.exists(src_dir) and os.path.exists(dest_dir)):
        print("ERROR: Please make sure both directories exist.")
        return
    
    def extract_numbers(filename):
        pattern = r'\d+'  # Matches one or more digits
        numbers = re.findall(pattern, filename)
        return tuple(map(int, numbers))

    img_list = os.listdir(src_dir)
    img_list.sort(key=lambda name: extract_numbers(name))
    
    print("start...")
    with open(f"{dest_dir}/errorLog.txt", "w+") as errorLog:
        errorCount = 0
        errorLog.write("Start to record errors...\n")
        for imagePath in img_list:
            imageFullPath = f"{src_dir}/{imagePath}"
            name = imagePath.split('.')[0]
            extension = imagePath.split('.')[-1]
            if len(name) == 0:
                errorLog.write(f"{imagePath} has empty name\n")
                errorCount += 1
                continue
            if extension not in ["png", "jpeg", "jpg"]:
                errorLog.write(f"{imagePath} is not supported\n")
                errorCount += 1
                continue
            volume, page = tuple(map(int, name.split('_')))
                
            print(f"{name} start, ", end='')
            image = cv2.imread(imageFullPath)

            # Convert to grayscale
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            # edge pixels detection
            edges = cv2.Canny(gray, 50, 150)

            # Find lines using HoughLinesP
            lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=100, minLineLength=100, maxLineGap=10)

            if lines is None:
                print("fail. Cannot detect any lines in this image.")
                errorLog.write(f"{imagePath} fail. Cannot detect any lines in this image.\n")
                errorCount += 1
                # no lines were detected, so save the uncropped image
                cv2.imwrite(f"{dest_dir}/{name}.jpg", image)
                continue
            
            longest_horizontal_line = find_longest_horizontal_line(lines)
            
            if getBody:
                longest_vertical_line = find_longest_vertical_line(lines, page)
                result, haveError = extractBody(image, longest_horizontal_line, longest_vertical_line, page, imagePath, errorLog)
            else:
                result, haveError = extractAnnotation(image, longest_horizontal_line, page, imagePath, errorLog)
            errorCount += 1 if haveError else 0

            cv2.imwrite(f"{dest_dir}/{name}.jpg", result) 
        errorLog.write(f"{errorCount} error(s) found.\n")
    print("end")
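
The cropping step in extractBody comes down to NumPy slicing, where rows (y) are indexed before columns (x). The sketch below reproduces that logic on a small blank array; the array size and the cut coordinates are made up for illustration and are far smaller than the real page images.

```python
import numpy as np

def crop_body(image, cut_y, cut_x, page):
    """Keep the region below cut_y; drop the side annotation according to page parity."""
    if page % 2 == 0:
        return image[cut_y:, :cut_x]   # even page: keep everything left of cut_x
    return image[cut_y:, cut_x:]       # odd page: keep everything right of cut_x

# Blank stand-in for a page image: height 20, width 30, 3 colour channels.
image = np.zeros((20, 30, 3), dtype=np.uint8)

even = crop_body(image, cut_y=5, cut_x=25, page=2)
odd = crop_body(image, cut_y=5, cut_x=4, page=3)
print(even.shape, odd.shape)
```

In both cases the top 5 rows are removed; the even page loses the columns from x = 25 onwards, while the odd page loses the columns before x = 4.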

  1. 中央研究院文字辨識與校對平台 (Academia Sinica text recognition and proofreading platform), https://ocr.ascdc.tw/index.php
  2. GJ.cool 古籍酷, https://ocr.gj.cool/