Files
Odoo-Modules/Fusion Accounting/models/account_move.py
2026-02-22 01:22:18 -05:00

1843 lines
78 KiB
Python

"""
Fusion Accounting - Journal Entry Extensions
Augments the core account.move and account.move.line models with capabilities
for deferred revenue/expense management, digital invoice signatures,
VAT period closing workflows, asset depreciation tracking, and predictive
line item suggestions.
"""
import ast
import calendar
import datetime
import logging
import math
import re
from contextlib import contextmanager
from dateutil.relativedelta import relativedelta
from markupsafe import Markup
from odoo import api, fields, models, Command, _
from odoo.exceptions import UserError, ValidationError
from odoo.osv import expression
from odoo.tools import SQL, float_compare
from odoo.tools.misc import format_date, formatLang
try:
from odoo.addons.account.models.exceptions import TaxClosingNonPostedDependingMovesError
except (ImportError, ModuleNotFoundError):
# Fallback: define a placeholder so references don't break at runtime.
# Tax-closing redirect will simply not be caught.
class TaxClosingNonPostedDependingMovesError(Exception):
pass
from odoo.addons.web.controllers.utils import clean_action
_log = logging.getLogger(__name__)
# Date boundaries for deferred entries
DEFERRED_DATE_MIN = datetime.date(1900, 1, 1)
DEFERRED_DATE_MAX = datetime.date(9999, 12, 31)
class FusionAccountMove(models.Model):
"""
Augments journal entries with deferral tracking, invoice signatures,
VAT closing workflows, and fixed asset depreciation management.
"""
_inherit = "account.move"
# ---- Payment State Tracking ----
payment_state_before_switch = fields.Char(
string="Cached Payment Status",
copy=False,
help="Stores the payment state prior to resetting the entry to draft.",
)
# ---- Deferral Linkage ----
deferred_move_ids = fields.Many2many(
comodel_name='account.move',
relation='account_move_deferred_rel',
column1='original_move_id',
column2='deferred_move_id',
string="Generated Deferrals",
copy=False,
help="Journal entries created to spread this document's revenue or expense across periods.",
)
deferred_original_move_ids = fields.Many2many(
comodel_name='account.move',
relation='account_move_deferred_rel',
column1='deferred_move_id',
column2='original_move_id',
string="Source Documents",
copy=False,
help="The originating invoices or bills that produced this deferral entry.",
)
deferred_entry_type = fields.Selection(
selection=[
('expense', 'Deferred Expense'),
('revenue', 'Deferred Revenue'),
],
string="Deferral Category",
compute='_compute_deferred_entry_type',
copy=False,
)
# ---- Invoice Signature ----
signing_user = fields.Many2one(
comodel_name='res.users',
string='Authorized Signer',
compute='_compute_signing_user',
store=True,
copy=False,
)
show_signature_area = fields.Boolean(
compute='_compute_signature',
)
signature = fields.Binary(
compute='_compute_signature',
)
# ---- VAT / Tax Closing ----
tax_closing_report_id = fields.Many2one(
comodel_name='account.report',
string="Tax Report Reference",
)
tax_closing_alert = fields.Boolean(
compute='_compute_tax_closing_alert',
)
# ---- Loan Linkage ----
fusion_loan_id = fields.Many2one(
'fusion.loan',
string='Loan',
index=True,
ondelete='set null',
copy=False,
help="The loan record this journal entry belongs to.",
)
# ---- Fixed Asset Depreciation ----
asset_id = fields.Many2one(
'account.asset',
string='Asset',
index=True,
ondelete='cascade',
copy=False,
domain="[('company_id', '=', company_id)]",
)
asset_remaining_value = fields.Monetary(
string='Depreciable Value',
compute='_compute_depreciation_cumulative_value',
)
asset_depreciated_value = fields.Monetary(
string='Cumulative Depreciation',
compute='_compute_depreciation_cumulative_value',
)
asset_value_change = fields.Boolean(
help="Indicates this entry results from an asset revaluation.",
)
asset_number_days = fields.Integer(
string="Depreciation Days",
copy=False,
)
asset_depreciation_beginning_date = fields.Date(
string="Depreciation Start",
copy=False,
)
depreciation_value = fields.Monetary(
string="Depreciation Amount",
compute="_compute_depreciation_value",
inverse="_inverse_depreciation_value",
store=True,
)
asset_ids = fields.One2many(
'account.asset',
string='Linked Assets',
compute="_compute_asset_ids",
)
asset_id_display_name = fields.Char(
compute="_compute_asset_ids",
)
count_asset = fields.Integer(
compute="_compute_asset_ids",
)
draft_asset_exists = fields.Boolean(
compute="_compute_asset_ids",
)
asset_move_type = fields.Selection(
selection=[
('depreciation', 'Depreciation'),
('sale', 'Sale'),
('purchase', 'Purchase'),
('disposal', 'Disposal'),
('negative_revaluation', 'Negative revaluation'),
('positive_revaluation', 'Positive revaluation'),
],
string='Asset Move Type',
compute='_compute_asset_move_type',
store=True,
copy=False,
)
# =========================================================================
# HELPERS
# =========================================================================
def _get_invoice_in_payment_state(self):
return 'in_payment'
def _get_deferred_entries_method(self):
"""Determine whether this entry uses expense or revenue deferral settings."""
self.ensure_one()
if self.is_outbound():
return self.company_id.generate_deferred_expense_entries_method
return self.company_id.generate_deferred_revenue_entries_method
# =========================================================================
# COMPUTE: SIGNATURE
# =========================================================================
@api.depends('state', 'move_type', 'invoice_user_id')
def _compute_signing_user(self):
non_sales = self.filtered(lambda m: not m.is_sale_document())
non_sales.signing_user = False
current_is_root = self.env.user == self.env.ref('base.user_root')
current_is_internal = self.env.user.has_group('base.group_user')
for inv in (self - non_sales).filtered(lambda r: r.state == 'posted'):
company_rep = inv.company_id.signing_user
if current_is_root:
salesperson = inv.invoice_user_id
can_sign = salesperson and salesperson.has_group('base.group_user')
inv.signing_user = company_rep or (salesperson if can_sign else False)
else:
inv.signing_user = company_rep or (self.env.user if current_is_internal else False)
@api.depends('state')
def _compute_signature(self):
is_portal = self.env.user.has_group('base.group_portal')
excluded = self.filtered(
lambda rec: (
not rec.company_id.sign_invoice
or rec.state in ('draft', 'cancel')
or not rec.is_sale_document()
or (is_portal and not rec.invoice_pdf_report_id)
)
)
excluded.show_signature_area = False
excluded.signature = None
signable = self - excluded
signable.show_signature_area = True
for record in signable:
record.signature = record.signing_user.sign_signature
# =========================================================================
# COMPUTE: DEFERRAL
# =========================================================================
@api.depends('deferred_original_move_ids')
def _compute_deferred_entry_type(self):
for entry in self:
if entry.deferred_original_move_ids:
first_source = entry.deferred_original_move_ids[0]
entry.deferred_entry_type = 'expense' if first_source.is_outbound() else 'revenue'
else:
entry.deferred_entry_type = False
# =========================================================================
# COMPUTE: TAX CLOSING
# =========================================================================
def _compute_tax_closing_alert(self):
for entry in self:
entry.tax_closing_alert = (
entry.state == 'posted'
and entry.tax_closing_report_id
and entry.company_id.tax_lock_date
and entry.company_id.tax_lock_date < entry.date
)
# =========================================================================
# COMPUTE: ASSET DEPRECIATION
# =========================================================================
@api.depends('asset_id', 'depreciation_value', 'asset_id.total_depreciable_value',
'asset_id.already_depreciated_amount_import', 'state')
def _compute_depreciation_cumulative_value(self):
self.asset_depreciated_value = 0
self.asset_remaining_value = 0
# Protect these fields during batch assignment to avoid infinite recursion
# when write() is triggered on non-protected records and needs to read them
protected = [self._fields['asset_remaining_value'], self._fields['asset_depreciated_value']]
with self.env.protecting(protected, self.asset_id.depreciation_move_ids):
for asset_rec in self.asset_id:
accumulated = 0
outstanding = asset_rec.total_depreciable_value - asset_rec.already_depreciated_amount_import
ordered_deps = asset_rec.depreciation_move_ids.sorted(lambda mv: (mv.date, mv._origin.id))
for dep_move in ordered_deps:
if dep_move.state != 'cancel':
outstanding -= dep_move.depreciation_value
accumulated += dep_move.depreciation_value
dep_move.asset_remaining_value = outstanding
dep_move.asset_depreciated_value = accumulated
@api.depends('line_ids.balance')
def _compute_depreciation_value(self):
for entry in self:
linked_asset = entry.asset_id or entry.reversed_entry_id.asset_id
if not linked_asset:
entry.depreciation_value = 0
continue
target_group = 'expense'
dep_total = sum(
entry.line_ids.filtered(
lambda ln: (
ln.account_id.internal_group == target_group
or ln.account_id == linked_asset.account_depreciation_expense_id
)
).mapped('balance')
)
# Detect disposal entries: the asset account is fully reversed with more than 2 lines
is_disposal = (
any(
ln.account_id == linked_asset.account_asset_id
and float_compare(
-ln.balance, linked_asset.original_value,
precision_rounding=linked_asset.currency_id.rounding,
) == 0
for ln in entry.line_ids
)
and len(entry.line_ids) > 2
)
if is_disposal:
sign_factor = -1 if linked_asset.original_value < 0 else 1
secondary_line = entry.line_ids[1]
offset_amount = secondary_line.debit if linked_asset.original_value > 0 else secondary_line.credit
dep_total = (
linked_asset.original_value
- linked_asset.salvage_value
- offset_amount * sign_factor
)
entry.depreciation_value = dep_total
@api.depends('asset_id', 'asset_ids')
def _compute_asset_move_type(self):
for entry in self:
if entry.asset_ids:
entry.asset_move_type = 'positive_revaluation' if entry.asset_ids.parent_id else 'purchase'
elif not entry.asset_move_type or not entry.asset_id:
entry.asset_move_type = False
@api.depends('line_ids.asset_ids')
def _compute_asset_ids(self):
for entry in self:
entry.asset_ids = entry.line_ids.asset_ids
entry.count_asset = len(entry.asset_ids)
entry.asset_id_display_name = _('Asset')
entry.draft_asset_exists = bool(entry.asset_ids.filtered(lambda a: a.state == 'draft'))
# =========================================================================
# INVERSE
# =========================================================================
def _inverse_depreciation_value(self):
for entry in self:
linked_asset = entry.asset_id
abs_amount = abs(entry.depreciation_value)
expense_acct = linked_asset.account_depreciation_expense_id
entry.write({'line_ids': [
Command.update(ln.id, {
'balance': abs_amount if ln.account_id == expense_acct else -abs_amount,
})
for ln in entry.line_ids
]})
# =========================================================================
# CONSTRAINTS
# =========================================================================
@api.constrains('state', 'asset_id')
def _constrains_check_asset_state(self):
for entry in self.filtered(lambda m: m.asset_id):
if entry.asset_id.state == 'draft' and entry.state == 'posted':
raise ValidationError(
_("Cannot post an entry tied to a draft asset. Confirm the asset first.")
)
# =========================================================================
# CRUD OVERRIDES
# =========================================================================
def _post(self, soft=True):
# Process VAT closing entries before delegating to the parent posting logic
for closing_entry in self.filtered(lambda m: m.tax_closing_report_id):
rpt = closing_entry.tax_closing_report_id
rpt_options = closing_entry._get_tax_closing_report_options(
closing_entry.company_id, closing_entry.fiscal_position_id, rpt, closing_entry.date,
)
closing_entry._close_tax_period(rpt, rpt_options)
confirmed = super()._post(soft)
# Handle on-validation deferral generation for newly posted entries
for entry in confirmed:
if (
entry._get_deferred_entries_method() == 'on_validation'
and any(entry.line_ids.mapped('deferred_start_date'))
):
entry._generate_deferred_entries()
# Record depreciation posting in asset chatter
confirmed._log_depreciation_asset()
# Auto-generate assets from bill lines with asset-creating accounts
confirmed.sudo()._auto_create_asset()
return confirmed
def action_post(self):
try:
result = super().action_post()
except TaxClosingNonPostedDependingMovesError as exc:
return {
"type": "ir.actions.client",
"tag": "fusion_accounting.redirect_action",
"target": "new",
"name": "Dependent Closing Entries",
"params": {
"depending_action": exc.args[0],
"message": _("There are related closing entries that must be posted first"),
"button_text": _("View dependent entries"),
},
'context': {'dialog_size': 'medium'},
}
# Trigger auto-reconciliation for bank statement entries
if self.statement_line_id and not self.env.context.get('skip_statement_line_cron_trigger'):
self.env.ref('fusion_accounting.auto_reconcile_bank_statement_line')._trigger()
return result
def button_draft(self):
# --- Deferral guard: prevent reset if grouped deferrals exist ---
for entry in self:
grouped_deferrals = entry.deferred_move_ids.filtered(
lambda dm: len(dm.deferred_original_move_ids) > 1
)
if grouped_deferrals:
raise UserError(_(
"This invoice participates in grouped deferral entries and cannot be reset to draft. "
"Consider creating a credit note instead."
))
# Undo deferral entries (unlink or reverse depending on audit trail)
reversal_entries = self.deferred_move_ids._unlink_or_reverse()
if reversal_entries:
for rev in reversal_entries:
rev.with_context(skip_readonly_check=True).write({
'date': rev._get_accounting_date(rev.date, rev._affect_tax_report()),
})
self.deferred_move_ids |= reversal_entries
# --- Tax closing guard: prevent reset if carryover impacts locked periods ---
for closing_entry in self.filtered(lambda m: m.tax_closing_report_id):
rpt = closing_entry.tax_closing_report_id
rpt_opts = closing_entry._get_tax_closing_report_options(
closing_entry.company_id, closing_entry.fiscal_position_id, rpt, closing_entry.date,
)
periodicity_delay = closing_entry.company_id._get_tax_periodicity_months_delay(rpt)
existing_carryovers = self.env['account.report.external.value'].search([
('carryover_origin_report_line_id', 'in', rpt.line_ids.ids),
('date', '=', rpt_opts['date']['date_to']),
])
affected_period_end = (
fields.Date.from_string(rpt_opts['date']['date_to'])
+ relativedelta(months=periodicity_delay)
)
lock_dt = closing_entry.company_id.tax_lock_date
if existing_carryovers and lock_dt and lock_dt >= affected_period_end:
raise UserError(_(
"Resetting this closing entry would remove carryover values that affect a locked tax period. "
"Adjust the tax return lock date before proceeding."
))
if self._has_subsequent_posted_closing_moves():
raise UserError(_(
"A subsequent tax closing entry has already been posted. "
"Reset that entry first before modifying this one."
))
existing_carryovers.unlink()
# --- Asset guard: prevent reset if linked assets are confirmed ---
for entry in self:
if any(a.state != 'draft' for a in entry.asset_ids):
raise UserError(_("Cannot reset to draft when linked assets are already confirmed."))
entry.asset_ids.filtered(lambda a: a.state == 'draft').unlink()
return super().button_draft()
def unlink(self):
# When audit trail is active, deferral entries should be reversed rather than deleted
audit_deferrals = self.filtered(
lambda m: m.company_id.check_account_audit_trail and m.deferred_original_move_ids
)
audit_deferrals.deferred_original_move_ids.deferred_move_ids = False
audit_deferrals._reverse_moves()
return super(FusionAccountMove, self - audit_deferrals).unlink()
def button_cancel(self):
result = super(FusionAccountMove, self).button_cancel()
# Deactivate any assets originating from cancelled entries
self.env['account.asset'].sudo().search(
[('original_move_line_ids.move_id', 'in', self.ids)]
).write({'active': False})
return result
def _reverse_moves(self, default_values_list=None, cancel=False):
if default_values_list is None:
default_values_list = [{} for _ in self]
for entry, defaults in zip(self, default_values_list):
if not entry.asset_id:
continue
asset_rec = entry.asset_id
pending_drafts = asset_rec.depreciation_move_ids.filtered(lambda m: m.state == 'draft')
earliest_draft = min(pending_drafts, key=lambda m: m.date, default=None)
if earliest_draft:
# Transfer the depreciation amount to the next available draft entry
earliest_draft.depreciation_value += entry.depreciation_value
elif asset_rec.state != 'close':
# No drafts remain and asset is still open: create a new depreciation entry
latest_dep_date = max(asset_rec.depreciation_move_ids.mapped('date'))
period_method = asset_rec.method_period
next_offset = relativedelta(months=1) if period_method == "1" else relativedelta(years=1)
self.create(self._prepare_move_for_asset_depreciation({
'asset_id': asset_rec,
'amount': entry.depreciation_value,
'depreciation_beginning_date': latest_dep_date + next_offset,
'date': latest_dep_date + next_offset,
'asset_number_days': 0,
}))
note = _(
'Depreciation %(entry_name)s reversed (%(dep_amount)s)',
entry_name=entry.name,
dep_amount=formatLang(self.env, entry.depreciation_value, currency_obj=entry.company_id.currency_id),
)
asset_rec.message_post(body=note)
defaults['asset_id'] = asset_rec.id
defaults['asset_number_days'] = -entry.asset_number_days
defaults['asset_depreciation_beginning_date'] = defaults.get('date', entry.date)
return super(FusionAccountMove, self)._reverse_moves(default_values_list, cancel)
# =========================================================================
# DEFERRAL ENGINE
# =========================================================================
@api.model
def _get_deferred_diff_dates(self, start, end):
"""
Calculates the fractional number of months between two dates using a
30-day month convention. This normalization ensures equal deferral
amounts for February, March, and April when spreading monthly.
"""
if start > end:
start, end = end, start
total_months = end.month - start.month + 12 * (end.year - start.year)
day_a = start.day
day_b = end.day
if day_a == calendar.monthrange(start.year, start.month)[1]:
day_a = 30
if day_b == calendar.monthrange(end.year, end.month)[1]:
day_b = 30
fractional_days = day_b - day_a
return (total_months * 30 + fractional_days) / 30
@api.model
def _get_deferred_period_amount(self, calc_method, seg_start, seg_end, full_start, full_end, total_balance):
"""
Computes the portion of total_balance attributable to the segment
[seg_start, seg_end] within the full deferral range [full_start, full_end].
Supports 'day', 'month', and 'full_months' calculation methods.
"""
segment_valid = seg_end > full_start and seg_end > seg_start
if calc_method == 'day':
daily_rate = total_balance / (full_end - full_start).days
return (seg_end - seg_start).days * daily_rate if segment_valid else 0
if calc_method == 'month':
monthly_rate = total_balance / self._get_deferred_diff_dates(full_end, full_start)
segment_months = self._get_deferred_diff_dates(seg_end, seg_start)
return segment_months * monthly_rate if segment_valid else 0
if calc_method == 'full_months':
span_months = self._get_deferred_diff_dates(full_end, full_start)
period_months = self._get_deferred_diff_dates(seg_end, seg_start)
if span_months < 1:
return total_balance if segment_valid else 0
eom_full = full_end.day == calendar.monthrange(full_end.year, full_end.month)[1]
span_rounded = math.ceil(span_months) if eom_full else math.floor(span_months)
eom_seg = seg_end.day == calendar.monthrange(seg_end.year, seg_end.month)[1]
if eom_seg or full_end != seg_end:
period_rounded = math.ceil(period_months)
else:
period_rounded = math.floor(period_months)
per_month = total_balance / span_rounded
return period_rounded * per_month if segment_valid else 0
return 0
@api.model
def _get_deferred_amounts_by_line(self, line_data, time_segments, category):
"""
For each line and each time segment, compute the deferred amount.
Returns a list of dicts containing line identification fields plus
an entry for each time_segment tuple as key.
"""
output = []
for item in line_data:
start_dt = fields.Date.to_date(item['deferred_start_date'])
end_dt = fields.Date.to_date(item['deferred_end_date'])
# Guard against inverted date ranges to prevent calculation errors
if end_dt < start_dt:
end_dt = start_dt
segment_amounts = {}
for seg in time_segments:
# "Not Started" column only applies when deferral begins after the report end date
if seg[2] == 'not_started' and start_dt <= seg[0]:
segment_amounts[seg] = 0.0
continue
effective_start = max(seg[0], start_dt)
effective_end = min(seg[1], end_dt)
# Adjust the start date to be inclusive in specific circumstances
should_include_start = (
seg[2] in ('not_started', 'later') and seg[0] < start_dt
or len(time_segments) <= 1
or seg[2] not in ('not_started', 'before', 'later')
)
if should_include_start:
effective_start -= relativedelta(days=1)
comp_method = (
self.env.company.deferred_expense_amount_computation_method
if category == 'expense'
else self.env.company.deferred_revenue_amount_computation_method
)
segment_amounts[seg] = self._get_deferred_period_amount(
comp_method,
effective_start, effective_end,
start_dt - relativedelta(days=1), end_dt,
item['balance'],
)
output.append({
**self.env['account.move.line']._get_deferred_amounts_by_line_values(item),
**segment_amounts,
})
return output
@api.model
def _get_deferred_lines(self, source_line, deferral_acct, category, segment, description, force_balance=None, grouping_field='account_id'):
"""
Creates a pair of journal item commands for one deferral period:
one on the original account and one on the deferral holding account.
"""
computed = self._get_deferred_amounts_by_line(source_line, [segment], category)[0]
amount = computed[segment] if force_balance is None else force_balance
return [
Command.create({
**self.env['account.move.line']._get_deferred_lines_values(
acct.id, coefficient * amount, description, source_line.analytic_distribution, source_line,
),
'partner_id': source_line.partner_id.id,
'product_id': source_line.product_id.id,
})
for acct, coefficient in [(computed[grouping_field], 1), (deferral_acct, -1)]
]
def _generate_deferred_entries(self):
"""
Produces the full set of deferral journal entries for this posted
invoice or bill. For each eligible line, creates an initial
full-deferral entry and then per-period recognition entries.
"""
self.ensure_one()
if self.state != 'posted':
return
if self.is_entry():
raise UserError(_("Deferral generation is not supported for miscellaneous entries."))
category = 'expense' if self.is_purchase_document() else 'revenue'
holding_account = (
self.company_id.deferred_expense_account_id
if category == 'expense'
else self.company_id.deferred_revenue_account_id
)
deferral_journal = (
self.company_id.deferred_expense_journal_id
if category == 'expense'
else self.company_id.deferred_revenue_journal_id
)
if not deferral_journal:
raise UserError(_("Configure the deferral journal in Accounting Settings before generating entries."))
if not holding_account:
raise UserError(_("Configure the deferral accounts in Accounting Settings before generating entries."))
eligible_lines = self.line_ids.filtered(lambda ln: ln.deferred_start_date and ln.deferred_end_date)
for src_line in eligible_lines:
period_ranges = src_line._get_deferred_periods()
if not period_ranges:
continue
label = _("Deferral of %s", src_line.move_id.name or '')
base_vals = {
'move_type': 'entry',
'deferred_original_move_ids': [Command.set(src_line.move_id.ids)],
'journal_id': deferral_journal.id,
'company_id': self.company_id.id,
'partner_id': src_line.partner_id.id,
'auto_post': 'at_date',
'ref': label,
'name': False,
}
# Step 1: Create the initial full-offset entry on the invoice date
offset_entry = self.create({**base_vals, 'date': src_line.move_id.date})
# Write lines after creation so deferred_original_move_ids is set,
# preventing unintended tax computation on deferral moves
offset_entry.write({
'line_ids': [
Command.create(
self.env['account.move.line']._get_deferred_lines_values(
acct.id, coeff * src_line.balance, label, src_line.analytic_distribution, src_line,
)
)
for acct, coeff in [(src_line.account_id, -1), (holding_account, 1)]
],
})
# Step 2: Create per-period recognition entries
recognition_entries = self.create([
{**base_vals, 'date': seg[1]} for seg in period_ranges
])
balance_remaining = src_line.balance
for idx, (seg, recog_entry) in enumerate(zip(period_ranges, recognition_entries)):
is_final = idx == len(period_ranges) - 1
override = balance_remaining if is_final else None
# Same pattern: write lines after creation to prevent tax side effects
recog_entry.write({
'line_ids': self._get_deferred_lines(
src_line, holding_account, category, seg, label, force_balance=override,
),
})
balance_remaining -= recog_entry.line_ids[0].balance
# Remove zero-amount deferral entries
if recog_entry.currency_id.is_zero(recog_entry.amount_total):
recognition_entries -= recog_entry
recog_entry.unlink()
all_deferrals = offset_entry + recognition_entries
# If only one recognition entry falls in the same month as the offset,
# they cancel each other out and serve no purpose
if len(recognition_entries) == 1 and offset_entry.date.month == recognition_entries.date.month:
all_deferrals.unlink()
continue
src_line.move_id.deferred_move_ids |= all_deferrals
all_deferrals._post(soft=True)
# =========================================================================
# DEFERRAL ACTIONS
# =========================================================================
def open_deferred_entries(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'name': _("Deferral Entries"),
'res_model': 'account.move.line',
'domain': [('id', 'in', self.deferred_move_ids.line_ids.ids)],
'views': [(self.env.ref('fusion_accounting.view_deferred_entries_tree').id, 'list')],
'context': {'search_default_group_by_move': True, 'expand': True},
}
def open_deferred_original_entry(self):
self.ensure_one()
source_moves = self.deferred_original_move_ids
result = {
'type': 'ir.actions.act_window',
'name': _("Originating Entries"),
'res_model': 'account.move.line',
'domain': [('id', 'in', source_moves.line_ids.ids)],
'views': [(False, 'list'), (False, 'form')],
'context': {'search_default_group_by_move': True, 'expand': True},
}
if len(source_moves) == 1:
result.update({
'res_model': 'account.move',
'res_id': source_moves.id,
'views': [(False, 'form')],
})
return result
# =========================================================================
# BANK RECONCILIATION ACTIONS
# =========================================================================
def action_open_bank_reconciliation_widget(self):
return self.statement_line_id._action_open_bank_reconciliation_widget(
default_context={
'search_default_journal_id': self.statement_line_id.journal_id.id,
'search_default_statement_line_id': self.statement_line_id.id,
'default_st_line_id': self.statement_line_id.id,
}
)
def action_open_bank_reconciliation_widget_statement(self):
return self.statement_line_id._action_open_bank_reconciliation_widget(
extra_domain=[('statement_id', 'in', self.statement_id.ids)],
)
def action_open_business_doc(self):
if self.statement_line_id:
return self.action_open_bank_reconciliation_widget()
act = super().action_open_business_doc()
# Prevent leaking reconciliation-specific context to the document view
act['context'] = act.get('context', {}) | {
'preferred_aml_value': None,
'preferred_aml_currency_id': None,
}
return act
# =========================================================================
# MAIL / EDI
# =========================================================================
def _get_mail_thread_data_attachments(self):
result = super()._get_mail_thread_data_attachments()
result += self.statement_line_id.statement_id.attachment_ids
return result
@contextmanager
def _get_edi_creation(self):
with super()._get_edi_creation() as entry:
pre_existing_lines = entry.invoice_line_ids
yield entry.with_context(disable_onchange_name_predictive=True)
for new_line in entry.invoice_line_ids - pre_existing_lines:
new_line._onchange_name_predictive()
# =========================================================================
# TAX / VAT CLOSING
# =========================================================================
def _has_subsequent_posted_closing_moves(self):
"""Returns True if any posted tax closing entry exists after this one."""
self.ensure_one()
return bool(self.env['account.move'].search_count([
('company_id', '=', self.company_id.id),
('tax_closing_report_id', '!=', False),
('state', '=', 'posted'),
('date', '>', self.date),
('fiscal_position_id', '=', self.fiscal_position_id.id),
], limit=1))
def _get_tax_to_pay_on_closing(self):
"""Sums the balance on tax payable accounts to determine the tax due."""
self.ensure_one()
payable_accounts = self.env['account.tax.group'].search([
('company_id', '=', self.company_id.id),
]).tax_payable_account_id
relevant_lines = self.line_ids.filtered(lambda ln: ln.account_id in payable_accounts)
return self.currency_id.round(-sum(relevant_lines.mapped('balance')))
def _action_tax_to_pay_wizard(self):
return self.action_open_tax_report()
def action_open_tax_report(self):
act = self.env["ir.actions.actions"]._for_xml_id("fusion_accounting.action_account_report_gt")
if not self.tax_closing_report_id:
raise UserError(_("No tax report is associated with this entry."))
rpt_opts = self._get_tax_closing_report_options(
self.company_id, self.fiscal_position_id, self.tax_closing_report_id, self.date,
)
act.update({'params': {'options': rpt_opts, 'ignore_session': True}})
return act
def refresh_tax_entry(self):
"""Re-generates tax closing line items for draft closing entries."""
for entry in self.filtered(lambda m: m.tax_closing_report_id and m.state == 'draft'):
rpt = entry.tax_closing_report_id
rpt_opts = entry._get_tax_closing_report_options(
entry.company_id, entry.fiscal_position_id, rpt, entry.date,
)
handler_model = rpt.custom_handler_model_name or 'account.generic.tax.report.handler'
self.env[handler_model]._generate_tax_closing_entries(rpt, rpt_opts, closing_moves=entry)
def _close_tax_period(self, report, options):
"""
Executes the full tax period closing workflow: validates permissions,
handles dependent branch/unit closings, generates carryover values,
attaches the tax report PDF, and schedules follow-up activities.
"""
self.ensure_one()
if not self.env.user.has_group('account.group_account_manager'):
raise UserError(_("Only Billing Administrators can modify lock dates."))
rpt = self.tax_closing_report_id
opts = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, rpt, self.date)
# Update the tax lock date for domestic (non-foreign-VAT) closings
if (
not self.fiscal_position_id
and (not self.company_id.tax_lock_date or self.date > self.company_id.tax_lock_date)
):
self.company_id.sudo().tax_lock_date = self.date
self.env['account.report']._generate_default_external_values(
opts['date']['date_from'], opts['date']['date_to'], True,
)
reporting_company = rpt._get_sender_company_for_export(opts)
member_ids = rpt.get_report_company_ids(opts)
if reporting_company == self.company_id:
related_closings = (
self.env['account.tax.report.handler']._get_tax_closing_entries_for_closed_period(
rpt, opts, self.env['res.company'].browse(member_ids), posted_only=False,
) - self
)
unposted_related = related_closings.filtered(lambda x: x.state == 'draft')
if unposted_related:
nav_action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line")
nav_action = clean_action(nav_action, env=self.env)
if len(unposted_related) == 1:
nav_action['views'] = [(self.env.ref('account.view_move_form').id, 'form')]
nav_action['res_id'] = unposted_related.id
else:
nav_action['domain'] = [('id', 'in', unposted_related.ids)]
ctx = dict(ast.literal_eval(nav_action['context']))
ctx.pop('search_default_posted', None)
nav_action['context'] = ctx
# Raise an error caught by action_post to display a redirect component
raise TaxClosingNonPostedDependingMovesError(nav_action)
# Produce carryover values for the next period
rpt.with_context(allowed_company_ids=member_ids)._generate_carryover_external_values(opts)
# Attach the tax report PDF to this closing entry
report_files = self._get_vat_report_attachments(rpt, opts)
mail_subject = _(
"Tax closing from %(start)s to %(end)s",
start=format_date(self.env, opts['date']['date_from']),
end=format_date(self.env, opts['date']['date_to']),
)
self.with_context(no_new_invoice=True).message_post(
body=self.ref, subject=mail_subject, attachments=report_files,
)
# Add a cross-reference note on related company closings
for related_entry in related_closings:
related_entry.message_post(
body=Markup("%s") % _(
"The tax report attachments are available on the "
"<a href='#' data-oe-model='account.move' data-oe-id='%s'>closing entry</a> "
"of the representative company.",
self.id,
),
)
# Complete the reminder activity and schedule the next one
reminder = self.company_id._get_tax_closing_reminder_activity(
rpt.id, self.date, self.fiscal_position_id.id,
)
if reminder:
reminder.action_done()
fpos_for_next = self.fiscal_position_id if self.fiscal_position_id.foreign_vat else None
self.company_id._generate_tax_closing_reminder_activity(
self.tax_closing_report_id,
self.date + relativedelta(days=1),
fpos_for_next,
)
self._close_tax_period_create_activities()
def _close_tax_period_create_activities(self):
"""Creates 'Report Ready to Send' and optionally 'Tax Payment Due' activities."""
send_type_xmlid = 'fusion_accounting.mail_activity_type_tax_report_to_be_sent'
send_type = self.env.ref(send_type_xmlid, raise_if_not_found=False)
if not send_type:
# Ensure the activity type exists by creating it on the fly if missing
send_type = self.env['mail.activity.type'].sudo()._load_records([{
'xml_id': send_type_xmlid,
'noupdate': False,
'values': {
'name': 'Tax Report Ready',
'summary': 'Tax report is ready to be sent to the administration',
'category': 'tax_report',
'delay_count': '0',
'delay_unit': 'days',
'delay_from': 'current_date',
'res_model': 'account.move',
'chaining_type': 'suggest',
},
}])
pay_type_xmlid = 'fusion_accounting.mail_activity_type_tax_report_to_pay'
pay_type = self.env.ref(pay_type_xmlid, raise_if_not_found=False)
responsible = send_type.default_user_id
if responsible and not (
self.company_id in responsible.company_ids
and responsible.has_group('account.group_account_manager')
):
responsible = self.env['res.users']
entries_needing_activity = self.filtered_domain([
'|',
('activity_ids', '=', False),
('activity_ids', 'not any', [('activity_type_id.id', '=', send_type.id)]),
])
for entry in entries_needing_activity:
p_start, p_end = entry.company_id._get_tax_closing_period_boundaries(
entry.date, entry.tax_closing_report_id,
)
period_label = entry.company_id._get_tax_closing_move_description(
entry.company_id._get_tax_periodicity(entry.tax_closing_report_id),
p_start, p_end,
entry.fiscal_position_id,
entry.tax_closing_report_id,
)
entry.with_context(mail_activity_quick_update=True).activity_schedule(
act_type_xmlid=send_type_xmlid,
summary=_("Submit tax report: %s", period_label),
date_deadline=fields.Date.context_today(entry),
user_id=responsible.id or self.env.user.id,
)
if (
pay_type
and pay_type not in entry.activity_ids.activity_type_id
and entry._get_tax_to_pay_on_closing() > 0
):
entry.with_context(mail_activity_quick_update=True).activity_schedule(
act_type_xmlid=pay_type_xmlid,
summary=_("Remit tax payment: %s", period_label),
date_deadline=fields.Date.context_today(entry),
user_id=responsible.id or self.env.user.id,
)
@api.model
def _get_tax_closing_report_options(self, company, fiscal_pos, report, reference_date):
"""
Constructs the option dict used to generate a tax report for a given
company, fiscal position, and closing date.
"""
_dummy, period_end = company._get_tax_closing_period_boundaries(reference_date, report)
if fiscal_pos and fiscal_pos.foreign_vat:
fpos_val = fiscal_pos.id
target_country = fiscal_pos.country_id
else:
fpos_val = 'domestic'
target_country = company.account_fiscal_country_id
base_options = {
'date': {
'date_to': fields.Date.to_string(period_end),
'filter': 'custom_tax_period',
'mode': 'range',
},
'selected_variant_id': report.id,
'sections_source_id': report.id,
'fiscal_position': fpos_val,
'tax_unit': 'company_only',
}
if report.filter_multi_company == 'tax_units':
matching_unit = company.account_tax_unit_ids.filtered(
lambda u: u.country_id == target_country
)
if matching_unit:
base_options['tax_unit'] = matching_unit.id
active_company_ids = matching_unit.company_ids.ids
else:
sibling_companies = self.env.company._get_branches_with_same_vat()
active_company_ids = sibling_companies.sorted(lambda c: len(c.parent_ids)).ids
else:
active_company_ids = self.env.company.ids
return report.with_context(allowed_company_ids=active_company_ids).get_options(previous_options=base_options)
def _get_vat_report_attachments(self, report, options):
"""Generates the PDF attachment for the tax report."""
pdf_result = report.export_to_pdf(options)
return [(pdf_result['file_name'], pdf_result['file_content'])]
# =========================================================================
# ASSET DEPRECIATION
# =========================================================================
def _log_depreciation_asset(self):
"""Posts a chatter message on the asset when a depreciation entry is confirmed."""
for entry in self.filtered(lambda m: m.asset_id):
msg = _(
'Depreciation %(entry_ref)s confirmed (%(amount)s)',
entry_ref=entry.name,
amount=formatLang(self.env, entry.depreciation_value, currency_obj=entry.company_id.currency_id),
)
entry.asset_id.message_post(body=msg)
def _auto_create_asset(self):
"""
Scans posted invoice lines for accounts configured to auto-create assets.
Builds asset records and optionally validates them based on account settings.
"""
asset_data = []
linked_invoices = []
should_validate = []
for entry in self:
if not entry.is_invoice():
continue
for ln in entry.line_ids:
if not (
ln.account_id
and ln.account_id.can_create_asset
and ln.account_id.create_asset != 'no'
and not (ln.currency_id or entry.currency_id).is_zero(ln.price_total)
and not ln.asset_ids
and not ln.tax_line_id
and ln.price_total > 0
and not (
entry.move_type in ('out_invoice', 'out_refund')
and ln.account_id.internal_group == 'asset'
)
):
continue
if not ln.name:
if ln.product_id:
ln.name = ln.product_id.display_name
else:
raise UserError(_(
"Line items on %(acct)s require a description to generate an asset.",
acct=ln.account_id.display_name,
))
unit_count = max(1, int(ln.quantity)) if ln.account_id.multiple_assets_per_line else 1
applicable_models = ln.account_id.asset_model_ids
base_data = {
'name': ln.name,
'company_id': ln.company_id.id,
'currency_id': ln.company_currency_id.id,
'analytic_distribution': ln.analytic_distribution,
'original_move_line_ids': [(6, False, ln.ids)],
'state': 'draft',
'acquisition_date': (
entry.invoice_date
if not entry.reversed_entry_id
else entry.reversed_entry_id.invoice_date
),
}
for model_rec in applicable_models or [None]:
if model_rec:
base_data['model_id'] = model_rec.id
should_validate.extend([ln.account_id.create_asset == 'validate'] * unit_count)
linked_invoices.extend([entry] * unit_count)
for seq in range(1, unit_count + 1):
row = base_data.copy()
if unit_count > 1:
row['name'] = _(
"%(label)s (%(seq)s of %(total)s)",
label=ln.name, seq=seq, total=unit_count,
)
asset_data.append(row)
new_assets = self.env['account.asset'].with_context({}).create(asset_data)
for asset_rec, data, src_invoice, auto_confirm in zip(new_assets, asset_data, linked_invoices, should_validate):
if 'model_id' in data:
asset_rec._onchange_model_id()
if auto_confirm:
asset_rec.validate()
if src_invoice:
asset_rec.message_post(body=_("Asset created from invoice: %s", src_invoice._get_html_link()))
asset_rec._post_non_deductible_tax_value()
return new_assets
@api.model
def _prepare_move_for_asset_depreciation(self, params):
"""
Prepares the values dict for creating a depreciation journal entry.
Required keys in params: asset_id, amount, depreciation_beginning_date, date, asset_number_days.
"""
required_keys = {'asset_id', 'amount', 'depreciation_beginning_date', 'date', 'asset_number_days'}
absent = required_keys - set(params)
if absent:
raise UserError(_("Missing required parameters: %s", ', '.join(absent)))
asset_rec = params['asset_id']
dist = asset_rec.analytic_distribution
dep_date = params.get('date', fields.Date.context_today(self))
base_currency = asset_rec.company_id.currency_id
asset_currency = asset_rec.currency_id
precision = base_currency.decimal_places
foreign_amount = params['amount']
local_amount = asset_currency._convert(foreign_amount, base_currency, asset_rec.company_id, dep_date)
# Use the partner from the originating document if unambiguous
originating_partners = asset_rec.original_move_line_ids.mapped('partner_id')
partner = originating_partners[:1] if len(originating_partners) <= 1 else self.env['res.partner']
entry_label = _("%s: Depreciation", asset_rec.name)
is_positive = float_compare(local_amount, 0.0, precision_digits=precision) > 0
contra_line = {
'name': entry_label,
'partner_id': partner.id,
'account_id': asset_rec.account_depreciation_id.id,
'debit': 0.0 if is_positive else -local_amount,
'credit': local_amount if is_positive else 0.0,
'currency_id': asset_currency.id,
'amount_currency': -foreign_amount,
}
expense_line = {
'name': entry_label,
'partner_id': partner.id,
'account_id': asset_rec.account_depreciation_expense_id.id,
'credit': 0.0 if is_positive else -local_amount,
'debit': local_amount if is_positive else 0.0,
'currency_id': asset_currency.id,
'amount_currency': foreign_amount,
}
if dist:
contra_line['analytic_distribution'] = dist
expense_line['analytic_distribution'] = dist
return {
'partner_id': partner.id,
'date': dep_date,
'journal_id': asset_rec.journal_id.id,
'line_ids': [(0, 0, contra_line), (0, 0, expense_line)],
'asset_id': asset_rec.id,
'ref': entry_label,
'asset_depreciation_beginning_date': params['depreciation_beginning_date'],
'asset_number_days': params['asset_number_days'],
'asset_value_change': params.get('asset_value_change', False),
'move_type': 'entry',
'currency_id': asset_currency.id,
'asset_move_type': params.get('asset_move_type', 'depreciation'),
'company_id': asset_rec.company_id.id,
}
# =========================================================================
# ASSET ACTIONS
# =========================================================================
def open_asset_view(self):
return self.asset_id.open_asset(['form'])
def action_open_asset_ids(self):
return self.asset_ids.open_asset(['list', 'form'])
class FusionMoveLine(models.Model):
"""
Extends journal items with deferral date tracking, predictive field
suggestions, custom SQL ordering for reconciliation, and asset linkage.
"""
_name = "account.move.line"
_inherit = "account.move.line"
move_attachment_ids = fields.One2many('ir.attachment', compute='_compute_attachment')
# ---- Deferral Date Tracking ----
deferred_start_date = fields.Date(
string="Start Date",
compute='_compute_deferred_start_date',
store=True,
readonly=False,
index='btree_not_null',
copy=False,
help="The date when recognition of deferred revenue or expense begins.",
)
deferred_end_date = fields.Date(
string="End Date",
index='btree_not_null',
copy=False,
help="The date when recognition of deferred revenue or expense ends.",
)
has_deferred_moves = fields.Boolean(
compute='_compute_has_deferred_moves',
)
has_abnormal_deferred_dates = fields.Boolean(
compute='_compute_has_abnormal_deferred_dates',
)
# ---- Asset Linkage ----
asset_ids = fields.Many2many(
'account.asset', 'asset_move_line_rel', 'line_id', 'asset_id',
string='Associated Assets',
copy=False,
)
non_deductible_tax_value = fields.Monetary(
compute='_compute_non_deductible_tax_value',
currency_field='company_currency_id',
)
# =========================================================================
# SQL ORDERING
# =========================================================================
def _order_to_sql(self, order, query, alias=None, reverse=False):
base_sql = super()._order_to_sql(order, query, alias, reverse)
target_amount = self.env.context.get('preferred_aml_value')
target_currency_id = self.env.context.get('preferred_aml_currency_id')
if target_amount and target_currency_id and order == self._order:
curr = self.env['res.currency'].browse(target_currency_id)
rounded_target = round(target_amount, curr.decimal_places)
tbl = alias or self._table
residual_sql = self._field_to_sql(tbl, 'amount_residual_currency', query)
currency_sql = self._field_to_sql(tbl, 'currency_id', query)
return SQL(
"ROUND(%(residual)s, %(decimals)s) = %(target)s "
"AND %(curr_field)s = %(curr_id)s DESC, %(fallback)s",
residual=residual_sql,
decimals=curr.decimal_places,
target=rounded_target,
curr_field=currency_sql,
curr_id=curr.id,
fallback=base_sql,
)
return base_sql
# =========================================================================
# CRUD OVERRIDES
# =========================================================================
def copy_data(self, default=None):
results = super().copy_data(default=default)
for ln, vals in zip(self, results):
if 'move_reverse_cancel' in self.env.context:
vals['deferred_start_date'] = ln.deferred_start_date
vals['deferred_end_date'] = ln.deferred_end_date
return results
def write(self, vals):
"""Guard against changing the account on lines with existing deferral entries."""
if 'account_id' in vals:
for ln in self:
if (
ln.has_deferred_moves
and ln.deferred_start_date
and ln.deferred_end_date
and vals['account_id'] != ln.account_id.id
):
raise UserError(_(
"The account on %(entry_name)s cannot be changed because "
"deferral entries have already been generated.",
entry_name=ln.move_id.display_name,
))
return super().write(vals)
# =========================================================================
# DEFERRAL COMPUTES
# =========================================================================
def _compute_has_deferred_moves(self):
for ln in self:
ln.has_deferred_moves = bool(ln.move_id.deferred_move_ids)
@api.depends('deferred_start_date', 'deferred_end_date')
def _compute_has_abnormal_deferred_dates(self):
# The deferral computations treat both start and end dates as inclusive.
# If the user enters dates that result in a fractional month offset of
# exactly 1/30 (e.g. Jan 1 to Jan 1 next year instead of Dec 31), the
# resulting amounts may look unexpected. Flag such cases for the user.
for ln in self:
ln.has_abnormal_deferred_dates = (
ln.deferred_start_date
and ln.deferred_end_date
and float_compare(
self.env['account.move']._get_deferred_diff_dates(
ln.deferred_start_date,
ln.deferred_end_date + relativedelta(days=1),
) % 1,
1 / 30,
precision_digits=2,
) == 0
)
def _has_deferred_compatible_account(self):
"""Checks whether this line's account type supports deferral for its document type."""
self.ensure_one()
if self.move_id.is_purchase_document():
return self.account_id.account_type in ('expense', 'expense_depreciation', 'expense_direct_cost')
if self.move_id.is_sale_document():
return self.account_id.account_type in ('income', 'income_other')
return False
@api.onchange('deferred_start_date')
def _onchange_deferred_start_date(self):
if not self._has_deferred_compatible_account():
self.deferred_start_date = False
@api.onchange('deferred_end_date')
def _onchange_deferred_end_date(self):
if not self._has_deferred_compatible_account():
self.deferred_end_date = False
@api.depends('deferred_end_date', 'move_id.invoice_date', 'move_id.state')
def _compute_deferred_start_date(self):
for ln in self:
if not ln.deferred_start_date and ln.move_id.invoice_date and ln.deferred_end_date:
ln.deferred_start_date = ln.move_id.invoice_date
@api.constrains('deferred_start_date', 'deferred_end_date', 'account_id')
def _check_deferred_dates(self):
for ln in self:
if ln.deferred_start_date and not ln.deferred_end_date:
raise UserError(_("A deferral start date requires an end date to be set as well."))
if (
ln.deferred_start_date
and ln.deferred_end_date
and ln.deferred_start_date > ln.deferred_end_date
):
raise UserError(_("The deferral start date must not be later than the end date."))
@api.model
def _get_deferred_ends_of_month(self, from_date, to_date):
"""
Generates a list of month-end dates covering the range [from_date, to_date].
Each date is the last day of the respective month.
"""
boundaries = []
cursor = from_date
while cursor <= to_date:
cursor = cursor + relativedelta(day=31)
boundaries.append(cursor)
cursor = cursor + relativedelta(days=1)
return boundaries
def _get_deferred_periods(self):
"""
Splits the deferral range into monthly segments.
Returns an empty list if no spreading is needed (single period matching the entry date).
"""
self.ensure_one()
segments = [
(
max(self.deferred_start_date, dt.replace(day=1)),
min(dt, self.deferred_end_date),
'current',
)
for dt in self._get_deferred_ends_of_month(self.deferred_start_date, self.deferred_end_date)
]
if not segments or (
len(segments) == 1
and segments[0][0].replace(day=1) == self.date.replace(day=1)
):
return []
return segments
@api.model
def _get_deferred_amounts_by_line_values(self, line_data):
return {
'account_id': line_data['account_id'],
'product_id': line_data['product_id'] if isinstance(line_data, dict) else line_data['product_id'].id,
'product_category_id': line_data['product_category_id'] if isinstance(line_data, dict) else line_data['product_category_id'].id,
'balance': line_data['balance'],
'move_id': line_data['move_id'],
}
@api.model
def _get_deferred_lines_values(self, account_id, balance, ref, analytic_distribution, line_data=None):
return {
'account_id': account_id,
'product_id': line_data['product_id'] if isinstance(line_data, dict) else line_data['product_id'].id,
'product_category_id': line_data['product_category_id'] if isinstance(line_data, dict) else line_data['product_category_id'].id,
'balance': balance,
'name': ref,
'analytic_distribution': analytic_distribution,
}
# =========================================================================
# TAX HANDLING
# =========================================================================
def _get_computed_taxes(self):
# Skip automatic tax recomputation for deferral entries and asset depreciation entries
if self.move_id.deferred_original_move_ids or self.move_id.asset_id:
return self.tax_ids
return super()._get_computed_taxes()
# =========================================================================
# ATTACHMENTS
# =========================================================================
def _compute_attachment(self):
for ln in self:
ln.move_attachment_ids = self.env['ir.attachment'].search(
expression.OR(ln._get_attachment_domains())
)
# =========================================================================
# RECONCILIATION
# =========================================================================
def action_reconcile(self):
"""
Attempts direct reconciliation of selected lines. If a write-off or
partial reconciliation is needed, opens the reconciliation wizard instead.
"""
wiz = self.env['account.reconcile.wizard'].with_context(
active_model='account.move.line',
active_ids=self.ids,
).new({})
if wiz.is_write_off_required or wiz.force_partials:
return wiz._action_open_wizard()
return wiz.reconcile()
# =========================================================================
# PREDICTIVE LINE SUGGESTIONS
# =========================================================================
def _get_predict_postgres_dictionary(self):
"""Maps the user's language to a PostgreSQL text search dictionary."""
locale = self.env.context.get('lang', '')[:2]
return {'fr': 'french'}.get(locale, 'english')
@api.model
def _build_predictive_query(self, source_move, extra_domain=None):
"""
Builds the base query for predictive field matching, limited to
historical posted entries from the same partner and move type.
"""
history_limit = int(self.env['ir.config_parameter'].sudo().get_param(
'account.bill.predict.history.limit', '100',
))
move_qry = self.env['account.move']._where_calc([
('move_type', '=', source_move.move_type),
('state', '=', 'posted'),
('partner_id', '=', source_move.partner_id.id),
('company_id', '=', source_move.journal_id.company_id.id or self.env.company.id),
])
move_qry.order = 'account_move.invoice_date'
move_qry.limit = history_limit
return self.env['account.move.line']._where_calc([
('move_id', 'in', move_qry),
('display_type', '=', 'product'),
] + (extra_domain or []))
@api.model
def _predicted_field(self, description, partner, target_field, base_query=None, supplementary_sources=None):
"""
Uses PostgreSQL full-text search to rank historical line items by
relevance to the given description, then returns the most likely
value for target_field.
Only returns a prediction when the top result is at least 10%
more relevant than the runner-up.
The search is limited to the most recent entries (configurable via
the 'account.bill.predict.history.limit' system parameter, default 100).
"""
if not description or not partner:
return False
pg_dict = self._get_predict_postgres_dictionary()
search_text = description + ' account_move_line'
sanitized = re.sub(r"[*&()|!':<>=%/~@,.;$\[\]]+", " ", search_text)
ts_expression = ' | '.join(sanitized.split())
try:
primary_source = (
base_query if base_query is not None else self._build_predictive_query(self.move_id)
).select(
SQL("%s AS prediction", target_field),
SQL(
"setweight(to_tsvector(%s, account_move_line.name), 'B') "
"|| setweight(to_tsvector('simple', 'account_move_line'), 'A') AS document",
pg_dict,
),
)
if "(" in target_field.code:
primary_source = SQL(
"%s %s", primary_source,
SQL("GROUP BY account_move_line.id, account_move_line.name, account_move_line.partner_id"),
)
self.env.cr.execute(SQL("""
WITH account_move_line AS MATERIALIZED (%(base_lines)s),
source AS (%(all_sources)s),
ranking AS (
SELECT prediction, ts_rank(source.document, query_plain) AS rank
FROM source, to_tsquery(%(dict)s, %(expr)s) query_plain
WHERE source.document @@ query_plain
)
SELECT prediction, MAX(rank) AS ranking, COUNT(*)
FROM ranking
GROUP BY prediction
ORDER BY ranking DESC, count DESC
LIMIT 2
""",
base_lines=self._build_predictive_query(self.move_id).select(SQL('*')),
all_sources=SQL('(%s)', SQL(') UNION ALL (').join(
[primary_source] + (supplementary_sources or [])
)),
dict=pg_dict,
expr=ts_expression,
))
matches = self.env.cr.dictfetchall()
if matches:
if len(matches) > 1 and matches[0]['ranking'] < 1.1 * matches[1]['ranking']:
return False
return matches[0]['prediction']
except Exception:
_log.exception("Prediction query failed for invoice line field suggestion")
return False
def _predict_taxes(self):
agg_field = SQL(
'array_agg(account_move_line__tax_rel__tax_ids.id '
'ORDER BY account_move_line__tax_rel__tax_ids.id)'
)
qry = self._build_predictive_query(self.move_id)
qry.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel')
qry.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids')
qry.add_where('account_move_line__tax_rel__tax_ids.active IS NOT FALSE')
suggested_ids = self._predicted_field(self.name, self.partner_id, agg_field, qry)
if suggested_ids == [None]:
return False
if suggested_ids is not False and set(suggested_ids) != set(self.tax_ids.ids):
return suggested_ids
return False
@api.model
def _predict_specific_tax(self, source_move, label, partner, amt_type, amt_value, tax_use_type):
agg_field = SQL(
'array_agg(account_move_line__tax_rel__tax_ids.id '
'ORDER BY account_move_line__tax_rel__tax_ids.id)'
)
qry = self._build_predictive_query(source_move)
qry.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel')
qry.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids')
qry.add_where("""
account_move_line__tax_rel__tax_ids.active IS NOT FALSE
AND account_move_line__tax_rel__tax_ids.amount_type = %s
AND account_move_line__tax_rel__tax_ids.type_tax_use = %s
AND account_move_line__tax_rel__tax_ids.amount = %s
""", (amt_type, tax_use_type, amt_value))
return self._predicted_field(label, partner, agg_field, qry)
def _predict_product(self):
feature_enabled = int(self.env['ir.config_parameter'].sudo().get_param(
'account_predictive_bills.predict_product', '1',
))
if feature_enabled and self.company_id.predict_bill_product:
qry = self._build_predictive_query(
self.move_id,
['|', ('product_id', '=', False), ('product_id.active', '=', True)],
)
suggested = self._predicted_field(
self.name, self.partner_id, SQL('account_move_line.product_id'), qry,
)
if suggested and suggested != self.product_id.id:
return suggested
return False
def _predict_account(self):
target_sql = SQL('account_move_line.account_id')
blocked_group = 'income' if self.move_id.is_purchase_document(True) else 'expense'
acct_qry = self.env['account.account']._where_calc([
*self.env['account.account']._check_company_domain(self.move_id.company_id or self.env.company),
('internal_group', 'not in', (blocked_group, 'off')),
('account_type', 'not in', ('liability_payable', 'asset_receivable')),
])
acct_name_sql = self.env['account.account']._field_to_sql('account_account', 'name')
pg_dict = self._get_predict_postgres_dictionary()
extra_sources = [SQL(acct_qry.select(
SQL("account_account.id AS account_id"),
SQL(
"setweight(to_tsvector(%(dict)s, %(name_sql)s), 'B') AS document",
dict=pg_dict,
name_sql=acct_name_sql,
),
))]
line_qry = self._build_predictive_query(self.move_id, [('account_id', 'in', acct_qry)])
suggested = self._predicted_field(self.name, self.partner_id, target_sql, line_qry, extra_sources)
if suggested and suggested != self.account_id.id:
return suggested
return False
@api.onchange('name')
def _onchange_name_predictive(self):
if not (
(self.move_id.quick_edit_mode or self.move_id.move_type == 'in_invoice')
and self.name
and self.display_type == 'product'
and not self.env.context.get('disable_onchange_name_predictive', False)
):
return
if not self.product_id:
suggested_product = self._predict_product()
if suggested_product:
guarded = ['price_unit', 'tax_ids', 'name']
fields_to_protect = [self._fields[f] for f in guarded if self[f]]
with self.env.protecting(fields_to_protect, self):
self.product_id = suggested_product
# When no product is set, predict account and taxes independently
if not self.product_id:
suggested_acct = self._predict_account()
if suggested_acct:
self.account_id = suggested_acct
suggested_taxes = self._predict_taxes()
if suggested_taxes:
self.tax_ids = [Command.set(suggested_taxes)]
# =========================================================================
# READ GROUP EXTENSIONS
# =========================================================================
def _read_group_select(self, aggregate_spec, query):
"""Enables HAVING clauses that sum values rounded to currency precision."""
col_name, __, func_name = models.parse_read_group_spec(aggregate_spec)
if func_name != 'sum_rounded':
return super()._read_group_select(aggregate_spec, query)
curr_alias = query.make_alias(self._table, 'currency_id')
query.add_join('LEFT JOIN', curr_alias, 'res_currency', SQL(
"%s = %s",
self._field_to_sql(self._table, 'currency_id', query),
SQL.identifier(curr_alias, 'id'),
))
return SQL(
'SUM(ROUND(%s, %s))',
self._field_to_sql(self._table, col_name, query),
self.env['res.currency']._field_to_sql(curr_alias, 'decimal_places', query),
)
def _read_group_groupby(self, table, groupby_spec, query):
"""Enables grouping by absolute rounded values for amount matching."""
if ':' in groupby_spec:
col_name, modifier = groupby_spec.split(':')
if modifier == 'abs_rounded':
curr_alias = query.make_alias(self._table, 'currency_id')
query.add_join('LEFT JOIN', curr_alias, 'res_currency', SQL(
"%s = %s",
self._field_to_sql(self._table, 'currency_id', query),
SQL.identifier(curr_alias, 'id'),
))
return SQL(
'ROUND(ABS(%s), %s)',
self._field_to_sql(self._table, col_name, query),
self.env['res.currency']._field_to_sql(curr_alias, 'decimal_places', query),
)
return super()._read_group_groupby(table, groupby_spec, query)
# =========================================================================
# ASSET OPERATIONS
# =========================================================================
def turn_as_asset(self):
if len(self.company_id) != 1:
raise UserError(_("All selected lines must belong to the same company."))
if any(ln.move_id.state == 'draft' for ln in self):
raise UserError(_("All selected lines must be from posted entries."))
if any(ln.account_id != self[0].account_id for ln in self):
raise UserError(_("All selected lines must share the same account."))
ctx = {
**self.env.context,
'default_original_move_line_ids': [(6, False, self.env.context['active_ids'])],
'default_company_id': self.company_id.id,
}
return {
'name': _("Convert to Asset"),
'type': 'ir.actions.act_window',
'res_model': 'account.asset',
'views': [[False, 'form']],
'target': 'current',
'context': ctx,
}
@api.depends('tax_ids.invoice_repartition_line_ids')
def _compute_non_deductible_tax_value(self):
"""
Calculates the portion of tax that is non-deductible based on
repartition lines excluded from tax closing.
"""
excluded_tax_ids = self.tax_ids.invoice_repartition_line_ids.filtered(
lambda rl: rl.repartition_type == 'tax' and not rl.use_in_tax_closing
).tax_id
aggregated = {}
if excluded_tax_ids:
scope_domain = [('move_id', 'in', self.move_id.ids)]
tax_detail_qry = self._get_query_tax_details_from_domain(scope_domain)
self.flush_model()
self.env.cr.execute(SQL("""
SELECT
tdq.base_line_id,
SUM(tdq.tax_amount_currency)
FROM (%(detail_query)s) AS tdq
JOIN account_move_line aml ON aml.id = tdq.tax_line_id
JOIN account_tax_repartition_line trl ON trl.id = tdq.tax_repartition_line_id
WHERE tdq.base_line_id IN %(line_ids)s
AND trl.use_in_tax_closing IS FALSE
GROUP BY tdq.base_line_id
""",
detail_query=tax_detail_qry,
line_ids=tuple(self.ids),
))
aggregated = {row['base_line_id']: row['sum'] for row in self.env.cr.dictfetchall()}
for ln in self:
ln.non_deductible_tax_value = aggregated.get(ln._origin.id, 0.0)