**Post-Processing Amazon Textract with Location-Aware Transformers**

# Optional Extras

> *This notebook works well with the `PyTorch 1.10 Python 3.8 CPU Optimized (Python 3)` kernel on SageMaker Studio - **different** from the others in the series*

This notebook discusses optional extra/alternative steps separate from the typical pipeline setup flow. You won't typically need to run these steps unless specifically guided, or you're digging deeper into customization.

## Common setup

First, as usual, we'll set up and import required libraries. You should run these cells regardless of which optional section(s) you're using.

The Hugging Face `datasets` and `transformers` installs here are used specifically for dataset preparation in the seq2seq section. If you have problems with these libraries and aren't tackling this section, you may be able to omit them. If you regularly need to install several custom libraries in Studio notebooks, refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-byoi-create.html) and [samples](https://github.com/aws-samples/sagemaker-studio-custom-image-samples) on building **Custom kernel images** for SageMaker.

In [None]:
!pip install amazon-textract-response-parser \
    "datasets>=2.4,<3" \
    "ipywidgets>=7,<8" \
    sagemaker-studio-image-build \
    "sagemaker>=2.87,<3" \
    "transformers>=4.25,<4.26"

In [None]:
%load_ext autoreload
%autoreload 2

# Python Built-Ins:
import json
from logging import getLogger
import os
import time

# External Dependencies:
import boto3  # General-purpose AWS SDK for Python
import numpy as np  # Matrix/math utilities
import pandas as pd  # Data table / dataframe utilities
import sagemaker  # High-level Python SDK for Amazon SageMaker

# Local Dependencies:
import util

logger = getLogger()

# Configuration:
bucket_name = sagemaker.Session().default_bucket()
bucket_prefix = "textract-transformers/"
print(f"Working in bucket s3://{bucket_name}/{bucket_prefix}")
config = util.project.init("ocr-transformers-demo")
print(config)

# AWS service clients:
smclient = boto3.client("sagemaker")
ssm = boto3.client("ssm")

## Contents

The sections of this notebook are independent:

- **[Manual thumbnail generator setup](#Manual-thumbnail-generator-setup)**: Customise online page thumbnail generation endpoint
- **[Optimise costs with endpoint auto-scaling](#Optimise-costs-with-endpoint-auto-scaling)**: Configure your SageMaker endpoint(s) to auto-scale based on incoming request volume
- **[Experimenting with alternative OCR engines](#Experimenting-with-alternative-OCR-engines)**: Substitute Amazon Textract with open-source OCR tools, for use with unsupported languages
- **[Exploring sequence-to-sequence models](#Exploring-sequence-to-sequence-models)**: Use generative models to automatically re-format detected fields and fix common OCR error patterns

---

## Manual thumbnail generator setup

> This section walks through manually building and configuring the endpoint to generate resized page thumbnail images in real time.
>
> You may find it useful if you want to customise the container image or script used by this process, or if you deployed your pipeline without thumbnailing support but want to experiment with image-based models from notebooks.
>
> ⚠️ **Note:** Deploying and registering a thumbnailing endpoint from the notebook will still not turn on thumbnail generation in a pipeline deployed without support for it. Instead, refer to your CDK app parameters to ensure the pipeline state machine gets updated to include a thumbnail generation step.

### Build and register custom container image

The tools we use to read PDF files aren't installed by default in the pre-built SageMaker containers and aren't `pip install`able, so the thumbnail generator will need a custom container image. We can derive this image from an existing AWS Deep Learning Container (DLC) for serving, which minimises boilerplate code because a SageMaker-compatible serving stack is already included.

Because SageMaker Studio kernels are already containerized, you won't be able to run the typical `docker build` commands you may be used to. Instead, we'll use the [SageMaker Studio Image Build CLI](https://github.com/aws-samples/sagemaker-studio-image-build-cli) to build the image and store it in your account's [Amazon Elastic Container Registry (ECR)](https://aws.amazon.com/ecr/):

In [None]:
# Configurations:
preproc_ecr_repo_name = "sm-ocr-preproc"
preproc_ecr_image_tag = "pytorch-1.10-inf-cpu"

preproc_framework_version = "1.10"
preproc_py_version = "py38"

base_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=os.environ["AWS_REGION"],
    instance_type="ml.c5.xlarge",  # (Just used to check whether GPUs/accelerators are used)
    py_version=preproc_py_version,
    image_scope="inference",  # Inference base because we'll also deploy as an endpoint later
    version=preproc_framework_version,
)

# Combine together into the final URI (not needed for the build, but used later in the notebook):
account_id = sagemaker.Session().account_id()
region = os.environ["AWS_REGION"]
preproc_ecr_image_uri = "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(
    account_id, region, preproc_ecr_repo_name, preproc_ecr_image_tag
)
print(f"Will build to {preproc_ecr_image_uri}")

In [None]:
%%time
# (No need to re-run this cell if your image is already in ECR)

# Actually build & push the container image:
!cd custom-containers/preproc && sm-docker build . \
    --repository {preproc_ecr_repo_name}:{preproc_ecr_image_tag} \
    --role {config.sm_image_build_role} \
    --build-arg BASE_IMAGE={base_image_uri}

In [None]:
# Check from notebook whether the image was successfully created:
ecr = boto3.client("ecr")
imgs_desc = ecr.describe_images(
    registryId=account_id,
    repositoryName=preproc_ecr_repo_name,
    imageIds=[{"imageTag": preproc_ecr_image_tag}],
)
assert len(imgs_desc["imageDetails"]) > 0, "Couldn't find ECR image {} after build".format(
    preproc_ecr_image_uri
)

### Deploy and test the thumbnailer endpoint

Because the custom image is based on the standard SageMaker PyTorch inference container, our [preproc/preproc.py](preproc/preproc.py) script can [work with the existing serving stack](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id3) by exposing custom `model_fn`, `input_fn`, `predict_fn`, and/or `output_fn` functions.

We'll bundle the scripts into a `.tar.gz` file in the format the PyTorch container expects, with inference code in a `code/` subfolder.

Normally this process (and the setting of the `SAGEMAKER_PROGRAM` and `SAGEMAKER_SUBMIT_DIRECTORY` environment variables) is handled automatically by the `PyTorchModel` - which allows "re-packing" the tarball from a training job to create a new tarball with new `source_dir` and `entry_point` scripts. In this case though, we don't need such a two-step process because there's no training artifact to start from and no actual "model" in this tarball - PyTorch or otherwise. Our script just defines code to extract and resize page images, and a dummy `model_fn` so the endpoint won't crash from failing to find a model.
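
As a rough illustration of that tarball layout, the sketch below bundles a source folder under a top-level `code/` folder using only the Python standard library. The `tar_inference_code` function is a hypothetical stand-in written for this explanation: it is not the implementation of `util.deployment.tar_as_inference_code`, which may behave differently.

```python
import tarfile
from pathlib import Path


def tar_inference_code(source_dir: str, output_path: str) -> str:
    """Bundle every file in source_dir under a top-level code/ folder in a .tar.gz

    (Hypothetical helper for illustration only)
    """
    source = Path(source_dir)
    # Create the output folder if it doesn't exist yet:
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with tarfile.open(output_path, mode="w:gz") as tar:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # e.g. preproc/preproc.py -> code/preproc.py
                arcname = Path("code") / path.relative_to(source)
                tar.add(str(path), arcname=arcname.as_posix())
    return output_path
```

With a layout like this, setting `SAGEMAKER_PROGRAM` to `preproc.py` points the serving stack at the script inside `code/`.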

In [None]:
# Compress the archive locally and list the compressed contents:
preproc_model_path = util.deployment.tar_as_inference_code("preproc", "data/preproc-model.tar.gz")
print(f"(Re)-created {preproc_model_path}")
!tar -ztvf {preproc_model_path}
print()

# Upload to S3:
preproc_model_key = "".join((
    bucket_prefix,
    "preproc-model/",
    util.uid.append_timestamp("model"),  # (Maintain history in S3)
    ".tar.gz"
))
preproc_model_s3uri = f"s3://{bucket_name}/{preproc_model_key}"
!aws s3 cp {preproc_model_path} {preproc_model_s3uri}

Once a `model.tar.gz` is available on S3, we're ready to create and deploy a SageMaker "Model" and Endpoint.

In [None]:
from sagemaker.pytorch import PyTorchModel

if config.thumbnails_callback_topic_arn.startswith("arn:"):
    async_notification_config = {
        "SuccessTopic": config.thumbnails_callback_topic_arn,
        "ErrorTopic": config.thumbnails_callback_topic_arn,
    }
else:
    logger.warning("Pipeline stack deployed without thumbnailing callback topic")
    async_notification_config = {}


class PatchedPyTorchModel(PyTorchModel):
    """Modified PyTorchModel to allow manually setting SM Script Mode environment vars

    See: https://github.com/aws/sagemaker-python-sdk/issues/3361
    """

    def prepare_container_def(self, *args, **kwargs):
        # Call the parent function:
        result = super().prepare_container_def(*args, **kwargs)
        # ...But allow our manual env vars configuration to override the internals:
        manual_env = dict(self.env)
        result["Environment"].update(manual_env)
        return result


preproc_model = PatchedPyTorchModel(
    name=util.uid.append_timestamp("ocr-thumbnail"),
    model_data=preproc_model_s3uri,
    entry_point=None,  # Set manually via tarball and SAGEMAKER_PROGRAM
    framework_version="1.10",
    py_version="py38",
    image_uri=preproc_ecr_image_uri,
    role=sagemaker.get_execution_role(),
    env={
        "PYTHONUNBUFFERED": "1",
        "SAGEMAKER_PROGRAM": "preproc.py",
        # TorchServe configurations for large payloads & slow inference:
        "TS_MAX_REQUEST_SIZE": str(100*1024*1024),  # 100MiB instead of default ~6.2MiB
        "TS_MAX_RESPONSE_SIZE": str(100*1024*1024),  # 100MiB instead of default ~6.2MiB
        "TS_DEFAULT_RESPONSE_TIMEOUT": str(60*15),  # 15mins instead of the default (60s maybe?)
    },
)

preproc_predictor = preproc_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    async_inference_config=sagemaker.async_inference.AsyncInferenceConfig(
        output_path=f"s3://{config.model_results_bucket}/preproc",
        max_concurrent_invocations_per_instance=2,
        notification_config=async_notification_config,
    ),
)

This endpoint accepts images or documents and outputs resized page thumbnail images.

For multi-page documents the main output format is `application/x-npz`, which produces a [compressed numpy archive](https://numpy.org/doc/stable/reference/generated/numpy.savez_compressed.html#numpy.savez_compressed) in which `images` is an **array of images** each represented by **PNG bytes**. These formats require customizing the client (predictor) *serializer* and *deserializer* from the default for PyTorch. Since `Predictor` de/serializers set the `Content-Type` and `Accept` headers, we'll also need to re-configure the serializer whenever switching between input document types (for example PDF vs PNG).
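
To make the `Content-Type` point concrete, here is a minimal sketch of inferring a request content type from a file name with the standard library. The `guess_content_type` helper is hypothetical and only illustrates the idea: the sample's own `util.deployment.FileSerializer.from_filename` may work differently.

```python
import mimetypes


def guess_content_type(filename: str) -> str:
    """Guess the MIME type to send as the request Content-Type, from the file extension

    (Hypothetical helper for illustration only)
    """
    content_type, _ = mimetypes.guess_type(filename)
    if content_type is None:
        raise ValueError(f"Could not infer a content type for file: {filename}")
    return content_type
```

For example, a `.pdf` file name maps to `application/pdf` and a `.png` file name to `image/png`, which is why the serializer needs re-creating when you switch between document and image inputs.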

To support potentially large documents, the preprocessor is deployed to an **asynchronous** endpoint which enables larger request and response payload sizes.

So how would it look to test the endpoint from Python? Let's see an example:

In [None]:
%%time

# Choose an input (document or image):
input_file = "data/raw/121 Financial Credit Union/Visa Credit Card Agreement.pdf"
#input_file = "data/imgs-clean/121 Financial Credit Union/Visa Credit Card Agreement-0001-1.png"

# Ensure de/serializers are correctly set up:
preproc_predictor.serializer = util.deployment.FileSerializer.from_filename(input_file)
preproc_predictor.deserializer = util.deployment.CompressedNumpyDeserializer()
# Duplication because of https://github.com/aws/sagemaker-python-sdk/issues/3100
preproc_predictor.predictor.serializer = preproc_predictor.serializer
preproc_predictor.predictor.deserializer = preproc_predictor.deserializer

# Run prediction:
print("Calling endpoint...")
resp = preproc_predictor.predict(input_file)
print(f"Got response of type {type(resp)}")

# Render result:
util.viz.draw_thumbnails_response(resp)

### Connect thumbnailer to the deployed processing pipeline

Once your thumbnailer endpoint is deployed and working, you can connect it into your document processing pipeline via SSM parameter configuration - just like the main enrichment model. This will only have an effect if your pipeline was already deployed with thumbnailing enabled, so the cell below will first check whether that seems to be the case.

In [None]:
if config.thumbnails_callback_topic_arn == "undefined":
    raise ValueError(
        "This pipeline CDK stack was deployed with thumbnailing disabled (by setting parameter "
        "use_thumbnails=False). Either redeploy the CDK stack with updated settings to enable "
        "thumbnailing, or continue without (and consider deleting the thumbnailing endpoint you "
        "created, to save unnecessary cost)."
    )

thumbnail_endpoint_name = preproc_predictor.endpoint_name
print(f"Configuring pipeline with thumbnailer: {thumbnail_endpoint_name}")

ssm.put_parameter(
    Name=config.thumbnail_endpoint_name_param,
    Overwrite=True,
    Value=thumbnail_endpoint_name,
)

### Clean up experimental models

Clean up any endpoints you created that are no longer required, to free up resources and avoid unnecessary ongoing costs. When deleting an endpoint, also delete its associated configuration and model records. You may also like to clean up the `preproc-model/` S3 folder to remove any old draft versions.

> ⚠️ **Note:** If you delete the active endpoint/model your deployed pipeline is configured to use for thumbnailing, your pipeline will fail to process new documents.
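
One way to delete an endpoint together with its configuration and model records is sketched below. The `delete_endpoint_resources` function is a hypothetical helper written for this explanation. It assumes `sm_client` behaves like `boto3.client("sagemaker")`, and that every production variant in the endpoint configuration references a model by name.

```python
def delete_endpoint_resources(sm_client, endpoint_name: str) -> dict:
    """Delete a SageMaker endpoint plus its endpoint config and model record(s)

    `sm_client` is assumed to behave like `boto3.client("sagemaker")`.
    (Hypothetical helper for illustration only)
    """
    # Look up the linked config and model(s) *before* deleting anything:
    endpoint_desc = sm_client.describe_endpoint(EndpointName=endpoint_name)
    config_name = endpoint_desc["EndpointConfigName"]
    config_desc = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config_desc["ProductionVariants"]]

    # Then delete the endpoint first, followed by the records it depended on:
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return {"endpoint": endpoint_name, "config": config_name, "models": model_names}
```

In this notebook it could be called as `delete_endpoint_resources(smclient, preproc_predictor.endpoint_name)`, after checking the pipeline is no longer configured to use that endpoint.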

---

*[Back to contents](#Contents)*

## Optimise costs with endpoint auto-scaling

> This section demonstrates how you can enable and customise auto-scaling on your SageMaker endpoints to optimise resource use and cost.
>
> **Note:** For endpoints automatically deployed by the pipeline stack (such as the thumbnail generator), there are options available to configure this directly in CDK - which you may prefer.

SageMaker Async Inference endpoints support [auto-scaling down to zero instances](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-autoscale.html) when not in use, which can provide significant cost-savings for use cases where document processing is occasional and the pipeline is often idle.

⏰ **However:** You should be aware that enabling scale-to-zero can introduce cold-start delays of **several minutes** if requests arrive when all instances backing your endpoint have been shut down.

### Setting up auto-scaling

You can configure auto-scaling for your endpoint(s) by first registering them with the [application auto-scaling service](https://docs.aws.amazon.com/autoscaling/application/userguide/what-is-application-auto-scaling.html) and then applying a scaling policy as shown in the following cells.

First, configure which SageMaker endpoint you want to auto-scale by name. SageMaker endpoints may be backed by multiple [variants](https://docs.aws.amazon.com/sagemaker/latest/dg/model-ab-testing.html) which can scale independently, but this sample typically uses only the default "AllTraffic" variant.

In [None]:
# For example, maybe you want to configure whichever enrichment model is currently in pipeline:
endpoint_name = ssm.get_parameter(
    Name=config.sagemaker_endpoint_name_param,
)["Parameter"]["Value"]

# Default variant name unless you know otherwise:
variant_name = "AllTraffic"

print(f"Configuring endpoint name:\n  {endpoint_name}")
print(f"Configuring variant name:\n  {variant_name}")

resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
print(f"\nAuto-scaling resource ID:\n  {resource_id}")

In [None]:
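# Alternatively, target a specific endpoint by name (for example a thumbnailer). If you do,
# re-derive the resource ID too so that it stays consistent with the endpoint name:
# endpoint_name = "ocr-thumbnail-2022-10-14-03-37-58-529"
# variant_name = "AllTraffic"
# resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"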

From your endpoint and variant name, register a scalable target to configure overall limits:

In [None]:
appscaling = boto3.client("application-autoscaling")

# Define and register your endpoint variant
appscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,  # (MinCapacity 0 not supported with real-time endpoints)
    MaxCapacity=5,
)
print(f"Endpoint registered with auto-scaling service: {endpoint_name}")

We can also list any scaling policies that may already be active on this resource:

In [None]:
appscaling.describe_scaling_policies(ResourceId=resource_id, ServiceNamespace="sagemaker")

As discussed in the [SageMaker Asynchronous Inference Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-autoscale.html), the typical recommended scaling policy for asynchronous endpoints is to track a target on the number of queued requests per active instance - `ApproximateBacklogSizePerInstance`.

However, ⚠️ setting this target value `>=1.0` can yield **un-bounded latency** for single requests arriving when the endpoint has scaled off to 0 instances - because scale-out will not be triggered until a big enough queue has formed.

You can **combine multiple policies** to set up backlog target tracking but also ensure at least one instance gets started when any requests are in queue, using the alternative `HasBacklogWithoutCapacity` metric:

In [None]:
# Main backlog-per-instance target tracking policy:
scaling_policy_resp = appscaling.put_scaling_policy(
    PolicyName="BacklogTargetTracking",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 4.0,
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [
                {"Name": "EndpointName", "Value": endpoint_name},
            ],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 5 * 60,  # (seconds)
        "ScaleOutCooldown": 4 * 60,  # (seconds)
    },
)
print(f"Created/updated scaling policy ARN:\n{scaling_policy_resp['PolicyARN']}")

In [None]:
# Extra policy to ensure one-off requests get processed promptly:
scaling_policy_resp = appscaling.put_scaling_policy(
    PolicyName="BootstrapSingleRequests",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "StepAdjustments": [{"MetricIntervalLowerBound": 1.0, "ScalingAdjustment": +1}],
        "Cooldown": 150,  # (Seconds)
        "MetricAggregationType": "Average",
    },
)
print(f"Created/updated scaling policy ARN:\n{scaling_policy_resp['PolicyARN']}")
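
A step scaling policy only acts when a CloudWatch alarm invokes it, so the `HasBacklogWithoutCapacity` metric still needs an alarm whose action is the policy ARN. The sketch below builds keyword arguments for CloudWatch's `put_metric_alarm`. The alarm name, period, evaluation periods and threshold are illustrative assumptions, not values taken from this sample. Step adjustment bounds are measured relative to the alarm threshold, so with `MetricIntervalLowerBound: 1.0` as configured above, a threshold of 0 is assumed here so that a metric value of 1 falls inside the step. The hypothetical `step_applies` helper checks that arithmetic.

```python
from typing import Optional


def build_backlog_alarm_kwargs(
    endpoint_name: str, policy_arn: str, threshold: float = 0.0
) -> dict:
    """Keyword arguments for CloudWatch put_metric_alarm (values are illustrative assumptions)"""
    return {
        "AlarmName": f"{endpoint_name}-HasBacklogWithoutCapacity",  # Hypothetical naming scheme
        "Namespace": "AWS/SageMaker",
        "MetricName": "HasBacklogWithoutCapacity",
        "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
        "Statistic": "Average",
        "Period": 60,  # (seconds) - assumed
        "EvaluationPeriods": 1,  # Assumed
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "missing",
        "AlarmActions": [policy_arn],  # Invoke the step scaling policy when in alarm
    }


def step_applies(
    metric: float, threshold: float, lower: Optional[float], upper: Optional[float] = None
) -> bool:
    """Whether a step adjustment interval (relative to the alarm threshold) contains the metric"""
    diff = metric - threshold
    return (lower is None or diff >= lower) and (upper is None or diff < upper)
```

The alarm could then be created with something like `boto3.client("cloudwatch").put_metric_alarm(**build_backlog_alarm_kwargs(endpoint_name, scaling_policy_resp["PolicyARN"]))`. If you prefer a different threshold, adjust the step bounds to match: for example `step_applies(1.0, threshold=1.0, lower=1.0)` is false, so an alarm threshold of 1 would never trigger the step as configured above.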

Your endpoint should now be set up to auto-scale. To check, open the detail page for your target endpoint from the [Endpoints section of the SageMaker Console](https://console.aws.amazon.com/sagemaker/home?#/endpoints).

### Disabling auto-scaling

If you'd like to de-register an endpoint from auto-scaling, you can delete attached policies and de-register the target as shown below:

In [None]:
policies = appscaling.describe_scaling_policies(
    ResourceId=resource_id,
    ServiceNamespace="sagemaker",
)["ScalingPolicies"]

print(f"Deleting scaling policies for {resource_id}:")
time.sleep(3)

for policy in policies:
    appscaling.delete_scaling_policy(
        PolicyName=policy["PolicyName"],
        ServiceNamespace=policy["ServiceNamespace"],
        ResourceId=policy["ResourceId"],
        ScalableDimension=policy["ScalableDimension"],
    )
    print(f" - {policy['PolicyName']}")
print("\nDone")

In [None]:
print(f"De-registering from auto-scaling:\n  {resource_id}")
time.sleep(3)

appscaling.deregister_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
)
print("Done")

---

*[Back to contents](#Contents)*

## Experimenting with alternative OCR engines

> This section demonstrates how to process a batch of documents using alternative, open-source-based OCR engines on Amazon SageMaker - in case you have a use case requiring languages not yet supported by Amazon Textract.

As detailed further in the [Customization Guide](../CUSTOMIZATION_GUIDE.md), you can use alternative, open-source-based OCR engines with this solution if needed, by packaging them to produce Amazon Textract-compatible result formats and integrating them with the pipeline. For consistency with other steps, this integration uses Amazon SageMaker Asynchronous Inference.

### Deploy the alternative engine(s)

First, (re)-deploy your solution with the `BUILD_SM_OCRS` variable set, to create container image(s) and SageMaker model(s) for your chosen OCR engine(s).

Because resource tags are automatically added to these deployed models, you'll be able to look them up by the same engine name using the code below. For example, `ocr_engine_name=tesseract` in the notebook assumes `BUILD_SM_OCRS=...,tesseract,...` at CDK deploy time:

In [None]:
ocr_engine_name = "tesseract"
ocr_model_desc = util.ocr.describe_sagemaker_ocr_model(ocr_engine_name)

print(f"Found OCR engine {ocr_engine_name}:\n  {ocr_model_desc['ModelName']}")

ocr_image_uri = ocr_model_desc["PrimaryContainer"]["Image"]
print(f"\nImage: {ocr_image_uri}")
ocr_environment = ocr_model_desc["PrimaryContainer"]["Environment"]
print(f"Environment variables:\n{ocr_environment}")

### Extract documents in batch

Just like with batch page image generation in notebook 1, we'll use a SageMaker Processing Job to run the work on a scalable cluster of instances. The input document locations are specified the same way as for page image generation, so the code below takes the whole corpus (S3 prefix) for simplicity.

> ⏰ If you'd like to select **just a subset of documents**, you can instead set `ocr_inputs` using the same manifest-based "OPTION 2" approach shown to set `preproc_inputs` in the *Extract clean input images* section of [Notebook 1](1.%20Data%20Preparation.ipynb).

In [None]:
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from util.preproc import DummyFramework

# S3 input & output locations:
raw_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/raw"
textract_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/textracted"

# (Assuming whole corpus - see NB1 image pre-processing for manifest-based example)
ocr_inputs = [
    ProcessingInput(
        destination="/opt/ml/processing/input/raw",  # Expected input location, per our script
        input_name="raw",
        s3_data_distribution_type="ShardedByS3Key",  # Distribute between instances, if multiple
        source=raw_s3uri,  # S3 prefix for full raw document collection
    ),
]

After defining the input and output locations, and with our pre-prepared container image identified, we're ready to run the job.

> ⏰ In our tests, the provided Tesseract OCR integration took around 35 minutes on 5x `ml.c5.4xlarge` instances, to process the full ~2,500 document credit cards corpus for English and Thai.

In [None]:
%%time

processor = FrameworkProcessor(
    estimator_cls=DummyFramework,
    image_uri=ocr_image_uri,  # As created above
    framework_version=None,
    base_job_name="ocr-custom",
    role=sagemaker.get_execution_role(),
    instance_count=5,
    instance_type="ml.c5.4xlarge",
    volume_size_in_gb=16,
    max_runtime_in_seconds=60*60,
    env={
        "OMP_THREAD_LIMIT": "1",  # Optimize Tesseract parallelism for batch
        "PYTHONUNBUFFERED": "1",  # For debugging
        **ocr_environment,
        # Override defaults from the model env vars like this:
        "OCR_DEFAULT_LANGUAGES": "eng,tha",
    },
)

processor.run(
    code="ocr.py",  # OCR script
    source_dir="preproc",
    inputs=ocr_inputs[:],  # Either whole corpus or sample, as above
    outputs=[
        ProcessingOutput(
            destination=textract_s3uri,
            output_name="ocr",
            s3_upload_mode="Continuous",
            source="/opt/ml/processing/output/ocr",  # Output folder, per our script
        ),
    ],
    #logs=False,
    #wait=False,
)

Once the job is complete, you can crawl the results on Amazon S3 to build up an equivalent manifest ready for the next stage of data preparation:

In [None]:
# Given that raw docs live under some S3 prefix:
raw_s3uri_prefix = raw_s3uri

# ...And Amazon Textract results live under another:
textract_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/textracted"


# ...And you can define a mapping from one to the other:
def doc_uri_to_textract_uri(doc_uri: str) -> str:
    if not doc_uri.startswith(raw_s3uri_prefix):
        raise ValueError(
            "Document S3 URI '%s' did not start with expected prefix: '%s'"
            % (doc_uri, raw_s3uri_prefix)
        )
    # Replace raw prefix with Textract prefix, and add "/consolidated.json" to filename:
    return textract_s3uri + doc_uri[len(raw_s3uri_prefix):] + "/consolidated.json"

# Then build up the combined manifest, checking existence for each result:
out_filename = "data/textracted-all-smocr.manifest.jsonl"
print(f"Building manifest: {out_filename} ...")
with open("data/raw-sample.manifest.jsonl") as fin:
    with open(out_filename, "w") as fout:
        for doc in (json.loads(line) for line in fin):
            textract_uri = doc_uri_to_textract_uri(doc["raw-ref"])
            if not util.s3.s3_object_exists(textract_uri):
                raise ValueError(
                    "Mapped OCR result URI does not exist in S3.\nFor: %s\nGot: %s"
                    % (doc["raw-ref"], textract_uri)
                )
            doc["textract-ref"] = textract_uri
            fout.write(json.dumps(doc) + "\n")
print("Done!")

### Integrate with the document pipeline

The above steps demonstrate how to process documents in batch with alternative, open-source OCR engines, to produce datasets ready for experimenting with multi-lingual model architectures like LayoutXLM. To actually deploy the alternative OCR into your document pipeline, use the `DEPLOY_SM_OCR` and `USE_SM_OCR` variables at CDK deployment. You'll likely want to update `OCR_DEFAULT_LANGUAGES` in [/pipeline/ocr/sagemaker_ocr.py](../pipeline/ocr/sagemaker_ocr.py) to align with your use case's language needs.

---

*[Back to contents](#Contents)*

## Exploring sequence-to-sequence models

> This section demonstrates training a (non-layout-aware) model that edits extracted text fields to normalize data types or fix common OCR error patterns.

Since the main flow of this solution focusses on "extractive" entity recognition models, you might reasonably wonder whether the same layout-aware ideas could be extended to "generative" models capable of actually editing the OCR'd text: For example to reformat fields or fix errors. The answer to this is **"probably yes, but..."**:

1. Care needs to be taken with large generative models to address bias and privacy concerns: For example will it be possible to extract sensitive or PII data the model was trained on, when it's deployed? Will it be biased to predicting certain patterns that aren't representative of your documents, or are representative on average but leave some user groups with consistently poorer service?
2. Published, pre-trained, layout-aware document models have most often provided an encoder-only stack to date, so finding pre-trained initial weights for a generative output module (decoder) may be challenging. Due to their large size, training these modules from scratch could be resource-intensive.

Here we show a more basic approach to start realizing some of the same benefits: Pairing the layout-aware NER model **alongside text-only seq2seq models** to normalize and standardize extracted fields.

### Collect datasets

In this example we'll demonstrate **normalizing dates** to a consistent format. Text-to-text models can tackle this in a flexible, example-driven and statistics-oriented way. Although maximum achievable accuracy might sometimes be higher with rule-based approaches, we'll show how the ML-based approach can yield good performance quickly without needing to build lots of rules and parsing expressions.

This task can be tackled via **synthetic dataset generation**: randomly generating dates and input prompts, according to expected statistical distribution of your target data.
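
As a minimal sketch of the idea, the standard-library example below draws random dates, renders each one in a randomly chosen source format, and pairs it with a prompt naming the desired target format. The format lists, the date range and the exact `Convert dates to ...` prompt wording are assumptions made for illustration: the sample's own `generate_seq2seq_date_norm_dataset` may use different prompts, formats and distributions.

```python
import datetime
import random

# Hypothetical source and target formats, chosen for illustration only:
SOURCE_FORMATS = ["%d/%m/%Y", "%B %d, %Y", "%d %b %Y", "%Y.%m.%d"]
TARGET_FORMATS = {"YYYY-MM-DD": "%Y-%m-%d", "DD/MM/YYYY": "%d/%m/%Y"}


def generate_date_examples(n: int, seed: int = 42) -> list:
    """Generate n synthetic date-normalization examples (prompt wording is an assumption)"""
    rng = random.Random(seed)
    start = datetime.date(1950, 1, 1)
    span_days = (datetime.date(2049, 12, 31) - start).days
    examples = []
    for _ in range(n):
        # Sample a date uniformly from the assumed range:
        date = start + datetime.timedelta(days=rng.randint(0, span_days))
        # Choose which target format the prompt asks for, and how the input is written:
        target_name = rng.choice(sorted(TARGET_FORMATS))
        source_text = date.strftime(rng.choice(SOURCE_FORMATS))
        examples.append(
            {
                "src_texts": f"Convert dates to {target_name}: {source_text}",
                "tgt_texts": date.strftime(TARGET_FORMATS[target_name]),
            }
        )
    return examples
```

Weighting the choice of source formats towards those seen most often in your real documents is one way to reflect the expected distribution of your target data.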

Run the cell below to generate a training and evaluation dataset. As shown in the preview, the data will include a wide range of source date formats but **also** support multiple different *target* formats - controllable via the first part of the prompt:

In [None]:
from src.code.data.seq2seq.date_normalization import generate_seq2seq_date_norm_dataset

rng = np.random.default_rng(42)
train_dataset = generate_seq2seq_date_norm_dataset(n=1000, rng=rng)
eval_dataset = generate_seq2seq_date_norm_dataset(n=200, rng=rng)

train_dataset.save_to_disk("data/seq2seq-train")
eval_dataset.save_to_disk("data/seq2seq-validation")

print("Dataset sample (top 15 records):")
pd.DataFrame(train_dataset[0:15])

As usual with SageMaker, once the datasets are prepared we'll stage them to Amazon S3 ready to use in model training:

In [None]:
train_s3uri = f"s3://{bucket_name}/{bucket_prefix}seq2seq/date-norm/train"
validation_s3uri = f"s3://{bucket_name}/{bucket_prefix}seq2seq/date-norm/validation"

In [None]:
!aws s3 sync --delete data/seq2seq-train {train_s3uri}
!aws s3 sync --delete data/seq2seq-validation {validation_s3uri}

### 🧪 (Experimental) Training with annotated documents

If you annotated your documents using the **custom** SageMaker Ground Truth task UI in Notebook 1 (with OCR transcript reviews), instead of the default (bounding-box-only) UI, you should also be able to directly train the seq2seq model on your manually-annotated data.

To do this, set your `train`, `textract` and `validation` channels as shown in Notebook 2 instead of the synthetic/augmented dataset used below. The script will build seq2seq examples from your annotated entity types, raw OCR text, and corrected OCR texts - something like:

```json
{
    "src_texts": "Normalize Card Name: mycool Credit Card.",
    "tgt_texts": "MyCool Credit Card"
}
```

In the *Integrate with processing pipeline* section below, you'd then configure your normalization prompts to be of the format `Normalize {YourFieldLabel}: ` for each field where you wanted to turn the normalizing model on, instead of the `Convert dates...` prompt we use.

You'll probably find it easiest to run through this example with the generated date-normalization dataset first to understand the flow, before trying to use your SMGT annotations instead.

### Look up custom container images

The training and inference jobs in this section will use the same customized container images created in the main notebook series for model training and deployment (see [Notebook 2 Model Training](2.%20Model%20Training.ipynb)), so you need to have built those first.

The code below will check the container images are already prepared and staged in your account's Amazon Elastic Container Registry (ECR).

In [None]:
# Configurations:
train_repo_name = "sm-ocr-training"
train_repo_tag = "hf-4.26-pt-gpu"  # TODO: Check this matches your ECR repo name and tagging
inf_repo_name = "sm-ocr-inference"
inf_repo_tag = train_repo_tag

account_id = sagemaker.Session().account_id()
region = os.environ["AWS_REGION"]

# Combine together into the final URIs:
train_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{train_repo_name}:{train_repo_tag}"
print(f"Target training image: {train_image_uri}")
inf_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{inf_repo_name}:{inf_repo_tag}"
print(f"Target inference image: {inf_image_uri}")

# Check from notebook whether the images were successfully created:
ecr = boto3.client("ecr")
for repo, tag, uri in (
    (train_repo_name, train_repo_tag, train_image_uri),
    (inf_repo_name, inf_repo_tag, inf_image_uri)
):
    imgs_desc = ecr.describe_images(
        registryId=account_id,
        repositoryName=repo,
        imageIds=[{"imageTag": tag}],
    )
    assert len(imgs_desc["imageDetails"]) > 0, f"Couldn't find ECR image {uri} after build"
    print(f"Found {uri}")

### Train a model

With data prepared, model training is very similar to the setup from the main notebooks. Some key differences include:

- Setting `task_name: seq2seq` to indicate we're training a sequence-to-sequence model instead of the usual layout-aware `ner`.
- Choosing a text-only pre-trained base model compatible with text generation, in this case `google/byt5-base`.
- Controlling logging, evaluation, and model saving by number of training steps rather than number of epochs (passes through the whole dataset). Since the data is synthetic, we can easily generate quite a large dataset compared to the amount of training we want to run.
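
To see how step-based settings relate to epochs, here is a minimal sketch of the arithmetic. It assumes 1,000 training examples, a per-device batch size of 2, a single GPU, and no gradient accumulation. The `steps_per_epoch` helper is illustrative only and is not part of the training script.

```python
import math


def steps_per_epoch(n_examples: int, per_device_batch_size: int, n_devices: int = 1) -> int:
    """Number of training steps in one pass through the data (no gradient accumulation)"""
    return math.ceil(n_examples / (per_device_batch_size * n_devices))


n_steps = steps_per_epoch(n_examples=1000, per_device_batch_size=2)
eval_steps = 250  # Same value as used for eval_steps and save_steps in this section
print(f"Steps per epoch: {n_steps}")
print(f"Evaluations per epoch: {n_steps / eval_steps}")
```

If you change the dataset size or batch size, you may want to rescale `logging_steps`, `eval_steps`, and `save_steps` accordingly.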

In [None]:
from sagemaker.huggingface.estimator import HuggingFace as HuggingFaceEstimator

hyperparameters = {
    "model_name_or_path": "google/byt5-base",
    "task_name": "seq2seq",
    "logging_steps": 100,
    "evaluation_strategy": "steps",
    "eval_steps": 250,  # (=Twice per epoch, at 1000 data points & batch size 2)
    # Only need to set do_eval when validation channel is not provided and want to generate:
    "do_eval": "1",
    "save_strategy": "steps",
    "save_steps": 250,
    "learning_rate": 1e-4,
    "per_device_train_batch_size": 2,
    "per_device_eval_batch_size": 4,
    "seed": 1337,

    "num_train_epochs": 5.01,  # Make sure the epoch==5.0 evaluation gets taken
    "early_stopping_patience": 4,
    "metric_for_best_model": "eval_acc",
    # "greater_is_better": "false",
    # Avoid filling up disk with too many saved model checkpoints:
    "save_total_limit": 10,
}

metric_definitions = [
    {"Name": "epoch", "Regex": util.training.get_hf_metric_regex("epoch")},
    {"Name": "learning_rate", "Regex": util.training.get_hf_metric_regex("learning_rate")},
    {"Name": "train:loss", "Regex": util.training.get_hf_metric_regex("loss")},
    {
        "Name": "validation:n_examples",
        "Regex": util.training.get_hf_metric_regex("eval_n_examples"),
    },
    {"Name": "validation:loss_avg", "Regex": util.training.get_hf_metric_regex("eval_loss")},
    {"Name": "validation:acc", "Regex": util.training.get_hf_metric_regex("eval_acc")},
]

estimator = HuggingFaceEstimator(
    role=sagemaker.get_execution_role(),
    entry_point="train.py",
    source_dir="src",
    py_version=None,
    pytorch_version=None,
    transformers_version=None,
    image_uri=train_image_uri,  # Use the customized training container image

    base_job_name="byt5-datenorm",
    output_path=f"s3://{bucket_name}/{bucket_prefix}trainjobs",

    instance_type="ml.p3.2xlarge",  # t5-base fits on ml.g4dn.xlarge GPU, but not byt5-base
    instance_count=1,
    volume_size=80,

    debugger_hook_config=False,

    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    environment={
        # Required for our custom dataset loading code (which depends on tokenizer):
        "TOKENIZERS_PARALLELISM": "false",
    },
)

There is no `textract` input data channel for this job, as both the `train` and `validation` channels simply provide plain text.

Run the cell below to kick off the job and view logs.

> ⏰ In our tests, the training took about 30 minutes to complete on an `ml.g4dn.xlarge` instance in default configuration

In [None]:
inputs = {
    "train": train_s3uri,
    "validation": validation_s3uri,
}

estimator.fit(inputs)

Once the training is complete, you have a model ready to normalize detected dates to specific target formats.

As discussed in the main solution notebooks, you can also 'attach' the notebook to previously-completed training jobs as shown below:

In [None]:
#estimator = HuggingFaceEstimator.attach("t5-datenorm-2023-01-09-12-19-12-377")

### Deploy for inference

Model deployment is similar to the entity recognition and other models shown in this solution. Note that here we'll set up a [real-time inference endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html) (not specifying an `async_inference_config` as with some other examples). We also use a separate [inference_seq2seq.py](src/inference_seq2seq.py) entrypoint, because the handling logic is quite different from standard `inference.py` models that consume Amazon Textract JSON.

In [None]:
from sagemaker.huggingface import HuggingFaceModel

# Look up the model artifact location from the training job:
training_job_desc = estimator.latest_training_job.describe()
model_s3uri = training_job_desc["ModelArtifacts"]["S3ModelArtifacts"]
model_name = training_job_desc["TrainingJobName"]

# Make sure we don't accidentally re-use same model:
try:
    smclient.delete_model(ModelName=model_name)
    print(f"Deleted existing model {model_name}")
except smclient.exceptions.ClientError as e:
    if not (
        e.response["Error"]["Code"] in (404, "404")
        or e.response["Error"].get("Message", "").startswith("Could not find model")
    ):
        raise e

model = HuggingFaceModel(
    name=model_name,
    model_data=model_s3uri,
    role=sagemaker.get_execution_role(),
    source_dir="src/",
    entry_point="inference_seq2seq.py",
    py_version=None,
    pytorch_version=None,
    transformers_version=None,
    image_uri=inf_image_uri,
    env={
        "PYTHONUNBUFFERED": "1",  # TODO: Disable once debugging is done
    },
)

In [None]:
# Delete previous endpoint, if already in use:
try:
    predictor.delete_endpoint(delete_endpoint_config=True)
    print("Deleting previous endpoint...")
    time.sleep(8)
except (NameError, smclient.exceptions.ResourceNotFound):
    pass  # No existing endpoint to delete
except smclient.exceptions.ClientError as e:
    if "Could not find" not in e.response["Error"].get("Message", ""):
        raise e

print("Deploying model...")
predictor = model.deploy(
    endpoint_name=training_job_desc["TrainingJobName"],
    initial_instance_count=1,
    instance_type="ml.m5.large",
    serializer=sagemaker.serializers.JSONSerializer(),
    deserializer=sagemaker.deserializers.JSONDeserializer(),
)
print("\nDone!")

### Validate the endpoint

Once the model is deployed, we can run (some or all of) the evaluation dataset through it to validate performance - as shown below.

> ⏰ In our tests, it took about a minute to run the full evaluation dataset through the model. For a faster turnaround, you could process just the first N samples of the dataset by instead running e.g. `eval_results = eval_dataset.select(range(N)).map(...)`

In [None]:
import datasets

eval_dataset = datasets.load_from_disk("data/seq2seq-validation")


def predict_batch(batch):
    """Run a dataset batch through the SageMaker endpoint and check per-example correctness"""
    input_texts = batch["src_texts"]
    result = predictor.predict({"inputs": input_texts})
    result["correct"] = [
        gen == batch["tgt_texts"][ix] for ix, gen in enumerate(result["generated_text"])
    ]
    return {**batch, **result}


eval_results = eval_dataset.map(
    predict_batch,
    desc="Running inference",
    batched=True,
    batch_size=16,
)

Below we measure overall "accuracy" on this evaluation set and print out some examples, to demonstrate performance:

In [None]:
# Calculate overall accuracy:
n_correct = sum(eval_results["correct"])
n_total = len(eval_results)
print(
    "{} of {} samples correct.\n  Overall accuracy: {:.2%}".format(
        n_correct, n_total, n_correct / n_total
    )
)

# Present some examples from the dataset:
pd.DataFrame(eval_results)

As shown above, this text-to-text model can take in a raw detected date mention (e.g. `Sunday Dec 31st 2000`) with a prompt prefix (e.g. `Convert dates to YYYY-MM-DD: `) and attempt to output the desired normalized format (e.g. `2000-12-31`).

Note that the "overall accuracy" metric reported above should match the `eval_acc` metric emitted by the training job, since the same validation dataset is used.
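
Beyond the headline figure, it can help to look at which examples the model gets wrong. The sketch below computes exact-match accuracy and collects a few mismatches. The `summarize_errors` helper is hypothetical and the sample values are made up for illustration, not real model outputs.

```python
def summarize_errors(src_texts, tgt_texts, generated_texts, max_examples=5):
    """Compute exact-match accuracy and collect a few mismatched examples"""
    mismatches = [
        {"src": src, "expected": tgt, "generated": gen}
        for src, tgt, gen in zip(src_texts, tgt_texts, generated_texts)
        if gen != tgt
    ]
    n_total = len(tgt_texts)
    accuracy = (n_total - len(mismatches)) / n_total if n_total else float("nan")
    return accuracy, mismatches[:max_examples]


# Made-up results for illustration only:
accuracy, examples = summarize_errors(
    src_texts=[
        "Convert dates to YYYY-MM-DD: Sunday Dec 31st 2000",
        "Convert dates to YYYY-MM-DD: 1 Jan 1999",
    ],
    tgt_texts=["2000-12-31", "1999-01-01"],
    generated_texts=["2000-12-31", "1999-01-10"],
)
print(f"Accuracy: {accuracy:.2%}")
for ex in examples:
    print(ex)
```

In this notebook, you could pass `eval_results["src_texts"]`, `eval_results["tgt_texts"]`, and `eval_results["generated_text"]` as the three arguments.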

### Integrate with processing pipeline

So how can such a field normalizing model be integrated with the overall document processing pipeline?

In fact, the **post-processing Lambda function**, which is invoked after our entity recognition model to extract and consolidate entities, can call out to additional "normalizing" models where required.

These are configured through the same **entity/field type configuration** we originally set up for the pipeline in [Notebook 1 (Data Preparation)](1.%20Data%20Preparation.ipynb).

First, we can load up the current pipeline entity configuration:

In [None]:
print("Loading current pipeline field configuration...")
# Load JSON text from AWS SSM Parameter Store:
# (If this fails, you could also try reading from data/field-config.json)
fields_json = ssm.get_parameter(Name=config.entity_config_param)["Parameter"]["Value"]
# Parse the JSON into Python config classes:
fields = [
    util.postproc.config.FieldConfiguration.from_dict(cfg)
    for cfg in json.loads(fields_json)
]
print("Done")

Next, find any entity type that looks like a date (any with 'date' in the name), and configure the normalizer for those fields:

> ⚠️ **Note:** Check the way you prompt your normalization model matches how it was trained, for good results!
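
One quick way to sanity-check a prompt before applying it is to measure how many training inputs begin with exactly that prefix. This is a minimal sketch, assuming every training input starts with its prompt. The `prompt_coverage` helper and the sample texts are illustrative and not taken from the real dataset.

```python
def prompt_coverage(src_texts, prompt):
    """Return the fraction of training inputs that begin with the given prompt prefix"""
    if not src_texts:
        return 0.0
    return sum(1 for text in src_texts if text.startswith(prompt)) / len(src_texts)


# Illustrative training inputs:
sample_src_texts = [
    "Convert dates to YYYY-MM-DD: Sunday Dec 31st 2000",
    "Convert dates to YYYY-MM-DD: 01/02/2003",
]
coverage_by_prompt = {
    candidate: prompt_coverage(sample_src_texts, candidate)
    for candidate in ("Convert dates to YYYY-MM-DD: ", "Normalize dates: ")
}
for candidate, coverage in coverage_by_prompt.items():
    print(f"{candidate!r}: {coverage:.0%} of training inputs use this prefix")
```

A prompt with zero coverage was never seen in training, so the model is unlikely to respond to it as intended.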

In [None]:
for f in fields:
    if "date" in f.name.lower():
        print(f"Found date field: {f.name}")
        f.normalizer_endpoint = predictor.endpoint_name
        print(f"  - Setting normalizer_endpoint = '{f.normalizer_endpoint}'")
        f.normalizer_prompt = "Convert dates to YYYY-MM-DD: "
        print(f"  - Setting normalizer_prompt = '{f.normalizer_prompt}'")

When you're happy with the updated field configuration, you can run the cell below to update the pipeline parameter. You may also like to check these updates in the [AWS Systems Manager Parameter Store console](https://console.aws.amazon.com/systems-manager/parameters/?&tab=Table).

In [None]:
print("Saving new field configuration locally...")
with open("data/field-config.json", "w") as f:
    f.write(json.dumps(
        [cfg.to_dict() for cfg in fields],
        indent=2,
    ))

print("Uploading new field configuration to pipeline...")
pipeline_entity_config = json.dumps([f.to_dict(omit=["annotation_guidance"]) for f in fields], indent=2)
ssm.put_parameter(
    Name=config.entity_config_param,
    Overwrite=True,
    Value=pipeline_entity_config,
)

After updating your pipeline's field configuration SSM parameter to set `normalizer_endpoint` and `normalizer_prompt` on your target entity types, your pipeline's Post-processing Lambda function should automatically start calling your SageMaker model endpoints to normalize mentions on the relevant fields. For example with the Credit Card Agreements sample data, you should see that `Agreement Effective Date` results start to show in `YYYY-MM-DD` format instead of the document's source format, when reviewing results in Amazon A2I or the Step Functions console.
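
To make the flow concrete, here is a rough sketch of how a caller could send one prompted mention to the real-time endpoint. It is not the pipeline's actual Lambda code. It assumes the same JSON request and response shape used with the `predictor` earlier (`{"inputs": [...]}` in, a `generated_text` list out). The `normalize_mention` helper, the endpoint name, and `FakeRuntimeClient` (an offline stand-in returning a canned answer) are all hypothetical.

```python
import io
import json


def normalize_mention(runtime_client, endpoint_name, prompt, mention_text):
    """Send one prompted mention to a real-time endpoint and return the generated text"""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Accept="application/json",
        Body=json.dumps({"inputs": [prompt + mention_text]}),
    )
    result = json.loads(response["Body"].read())
    return result["generated_text"][0]


class FakeRuntimeClient:
    """Offline stand-in for boto3.client("sagemaker-runtime"), returning a canned response"""

    def invoke_endpoint(self, **kwargs):
        request = json.loads(kwargs["Body"])
        canned = {"generated_text": ["2000-12-31" for _ in request["inputs"]]}
        return {"Body": io.BytesIO(json.dumps(canned).encode("utf-8"))}


demo_result = normalize_mention(
    FakeRuntimeClient(),
    endpoint_name="my-normalizer-endpoint",  # Hypothetical endpoint name
    prompt="Convert dates to YYYY-MM-DD: ",
    mention_text="Sunday Dec 31st 2000",
)
print(demo_result)  # Canned value from the fake client, not a real model output
```

Against a deployed endpoint, you would pass `boto3.client("sagemaker-runtime")` and your endpoint's name instead of the fake client.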

> ⚠️ **Note:** There may be a few minutes' delay before normalization starts to take effect, if your post-processing Lambda is configured to cache the SSM configuration. Check your AWS Lambda logs for error messages, in case normalization model calls are failing.
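
If you prefer to check the logs from the notebook, a sketch along these lines could help. It assumes the function writes to the default `/aws/lambda/{function name}` log group and only reads the first page of results. The `recent_log_messages` helper, the function name, and `FakeLogsClient` (an offline stand-in) are hypothetical.

```python
import time


def recent_log_messages(logs_client, function_name, pattern="ERROR", minutes=15):
    """Return recent log messages matching `pattern` from a Lambda function's log group"""
    start_ms = int((time.time() - minutes * 60) * 1000)  # Milliseconds since epoch
    response = logs_client.filter_log_events(
        logGroupName=f"/aws/lambda/{function_name}",  # Assumes the default log group name
        filterPattern=pattern,
        startTime=start_ms,
    )
    return [event["message"] for event in response.get("events", [])]


class FakeLogsClient:
    """Offline stand-in for boto3.client("logs"), returning one made-up event"""

    def filter_log_events(self, **kwargs):
        return {"events": [{"message": f"ERROR example from {kwargs['logGroupName']}"}]}


messages = recent_log_messages(FakeLogsClient(), "my-postprocessing-function")
for message in messages:
    print(message)
```

With real credentials, you would pass `boto3.client("logs")` and the name of your deployed post-processing function.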

This example of normalizing individual extracted date fields is just one option in a spectrum of ways you could combine generative and extractive models for document understanding. For example, you could:

- Train additional normalization types, for example for other data types or to fix common OCR error patterns
- Include more context from around the original mention, to help the model perform better (such as interpreting whether a raw date is likely to be DD/MM or MM/DD given other information)
- Explore linking generative and layout-aware aspects into one end-to-end trainable model

---

*[Back to contents](#Contents)*