""" Fusion Accounting - Document AI / OCR Extraction Engine Provides a pluggable OCR back-end that can extract text from scanned invoices, receipts, and other accounting documents. Three providers are supported out-of-the-box: * **Tesseract** – runs locally via pytesseract (no cloud calls). * **Google Cloud Vision** – calls the Vision API v1 TEXT_DETECTION endpoint. * **Azure AI Document Intelligence** – calls the Azure prebuilt-invoice layout model. Each company may configure one or more extractor records and switch between them freely. Original implementation by Nexa Systems Inc. """ import base64 import io import json import logging import requests from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError _log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Optional imports – gracefully degrade when libs are absent so the module # can still be installed (the user simply won't be able to use Tesseract). # --------------------------------------------------------------------------- try: from PIL import Image # noqa: F401 _PILLOW_AVAILABLE = True except ImportError: _PILLOW_AVAILABLE = False try: import pytesseract # noqa: F401 _TESSERACT_AVAILABLE = True except ImportError: _TESSERACT_AVAILABLE = False class FusionDocumentExtractor(models.Model): """ Configurable OCR / AI extraction back-end. Each record represents a single provider configuration. The :meth:`extract_fields` entry-point dispatches to the appropriate private method based on the selected *provider*. """ _name = "fusion.document.extractor" _description = "Document AI Extraction Provider" _order = "sequence, id" # ------------------------------------------------------------------ # Fields # ------------------------------------------------------------------ name = fields.Char( string="Name", required=True, help="A human-readable label for this extractor (e.g. 'Production Tesseract').", ) sequence = fields.Integer( string="Sequence", default=10, help="Lower numbers appear first when multiple extractors exist.", ) provider = fields.Selection( selection=[ ("tesseract", "Tesseract (Local)"), ("google_vision", "Google Cloud Vision"), ("azure_ai", "Azure AI Document Intelligence"), ], string="Provider", required=True, default="tesseract", help=( "The OCR engine to use.\n\n" "• Tesseract – free, runs locally; requires pytesseract + Tesseract binary.\n" "• Google Cloud Vision – cloud API; requires a service-account JSON key.\n" "• Azure AI Document Intelligence – cloud API; requires endpoint + key." ), ) api_key = fields.Char( string="API Key / Credentials", groups="base.group_system", help=( "For Google Vision: paste the full service-account JSON key.\n" "For Azure AI: paste the subscription key.\n" "Not used for Tesseract." ), ) api_endpoint = fields.Char( string="API Endpoint", help=( "For Azure AI: the resource endpoint URL " "(e.g. https://.cognitiveservices.azure.com).\n" "Not used for Tesseract or Google Vision." ), ) tesseract_lang = fields.Char( string="Tesseract Language", default="eng", help="Tesseract language code(s), e.g. 'eng', 'fra+eng'. Ignored for cloud providers.", ) is_active = fields.Boolean( string="Active", default=True, help="Inactive extractors are hidden from selection lists.", ) company_id = fields.Many2one( comodel_name="res.company", string="Company", default=lambda self: self.env.company, help="Restrict this extractor to a single company, or leave blank for all.", ) # ------------------------------------------------------------------ # Constraints # ------------------------------------------------------------------ @api.constrains("provider", "api_key") def _check_api_key_for_cloud_providers(self): """Ensure cloud providers have credentials configured.""" for rec in self: if rec.provider in ("google_vision", "azure_ai") and not rec.api_key: raise ValidationError( _("An API key is required for the '%s' provider.", rec.get_provider_label()) ) @api.constrains("provider", "api_endpoint") def _check_endpoint_for_azure(self): """Azure AI requires an explicit endpoint URL.""" for rec in self: if rec.provider == "azure_ai" and not rec.api_endpoint: raise ValidationError( _("An API endpoint URL is required for Azure AI Document Intelligence.") ) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def get_provider_label(self): """Return the human-readable label for the current provider selection.""" self.ensure_one() return dict(self._fields["provider"].selection).get(self.provider, self.provider) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def extract_fields(self, image_bytes, document_type="invoice"): """Run OCR on *image_bytes* and return a dict of extracted fields. Args: image_bytes (bytes): Raw bytes of a PDF page or image file. document_type (str): Hint for the extraction engine (``'invoice'``, ``'receipt'``, ``'credit_note'``). Returns: dict: Extracted data with at least the key ``'raw_text'`` (the full OCR output) and provider-specific structured fields when available. Raises: UserError: When the selected provider cannot be used (missing library, bad credentials, …). """ self.ensure_one() _log.info( "Fusion OCR: extracting from %d bytes via '%s' (doc_type=%s)", len(image_bytes), self.provider, document_type, ) dispatch = { "tesseract": self._extract_via_tesseract, "google_vision": self._extract_via_google_vision, "azure_ai": self._extract_via_azure_ai, } handler = dispatch.get(self.provider) if not handler: raise UserError(_("Unknown extraction provider: %s", self.provider)) result = handler(image_bytes, document_type=document_type) # Guarantee a 'raw_text' key exists result.setdefault("raw_text", "") result["provider"] = self.provider return result # ------------------------------------------------------------------ # Provider: Tesseract (local) # ------------------------------------------------------------------ def _extract_via_tesseract(self, image_bytes, **kwargs): """Extract text locally using Tesseract OCR. Converts the input bytes to a PIL Image, then calls ``pytesseract.image_to_string``. PDF inputs are converted to images via Pillow first. Args: image_bytes (bytes): Raw image or PDF bytes. Returns: dict: ``{'raw_text': }`` """ self.ensure_one() if not _PILLOW_AVAILABLE: raise UserError( _("The Pillow library is required for Tesseract OCR. " "Install it with: pip install Pillow") ) if not _TESSERACT_AVAILABLE: raise UserError( _("The pytesseract library is required for local OCR. " "Install it with: pip install pytesseract") ) try: image = Image.open(io.BytesIO(image_bytes)) except Exception as exc: raise UserError( _("Could not open the attachment as an image: %s", str(exc)) ) from exc lang = self.tesseract_lang or "eng" try: raw_text = pytesseract.image_to_string(image, lang=lang) except Exception as exc: _log.exception("Fusion OCR – Tesseract failed") raise UserError( _("Tesseract OCR failed: %s", str(exc)) ) from exc return {"raw_text": raw_text} # ------------------------------------------------------------------ # Provider: Google Cloud Vision # ------------------------------------------------------------------ def _extract_via_google_vision(self, image_bytes, **kwargs): """Call Google Cloud Vision API TEXT_DETECTION. The *api_key* field is expected to contain either: * A plain API key (simple authentication), or * A full service-account JSON (used for OAuth – **not yet implemented**; for now we use the key-based endpoint). Args: image_bytes (bytes): Raw image bytes (PNG / JPEG / TIFF / PDF). Returns: dict: ``{'raw_text': , 'annotations': }`` """ self.ensure_one() url = ( "https://vision.googleapis.com/v1/images:annotate" f"?key={self.api_key}" ) encoded = base64.b64encode(image_bytes).decode("ascii") payload = { "requests": [ { "image": {"content": encoded}, "features": [{"type": "TEXT_DETECTION"}], } ] } try: resp = requests.post(url, json=payload, timeout=60) resp.raise_for_status() except requests.RequestException as exc: _log.exception("Fusion OCR – Google Vision API request failed") raise UserError( _("Google Cloud Vision request failed: %s", str(exc)) ) from exc data = resp.json() responses = data.get("responses", [{}]) annotations = responses[0].get("textAnnotations", []) raw_text = annotations[0].get("description", "") if annotations else "" return { "raw_text": raw_text, "annotations": annotations, } # ------------------------------------------------------------------ # Provider: Azure AI Document Intelligence # ------------------------------------------------------------------ def _extract_via_azure_ai(self, image_bytes, document_type="invoice", **kwargs): """Call Azure AI Document Intelligence (formerly Form Recognizer). Uses the **prebuilt-invoice** model for invoices and falls back to **prebuilt-read** for generic documents. Args: image_bytes (bytes): Raw document bytes. document_type (str): ``'invoice'`` selects the prebuilt-invoice model; anything else uses prebuilt-read. Returns: dict: ``{'raw_text': , 'fields': , 'pages': }`` """ self.ensure_one() endpoint = self.api_endpoint.rstrip("/") model_id = "prebuilt-invoice" if document_type == "invoice" else "prebuilt-read" analyze_url = ( f"{endpoint}/formrecognizer/documentModels/{model_id}:analyze" "?api-version=2023-07-31" ) headers = { "Ocp-Apim-Subscription-Key": self.api_key, "Content-Type": "application/octet-stream", } # Step 1 – submit the document for analysis try: resp = requests.post( analyze_url, headers=headers, data=image_bytes, timeout=60, ) resp.raise_for_status() except requests.RequestException as exc: _log.exception("Fusion OCR – Azure AI submit failed") raise UserError( _("Azure AI Document Intelligence request failed: %s", str(exc)) ) from exc operation_url = resp.headers.get("Operation-Location") if not operation_url: raise UserError( _("Azure AI did not return an Operation-Location header.") ) # Step 2 – poll for results (max ~60 s) import time poll_headers = {"Ocp-Apim-Subscription-Key": self.api_key} result_data = {} for _attempt in range(30): time.sleep(2) try: poll_resp = requests.get( operation_url, headers=poll_headers, timeout=30, ) poll_resp.raise_for_status() result_data = poll_resp.json() except requests.RequestException as exc: _log.warning("Fusion OCR – Azure AI poll attempt failed: %s", exc) continue status = result_data.get("status", "") if status == "succeeded": break if status == "failed": error_detail = result_data.get("error", {}).get("message", "Unknown error") raise UserError( _("Azure AI analysis failed: %s", error_detail) ) else: raise UserError( _("Azure AI analysis did not complete within the timeout window.") ) # Step 3 – parse the result analyze_result = result_data.get("analyzeResult", {}) raw_text = analyze_result.get("content", "") extracted_fields = {} pages = analyze_result.get("pages", []) # Parse structured invoice fields when available documents = analyze_result.get("documents", []) if documents: doc_fields = documents[0].get("fields", {}) extracted_fields = self._parse_azure_invoice_fields(doc_fields) return { "raw_text": raw_text, "fields": extracted_fields, "pages": pages, } @api.model def _parse_azure_invoice_fields(self, doc_fields): """Convert Azure's structured field map into a flat dict. Args: doc_fields (dict): The ``documents[0].fields`` portion of an Azure analyzeResult response. Returns: dict: Normalized field names → values. """ def _val(field_dict): """Extract the 'content' or 'valueString' from an Azure field.""" if not field_dict: return None return ( field_dict.get("valueString") or field_dict.get("valueDate") or field_dict.get("valueNumber") or field_dict.get("content") ) mapping = { "vendor_name": "VendorName", "vendor_address": "VendorAddress", "invoice_number": "InvoiceId", "invoice_date": "InvoiceDate", "due_date": "DueDate", "total_amount": "InvoiceTotal", "subtotal": "SubTotal", "tax_amount": "TotalTax", "currency": "CurrencyCode", "purchase_order": "PurchaseOrder", "customer_name": "CustomerName", } result = {} for local_key, azure_key in mapping.items(): result[local_key] = _val(doc_fields.get(azure_key)) # Line items items_field = doc_fields.get("Items") if items_field and items_field.get("valueArray"): lines = [] for item in items_field["valueArray"]: item_fields = item.get("valueObject", {}) lines.append({ "description": _val(item_fields.get("Description")), "quantity": _val(item_fields.get("Quantity")), "unit_price": _val(item_fields.get("UnitPrice")), "amount": _val(item_fields.get("Amount")), "tax": _val(item_fields.get("Tax")), }) result["line_items"] = lines return result # ------------------------------------------------------------------ # Actions # ------------------------------------------------------------------ def action_test_connection(self): """Quick connectivity / credential check for the configured provider. Creates a tiny white image, sends it through the extraction pipeline, and reports success or failure via a notification. """ self.ensure_one() # Build a minimal 10×10 white PNG as test payload if not _PILLOW_AVAILABLE: raise UserError(_("Pillow is required to run a connection test.")) img = Image.new("RGB", (10, 10), color=(255, 255, 255)) buf = io.BytesIO() img.save(buf, format="PNG") test_bytes = buf.getvalue() try: result = self.extract_fields(test_bytes, document_type="test") _log.info("Fusion OCR – connection test succeeded: %s", result.get("provider")) except UserError: raise except Exception as exc: raise UserError( _("Connection test failed: %s", str(exc)) ) from exc return { "type": "ir.actions.client", "tag": "display_notification", "params": { "title": _("Connection Successful"), "message": _("The '%s' provider responded correctly.", self.name), "type": "success", "sticky": False, }, }