# Fusion Accounting - Company Model Extensions
# Adds accounting-specific fields and methods to res.company for
# managing fiscal periods, tax closings, deferred entries, and assets.

import datetime
import itertools
from datetime import timedelta

from dateutil.relativedelta import relativedelta

from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import date_utils
from odoo.tools.misc import DEFAULT_SERVER_DATE_FORMAT, format_date


class ResCompany(models.Model):
    # Extension of res.company: configuration fields plus helpers for the
    # invoicing switch threshold, fiscal year computation, tax closing
    # moves/reminders and branch VAT grouping.
    _inherit = 'res.company'

    # =====================================================================
    # Invoicing & Prediction
    # =====================================================================

    invoicing_switch_threshold = fields.Date(
        string="Invoicing Switch Threshold",
        help="Entries dated before this threshold are marked as "
             "'From Invoicing', hiding their accounting details. "
             "Useful when transitioning from Invoicing to full Accounting.",
    )
    predict_bill_product = fields.Boolean(
        string="Predict Bill Product",
    )

    # =====================================================================
    # Invoice Signing
    # =====================================================================

    sign_invoice = fields.Boolean(
        string='Display signing field on invoices',
    )
    signing_user = fields.Many2one(
        comodel_name='res.users',
    )

    # =====================================================================
    # Deferred Expense Configuration
    # =====================================================================

    deferred_expense_journal_id = fields.Many2one(
        comodel_name='account.journal',
        string="Deferred Expense Journal",
    )
    deferred_expense_account_id = fields.Many2one(
        comodel_name='account.account',
        string="Deferred Expense Account",
    )
    generate_deferred_expense_entries_method = fields.Selection(
        string="Generate Deferred Expense Entries",
        selection=[
            ('on_validation', 'On bill validation'),
            ('manual', 'Manually & Grouped'),
        ],
        default='on_validation',
        required=True,
    )
    deferred_expense_amount_computation_method = fields.Selection(
        string="Deferred Expense Based on",
        selection=[
            ('day', 'Days'),
            ('month', 'Months'),
            ('full_months', 'Full Months'),
        ],
        default='month',
        required=True,
    )

    # =====================================================================
    # Deferred Revenue Configuration
    # =====================================================================

    deferred_revenue_journal_id = fields.Many2one(
        comodel_name='account.journal',
        string="Deferred Revenue Journal",
    )
    deferred_revenue_account_id = fields.Many2one(
        comodel_name='account.account',
        string="Deferred Revenue Account",
    )
    generate_deferred_revenue_entries_method = fields.Selection(
        string="Generate Deferred Revenue Entries",
        selection=[
            ('on_validation', 'On bill validation'),
            ('manual', 'Manually & Grouped'),
        ],
        default='on_validation',
        required=True,
    )
    deferred_revenue_amount_computation_method = fields.Selection(
        string="Deferred Revenue Based on",
        selection=[
            ('day', 'Days'),
            ('month', 'Months'),
            ('full_months', 'Full Months'),
        ],
        default='month',
        required=True,
    )

    # =====================================================================
    # Reporting & Tax Periodicity
    # =====================================================================

    totals_below_sections = fields.Boolean(
        string='Add totals below sections',
        help='Display totals and subtotals beneath report sections.',
    )
    account_tax_periodicity = fields.Selection(
        selection=[
            ('year', 'annually'),
            ('semester', 'semi-annually'),
            ('4_months', 'every 4 months'),
            ('trimester', 'quarterly'),
            ('2_months', 'every 2 months'),
            ('monthly', 'monthly'),
        ],
        string="Delay units",
        help="Frequency of tax return submissions.",
        default='monthly',
        required=True,
    )
    # Number of days added to the period end to get the reminder deadline
    # (see _generate_tax_closing_reminder_activity).
    account_tax_periodicity_reminder_day = fields.Integer(
        string='Start from',
        default=7,
        required=True,
    )
    account_tax_periodicity_journal_id = fields.Many2one(
        comodel_name='account.journal',
        string='Journal',
        domain=[('type', '=', 'general')],
        check_company=True,
    )

    # =====================================================================
    # Multicurrency Revaluation
    # =====================================================================

    account_revaluation_journal_id = fields.Many2one(
        comodel_name='account.journal',
        domain=[('type', '=', 'general')],
        check_company=True,
    )
    account_revaluation_expense_provision_account_id = fields.Many2one(
        comodel_name='account.account',
        string='Expense Provision Account',
        check_company=True,
    )
    account_revaluation_income_provision_account_id = fields.Many2one(
        comodel_name='account.account',
        string='Income Provision Account',
        check_company=True,
    )

    # =====================================================================
    # Tax Units & Representatives
    # =====================================================================

    account_tax_unit_ids = fields.Many2many(
        string="Tax Units",
        comodel_name='account.tax.unit',
        help="Tax units this company participates in.",
    )
    account_representative_id = fields.Many2one(
        comodel_name='res.partner',
        string='Accounting Firm',
        help="External accounting firm acting as representative for "
             "tax report exports.",
    )
    account_display_representative_field = fields.Boolean(
        compute='_compute_account_display_representative_field',
    )

    # =====================================================================
    # Asset Gain/Loss Accounts
    # =====================================================================

    gain_account_id = fields.Many2one(
        comodel_name='account.account',
        domain="[]",
        check_company=True,
        help="Account for recording gains on asset disposal.",
    )
    loss_account_id = fields.Many2one(
        comodel_name='account.account',
        domain="[]",
        check_company=True,
        help="Account for recording losses on asset disposal.",
    )

    # =====================================================================
    # Invoicing Switch Threshold
    # =====================================================================

    def _apply_invoicing_switch_threshold(self, prior_thresholds):
        """Toggle moves between 'posted' and 'invoicing_legacy' after the
        invoicing switch threshold of a company changed.

        Must be called after the new threshold has been written. Companies
        whose threshold is identical before and after the write are skipped.

        :param prior_thresholds: Dict mapping company id to the value of
            ``invoicing_switch_threshold`` before the write.
        """
        for company in self:
            # Compare field values on both sides (not the raw write value,
            # which may be a string) so an unchanged threshold is a no-op.
            if prior_thresholds[company.id] == company.invoicing_switch_threshold:
                continue

            # Push pending ORM changes to the database before raw SQL reads
            # and updates these columns.
            self.env['account.move.line'].flush_model(['move_id', 'parent_state'])
            self.env['account.move'].flush_model([
                'company_id',
                'date',
                'state',
                'payment_state',
                'payment_state_before_switch',
            ])

            if company.invoicing_switch_threshold:
                # Apply threshold: hide old entries, restore newer ones
                self.env.cr.execute("""
                    UPDATE account_move_line aml
                    SET parent_state = 'posted'
                    FROM account_move am
                    WHERE aml.move_id = am.id
                    AND am.payment_state = 'invoicing_legacy'
                    AND am.date >= %(cutoff)s
                    AND am.company_id = %(cid)s;

                    UPDATE account_move
                    SET state = 'posted',
                        payment_state = payment_state_before_switch,
                        payment_state_before_switch = null
                    WHERE payment_state = 'invoicing_legacy'
                    AND date >= %(cutoff)s
                    AND company_id = %(cid)s;

                    UPDATE account_move_line aml
                    SET parent_state = 'cancel'
                    FROM account_move am
                    WHERE aml.move_id = am.id
                    AND am.state = 'posted'
                    AND am.date < %(cutoff)s
                    AND am.company_id = %(cid)s;

                    UPDATE account_move
                    SET state = 'cancel',
                        payment_state_before_switch = payment_state,
                        payment_state = 'invoicing_legacy'
                    WHERE state = 'posted'
                    AND date < %(cutoff)s
                    AND company_id = %(cid)s;
                """, {
                    'cid': company.id,
                    'cutoff': company.invoicing_switch_threshold,
                })
            else:
                # Threshold cleared: restore all legacy entries
                self.env.cr.execute("""
                    UPDATE account_move_line aml
                    SET parent_state = 'posted'
                    FROM account_move am
                    WHERE aml.move_id = am.id
                    AND am.payment_state = 'invoicing_legacy'
                    AND am.company_id = %(cid)s;

                    UPDATE account_move
                    SET state = 'posted',
                        payment_state = payment_state_before_switch,
                        payment_state_before_switch = null
                    WHERE payment_state = 'invoicing_legacy'
                    AND company_id = %(cid)s;
                """, {'cid': company.id})

            # The SQL above bypassed the ORM: drop the cached values of the
            # columns it modified.
            self.env['account.move.line'].invalidate_model(['parent_state'])
            self.env['account.move'].invalidate_model([
                'state',
                'payment_state',
                'payment_state_before_switch',
            ])

    # =====================================================================
    # Fiscal Year Computation
    # =====================================================================

    def compute_fiscalyear_dates(self, current_date):
        """Determine the fiscal year boundaries containing the given date.

        An ``account.fiscal.year`` record covering the date takes precedence.
        Otherwise the boundaries are derived from the company's
        ``fiscalyear_last_day`` / ``fiscalyear_last_month`` settings and then
        shrunk so they do not overlap fiscal year records covering the
        computed start or end date.

        :param current_date: Reference date (date or datetime).
        :return: Dict with 'date_from', 'date_to', and optionally 'record'.
        """
        self.ensure_one()
        fiscal_year_model = self.env['account.fiscal.year']
        formatted = current_date.strftime(DEFAULT_SERVER_DATE_FORMAT)

        # Check for an explicitly defined fiscal year record
        fy_record = fiscal_year_model.search([
            ('company_id', '=', self.id),
            ('date_from', '<=', formatted),
            ('date_to', '>=', formatted),
        ], limit=1)
        if fy_record:
            return {
                'date_from': fy_record.date_from,
                'date_to': fy_record.date_to,
                'record': fy_record,
            }

        # Calculate from company fiscal year settings
        fy_start, fy_end = date_utils.get_fiscal_year(
            current_date,
            day=self.fiscalyear_last_day,
            month=int(self.fiscalyear_last_month),
        )
        start_str = fy_start.strftime(DEFAULT_SERVER_DATE_FORMAT)
        end_str = fy_end.strftime(DEFAULT_SERVER_DATE_FORMAT)

        # Adjust for gaps between fiscal year records: a record covering the
        # computed start pushes the start to the day after that record ends.
        overlapping_start = fiscal_year_model.search([
            ('company_id', '=', self.id),
            ('date_from', '<=', start_str),
            ('date_to', '>=', start_str),
        ], limit=1)
        if overlapping_start:
            fy_start = overlapping_start.date_to + timedelta(days=1)

        # Likewise, a record covering the computed end pulls the end back to
        # the day before that record starts.
        overlapping_end = fiscal_year_model.search([
            ('company_id', '=', self.id),
            ('date_from', '<=', end_str),
            ('date_to', '>=', end_str),
        ], limit=1)
        if overlapping_end:
            fy_end = overlapping_end.date_from - timedelta(days=1)

        return {'date_from': fy_start, 'date_to': fy_end}

    # =====================================================================
    # Statement Reconciliation Redirect
    # =====================================================================

    def _get_unreconciled_statement_lines_redirect_action(
        self, unreconciled_statement_lines,
    ):
        """Override to open the bank reconciliation widget for
        unreconciled statement lines.

        :param unreconciled_statement_lines: Recordset of statement lines;
            the widget is restricted to these ids.
        :return: Whatever ``_action_open_bank_reconciliation_widget`` returns.
        """
        return self.env['account.bank.statement.line']._action_open_bank_reconciliation_widget(
            extra_domain=[('id', 'in', unreconciled_statement_lines.ids)],
            name=_('Unreconciled statements lines'),
        )

    # =====================================================================
    # Tax Representative
    # =====================================================================

    @api.depends('account_fiscal_country_id.code')
    def _compute_account_display_representative_field(self):
        """Show the representative field only for countries that support it."""
        allowed_codes = self._get_countries_allowing_tax_representative()
        for company in self:
            company.account_display_representative_field = (
                company.account_fiscal_country_id.code in allowed_codes
            )

    def _get_countries_allowing_tax_representative(self):
        """Hook for localization modules to declare countries that support
        tax representative functionality.

        :return: Set of country code strings (empty in this base version).
        """
        return set()

    # =====================================================================
    # Tax Closing Journal
    # =====================================================================

    def _get_default_misc_journal(self):
        """Retrieve a general-type journal as the default for tax closings.

        :return: At most one ``account.journal`` record of type 'general'
            matching the company domain of ``self``.
        """
        return self.env['account.journal'].search([
            *self.env['account.journal']._check_company_domain(self),
            ('type', '=', 'general'),
        ], limit=1)

    def _get_tax_closing_journal(self):
        """Return the configured tax closing journal, falling back to the
        default miscellaneous journal.

        :return: Union of the journals found for every company in ``self``.
        """
        result = self.env['account.journal']
        for company in self:
            result |= (
                company.account_tax_periodicity_journal_id
                or company._get_default_misc_journal()
            )
        return result

    # =====================================================================
    # Company Create/Write
    # =====================================================================

    @api.model_create_multi
    def create(self, vals_list):
        """Initialize onboardings for newly created companies."""
        new_companies = super().create(vals_list)
        new_companies._initiate_account_onboardings()
        return new_companies

    def write(self, values):
        """Write company values and run the accounting side effects.

        Two independent side effects are handled here. They live in a single
        override because a class can only hold one ``write`` definition: a
        second ``def write`` in the same class body silently replaces the
        first one.

        * When 'invoicing_switch_threshold' is written, moves are toggled
          between 'posted' and 'invoicing_legacy' according to the new date.
        * When the tax periodicity or the tax closing journal changes, draft
          tax closings and reminder activities are cancelled and reminders
          are regenerated.

        :param values: Dict of field values to write.
        :return: Result of the parent ``write``.
        """
        # Both side effects need the values as they were *before* the write.
        threshold_written = 'invoicing_switch_threshold' in values
        prior_thresholds = (
            {company.id: company.invoicing_switch_threshold for company in self}
            if threshold_written
            else {}
        )
        companies_needing_update = self._get_companies_needing_tax_closing_update(values)

        result = super().write(values)

        if threshold_written:
            self._apply_invoicing_switch_threshold(prior_thresholds)

        if companies_needing_update:
            companies_needing_update._reset_tax_closing_reminders()

            # Ensure tax journals are visible on dashboard
            hidden_journals = (
                self._get_tax_closing_journal()
                .sudo()
                .filtered(lambda j: not j.show_on_dashboard)
            )
            if hidden_journals:
                hidden_journals.show_on_dashboard = True

        return result

    def _get_companies_needing_tax_closing_update(self, values):
        """Select the companies whose tax closing setup is changed by a write.

        Must be called before the values are written, since it compares them
        with the current field values. Companies without any tax closing
        journal are ignored.

        :param values: Dict of field values about to be written.
        :return: Recordset of companies whose 'account_tax_periodicity' or
            'account_tax_periodicity_journal_id' would actually change.
        """
        companies = self.env['res.company']
        for company in self:
            if not company._get_tax_closing_journal():
                continue
            periodicity_changed = (
                'account_tax_periodicity' in values
                and company.account_tax_periodicity != values['account_tax_periodicity']
            )
            # Write dictionaries are keyed by field name, so the journal is
            # looked up under 'account_tax_periodicity_journal_id' and
            # compared by id.
            journal_changed = (
                'account_tax_periodicity_journal_id' in values
                and company.account_tax_periodicity_journal_id.id
                != values['account_tax_periodicity_journal_id']
            )
            if periodicity_changed or journal_changed:
                companies += company
        return companies

    def _reset_tax_closing_reminders(self):
        """Cancel draft tax closings and reminder activities of the companies
        in ``self``, then regenerate the reminders for the current period.

        A reminder is only regenerated for a report when no matching reminder
        activity exists and no posted closing exists within the period.
        """
        # Cancel existing draft closings and reminder activities
        draft_closings = self.env['account.move'].sudo().search([
            ('company_id', 'in', self.ids),
            ('tax_closing_report_id', '!=', False),
            ('state', '=', 'draft'),
        ])
        draft_closings.button_cancel()

        general_journals = self.env['account.journal'].sudo().search([
            *self.env['account.journal']._check_company_domain(self),
            ('type', '=', 'general'),
        ])
        closing_activity_type = self.env.ref(
            'fusion_accounting.tax_closing_activity_type',
        )
        stale_activities = self.env['mail.activity'].sudo().search([
            ('res_id', 'in', general_journals.ids),
            ('res_model_id', '=', self.env['ir.model']._get_id('account.journal')),
            ('activity_type_id', '=', closing_activity_type.id),
            ('active', '=', True),
        ])
        stale_activities.action_cancel()

        # Regenerate reminders for each affected company
        base_tax_report = self.env.ref('account.generic_tax_report')
        for company in self:
            country_reports = self.env['account.report'].search([
                ('availability_condition', '=', 'country'),
                ('country_id', 'in', company.account_enabled_tax_country_ids.ids),
                ('root_report_id', '=', base_tax_report.id),
            ])
            # Fall back on the generic report when no report exists for the
            # company's fiscal country.
            if not country_reports.filtered(
                lambda r: r.country_id == company.account_fiscal_country_id,
            ):
                country_reports += base_tax_report

            for tax_report in country_reports:
                p_start, p_end = company._get_tax_closing_period_boundaries(
                    fields.Date.today(), tax_report,
                )
                existing_activity = company._get_tax_closing_reminder_activity(
                    tax_report.id, p_end,
                )
                has_posted_closing = self.env['account.move'].search_count([
                    ('date', '<=', p_end),
                    ('date', '>=', p_start),
                    ('tax_closing_report_id', '=', tax_report.id),
                    ('company_id', '=', company.id),
                    ('state', '=', 'posted'),
                ]) > 0
                if not existing_activity and not has_posted_closing:
                    company._generate_tax_closing_reminder_activity(
                        tax_report, p_end,
                    )

    # =====================================================================
    # Tax Closing Move Management
    # =====================================================================

    def _get_and_update_tax_closing_moves(
        self, in_period_date, report,
        fiscal_positions=None, include_domestic=False,
    ):
        """Find or create draft tax closing moves for the given period.

        For each requested fiscal position (and the domestic case when
        ``include_domestic`` is set) the existing draft closing of the period
        is updated, or a new one is created. A reminder activity is generated
        when none exists and this company is the sender company for the
        report export.

        :param in_period_date: Any date within the target tax period.
        :param report: The tax report record.
        :param fiscal_positions: Optional fiscal position recordset.
        :param include_domestic: Include the domestic (no fpos) closing.
        :return: Recordset of closing moves.
        :raises UserError: If several draft closings exist for one target.
        """
        self.ensure_one()
        fpos_list = fiscal_positions or []

        period_start, period_end = self._get_tax_closing_period_boundaries(
            in_period_date, report,
        )
        periodicity = self._get_tax_periodicity(report)
        closing_journal = self._get_tax_closing_journal()

        closing_moves = self.env['account.move']
        # ``False`` stands for the domestic closing (no fiscal position).
        targets = list(fpos_list) + ([False] if include_domestic else [])

        for fpos in targets:
            fpos_id = fpos.id if fpos else False

            existing_move = self.env['account.move'].search([
                ('state', '=', 'draft'),
                ('company_id', '=', self.id),
                ('tax_closing_report_id', '=', report.id),
                ('date', '>=', period_start),
                ('date', '<=', period_end),
                ('fiscal_position_id', '=', fpos.id if fpos else None),
            ])

            if len(existing_move) > 1:
                if fpos:
                    msg = _(
                        "Multiple draft tax closing entries found for fiscal "
                        "position %(position)s after %(period_start)s. "
                        "Expected at most one.\n%(entries)s",
                        position=fpos.name,
                        period_start=period_start,
                        entries=existing_move.mapped('display_name'),
                    )
                else:
                    msg = _(
                        "Multiple draft tax closing entries found for your "
                        "domestic region after %(period_start)s. "
                        "Expected at most one.\n%(entries)s",
                        period_start=period_start,
                        entries=existing_move.mapped('display_name'),
                    )
                raise UserError(msg)

            # Build the reference label
            period_desc = self._get_tax_closing_move_description(
                periodicity, period_start, period_end, fpos, report,
            )
            report_label = self._get_tax_closing_report_display_name(report)
            ref_text = _(
                "%(report_label)s: %(period)s",
                report_label=report_label,
                period=period_desc,
            )

            move_vals = {
                'company_id': self.id,
                'journal_id': closing_journal.id,
                'date': period_end,
                'tax_closing_report_id': report.id,
                'fiscal_position_id': fpos_id,
                'ref': ref_text,
                'name': '/',
            }

            if existing_move:
                existing_move.write(move_vals)
            else:
                existing_move = self.env['account.move'].create(move_vals)

            # Ensure a reminder activity exists
            reminder = self._get_tax_closing_reminder_activity(
                report.id, period_end, fpos_id,
            )
            closing_opts = existing_move._get_tax_closing_report_options(
                existing_move.company_id,
                existing_move.fiscal_position_id,
                existing_move.tax_closing_report_id,
                existing_move.date,
            )
            sender_company = report._get_sender_company_for_export(closing_opts)
            if not reminder and sender_company == existing_move.company_id:
                self._generate_tax_closing_reminder_activity(
                    report, period_end, fpos,
                )

            closing_moves += existing_move

        return closing_moves

    def _get_tax_closing_report_display_name(self, report):
        """Return a human-readable name for the tax closing report.

        :param report: The tax report record.
        :return: The translated "Tax return" label when the report is one of
            the generic tax reports, otherwise the report's display name.
        """
        ext_id = report.get_external_id().get(report.id)
        generic_ids = (
            'account.generic_tax_report',
            'account.generic_tax_report_account_tax',
            'account.generic_tax_report_tax_account',
        )
        if ext_id in generic_ids:
            return _("Tax return")
        return report.display_name

    # =====================================================================
    # Tax Closing Reminder Activities
    # =====================================================================

    def _generate_tax_closing_reminder_activity(
        self, report, date_in_period=None, fiscal_position=None,
    ):
        """Create a reminder activity on the tax closing journal.

        The deadline is the period end plus
        ``account_tax_periodicity_reminder_day`` days.

        :param report: The tax report record.
        :param date_in_period: Any date within the target period; defaults
            to today.
        :param fiscal_position: Optional fiscal position (falsy for the
            domestic closing).
        """
        self.ensure_one()
        if not date_in_period:
            date_in_period = fields.Date.today()

        activity_type = self.env.ref(
            'fusion_accounting.tax_closing_activity_type',
        )
        p_start, p_end = self._get_tax_closing_period_boundaries(
            date_in_period, report,
        )
        periodicity = self._get_tax_periodicity(report)
        deadline = p_end + relativedelta(
            days=self.account_tax_periodicity_reminder_day,
        )

        report_label = self._get_tax_closing_report_display_name(report)
        period_desc = self._get_tax_closing_move_description(
            periodicity, p_start, p_end, fiscal_position, report,
        )
        summary_text = _(
            "%(report_label)s: %(period)s",
            report_label=report_label,
            period=period_desc,
        )

        # Find the appropriate user for the reminder: the activity type's
        # default user is only kept when they belong to this company and are
        # an accounting manager.
        assigned_user = (
            activity_type.default_user_id if activity_type
            else self.env['res.users']
        )
        if assigned_user and not (
            self in assigned_user.company_ids
            and assigned_user.has_group('account.group_account_manager')
        ):
            assigned_user = self.env['res.users']

        # Otherwise pick the accounting manager of this company with the
        # lowest id.
        if not assigned_user:
            assigned_user = self.env['res.users'].search([
                ('company_ids', 'in', self.ids),
                ('groups_id', 'in', self.env.ref(
                    'account.group_account_manager',
                ).ids),
            ], limit=1, order="id ASC")

        self.env['mail.activity'].with_context(
            mail_activity_quick_update=True,
        ).create({
            'res_id': self._get_tax_closing_journal().id,
            'res_model_id': self.env['ir.model']._get_id('account.journal'),
            'activity_type_id': activity_type.id,
            'date_deadline': deadline,
            'automated': True,
            'summary': summary_text,
            # Last resort: assign the reminder to the current user.
            'user_id': assigned_user.id or self.env.user.id,
            'account_tax_closing_params': {
                'report_id': report.id,
                'tax_closing_end_date': fields.Date.to_string(p_end),
                'fpos_id': fiscal_position.id if fiscal_position else False,
            },
        })

    def _get_tax_closing_reminder_activity(
        self, report_id, period_end, fpos_id=False,
    ):
        """Search for an existing tax closing reminder activity.

        :param report_id: Id of the tax report.
        :param period_end: End date of the tax period.
        :param fpos_id: Fiscal position id, or False for the domestic closing.
        :return: Activities of the tax closing journal whose closing params
            match the report, period end and fiscal position.
        """
        self.ensure_one()
        activity_type = self.env.ref(
            'fusion_accounting.tax_closing_activity_type',
        )
        return self._get_tax_closing_journal().activity_ids.filtered(
            lambda act: (
                act.account_tax_closing_params
                and act.activity_type_id == activity_type
                and act.account_tax_closing_params['report_id'] == report_id
                and fields.Date.from_string(
                    act.account_tax_closing_params['tax_closing_end_date'],
                ) == period_end
                and act.account_tax_closing_params['fpos_id'] == fpos_id
            )
        )

    # =====================================================================
    # Tax Period Description & Boundaries
    # =====================================================================

    def _get_tax_closing_move_description(
        self, periodicity, period_start, period_end, fiscal_position, report,
    ):
        """Generate a human-readable description of the tax period.

        :param periodicity: Periodicity key (see ``account_tax_periodicity``).
        :param period_start: First day of the period.
        :param period_end: Last day of the period.
        :param fiscal_position: Fiscal position of the closing, or falsy for
            the domestic closing.
        :param report: The tax report record.
        :return: Period label, followed by a " (country[ - states])" suffix
            when the company has foreign VAT fiscal positions.
        """
        self.ensure_one()

        # Determine region suffix based on foreign VAT positions
        fvat_count = self.env['account.fiscal.position'].search_count([
            ('company_id', '=', self.id),
            ('foreign_vat', '!=', False),
        ])

        region_suffix = ''
        if fvat_count:
            if fiscal_position:
                country = fiscal_position.country_id.code
                states = (
                    fiscal_position.mapped('state_ids.code')
                    if fiscal_position.state_ids else []
                )
            else:
                country = self.account_fiscal_country_id.code
                # The company's own state is only mentioned when some foreign
                # VAT position of the fiscal country is state-specific.
                state_fpos = self.env['account.fiscal.position'].search_count([
                    ('company_id', '=', self.id),
                    ('foreign_vat', '!=', False),
                    ('country_id', '=', self.account_fiscal_country_id.id),
                    ('state_ids', '!=', False),
                ])
                states = (
                    [self.state_id.code]
                    if self.state_id and state_fpos else []
                )

            if states:
                region_suffix = " (%s - %s)" % (country, ', '.join(states))
            else:
                region_suffix = " (%s)" % country

        # Check for custom start date that would break standard period labels
        start_day, start_month = self._get_tax_closing_start_date_attributes(report)
        if start_day != 1 or start_month != 1:
            return (
                f"{format_date(self.env, period_start)} - "
                f"{format_date(self.env, period_end)}{region_suffix}"
            )

        if periodicity == 'year':
            return f"{period_start.year}{region_suffix}"
        elif periodicity == 'trimester':
            quarter_label = format_date(
                self.env, period_start, date_format='qqq yyyy',
            )
            return f"{quarter_label}{region_suffix}"
        elif periodicity == 'monthly':
            month_label = format_date(
                self.env, period_start, date_format='LLLL yyyy',
            )
            return f"{month_label}{region_suffix}"
        else:
            return (
                f"{format_date(self.env, period_start)} - "
                f"{format_date(self.env, period_end)}{region_suffix}"
            )

    def _get_tax_closing_period_boundaries(self, target_date, report):
        """Calculate the start and end dates of the tax period containing
        the given date.

        :param target_date: Date within the period of interest.
        :param report: The tax report record (provides periodicity and the
            closing start day/month).
        :return: Tuple of (period_start, period_end).
        """
        self.ensure_one()
        months_per_period = self._get_tax_periodicity_months_delay(report)
        start_day, start_month = self._get_tax_closing_start_date_attributes(report)

        # Align the date backward by the start-day offset
        aligned = target_date + relativedelta(days=-(start_day - 1))
        yr = aligned.year
        month_offset = aligned.month - start_month
        period_idx = (month_offset // months_per_period) + 1

        # Handle dates that fall before the start-month in the calendar year
        if target_date < datetime.date(target_date.year, start_month, start_day):
            yr -= 1
            period_idx = ((12 + month_offset) // months_per_period) + 1

        total_month_delta = period_idx * months_per_period

        # End: the day before the next period starts, i.e. first of the
        # month + (start_day - 1) days - 1 day.
        end_dt = (
            datetime.date(yr, start_month, 1)
            + relativedelta(months=total_month_delta, days=start_day - 2)
        )
        start_dt = (
            datetime.date(yr, start_month, 1)
            + relativedelta(
                months=total_month_delta - months_per_period,
                day=start_day,
            )
        )
        return start_dt, end_dt

    def _get_available_tax_unit(self, report):
        """Find a tax unit applicable to this company and report country.

        :param report: The tax report record.
        :return: Tax unit recordset (may be empty).
        """
        self.ensure_one()
        return self.env['account.tax.unit'].search([
            ('company_ids', 'in', self.id),
            ('country_id', '=', report.country_id.id),
        ], limit=1)

    def _get_tax_periodicity(self, report):
        """Return the tax periodicity, respecting tax unit configuration.

        :param report: The tax report record.
        :return: The ``account_tax_periodicity`` of the tax unit's main
            company when the report is filtered by tax units, has a country
            and a tax unit is found; this company's periodicity otherwise.
        """
        target_company = self
        if (
            report.filter_multi_company == 'tax_units'
            and report.country_id
        ):
            tax_unit = self._get_available_tax_unit(report)
            if tax_unit:
                target_company = tax_unit.main_company_id
        return target_company.account_tax_periodicity

    def _get_tax_closing_start_date_attributes(self, report):
        """Return (day, month) for the tax closing start date.

        Without a configured ``tax_closing_start_date`` on the report, the
        first day of the current year is used. Otherwise the start date is
        read in the context of the tax unit's main company when applicable.

        :param report: The tax report record.
        :return: Tuple of (start_day, start_month).
        """
        if not report.tax_closing_start_date:
            jan_first = fields.Date.start_of(fields.Date.today(), 'year')
            return jan_first.day, jan_first.month

        target_company = self
        if (
            report.filter_multi_company == 'tax_units'
            and report.country_id
        ):
            tax_unit = self._get_available_tax_unit(report)
            if tax_unit:
                target_company = tax_unit.main_company_id

        configured_start = report.with_company(
            target_company,
        ).tax_closing_start_date
        return configured_start.day, configured_start.month

    def _get_tax_periodicity_months_delay(self, report):
        """Convert the periodicity selection to a number of months.

        :param report: The tax report record.
        :return: Integer number of months between tax returns.
        """
        self.ensure_one()
        month_map = {
            'year': 12,
            'semester': 6,
            '4_months': 4,
            'trimester': 3,
            '2_months': 2,
            'monthly': 1,
        }
        return month_map[self._get_tax_periodicity(report)]

    # =====================================================================
    # Branch VAT Grouping
    # =====================================================================

    def _get_branches_with_same_vat(self, accessible_only=False):
        """Identify all companies in the branch hierarchy that share the
        same effective VAT number as this company.

        Companies without a VAT inherit the nearest parent's VAT.
        The current company is always returned first.

        :param accessible_only: Limit to companies in self.env.companies.
        :return: Recordset of matching companies.
        """
        self.ensure_one()
        current = self.sudo()
        matching_ids = [current.id]
        strict_parents = current.parent_ids - current

        if accessible_only:
            branch_pool = current.root_id._accessible_branches()
        else:
            branch_pool = self.env['res.company'].sudo().search([
                ('id', 'child_of', current.root_id.ids),
            ])

        own_vat_set = {current.vat} if current.vat else set()

        for branch in branch_pool - current:
            # Collect VAT numbers from intermediary parents
            intermediate_vats = set(filter(
                None,
                (branch.parent_ids - strict_parents).mapped('vat'),
            ))
            if intermediate_vats == own_vat_set:
                matching_ids.append(branch.id)

        return self.browse(matching_ids)