# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
|
import logging
|
|
from email.utils import parseaddr
|
|
|
|
from odoo import models, fields, api
|
|
|
|
# Module-level logger, named after this module, used by all bill-from-email code below.
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AccountMove(models.Model):
    """Extend ``account.move`` so vendor bills can be created from incoming email.

    Adds four read-only tracking fields, a multi-level sender -> vendor
    matcher, and a ``message_new`` override that blocks, creates and logs
    bills arriving through the mail gateway.
    """

    _inherit = 'account.move'

    # Free-mail providers: a shared domain says nothing about which vendor
    # sent the mail, so these are excluded from domain-level matching.
    _FA_COMMON_EMAIL_DOMAINS = frozenset({
        'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com',
        'live.com', 'aol.com', 'icloud.com', 'mail.com',
        'protonmail.com', 'zoho.com',
    })

    # Set to True by message_new() on bills it creates.
    x_fa_created_from_email = fields.Boolean(
        string='Created from Email',
        default=False,
        readonly=True,
        copy=False,
        help='This bill was automatically created from an incoming email.',
    )
    # Match level returned by _fa_match_vendor_from_email() for the sender.
    x_fa_match_level = fields.Selection(
        selection=[
            ('exact_email', 'Exact Email'),
            ('domain', 'Domain Match'),
            ('name', 'Name Match'),
            ('no_match', 'No Match'),
        ],
        string='Vendor Match Level',
        readonly=True,
        copy=False,
        help='How the vendor was matched from the sender email.',
    )
    # Set to True by message_new() once extracted AI data has been applied.
    x_fa_ai_extracted = fields.Boolean(
        string='AI Extracted',
        default=False,
        readonly=True,
        copy=False,
        help='Bill data was extracted using AI.',
    )
    # Raw sender header value as received (may include a display name).
    x_fa_original_sender = fields.Char(
        string='Original Email Sender',
        readonly=True,
        copy=False,
        help='The original sender email address that triggered bill creation.',
    )

    # =========================================================================
    # VENDOR MATCHING
    # =========================================================================
    @staticmethod
    def _fa_escape_like(value):
        """Return *value* with LIKE metacharacters escaped.

        ``=ilike`` passes its operand to SQL as a pattern, so ``%`` and ``_``
        coming from an (untrusted) sender address would act as wildcards.
        Backslash-escaping makes them match literally.
        """
        return (
            value.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

    @api.model
    def _fa_match_vendor_from_email(self, email_from):
        """Multi-level vendor matching from sender email.

        Tries three levels, stopping at the first hit:
            1. Exact email match (vendors first, then any contact)
            2. Domain match (email domain or website), skipping free-mail
               providers; controlled by ``fusion_accounts.enable_domain_match``
            3. Name match on the sender display name, accepted only when
               exactly one partner matches; controlled by
               ``fusion_accounts.enable_name_match``

        :param email_from: raw sender header, e.g. ``"Name <a@b.com>"``
        :returns: ``(partner_record, match_level)`` or ``(False, 'no_match')``
        """
        if not email_from:
            return False, 'no_match'

        # Parse "Display Name <email@example.com>" format
        display_name, email_address = parseaddr(email_from)
        email_address = email_address.strip().lower()
        if not email_address:
            return False, 'no_match'

        Partner = self.env['res.partner'].sudo()

        # Check settings for which match levels are enabled
        ICP = self.env['ir.config_parameter'].sudo()
        enable_domain = ICP.get_param('fusion_accounts.enable_domain_match', 'True') == 'True'
        enable_name = ICP.get_param('fusion_accounts.enable_name_match', 'True') == 'True'

        # ----- Level 1: Exact email match -----
        # Escape LIKE wildcards so e.g. "john_doe@x.com" cannot match
        # "johnXdoe@x.com" and a crafted "%@%" cannot match everybody.
        email_pattern = self._fa_escape_like(email_address)
        partner = Partner.search([
            ('email', '=ilike', email_pattern),
            ('supplier_rank', '>', 0),
        ], limit=1)
        if not partner:
            # Also check without supplier_rank filter (contact might not be flagged as vendor)
            partner = Partner.search([
                ('email', '=ilike', email_pattern),
            ], limit=1)

        if partner:
            _logger.info("Vendor match Level 1 (exact email): %s -> %s",
                         email_address, partner.name)
            return partner, 'exact_email'

        # ----- Level 2: Domain match -----
        # Take the text after the LAST "@" (a quoted local part may contain
        # one) and require it to be non-empty: an empty domain would turn the
        # website clause into a match-anything search.
        domain = email_address.rpartition('@')[2] if '@' in email_address else ''
        if enable_domain and domain and domain not in self._FA_COMMON_EMAIL_DOMAINS:
            partners = Partner.search([
                '|',
                ('email', '=ilike', '%@' + self._fa_escape_like(domain)),
                ('website', 'ilike', domain),
            ])
            if partners:
                # Prefer is_company=True (the parent company)
                company_partner = partners.filtered(lambda p: p.is_company)
                partner = company_partner[0] if company_partner else partners[0]
                _logger.info("Vendor match Level 2 (domain): %s -> %s (from %d candidates)",
                             domain, partner.name, len(partners))
                return partner, 'domain'

        # ----- Level 3: Name match -----
        if enable_name and display_name:
            clean_name = display_name.strip().strip('"').strip("'")
            if len(clean_name) >= 3:  # Only match names with 3+ characters
                partners = Partner.search([
                    '|',
                    ('name', 'ilike', clean_name),
                    ('commercial_company_name', 'ilike', clean_name),
                ])
                if len(partners) == 1:
                    _logger.info("Vendor match Level 3 (name): '%s' -> %s",
                                 clean_name, partners.name)
                    return partners, 'name'
                elif len(partners) > 1:
                    # Ambiguous: refuse to guess between several partners.
                    _logger.info("Vendor match Level 3 skipped: '%s' matched %d partners (ambiguous)",
                                 clean_name, len(partners))

        _logger.info("No vendor match found for: %s (%s)", display_name, email_address)
        return False, 'no_match'

    # =========================================================================
    # MESSAGE_NEW OVERRIDE
    # =========================================================================
    @api.model
    def _fa_log_email_event(self, msg_dict, email_from, partner, match_level,
                            action_taken, **extra_vals):
        """Create one ``fusion.accounts.log`` row describing how an email was handled.

        :param msg_dict: parsed message dict (``subject`` and ``date`` are read)
        :param email_from: sender header value to record
        :param partner: matched partner record, or a falsy value when unmatched
        :param match_level: match level string from the vendor matcher
        :param action_taken: value stored in the log's ``action_taken`` field
        :param extra_vals: additional log field values (e.g. ``notes``, ``bill_id``)
        :returns: the created log record
        """
        vals = {
            'email_from': email_from,
            'email_subject': msg_dict.get('subject', ''),
            'email_date': msg_dict.get('date'),
            'vendor_id': partner.id if partner else False,
            'match_level': match_level,
            'action_taken': action_taken,
        }
        vals.update(extra_vals)
        return self.env['fusion.accounts.log'].sudo().create(vals)

    @api.model
    def message_new(self, msg_dict, custom_values=None):
        """Override to add vendor matching and blocking for incoming bills.

        When an email arrives via the accounts alias:
            1. Match sender to a vendor
            2. If vendor is blocked -> log it, post a note on the vendor,
               don't create a bill
            3. If not blocked -> create draft bill, run AI extraction

        :param msg_dict: parsed incoming message (``email_from``/``from``,
            ``subject``, ``date``, ``body`` and ``attachments`` are read)
        :param custom_values: optional extra creation values; not mutated
        :returns: the created ``account.move`` record
        :raises ValueError: when the matched vendor has email bill creation blocked
        :raises Exception: whatever the parent ``message_new`` (or the follow-up
            write) raises is logged and re-raised unchanged
        """
        email_from = msg_dict.get('email_from', '') or msg_dict.get('from', '')
        subject = msg_dict.get('subject', '')

        _logger.info("Fusion Accounts: Processing incoming email from '%s' subject '%s'",
                     email_from, subject)

        # Match vendor
        partner, match_level = self._fa_match_vendor_from_email(email_from)

        # Check if vendor is blocked
        # (x_fa_block_email_bill is expected to be defined on res.partner elsewhere.)
        if partner and partner.x_fa_block_email_bill:
            _logger.info("Vendor '%s' is blocked for email bill creation. Skipping bill.",
                         partner.name)

            # Log the blocked action
            self._fa_log_email_event(
                msg_dict, email_from, partner, match_level, 'blocked',
                notes=f'Vendor "{partner.name}" has email bill creation blocked.',
            )

            # Post note to vendor's chatter (best effort: a failure here must
            # not hide the fact that the email was blocked).
            # NOTE(review): subject and email_from are untrusted and are
            # interpolated into this markup unescaped -- confirm how the
            # target Odoo version's message_post treats a plain-str body.
            try:
                partner.message_post(
                    body=f'<p><strong>Blocked bill email:</strong> {subject}</p>'
                         f'<p><strong>From:</strong> {email_from}</p>',
                    message_type='comment',
                    subtype_xmlid='mail.mt_note',
                )
            except Exception as e:
                _logger.warning("Failed to post blocked email to partner chatter: %s", e)

            # Don't create a bill -- abort processing with a controlled
            # exception that the mail gateway is expected to catch.
            _logger.info("Skipping bill creation for blocked vendor %s", partner.name)
            raise ValueError(
                f"Fusion Accounts: Bill creation blocked for vendor '{partner.name}'. "
                f"Email from {email_from} logged to activity log."
            )

        # Not blocked -- create the bill.
        # Copy so the caller's dict (possibly shared between calls) is never mutated.
        custom_values = dict(custom_values or {})
        custom_values['move_type'] = 'in_invoice'
        if partner:
            custom_values['partner_id'] = partner.id

        # Create the bill via standard Odoo mechanism
        try:
            move = super().message_new(msg_dict, custom_values=custom_values)
            # Write FA fields after creation (Odoo may strip unknown fields from custom_values)
            move.sudo().write({
                'x_fa_created_from_email': True,
                'x_fa_match_level': match_level,
                'x_fa_original_sender': email_from,
            })
        except Exception as e:
            _logger.exception("Failed to create bill from email: %s", e)
            # Recording the failure is best effort: if it fails too (e.g. the
            # transaction is already aborted) the ORIGINAL error must still
            # be the one that propagates.
            try:
                self._fa_log_email_event(
                    msg_dict, email_from, partner, match_level, 'failed',
                    notes=str(e),
                )
            except Exception:
                _logger.exception("Failed to record bill creation failure in activity log")
            raise

        # Run AI extraction using attachments from msg_dict
        # (ir.attachment records don't exist yet at this point - they're created by message_post later)
        ai_extracted = False
        ai_result = ''
        try:
            ICP = self.env['ir.config_parameter'].sudo()
            ai_enabled = ICP.get_param('fusion_accounts.ai_enabled', 'True') == 'True'

            if ai_enabled:
                extractor = self.env['fusion.accounts.ai.extractor']
                email_body = msg_dict.get('body', '')

                raw_attachments = msg_dict.get('attachments', [])
                extracted_data = extractor.extract_bill_data_from_raw(
                    email_body, raw_attachments
                )
                if extracted_data:
                    extractor.apply_extracted_data(move, extracted_data)
                    ai_extracted = True
                    ai_result = str(extracted_data)
                    move.sudo().write({'x_fa_ai_extracted': True})

        except Exception as e:
            # Extraction is optional: keep the bill and record the error text instead.
            _logger.warning("AI extraction failed for bill %s: %s", move.id, e)
            ai_result = f'Error: {e}'

        # Log the successful creation
        self._fa_log_email_event(
            msg_dict, email_from, partner, match_level, 'bill_created',
            bill_id=move.id,
            ai_extracted=ai_extracted,
            ai_result=ai_result,
        )

        _logger.info("Fusion Accounts: Created bill %s from email (vendor=%s, match=%s, ai=%s)",
                     move.name, partner.name if partner else 'None', match_level, ai_extracted)

        return move
|