# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)

import base64
import json
import logging
import uuid
from io import BytesIO

from odoo import models

_logger = logging.getLogger(__name__)

INVOICE_EXTRACTION_PROMPT = """You are an accounts payable data extraction expert.
Extract ALL fields from the provided invoice/bill document with perfect accuracy.

IMPORTANT: The text may come from PDF text extraction where TABLE COLUMNS are jumbled or separated.
Carefully reconstruct the table by matching:
- Item numbers / descriptions appear first
- Quantities and units follow each item
- Unit prices may appear near their item OR grouped separately
- The AMOUNT column (rightmost) shows the line total (qty * unit_price). Use it to verify or compute unit_price = amount / quantity
- Discount columns (often all zeros) may appear between unit_price and amount
- If an item number appears more than once, each occurrence is a SEPARATE line item

CRITICAL: Extract EVERY individual line item. Do NOT merge or skip any. If the same product appears twice, include both.

Return ONLY valid JSON with this exact structure (use null for missing values):
{
    "supplier": "Vendor/supplier company name that issued the bill",
    "client": "Client/customer/buyer company name (the company being billed)",
    "total": 0.00,
    "subtotal": 0.00,
    "total_tax_amount": 0.00,
    "invoice_id": "Invoice number, bill number, or reference number",
    "date": "YYYY-MM-DD",
    "due_date": "YYYY-MM-DD",
    "currency": "Three-letter currency code (e.g. CAD, USD, EUR)",
    "vat_number": "VAT or tax registration number of supplier",
    "payment_ref": "Payment reference or structured communication",
    "iban": "Bank account number or IBAN",
    "country": "Two-letter country code of supplier (e.g. CA, US)",
    "invoice_lines": [
        {
            "description": "Full product/service description for this line",
            "quantity": 1.0,
            "unit_price": 0.00,
            "taxes": [13.0],
            "subtotal": 0.00,
            "total": 0.00
        }
    ]
}

Rules:
- Extract EVERY line item. Count them carefully. If the document lists 14 products, you must return 14 entries.
- "subtotal" per line = quantity * unit_price (the line amount before tax)
- "total" per line = subtotal + tax for that line. If no per-line tax, set total = subtotal.
- The top-level "subtotal" is the document's subtotal (sum of all line amounts before tax)
- The top-level "total" is the final amount due including all taxes and charges
- For taxes: check the tax summary section. If items have tax code "00" or 0%, use an empty list []. Only include the tax percentage if that line is actually taxed (e.g. [13] for HST 13%).
- If the Amount column value differs from qty * unit_price, TRUST the Amount column and compute unit_price = amount / quantity
- If you see "Total charges", "Freight", or "Handling" as a separate charge, include it as its own line
- Dates must be in YYYY-MM-DD format
- Do NOT include any text outside the JSON object"""

BANK_STATEMENT_EXTRACTION_PROMPT = """You are a document data extraction assistant.
Extract bank statement fields from the provided document content.

Return ONLY valid JSON with this exact structure (use null for missing values):
{
    "balance_start": 0.00,
    "balance_end": 0.00,
    "date": "YYYY-MM-DD",
    "lines": [
        {
            "date": "YYYY-MM-DD",
            "description": "Transaction description",
            "amount": 0.00
        }
    ]
}

Rules:
- Positive amounts for deposits/credits, negative for withdrawals/debits
- Dates must be in YYYY-MM-DD format
- Do NOT include any text outside the JSON object"""

# Extracted text must contain more than this many non-whitespace characters
# to be considered worth sending to the AI mapper.
MIN_USEFUL_TEXT_LENGTH = 50

# Tesseract only OCRs the first pages of a document.
TESSERACT_MAX_PAGES = 5

# Maximum relative gap between the sum of line amounts and the document
# subtotal for a text-based extraction to be accepted.
INVOICE_SUBTOTAL_TOLERANCE = 0.10


def _as_float(value, default=0.0):
    """Return *value* as a float, or *default* when it is None or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _value_or(mapping, key, default):
    """Return ``mapping[key]``, or *default* when the key is missing or None.

    Unlike ``dict.get(key, default)`` this also replaces an explicit JSON
    ``null``, which the extraction prompts ask the model to emit for
    missing values.
    """
    value = mapping.get(key)
    return default if value is None else value


def _line_amount(line):
    """Return the pre-tax amount of one extracted invoice line.

    Uses the line's ``subtotal`` when it is set and non-zero, otherwise
    ``unit_price * quantity`` (quantity defaulting to 1). Entries that are
    not dicts contribute 0.
    """
    if not isinstance(line, dict):
        return 0.0
    subtotal = _as_float(line.get('subtotal'))
    if subtotal:
        return subtotal
    return _as_float(line.get('unit_price')) * _as_float(line.get('quantity'), 1.0)


def _as_datetime_str(value):
    """Normalise an extracted date to the ``'<date> 00:00:00'`` form.

    Empty/None input gives ``''``. A value that already contains a space
    (i.e. already carries a time part) is returned unchanged apart from
    surrounding whitespace.
    """
    text = str(value).strip() if value else ''
    if text and ' ' not in text:
        text += ' 00:00:00'
    return text


def _log_extraction(model, parsed):
    """Log a summary of a parsed AI response plus one line per extracted row.

    Purely diagnostic: values are coerced defensively so that an oddly
    shaped response can never raise here and cause a valid result to be
    discarded by the caller.
    """
    rows = parsed.get('invoice_lines') or parsed.get('lines') or []
    if not isinstance(rows, list):
        rows = []
    _logger.info(
        "OpenAI response (model=%s, lines=%d): supplier=%s, "
        "subtotal=%s, tax=%s, total=%s",
        model,
        len(rows),
        parsed.get('supplier', ''),
        parsed.get('subtotal', parsed.get('balance_start', '')),
        parsed.get('total_tax_amount', ''),
        parsed.get('total', parsed.get('balance_end', '')),
    )
    for index, row in enumerate(rows, start=1):
        if not isinstance(row, dict):
            continue
        _logger.info(
            "  Line %d: %s | qty=%.2f | unit=%.2f | sub=%.2f | tax=%s",
            index,
            str(row.get('description') or '')[:50],
            # Bank statement rows have no quantity/subtotal; their amount
            # is shown in those columns instead.
            _as_float(row.get('quantity', row.get('amount', 0))),
            _as_float(row.get('unit_price', 0)),
            _as_float(row.get('subtotal', row.get('amount', 0))),
            row.get('taxes', ''),
        )


class FusionDigitizeService(models.AbstractModel):
    """Extract structured invoice / bank statement data from PDF documents.

    Text is obtained with pdfminer, then Tesseract OCR; the text is mapped
    to fields through the OpenAI chat completions API. When no usable text
    or mapping is available (or an invoice mapping fails validation), the
    rendered pages are sent to an OpenAI vision-capable model instead.
    """

    _name = 'fusion.digitize.service'
    _description = 'Fusion Digitize Extraction Service'

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def _get_api_key(self):
        """Return the configured OpenAI API key, or '' when none is set.

        ``fusion_digitize.openai_api_key`` takes precedence;
        ``fusion_accounts.openai_api_key`` is used as a fallback.
        """
        params = self.env['ir.config_parameter'].sudo()
        own_key = params.get_param('fusion_digitize.openai_api_key', '')
        if own_key:
            return own_key
        return params.get_param('fusion_accounts.openai_api_key', '')

    def _get_ai_model(self):
        """Return the configured model name (default ``gpt-4o-mini``)."""
        return self.env['ir.config_parameter'].sudo().get_param(
            'fusion_digitize.ai_model', 'gpt-4o-mini',
        )

    def _is_tesseract_enabled(self):
        """Return True when ``fusion_digitize.enable_tesseract`` is 'True'.

        The parameter defaults to 'True'; any other stored value disables
        the Tesseract tier.
        """
        flag = self.env['ir.config_parameter'].sudo().get_param(
            'fusion_digitize.enable_tesseract', 'True',
        )
        return flag == 'True'

    # ------------------------------------------------------------------
    # Tier 1: pdfminer text extraction (FREE, instant)
    # ------------------------------------------------------------------

    def _extract_text_pdfminer(self, pdf_bytes):
        """Return the stripped text layer of *pdf_bytes*, or '' on any failure."""
        try:
            # Optional dependency: imported lazily so the module still
            # loads when pdfminer is not installed.
            from pdfminer.high_level import extract_text

            text = extract_text(BytesIO(pdf_bytes))
            if text:
                _logger.info(
                    "pdfminer extracted %d chars", len(text.strip()),
                )
            return (text or '').strip()
        except ImportError:
            _logger.warning("pdfminer not available")
            return ''
        except Exception as exc:
            _logger.warning("pdfminer extraction failed: %s", exc)
            return ''

    # ------------------------------------------------------------------
    # Tier 2: Tesseract OCR (FREE, ~2-5s)
    # ------------------------------------------------------------------

    def _extract_text_tesseract(self, pdf_bytes):
        """OCR the first pages of *pdf_bytes* and return the joined text.

        Returns '' when Tesseract is disabled, when its dependencies are
        missing, or when conversion/OCR fails.
        """
        if not self._is_tesseract_enabled():
            return ''
        try:
            from pdf2image import convert_from_bytes
            import pytesseract

            # Only rasterise the pages that will actually be OCR'd;
            # rendering a long PDF in full at 300 DPI is slow and
            # memory-hungry for no benefit.
            pages = convert_from_bytes(
                pdf_bytes, dpi=300, last_page=TESSERACT_MAX_PAGES,
            )
            page_texts = [
                pytesseract.image_to_string(page)
                for page in pages[:TESSERACT_MAX_PAGES]
            ]
            result = '\n'.join(page_texts).strip()
            if result:
                _logger.info("Tesseract extracted %d chars", len(result))
            return result
        except ImportError:
            _logger.warning("pytesseract or pdf2image not available")
            return ''
        except Exception as exc:
            _logger.warning("Tesseract extraction failed: %s", exc)
            return ''

    # ------------------------------------------------------------------
    # Tier 3: OpenAI Vision (PAID, last resort)
    # ------------------------------------------------------------------

    def _pdf_to_base64_images(self, pdf_bytes, max_pages=3):
        """Render up to *max_pages* pages as base64-encoded PNG strings.

        Returns the images converted so far (possibly an empty list) when
        pdf2image is missing or conversion fails.
        """
        images = []
        try:
            from pdf2image import convert_from_bytes

            # last_page bounds the rasterisation itself; the slice below
            # is kept as a guard on the number of images returned.
            pages = convert_from_bytes(
                pdf_bytes, dpi=200, last_page=max_pages,
            )
            for page in pages[:max_pages]:
                buf = BytesIO()
                page.save(buf, format='PNG')
                images.append(base64.b64encode(buf.getvalue()).decode())
        except ImportError:
            _logger.warning("pdf2image not available for vision fallback")
        except Exception as exc:
            _logger.warning("PDF to image conversion failed: %s", exc)
        return images

    def _extract_via_vision(self, pdf_bytes, doc_type):
        """Extract fields by sending rendered page images to OpenAI.

        Returns the parsed response dict, or {} when no API key is
        configured, no page image could be produced, or the call fails.
        """
        api_key = self._get_api_key()
        if not api_key:
            return {}

        images = self._pdf_to_base64_images(pdf_bytes)
        if not images:
            return {}

        if doc_type == 'invoice':
            prompt = INVOICE_EXTRACTION_PROMPT
        else:
            prompt = BANK_STATEMENT_EXTRACTION_PROMPT

        content_parts = [
            {"type": "text", "text": "Extract data from these document images:"},
        ]
        content_parts.extend(
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{img_b64}",
                    "detail": "high",
                },
            }
            for img_b64 in images
        )

        model = self._get_ai_model()
        # The default mini model is swapped for gpt-4o on the vision path;
        # any other configured model is used as-is.
        if model == 'gpt-4o-mini':
            model = 'gpt-4o'

        return self._call_openai(api_key, model, prompt, content_parts)

    # ------------------------------------------------------------------
    # Orchestration: 3-tier text extraction
    # ------------------------------------------------------------------

    def _extract_text(self, pdf_bytes):
        """Return ``(text, source)`` from the first tier that yields useful text.

        *source* is 'pdfminer', 'tesseract', or 'none' (with empty text).
        """
        text = self._extract_text_pdfminer(pdf_bytes)
        if self._text_is_useful(text):
            return text, 'pdfminer'

        text = self._extract_text_tesseract(pdf_bytes)
        if self._text_is_useful(text):
            return text, 'tesseract'

        return '', 'none'

    @staticmethod
    def _text_is_useful(text):
        """Return True when *text* has more than MIN_USEFUL_TEXT_LENGTH
        non-whitespace characters."""
        if not text:
            return False
        return len(''.join(text.split())) > MIN_USEFUL_TEXT_LENGTH

    # ------------------------------------------------------------------
    # AI field mapping (text -> structured data)
    # ------------------------------------------------------------------

    def _map_fields_from_text(self, text, doc_type):
        """Map extracted *text* to structured fields via OpenAI.

        Only the first 12000 characters of *text* are sent. Returns {}
        when no API key is configured or the call fails.
        """
        api_key = self._get_api_key()
        if not api_key:
            _logger.warning("No OpenAI API key configured for Fusion Digitize")
            return {}

        if doc_type == 'invoice':
            prompt = INVOICE_EXTRACTION_PROMPT
        else:
            prompt = BANK_STATEMENT_EXTRACTION_PROMPT

        content_parts = [{"type": "text", "text": f"DOCUMENT TEXT:\n{text[:12000]}"}]
        return self._call_openai(api_key, self._get_ai_model(), prompt, content_parts)

    def _call_openai(self, api_key, model, system_prompt, content_parts):
        """POST a chat completion request and return the parsed JSON object.

        :param api_key: OpenAI API key used as bearer token
        :param model: model name sent in the request
        :param system_prompt: content of the system message
        :param content_parts: content of the user message
        :return: the decoded JSON object from the reply, or {} when the
            request fails, the reply is not valid JSON, or the decoded
            value is not a JSON object
        """
        try:
            import requests
        except ImportError:
            _logger.error("requests library not available")
            return {}

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": content_parts},
        ]

        try:
            resp = requests.post(
                'https://api.openai.com/v1/chat/completions',
                headers={
                    'Authorization': f'Bearer {api_key}',
                    'Content-Type': 'application/json',
                },
                json={
                    'model': model,
                    'messages': messages,
                    'max_tokens': 4000,
                    'temperature': 0.1,
                },
                timeout=90,
            )
            resp.raise_for_status()
            body = resp.json()
            content = body['choices'][0]['message']['content'].strip()

            # Strip a Markdown code fence (``` or ```json) around the JSON.
            if content.startswith('```'):
                fence_lines = content.split('\n')
                if fence_lines[-1].strip() == '```':
                    inner = fence_lines[1:-1]
                else:
                    inner = fence_lines[1:]
                content = '\n'.join(inner).strip()

            parsed = json.loads(content)
        except Exception as exc:
            _logger.error("OpenAI extraction call failed: %s", exc)
            return {}

        # Callers use dict access on the result; reject any other JSON
        # value (list, string, number) up front.
        if not isinstance(parsed, dict):
            _logger.error(
                "OpenAI extraction call failed: %s",
                "response is not a JSON object",
            )
            return {}

        _log_extraction(model, parsed)
        return parsed

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_invoice(mapped):
        """Check that the extracted lines add up to the document subtotal.

        :param mapped: parsed invoice dict as returned by the AI mapper
        :return: True when there is no (numeric, non-zero) document
            subtotal to check against, or when the sum of line amounts is
            within INVOICE_SUBTOTAL_TOLERANCE of it; False when there are
            no lines, the lines sum to zero, or the gap is too large
        """
        doc_subtotal = _as_float(mapped.get('subtotal'))
        if not doc_subtotal:
            return True

        lines = mapped.get('invoice_lines') or []
        if not lines:
            return False

        line_sum = sum(_line_amount(line) for line in lines)
        if line_sum == 0:
            return False

        # abs() on the denominator keeps the ratio meaningful for negative
        # subtotals (credit notes); without it the ratio is negative and
        # every such document would pass.
        ratio = abs(line_sum - doc_subtotal) / abs(doc_subtotal)
        return ratio <= INVOICE_SUBTOTAL_TOLERANCE

    # ------------------------------------------------------------------
    # Full extraction pipeline
    # ------------------------------------------------------------------

    def _process_document(self, pdf_bytes, doc_type):
        """Run the full extraction pipeline on one PDF.

        :param pdf_bytes: raw PDF content
        :param doc_type: 'invoice' selects the invoice prompt and result
            format; any other value is handled as a bank statement
        :return: result dict in the OCR response format, or {} when
            nothing could be extracted
        """
        text, source = self._extract_text(pdf_bytes)
        mapped = None

        if text:
            _logger.info("Fusion Digitize: using %s text for AI mapping", source)
            mapped = self._map_fields_from_text(text, doc_type)

            if mapped and doc_type == 'invoice' and not self._validate_invoice(mapped):
                # Log the same per-line amounts the validator summed.
                line_sum = sum(
                    _line_amount(line)
                    for line in (mapped.get('invoice_lines') or [])
                )
                _logger.info(
                    "Fusion Digitize: text extraction failed validation "
                    "(line_sum=%.2f vs subtotal=%.2f). Retrying with Vision.",
                    line_sum, _as_float(mapped.get('subtotal')),
                )
                # Keep the text-based mapping when Vision yields nothing.
                vision_mapped = self._extract_via_vision(pdf_bytes, doc_type)
                if vision_mapped:
                    mapped = vision_mapped

        if not mapped:
            _logger.info("Fusion Digitize: using OpenAI Vision (no text available)")
            text = ''
            mapped = self._extract_via_vision(pdf_bytes, doc_type)

        if not mapped:
            _logger.warning("Fusion Digitize: extraction returned no data")
            return {}

        _logger.info(
            "Fusion Digitize: final result - %d line(s), total=%s, subtotal=%s",
            len(mapped.get('invoice_lines') or mapped.get('lines') or []),
            mapped.get('total'), mapped.get('subtotal'),
        )

        if doc_type == 'invoice':
            return self._build_invoice_result(mapped, text)
        return self._build_bank_statement_result(mapped, text)

    # ------------------------------------------------------------------
    # Response mappers: AI output -> Odoo's expected OCR format
    # ------------------------------------------------------------------

    @staticmethod
    def _ocr_field(value, candidates=None):
        """Wrap *value* in the ``selected_value`` / ``candidates`` field dict."""
        return {
            'selected_value': {'content': value},
            'candidates': candidates or [],
        }

    def _build_invoice_result(self, data, full_text=''):
        """Convert a parsed invoice dict into the OCR result format.

        Missing or null line values get defaults: description '/',
        unit_price 0.0, quantity 1.0, taxes [], subtotal
        ``unit_price * quantity`` and total equal to the subtotal. When no
        line was extracted, a single 'Extracted total' line is synthesised
        from the document subtotal/total.

        :param data: parsed invoice dict from the AI mapper
        :param full_text: extracted document text; when non-empty it is
            added as ``full_text_annotation``
        :return: result dict
        """
        swift_json = json.dumps(data.get('swift_code') or {})

        lines = []
        for line in (data.get('invoice_lines') or []):
            if not isinstance(line, dict):
                continue
            unit_price = _value_or(line, 'unit_price', 0.0)
            quantity = _value_or(line, 'quantity', 1.0)
            subtotal = line.get('subtotal')
            if subtotal is None:
                # Per the prompt, line subtotal = quantity * unit_price.
                subtotal = _as_float(unit_price) * _as_float(quantity, 1.0)
            lines.append({
                'description': line.get('description') or '/',
                'unit_price': unit_price,
                'quantity': quantity,
                'taxes': line.get('taxes') or [],
                'subtotal': subtotal,
                'total': _value_or(line, 'total', subtotal),
            })

        if not lines:
            subtotal = data.get('subtotal') or data.get('total') or 0.0
            lines.append({
                'description': 'Extracted total',
                'unit_price': subtotal,
                'quantity': 1.0,
                'taxes': [],
                'subtotal': subtotal,
                'total': data.get('total') or subtotal,
            })

        field = self._ocr_field
        result = {
            'supplier': field(data.get('supplier') or ''),
            'client': field(data.get('client') or ''),
            'total': field(data.get('total') or 0.0),
            'subtotal': field(data.get('subtotal') or 0.0),
            'total_tax_amount': field(data.get('total_tax_amount') or 0.0),
            'invoice_id': field(data.get('invoice_id') or ''),
            'date': field(_as_datetime_str(data.get('date'))),
            'due_date': field(_as_datetime_str(data.get('due_date'))),
            'currency': field(data.get('currency') or ''),
            'VAT_Number': field(data.get('vat_number') or ''),
            'payment_ref': field(data.get('payment_ref') or ''),
            'iban': field(data.get('iban') or ''),
            'SWIFT_code': field(swift_json),
            'country': field(data.get('country') or ''),
            'invoice_lines': lines,
        }

        if full_text:
            result['full_text_annotation'] = full_text

        return result

    def _build_bank_statement_result(self, data, full_text=''):
        """Convert a parsed bank statement dict into the OCR result format.

        Missing or null line values default to amount 0.0 and empty
        date/description strings.

        :param data: parsed bank statement dict from the AI mapper
        :param full_text: extracted document text; when non-empty it is
            added as ``full_text_annotation``
        :return: result dict
        """
        lines = [
            {
                'amount': _value_or(line, 'amount', 0.0),
                'date': line.get('date') or '',
                'description': line.get('description') or '',
            }
            for line in (data.get('lines') or [])
            if isinstance(line, dict)
        ]

        result = {
            'balance_start': self._ocr_field(data.get('balance_start') or 0.0),
            'balance_end': self._ocr_field(data.get('balance_end') or 0.0),
            'date': self._ocr_field(_as_datetime_str(data.get('date'))),
            'bank_statement_lines': lines,
        }

        if full_text:
            result['full_text_annotation'] = full_text

        return result

    # ------------------------------------------------------------------
    # Parse / Get Result handlers (called from model overrides)
    # ------------------------------------------------------------------

    def _handle_parse(self, params, doc_type):
        """Process the first submitted document and store its result.

        :param params: request dict; ``params['documents'][0]`` is the
            base64-encoded document (only the first entry is processed)
        :param doc_type: forwarded to ``_process_document``
        :return: ``{'status': 'success', 'document_token': ...}`` on
            success, otherwise a dict with status 'error_internal'
        """
        documents = params.get('documents', [])
        if not documents:
            return {'status': 'error_internal', 'error_message': 'No documents provided'}

        try:
            pdf_bytes = base64.b64decode(documents[0])
        except Exception as exc:
            _logger.error("Failed to decode document: %s", exc)
            return {'status': 'error_internal'}

        token = str(uuid.uuid4())
        ocr_results = self._process_document(pdf_bytes, doc_type)
        if not ocr_results:
            return {'status': 'error_internal'}

        # The result is parked in a system parameter keyed by the token
        # until _handle_get_result fetches it.
        self.env['ir.config_parameter'].sudo().set_param(
            f'fusion_digitize.result.{token}',
            json.dumps(ocr_results),
        )

        _logger.info(
            "Fusion Digitize: stored extraction results for token %s", token,
        )
        return {'status': 'success', 'document_token': token}

    def _handle_get_result(self, params):
        """Return the stored result for ``params['document_token']``.

        :return: ``{'status': 'success', 'results': [result]}`` when a
            valid stored result exists, otherwise status 'error_internal'
        """
        token = params.get('document_token', '')
        key = f'fusion_digitize.result.{token}'
        params_model = self.env['ir.config_parameter'].sudo()
        stored = params_model.get_param(key, '')

        if not stored:
            _logger.warning("No stored result for token %s", token)
            return {'status': 'error_internal'}

        try:
            ocr_results = json.loads(stored)
        except (json.JSONDecodeError, TypeError):
            _logger.error("Corrupt stored result for token %s", token)
            return {'status': 'error_internal'}

        # NOTE(review): set_param(key, False) is expected to remove the
        # parameter so each token can be fetched once - confirm against
        # the ir.config_parameter implementation in use.
        params_model.set_param(key, False)
        return {'status': 'success', 'results': [ocr_results]}