""" Fusion Accounting - Asset Management & Depreciation Module Provides fixed asset tracking, automated depreciation scheduling, and depreciation schedule reporting for Odoo 19. """ import datetime import psycopg2 from collections import defaultdict from dateutil.relativedelta import relativedelta from markupsafe import Markup from math import copysign from odoo import api, Command, fields, models, _ from odoo.exceptions import UserError, ValidationError from odoo.tools import float_compare, float_is_zero, formatLang from odoo.tools import format_date, SQL, Query from odoo.tools.date_utils import end_of # Normalized calendar constants used for prorata calculations # when not using actual calendar days (constant_periods mode). NORMALIZED_MONTH_DAYS = 30 NORMALIZED_YEAR_DAYS = NORMALIZED_MONTH_DAYS * 12 # Truncation limit for asset names displayed in report lines REPORT_NAME_TRUNCATION = 50 class FusionAsset(models.Model): """ Manages fixed assets, deferred revenue recognition, and their depreciation schedules throughout the asset lifecycle. An asset transitions through: draft -> open (running) -> close/cancelled, with an optional 'paused' state and a special 'model' state for templates. 
""" _name = 'account.asset' _description = 'Asset/Revenue Recognition' _inherit = ['mail.thread', 'mail.activity.mixin', 'analytic.mixin'] # -- Counters -- depreciation_entries_count = fields.Integer( compute='_compute_counts', string='# Posted Depreciation Entries', ) gross_increase_count = fields.Integer( compute='_compute_counts', string='# Gross Increases', help="Count of child assets created to augment this asset's value", ) total_depreciation_entries_count = fields.Integer( compute='_compute_counts', string='# Depreciation Entries', help="Total depreciation entries including both posted and draft", ) # -- Identity -- name = fields.Char( string='Asset Name', compute='_compute_name', store=True, required=True, readonly=False, tracking=True, ) company_id = fields.Many2one( 'res.company', string='Company', required=True, default=lambda self: self.env.company, ) country_code = fields.Char(related='company_id.account_fiscal_country_id.code') currency_id = fields.Many2one('res.currency', related='company_id.currency_id', store=True) state = fields.Selection( selection=[ ('model', 'Model'), ('draft', 'Draft'), ('open', 'Running'), ('paused', 'On Hold'), ('close', 'Closed'), ('cancelled', 'Cancelled'), ], string='Status', copy=False, default='draft', readonly=True, help=( "Draft: initial state for newly created assets.\n" "Running: asset is confirmed and depreciation entries are generated.\n" "On Hold: depreciation is temporarily suspended.\n" "Closed: depreciation is complete or asset has been disposed.\n" "Cancelled: all depreciation entries have been reversed." 
), ) active = fields.Boolean(default=True) # -- Depreciation Parameters -- method = fields.Selection( selection=[ ('linear', 'Straight Line'), ('degressive', 'Declining'), ('degressive_then_linear', 'Declining then Straight Line'), ], string='Method', default='linear', help=( "Straight Line: equal depreciation over each period based on gross value / duration.\n" "Declining: each period's depreciation = remaining value * declining factor.\n" "Declining then Straight Line: uses declining method but guarantees at least " "the straight-line amount per period." ), ) method_number = fields.Integer( string='Duration', default=5, help="Total number of depreciation periods over the asset lifetime", ) method_period = fields.Selection( [('1', 'Months'), ('12', 'Years')], string='Number of Months in a Period', default='12', help="Length of each depreciation period in months", ) method_progress_factor = fields.Float(string='Declining Factor', default=0.3) prorata_computation_type = fields.Selection( selection=[ ('none', 'No Prorata'), ('constant_periods', 'Constant Periods'), ('daily_computation', 'Based on days per period'), ], string="Computation", required=True, default='constant_periods', ) prorata_date = fields.Date( string='Prorata Date', compute='_compute_prorata_date', store=True, readonly=False, help='Reference start date for the prorata computation of the first depreciation period', required=True, precompute=True, copy=True, ) paused_prorata_date = fields.Date( compute='_compute_paused_prorata_date', help="Prorata date adjusted forward by the number of paused days", ) # -- Accounts -- account_asset_id = fields.Many2one( 'account.account', string='Fixed Asset Account', compute='_compute_account_asset_id', help="Account recording the original purchase price of the asset.", store=True, readonly=False, check_company=True, domain="[('account_type', '!=', 'off_balance')]", ) asset_group_id = fields.Many2one( 'account.asset.group', string='Asset Group', tracking=True, 
index=True, ) account_depreciation_id = fields.Many2one( comodel_name='account.account', string='Depreciation Account', check_company=True, domain="[('account_type', 'not in', ('asset_receivable', 'liability_payable', " "'asset_cash', 'liability_credit_card', 'off_balance')), " "]", help="Account for accumulated depreciation entries reducing asset value.", ) account_depreciation_expense_id = fields.Many2one( comodel_name='account.account', string='Expense Account', check_company=True, domain="[('account_type', 'not in', ('asset_receivable', 'liability_payable', " "'asset_cash', 'liability_credit_card', 'off_balance')), " "]", help="Account debited for periodic depreciation expense recognition.", ) journal_id = fields.Many2one( 'account.journal', string='Journal', check_company=True, domain="[('type', '=', 'general')]", compute='_compute_journal_id', store=True, readonly=False, ) # -- Monetary Values -- original_value = fields.Monetary( string="Original Value", compute='_compute_value', store=True, readonly=False, ) book_value = fields.Monetary( string='Book Value', readonly=True, compute='_compute_book_value', recursive=True, store=True, help="Net book value: residual depreciable amount + salvage value + child asset book values", ) value_residual = fields.Monetary( string='Depreciable Value', compute='_compute_value_residual', ) salvage_value = fields.Monetary( string='Not Depreciable Value', help="Estimated residual value at end of useful life that will not be depreciated.", compute="_compute_salvage_value", store=True, readonly=False, ) salvage_value_pct = fields.Float( string='Not Depreciable Value Percent', help="Percentage of original value to retain as salvage value.", ) total_depreciable_value = fields.Monetary(compute='_compute_total_depreciable_value') gross_increase_value = fields.Monetary( string="Gross Increase Value", compute="_compute_gross_increase_value", compute_sudo=True, ) non_deductible_tax_value = fields.Monetary( string="Non Deductible Tax 
Value", compute="_compute_non_deductible_tax_value", store=True, readonly=True, ) related_purchase_value = fields.Monetary(compute='_compute_related_purchase_value') # -- Entry Links -- depreciation_move_ids = fields.One2many( 'account.move', 'asset_id', string='Depreciation Lines', ) original_move_line_ids = fields.Many2many( 'account.move.line', 'asset_move_line_rel', 'asset_id', 'line_id', string='Journal Items', copy=False, ) # -- Properties -- asset_properties_definition = fields.PropertiesDefinition('Model Properties') asset_properties = fields.Properties( 'Properties', definition='model_id.asset_properties_definition', copy=True, ) # -- Dates -- acquisition_date = fields.Date( compute='_compute_acquisition_date', store=True, precompute=True, readonly=False, copy=True, ) disposal_date = fields.Date( readonly=False, compute="_compute_disposal_date", store=True, ) # -- Model / Template Reference -- model_id = fields.Many2one( 'account.asset', string='Model', change_default=True, domain="[('company_id', '=', company_id)]", ) account_type = fields.Selection( string="Type of the account", related='account_asset_id.account_type', ) display_account_asset_id = fields.Boolean(compute="_compute_display_account_asset_id") # -- Parent / Child (Gross Increases) -- parent_id = fields.Many2one( 'account.asset', help="Parent asset when this asset represents a gross increase in value", ) children_ids = fields.One2many( 'account.asset', 'parent_id', help="Child assets representing value increases for this asset", ) # -- Import Support -- already_depreciated_amount_import = fields.Monetary( help=( "When importing from external software, set this field to align the " "depreciation schedule with previously recorded depreciation amounts." 
@api.depends('company_id')
def _compute_journal_id(self):
    """Assign a general journal to assets whose journal does not fit their company.

    An asset keeps its current journal when that journal's company equals the
    asset's company. Otherwise the first journal of type ``general`` matching
    the company domain is assigned (an empty recordset when none is found).
    """
    Journal = self.env['account.journal']
    for asset in self:
        current = asset.journal_id
        journal_fits = current and current.company_id == asset.company_id
        if not journal_fits:
            search_domain = [
                *Journal._check_company_domain(asset.company_id),
                ('type', '=', 'general'),
            ]
            asset.journal_id = Journal.search(search_domain, limit=1)
@api.depends('original_move_line_ids', 'original_move_line_ids.account_id', 'non_deductible_tax_value')
def _compute_value(self):
    """Derive the original value from the linked journal items.

    Without linked items the current value is kept (or reset to a falsy
    value). With linked items, every one of them must belong to a non-draft
    move; the value is then the related purchase value plus the
    non-deductible tax value, when there is one.

    Raises:
        UserError: if at least one linked item belongs to a draft move.
    """
    for asset in self:
        source_lines = asset.original_move_line_ids
        if not source_lines:
            asset.original_value = asset.original_value or False
            continue
        if any(item.move_id.state == 'draft' for item in source_lines):
            raise UserError(_("All the lines should be posted"))
        total = asset.related_purchase_value
        extra_tax = asset.non_deductible_tax_value
        asset.original_value = total + extra_tax if extra_tax else total
@api.depends('method_number', 'method_period', 'prorata_computation_type')
def _compute_lifetime_days(self):
    """Compute how many days the depreciation schedule of each asset spans.

    Stand-alone assets: ``method_period * method_number`` months, counted in
    real calendar days from the prorata date for ``daily_computation`` and in
    30-day months otherwise.

    Child assets (with a ``parent_id``): the span, as measured by
    ``_compute_day_span``, from the child's prorata date to the parent's last
    day, so the child stops together with its parent.
    """
    for asset in self:
        uses_calendar_days = asset.prorata_computation_type == 'daily_computation'
        parent = asset.parent_id

        if parent:
            parent_days = parent.asset_lifetime_days
            if uses_calendar_days:
                to_last_day = relativedelta(days=int(parent_days - 1))
            else:
                to_last_day = relativedelta(
                    months=int(parent_days / NORMALIZED_MONTH_DAYS),
                    days=int(parent_days % NORMALIZED_MONTH_DAYS) - 1,
                )
            parent_last_day = parent.paused_prorata_date + to_last_day
            asset.asset_lifetime_days = asset._compute_day_span(
                asset.prorata_date, parent_last_day
            )
            continue

        span_in_months = int(asset.method_period) * asset.method_number
        if uses_calendar_days:
            first_day = asset.prorata_date
            schedule_end = first_day + relativedelta(months=span_in_months)
            asset.asset_lifetime_days = (schedule_end - first_day).days
        else:
            asset.asset_lifetime_days = span_in_months * NORMALIZED_MONTH_DAYS
@api.depends('prorata_date', 'prorata_computation_type', 'asset_paused_days')
def _compute_paused_prorata_date(self):
    """Move the prorata date forward by the days the asset spent paused.

    In ``daily_computation`` mode the paused days are added as plain days.
    In the other modes they are split into whole 30-day months plus the
    remaining days before being added.
    """
    for asset in self:
        paused = asset.asset_paused_days
        if asset.prorata_computation_type == 'daily_computation':
            shift = relativedelta(days=paused)
        else:
            shift = relativedelta(
                months=int(paused / NORMALIZED_MONTH_DAYS),
                days=paused % NORMALIZED_MONTH_DAYS,
            )
        asset.paused_prorata_date = asset.prorata_date + shift
@api.depends('original_move_line_ids')
def _compute_non_deductible_tax_value(self):
    """Sum the non-deductible tax of the source journal items, in asset currency.

    For each linked journal item carrying a non-deductible tax value, the
    amount is converted from the item's currency to the asset's currency at
    the item's date, rounded, and accumulated. When the item's account creates
    several assets per line, the amount is first divided by the number of
    assets the line stands for.
    """
    for rec in self:
        ndt_total = 0.0
        for line in rec.original_move_line_ids:
            if not line.non_deductible_tax_value:
                continue
            acct = line.account_id
            multi_asset = acct.create_asset != 'no' and acct.multiple_assets_per_line
            # Use the same divisor rule as _compute_related_purchase_value
            # (whole units, at least 1): a zero quantity must not raise
            # ZeroDivisionError, and the tax share must be split exactly like
            # the purchase value it is later added to.
            divisor = max(1, int(line.quantity)) if multi_asset else 1
            converted = line.currency_id._convert(
                line.non_deductible_tax_value / divisor,
                rec.currency_id,
                rec.company_id,
                line.date,
            )
            ndt_total += rec.currency_id.round(converted)
        rec.non_deductible_tax_value = ndt_total
@api.depends('original_move_line_ids.asset_ids')
def _compute_linked_assets(self):
    """Find the other assets created from the same source journal items.

    Sets, per asset:
      * ``linked_assets_ids``: assets attached to any of its source journal
        items, excluding the asset itself;
      * ``count_linked_asset``: how many of them there are;
      * ``warning_count_assets``: True when at least one of them is in the
        ``open`` state (used to flag the smart button).
    """
    for rec in self:
        # Subtract only the current record. Subtracting the whole ``self``
        # would also drop sibling assets that merely happen to be computed in
        # the same batch, making the result depend on the batch composition.
        linked = rec.original_move_line_ids.asset_ids - rec
        rec.linked_assets_ids = linked
        rec.count_linked_asset = len(linked)
        rec.warning_count_assets = any(other.state == 'open' for other in linked)
@api.onchange('model_id')
def _onchange_model_id(self):
    """Copy the depreciation settings of the chosen model onto this asset.

    Does nothing when no model is selected. The analytic distribution is only
    replaced when the model defines one; all other listed fields are copied
    unconditionally.
    """
    template = self.model_id
    if not template:
        return

    schedule_fields = (
        'method',
        'method_number',
        'method_period',
        'method_progress_factor',
        'prorata_computation_type',
    )
    for fname in schedule_fields:
        self[fname] = template[fname]

    self.analytic_distribution = template.analytic_distribution or self.analytic_distribution

    accounting_fields = (
        'account_asset_id',
        'account_depreciation_id',
        'account_depreciation_expense_id',
        'journal_id',
    )
    for fname in accounting_fields:
        self[fname] = template[fname]
@api.constrains('depreciation_move_ids')
def _check_depreciations(self):
    """Check that the schedule of a running asset ends with nothing left.

    Only assets in the ``open`` state that have depreciation entries are
    checked. The entry that comes last by ``(date, id)`` must show a zero
    remaining value in the asset currency.

    Raises:
        UserError: if the last entry's remaining value is not zero.
    """
    for asset in self:
        entries = asset.depreciation_move_ids
        if asset.state == 'open' and entries:
            final_entry = entries.sorted(
                key=lambda mv: (mv.date, mv.id), reverse=True
            )[0]
            fully_depreciated = asset.currency_id.is_zero(final_entry.asset_remaining_value)
            if not fully_depreciated:
                raise UserError(
                    _("The remaining value on the last depreciation line must be 0")
                )
def unlink(self):
    """Delete the assets after leaving a trace on their source journal entries.

    For every source journal item of every asset, a message naming the deleted
    asset is posted on the item's move. When that move is linked to exactly
    one asset, its ``asset_move_type`` is cleared as well.
    """
    for asset in self:
        for item in asset.original_move_line_ids:
            source_move = item.move_id
            asset_link = asset._get_html_link()
            if not item.name:
                body = _(
                    'A document linked to this move has been deleted: %s',
                    asset_link,
                )
            else:
                body = _(
                    'A document linked to %(move_line_name)s has been deleted: %(link)s',
                    move_line_name=item.name,
                    link=asset_link,
                )
            source_move.message_post(body=body)
            if len(source_move.asset_ids) == 1:
                source_move.asset_move_type = False
    return super().unlink()
def write(self, vals):
    """Write ``vals`` and mirror selected changes onto the depreciation entries.

    After the regular write:
      * a new ``analytic_distribution`` is copied to the lines of draft
        depreciation entries;
      * for entries dated after the user's fiscal lock date of the asset
        journal, a new depreciation account is set on the even-indexed lines,
        a new expense account on the odd-indexed lines, and a new journal on
        the entry itself.

    Returns:
        Whatever the parent ``write`` returned.
    """
    result = super().write(vals)

    propagated_keys = (
        'analytic_distribution',
        'account_depreciation_id',
        'account_depreciation_expense_id',
        'journal_id',
    )
    # Nothing to push down: skip walking every depreciation entry (and the
    # per-entry lock-date lookup) for writes that touch unrelated fields.
    if not any(key in vals for key in propagated_keys):
        return result

    for rec in self:
        for entry in rec.depreciation_move_ids:
            if entry.state == 'draft' and 'analytic_distribution' in vals:
                entry.line_ids.analytic_distribution = vals['analytic_distribution']
            fiscal_lock = entry.company_id._get_user_fiscal_lock_date(rec.journal_id)
            if entry.date > fiscal_lock:
                if 'account_depreciation_id' in vals:
                    # Even-indexed lines (0, 2, 4...) use the depreciation account
                    entry.line_ids[::2].account_id = vals['account_depreciation_id']
                if 'account_depreciation_expense_id' in vals:
                    # Odd-indexed lines (1, 3, 5...) use the expense account
                    entry.line_ids[1::2].account_id = vals['account_depreciation_expense_id']
                if 'journal_id' in vals:
                    entry.journal_id = vals['journal_id']
    return result
def _compute_board_amount(
    self, remaining_value, period_start, period_end,
    days_already_depreciated, days_remaining_in_life, declining_base,
    fiscal_year_start=None, total_life_remaining=None,
    residual_at_recompute=None, recompute_start_date=None,
):
    """
    Compute the depreciation amount of one period for the configured method
    (``linear``, ``degressive`` or ``degressive_then_linear``).

    :param remaining_value: value still to depreciate before this period
    :param period_start: first day of the period
    :param period_end: last day of the period
    :param days_already_depreciated: not read anywhere in this method
    :param days_remaining_in_life: day count added to the span used as the
        linear denominator in the ``degressive`` branch
    :param declining_base: base value the declining factor is applied to
    :param fiscal_year_start: start date used for the declining computation;
        also the default ``fy_start`` of the nested helper
    :param total_life_remaining: when set and strictly positive (2 digits),
        the ``linear`` branch depreciates proportionally to the days elapsed
        since ``recompute_start_date`` instead of using the straight-line helper
    :param residual_at_recompute: residual value used by that proportional
        linear computation
    :param recompute_start_date: start date used by that proportional linear
        computation
    :return: tuple ``(period_day_count, amount)`` with the amount rounded in
        the asset currency; ``(0, 0)`` when the asset lifetime or the
        remaining value is zero at 2 digits
    """
    def _pick_larger_of_declining_and_linear(straight_line_val, fy_start=fiscal_year_start):
        """
        Compute the declining amount of the period and let
        ``self._select_dominant_amount`` choose between it and
        ``straight_line_val``.
        NOTE(review): the selection rule lives in ``_select_dominant_amount``,
        which is not visible here -- presumably the larger absolute amount; confirm.
        """
        fy_dates = self.company_id.compute_fiscalyear_dates(period_end)
        fy_day_count = self._compute_day_span(fy_dates['date_from'], fy_dates['date_to'])
        elapsed_in_fy = self._compute_day_span(fy_start, period_end)
        # Value the asset should still have at period_end when the declining
        # factor is prorated over the part of the fiscal year already elapsed.
        declining_target = declining_base * (1 - self.method_progress_factor * elapsed_in_fy / fy_day_count)
        declining_period = remaining_value - declining_target
        return self._select_dominant_amount(remaining_value, declining_period, straight_line_val)

    # Nothing to depreciate: no lifetime or no remaining value.
    if float_is_zero(self.asset_lifetime_days, 2) or float_is_zero(remaining_value, 2):
        return 0, 0

    # Day counts measured from the (pause-adjusted) prorata date.
    elapsed_through_end = self._compute_day_span(self.paused_prorata_date, period_end)
    elapsed_before = self._compute_day_span(
        self.paused_prorata_date, period_start + relativedelta(days=-1),
    )
    # Never negative, even when the period starts on/before the prorata date.
    elapsed_before = max(elapsed_before, 0)
    period_day_count = elapsed_through_end - elapsed_before

    if self.method == 'linear':
        if total_life_remaining and float_compare(total_life_remaining, 0, 2) > 0:
            # Proportional mode: spread residual_at_recompute evenly over
            # total_life_remaining, counted from recompute_start_date.
            elapsed_since_recompute = self._compute_day_span(recompute_start_date, period_end)
            proportion_consumed = elapsed_since_recompute / total_life_remaining
            amount = remaining_value - residual_at_recompute * (1 - proportion_consumed)
        else:
            amount = self._compute_straight_line_amount(
                elapsed_before, elapsed_through_end, self.total_depreciable_value
            )
        # Keep whichever of the two is closer to zero: never depreciate more
        # than what is left.
        amount = min(amount, remaining_value, key=abs)
    elif self.method == 'degressive':
        # Start of the window: the fiscal year start, but not earlier than the
        # pause-adjusted prorata date.
        eff_start = (
            max(fiscal_year_start, self.paused_prorata_date)
            if fiscal_year_start
            else self.paused_prorata_date
        )
        # Days from eff_start to the day before the period, plus the days
        # left in the asset's life: denominator of the linear comparison.
        span_from_fy_start = (
            self._compute_day_span(eff_start, period_start - relativedelta(days=1))
            + days_remaining_in_life
        )
        linear_target = (
            declining_base
            - declining_base * self._compute_day_span(eff_start, period_end) / span_from_fy_start
        )
        straight_line_val = remaining_value - linear_target
        amount = _pick_larger_of_declining_and_linear(straight_line_val, eff_start)
    elif self.method == 'degressive_then_linear':
        if not self.parent_id:
            straight_line_val = self._compute_straight_line_amount(
                elapsed_before, elapsed_through_end, self.total_depreciable_value
            )
        else:
            # Child asset: look at the parent's schedule up to the child's
            # prorata date to derive the child's straight-line reference.
            parent = self.parent_id
            parent_prior_entries = parent.depreciation_move_ids.filtered(
                lambda mv: mv.date <= self.prorata_date
            ).sorted(key=lambda mv: (mv.date, mv.id))
            if parent_prior_entries:
                # Figures carried by the parent's latest entry at that date.
                parent_cumul = parent_prior_entries[-1].asset_depreciated_value
                parent_remaining = parent_prior_entries[-1].asset_remaining_value
            else:
                # No parent entry yet: fall back on imported/total amounts.
                parent_cumul = parent.already_depreciated_amount_import
                parent_remaining = parent.total_depreciable_value
            if self.currency_id.is_zero(parent_remaining):
                # Avoids dividing by a zero parent remaining value below.
                straight_line_val = self._compute_straight_line_amount(
                    elapsed_before, elapsed_through_end, self.total_depreciable_value
                )
            else:
                # Scale the child's depreciable base by the parent's
                # depreciated/remaining ratio, then by the ratio of lifetimes.
                scale_factor = 1 + parent_cumul / parent_remaining
                scaled_base = self.total_depreciable_value * scale_factor
                lifetime_ratio = self.asset_lifetime_days / parent.asset_lifetime_days
                straight_line_val = (
                    self._compute_straight_line_amount(
                        elapsed_before, elapsed_through_end, scaled_base
                    ) * lifetime_ratio
                )
        amount = _pick_larger_of_declining_and_linear(straight_line_val)

    # NOTE(review): `amount` is only bound for the three methods above; any
    # other value of self.method would fail on the next statement.
    # Force the amount to carry the same sign as the remaining value.
    if self.currency_id.compare_amounts(remaining_value, 0) > 0:
        amount = max(amount, 0)
    else:
        amount = min(amount, 0)

    # NOTE(review): _clamp_final_period is defined outside this view --
    # presumably it adjusts the amount for the last period of the asset's life; confirm.
    amount = self._clamp_final_period(remaining_value, amount, elapsed_through_end)
    return period_day_count, self.currency_id.round(amount)
""" self.depreciation_move_ids.filtered( lambda mv: mv.state == 'draft' and (mv.date >= date if date else True) ).unlink() all_new_vals = [] for rec in self: all_new_vals.extend(rec._recompute_board(date)) created_moves = self.env['account.move'].create(all_new_vals) # Post entries for running assets; future entries remain as auto-post drafts to_post = created_moves.filtered(lambda mv: mv.asset_id.state == 'open') to_post._post() def _recompute_board(self, start_date_override=False): """ Build the list of depreciation move value dicts for a single asset, starting from the given date or the paused prorata date. """ self.ensure_one() posted_entries = self.depreciation_move_ids.filtered( lambda mv: mv.state == 'posted' and not mv.asset_value_change ).sorted(key=lambda mv: (mv.date, mv.id)) prior_import = self.already_depreciated_amount_import draft_total = sum( self.depreciation_move_ids.filtered(lambda mv: mv.state == 'draft') .mapped('depreciation_value') ) remaining = self.value_residual - draft_total if not posted_entries: remaining += prior_import declining_base = recompute_residual = remaining cursor_date = start_date_override or self.paused_prorata_date recompute_origin = cursor_date fy_period_start = cursor_date terminus = self._get_last_day_asset() final_period_end = self._get_end_period_date(terminus) total_life_left = self._compute_day_span(cursor_date, terminus) move_vals_list = [] if not float_is_zero(self.value_residual, precision_rounding=self.currency_id.rounding): while not self.currency_id.is_zero(remaining) and cursor_date < final_period_end: period_end = self._get_end_period_date(cursor_date) fy_end = self.company_id.compute_fiscalyear_dates(period_end).get('date_to') life_left = self._compute_day_span(cursor_date, terminus) day_count, depr_amount = self._compute_board_amount( remaining, cursor_date, period_end, False, life_left, declining_base, fy_period_start, total_life_left, recompute_residual, recompute_origin, ) remaining -= depr_amount # 
def _get_end_period_date(self, ref_date):
    """
    Return the last day of the depreciation period that contains ``ref_date``.

    The period ends with the fiscal year the company reports for
    ``ref_date``. When ``method_period`` is ``'1'`` the period is capped at
    the last day of ``ref_date``'s calendar month.
    """
    self.ensure_one()
    fy_end = self.company_id.compute_fiscalyear_dates(ref_date).get('date_to')
    # If the reported fiscal year ends before ref_date, use the following year.
    period_end = fy_end if ref_date <= fy_end else fy_end + relativedelta(years=1)
    if self.method_period == '1':
        month_last = end_of(
            datetime.date(ref_date.year, ref_date.month, 1), 'month'
        ).day
        period_end = min(ref_date.replace(day=month_last), period_end)
    return period_end


def _compute_day_span(self, dt_start, dt_end):
    """
    Count the days from ``dt_start`` to ``dt_end``, both ends included.

    With ``prorata_computation_type == 'daily_computation'`` real calendar
    days are counted. Otherwise every month is worth NORMALIZED_MONTH_DAYS
    days and the partial first and last months are prorated on their real
    length, so the result may be fractional.
    """
    self.ensure_one()
    if self.prorata_computation_type == 'daily_computation':
        return (dt_end - dt_start).days + 1

    # Share of the first month that lies on or after dt_start.
    actual_days_in_start_month = end_of(dt_start, 'month').day
    start_fraction = (actual_days_in_start_month - dt_start.day + 1) / actual_days_in_start_month
    # Share of the last month that lies on or before dt_end.
    actual_days_in_end_month = end_of(dt_end, 'month').day
    end_fraction = dt_end.day / actual_days_in_end_month
    year_span = (dt_end.year - dt_start.year) * NORMALIZED_YEAR_DAYS
    # The "- 1" leaves out the first month, which start_fraction already covers.
    month_span = (dt_end.month - dt_start.month - 1) * NORMALIZED_MONTH_DAYS
    return (
        start_fraction * NORMALIZED_MONTH_DAYS
        + end_fraction * NORMALIZED_MONTH_DAYS
        + year_span
        + month_span
    )


def _get_last_day_asset(self):
    """
    Return the final calendar day of the depreciation schedule.

    An asset that has a ``parent_id`` uses the parent's start date, period
    length and number of periods instead of its own.
    """
    ref = self.parent_id if self.parent_id else self
    period_months = int(ref.method_period)
    return ref.paused_prorata_date + relativedelta(
        months=period_months * ref.method_number, days=-1
    )


# =========================================================================
# PUBLIC ACTIONS
# =========================================================================

def action_open_linked_assets(self):
    """Return the action that opens ``linked_assets_ids`` in list/form views."""
    action = self.linked_assets_ids.open_asset(['list', 'form'])
    action.get('context', {}).update({'from_linked_assets': 0})
    return action


def action_asset_modify(self):
    """
    Create an ``asset.modify`` wizard for this asset and return the action
    opening it in a dialog.

    The wizard's ``modify_action`` is ``'resume'`` when the context carries
    ``resume_after_pause`` and ``'dispose'`` otherwise.
    """
    self.ensure_one()
    wizard = self.env['asset.modify'].create({
        'asset_id': self.id,
        'modify_action': 'resume' if self.env.context.get('resume_after_pause') else 'dispose',
    })
    return {
        'name': _('Modify Asset'),
        'view_mode': 'form',
        'res_model': 'asset.modify',
        'type': 'ir.actions.act_window',
        'target': 'new',
        'res_id': wizard.id,
        'context': self.env.context,
    }


def action_save_model(self):
    """
    Return the action opening a new asset form pre-filled as a model.

    The accounts, journal, depreciation method settings, prorata settings
    and analytic distribution of the current asset are passed as
    ``default_*`` context keys, with ``default_state`` set to ``'model'``.
    """
    # External IDs must be written "<module>.<name>"; the separator was missing.
    form_view = self.env.ref('fusion_accounting.view_account_asset_form')
    return {
        'name': _('Save model'),
        'views': [[form_view.id, "form"]],
        'res_model': 'account.asset',
        'type': 'ir.actions.act_window',
        'context': {
            'default_state': 'model',
            'default_account_asset_id': self.account_asset_id.id,
            'default_account_depreciation_id': self.account_depreciation_id.id,
            'default_account_depreciation_expense_id': self.account_depreciation_expense_id.id,
            'default_journal_id': self.journal_id.id,
            'default_method': self.method,
            'default_method_number': self.method_number,
            'default_method_period': self.method_period,
            'default_method_progress_factor': self.method_progress_factor,
            'default_prorata_date': self.prorata_date,
            'default_prorata_computation_type': self.prorata_computation_type,
            'default_analytic_distribution': self.analytic_distribution,
            'original_asset': self.id,
        },
    }


def open_entries(self):
    """Return the action listing every journal entry in ``depreciation_move_ids``."""
    return {
        'name': _('Journal Entries'),
        'view_mode': 'list,form',
        'res_model': 'account.move',
        'search_view_id': [self.env.ref('account.view_account_move_filter').id, 'search'],
        'views': [(self.env.ref('account.view_move_tree').id, 'list'), (False, 'form')],
        'type': 'ir.actions.act_window',
        'domain': [('id', 'in', self.depreciation_move_ids.ids)],
        # Creating entries from this list is disabled.
        'context': dict(self.env.context, create=False),
    }


def open_related_entries(self):
    """Return the action listing the journal items in ``original_move_line_ids``."""
    return {
        'name': _('Journal Items'),
        'view_mode': 'list,form',
        'res_model': 'account.move.line',
        'view_id': False,
        'type': 'ir.actions.act_window',
        'domain': [('id', 'in', self.original_move_line_ids.ids)],
    }
def open_parent_id(self):
    """Return the window action showing the parent asset in a form view."""
    parent = self.parent_id
    return {
        'name': _('Parent Asset'),
        'view_mode': 'form',
        'res_model': 'account.asset',
        'type': 'ir.actions.act_window',
        'res_id': parent.id,
        'views': [(False, 'form')],
    }


def validate(self):
    """
    Confirm the assets: switch them to 'open', log the creation in the
    chatter of each asset and of its source moves, build the depreciation
    board when there is none yet and post every unposted entry.

    :raises ValidationError: when a database check constraint is violated
        while computing or posting the entries.
    """
    tracked = [
        'method',
        'method_number',
        'method_period',
        'method_progress_factor',
        'salvage_value',
        'original_move_line_ids',
    ]
    tracked_definitions = self.env['account.asset'].fields_get(tracked)
    self.write({'state': 'open'})

    for asset in self:
        to_track = tracked_definitions.copy()
        if asset.method == 'linear':
            # The progress factor is left out of the tracking for linear assets.
            to_track.pop('method_progress_factor')
        _unused, tracking_ids = asset._mail_track(to_track, dict.fromkeys(tracked))

        link_msg = _('An asset has been created for this move:') + ' ' + asset._get_html_link()
        asset.message_post(body=_('Asset created'), tracking_value_ids=tracking_ids)
        for move in asset.original_move_line_ids.mapped('move_id'):
            move.message_post(body=link_msg)

        try:
            if not asset.depreciation_move_ids:
                asset.compute_depreciation_board()
            asset._check_depreciations()
            unposted = asset.depreciation_move_ids.filtered(
                lambda mv: mv.state != 'posted'
            )
            unposted._post()
        except psycopg2.errors.CheckViolation:
            raise ValidationError(_(
                "At least one asset (%s) couldn't be set as running "
                "because it lacks any required information", asset.name
            ))

        if asset.account_asset_id.create_asset == 'no':
            asset._post_non_deductible_tax_value()


def set_to_close(self, invoice_line_ids, date=None, message=None):
    """
    Close the asset together with its children and generate the disposal
    moves, then store the net gain on sale on the asset.

    :param invoice_line_ids: sale invoice lines; falsy for a plain disposal.
    :param date: disposal date, defaulting to today.
    :param message: optional text appended to the chatter note.
    :return: a window action on the disposal moves, or None if there are none.
    :raises UserError: if the date is not after the fiscal lock date, or if
        invoice lines are given while a child asset is still draft/open or
        has a positive residual value.
    """
    self.ensure_one()
    disposal_date = date or fields.Date.today()
    if disposal_date <= self.company_id._get_user_fiscal_lock_date(self.journal_id):
        raise UserError(_("You cannot dispose of an asset before the lock date."))

    if invoice_line_ids and self.children_ids.filtered(
        lambda child: child.state in ('draft', 'open') or child.value_residual > 0
    ):
        raise UserError(_(
            "You cannot automate the journal entry for an asset that has a "
            "running gross increase. Please use 'Dispose' on the increase(s)."
        ))

    family = self + self.children_ids
    family.state = 'close'
    move_ids = family._get_disposal_moves(
        [invoice_line_ids] * len(family), disposal_date
    )

    for member in family:
        if invoice_line_ids:
            note = _('Asset sold. %s', message or "")
        else:
            note = _('Asset disposed. %s', message or "")
        member.message_post(body=note)

    proceeds = abs(sum(line.balance for line in invoice_line_ids))
    self.net_gain_on_sale = self.currency_id.round(proceeds - self.book_value)

    if not move_ids:
        return None
    single = len(move_ids) == 1
    return {
        'name': _('Disposal Move') if single else _('Disposal Moves'),
        'view_mode': 'form' if single else 'list,form',
        'res_model': 'account.move',
        'type': 'ir.actions.act_window',
        'target': 'current',
        'res_id': move_ids[0],
        'domain': [('id', 'in', move_ids)],
    }
""" for rec in self: unreversed_posted = rec.depreciation_move_ids.filtered(lambda mv: ( not mv.reversal_move_ids and not mv.reversed_entry_id and mv.state == 'posted' )) if unreversed_posted: expense_delta = sum(unreversed_posted.line_ids.mapped( lambda ln: ln.debit if ln.account_id == rec.account_depreciation_expense_id else 0.0 )) accumulated_delta = sum(unreversed_posted.line_ids.mapped( lambda ln: ln.credit if ln.account_id == rec.account_depreciation_id else 0.0 )) entry_descriptions = Markup('
').join( unreversed_posted.sorted('date').mapped(lambda mv: ( f'{mv.ref} - {mv.date} - ' f'{formatLang(self.env, mv.depreciation_value, currency_obj=mv.currency_id)} - ' f'{mv.name}' )) ) rec._cancel_future_moves(datetime.date.min) summary = ( _('Asset Cancelled') + Markup('
') + _( 'The account %(exp_acc)s has been credited by %(exp_delta)s, ' 'while the account %(dep_acc)s has been debited by %(dep_delta)s. ' 'This corresponds to %(move_count)s cancelled %(word)s:', exp_acc=rec.account_depreciation_expense_id.display_name, exp_delta=formatLang(self.env, expense_delta, currency_obj=rec.currency_id), dep_acc=rec.account_depreciation_id.display_name, dep_delta=formatLang(self.env, accumulated_delta, currency_obj=rec.currency_id), move_count=len(unreversed_posted), word=_('entries') if len(unreversed_posted) > 1 else _('entry'), ) + Markup('
def set_to_draft(self):
    """Put the asset back into the 'draft' state."""
    draft_vals = {'state': 'draft'}
    self.write(draft_vals)


def set_to_running(self):
    """
    Switch the asset back to 'open' and clear its net gain on sale.

    When the most recent depreciation entry still has a remaining value, an
    ``asset.modify`` wizard named 'Reset to running' is created and applied
    before the state change.
    """
    entries = self.depreciation_move_ids
    if entries:
        latest = max(entries, key=lambda mv: (mv.date, mv.id))
        needs_reset = latest.asset_remaining_value != 0
    else:
        needs_reset = False

    if needs_reset:
        wizard = self.env['asset.modify'].create({
            'asset_id': self.id,
            'name': _('Reset to running'),
        })
        wizard.modify()
    self.write({'state': 'open', 'net_gain_on_sale': 0})


def resume_after_pause(self):
    """
    Open the modification wizard with ``resume_after_pause`` set in the
    context, so that the wizard is created in 'resume' mode.
    """
    self.ensure_one()
    resuming = self.with_context(resume_after_pause=True)
    return resuming.action_asset_modify()


def pause(self, pause_date, message=None):
    """
    Put the asset on hold: create the entry covering the time up to
    ``pause_date``, set the state to 'paused' and log a chatter note.
    """
    self.ensure_one()
    self._create_move_before_date(pause_date)
    self.write({'state': 'paused'})
    note = _("Asset paused. %s", message or "")
    self.message_post(body=note)


def open_asset(self, view_mode):
    """
    Return a window action on the asset(s) in ``self``.

    :param view_mode: list of view types ('list', 'form'); a single record
        is always opened in form view only.
    """
    modes = ['form'] if len(self) == 1 else view_mode
    views = [(False, kind) for kind in ('list', 'form') if kind in modes]
    # The invoice default must not leak into the asset views.
    context = {
        key: value
        for key, value in self.env.context.items()
        if key != 'default_move_type'
    }
    return {
        'name': _('Asset'),
        'view_mode': ','.join(modes),
        'type': 'ir.actions.act_window',
        'res_id': False if 'list' in modes else self.id,
        'res_model': 'account.asset',
        'views': views,
        'domain': [('id', 'in', self.ids)],
        'context': context,
    }


# =========================================================================
# INTERNAL HELPERS
# =========================================================================

def _insert_depreciation_line(self, amount, begin_date, entry_date, day_count):
    """Create one depreciation journal entry for this asset and return it."""
    self.ensure_one()
    move_model = self.env['account.move']
    move_vals = move_model._prepare_move_for_asset_depreciation({
        'amount': amount,
        'asset_id': self,
        'depreciation_beginning_date': begin_date,
        'date': entry_date,
        'asset_number_days': day_count,
    })
    return move_model.create(move_vals)


def _post_non_deductible_tax_value(self):
    """
    Post a chatter note about the non-deductible tax included in the
    initial value; nothing is posted when that tax value is zero.
    """
    if not self.non_deductible_tax_value:
        return
    currency = self.env.company.currency_id
    self.message_post(body=_(
        'A non deductible tax value of %(tax_value)s was added to '
        '%(name)s\'s initial value of %(purchase_value)s',
        tax_value=formatLang(self.env, self.non_deductible_tax_value, currency_obj=currency),
        name=self.name,
        purchase_value=formatLang(self.env, self.related_purchase_value, currency_obj=currency),
    ))
""" posted_before = self.depreciation_move_ids.filtered( lambda mv: ( mv.date <= cutoff_date and not mv.reversal_move_ids and not mv.reversed_entry_id and mv.state == 'posted' ) ).sorted('date') prior_dates = posted_before.mapped('date') fy_start = ( self.company_id.compute_fiscalyear_dates(cutoff_date).get('date_from') if self.method != 'linear' else False ) fy_lead_entry = self.env['account.move'] if prior_dates: latest_posted_date = max(prior_dates) # Find the beginning date of the next depreciation period upcoming_starts = self.depreciation_move_ids.filtered( lambda mv: mv.date > latest_posted_date and ( (not mv.reversal_move_ids and not mv.reversed_entry_id and mv.state == 'posted') or mv.state == 'draft' ) ).mapped('asset_depreciation_beginning_date') period_begin = min(upcoming_starts) if upcoming_starts else self.paused_prorata_date if self.method != 'linear': fy_candidates = self.depreciation_move_ids.filtered( lambda mv: mv.asset_depreciation_beginning_date >= fy_start and ( (not mv.reversal_move_ids and not mv.reversed_entry_id and mv.state == 'posted') or mv.state == 'draft' ) ).sorted(lambda mv: (mv.asset_depreciation_beginning_date, mv.id)) fy_lead_entry = next(iter(fy_candidates), fy_lead_entry) else: period_begin = self.paused_prorata_date declining_start_val = ( fy_lead_entry.asset_remaining_value + fy_lead_entry.depreciation_value ) self._cancel_future_moves(cutoff_date) import_offset = self.already_depreciated_amount_import if not prior_dates else 0 residual_for_calc = ( self.value_residual + self.already_depreciated_amount_import if not prior_dates else self.value_residual ) declining_start_val = declining_start_val or residual_for_calc terminus = self._get_last_day_asset() life_remaining = self._compute_day_span(period_begin, terminus) day_count, depr_amount = self._compute_board_amount( self.value_residual, period_begin, cutoff_date, False, life_remaining, declining_start_val, fy_start, life_remaining, residual_for_calc, period_begin, ) if 
abs(import_offset) <= abs(depr_amount): depr_amount -= import_offset if not float_is_zero(depr_amount, precision_rounding=self.currency_id.rounding): new_entry = self._insert_depreciation_line( depr_amount, period_begin, cutoff_date, day_count ) new_entry._post() def _cancel_future_moves(self, cutoff_date): """ Remove or reverse depreciation entries dated after cutoff_date. Draft entries are deleted; posted entries are reversed. """ for rec in self: stale_entries = rec.depreciation_move_ids.filtered( lambda mv: mv.state == 'draft' or ( not mv.reversal_move_ids and not mv.reversed_entry_id and mv.state == 'posted' and mv.date > cutoff_date ) ) stale_entries._unlink_or_reverse() def _get_disposal_moves(self, invoice_lines_list, disposal_date): """ Generate disposal/sale journal entries for each asset, balancing the original value, accumulated depreciation, and gain/loss accounts. """ def _build_line(label, asset_rec, amt, account): return (0, 0, { 'name': label, 'account_id': account.id, 'balance': -amt, 'analytic_distribution': analytics, 'currency_id': asset_rec.currency_id.id, 'amount_currency': -asset_rec.company_id.currency_id._convert( from_amount=amt, to_currency=asset_rec.currency_id, company=asset_rec.company_id, date=disposal_date, ), }) created_ids = [] assert len(self) == len(invoice_lines_list) for rec, inv_lines in zip(self, invoice_lines_list): rec._create_move_before_date(disposal_date) analytics = rec.analytic_distribution # Tally invoice line amounts per account invoice_by_account = {} inv_total = 0 orig_val = rec.original_value orig_account = ( rec.original_move_line_ids.account_id if len(rec.original_move_line_ids.account_id) == 1 else rec.account_asset_id ) entries_before = rec.depreciation_move_ids.filtered(lambda mv: mv.date <= disposal_date) cumulative_depr = rec.currency_id.round(copysign( sum(entries_before.mapped('depreciation_value')) + rec.already_depreciated_amount_import, -orig_val, )) depr_account = rec.account_depreciation_id for 
inv_line in inv_lines: signed_bal = copysign(inv_line.balance, -orig_val) invoice_by_account[inv_line.account_id] = ( invoice_by_account.get(inv_line.account_id, 0) + signed_bal ) inv_total += signed_bal acct_amount_pairs = list(invoice_by_account.items()) gap = -orig_val - cumulative_depr - inv_total gap_account = ( rec.company_id.gain_account_id if gap > 0 else rec.company_id.loss_account_id ) all_line_data = ( [(orig_val, orig_account), (cumulative_depr, depr_account)] + [(amt, acct) for acct, amt in acct_amount_pairs] + [(gap, gap_account)] ) ref_label = ( _("%(asset)s: Sale", asset=rec.name) if inv_lines else _("%(asset)s: Disposal", asset=rec.name) ) move_vals = { 'asset_id': rec.id, 'ref': ref_label, 'asset_depreciation_beginning_date': disposal_date, 'date': disposal_date, 'journal_id': rec.journal_id.id, 'move_type': 'entry', 'asset_move_type': 'sale' if inv_lines else 'disposal', 'line_ids': [ _build_line(ref_label, rec, amt, acct) for amt, acct in all_line_data if acct ], } rec.write({'depreciation_move_ids': [(0, 0, move_vals)]}) created_ids += self.env['account.move'].search([ ('asset_id', '=', rec.id), ('state', '=', 'draft') ]).ids return created_ids def _select_dominant_amount(self, remaining_value, declining_amount, straight_line_amount): """Pick the larger depreciation amount (by absolute value) respecting sign.""" if self.currency_id.compare_amounts(remaining_value, 0) > 0: return max(declining_amount, straight_line_amount) return min(declining_amount, straight_line_amount) def _clamp_final_period(self, remaining_value, computed_amount, elapsed_days): """ In the final period (or when computed amount overshoots), clamp the depreciation to the remaining value. 
""" if abs(remaining_value) < abs(computed_amount) or elapsed_days >= self.asset_lifetime_days: return remaining_value return computed_amount def _get_own_book_value(self, date=None): """Book value of this asset alone (without children), optionally at a given date.""" self.ensure_one() residual = self._get_residual_value_at_date(date) if date else self.value_residual return residual + self.salvage_value def _get_residual_value_at_date(self, target_date): """ Interpolate the asset's residual value at any arbitrary date by finding the surrounding depreciation entries and prorating. """ relevant_entries = self.depreciation_move_ids.filtered( lambda mv: ( mv.asset_depreciation_beginning_date < target_date and not mv.reversed_entry_id ) ).sorted('asset_depreciation_beginning_date', reverse=True) if not relevant_entries: return 0 if len(relevant_entries) > 1: prior_residual = relevant_entries[1].asset_remaining_value else: prior_residual = ( self.original_value - self.salvage_value - self.already_depreciated_amount_import ) current_entry = relevant_entries[0] entry_start = current_entry.asset_depreciation_beginning_date entry_end = self._get_end_period_date(target_date) proportion = ( self._compute_day_span(entry_start, target_date) / self._compute_day_span(entry_start, entry_end) ) value_consumed = (prior_residual - current_entry.asset_remaining_value) * proportion interpolated = self.currency_id.round(prior_residual - value_consumed) if self.currency_id.compare_amounts(self.original_value, 0) > 0: return max(interpolated, 0) return min(interpolated, 0) class FusionAssetGroup(models.Model): """Logical grouping of assets for organizational and reporting purposes.""" _name = 'account.asset.group' _description = 'Asset Group' _order = 'name' name = fields.Char("Name", index="trigram") company_id = fields.Many2one( 'res.company', string='Company', default=lambda self: self.env.company, ) linked_asset_ids = fields.One2many( 'account.asset', 'asset_group_id', string='Related 
Assets', ) count_linked_assets = fields.Integer(compute='_compute_count_linked_asset') @api.depends('linked_asset_ids') def _compute_count_linked_asset(self): """Count assets in each group using a single grouped query.""" group_counts = { grp.id: total for grp, total in self.env['account.asset']._read_group( domain=[('asset_group_id', 'in', self.ids)], groupby=['asset_group_id'], aggregates=['__count'], ) } for grp in self: grp.count_linked_assets = group_counts.get(grp.id, 0) def action_open_linked_assets(self): """Open a list/form view of assets belonging to this group.""" self.ensure_one() return { 'name': self.name, 'view_mode': 'list,form', 'res_model': 'account.asset', 'type': 'ir.actions.act_window', 'domain': [('id', 'in', self.linked_asset_ids.ids)], } class FusionAssetReportHandler(models.AbstractModel): """ Custom report handler for the depreciation schedule report. Generates asset-level rows with acquisition, depreciation, and book value columns, and supports grouping by account or asset group. """ _name = 'account.asset.report.handler' _inherit = 'account.report.custom.handler' _description = 'Assets Report Custom Handler' def _get_custom_display_config(self): """Configure the report's CSS class and filter template.""" return { 'client_css_custom_class': 'depreciation_schedule', 'templates': { 'AccountReportFilters': 'fusion_accounting.DepreciationScheduleFilters', }, } def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None): """ Main entry point: generate all report lines, optionally grouped, with a grand total row at the bottom. 
""" detail_lines, col_group_totals = self._build_ungrouped_lines(report, options) # Apply grouping if configured grouping_mode = options['assets_grouping_field'] if grouping_mode != 'none': detail_lines = self._apply_grouping(report, detail_lines, options) else: detail_lines = report._regroup_lines_by_name_prefix( options, detail_lines, '_report_expand_unfoldable_line_assets_report_prefix_group', 0, ) # Build grand total row total_cols = [] for col_def in options['columns']: col_key = col_def['column_group_key'] expr = col_def['expression_label'] raw_val = col_group_totals[col_key].get(expr) display_val = raw_val if col_def.get('figure_type') == 'monetary' else '' total_cols.append(report._build_column_dict(display_val, col_def, options=options)) if detail_lines: detail_lines.append({ 'id': report._get_generic_line_id(None, None, markup='total'), 'level': 1, 'name': _('Total'), 'columns': total_cols, 'unfoldable': False, 'unfolded': False, }) return [(0, line) for line in detail_lines] def _build_ungrouped_lines(self, report, options, name_prefix=None, parent_line_id=None, restrict_account_id=None): """ Query the database and construct one report line per asset. Returns (lines, totals_by_column_group). 
""" all_asset_ids = set() combined_data = {} for cg_key, cg_options in report._split_options_per_column_group(options).items(): query_rows = self._fetch_report_rows( cg_options, name_prefix=name_prefix, restrict_account_id=restrict_account_id, ) for acct_id, asset_id, group_id, expr_vals in query_rows: key = (acct_id, asset_id, group_id) all_asset_ids.add(asset_id) combined_data.setdefault(key, {})[cg_key] = expr_vals column_labels = [ 'assets_date_from', 'assets_plus', 'assets_minus', 'assets_date_to', 'depre_date_from', 'depre_plus', 'depre_minus', 'depre_date_to', 'balance', ] col_group_totals = defaultdict(lambda: dict.fromkeys(column_labels, 0.0)) asset_cache = {a.id: a for a in self.env['account.asset'].browse(all_asset_ids)} company_curr = self.env.company.currency_id expr_model = self.env['account.report.expression'] output_lines = [] for (acct_id, asset_id, group_id), cg_data in combined_data.items(): row_columns = [] for col_def in options['columns']: cg_key = col_def['column_group_key'] expr = col_def['expression_label'] if cg_key not in cg_data or expr not in cg_data[cg_key]: row_columns.append(report._build_column_dict(None, None)) continue val = cg_data[cg_key][expr] col_meta = None if val is None else col_def row_columns.append(report._build_column_dict( val, col_meta, options=options, column_expression=expr_model, currency=company_curr, )) if col_def['figure_type'] == 'monetary': col_group_totals[cg_key][expr] += val asset_name = asset_cache[asset_id].name line_dict = { 'id': report._get_generic_line_id('account.asset', asset_id, parent_line_id=parent_line_id), 'level': 2, 'name': asset_name, 'columns': row_columns, 'unfoldable': False, 'unfolded': False, 'caret_options': 'account_asset_line', 'assets_account_id': acct_id, 'assets_asset_group_id': group_id, } if parent_line_id: line_dict['parent_id'] = parent_line_id if len(asset_name) >= REPORT_NAME_TRUNCATION: line_dict['title_hover'] = asset_name output_lines.append(line_dict) return 
def _caret_options_initializer(self):
    """Return the caret menu of asset lines: a single 'Open Asset' entry."""
    open_asset_entry = {
        'name': _("Open Asset"),
        'action': 'caret_option_open_record_form',
    }
    return {'account_asset_line': [open_asset_entry]}


def _custom_options_initializer(self, report, options, previous_options):
    """
    Set the dynamic column names, the sub-headers, the grouping field and
    the hierarchy flag of the depreciation schedule.
    """
    super()._custom_options_initializer(report, options, previous_options=previous_options)
    options_by_group = report._split_options_per_column_group(options)

    opening_labels = ('assets_date_from', 'depre_date_from')
    closing_labels = ('assets_date_to', 'depre_date_to')
    for column in options['columns']:
        group_options = options_by_group[column['column_group_key']]
        label = column['expression_label']
        if label == 'balance':
            column['name'] = ''
        elif label in opening_labels:
            column['name'] = format_date(self.env, group_options['date']['date_from'])
        elif label in closing_labels:
            column['name'] = format_date(self.env, group_options['date']['date_to'])

    subheaders = [
        (_("Characteristics"), 4),
        (_("Assets"), 4),
        (_("Depreciation"), 4),
        (_("Book Value"), 1),
    ]
    options['custom_columns_subheaders'] = [
        {"name": title, "colspan": width} for title, width in subheaders
    ]

    # Group by account unless a previous choice exists.
    options['assets_grouping_field'] = previous_options.get('assets_grouping_field') or 'account_id'

    # The hierarchy is only offered when the company has account groups.
    has_account_groups = self.env['account.group'].search_count(
        [('company_id', '=', self.env.company.id)],
        limit=1,
    )
    wants_hierarchy = previous_options.get('hierarchy', True)
    options['hierarchy'] = (has_account_groups and wants_hierarchy) or False


def _fetch_report_rows(self, options, name_prefix=None, restrict_account_id=None):
    """
    Run the schedule query and return one tuple per parent asset:
    ``(account_id, asset_id, asset_group_id, {expression_label: value})``.

    Rows that have a ``parent_id`` are not returned on their own; they are
    passed to ``_get_parent_asset_values`` together with their parent row.
    """
    raw_rows = self._execute_schedule_query(
        options,
        name_prefix=name_prefix,
        restrict_account_id=restrict_account_id,
    )

    parents = []
    children = defaultdict(list)
    for row in raw_rows:
        bucket = children[row['parent_id']] if row['parent_id'] else parents
        bucket.append(row)

    def _as_text(value):
        return format_date(self.env, value) if value else ""

    rows = []
    for parent in parents:
        figures = self._get_parent_asset_values(
            options, parent, children.get(parent['asset_id'], [])
        )

        method = parent['asset_method']
        if method == 'linear':
            method_label = _("Linear")
        elif method == 'degressive':
            method_label = _("Declining")
        else:
            method_label = _("Dec. then Straight")

        cells = {
            'acquisition_date': _as_text(parent['asset_acquisition_date']),
            'first_depreciation': _as_text(parent['asset_date']),
            'method': method_label,
        }
        cells.update(figures)
        rows.append((parent['account_id'], parent['asset_id'], parent['asset_group_id'], cells))
    return rows
""" # Depreciation rate display depr_method = asset_row['asset_method'] periods = asset_row['asset_method_number'] if depr_method == 'linear' and periods: total_months = int(periods) * int(asset_row['asset_method_period']) yr = total_months // 12 mo = total_months % 12 rate_parts = [] if yr: rate_parts.append(_("%(years)s y", years=yr)) if mo: rate_parts.append(_("%(months)s m", months=mo)) rate_str = " ".join(rate_parts) elif depr_method == 'linear': rate_str = '0.00 %' else: rate_str = '{:.2f} %'.format(float(asset_row['asset_method_progress_factor']) * 100) period_from = fields.Date.to_date(options['date']['date_from']) period_to = fields.Date.to_date(options['date']['date_to']) # Determine if the asset was acquired before the report period acq_or_first = asset_row['asset_acquisition_date'] or asset_row['asset_date'] existed_before_period = acq_or_first < period_from # Base depreciation figures depr_opening = asset_row['depreciated_before'] depr_additions = asset_row['depreciated_during'] depr_removals = 0.0 disposal_val = 0.0 if asset_row['asset_disposal_date'] and asset_row['asset_disposal_date'] <= period_to: disposal_val = asset_row['asset_disposal_value'] asset_opening = asset_row['asset_original_value'] if existed_before_period else 0.0 asset_additions = 0.0 if existed_before_period else asset_row['asset_original_value'] asset_removals = 0.0 salvage = asset_row.get('asset_salvage_value', 0.0) # Accumulate child (gross increase) values for child in child_rows: depr_opening += child['depreciated_before'] depr_additions += child['depreciated_during'] child_acq = child['asset_acquisition_date'] or child['asset_date'] child_existed = child_acq < period_from asset_opening += child['asset_original_value'] if child_existed else 0.0 asset_additions += 0.0 if child_existed else child['asset_original_value'] # Closing figures asset_closing = asset_opening + asset_additions - asset_removals depr_closing = depr_opening + depr_additions - depr_removals asset_curr = 
self.env['res.currency'].browse(asset_row['asset_currency_id']) # Handle fully depreciated & closed assets if ( asset_row['asset_state'] == 'close' and asset_row['asset_disposal_date'] and asset_row['asset_disposal_date'] <= period_to and asset_curr.compare_amounts(depr_closing, asset_closing - salvage) == 0 ): depr_additions -= disposal_val depr_removals += depr_closing - disposal_val depr_closing = 0.0 asset_removals += asset_closing asset_closing = 0.0 # Invert signs for credit-note (negative) assets if asset_curr.compare_amounts(asset_row['asset_original_value'], 0) < 0: asset_additions, asset_removals = -asset_removals, -asset_additions depr_additions, depr_removals = -depr_removals, -depr_additions return { 'duration_rate': rate_str, 'asset_disposal_value': disposal_val, 'assets_date_from': asset_opening, 'assets_plus': asset_additions, 'assets_minus': asset_removals, 'assets_date_to': asset_closing, 'depre_date_from': depr_opening, 'depre_plus': depr_additions, 'depre_minus': depr_removals, 'depre_date_to': depr_closing, 'balance': asset_closing - depr_closing, } def _apply_grouping(self, report, lines, options): """ Organize flat asset lines into collapsible groups based on the configured grouping field (account or asset group). 
""" if not lines: return lines groups = {} grouping_model = ( 'account.account' if options['assets_grouping_field'] == 'account_id' else 'account.asset.group' ) for line in lines: group_key = ( line.get('assets_account_id') if options['assets_grouping_field'] == 'account_id' else line.get('assets_asset_group_id') ) _model, record_id = report._get_model_info_from_id(line['id']) line['id'] = report._build_line_id([ (None, grouping_model, group_key), (None, 'account.asset', record_id), ]) is_unfolded = any( report._get_model_info_from_id(uid) == (grouping_model, group_key) for uid in options.get('unfolded_lines', []) ) groups.setdefault(group_key, { 'id': report._build_line_id([(None, grouping_model, group_key)]), 'columns': [], 'unfoldable': True, 'unfolded': is_unfolded or options.get('unfold_all'), 'level': 1, 'group_lines': [], })['group_lines'].append(line) # Build the final list with group headers and their children result = [] monetary_col_indices = [ idx for idx, col in enumerate(options['columns']) if col['figure_type'] == 'monetary' ] group_records = self.env[grouping_model].browse(groups.keys()) for grp_rec in group_records: header = groups[grp_rec.id] if options['assets_grouping_field'] == 'account_id': header['name'] = f"{grp_rec.code} {grp_rec.name}" else: header['name'] = grp_rec.name or _('(No %s)', grp_rec._description) result.append(header) running_totals = {ci: 0 for ci in monetary_col_indices} nested_lines = report._regroup_lines_by_name_prefix( options, header.pop('group_lines'), '_report_expand_unfoldable_line_assets_report_prefix_group', header['level'], parent_line_dict_id=header['id'], ) for child_line in nested_lines: for ci in monetary_col_indices: running_totals[ci] += child_line['columns'][ci].get('no_format', 0) child_line['parent_id'] = header['id'] result.append(child_line) # Populate the group header's column totals for ci in range(len(options['columns'])): header['columns'].append(report._build_column_dict( running_totals.get(ci, ''), 
options['columns'][ci], options=options, )) return result def _execute_schedule_query(self, options, name_prefix=None, restrict_account_id=None): """ Build and execute the SQL query that retrieves asset data for the depreciation schedule report. """ self.env['account.move.line'].check_access('read') self.env['account.asset'].check_access('read') qry = Query(self.env, alias='asset', table=SQL.identifier('account_asset')) acct_alias = qry.join( lhs_alias='asset', lhs_column='account_asset_id', rhs_table='account_account', rhs_column='id', link='account_asset_id', ) move_state_filter = "!= 'cancel'" if options.get('all_entries') else "= 'posted'" qry.add_join('LEFT JOIN', alias='move', table='account_move', condition=SQL( f"move.asset_id = asset.id AND move.state {move_state_filter}" )) acct_code_sql = self.env['account.account']._field_to_sql(acct_alias, 'code', qry) acct_name_sql = self.env['account.account']._field_to_sql(acct_alias, 'name') acct_id_sql = SQL.identifier(acct_alias, 'id') if name_prefix: qry.add_where(SQL("asset.name ILIKE %s", f"{name_prefix}%")) if restrict_account_id: qry.add_where(SQL("%s = %s", acct_id_sql, restrict_account_id)) # Analytic filtering analytic_ids = [] opt_analytics = options.get('analytic_accounts', []) opt_analytics_list = options.get('analytic_accounts_list', []) if opt_analytics and not any(x in opt_analytics_list for x in opt_analytics): analytic_ids.append([str(aid) for aid in opt_analytics]) if opt_analytics_list: analytic_ids.append([str(aid) for aid in opt_analytics_list]) if analytic_ids: qry.add_where(SQL( '%s && %s', analytic_ids, self.env['account.asset']._query_analytic_accounts('asset'), )) # Journal filtering active_journals = tuple( j['id'] for j in options.get('journals', []) if j['model'] == 'account.journal' and j['selected'] ) if active_journals: qry.add_where(SQL("asset.journal_id in %s", active_journals)) stmt = SQL(""" SELECT asset.id AS asset_id, asset.parent_id AS parent_id, asset.name AS asset_name, 
asset.asset_group_id AS asset_group_id, asset.original_value AS asset_original_value, asset.currency_id AS asset_currency_id, COALESCE(asset.salvage_value, 0) AS asset_salvage_value, MIN(move.date) AS asset_date, asset.disposal_date AS asset_disposal_date, asset.acquisition_date AS asset_acquisition_date, asset.method AS asset_method, asset.method_number AS asset_method_number, asset.method_period AS asset_method_period, asset.method_progress_factor AS asset_method_progress_factor, asset.state AS asset_state, asset.company_id AS company_id, %(acct_code)s AS account_code, %(acct_name)s AS account_name, %(acct_id)s AS account_id, COALESCE(SUM(move.depreciation_value) FILTER (WHERE move.date < %(dt_from)s), 0) + COALESCE(asset.already_depreciated_amount_import, 0) AS depreciated_before, COALESCE(SUM(move.depreciation_value) FILTER (WHERE move.date BETWEEN %(dt_from)s AND %(dt_to)s), 0) AS depreciated_during, COALESCE(SUM(move.depreciation_value) FILTER ( WHERE move.date BETWEEN %(dt_from)s AND %(dt_to)s AND move.asset_number_days IS NULL ), 0) AS asset_disposal_value FROM %(from_clause)s WHERE %(where_clause)s AND asset.company_id IN %(company_ids)s AND (asset.acquisition_date <= %(dt_to)s OR move.date <= %(dt_to)s) AND (asset.disposal_date >= %(dt_from)s OR asset.disposal_date IS NULL) AND (asset.state NOT IN ('model', 'draft', 'cancelled') OR (asset.state = 'draft' AND %(include_draft)s)) AND asset.active = 't' GROUP BY asset.id, account_id, account_code, account_name ORDER BY account_code, asset.acquisition_date, asset.id """, acct_code=acct_code_sql, acct_name=acct_name_sql, acct_id=acct_id_sql, dt_from=options['date']['date_from'], dt_to=options['date']['date_to'], from_clause=qry.from_clause, where_clause=qry.where_clause or SQL('TRUE'), company_ids=tuple(self.env['account.report'].get_report_company_ids(options)), include_draft=options.get('all_entries', False), ) self.env.cr.execute(stmt) return self.env.cr.dictfetchall() def 
_report_expand_unfoldable_line_assets_report_prefix_group( self, line_dict_id, groupby, options, progress, offset, unfold_all_batch_data=None, ): """Handle expansion of prefix-grouped unfoldable lines in the report.""" prefix = self.env['account.report']._get_prefix_groups_matched_prefix_from_line_id(line_dict_id) rpt = self.env['account.report'].browse(options['report_id']) expanded_lines, _totals = self._build_ungrouped_lines( rpt, options, name_prefix=prefix, parent_line_id=line_dict_id, restrict_account_id=self.env['account.report']._get_res_id_from_line_id( line_dict_id, 'account.account' ), ) expanded_lines = rpt._regroup_lines_by_name_prefix( options, expanded_lines, '_report_expand_unfoldable_line_assets_report_prefix_group', len(prefix), matched_prefix=prefix, parent_line_dict_id=line_dict_id, ) return { 'lines': expanded_lines, 'offset_increment': len(expanded_lines), 'has_more': False, } class AssetsReport(models.Model): """Extend account.report to register asset-specific caret options.""" _inherit = 'account.report' def _get_caret_option_view_map(self): view_map = super()._get_caret_option_view_map() view_map['account.asset.line'] = 'fusion_accountingview_account_asset_expense_form' return view_map