Files
2026-02-22 01:22:18 -05:00

910 lines
34 KiB
Python

# Fusion Accounting - Company Model Extensions
# Adds accounting-specific fields and methods to res.company for
# managing fiscal periods, tax closings, deferred entries, and assets.
import datetime
import itertools
from datetime import timedelta
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import date_utils
from odoo.tools.misc import DEFAULT_SERVER_DATE_FORMAT, format_date
class ResCompany(models.Model):
    """Accounting-specific extensions of ``res.company``."""

    _inherit = 'res.company'

    # -----------------------------------------------------------------
    # Invoicing & prediction
    # -----------------------------------------------------------------
    # Cut-off date used when moving from Invoicing to full Accounting.
    invoicing_switch_threshold = fields.Date(
        "Invoicing Switch Threshold",
        help=(
            "Entries dated before this threshold are marked as "
            "'From Invoicing', hiding their accounting details. "
            "Useful when transitioning from Invoicing to full Accounting."
        ),
    )
    predict_bill_product = fields.Boolean("Predict Bill Product")
# =====================================================================
# Invoice Signing
# =====================================================================
# Invoice signing options.
sign_invoice = fields.Boolean('Display signing field on invoices')
signing_user = fields.Many2one('res.users')
# =====================================================================
# Deferred Expense Configuration
# =====================================================================
# Journal and account used for deferred expense entries.
deferred_expense_journal_id = fields.Many2one(
    'account.journal',
    string="Deferred Expense Journal",
)
deferred_expense_account_id = fields.Many2one(
    'account.account',
    string="Deferred Expense Account",
)
# When deferred expense entries are generated.
generate_deferred_expense_entries_method = fields.Selection(
    [
        ('on_validation', 'On bill validation'),
        ('manual', 'Manually & Grouped'),
    ],
    string="Generate Deferred Expense Entries",
    required=True,
    default='on_validation',
)
# Granularity on which the deferred expense amount is based.
deferred_expense_amount_computation_method = fields.Selection(
    [
        ('day', 'Days'),
        ('month', 'Months'),
        ('full_months', 'Full Months'),
    ],
    string="Deferred Expense Based on",
    required=True,
    default='month',
)
# =====================================================================
# Deferred Revenue Configuration
# =====================================================================
# Journal and account used for deferred revenue entries.
deferred_revenue_journal_id = fields.Many2one(
    comodel_name='account.journal',
    string="Deferred Revenue Journal",
)
deferred_revenue_account_id = fields.Many2one(
    comodel_name='account.account',
    string="Deferred Revenue Account",
)
# When deferred revenue entries are generated.
# The 'on_validation' label previously read 'On bill validation', copied
# from the expense-side field; revenue deferrals originate from customer
# invoices, so the label now says so. The stored key is unchanged.
generate_deferred_revenue_entries_method = fields.Selection(
    string="Generate Deferred Revenue Entries",
    selection=[
        ('on_validation', 'On invoice validation'),
        ('manual', 'Manually & Grouped'),
    ],
    default='on_validation',
    required=True,
)
# Granularity on which the deferred revenue amount is based.
deferred_revenue_amount_computation_method = fields.Selection(
    string="Deferred Revenue Based on",
    selection=[
        ('day', 'Days'),
        ('month', 'Months'),
        ('full_months', 'Full Months'),
    ],
    default='month',
    required=True,
)
# =====================================================================
# Reporting & Tax Periodicity
# =====================================================================
totals_below_sections = fields.Boolean(
    'Add totals below sections',
    help='Display totals and subtotals beneath report sections.',
)
# Tax return frequency; the keys are the ones translated into a number of
# months by `_get_tax_periodicity_months_delay`.
account_tax_periodicity = fields.Selection(
    [
        ('year', 'annually'),
        ('semester', 'semi-annually'),
        ('4_months', 'every 4 months'),
        ('trimester', 'quarterly'),
        ('2_months', 'every 2 months'),
        ('monthly', 'monthly'),
    ],
    string="Delay units",
    help="Frequency of tax return submissions.",
    required=True,
    default='monthly',
)
# Number of days added to the period end to get the reminder deadline.
account_tax_periodicity_reminder_day = fields.Integer(
    'Start from',
    required=True,
    default=7,
)
# Optional journal for tax closings; restricted to general journals.
account_tax_periodicity_journal_id = fields.Many2one(
    'account.journal',
    string='Journal',
    check_company=True,
    domain=[('type', '=', 'general')],
)
# =====================================================================
# Multicurrency Revaluation
# =====================================================================
# Multicurrency revaluation: journal (general type only) and the two
# provision accounts.
account_revaluation_journal_id = fields.Many2one(
    'account.journal',
    check_company=True,
    domain=[('type', '=', 'general')],
)
account_revaluation_expense_provision_account_id = fields.Many2one(
    'account.account',
    string='Expense Provision Account',
    check_company=True,
)
account_revaluation_income_provision_account_id = fields.Many2one(
    'account.account',
    string='Income Provision Account',
    check_company=True,
)
# =====================================================================
# Tax Units & Representatives
# =====================================================================
account_tax_unit_ids = fields.Many2many(
    'account.tax.unit',
    string="Tax Units",
    help="Tax units this company participates in.",
)
account_representative_id = fields.Many2one(
    'res.partner',
    string='Accounting Firm',
    help=(
        "External accounting firm acting as representative for "
        "tax report exports."
    ),
)
# Computed flag: whether the fiscal country supports a tax representative.
account_display_representative_field = fields.Boolean(
    compute='_compute_account_display_representative_field',
)
# =====================================================================
# Asset Gain/Loss Accounts
# =====================================================================
# Accounts receiving the result of asset disposals.
gain_account_id = fields.Many2one(
    'account.account',
    help="Account for recording gains on asset disposal.",
    check_company=True,
    domain="[]",
)
loss_account_id = fields.Many2one(
    'account.account',
    help="Account for recording losses on asset disposal.",
    check_company=True,
    domain="[]",
)
# =====================================================================
# Write Override - Invoicing Switch Threshold
# =====================================================================
# NOTE(review): this class body defines `write` a second time further down
# (the tax-closing override). Python keeps only the last binding of a name in
# a class body, so this definition is shadowed and the threshold handling
# below never executes unless the two overrides are merged into one method.
def write(self, vals):
"""Handle the invoicing switch threshold by toggling move states
between 'posted' and 'invoicing_legacy' based on the new date.

:param vals: Field values being written on the companies.
:return: Result of the parent ``write``.
"""
# Snapshot the thresholds before writing so a real change can be detected.
prior_thresholds = {
company: company.invoicing_switch_threshold
for company in self
}
result = super().write(vals)
# Nothing to do unless the threshold itself was part of the write.
if 'invoicing_switch_threshold' not in vals:
return result
for company in self:
# Skip companies whose previous threshold equals the written value.
# NOTE(review): compares the stored value with the raw `vals` entry;
# presumably both are dates or both falsy - confirm for string input.
if prior_thresholds[company] == vals['invoicing_switch_threshold']:
continue
# Push pending ORM changes to the database before the raw SQL reads them.
self.env['account.move.line'].flush_model(['move_id', 'parent_state'])
self.env['account.move'].flush_model([
'company_id', 'date', 'state',
'payment_state', 'payment_state_before_switch',
])
if company.invoicing_switch_threshold:
# Apply threshold: hide old entries, restore newer ones
# Statements 1-2 restore legacy moves dated on/after the cutoff to
# 'posted'; statements 3-4 cancel posted moves dated before it and
# remember their payment state in payment_state_before_switch.
self.env.cr.execute("""
UPDATE account_move_line aml
SET parent_state = 'posted'
FROM account_move am
WHERE aml.move_id = am.id
AND am.payment_state = 'invoicing_legacy'
AND am.date >= %(cutoff)s
AND am.company_id = %(cid)s;
UPDATE account_move
SET state = 'posted',
payment_state = payment_state_before_switch,
payment_state_before_switch = null
WHERE payment_state = 'invoicing_legacy'
AND date >= %(cutoff)s
AND company_id = %(cid)s;
UPDATE account_move_line aml
SET parent_state = 'cancel'
FROM account_move am
WHERE aml.move_id = am.id
AND am.state = 'posted'
AND am.date < %(cutoff)s
AND am.company_id = %(cid)s;
UPDATE account_move
SET state = 'cancel',
payment_state_before_switch = payment_state,
payment_state = 'invoicing_legacy'
WHERE state = 'posted'
AND date < %(cutoff)s
AND company_id = %(cid)s;
""", {
'cid': company.id,
'cutoff': company.invoicing_switch_threshold,
})
else:
# Threshold cleared: restore all legacy entries
self.env.cr.execute("""
UPDATE account_move_line aml
SET parent_state = 'posted'
FROM account_move am
WHERE aml.move_id = am.id
AND am.payment_state = 'invoicing_legacy'
AND am.company_id = %(cid)s;
UPDATE account_move
SET state = 'posted',
payment_state = payment_state_before_switch,
payment_state_before_switch = null
WHERE payment_state = 'invoicing_legacy'
AND company_id = %(cid)s;
""", {'cid': company.id})
# The rows were changed behind the ORM's back: drop the cached values.
self.env['account.move.line'].invalidate_model(['parent_state'])
self.env['account.move'].invalidate_model([
'state', 'payment_state', 'payment_state_before_switch',
])
return result
# =====================================================================
# Fiscal Year Computation
# =====================================================================
def compute_fiscalyear_dates(self, current_date):
    """Return the bounds of the fiscal year that contains ``current_date``.

    An ``account.fiscal.year`` record covering the date wins. Otherwise the
    bounds come from the company's fiscal-year-end settings and are then
    trimmed so they do not overlap a record covering either bound.

    :param current_date: Reference date (date or datetime).
    :return: Dict with 'date_from', 'date_to', and optionally 'record'.
    """
    self.ensure_one()
    FiscalYear = self.env['account.fiscal.year']

    def covering(day):
        # First fiscal-year record of this company whose range holds `day`.
        day_str = day.strftime(DEFAULT_SERVER_DATE_FORMAT)
        return FiscalYear.search([
            ('company_id', '=', self.id),
            ('date_from', '<=', day_str),
            ('date_to', '>=', day_str),
        ], limit=1)

    explicit = covering(current_date)
    if explicit:
        return {
            'date_from': explicit.date_from,
            'date_to': explicit.date_to,
            'record': explicit,
        }

    # No explicit record: fall back on the company settings.
    date_from, date_to = date_utils.get_fiscal_year(
        current_date,
        day=self.fiscalyear_last_day,
        month=int(self.fiscalyear_last_month),
    )

    # A record covering the computed start pushes the start past its end.
    record_over_start = covering(date_from)
    if record_over_start:
        date_from = record_over_start.date_to + timedelta(days=1)

    # A record covering the computed end pulls the end before its start.
    record_over_end = covering(date_to)
    if record_over_end:
        date_to = record_over_end.date_from - timedelta(days=1)

    return {'date_from': date_from, 'date_to': date_to}
# =====================================================================
# Statement Reconciliation Redirect
# =====================================================================
def _get_unreconciled_statement_lines_redirect_action(
    self, unreconciled_statement_lines,
):
    """Return the action opening the bank reconciliation widget,
    restricted to the given unreconciled statement lines."""
    StatementLine = self.env['account.bank.statement.line']
    only_given_lines = [('id', 'in', unreconciled_statement_lines.ids)]
    return StatementLine._action_open_bank_reconciliation_widget(
        extra_domain=only_given_lines,
        name=_('Unreconciled statements lines'),
    )
# =====================================================================
# Tax Representative
# =====================================================================
@api.depends('account_fiscal_country_id.code')
def _compute_account_display_representative_field(self):
    """Flag companies whose fiscal country code is among the countries
    returned by ``_get_countries_allowing_tax_representative``."""
    eligible_codes = self._get_countries_allowing_tax_representative()
    for record in self:
        fiscal_code = record.account_fiscal_country_id.code
        record.account_display_representative_field = (
            fiscal_code in eligible_codes
        )
def _get_countries_allowing_tax_representative(self):
"""Hook for localization modules to declare countries that
support tax representative functionality.
:return: Set of country code strings.
"""
return set()
# =====================================================================
# Tax Closing Journal
# =====================================================================
def _get_default_misc_journal(self):
    """Return one journal of type 'general' matching the company
    domain of ``self`` (empty recordset when none exists)."""
    Journal = self.env['account.journal']
    domain = list(Journal._check_company_domain(self))
    domain.append(('type', '=', 'general'))
    return Journal.search(domain, limit=1)
def _get_tax_closing_journal(self):
    """Collect, for every company in ``self``, its tax closing journal:
    the configured one when set, else the default general journal."""
    journals = self.env['account.journal']
    for company in self:
        configured = company.account_tax_periodicity_journal_id
        if configured:
            journals |= configured
        else:
            journals |= company._get_default_misc_journal()
    return journals
# =====================================================================
# Company Create/Write for Tax Closings
# =====================================================================
@api.model_create_multi
def create(self, vals_list):
    """Create the companies, then start their accounting onboardings.

    :param vals_list: List of value dicts, one per company to create.
    :return: The created companies.
    """
    companies = super().create(vals_list)
    companies._initiate_account_onboardings()
    return companies
def write(self, values):
    """Write company values and keep dependent accounting data in sync.

    Two follow-ups run after the actual write:

    * when ``invoicing_switch_threshold`` changes, journal entries are
      toggled between 'posted' and the 'invoicing_legacy' payment state
      according to the new cut-off date;
    * when the tax periodicity or the tax closing journal changes, draft
      tax closing entries and their reminder activities are cancelled and
      reminders are regenerated for the current period.

    Both concerns live in this single override on purpose: a class body
    can only hold one ``write``, and a second definition would silently
    replace the first.

    :param values: Field values to write, as passed to ``Model.write``.
    :return: Result of the parent ``write``.
    """
    # Snapshot thresholds before writing so real changes can be detected.
    threshold_written = 'invoicing_switch_threshold' in values
    prior_thresholds = {
        company.id: company.invoicing_switch_threshold
        for company in self
    } if threshold_written else {}

    companies_needing_update = self.env['res.company']
    for company in self:
        if not company._get_tax_closing_journal():
            continue
        periodicity_changed = (
            'account_tax_periodicity' in values
            and company.account_tax_periodicity
            != values['account_tax_periodicity']
        )
        # `values` is keyed by field name (never by a dotted path such as
        # 'account_tax_periodicity_journal_id.id'), so look the journal up
        # by its field name and compare ids.
        journal_changed = False
        if 'account_tax_periodicity_journal_id' in values:
            new_journal = values['account_tax_periodicity_journal_id']
            new_journal_id = getattr(new_journal, 'id', new_journal) or False
            journal_changed = (
                company.account_tax_periodicity_journal_id.id != new_journal_id
            )
        if periodicity_changed or journal_changed:
            companies_needing_update += company

    result = super().write(values)

    if threshold_written:
        for company in self:
            # Compare stored values (before/after) rather than the raw
            # input, which may be a string while the field holds a date.
            if prior_thresholds[company.id] != company.invoicing_switch_threshold:
                company._apply_invoicing_switch_threshold()

    if not companies_needing_update:
        return result

    companies_needing_update._regenerate_tax_closing_reminders()

    # Ensure tax journals are visible on dashboard
    hidden_journals = (
        self._get_tax_closing_journal()
        .sudo()
        .filtered(lambda j: not j.show_on_dashboard)
    )
    if hidden_journals:
        hidden_journals.show_on_dashboard = True
    return result

def _apply_invoicing_switch_threshold(self):
    """Align this company's journal entries with its current
    ``invoicing_switch_threshold`` using direct SQL updates."""
    self.ensure_one()
    AccountMove = self.env['account.move']
    AccountMoveLine = self.env['account.move.line']

    # Push pending ORM changes to the database before the raw SQL runs.
    AccountMoveLine.flush_model(['move_id', 'parent_state'])
    AccountMove.flush_model([
        'company_id', 'date', 'state',
        'payment_state', 'payment_state_before_switch',
    ])
    if self.invoicing_switch_threshold:
        # Apply threshold: restore legacy entries dated on/after the
        # cut-off, then hide posted entries dated before it while
        # remembering their payment state.
        self.env.cr.execute("""
            UPDATE account_move_line aml
               SET parent_state = 'posted'
              FROM account_move am
             WHERE aml.move_id = am.id
               AND am.payment_state = 'invoicing_legacy'
               AND am.date >= %(cutoff)s
               AND am.company_id = %(cid)s;
            UPDATE account_move
               SET state = 'posted',
                   payment_state = payment_state_before_switch,
                   payment_state_before_switch = null
             WHERE payment_state = 'invoicing_legacy'
               AND date >= %(cutoff)s
               AND company_id = %(cid)s;
            UPDATE account_move_line aml
               SET parent_state = 'cancel'
              FROM account_move am
             WHERE aml.move_id = am.id
               AND am.state = 'posted'
               AND am.date < %(cutoff)s
               AND am.company_id = %(cid)s;
            UPDATE account_move
               SET state = 'cancel',
                   payment_state_before_switch = payment_state,
                   payment_state = 'invoicing_legacy'
             WHERE state = 'posted'
               AND date < %(cutoff)s
               AND company_id = %(cid)s;
        """, {
            'cid': self.id,
            'cutoff': self.invoicing_switch_threshold,
        })
    else:
        # Threshold cleared: restore all legacy entries
        self.env.cr.execute("""
            UPDATE account_move_line aml
               SET parent_state = 'posted'
              FROM account_move am
             WHERE aml.move_id = am.id
               AND am.payment_state = 'invoicing_legacy'
               AND am.company_id = %(cid)s;
            UPDATE account_move
               SET state = 'posted',
                   payment_state = payment_state_before_switch,
                   payment_state_before_switch = null
             WHERE payment_state = 'invoicing_legacy'
               AND company_id = %(cid)s;
        """, {'cid': self.id})

    # Rows were changed behind the ORM's back: drop the cached values.
    AccountMoveLine.invalidate_model(['parent_state'])
    AccountMove.invalidate_model([
        'state', 'payment_state', 'payment_state_before_switch',
    ])

def _regenerate_tax_closing_reminders(self):
    """Cancel the draft tax closings and reminder activities of the
    companies in ``self``, then recreate reminders for today's period."""
    # Cancel existing draft closings and reminder activities
    draft_closings = self.env['account.move'].sudo().search([
        ('company_id', 'in', self.ids),
        ('tax_closing_report_id', '!=', False),
        ('state', '=', 'draft'),
    ])
    draft_closings.button_cancel()
    general_journals = self.env['account.journal'].sudo().search([
        *self.env['account.journal']._check_company_domain(self),
        ('type', '=', 'general'),
    ])
    closing_activity_type = self.env.ref(
        'fusion_accounting.tax_closing_activity_type',
    )
    stale_activities = self.env['mail.activity'].sudo().search([
        ('res_id', 'in', general_journals.ids),
        ('res_model_id', '=', self.env['ir.model']._get_id('account.journal')),
        ('activity_type_id', '=', closing_activity_type.id),
        ('active', '=', True),
    ])
    stale_activities.action_cancel()

    # Regenerate reminders for each affected company
    base_tax_report = self.env.ref('account.generic_tax_report')
    for company in self:
        country_reports = self.env['account.report'].search([
            ('availability_condition', '=', 'country'),
            ('country_id', 'in', company.account_enabled_tax_country_ids.ids),
            ('root_report_id', '=', base_tax_report.id),
        ])
        # Fall back on the generic report when the fiscal country has
        # no dedicated one.
        if not country_reports.filtered(
            lambda r: r.country_id == company.account_fiscal_country_id,
        ):
            country_reports += base_tax_report
        for tax_report in country_reports:
            p_start, p_end = company._get_tax_closing_period_boundaries(
                fields.Date.today(), tax_report,
            )
            existing_activity = company._get_tax_closing_reminder_activity(
                tax_report.id, p_end,
            )
            has_posted_closing = self.env['account.move'].search_count([
                ('date', '<=', p_end),
                ('date', '>=', p_start),
                ('tax_closing_report_id', '=', tax_report.id),
                ('company_id', '=', company.id),
                ('state', '=', 'posted'),
            ]) > 0
            if not existing_activity and not has_posted_closing:
                company._generate_tax_closing_reminder_activity(
                    tax_report, p_end,
                )
# =====================================================================
# Tax Closing Move Management
# =====================================================================
def _get_and_update_tax_closing_moves(
    self, in_period_date, report,
    fiscal_positions=None, include_domestic=False,
):
    """Return the draft tax closing move of each requested target for the
    period containing ``in_period_date``, creating or refreshing it.

    :param in_period_date: Any date within the target tax period.
    :param report: The tax report record.
    :param fiscal_positions: Optional fiscal position recordset.
    :param include_domestic: Include the domestic (no fpos) closing.
    :return: Recordset of closing moves.
    :raises UserError: If several draft closings exist for one target.
    """
    self.ensure_one()
    Move = self.env['account.move']
    date_from, date_to = self._get_tax_closing_period_boundaries(
        in_period_date, report,
    )
    periodicity = self._get_tax_periodicity(report)
    journal = self._get_tax_closing_journal()

    # `False` stands for the domestic closing (no fiscal position).
    candidates = list(fiscal_positions or [])
    if include_domestic:
        candidates.append(False)

    collected = Move
    for position in candidates:
        position_id = position.id if position else False
        move = Move.search([
            ('state', '=', 'draft'),
            ('company_id', '=', self.id),
            ('tax_closing_report_id', '=', report.id),
            ('date', '>=', date_from),
            ('date', '<=', date_to),
            ('fiscal_position_id', '=', position.id if position else None),
        ])

        if len(move) > 1:
            if position:
                raise UserError(_(
                    "Multiple draft tax closing entries found for fiscal "
                    "position %(position)s after %(period_start)s. "
                    "Expected at most one.\n%(entries)s",
                    position=position.name,
                    period_start=date_from,
                    entries=move.mapped('display_name'),
                ))
            raise UserError(_(
                "Multiple draft tax closing entries found for your "
                "domestic region after %(period_start)s. "
                "Expected at most one.\n%(entries)s",
                period_start=date_from,
                entries=move.mapped('display_name'),
            ))

        # Reference label: "<report name>: <period description>".
        description = self._get_tax_closing_move_description(
            periodicity, date_from, date_to, position, report,
        )
        reference = _(
            "%(report_label)s: %(period)s",
            report_label=self._get_tax_closing_report_display_name(report),
            period=description,
        )
        values = {
            'company_id': self.id,
            'journal_id': journal.id,
            'date': date_to,
            'tax_closing_report_id': report.id,
            'fiscal_position_id': position_id,
            'ref': reference,
            'name': '/',
        }
        if move:
            move.write(values)
        else:
            move = Move.create(values)

        # A reminder is only created when none exists and this company is
        # the one the report designates as sender for the export.
        reminder = self._get_tax_closing_reminder_activity(
            report.id, date_to, position_id,
        )
        options = move._get_tax_closing_report_options(
            move.company_id,
            move.fiscal_position_id,
            move.tax_closing_report_id,
            move.date,
        )
        sender = report._get_sender_company_for_export(options)
        if not reminder and sender == move.company_id:
            self._generate_tax_closing_reminder_activity(
                report, date_to, position,
            )

        collected += move
    return collected
def _get_tax_closing_report_display_name(self, report):
    """Label used for ``report`` in tax closing references: a generic
    "Tax return" for the generic tax reports, else the report's own
    display name."""
    generic_xmlids = {
        'account.generic_tax_report',
        'account.generic_tax_report_account_tax',
        'account.generic_tax_report_tax_account',
    }
    xmlid = report.get_external_id().get(report.id)
    if xmlid not in generic_xmlids:
        return report.display_name
    return _("Tax return")
# =====================================================================
# Tax Closing Reminder Activities
# =====================================================================
def _generate_tax_closing_reminder_activity(
    self, report, date_in_period=None, fiscal_position=None,
):
    """Schedule a tax closing reminder on the tax closing journal.

    :param report: The tax report the reminder is about.
    :param date_in_period: Any date of the period; defaults to today.
    :param fiscal_position: Optional fiscal position of the closing.
    """
    self.ensure_one()
    Users = self.env['res.users']
    reference_date = date_in_period or fields.Date.today()
    reminder_type = self.env.ref(
        'fusion_accounting.tax_closing_activity_type',
    )
    date_from, date_to = self._get_tax_closing_period_boundaries(
        reference_date, report,
    )
    periodicity = self._get_tax_periodicity(report)
    # The deadline falls the configured number of days after period end.
    due_date = date_to + relativedelta(
        days=self.account_tax_periodicity_reminder_day,
    )
    label = self._get_tax_closing_report_display_name(report)
    description = self._get_tax_closing_move_description(
        periodicity, date_from, date_to, fiscal_position, report,
    )
    summary = _(
        "%(report_label)s: %(period)s",
        report_label=label,
        period=description,
    )

    # Responsible: the activity type's default user if that user belongs
    # to this company and is an accounting manager; otherwise the
    # lowest-id accounting manager of the company.
    responsible = reminder_type.default_user_id if reminder_type else Users
    if responsible and (
        self not in responsible.company_ids
        or not responsible.has_group('account.group_account_manager')
    ):
        responsible = Users
    if not responsible:
        manager_group = self.env.ref('account.group_account_manager')
        responsible = Users.search([
            ('company_ids', 'in', self.ids),
            ('groups_id', 'in', manager_group.ids),
        ], limit=1, order="id ASC")

    activity_values = {
        'res_id': self._get_tax_closing_journal().id,
        'res_model_id': self.env['ir.model']._get_id('account.journal'),
        'activity_type_id': reminder_type.id,
        'date_deadline': due_date,
        'automated': True,
        'summary': summary,
        # Last resort: assign the reminder to the current user.
        'user_id': responsible.id or self.env.user.id,
        'account_tax_closing_params': {
            'report_id': report.id,
            'tax_closing_end_date': fields.Date.to_string(date_to),
            'fpos_id': fiscal_position.id if fiscal_position else False,
        },
    }
    self.env['mail.activity'].with_context(
        mail_activity_quick_update=True,
    ).create(activity_values)
def _get_tax_closing_reminder_activity(
    self, report_id, period_end, fpos_id=False,
):
    """Return the reminder activities on the tax closing journal whose
    stored parameters match the report, period end and fiscal position.

    :param report_id: Id of the tax report.
    :param period_end: End date of the tax period.
    :param fpos_id: Fiscal position id, or False for the domestic closing.
    """
    self.ensure_one()
    reminder_type = self.env.ref(
        'fusion_accounting.tax_closing_activity_type',
    )

    def matches(activity):
        params = activity.account_tax_closing_params
        if not params:
            return False
        if not activity.activity_type_id == reminder_type:
            return False
        if params['report_id'] != report_id:
            return False
        stored_end = fields.Date.from_string(params['tax_closing_end_date'])
        return stored_end == period_end and params['fpos_id'] == fpos_id

    return self._get_tax_closing_journal().activity_ids.filtered(matches)
# =====================================================================
# Tax Period Description & Boundaries
# =====================================================================
def _get_tax_closing_move_description(
    self, periodicity, period_start, period_end, fiscal_position, report,
):
    """Build the period label of a tax closing, e.g. a year, a quarter,
    a month or an explicit date range, followed by an optional region.

    The region suffix is only added when the company has at least one
    fiscal position with a foreign VAT number.
    """
    self.ensure_one()
    FiscalPosition = self.env['account.fiscal.position']
    foreign_vat_domain = [
        ('company_id', '=', self.id),
        ('foreign_vat', '!=', False),
    ]

    suffix = ''
    if FiscalPosition.search_count(foreign_vat_domain):
        if fiscal_position:
            country_code = fiscal_position.country_id.code
            state_codes = []
            if fiscal_position.state_ids:
                state_codes = fiscal_position.mapped('state_ids.code')
        else:
            country_code = self.account_fiscal_country_id.code
            # The company state is only shown when a foreign VAT position
            # of the fiscal country is restricted to states.
            state_level_positions = FiscalPosition.search_count([
                ('company_id', '=', self.id),
                ('foreign_vat', '!=', False),
                ('country_id', '=', self.account_fiscal_country_id.id),
                ('state_ids', '!=', False),
            ])
            state_codes = []
            if self.state_id and state_level_positions:
                state_codes = [self.state_id.code]
        if state_codes:
            suffix = " (%s - %s)" % (country_code, ', '.join(state_codes))
        else:
            suffix = " (%s)" % country_code

    # Short labels only make sense when periods follow the calendar year.
    start_day, start_month = self._get_tax_closing_start_date_attributes(report)
    calendar_aligned = start_day == 1 and start_month == 1

    if calendar_aligned and periodicity == 'year':
        return f"{period_start.year}{suffix}"

    short_patterns = {'trimester': 'qqq yyyy', 'monthly': 'LLLL yyyy'}
    if calendar_aligned and periodicity in short_patterns:
        short_label = format_date(
            self.env, period_start, date_format=short_patterns[periodicity],
        )
        return f"{short_label}{suffix}"

    # Every other case is described by its explicit date range.
    return (
        f"{format_date(self.env, period_start)} - "
        f"{format_date(self.env, period_end)}{suffix}"
    )
def _get_tax_closing_period_boundaries(self, target_date, report):
    """Return the first and last day of the tax period holding
    ``target_date``, given the report's periodicity and start date.

    :return: Tuple of (period_start, period_end).
    """
    self.ensure_one()
    span = self._get_tax_periodicity_months_delay(report)
    first_day, first_month = self._get_tax_closing_start_date_attributes(report)

    # Shift back by the start-day offset so that periods line up with
    # whole calendar months.
    shifted = target_date + relativedelta(days=1 - first_day)
    months_since_anchor = shifted.month - first_month

    # Before this year's start date, the period belongs to the cycle
    # anchored in the previous year.
    if target_date < datetime.date(target_date.year, first_month, first_day):
        anchor_year = shifted.year - 1
        months_since_anchor += 12
    else:
        anchor_year = shifted.year

    period_number = months_since_anchor // span + 1
    months_to_end = period_number * span
    anchor = datetime.date(anchor_year, first_month, 1)

    # `days=first_day - 2`: last day before the next period's start day.
    period_end = anchor + relativedelta(
        months=months_to_end, days=first_day - 2,
    )
    period_start = anchor + relativedelta(
        months=months_to_end - span, day=first_day,
    )
    return period_start, period_end
def _get_available_tax_unit(self, report):
    """Return one tax unit containing this company for the country of
    ``report``.

    :return: Tax unit recordset (may be empty).
    """
    self.ensure_one()
    TaxUnit = self.env['account.tax.unit']
    domain = [
        ('company_ids', 'in', self.id),
        ('country_id', '=', report.country_id.id),
    ]
    return TaxUnit.search(domain, limit=1)
def _get_tax_periodicity(self, report):
"""Return the tax periodicity, respecting tax unit configuration."""
target_company = self
if (
report.filter_multi_company == 'tax_units'
and report.country_id
):
tax_unit = self._get_available_tax_unit(report)
if tax_unit:
target_company = tax_unit.main_company_id
return target_company.account_tax_periodicity
def _get_tax_closing_start_date_attributes(self, report):
    """Return the day and month on which tax periods of ``report`` start.

    Without a configured start date, the first day of the current year is
    used. Otherwise the date is read in the context of the tax unit's
    main company when the report works with tax units and one is
    available, else in the context of this company.

    :return: Tuple of (start_day, start_month).
    """
    if not report.tax_closing_start_date:
        year_start = fields.Date.start_of(fields.Date.today(), 'year')
        return year_start.day, year_start.month

    company = self
    uses_tax_units = (
        report.filter_multi_company == 'tax_units' and report.country_id
    )
    if uses_tax_units:
        tax_unit = self._get_available_tax_unit(report)
        if tax_unit:
            company = tax_unit.main_company_id

    start_date = report.with_company(company).tax_closing_start_date
    return start_date.day, start_date.month
def _get_tax_periodicity_months_delay(self, report):
"""Convert the periodicity selection to a number of months.
:return: Integer number of months between tax returns.
"""
self.ensure_one()
month_map = {
'year': 12,
'semester': 6,
'4_months': 4,
'trimester': 3,
'2_months': 2,
'monthly': 1,
}
return month_map[self._get_tax_periodicity(report)]
# =====================================================================
# Branch VAT Grouping
# =====================================================================
def _get_branches_with_same_vat(self, accessible_only=False):
    """Return this company plus the branches of its hierarchy that share
    its VAT situation.

    A branch matches when the non-empty VAT numbers found on its lineage
    (excluding the strict parents of this company) are exactly this
    company's VAT, or when both sides have no VAT at all.
    The current company always comes first in the result.

    :param accessible_only: Limit to companies in self.env.companies.
    :return: Recordset of matching companies.
    """
    self.ensure_one()
    company = self.sudo()
    strict_ancestors = company.parent_ids - company

    if accessible_only:
        candidates = company.root_id._accessible_branches()
    else:
        candidates = self.env['res.company'].sudo().search([
            ('id', 'child_of', company.root_id.ids),
        ])

    expected_vats = {company.vat} if company.vat else set()

    def lineage_vats(branch):
        # Non-empty VAT numbers between this company's level and `branch`.
        lineage = branch.parent_ids - strict_ancestors
        return {vat for vat in lineage.mapped('vat') if vat}

    same_vat_ids = [company.id]
    same_vat_ids.extend(
        branch.id
        for branch in candidates - company
        if lineage_vats(branch) == expected_vats
    )
    return self.browse(same_vat_ids)