Files
Odoo-Modules/Fusion Accounting/models/document_extraction.py
2026-02-22 01:22:18 -05:00

482 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Fusion Accounting - Document AI / OCR Extraction Engine
Provides a pluggable OCR back-end that can extract text from scanned
invoices, receipts, and other accounting documents. Three providers are
supported out-of-the-box:
* **Tesseract** runs locally via pytesseract (no cloud calls).
* **Google Cloud Vision** calls the Vision API v1 TEXT_DETECTION endpoint.
* **Azure AI Document Intelligence** calls the Azure prebuilt-invoice
layout model.
Each company may configure one or more extractor records and switch
between them freely.
Original implementation by Nexa Systems Inc.
"""
import base64
import io
import json
import logging
import requests
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
_log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional imports gracefully degrade when libs are absent so the module
# can still be installed (the user simply won't be able to use Tesseract).
# ---------------------------------------------------------------------------
try:
from PIL import Image # noqa: F401
_PILLOW_AVAILABLE = True
except ImportError:
_PILLOW_AVAILABLE = False
try:
import pytesseract # noqa: F401
_TESSERACT_AVAILABLE = True
except ImportError:
_TESSERACT_AVAILABLE = False
class FusionDocumentExtractor(models.Model):
    """
    Configurable OCR / AI extraction back-end.

    Each record represents a single provider configuration. The
    :meth:`extract_fields` entry-point dispatches to the appropriate
    private method based on the selected *provider*.
    """

    _name = "fusion.document.extractor"
    _description = "Document AI Extraction Provider"
    _order = "sequence, id"

    # ------------------------------------------------------------------
    # Fields
    # ------------------------------------------------------------------
    # Human-readable label for this configuration record.
    name = fields.Char(
        string="Name",
        required=True,
        help="A human-readable label for this extractor (e.g. 'Production Tesseract').",
    )
    # Display-order key; matches the model-level _order = "sequence, id".
    sequence = fields.Integer(
        string="Sequence",
        default=10,
        help="Lower numbers appear first when multiple extractors exist.",
    )
    # Which back-end extract_fields() dispatches to.
    provider = fields.Selection(
        selection=[
            ("tesseract", "Tesseract (Local)"),
            ("google_vision", "Google Cloud Vision"),
            ("azure_ai", "Azure AI Document Intelligence"),
        ],
        string="Provider",
        required=True,
        default="tesseract",
        help=(
            "The OCR engine to use.\n\n"
            "• Tesseract free, runs locally; requires pytesseract + Tesseract binary.\n"
            "• Google Cloud Vision cloud API; requires a service-account JSON key.\n"
            "• Azure AI Document Intelligence cloud API; requires endpoint + key."
        ),
    )
    # Secret material; `groups` restricts read/write to system admins.
    # Required (by constraint) for google_vision / azure_ai; unused for tesseract.
    api_key = fields.Char(
        string="API Key / Credentials",
        groups="base.group_system",
        help=(
            "For Google Vision: paste the full service-account JSON key.\n"
            "For Azure AI: paste the subscription key.\n"
            "Not used for Tesseract."
        ),
    )
    # Azure resource base URL; required (by constraint) when provider == 'azure_ai'.
    api_endpoint = fields.Char(
        string="API Endpoint",
        help=(
            "For Azure AI: the resource endpoint URL "
            "(e.g. https://<resource>.cognitiveservices.azure.com).\n"
            "Not used for Tesseract or Google Vision."
        ),
    )
    # Passed as the `lang` argument to pytesseract.image_to_string().
    tesseract_lang = fields.Char(
        string="Tesseract Language",
        default="eng",
        help="Tesseract language code(s), e.g. 'eng', 'fra+eng'. Ignored for cloud providers.",
    )
    # Visibility toggle for selection lists.
    is_active = fields.Boolean(
        string="Active",
        default=True,
        help="Inactive extractors are hidden from selection lists.",
    )
    # Optional company restriction; empty means shared across companies.
    company_id = fields.Many2one(
        comodel_name="res.company",
        string="Company",
        default=lambda self: self.env.company,
        help="Restrict this extractor to a single company, or leave blank for all.",
    )
# ------------------------------------------------------------------
# Constraints
# ------------------------------------------------------------------
@api.constrains("provider", "api_key")
def _check_api_key_for_cloud_providers(self):
"""Ensure cloud providers have credentials configured."""
for rec in self:
if rec.provider in ("google_vision", "azure_ai") and not rec.api_key:
raise ValidationError(
_("An API key is required for the '%s' provider.", rec.get_provider_label())
)
@api.constrains("provider", "api_endpoint")
def _check_endpoint_for_azure(self):
"""Azure AI requires an explicit endpoint URL."""
for rec in self:
if rec.provider == "azure_ai" and not rec.api_endpoint:
raise ValidationError(
_("An API endpoint URL is required for Azure AI Document Intelligence.")
)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def get_provider_label(self):
"""Return the human-readable label for the current provider selection."""
self.ensure_one()
return dict(self._fields["provider"].selection).get(self.provider, self.provider)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def extract_fields(self, image_bytes, document_type="invoice"):
"""Run OCR on *image_bytes* and return a dict of extracted fields.
Args:
image_bytes (bytes): Raw bytes of a PDF page or image file.
document_type (str): Hint for the extraction engine
(``'invoice'``, ``'receipt'``, ``'credit_note'``).
Returns:
dict: Extracted data with at least the key ``'raw_text'``
(the full OCR output) and provider-specific structured
fields when available.
Raises:
UserError: When the selected provider cannot be used (missing
library, bad credentials, …).
"""
self.ensure_one()
_log.info(
"Fusion OCR: extracting from %d bytes via '%s' (doc_type=%s)",
len(image_bytes), self.provider, document_type,
)
dispatch = {
"tesseract": self._extract_via_tesseract,
"google_vision": self._extract_via_google_vision,
"azure_ai": self._extract_via_azure_ai,
}
handler = dispatch.get(self.provider)
if not handler:
raise UserError(_("Unknown extraction provider: %s", self.provider))
result = handler(image_bytes, document_type=document_type)
# Guarantee a 'raw_text' key exists
result.setdefault("raw_text", "")
result["provider"] = self.provider
return result
# ------------------------------------------------------------------
# Provider: Tesseract (local)
# ------------------------------------------------------------------
def _extract_via_tesseract(self, image_bytes, **kwargs):
"""Extract text locally using Tesseract OCR.
Converts the input bytes to a PIL Image, then calls
``pytesseract.image_to_string``. PDF inputs are converted
to images via Pillow first.
Args:
image_bytes (bytes): Raw image or PDF bytes.
Returns:
dict: ``{'raw_text': <str>}``
"""
self.ensure_one()
if not _PILLOW_AVAILABLE:
raise UserError(
_("The Pillow library is required for Tesseract OCR. "
"Install it with: pip install Pillow")
)
if not _TESSERACT_AVAILABLE:
raise UserError(
_("The pytesseract library is required for local OCR. "
"Install it with: pip install pytesseract")
)
try:
image = Image.open(io.BytesIO(image_bytes))
except Exception as exc:
raise UserError(
_("Could not open the attachment as an image: %s", str(exc))
) from exc
lang = self.tesseract_lang or "eng"
try:
raw_text = pytesseract.image_to_string(image, lang=lang)
except Exception as exc:
_log.exception("Fusion OCR Tesseract failed")
raise UserError(
_("Tesseract OCR failed: %s", str(exc))
) from exc
return {"raw_text": raw_text}
# ------------------------------------------------------------------
# Provider: Google Cloud Vision
# ------------------------------------------------------------------
def _extract_via_google_vision(self, image_bytes, **kwargs):
"""Call Google Cloud Vision API TEXT_DETECTION.
The *api_key* field is expected to contain either:
* A plain API key (simple authentication), or
* A full service-account JSON (used for OAuth **not yet
implemented**; for now we use the key-based endpoint).
Args:
image_bytes (bytes): Raw image bytes (PNG / JPEG / TIFF / PDF).
Returns:
dict: ``{'raw_text': <str>, 'annotations': <list>}``
"""
self.ensure_one()
url = (
"https://vision.googleapis.com/v1/images:annotate"
f"?key={self.api_key}"
)
encoded = base64.b64encode(image_bytes).decode("ascii")
payload = {
"requests": [
{
"image": {"content": encoded},
"features": [{"type": "TEXT_DETECTION"}],
}
]
}
try:
resp = requests.post(url, json=payload, timeout=60)
resp.raise_for_status()
except requests.RequestException as exc:
_log.exception("Fusion OCR Google Vision API request failed")
raise UserError(
_("Google Cloud Vision request failed: %s", str(exc))
) from exc
data = resp.json()
responses = data.get("responses", [{}])
annotations = responses[0].get("textAnnotations", [])
raw_text = annotations[0].get("description", "") if annotations else ""
return {
"raw_text": raw_text,
"annotations": annotations,
}
    # ------------------------------------------------------------------
    # Provider: Azure AI Document Intelligence
    # ------------------------------------------------------------------
    def _extract_via_azure_ai(self, image_bytes, document_type="invoice", **kwargs):
        """Call Azure AI Document Intelligence (formerly Form Recognizer).

        Uses the **prebuilt-invoice** model for invoices and falls back
        to **prebuilt-read** for generic documents. The API is
        asynchronous: the document is submitted first, then the
        Operation-Location URL is polled until the analysis completes.

        Args:
            image_bytes (bytes): Raw document bytes.
            document_type (str): ``'invoice'`` selects the prebuilt-invoice
                model; anything else uses prebuilt-read.

        Returns:
            dict: ``{'raw_text': <str>, 'fields': <dict>, 'pages': <list>}``

        Raises:
            UserError: On HTTP failure, a missing Operation-Location
                header, an analysis reported as failed, or when polling
                exceeds the ~60 s budget.
        """
        self.ensure_one()
        # api_endpoint is guaranteed non-empty by _check_endpoint_for_azure.
        endpoint = self.api_endpoint.rstrip("/")
        model_id = "prebuilt-invoice" if document_type == "invoice" else "prebuilt-read"
        analyze_url = (
            f"{endpoint}/formrecognizer/documentModels/{model_id}:analyze"
            "?api-version=2023-07-31"
        )
        headers = {
            "Ocp-Apim-Subscription-Key": self.api_key,
            "Content-Type": "application/octet-stream",
        }
        # Step 1 — submit the document for analysis (the response carries an
        # Operation-Location URL to poll, not the result itself).
        try:
            resp = requests.post(
                analyze_url, headers=headers, data=image_bytes, timeout=60,
            )
            resp.raise_for_status()
        except requests.RequestException as exc:
            _log.exception("Fusion OCR Azure AI submit failed")
            raise UserError(
                _("Azure AI Document Intelligence request failed: %s", str(exc))
            ) from exc
        operation_url = resp.headers.get("Operation-Location")
        if not operation_url:
            raise UserError(
                _("Azure AI did not return an Operation-Location header.")
            )
        # Step 2 — poll for results: 30 attempts x 2 s sleep, i.e. max ~60 s.
        import time
        poll_headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        result_data = {}
        for _attempt in range(30):
            time.sleep(2)
            try:
                poll_resp = requests.get(
                    operation_url, headers=poll_headers, timeout=30,
                )
                poll_resp.raise_for_status()
                result_data = poll_resp.json()
            except requests.RequestException as exc:
                # Transient poll failures are tolerated; just retry.
                _log.warning("Fusion OCR Azure AI poll attempt failed: %s", exc)
                continue
            status = result_data.get("status", "")
            if status == "succeeded":
                break
            if status == "failed":
                error_detail = result_data.get("error", {}).get("message", "Unknown error")
                raise UserError(
                    _("Azure AI analysis failed: %s", error_detail)
                )
        else:
            # for/else: executed only when the loop exhausted without `break`,
            # i.e. the analysis never reached 'succeeded' in time.
            raise UserError(
                _("Azure AI analysis did not complete within the timeout window.")
            )
        # Step 3 — parse the result.
        analyze_result = result_data.get("analyzeResult", {})
        raw_text = analyze_result.get("content", "")
        extracted_fields = {}
        pages = analyze_result.get("pages", [])
        # Structured invoice fields exist only for the prebuilt-invoice model.
        documents = analyze_result.get("documents", [])
        if documents:
            doc_fields = documents[0].get("fields", {})
            extracted_fields = self._parse_azure_invoice_fields(doc_fields)
        return {
            "raw_text": raw_text,
            "fields": extracted_fields,
            "pages": pages,
        }
@api.model
def _parse_azure_invoice_fields(self, doc_fields):
"""Convert Azure's structured field map into a flat dict.
Args:
doc_fields (dict): The ``documents[0].fields`` portion of
an Azure analyzeResult response.
Returns:
dict: Normalized field names → values.
"""
def _val(field_dict):
"""Extract the 'content' or 'valueString' from an Azure field."""
if not field_dict:
return None
return (
field_dict.get("valueString")
or field_dict.get("valueDate")
or field_dict.get("valueNumber")
or field_dict.get("content")
)
mapping = {
"vendor_name": "VendorName",
"vendor_address": "VendorAddress",
"invoice_number": "InvoiceId",
"invoice_date": "InvoiceDate",
"due_date": "DueDate",
"total_amount": "InvoiceTotal",
"subtotal": "SubTotal",
"tax_amount": "TotalTax",
"currency": "CurrencyCode",
"purchase_order": "PurchaseOrder",
"customer_name": "CustomerName",
}
result = {}
for local_key, azure_key in mapping.items():
result[local_key] = _val(doc_fields.get(azure_key))
# Line items
items_field = doc_fields.get("Items")
if items_field and items_field.get("valueArray"):
lines = []
for item in items_field["valueArray"]:
item_fields = item.get("valueObject", {})
lines.append({
"description": _val(item_fields.get("Description")),
"quantity": _val(item_fields.get("Quantity")),
"unit_price": _val(item_fields.get("UnitPrice")),
"amount": _val(item_fields.get("Amount")),
"tax": _val(item_fields.get("Tax")),
})
result["line_items"] = lines
return result
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def action_test_connection(self):
"""Quick connectivity / credential check for the configured provider.
Creates a tiny white image, sends it through the extraction
pipeline, and reports success or failure via a notification.
"""
self.ensure_one()
# Build a minimal 10×10 white PNG as test payload
if not _PILLOW_AVAILABLE:
raise UserError(_("Pillow is required to run a connection test."))
img = Image.new("RGB", (10, 10), color=(255, 255, 255))
buf = io.BytesIO()
img.save(buf, format="PNG")
test_bytes = buf.getvalue()
try:
result = self.extract_fields(test_bytes, document_type="test")
_log.info("Fusion OCR connection test succeeded: %s", result.get("provider"))
except UserError:
raise
except Exception as exc:
raise UserError(
_("Connection test failed: %s", str(exc))
) from exc
return {
"type": "ir.actions.client",
"tag": "display_notification",
"params": {
"title": _("Connection Successful"),
"message": _("The '%s' provider responded correctly.", self.name),
"type": "success",
"sticky": False,
},
}