482 lines
17 KiB
Python
482 lines
17 KiB
Python
"""
|
||
Fusion Accounting - Document AI / OCR Extraction Engine

Provides a pluggable OCR back-end that can extract text from scanned
invoices, receipts, and other accounting documents.  Three providers are
supported out-of-the-box:

* **Tesseract** – runs locally via pytesseract (no cloud calls).
* **Google Cloud Vision** – calls the Vision API v1 TEXT_DETECTION endpoint.
* **Azure AI Document Intelligence** – calls the Azure prebuilt-invoice
  model (prebuilt-read for non-invoice documents).

Each company may configure one or more extractor records and switch
between them freely.

Original implementation by Nexa Systems Inc.
|
||
"""
|
||
|
||
import base64
|
||
import io
|
||
import json
|
||
import logging
|
||
|
||
import requests
|
||
|
||
from odoo import api, fields, models, _
|
||
from odoo.exceptions import UserError, ValidationError
|
||
|
||
_log = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Optional imports – gracefully degrade when libs are absent so the module
|
||
# can still be installed (the user simply won't be able to use Tesseract).
|
||
# ---------------------------------------------------------------------------
|
||
try:
|
||
from PIL import Image # noqa: F401
|
||
_PILLOW_AVAILABLE = True
|
||
except ImportError:
|
||
_PILLOW_AVAILABLE = False
|
||
|
||
try:
|
||
import pytesseract # noqa: F401
|
||
_TESSERACT_AVAILABLE = True
|
||
except ImportError:
|
||
_TESSERACT_AVAILABLE = False
|
||
|
||
|
||
class FusionDocumentExtractor(models.Model):
    """
    Configurable OCR / AI extraction back-end.

    Each record represents a single provider configuration.  The
    :meth:`extract_fields` entry-point dispatches to the appropriate
    private method based on the selected *provider*.
    """

    _name = "fusion.document.extractor"
    _description = "Document AI Extraction Provider"
    _order = "sequence, id"

    # ------------------------------------------------------------------
    # Fields
    # ------------------------------------------------------------------
    name = fields.Char(
        string="Name",
        required=True,
        help="A human-readable label for this extractor (e.g. 'Production Tesseract').",
    )
    sequence = fields.Integer(
        string="Sequence",
        default=10,
        help="Lower numbers appear first when multiple extractors exist.",
    )
    provider = fields.Selection(
        selection=[
            ("tesseract", "Tesseract (Local)"),
            ("google_vision", "Google Cloud Vision"),
            ("azure_ai", "Azure AI Document Intelligence"),
        ],
        string="Provider",
        required=True,
        default="tesseract",
        # The Google line advertises an API key (not a service-account JSON)
        # because the Google handler only implements key-based authentication.
        help=(
            "The OCR engine to use.\n\n"
            "• Tesseract – free, runs locally; requires pytesseract + Tesseract binary.\n"
            "• Google Cloud Vision – cloud API; requires an API key.\n"
            "• Azure AI Document Intelligence – cloud API; requires endpoint + key."
        ),
    )
    # Secret: readable only by members of base.group_system.
    api_key = fields.Char(
        string="API Key / Credentials",
        groups="base.group_system",
        help=(
            "For Google Vision: paste the API key "
            "(service-account JSON credentials are not supported yet).\n"
            "For Azure AI: paste the subscription key.\n"
            "Not used for Tesseract."
        ),
    )
    api_endpoint = fields.Char(
        string="API Endpoint",
        help=(
            "For Azure AI: the resource endpoint URL "
            "(e.g. https://<resource>.cognitiveservices.azure.com).\n"
            "Not used for Tesseract or Google Vision."
        ),
    )
    tesseract_lang = fields.Char(
        string="Tesseract Language",
        default="eng",
        help="Tesseract language code(s), e.g. 'eng', 'fra+eng'. Ignored for cloud providers.",
    )
    # NOTE(review): this flag is named ``is_active``, not ``active``; nothing in
    # this class filters on it, so hiding inactive records presumably relies on
    # explicit domains in views/callers - confirm.
    is_active = fields.Boolean(
        string="Active",
        default=True,
        help="Inactive extractors are hidden from selection lists.",
    )
    company_id = fields.Many2one(
        comodel_name="res.company",
        string="Company",
        default=lambda self: self.env.company,
        help="Restrict this extractor to a single company, or leave blank for all.",
    )
|
||
|
||
# ------------------------------------------------------------------
|
||
# Constraints
|
||
# ------------------------------------------------------------------
|
||
@api.constrains("provider", "api_key")
def _check_api_key_for_cloud_providers(self):
    """Reject cloud-provider records that have no API key stored.

    Raises:
        ValidationError: For the first record in ``self`` whose provider is
            ``google_vision`` or ``azure_ai`` while ``api_key`` is empty.
    """
    cloud_providers = ("google_vision", "azure_ai")
    for extractor in self:
        # The provider is tested first so ``api_key`` is only read for
        # cloud providers.
        if extractor.provider not in cloud_providers or extractor.api_key:
            continue
        raise ValidationError(
            _("An API key is required for the '%s' provider.", extractor.get_provider_label())
        )
|
||
|
||
@api.constrains("provider", "api_endpoint")
def _check_endpoint_for_azure(self):
    """Require an endpoint URL on every Azure AI extractor.

    Raises:
        ValidationError: For the first record in ``self`` whose provider is
            ``azure_ai`` while ``api_endpoint`` is empty.
    """
    for extractor in self:
        if extractor.provider != "azure_ai" or extractor.api_endpoint:
            continue
        raise ValidationError(
            _("An API endpoint URL is required for Azure AI Document Intelligence.")
        )
|
||
|
||
# ------------------------------------------------------------------
|
||
# Helpers
|
||
# ------------------------------------------------------------------
|
||
def get_provider_label(self):
    """Return the display label of this record's provider.

    The label is looked up in the ``provider`` field's selection list; when
    the stored value has no entry there, the raw value is returned instead.

    Returns:
        str: The selection label, or the stored provider value as fallback.
    """
    self.ensure_one()
    labels_by_code = dict(self._fields["provider"].selection)
    code = self.provider
    return labels_by_code.get(code, code)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Public API
|
||
# ------------------------------------------------------------------
|
||
def extract_fields(self, image_bytes, document_type="invoice"):
    """Run the configured provider on *image_bytes* and return its result.

    Args:
        image_bytes (bytes): Raw bytes of the document to analyse.
        document_type (str): Hint forwarded to the provider handler
            (``'invoice'``, ``'receipt'``, ``'credit_note'``).

    Returns:
        dict: The handler's result, guaranteed to contain ``'raw_text'``
        (empty string when the handler did not set it) and ``'provider'``
        (this record's provider code).

    Raises:
        UserError: When the provider code has no handler, or when the
            handler itself reports a failure.
    """
    self.ensure_one()
    _log.info(
        "Fusion OCR: extracting from %d bytes via '%s' (doc_type=%s)",
        len(image_bytes), self.provider, document_type,
    )

    # Select the provider-specific handler.
    provider_code = self.provider
    if provider_code == "tesseract":
        handler = self._extract_via_tesseract
    elif provider_code == "google_vision":
        handler = self._extract_via_google_vision
    elif provider_code == "azure_ai":
        handler = self._extract_via_azure_ai
    else:
        raise UserError(_("Unknown extraction provider: %s", self.provider))

    extracted = handler(image_bytes, document_type=document_type)

    # Callers rely on 'raw_text' always being present.
    if "raw_text" not in extracted:
        extracted["raw_text"] = ""
    extracted["provider"] = self.provider
    return extracted
|
||
|
||
# ------------------------------------------------------------------
|
||
# Provider: Tesseract (local)
|
||
# ------------------------------------------------------------------
|
||
def _extract_via_tesseract(self, image_bytes, **kwargs):
    """Extract text locally using Tesseract OCR.

    Opens the input bytes as a PIL Image and passes it to
    ``pytesseract.image_to_string`` using the record's ``tesseract_lang``
    (``'eng'`` when unset).  No PDF-to-image conversion happens here: input
    that starts with a PDF header is rejected with an explicit error.

    Args:
        image_bytes (bytes): Raw image bytes in a format Pillow can open.
        **kwargs: Accepted so all provider handlers share one call shape;
            ignored by this handler.

    Returns:
        dict: ``{'raw_text': <str>}``

    Raises:
        UserError: When Pillow or pytesseract is not installed, the input
            is a PDF or cannot be opened as an image, or Tesseract fails.
    """
    self.ensure_one()
    if not _PILLOW_AVAILABLE:
        raise UserError(
            _("The Pillow library is required for Tesseract OCR. "
              "Install it with: pip install Pillow")
        )
    if not _TESSERACT_AVAILABLE:
        raise UserError(
            _("The pytesseract library is required for local OCR. "
              "Install it with: pip install pytesseract")
        )

    # Without this check a PDF would only surface as an opaque
    # "cannot identify image file" message from the open call below.
    if image_bytes and bytes(image_bytes[:5]) == b"%PDF-":
        raise UserError(
            _("Tesseract OCR cannot read PDF files directly. "
              "Convert the document to an image (PNG, JPEG, TIFF) first.")
        )

    try:
        image = Image.open(io.BytesIO(image_bytes))
    except Exception as exc:
        raise UserError(
            _("Could not open the attachment as an image: %s", str(exc))
        ) from exc

    lang = self.tesseract_lang or "eng"
    try:
        # The context manager closes the image once OCR is done.
        with image:
            raw_text = pytesseract.image_to_string(image, lang=lang)
    except Exception as exc:
        _log.exception("Fusion OCR – Tesseract failed")
        raise UserError(
            _("Tesseract OCR failed: %s", str(exc))
        ) from exc

    return {"raw_text": raw_text}
|
||
|
||
# ------------------------------------------------------------------
|
||
# Provider: Google Cloud Vision
|
||
# ------------------------------------------------------------------
|
||
def _extract_via_google_vision(self, image_bytes, **kwargs):
    """Call the Google Cloud Vision ``images:annotate`` TEXT_DETECTION feature.

    Only key-based authentication is implemented: the record's *api_key*
    must hold a plain API key.  Service-account JSON / OAuth is not
    supported.  The key travels in the ``X-Goog-Api-Key`` header rather than
    in the URL, so it cannot leak into exception messages or logs (the
    text of a ``requests`` HTTP error includes the request URL).

    Args:
        image_bytes (bytes): Raw image bytes; sent base64-encoded.
        **kwargs: Accepted so all provider handlers share one call shape;
            ignored by this handler.

    Returns:
        dict: ``{'raw_text': <str>, 'annotations': <list>}`` where
        ``raw_text`` is the ``description`` of the first text annotation
        (empty string when no text was detected).

    Raises:
        UserError: When the HTTP call fails, the body is not valid JSON,
            or the API reports an error for the submitted image.
    """
    self.ensure_one()
    url = "https://vision.googleapis.com/v1/images:annotate"
    # ``api_key`` is restricted to base.group_system; sudo() lets users
    # outside that group run OCR without being shown the secret.
    headers = {"X-Goog-Api-Key": (self.sudo().api_key or "").strip()}
    encoded = base64.b64encode(image_bytes).decode("ascii")
    payload = {
        "requests": [
            {
                "image": {"content": encoded},
                "features": [{"type": "TEXT_DETECTION"}],
            }
        ]
    }

    try:
        resp = requests.post(url, headers=headers, json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        _log.exception("Fusion OCR – Google Vision API request failed")
        raise UserError(
            _("Google Cloud Vision request failed: %s", str(exc))
        ) from exc

    responses = data.get("responses") if isinstance(data, dict) else None
    first = (responses[0] if responses else None) or {}

    # A 200 reply can still carry a per-image error; report it instead of
    # returning empty text as if the page were blank.
    image_error = first.get("error")
    if image_error:
        detail = image_error.get("message") if isinstance(image_error, dict) else None
        raise UserError(
            _("Google Cloud Vision request failed: %s", detail or str(image_error))
        )

    annotations = first.get("textAnnotations") or []
    raw_text = annotations[0].get("description", "") if annotations else ""

    return {
        "raw_text": raw_text,
        "annotations": annotations,
    }
|
||
|
||
# ------------------------------------------------------------------
|
||
# Provider: Azure AI Document Intelligence
|
||
# ------------------------------------------------------------------
|
||
def _extract_via_azure_ai(self, image_bytes, document_type="invoice", **kwargs):
    """Call Azure AI Document Intelligence (formerly Form Recognizer).

    Submits the document to the ``prebuilt-invoice`` model when
    *document_type* is ``'invoice'`` and to ``prebuilt-read`` otherwise,
    then polls the returned ``Operation-Location`` URL until the analysis
    succeeds, fails, or a wall-clock budget of about 60 seconds runs out.

    Args:
        image_bytes (bytes): Raw document bytes, sent as
            ``application/octet-stream``.
        document_type (str): ``'invoice'`` selects the prebuilt-invoice
            model; anything else uses prebuilt-read.
        **kwargs: Accepted so all provider handlers share one call shape;
            ignored by this handler.

    Returns:
        dict: ``{'raw_text': <str>, 'fields': <dict>, 'pages': <list>}``.
        ``fields`` is empty when the result contains no ``documents``.

    Raises:
        UserError: When the submit request fails, no ``Operation-Location``
            header is returned, the service reports ``failed``, or the
            analysis does not finish within the polling budget.
    """
    import time  # stdlib; kept function-local as in the original block

    self.ensure_one()
    poll_interval = 2   # seconds to wait before each status check
    poll_budget = 60    # total seconds allowed for the polling phase

    # ``api_key`` is restricted to base.group_system; sudo() lets users
    # outside that group run OCR without being shown the secret.
    subscription_key = (self.sudo().api_key or "").strip()
    endpoint = (self.api_endpoint or "").strip().rstrip("/")
    model_id = "prebuilt-invoice" if document_type == "invoice" else "prebuilt-read"
    analyze_url = (
        f"{endpoint}/formrecognizer/documentModels/{model_id}:analyze"
        "?api-version=2023-07-31"
    )

    headers = {
        "Ocp-Apim-Subscription-Key": subscription_key,
        "Content-Type": "application/octet-stream",
    }

    # Step 1 – submit the document for analysis
    try:
        resp = requests.post(
            analyze_url, headers=headers, data=image_bytes, timeout=60,
        )
        resp.raise_for_status()
    except requests.RequestException as exc:
        _log.exception("Fusion OCR – Azure AI submit failed")
        raise UserError(
            _("Azure AI Document Intelligence request failed: %s", str(exc))
        ) from exc

    operation_url = resp.headers.get("Operation-Location")
    if not operation_url:
        raise UserError(
            _("Azure AI did not return an Operation-Location header.")
        )

    # Step 2 – poll for results.  A monotonic deadline bounds the wait:
    # a fixed attempt count would let slow poll requests stretch it far
    # beyond the intended ~60 s.
    poll_headers = {"Ocp-Apim-Subscription-Key": subscription_key}
    result_data = {}
    deadline = time.monotonic() + poll_budget
    while True:
        time.sleep(poll_interval)
        # Never let one request outlive the remaining budget by much.
        request_timeout = max(1.0, min(30.0, deadline - time.monotonic()))
        try:
            poll_resp = requests.get(
                operation_url, headers=poll_headers, timeout=request_timeout,
            )
            poll_resp.raise_for_status()
            result_data = poll_resp.json()
        except (requests.RequestException, ValueError) as exc:
            # A failed poll is retried until the deadline.
            _log.warning("Fusion OCR – Azure AI poll attempt failed: %s", exc)
        else:
            status = result_data.get("status", "")
            if status == "succeeded":
                break
            if status == "failed":
                error_detail = (result_data.get("error") or {}).get(
                    "message", "Unknown error"
                )
                raise UserError(
                    _("Azure AI analysis failed: %s", error_detail)
                )
        if time.monotonic() >= deadline:
            raise UserError(
                _("Azure AI analysis did not complete within the timeout window.")
            )

    # Step 3 – parse the result
    analyze_result = result_data.get("analyzeResult", {})
    raw_text = analyze_result.get("content", "")
    extracted_fields = {}
    pages = analyze_result.get("pages", [])

    # Parse structured invoice fields when available
    documents = analyze_result.get("documents", [])
    if documents:
        doc_fields = documents[0].get("fields", {})
        extracted_fields = self._parse_azure_invoice_fields(doc_fields)

    return {
        "raw_text": raw_text,
        "fields": extracted_fields,
        "pages": pages,
    }
|
||
|
||
@api.model
def _parse_azure_invoice_fields(self, doc_fields):
    """Convert Azure's structured field map into a flat dict.

    Args:
        doc_fields (dict): The ``documents[0].fields`` portion of
            an Azure analyzeResult response.

    Returns:
        dict: Normalized field names → values.  Every key of the header
        mapping is always present (``None`` when Azure did not return the
        field).  ``'line_items'`` is present only when ``Items`` holds a
        non-empty ``valueArray``; it is a list of dicts with the keys
        ``description``, ``quantity``, ``unit_price``, ``amount``, ``tax``.
    """
    def _val(field_dict):
        """Return the typed value of one Azure field, else its raw content.

        Looks at ``valueString``, ``valueDate``, ``valueNumber`` in that
        order and falls back to ``content``.  A numeric 0 is a valid
        value, so ``valueNumber`` is tested against ``None`` rather than
        for truthiness.
        """
        if not field_dict:
            return None
        for text_key in ("valueString", "valueDate"):
            text_value = field_dict.get(text_key)
            if text_value:
                return text_value
        number = field_dict.get("valueNumber")
        if number is not None:
            return number
        # NOTE(review): other typed payloads (e.g. a ``valueCurrency``
        # object on amount fields) are not read here and end up as the
        # raw ``content`` text - confirm that callers expect this.
        return field_dict.get("content")

    mapping = {
        "vendor_name": "VendorName",
        "vendor_address": "VendorAddress",
        "invoice_number": "InvoiceId",
        "invoice_date": "InvoiceDate",
        "due_date": "DueDate",
        "total_amount": "InvoiceTotal",
        "subtotal": "SubTotal",
        "tax_amount": "TotalTax",
        "currency": "CurrencyCode",
        "purchase_order": "PurchaseOrder",
        "customer_name": "CustomerName",
    }

    result = {
        local_key: _val(doc_fields.get(azure_key))
        for local_key, azure_key in mapping.items()
    }

    # Line items
    items_field = doc_fields.get("Items")
    if items_field and items_field.get("valueArray"):
        lines = []
        for item in items_field["valueArray"]:
            # ``or {}`` also covers an explicit null valueObject.
            item_fields = item.get("valueObject") or {}
            lines.append({
                "description": _val(item_fields.get("Description")),
                "quantity": _val(item_fields.get("Quantity")),
                "unit_price": _val(item_fields.get("UnitPrice")),
                "amount": _val(item_fields.get("Amount")),
                "tax": _val(item_fields.get("Tax")),
            })
        result["line_items"] = lines

    return result
|
||
|
||
# ------------------------------------------------------------------
|
||
# Actions
|
||
# ------------------------------------------------------------------
|
||
def action_test_connection(self):
    """Quick connectivity / credential check for the configured provider.

    Creates a small blank white PNG, sends it through
    :meth:`extract_fields` with ``document_type="test"``, and reports
    success via a client notification.

    Returns:
        dict: An ``ir.actions.client`` action showing a success
        notification.

    Raises:
        UserError: When Pillow is missing, or when the extraction call
            fails (provider errors are re-raised unchanged; any other
            exception is wrapped).
    """
    self.ensure_one()
    if not _PILLOW_AVAILABLE:
        raise UserError(_("Pillow is required to run a connection test."))

    # Build a 100×100 white PNG as test payload.  Azure Document
    # Intelligence rejects images smaller than 50×50 pixels, so a smaller
    # probe would make the Azure test fail even with valid credentials.
    img = Image.new("RGB", (100, 100), color=(255, 255, 255))
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    test_bytes = buf.getvalue()

    try:
        result = self.extract_fields(test_bytes, document_type="test")
        _log.info("Fusion OCR – connection test succeeded: %s", result.get("provider"))
    except UserError:
        # Already user-facing; pass it through untouched.
        raise
    except Exception as exc:
        raise UserError(
            _("Connection test failed: %s", str(exc))
        ) from exc

    return {
        "type": "ir.actions.client",
        "tag": "display_notification",
        "params": {
            "title": _("Connection Successful"),
            "message": _("The '%s' provider responded correctly.", self.name),
            "type": "success",
            "sticky": False,
        },
    }
|