615 lines
23 KiB
Python
615 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2026 Nexa Systems Inc.
|
|
# License OPL-1 (Odoo Proprietary License v1.0)
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
from odoo import models
|
|
|
|
_logger = logging.getLogger(__name__)

# System prompt sent with every extraction request. It pins the JSON schema
# the rest of this module relies on (vendor_name, invoice_number, dates,
# totals and a "lines" list), so keep the key names in sync with the code
# that consumes the parsed response.
EXTRACTION_PROMPT = """You are an accounts payable assistant. Extract billing information from the attached invoice/bill document and email.

IMPORTANT RULES:
- The PDF attachment is the PRIMARY source of truth. Always prefer data from the PDF over the email body.
- "vendor_name" = the company that ISSUED the invoice/bill (the seller/supplier name on the document), NOT the email sender.
- "invoice_number" = the Invoice Number, Bill Number, Reference Number, or Sales Order Number printed on the document.
- "invoice_date" = the date the invoice was issued (not the email date).
- "due_date" = the payment due date on the invoice.
- For line items, extract each product/service line with description, quantity, unit price, and line total.

Return ONLY valid JSON with this exact structure (use null for missing values):
{
  "vendor_name": "string - the company name that issued the bill",
  "invoice_number": "string - invoice/bill/reference number",
  "invoice_date": "YYYY-MM-DD",
  "due_date": "YYYY-MM-DD",
  "currency": "CAD or USD",
  "subtotal": 0.00,
  "tax_amount": 0.00,
  "total_amount": 0.00,
  "po_reference": "string or null - any PO reference on the document",
  "lines": [
    {
      "description": "string",
      "quantity": 1.0,
      "unit_price": 0.00,
      "amount": 0.00
    }
  ]
}

If you cannot determine a value, use null. For lines, include as many as you can find.
Do NOT include any text outside the JSON object."""
|
|
|
|
|
|
# Odoo AbstractModel grouping the helpers of this module: reading the AI
# settings from ir.config_parameter, turning email attachments into
# OpenAI chat-completions content, and applying the parsed result to a
# draft vendor bill.
class AIBillExtractor(models.AbstractModel):
|
|
# Model identifier declared to Odoo for this class.
_name = 'fusion.accounts.ai.extractor'
|
|
# Human-readable description of the model.
_description = 'AI Bill Data Extractor'
|
|
|
|
def _get_api_key(self):
|
|
"""Get the OpenAI API key from settings."""
|
|
return self.env['ir.config_parameter'].sudo().get_param(
|
|
'fusion_accounts.openai_api_key', ''
|
|
)
|
|
|
|
def _get_ai_model(self):
|
|
"""Get the configured AI model."""
|
|
return self.env['ir.config_parameter'].sudo().get_param(
|
|
'fusion_accounts.ai_model', 'gpt-4o-mini'
|
|
)
|
|
|
|
def _get_max_pages(self):
|
|
"""Get the max PDF pages to process."""
|
|
try:
|
|
return int(self.env['ir.config_parameter'].sudo().get_param(
|
|
'fusion_accounts.ai_max_pages', '2'
|
|
))
|
|
except (ValueError, TypeError):
|
|
return 2
|
|
|
|
def _is_ai_enabled(self):
|
|
"""Check if AI extraction is enabled."""
|
|
return self.env['ir.config_parameter'].sudo().get_param(
|
|
'fusion_accounts.ai_enabled', 'True'
|
|
) == 'True'
|
|
|
|
def extract_bill_data_from_raw(self, email_body, raw_attachments=None):
    """Extract bill data using raw attachments from msg_dict.

    Raw attachments come as a list that can contain:
    - objects exposing ``fname`` / ``content`` (and optionally ``info``)
    - plain tuples/lists: (filename, content_bytes, ...)
    - ir.attachment records (anything exposing ``datas``)

    Only the first three attachments are considered. PDFs are rendered
    to PNG images (text extraction is the fallback), images are sent as
    is, and the email body is appended as secondary context.

    Args:
        email_body: HTML email body
        raw_attachments: list from msg_dict['attachments']

    Returns:
        dict with extracted data, or empty dict on failure
    """
    if not self._is_ai_enabled():
        _logger.info("AI extraction is disabled")
        return {}

    api_key = self._get_api_key()
    if not api_key:
        _logger.warning("No OpenAI API key configured")
        return {}

    try:
        import requests as req_lib
    except ImportError:
        _logger.error("requests library not available")
        return {}

    clean_body = self._strip_html(email_body or '')
    content_parts = []
    has_pdf_content = False

    # Process raw attachments from msg_dict (at most three of them)
    for att in (raw_attachments or [])[:3]:
        if hasattr(att, 'datas'):
            # ir.attachment record: datas is base64 encoded
            fname = att.name or ''
            payload = base64.b64decode(att.datas) if att.datas else None
            mimetype = att.mimetype or ''
        elif hasattr(att, 'fname') and hasattr(att, 'content'):
            # Attachment object with named fields (fname, content, info)
            fname = att.fname or ''
            payload = att.content if isinstance(att.content, bytes) else None
            info = getattr(att, 'info', None)
            # info may be missing, empty, or carry a null content type
            mimetype = (info.get('content_type') or '') if isinstance(info, dict) else ''
        elif isinstance(att, (tuple, list)) and len(att) >= 2:
            # (filename, content_bytes, ...) tuple
            fname = att[0] or ''
            payload = att[1] if isinstance(att[1], bytes) else None
            mimetype = ''
        else:
            continue

        if not payload:
            continue

        lower_name = fname.lower()
        mimetype = mimetype.lower()

        # Determine mimetype from filename if not set
        if not mimetype:
            if lower_name.endswith('.pdf'):
                mimetype = 'application/pdf'
            elif lower_name.endswith(('.png', '.jpg', '.jpeg')):
                mimetype = 'image/' + lower_name.rsplit('.', 1)[-1]
        # "image/jpg" is not a registered media type; the JPEG type is
        # "image/jpeg" and that is what goes into the data URL.
        if mimetype == 'image/jpg':
            mimetype = 'image/jpeg'

        _logger.info("Processing attachment: %s (%d bytes)", fname, len(payload))

        if lower_name.endswith('.pdf') or mimetype == 'application/pdf':
            # Convert PDF to images (best input for a vision model)
            pdf_images = self._pdf_bytes_to_images(payload)
            if pdf_images:
                has_pdf_content = True
                for img_data in pdf_images:
                    content_parts.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{img_data}",
                            "detail": "high",
                        }
                    })
            else:
                # Fallback: text extraction
                pdf_text = self._pdf_bytes_to_text(payload)
                if pdf_text:
                    has_pdf_content = True
                    content_parts.append({
                        "type": "text",
                        "text": f"INVOICE/BILL DOCUMENT:\n{pdf_text[:8000]}"
                    })
        elif mimetype.startswith('image/'):
            has_pdf_content = True
            img_b64 = base64.b64encode(payload).decode()
            content_parts.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mimetype};base64,{img_b64}",
                    "detail": "high",
                }
            })

    # Email body as secondary context
    if clean_body and not has_pdf_content:
        content_parts.append({
            "type": "text",
            "text": f"EMAIL BODY (no invoice attachment):\n{clean_body[:5000]}"
        })
    elif clean_body and has_pdf_content:
        content_parts.append({
            "type": "text",
            "text": f"ADDITIONAL CONTEXT FROM EMAIL:\n{clean_body[:2000]}"
        })

    if not content_parts:
        _logger.info("No content to extract from")
        return {}

    # Call OpenAI API
    model = self._get_ai_model()
    messages = [
        {"role": "system", "content": EXTRACTION_PROMPT},
        {"role": "user", "content": content_parts},
    ]

    try:
        response = req_lib.post(
            'https://api.openai.com/v1/chat/completions',
            headers={
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/json',
            },
            json={
                'model': model,
                'messages': messages,
                'max_tokens': 2000,
                'temperature': 0.1,
            },
            timeout=60,
        )
        response.raise_for_status()
        result = response.json()
        # The message content can be null; normalise it to an empty string
        reply = (result['choices'][0]['message']['content'] or '').strip()

        # Strip a surrounding markdown code fence (``` or ```json)
        if reply.startswith('```'):
            reply_lines = reply.split('\n')
            if reply_lines[-1].strip() == '```':
                reply_lines = reply_lines[1:-1]
            else:
                reply_lines = reply_lines[1:]
            reply = '\n'.join(reply_lines).strip()

        if not reply:
            _logger.warning("AI returned empty response")
            return {}

        extracted = json.loads(reply)
        # The documented return type is a dict; reject arrays/scalars so
        # that consumers can rely on .get().
        if not isinstance(extracted, dict):
            _logger.warning("AI returned a non-object JSON payload: %s",
                            type(extracted).__name__)
            return {}
        _logger.info("AI extraction successful: %s", json.dumps(extracted, indent=2)[:500])
        return extracted

    except Exception as e:
        # Best-effort boundary: any network/parsing failure yields {}
        _logger.error("AI extraction failed: %s", e)
        return {}
|
|
|
|
def _pdf_bytes_to_images(self, pdf_bytes):
|
|
"""Convert raw PDF bytes to base64 PNG images."""
|
|
max_pages = self._get_max_pages()
|
|
images = []
|
|
try:
|
|
import fitz
|
|
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
|
|
for page_num in range(min(len(doc), max_pages)):
|
|
page = doc[page_num]
|
|
pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
|
|
img_data = base64.b64encode(pix.tobytes("png")).decode()
|
|
images.append(img_data)
|
|
_logger.info("Converted PDF page %d to image (%d bytes)", page_num + 1, len(img_data))
|
|
doc.close()
|
|
except ImportError:
|
|
_logger.warning("PyMuPDF not available")
|
|
except Exception as e:
|
|
_logger.warning("PDF to image failed: %s", e)
|
|
return images
|
|
|
|
def _pdf_bytes_to_text(self, pdf_bytes):
|
|
"""Extract text from raw PDF bytes."""
|
|
max_pages = self._get_max_pages()
|
|
try:
|
|
import fitz
|
|
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
|
|
parts = []
|
|
for page_num in range(min(len(doc), max_pages)):
|
|
parts.append(doc[page_num].get_text())
|
|
doc.close()
|
|
return '\n'.join(parts)
|
|
except Exception:
|
|
return ''
|
|
|
|
def extract_bill_data(self, email_body, attachments=None):
    """Extract bill data from email body and attachments using OpenAI.

    Only the first three attachments are considered. PDFs are rendered
    to PNG images (text extraction is the fallback), PNG/JPEG images are
    sent as is, and the email body is appended as secondary context.

    Args:
        email_body: Plain text or HTML email body
        attachments: List of ir.attachment records

    Returns:
        dict with extracted data, or empty dict on failure
    """
    if not self._is_ai_enabled():
        _logger.info("AI extraction is disabled")
        return {}

    api_key = self._get_api_key()
    if not api_key:
        _logger.warning("No OpenAI API key configured for Fusion Accounts")
        return {}

    try:
        import requests
    except ImportError:
        _logger.error("requests library not available")
        return {}

    # Clean HTML from email body
    clean_body = self._strip_html(email_body or '')

    # Build content -- PDF attachments FIRST (primary source), email body second
    content_parts = []
    has_pdf_content = False

    # Add PDF/image attachments first (these are the invoice documents)
    if attachments:
        for attachment in attachments[:3]:  # Max 3 attachments
            mimetype = attachment.mimetype
            if mimetype == 'application/pdf':
                # Try image conversion first (best for AI vision)
                pdf_images = self._pdf_to_images(attachment)
                if pdf_images:
                    has_pdf_content = True
                    for img_data in pdf_images:
                        content_parts.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{img_data}",
                                "detail": "high",
                            }
                        })
                else:
                    # Fallback: extract text from PDF
                    pdf_text = self._pdf_to_text(attachment)
                    if pdf_text:
                        has_pdf_content = True
                        content_parts.append({
                            "type": "text",
                            "text": f"INVOICE/BILL DOCUMENT:\n{pdf_text[:8000]}"
                        })
            elif mimetype in ('image/png', 'image/jpeg', 'image/jpg'):
                # An attachment without stored data has nothing to send
                # (and b64decode would raise on a falsy value).
                if not attachment.datas:
                    continue
                has_pdf_content = True
                img_b64 = base64.b64encode(base64.b64decode(attachment.datas)).decode()
                # "image/jpg" is not a registered media type; use image/jpeg
                image_mime = 'image/jpeg' if mimetype == 'image/jpg' else mimetype
                content_parts.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{image_mime};base64,{img_b64}",
                        "detail": "high",
                    }
                })

    # Add email body as secondary context (only if no PDF content found)
    if clean_body and not has_pdf_content:
        content_parts.append({
            "type": "text",
            "text": f"EMAIL BODY (no invoice attachment found):\n{clean_body[:5000]}"
        })
    elif clean_body and has_pdf_content:
        content_parts.append({
            "type": "text",
            "text": f"ADDITIONAL CONTEXT FROM EMAIL:\n{clean_body[:2000]}"
        })

    if not content_parts:
        _logger.info("No content to extract from")
        return {}

    # Build messages for OpenAI
    messages = [
        {"role": "system", "content": EXTRACTION_PROMPT},
        {"role": "user", "content": content_parts},
    ]

    # Call OpenAI API
    model = self._get_ai_model()
    # Bound before the try block so the parse-error handler can always
    # reference it, even when the failure happens before it is assigned.
    raw_reply = ''
    try:
        response = requests.post(
            'https://api.openai.com/v1/chat/completions',
            headers={
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/json',
            },
            json={
                'model': model,
                'messages': messages,
                'max_tokens': 2000,
                'temperature': 0.1,
            },
            timeout=60,
        )
        response.raise_for_status()
        result = response.json()
        # The message content can be null; normalise it to an empty string
        raw_reply = (result['choices'][0]['message']['content'] or '').strip()

        # Parse JSON from response -- handle markdown code fences
        if raw_reply.startswith('```'):
            # Remove ```json ... ``` wrapper
            reply_lines = raw_reply.split('\n')
            if reply_lines[-1].strip() == '```':
                reply_lines = reply_lines[1:-1]
            else:
                reply_lines = reply_lines[1:]
            raw_reply = '\n'.join(reply_lines).strip()

        if not raw_reply:
            _logger.warning("AI returned empty response")
            return {}

        extracted = json.loads(raw_reply)
        # The documented return type is a dict; reject arrays/scalars so
        # that consumers can rely on .get().
        if not isinstance(extracted, dict):
            _logger.warning("Failed to parse AI response: %s (content: %s)",
                            "top-level JSON value is not an object", raw_reply[:200])
            return {}
        _logger.info("AI extraction successful: %s", json.dumps(extracted, indent=2)[:500])
        return extracted

    except requests.exceptions.RequestException as e:
        _logger.error("OpenAI API request failed: %s", e)
        return {}
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError; the others cover a
        # response whose structure differs from the expected one.
        _logger.warning("Failed to parse AI response: %s (content: %s)",
                        e, raw_reply[:200] if raw_reply else 'empty')
        return {}
|
|
|
|
def apply_extracted_data(self, move, extracted_data):
    """Apply AI-extracted data to a draft vendor bill.

    The PDF/invoice is the source of truth for:
    - Vendor name (matched to Odoo contact)
    - Invoice/bill number (ref)
    - Invoice date, due date
    - Line items (only added when the bill has no lines yet, max 20)

    Header values and lines are written in two separate, independently
    guarded writes; a failure is logged and does not propagate. A falsy
    or non-dict ``extracted_data`` is ignored.

    Args:
        move: account.move record (draft vendor bill)
        extracted_data: dict from extract_bill_data()
    """
    if not extracted_data or not isinstance(extracted_data, dict):
        return

    from datetime import datetime

    def _as_float(value, default):
        """Coerce a JSON number (or numeric string) to float, else default."""
        if value is None or isinstance(value, bool):
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    vals = {}

    # --- Vendor matching from AI-extracted vendor name ---
    # This overrides the email sender match because the PDF
    # shows the actual billing company (e.g., "Canada Computers Inc.")
    ai_vendor_name = extracted_data.get('vendor_name')
    if ai_vendor_name:
        partner = self._match_vendor_by_name(ai_vendor_name)
        if partner:
            vals['partner_id'] = partner.id
            _logger.info("AI vendor match: '%s' -> %s (id=%d)",
                         ai_vendor_name, partner.name, partner.id)

    # Invoice reference (vendor's invoice/bill/SO number); the model may
    # return a bare number, so normalise to text.
    invoice_number = extracted_data.get('invoice_number')
    if invoice_number:
        vals['ref'] = str(invoice_number).strip()

    # Invoice date / due date, both expected as YYYY-MM-DD
    for source_key, field_name in (('invoice_date', 'invoice_date'),
                                   ('due_date', 'invoice_date_due')):
        raw_date = extracted_data.get(source_key)
        if not raw_date:
            continue
        try:
            vals[field_name] = datetime.strptime(raw_date, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            # Unparseable date: leave the field untouched
            _logger.debug("Ignoring unparseable %s from AI: %r", source_key, raw_date)

    if vals:
        try:
            # Savepoint so a failed write does not leave the surrounding
            # transaction unusable for the caller.
            with self.env.cr.savepoint():
                move.write(vals)
            _logger.info("Applied AI data to bill %s: %s", move.id, vals)
        except Exception as e:
            _logger.error("Failed to apply AI data to bill %s: %s", move.id, e)

    # Add invoice lines if extracted
    lines = extracted_data.get('lines') or []
    if not isinstance(lines, list) or not lines or move.invoice_line_ids:
        return

    line_vals_list = []
    for line in lines[:20]:  # Max 20 lines
        if not isinstance(line, dict):
            continue
        # The prompt asks for null on unknown values, so a key can be
        # present with a None value: fall back explicitly.
        line_vals_list.append({
            'move_id': move.id,
            'name': str(line.get('description') or 'Extracted line'),
            'quantity': _as_float(line.get('quantity'), 1.0),
            'price_unit': _as_float(line.get('unit_price'), 0.0),
        })

    if line_vals_list:
        try:
            with self.env.cr.savepoint():
                move.write({
                    'invoice_line_ids': [(0, 0, lv) for lv in line_vals_list]
                })
            _logger.info("Added %d AI-extracted lines to bill %s",
                         len(line_vals_list), move.id)
        except Exception as e:
            _logger.error("Failed to add lines to bill %s: %s", move.id, e)
|
|
|
|
def _match_vendor_by_name(self, vendor_name):
|
|
"""Match AI-extracted vendor name to an Odoo partner.
|
|
|
|
Tries multiple strategies:
|
|
1. Exact name match
|
|
2. Commercial company name match
|
|
3. Partial/contains match (only if single result)
|
|
|
|
Returns: res.partner record or False
|
|
"""
|
|
if not vendor_name or len(vendor_name) < 3:
|
|
return False
|
|
|
|
Partner = self.env['res.partner'].sudo()
|
|
vendor_name = vendor_name.strip()
|
|
|
|
# Level 1: Exact name match
|
|
partner = Partner.search([
|
|
('name', '=ilike', vendor_name),
|
|
('supplier_rank', '>', 0),
|
|
], limit=1)
|
|
if partner:
|
|
return partner
|
|
|
|
# Level 2: Exact name match without supplier_rank filter
|
|
partner = Partner.search([
|
|
('name', '=ilike', vendor_name),
|
|
], limit=1)
|
|
if partner:
|
|
return partner
|
|
|
|
# Level 3: Commercial company name match
|
|
partner = Partner.search([
|
|
('commercial_company_name', '=ilike', vendor_name),
|
|
], limit=1)
|
|
if partner:
|
|
return partner
|
|
|
|
# Level 4: Contains match (only accept single result to avoid false positives)
|
|
partners = Partner.search([
|
|
'|',
|
|
('name', 'ilike', vendor_name),
|
|
('commercial_company_name', 'ilike', vendor_name),
|
|
])
|
|
if len(partners) == 1:
|
|
return partners
|
|
|
|
# Level 5: Try without common suffixes (Inc, Ltd, Corp, etc.)
|
|
clean_name = vendor_name
|
|
for suffix in [' Inc', ' Inc.', ' Ltd', ' Ltd.', ' Corp', ' Corp.',
|
|
' Co', ' Co.', ' LLC', ' Company', ' Limited']:
|
|
if clean_name.lower().endswith(suffix.lower()):
|
|
clean_name = clean_name[:len(clean_name) - len(suffix)].strip()
|
|
break
|
|
|
|
if clean_name != vendor_name and len(clean_name) >= 3:
|
|
partners = Partner.search([
|
|
'|',
|
|
('name', 'ilike', clean_name),
|
|
('commercial_company_name', 'ilike', clean_name),
|
|
])
|
|
if len(partners) == 1:
|
|
return partners
|
|
|
|
_logger.info("No vendor match for AI-extracted name: '%s'", vendor_name)
|
|
return False
|
|
|
|
def _strip_html(self, html):
|
|
"""Strip HTML tags from text."""
|
|
clean = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
|
|
clean = re.sub(r'<script[^>]*>.*?</script>', '', clean, flags=re.DOTALL)
|
|
clean = re.sub(r'<[^>]+>', ' ', clean)
|
|
clean = re.sub(r'\s+', ' ', clean).strip()
|
|
return clean
|
|
|
|
def _pdf_to_images(self, attachment):
|
|
"""Convert PDF attachment pages to base64 PNG images using PyMuPDF."""
|
|
max_pages = self._get_max_pages()
|
|
images = []
|
|
|
|
try:
|
|
import fitz # PyMuPDF
|
|
pdf_data = base64.b64decode(attachment.datas)
|
|
doc = fitz.open(stream=pdf_data, filetype="pdf")
|
|
for page_num in range(min(len(doc), max_pages)):
|
|
page = doc[page_num]
|
|
pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) # 2x zoom for readability
|
|
img_data = base64.b64encode(pix.tobytes("png")).decode()
|
|
images.append(img_data)
|
|
_logger.info("Converted PDF page %d to image (%d bytes)", page_num + 1, len(img_data))
|
|
doc.close()
|
|
except ImportError:
|
|
_logger.warning("PyMuPDF not available, will try text extraction fallback")
|
|
except Exception as e:
|
|
_logger.warning("PDF to image conversion failed: %s", e)
|
|
|
|
return images
|
|
|
|
def _pdf_to_text(self, attachment):
|
|
"""Extract text content from PDF as fallback when image conversion fails."""
|
|
max_pages = self._get_max_pages()
|
|
|
|
try:
|
|
import fitz # PyMuPDF
|
|
pdf_data = base64.b64decode(attachment.datas)
|
|
doc = fitz.open(stream=pdf_data, filetype="pdf")
|
|
text_parts = []
|
|
for page_num in range(min(len(doc), max_pages)):
|
|
page = doc[page_num]
|
|
text_parts.append(page.get_text())
|
|
doc.close()
|
|
full_text = '\n'.join(text_parts)
|
|
if full_text.strip():
|
|
_logger.info("Extracted %d chars of text from PDF", len(full_text))
|
|
return full_text
|
|
except ImportError:
|
|
pass
|
|
except Exception as e:
|
|
_logger.warning("PDF text extraction failed: %s", e)
|
|
|
|
return ''
|