"""
Fusion Accounting - Invoice OCR Extraction

Extends ``account.move`` with the ability to extract invoice data from
attached PDF / image scans using the :class:`FusionDocumentExtractor`
engine.  Extracted fields (vendor, amounts, dates, line items) are parsed
via regex heuristics and then applied to the invoice form.

A manual-review wizard (:class:`FusionExtractionReviewWizard`) is available
so the user can validate and correct fields before they are committed.

Original implementation by Nexa Systems Inc.
"""

import base64
import io
import json
import logging
import re
from datetime import datetime

from odoo import api, fields, models, _
from odoo.exceptions import UserError

_log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------
try:
    from PIL import Image
    _PILLOW_AVAILABLE = True
except ImportError:
    _PILLOW_AVAILABLE = False


class FusionInvoiceExtractor(models.Model):
    """
    Adds OCR-extraction capabilities to journal entries (invoices / bills).

    The workflow is:

    1. User clicks **Extract from Attachment**.
    2. The first PDF / image attachment is sent to the configured
       :class:`FusionDocumentExtractor`.
    3. Raw OCR text is stored and parsed for key invoice fields.
    4. A review wizard is shown so the user can inspect / correct before
       the fields are written to the invoice.
    """

    _inherit = "account.move"

    # ------------------------------------------------------------------
    # Fields
    # ------------------------------------------------------------------
    fusion_extraction_status = fields.Selection(
        selection=[
            ("to_extract", "Pending Extraction"),
            ("extracting", "Extracting…"),
            ("done", "Extraction Complete"),
            ("failed", "Extraction Failed"),
        ],
        string="OCR Status",
        copy=False,
        tracking=True,
        help="Tracks the current stage of the document extraction pipeline.",
    )
    fusion_extraction_confidence = fields.Float(
        string="Extraction Confidence",
        digits=(5, 2),
        copy=False,
        readonly=True,
        help=(
            "A score from 0–100 indicating how confident the extraction "
            "engine is in the accuracy of the parsed fields."
        ),
    )
    fusion_ocr_raw_text = fields.Text(
        string="OCR Raw Text",
        copy=False,
        readonly=True,
        help="The full plain-text output returned by the OCR engine.",
    )
    fusion_extractor_id = fields.Many2one(
        comodel_name="fusion.document.extractor",
        string="Extractor Used",
        copy=False,
        readonly=True,
        help="The extraction provider that produced the OCR result.",
    )
    fusion_extracted_fields_json = fields.Text(
        string="Extracted Fields (JSON)",
        copy=False,
        readonly=True,
        help="JSON-serialised dict of all structured fields returned by the extraction.",
    )

    # ------------------------------------------------------------------
    # Main action: Extract from Attachment
    # ------------------------------------------------------------------
    def action_extract_from_attachment(self):
        """Run OCR extraction on the first PDF / image attachment.

        This method:

        1. Locates the first suitable attachment on the invoice.
        2. Selects the active extractor for the current company.
        3. Sends the binary content to the extraction engine.
        4. Stores raw text and parsed fields.
        5. Opens the review wizard so the user can validate results.

        Returns:
            dict: A window action for the extraction review wizard.

        Raises:
            UserError: If no suitable attachment or no active extractor is
                found, or if decoding / extraction fails.
        """
        self.ensure_one()

        # ---- Find a suitable attachment ----
        attachment = self._find_extractable_attachment()
        if not attachment:
            raise UserError(
                _("No PDF or image attachment found on this document. "
                  "Please attach a scanned invoice first.")
            )

        # ---- Locate the active extractor ----
        extractor = self._get_active_extractor()
        if not extractor:
            raise UserError(
                _("No active Document Extraction provider is configured. "
                  "Go to Accounting → Configuration → Document Extraction to set one up.")
            )

        # ---- Run extraction ----
        self.fusion_extraction_status = "extracting"
        self.fusion_extractor_id = extractor

        # NOTE(review): the original expression was
        #   "invoice" if self.is_purchase_document() else "invoice"
        # i.e. both branches were identical.  If the extractor is meant to
        # receive a different document type for vendor bills, restore the
        # branch with the intended value.
        doc_type = "invoice"

        # NOTE(review): if the surrounding transaction is rolled back when
        # the error propagates, the "failed" status written below will not
        # persist - confirm whether that matters for the UI.
        try:
            # Decoding happens inside the guarded block so that an empty or
            # corrupt attachment surfaces as a UserError, not a raw traceback.
            image_bytes = base64.b64decode(attachment.datas or b"")
            # Images are normalised to PNG; PDFs are passed through untouched.
            image_bytes = self._pdf_to_image_if_needed(image_bytes, attachment.mimetype)
            result = extractor.extract_fields(image_bytes, document_type=doc_type)
        except UserError:
            self.fusion_extraction_status = "failed"
            raise
        except Exception as exc:
            self.fusion_extraction_status = "failed"
            _log.exception("Fusion OCR extraction failed for move %s", self.id)
            raise UserError(
                _("OCR extraction failed unexpectedly: %s", str(exc))
            ) from exc

        # ---- Store results ----
        result = result or {}
        raw_text = result.get("raw_text") or ""
        self.fusion_ocr_raw_text = raw_text

        # Parse structured fields (regex fallback + provider fields)
        parsed = self._parse_invoice_fields(raw_text)

        # Merge any provider-supplied structured fields (e.g. from Azure);
        # a provider value only fills a key the regex parser left empty.
        provider_fields = result.get("fields") or {}
        for key, value in provider_fields.items():
            if value and not parsed.get(key):
                parsed[key] = value

        self.fusion_extracted_fields_json = json.dumps(parsed, default=str, indent=2)
        self.fusion_extraction_confidence = self._compute_extraction_confidence(parsed)
        self.fusion_extraction_status = "done"

        # ---- Open review wizard ----
        return self.action_manual_review()

    # ------------------------------------------------------------------
    # Attachment helpers
    # ------------------------------------------------------------------
    def _find_extractable_attachment(self):
        """Return the first attachment that looks like a scan.

        Attachments linked to this move are scanned in ascending ``id``
        order and the first one whose MIME type is PDF or a supported image
        type is returned.

        Returns:
            recordset: An ``ir.attachment`` record, or empty recordset.
        """
        self.ensure_one()
        domain = [
            ("res_model", "=", "account.move"),
            ("res_id", "=", self.id),
        ]
        attachments = self.env["ir.attachment"].search(domain, order="id asc")
        image_mimes = {"image/png", "image/jpeg", "image/tiff", "image/bmp", "image/gif"}
        for att in attachments:
            mime = (att.mimetype or "").lower()
            if mime == "application/pdf" or mime in image_mimes:
                return att
        return self.env["ir.attachment"]

    def _get_active_extractor(self):
        """Return the first active extractor for the current company.

        Extractors bound to this move's company and extractors with no
        company set are both eligible.

        Returns:
            recordset: A ``fusion.document.extractor`` record, or empty.
        """
        return self.env["fusion.document.extractor"].search([
            ("is_active", "=", True),
            "|",
            ("company_id", "=", self.company_id.id),
            ("company_id", "=", False),
        ], limit=1)

    @staticmethod
    def _pdf_to_image_if_needed(raw_bytes, mimetype):
        """Normalise image bytes to PNG; pass PDFs through unchanged.

        Despite the name, PDFs are *not* rasterised here: they are returned
        as-is.  Non-PDF content is opened with Pillow and re-encoded as PNG;
        if Pillow is unavailable or cannot handle the bytes, the original
        bytes are returned.

        Args:
            raw_bytes (bytes): File content.
            mimetype (str): MIME type of the attachment.

        Returns:
            bytes: Image bytes (PNG) or the original bytes.
        """
        if not _PILLOW_AVAILABLE:
            return raw_bytes

        if mimetype and "pdf" in mimetype.lower():
            # Cloud providers accept PDF natively, so return as-is.
            # For Tesseract, pdf2image (poppler) would be needed;
            # we skip this dependency and let Tesseract raise a clear
            # error if the user sends a PDF to a local-only extractor.
            return raw_bytes

        try:
            # The context manager releases Pillow's handle on the buffer.
            with Image.open(io.BytesIO(raw_bytes)) as img:
                # Re-encode as PNG to normalise the format
                buf = io.BytesIO()
                img.save(buf, format="PNG")
            return buf.getvalue()
        except Exception:
            # Deliberate best-effort: anything Pillow cannot open or save
            # is forwarded untouched for the extractor to deal with.
            _log.debug("Fusion OCR: could not normalise image to PNG", exc_info=True)
            return raw_bytes

    # ------------------------------------------------------------------
    # Regex-based invoice field parser
    # ------------------------------------------------------------------
    def _parse_invoice_fields(self, raw_text):
        """Extract structured fields from OCR raw text using regex.

        This is a best-effort heuristic parser.  It handles the most
        common North-American and European invoice layouts.

        Args:
            raw_text (str): Full OCR text output.

        Returns:
            dict: Keys may include ``vendor_name``, ``invoice_number``,
            ``invoice_date``, ``due_date``, ``total_amount``,
            ``tax_amount``, ``subtotal``, ``currency``, ``line_items``.
            An empty dict is returned for empty input.
        """
        if not raw_text:
            return {}

        fields_dict = {}

        # ---- Invoice Number ----
        # Word boundaries stop "Ref" matching "Refund" and "No" matching
        # "Notes"/"Note".
        inv_patterns = [
            r"\b(?:Invoice|Inv|Bill)\s*(?:#|No\b\.?|Number\b)\s*[:\s]*([A-Z0-9][\w\-\/]+)",
            r"\b(?:Facture|Rechnung)\s*(?:#|Nr\b\.?|Nummer\b)\s*[:\s]*([A-Z0-9][\w\-\/]+)",
            r"\b(?:Reference|Ref)\b\.?\s*[:\s]*([A-Z0-9][\w\-\/]+)",
        ]
        for pattern in inv_patterns:
            match = re.search(pattern, raw_text, re.IGNORECASE)
            if match:
                fields_dict["invoice_number"] = match.group(1).strip()
                break

        # ---- Dates (Invoice Date, Due Date) ----
        date_formats = [
            # YYYY-MM-DD / YYYY/MM/DD
            r"(\d{4}[-/]\d{1,2}[-/]\d{1,2})",
            # DD/MM/YYYY or MM/DD/YYYY
            r"(\d{1,2}[-/]\d{1,2}[-/]\d{4})",
            # Month DD, YYYY
            r"((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2},?\s+\d{4})",
            # DD Month YYYY
            r"(\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{4})",
        ]
        date_regex = "|".join(date_formats)

        # Invoice date.  The bare "Date" label must not be the tail of
        # "Due Date", otherwise a due date printed first would be taken as
        # the invoice date.
        inv_date_match = re.search(
            r"(?:Invoice\s*Date|Date\s*d['\u2019]?\s*émission|Rechnungsdatum"
            r"|(?<!Due\s)\bDate)"
            r"\s*[:\s]*(" + date_regex + r")",
            raw_text,
            re.IGNORECASE,
        )
        if inv_date_match:
            fields_dict["invoice_date"] = self._normalise_date(
                inv_date_match.group(1).strip()
            )

        # Due date
        due_date_match = re.search(
            r"(?:Due\s*Date|Payment\s*Due|Date\s*d['\u2019]?\s*échéance|Fälligkeitsdatum)"
            r"\s*[:\s]*(" + date_regex + r")",
            raw_text,
            re.IGNORECASE,
        )
        if due_date_match:
            fields_dict["due_date"] = self._normalise_date(
                due_date_match.group(1).strip()
            )

        # If no labelled date was found, try to grab the first date in the text
        if "invoice_date" not in fields_dict:
            generic_date = re.search(date_regex, raw_text, re.IGNORECASE)
            if generic_date:
                fields_dict["invoice_date"] = self._normalise_date(
                    generic_date.group(0).strip()
                )

        # ---- Monetary amounts ----
        # Starts and ends on a digit and allows both "." and "," inside, so
        # "1,234.56" and "1.234,56" are captured whole and handed to
        # _parse_amount, which resolves the separator convention.
        money_re = r"[\$€£¥]?\s*\d(?:[\d.,]*\d)?"

        # Total.  The look-behinds keep the label from matching the tail of
        # "Subtotal", "Sub-Total", "Sub Total" or "Sous-total".
        total_match = re.search(
            r"(?<![-\w])(?<!Sub\s)(?<!Sous\s)"
            r"(?:Total\s*(?:Due|Amount|Payable)?|Grand\s*Total|Amount\s*Due|Balance\s*Due)"
            r"\s*[:\s]*(" + money_re + r")",
            raw_text,
            re.IGNORECASE,
        )
        if total_match:
            fields_dict["total_amount"] = self._parse_amount(total_match.group(1))

        # Tax / VAT.  The optional rate group requires a "%" sign; without
        # that requirement "Tax 13.00" let the rate group swallow "13.0" and
        # the captured amount became "0".
        tax_match = re.search(
            r"(?<![-\w])(?:Tax|VAT|GST|HST|Sales\s*Tax|TVA|MwSt)"
            r"(?:\s*\(?\s*\d+(?:[.,]\d+)?\s*%\s*\)?)?"
            r"\s*[:\s]*(" + money_re + r")",
            raw_text,
            re.IGNORECASE,
        )
        if tax_match:
            fields_dict["tax_amount"] = self._parse_amount(tax_match.group(1))

        # Subtotal
        subtotal_match = re.search(
            r"(?:Sub\s*-?\s*Total|Net\s*Amount|Montant\s*HT|Netto)"
            r"\s*[:\s]*(" + money_re + r")",
            raw_text,
            re.IGNORECASE,
        )
        if subtotal_match:
            fields_dict["subtotal"] = self._parse_amount(subtotal_match.group(1))

        # ---- Vendor name ----
        # Usually the first non-empty line or the "From:" block
        vendor_match = re.search(
            r"(?:From|Vendor|Supplier|Sold\s*By|Fournisseur)\s*[:\s]*(.+)",
            raw_text,
            re.IGNORECASE,
        )
        if vendor_match:
            fields_dict["vendor_name"] = vendor_match.group(1).strip()
        else:
            # Fallback: first non-blank line that looks like a company name
            # (longer than 3 chars and not purely digits / separators).
            for line in raw_text.split("\n"):
                line = line.strip()
                if line and len(line) > 3 and not re.match(r"^[\d\s\-/]+$", line):
                    fields_dict["vendor_name"] = line
                    break

        # ---- Currency detection ----
        # An explicit ISO code wins; otherwise fall back to the first
        # recognised symbol, checked in the order $, €, £.
        currency_match = re.search(r"\b(USD|CAD|EUR|GBP|CHF|AUD|JPY)\b", raw_text, re.IGNORECASE)
        if currency_match:
            fields_dict["currency"] = currency_match.group(1).upper()
        elif "$" in raw_text:
            fields_dict["currency"] = "USD"
        elif "€" in raw_text:
            fields_dict["currency"] = "EUR"
        elif "£" in raw_text:
            fields_dict["currency"] = "GBP"

        # ---- Line items (best-effort) ----
        fields_dict["line_items"] = self._parse_line_items(raw_text)

        return fields_dict

    # ------------------------------------------------------------------
    # Line-item parser
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_line_items(raw_text):
        """Attempt to extract tabular line items from OCR text.

        Looks for lines matching patterns like::

            Description          Qty   Unit Price   Amount
            Widget A               2       15.00     30.00

        Args:
            raw_text (str): Full OCR text output.

        Returns:
            list[dict]: Each dict has ``description``, ``quantity``,
            ``unit_price``, ``amount``.
        """
        items = []
        # Pattern: description text followed by three numeric columns.
        # Column gaps use [^\S\n] (whitespace except newline) so a row can
        # never be stitched together from several physical lines.
        line_pattern = re.compile(
            r"^(.{3,}?)[^\S\n]+"             # description (at least 3 chars)
            r"(\d+(?:\.\d+)?)[^\S\n]+"       # quantity
            r"(\d[\d,]*\.?\d*)[^\S\n]+"      # unit price
            r"(\d[\d,]*\.?\d*)[^\S\n]*$",    # line total
            re.MULTILINE,
        )
        header_pattern = re.compile(
            r"(?:Desc|Item|Product|Qty|Quantity|Unit|Price|Amount)",
            re.IGNORECASE,
        )
        for match in line_pattern.finditer(raw_text):
            desc = match.group(1).strip()
            # Skip header-like lines
            if header_pattern.match(desc):
                continue
            items.append({
                "description": desc,
                "quantity": float(match.group(2)),
                "unit_price": float(match.group(3).replace(",", "")),
                "amount": float(match.group(4).replace(",", "")),
            })
        return items

    # ------------------------------------------------------------------
    # Normalisation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalise_date(date_str):
        """Try to parse a date string into YYYY-MM-DD format.

        Numeric dates are ambiguous between day-first and month-first; the
        day-first formats are tried before the month-first ones, so
        ``01/02/2024`` is read as 1 February 2024.

        Args:
            date_str (str): A date string in various formats.

        Returns:
            str | None: ISO-formatted date string, ``None`` for empty
            input, or the stripped input unchanged if no format matched.
        """
        if not date_str:
            return None

        # Strip surrounding whitespace and common artefacts
        date_str = date_str.strip(" \t:,")

        formats = [
            "%Y-%m-%d", "%Y/%m/%d",
            "%d/%m/%Y", "%m/%d/%Y",
            "%d-%m-%Y", "%m-%d-%Y",
            "%B %d, %Y", "%B %d %Y",
            "%b %d, %Y", "%b %d %Y",
            "%d %B %Y", "%d %b %Y",
        ]

        # The extraction regex accepts month spellings strptime rejects
        # ("Jan.", "Sept"), so retry with the dot removed and then with
        # month words cut to three letters.  The untouched string is tried
        # first so previously-parsable inputs behave exactly as before.
        undotted = re.sub(r"(?<=[A-Za-z])\.", "", date_str)
        abbreviated = re.sub(r"[A-Za-z]{4,}", lambda m: m.group(0)[:3], undotted)
        for candidate in dict.fromkeys((date_str, undotted, abbreviated)):
            for fmt in formats:
                try:
                    return datetime.strptime(candidate, fmt).strftime("%Y-%m-%d")
                except ValueError:
                    continue
        return date_str  # Return as-is if no format matched

    @staticmethod
    def _parse_amount(amount_str):
        """Convert a money string like ``$1,234.56`` to a float.

        Both ``1,234.56`` and ``1.234,56`` conventions are recognised.  A
        lone comma followed by exactly two digits is treated as a decimal
        separator; any other lone comma is treated as a thousands separator.

        Args:
            amount_str (str): Monetary string with optional currency symbol.

        Returns:
            float | None: Parsed amount, or ``None``.
        """
        if not amount_str:
            return None
        cleaned = re.sub(r"[^\d.,]", "", amount_str.strip())
        if "," in cleaned and "." in cleaned:
            # Whichever separator comes last is the decimal mark:
            # "1.234,56" -> "1234.56", "1,234.56" -> "1234.56"
            if cleaned.rindex(",") > cleaned.rindex("."):
                cleaned = cleaned.replace(".", "").replace(",", ".")
            else:
                cleaned = cleaned.replace(",", "")
        elif "," in cleaned:
            # Could be thousands separator or decimal - heuristic
            head, _sep, tail = cleaned.rpartition(",")
            if len(tail) == 2:
                # "1,234,56" -> "1234.56": only the last comma is decimal
                cleaned = head.replace(",", "") + "." + tail
            else:
                cleaned = cleaned.replace(",", "")
        elif cleaned.count(".") > 1:
            # "1.234.567": several dots can only be thousands separators
            cleaned = cleaned.replace(".", "")
        try:
            return float(cleaned)
        except ValueError:
            return None

    # ------------------------------------------------------------------
    # Confidence scoring
    # ------------------------------------------------------------------
    @staticmethod
    def _compute_extraction_confidence(parsed_fields):
        """Compute a simple confidence score (0–100) based on how many
        key fields were successfully extracted.

        A field counts as found when its value is truthy, so an amount of
        ``0.0`` does not contribute to the score.

        Args:
            parsed_fields (dict): The parsed extraction result.

        Returns:
            float: Confidence percentage.
        """
        key_fields = [
            "vendor_name", "invoice_number", "invoice_date",
            "total_amount", "due_date", "tax_amount",
        ]
        found = sum(1 for k in key_fields if parsed_fields.get(k))
        return round((found / len(key_fields)) * 100, 2)

    # ------------------------------------------------------------------
    # Apply extracted fields to the invoice
    # ------------------------------------------------------------------
    def _apply_extracted_fields(self, fields_dict):
        """Write extracted data to the invoice form fields.

        This method maps the parsed extraction dict to the appropriate
        ``account.move`` fields.  It is typically called from the review
        wizard after the user has validated the data.

        Args:
            fields_dict (dict): Validated field dict – same structure as
                returned by :meth:`_parse_invoice_fields`.
        """
        self.ensure_one()
        vals = {}

        # ---- Partner (vendor) matching ----
        # First partner whose name or commercial company name contains the
        # extracted vendor name (case-insensitive); no match leaves the
        # partner untouched.
        vendor_name = fields_dict.get("vendor_name")
        if vendor_name:
            partner = self.env["res.partner"].search([
                "|",
                ("name", "ilike", vendor_name),
                ("commercial_company_name", "ilike", vendor_name),
            ], limit=1)
            if partner:
                vals["partner_id"] = partner.id

        # ---- Reference / Invoice Number ----
        inv_number = fields_dict.get("invoice_number")
        if inv_number:
            vals["ref"] = inv_number

        # ---- Dates ----
        # _safe_date yields False for unparseable values, which are skipped.
        inv_date = self._safe_date(fields_dict.get("invoice_date"))
        if inv_date:
            vals["invoice_date"] = inv_date

        due_date = self._safe_date(fields_dict.get("due_date"))
        if due_date:
            vals["invoice_date_due"] = due_date

        # ---- Currency ----
        currency_code = fields_dict.get("currency")
        if currency_code:
            currency = self.env["res.currency"].search([
                ("name", "=", currency_code),
            ], limit=1)
            if currency:
                vals["currency_id"] = currency.id

        # Write header-level fields
        if vals:
            self.write(vals)

        # ---- Line items ----
        line_items = fields_dict.get("line_items") or []
        if line_items:
            self._apply_extracted_line_items(line_items)

        _log.info(
            "Fusion OCR: applied extracted fields to move %s – %s",
            self.id, list(vals.keys()),
        )

    def _apply_extracted_line_items(self, line_items):
        """Create invoice lines from extracted line item data.

        Existing lines are **not** deleted; new lines are appended.  Items
        without a description are skipped.  When an item has no unit price
        but has a line total, the unit price is derived as
        ``amount / quantity``.

        Args:
            line_items (list[dict]): Each dict may have ``description``,
                ``quantity``, ``unit_price``, ``amount``.
        """
        self.ensure_one()
        from odoo import Command

        new_lines = []
        for item in line_items:
            if not isinstance(item, dict):
                continue
            description = item.get("description", "")
            if not description:
                continue

            quantity = item.get("quantity")
            if quantity is None:
                quantity = 1

            unit_price = item.get("unit_price")
            if not unit_price:
                # ``amount`` is the line total, so it has to be divided by
                # the quantity; using it directly would multiply the line
                # value by the quantity once more.
                amount = item.get("amount") or 0
                try:
                    unit_price = amount / quantity
                except (TypeError, ZeroDivisionError):
                    unit_price = amount

            new_lines.append(Command.create({
                "name": description,
                "quantity": quantity,
                "price_unit": unit_price,
            }))

        if new_lines:
            self.write({"invoice_line_ids": new_lines})

    # ------------------------------------------------------------------
    # Review wizard launcher
    # ------------------------------------------------------------------
    def action_manual_review(self):
        """Open the extraction-review wizard pre-populated with the
        extracted (or last-extracted) field values.

        Values are read from ``fusion_extracted_fields_json``; if that is
        empty, invalid JSON, or not a JSON object, the wizard opens with
        blank values.

        Returns:
            dict: Window action for the review wizard.
        """
        self.ensure_one()

        extracted = {}
        if self.fusion_extracted_fields_json:
            try:
                extracted = json.loads(self.fusion_extracted_fields_json)
            except (json.JSONDecodeError, TypeError):
                extracted = {}
        if not isinstance(extracted, dict):
            extracted = {}

        # ``or`` fallbacks also cover keys stored as JSON ``null``.
        wizard = self.env["fusion.extraction.review.wizard"].create({
            "move_id": self.id,
            "vendor_name": extracted.get("vendor_name") or "",
            "invoice_number": extracted.get("invoice_number") or "",
            "invoice_date": self._safe_date(extracted.get("invoice_date")),
            "due_date": self._safe_date(extracted.get("due_date")),
            "total_amount": extracted.get("total_amount") or 0.0,
            "tax_amount": extracted.get("tax_amount") or 0.0,
            "subtotal": extracted.get("subtotal") or 0.0,
            "currency_code": extracted.get("currency") or "",
            "raw_text": self.fusion_ocr_raw_text or "",
            "confidence": self.fusion_extraction_confidence or 0.0,
            "line_items_json": json.dumps(
                extracted.get("line_items") or [], default=str, indent=2,
            ),
        })

        return {
            "type": "ir.actions.act_window",
            "name": _("Review Extracted Data"),
            "res_model": "fusion.extraction.review.wizard",
            "res_id": wizard.id,
            "view_mode": "form",
            "target": "new",
        }

    @staticmethod
    def _safe_date(val):
        """Convert a string to a date, returning False on failure."""
        if not val:
            return False
        try:
            return fields.Date.to_date(val)
        except Exception:
            return False