Files
Odoo-Modules/Fusion Accounting/models/account_partner_ledger.py
2026-02-22 01:22:18 -05:00

800 lines
35 KiB
Python

# Fusion Accounting - Partner Ledger Report Handler
from odoo import api, models, _, fields
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools import SQL
from datetime import timedelta
from collections import defaultdict
class PartnerLedgerCustomHandler(models.AbstractModel):
"""Generates the Partner Ledger report.
Shows journal items grouped by partner, with initial balances and
running totals. Also handles indirectly-linked entries (items
without a partner that were reconciled with a partner's entry).
"""
_name = 'account.partner.ledger.report.handler'
_inherit = 'account.report.custom.handler'
_description = 'Partner Ledger Custom Handler'
# ------------------------------------------------------------------
# Display
# ------------------------------------------------------------------
def _get_custom_display_config(self):
return {
'css_custom_class': 'partner_ledger',
'components': {
'AccountReportLineCell': 'fusion_accounting.PartnerLedgerLineCell',
},
'templates': {
'AccountReportFilters': 'fusion_accounting.PartnerLedgerFilters',
'AccountReportLineName': 'fusion_accounting.PartnerLedgerLineName',
},
}
# ------------------------------------------------------------------
# Dynamic lines
# ------------------------------------------------------------------
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
    """Return the partner lines regrouped by name prefix, followed by the total line.

    Every element of the returned list is a ``(0, line_dict)`` tuple.
    ``all_column_groups_expression_totals`` and ``warnings`` are accepted
    but not read here.
    """
    partner_lines, totals = self._assemble_partner_rows(report, options)
    regrouped = report._regroup_lines_by_name_prefix(
        options,
        partner_lines,
        '_report_expand_unfoldable_line_partner_ledger_prefix_group',
        0,
    )
    total_line = self._build_total_line(options, totals)
    return [(0, line) for line in [*regrouped, total_line]]
def _assemble_partner_rows(self, report, options, depth_shift=0):
    """Build one line per partner and accumulate the totals per column group.

    Returns ``(lines, totals)`` where ``totals`` maps every key of
    ``options['column_groups']`` to its summed ``debit``, ``credit``,
    ``amount`` and ``balance``. ``depth_shift`` is forwarded to the line
    builder.
    """
    sum_fields = ('debit', 'credit', 'amount', 'balance')
    group_keys = options['column_groups']
    totals = {key: dict.fromkeys(sum_fields, 0.0) for key in group_keys}

    partner_sums = self._query_partner_sums(options)
    search_text = options.get('filter_search_bar', '')
    label_matches = search_text.lower() in self._unknown_partner_label().lower()

    partner_lines = []
    for partner, sums_by_group in partner_sums:
        # A printed report filtered by a search text leaves out the
        # Unknown Partner row, unless the text matches that row's label.
        hide_unknown = (
            options['export_mode'] == 'print'
            and search_text
            and not partner
            and not label_matches
        )
        if hide_unknown:
            continue

        values_by_group = defaultdict(dict)
        for key in group_keys:
            group_sums = sums_by_group.get(key, {})
            for name in sum_fields:
                value = group_sums.get(name, 0.0)
                values_by_group[key][name] = value
                totals[key][name] += value

        partner_lines.append(
            self._build_partner_line(options, partner, values_by_group, depth_shift=depth_shift)
        )
    return partner_lines, totals
# ------------------------------------------------------------------
# Prefix-group expand
# ------------------------------------------------------------------
def _report_expand_unfoldable_line_partner_ledger_prefix_group(
    self, line_dict_id, groupby, options, progress, offset, unfold_all_batch_data=None,
):
    """Expand a name-prefix group line into its child lines.

    The partner rows are rebuilt with the forced domain narrowed to
    partners whose name starts with the prefix matched by
    ``line_dict_id`` (plus partner-less lines when the Unknown Partner
    label starts with that prefix), re-parented under ``line_dict_id``
    and regrouped one prefix level deeper.

    Returns a dict with ``lines``, ``offset_increment`` (the number of
    lines) and ``has_more`` (always ``False``).
    """
    report = self.env['account.report'].browse(options['report_id'])
    matched = report._get_prefix_groups_matched_prefix_from_line_id(line_dict_id)

    domain = [('partner_id.name', '=ilike', f'{matched}%')]
    if self._unknown_partner_label().upper().startswith(matched):
        domain = expression.OR([domain, [('partner_id', '=', None)]])

    scoped_options = dict(options)
    scoped_options['forced_domain'] = options.get('forced_domain', []) + domain

    level = 2 * len(matched)
    partner_lines, _unused_totals = self._assemble_partner_rows(
        report, scoped_options, depth_shift=level,
    )
    for line in partner_lines:
        line.update(
            id=report._build_subline_id(line_dict_id, line['id']),
            parent_id=line_dict_id,
        )

    regrouped = report._regroup_lines_by_name_prefix(
        options,
        partner_lines,
        '_report_expand_unfoldable_line_partner_ledger_prefix_group',
        level,
        matched_prefix=matched,
        parent_line_dict_id=line_dict_id,
    )
    return {
        'lines': regrouped,
        'offset_increment': len(regrouped),
        'has_more': False,
    }
# ------------------------------------------------------------------
# Options
# ------------------------------------------------------------------
def _custom_options_initializer(self, report, options, previous_options):
    """Complete ``options`` with the Partner Ledger specific settings.

    After delegating to the parent handler this method:

    * extends ``options['forced_domain']`` to exclude lines with zero
      debit, zero credit and a non-zero ``amount_currency`` that sit in
      a currency exchange journal of the report companies;
    * when printing with a search text, keeps only lines whose partner
      name, or whose reconciled counterpart's partner name, matches it;
    * sets ``multi_currency`` for users in ``base.group_multi_currency``;
    * carries over ``hide_account`` / ``hide_debit_credit`` from
      ``previous_options`` and removes the matching columns;
    * appends the "Send" button.
    """
    super()._custom_options_initializer(report, options, previous_options=previous_options)

    forced = []
    companies = self.env['res.company'].browse(report.get_report_company_ids(options))
    exchange_journals = companies.mapped('currency_exchange_journal_id')
    if exchange_journals:
        forced.extend([
            '!', '&', '&', '&',
            ('credit', '=', 0.0),
            ('debit', '=', 0.0),
            ('amount_currency', '!=', 0.0),
            ('journal_id', 'in', exchange_journals.ids),
        ])

    search_text = options.get('filter_search_bar')
    if options['export_mode'] == 'print' and search_text:
        forced.extend([
            '|', ('matched_debit_ids.debit_move_id.partner_id.name', 'ilike', search_text),
            '|', ('matched_credit_ids.credit_move_id.partner_id.name', 'ilike', search_text),
            ('partner_id.name', 'ilike', search_text),
        ])
    options['forced_domain'] = options.get('forced_domain', []) + forced

    if self.env.user.has_group('base.group_multi_currency'):
        options['multi_currency'] = True

    earlier = previous_options or {}
    options['hide_account'] = earlier.get('hide_account', False)
    options['hide_debit_credit'] = earlier.get('hide_debit_credit', False)

    # Either the debit/credit pair or the single amount column is shown.
    hidden = ['journal_code', 'account_code', 'matching_number'] if options['hide_account'] else []
    hidden += ['debit', 'credit'] if options['hide_debit_credit'] else ['amount']
    options['columns'] = [
        column for column in options['columns']
        if column['expression_label'] not in hidden
    ]

    send_button = {
        'name': _('Send'),
        'action': 'action_send_statements',
        'sequence': 90,
        'always_show': True,
    }
    options['buttons'].append(send_button)
# ------------------------------------------------------------------
# Batch unfold
# ------------------------------------------------------------------
def _custom_unfold_all_batch_data_generator(self, report, options, lines_to_expand_by_function):
    """Prefetch opening balances and move lines for all lines to expand.

    Partner ids are collected from the partner lines to expand (``None``
    stands for the Unknown Partner line) and from the partners whose
    name matches a prefix group to expand.

    Returns ``{'initial_balances': ..., 'aml_values': ...}``; both values
    are empty dicts when no partner was collected.
    """
    partner_expand = '_report_expand_unfoldable_line_partner_ledger'
    prefix_expand = '_report_expand_unfoldable_line_partner_ledger_prefix_group'

    wanted = []
    for line in lines_to_expand_by_function.get(partner_expand, []):
        markup, model, record_id = self.env['account.report']._parse_line_id(line['id'])[-1]
        if model == 'res.partner':
            wanted.append(record_id)
        elif markup == 'no_partner':
            wanted.append(None)

    # Prefix groups: gather every partner whose name starts with a prefix.
    label_upper = self._unknown_partner_label().upper()
    name_domains = []
    for line in lines_to_expand_by_function.get(prefix_expand, []):
        prefix = report._get_prefix_groups_matched_prefix_from_line_id(line['id'])
        name_domains.append([('name', '=ilike', f'{prefix}%')])
        if label_upper.startswith(prefix):
            wanted.append(None)

    if name_domains:
        partner_model = self.env['res.partner'].with_context(active_test=False)
        wanted.extend(partner_model.search(expression.OR(name_domains)).ids)

    if not wanted:
        return {'initial_balances': {}, 'aml_values': {}}
    return {
        'initial_balances': self._fetch_initial_balances(wanted, options),
        'aml_values': self._fetch_aml_data(options, wanted),
    }
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def _get_report_send_recipients(self, options):
    """Return the ``res.partner`` records the report should be sent to.

    ``options['partner_ids']`` is used when it is non-empty; otherwise
    the recipients are the non-null partners returned by the partner
    sums query.
    """
    recipient_ids = options.get('partner_ids', [])
    if recipient_ids:
        return self.env['res.partner'].browse(recipient_ids)

    cursor = self.env.cr
    cursor.execute(self._build_partner_sums_sql(options))
    recipient_ids = [row['groupby'] for row in cursor.dictfetchall() if row['groupby']]
    return self.env['res.partner'].browse(recipient_ids)
def action_send_statements(self, options):
    """Return the window action that opens the ``account.report.send`` form.

    The action context carries the current report ``options`` and the id
    of the customer statement mail template, or ``False`` when that
    template cannot be resolved.
    """
    template = self.env.ref('fusion_accounting.email_template_customer_statement', False)
    context = {
        'default_mail_template_id': template.id if template else False,
        'default_report_options': options,
    }
    return {
        'name': _("Send Partner Ledgers"),
        'type': 'ir.actions.act_window',
        'views': [[False, 'form']],
        'res_model': 'account.report.send',
        'target': 'new',
        'context': context,
    }
@api.model
def action_open_partner(self, options, params):
    """Return the window action opening the partner form for a report line.

    The record id is extracted from ``params['id']``; ``options`` is not
    read.
    """
    report_model = self.env['account.report']
    _model_name, partner_id = report_model._get_model_info_from_id(params['id'])
    action = {
        'type': 'ir.actions.act_window',
        'res_model': 'res.partner',
        'res_id': partner_id,
        'views': [[False, 'form']],
        'view_mode': 'form',
        'target': 'current',
    }
    return action
# ------------------------------------------------------------------
# SQL helpers
# ------------------------------------------------------------------
def _query_partner_sums(self, options):
    """Return the per-partner sums, corrected for partner-less reconciled lines.

    The result is a list of ``(partner, sums)`` pairs where ``sums`` maps
    column group key -> field -> amount. ``partner`` is a ``res.partner``
    record, or ``None`` for the Unknown Partner bucket, which is listed
    last when present.
    """
    currency = self.env.company.currency_id
    sum_fields = ['balance', 'debit', 'credit', 'amount']
    sums_by_partner = {}

    def accumulate(row):
        # Rows whose four amounts are all zero in company currency are ignored.
        if all(currency.is_zero(row[name]) for name in sum_fields):
            return
        per_group = sums_by_partner.setdefault(
            row['groupby'], defaultdict(lambda: defaultdict(float)),
        )
        target = per_group[row['column_group_key']]
        for name in sum_fields:
            target[name] += row[name]

    cursor = self.env.cr
    cursor.execute(self._build_partner_sums_sql(options))
    for row in cursor.dictfetchall():
        accumulate(row)

    # Partner-less lines reconciled with a partner's line: total them per
    # column group and add them to partners already present in the result.
    cursor.execute(self._build_partnerless_correction_sql(options))
    corrections = {
        name: dict.fromkeys(options['column_groups'], 0)
        for name in ('debit', 'credit', 'amount', 'balance')
    }
    for row in cursor.dictfetchall():
        group_key = row['column_group_key']
        for name, per_group in corrections.items():
            per_group[group_key] += row[name]
        if row['groupby'] in sums_by_partner:
            accumulate(row)

    # The Unknown Partner bucket gets the correction totals with debit and
    # credit swapped and the balance subtracted.
    if None in sums_by_partner:
        unknown = sums_by_partner[None]
        for group_key in options['column_groups']:
            bucket = unknown[group_key]
            bucket['debit'] += corrections['credit'][group_key]
            bucket['credit'] += corrections['debit'][group_key]
            bucket['amount'] += corrections['amount'][group_key]
            bucket['balance'] -= corrections['balance'][group_key]

    if sums_by_partner:
        partners = self.env['res.partner'].with_context(active_test=False).search_fetch(
            [('id', 'in', list(sums_by_partner))],
            ["id", "name", "trust", "company_registry", "vat"],
        )
    else:
        partners = self.env['res.partner']

    if None in sums_by_partner:
        partners = [*partners, None]

    return [
        (partner, sums_by_partner[partner.id if partner else None])
        for partner in partners
    ]
def _build_partner_sums_sql(self, options) -> SQL:
"""SQL that sums debit / credit / balance by partner."""
# Builds one SELECT per column group of `options` and joins them with
# UNION ALL. Each SELECT returns the partner id as `groupby`, the column
# group key, and the summed debit, credit, amount and balance.
parts = []
report = self.env.ref('fusion_accounting.partner_ledger_report')
for cg, cg_opts in report._split_options_per_column_group(options).items():
# The report query is requested with the 'from_beginning' date scope.
qry = report._get_report_query(cg_opts, 'from_beginning')
parts.append(SQL(
"""
SELECT
account_move_line.partner_id AS groupby,
%(cg)s AS column_group_key,
SUM(%(dr)s) AS debit,
SUM(%(cr)s) AS credit,
SUM(%(bal)s) AS amount,
SUM(%(bal)s) AS balance
FROM %(tbl)s
%(fx)s
WHERE %(cond)s
GROUP BY account_move_line.partner_id
""",
cg=cg,
# Each amount goes through the report's currency-rate helper;
# `amount` and `balance` are both fed from the line balance.
dr=report._currency_table_apply_rate(SQL("account_move_line.debit")),
cr=report._currency_table_apply_rate(SQL("account_move_line.credit")),
bal=report._currency_table_apply_rate(SQL("account_move_line.balance")),
tbl=qry.from_clause,
fx=report._currency_table_aml_join(cg_opts),
cond=qry.where_clause,
))
return SQL(' UNION ALL ').join(parts)
def _fetch_initial_balances(self, partner_ids, options):
"""Compute opening balances for each partner before date_from."""
# Returns {partner_id: {column_group_key: row}}. `row` is the fetched dict
# (partner_id, column_group_key, debit, credit, amount, balance), or an
# empty dict when the query returned nothing for that partner and group.
parts = []
report = self.env.ref('fusion_accounting.partner_ledger_report')
for cg, cg_opts in report._split_options_per_column_group(options).items():
# The query runs on options whose period ends the day before date_from.
init_opts = self._derive_initial_balance_options(cg_opts)
qry = report._get_report_query(
init_opts, 'from_beginning', domain=[('partner_id', 'in', partner_ids)],
)
parts.append(SQL(
"""
SELECT
account_move_line.partner_id,
%(cg)s AS column_group_key,
SUM(%(dr)s) AS debit,
SUM(%(cr)s) AS credit,
SUM(%(bal)s) AS amount,
SUM(%(bal)s) AS balance
FROM %(tbl)s
%(fx)s
WHERE %(cond)s
GROUP BY account_move_line.partner_id
""",
cg=cg,
dr=report._currency_table_apply_rate(SQL("account_move_line.debit")),
cr=report._currency_table_apply_rate(SQL("account_move_line.credit")),
bal=report._currency_table_apply_rate(SQL("account_move_line.balance")),
tbl=qry.from_clause,
# NOTE(review): the currency join is built from the column group's
# own options (cg_opts), not from init_opts; confirm this is intended.
fx=report._currency_table_aml_join(cg_opts),
cond=qry.where_clause,
))
self.env.cr.execute(SQL(" UNION ALL ").join(parts))
# Pre-fill every requested partner with an empty entry per column group,
# so partners absent from the query result are still present as keys.
init_map = {
pid: {cg: {} for cg in options['column_groups']}
for pid in partner_ids
}
for row in self.env.cr.dictfetchall():
init_map[row['partner_id']][row['column_group_key']] = row
return init_map
def _derive_initial_balance_options(self, options):
    """Return a copy of ``options`` whose period ends the day before ``date_from``.

    In the copied ``date`` entry, ``date_from`` is set to ``False`` and
    ``date_to`` to the day preceding the original ``date_from``. The
    input dict is not modified.
    """
    period = options['date']
    day_before_start = fields.Date.from_string(period['date_from']) - timedelta(days=1)
    opening_period = {
        **period,
        'date_from': False,
        'date_to': fields.Date.to_string(day_before_start),
    }
    return {**options, 'date': opening_period}
def _build_partnerless_correction_sql(self, options):
"""SQL for partnerless lines reconciled with a partner's line."""
# Builds one SELECT per column group of `options` and joins them with
# UNION ALL. Rows are grouped by the partner of the reconciled counterpart
# line (alias `linked`); the amounts come from the partial reconcile `pr`.
parts = []
report = self.env.ref('fusion_accounting.partner_ledger_report')
for cg, cg_opts in report._split_options_per_column_group(options).items():
qry = report._get_report_query(cg_opts, 'from_beginning')
parts.append(SQL(
"""
SELECT
%(cg)s AS column_group_key,
linked.partner_id AS groupby,
SUM(%(dr)s) AS debit,
SUM(%(cr)s) AS credit,
SUM(%(bal)s) AS amount,
SUM(%(bal)s) AS balance
FROM %(tbl)s
JOIN account_partial_reconcile pr
ON account_move_line.id = pr.debit_move_id
OR account_move_line.id = pr.credit_move_id
JOIN account_move_line linked ON
(linked.id = pr.debit_move_id OR linked.id = pr.credit_move_id)
AND linked.partner_id IS NOT NULL
%(fx)s
WHERE pr.max_date <= %(dt_to)s AND %(cond)s
AND account_move_line.partner_id IS NULL
GROUP BY linked.partner_id
""",
cg=cg,
# The reconciled amount is reported as a debit when the counterpart's
# balance is not positive, and as a credit when it is not negative.
dr=report._currency_table_apply_rate(SQL(
"CASE WHEN linked.balance > 0 THEN 0 ELSE pr.amount END"
)),
cr=report._currency_table_apply_rate(SQL(
"CASE WHEN linked.balance < 0 THEN 0 ELSE pr.amount END"
)),
# The balance carries the opposite sign of the counterpart's balance.
bal=report._currency_table_apply_rate(SQL(
"-SIGN(linked.balance) * pr.amount"
)),
tbl=qry.from_clause,
# The currency join is built against the `linked` alias.
fx=report._currency_table_aml_join(cg_opts, aml_alias=SQL("linked")),
dt_to=cg_opts['date']['date_to'],
cond=qry.where_clause,
))
return SQL(" UNION ALL ").join(parts)
# ------------------------------------------------------------------
# AML detail data
# ------------------------------------------------------------------
def _get_additional_column_aml_values(self):
    """Return extra SELECT expressions for the partner-ledger AML query.

    This is an extension hook; the default is an empty SQL fragment.
    NOTE(review): ``_fetch_aml_data`` interpolates the fragment directly
    in front of the next SELECT expression, so a non-empty override
    presumably has to end with a comma. Confirm before overriding.
    """
    no_extra_columns = SQL()
    return no_extra_columns
def _fetch_aml_data(self, options, partner_ids, offset=0, limit=None):
"""Load move lines for the given partners.
Returns ``{partner_id: [row, ...]}`` including both directly- and
indirectly-linked entries.
"""
# `partner_ids` may contain None, which stands for lines without a partner.
# `offset` and `limit` are appended to the combined query when truthy.
container = {pid: [] for pid in partner_ids}
real_ids = [x for x in partner_ids if x]
direct_clauses = []
# Without real partner ids, indirect entries match any counterpart partner.
indirect_clause = SQL('linked_aml.partner_id IS NOT NULL')
if None in partner_ids:
direct_clauses.append(SQL('account_move_line.partner_id IS NULL'))
if real_ids:
direct_clauses.append(SQL('account_move_line.partner_id IN %s', tuple(real_ids)))
indirect_clause = SQL('linked_aml.partner_id IN %s', tuple(real_ids))
# NOTE(review): an empty `partner_ids` would leave this filter as "()";
# the callers visible in this file always pass at least one id.
direct_filter = SQL('(%s)', SQL(' OR ').join(direct_clauses))
fragments = []
jnl_name = self.env['account.journal']._field_to_sql('journal', 'name')
report = self.env.ref('fusion_accounting.partner_ledger_report')
extra_cols = self._get_additional_column_aml_values()
# Two SELECTs are produced per column group: direct then indirect entries.
for cg, grp_opts in report._split_options_per_column_group(options).items():
qry = report._get_report_query(grp_opts, 'strict_range')
# Join the account table to select the account code and name.
acct_a = qry.left_join(
lhs_alias='account_move_line', lhs_column='account_id',
rhs_table='account_account', rhs_column='id', link='account_id',
)
code_f = self.env['account.account']._field_to_sql(acct_a, 'code', qry)
name_f = self.env['account.account']._field_to_sql(acct_a, 'name')
# Direct entries
# (lines whose own partner matches `direct_filter`; partial_id is 0)
fragments.append(SQL(
'''
SELECT
account_move_line.id,
account_move_line.date_maturity,
account_move_line.name,
account_move_line.ref,
account_move_line.company_id,
account_move_line.account_id,
account_move_line.payment_id,
account_move_line.partner_id,
account_move_line.currency_id,
account_move_line.amount_currency,
account_move_line.matching_number,
%(extra_cols)s
COALESCE(account_move_line.invoice_date, account_move_line.date) AS invoice_date,
%(dr)s AS debit,
%(cr)s AS credit,
%(bal)s AS amount,
%(bal)s AS balance,
mv.name AS move_name,
mv.move_type AS move_type,
%(code_f)s AS account_code,
%(name_f)s AS account_name,
journal.code AS journal_code,
%(jnl_name)s AS journal_name,
%(cg)s AS column_group_key,
'directly_linked_aml' AS key,
0 AS partial_id
FROM %(tbl)s
JOIN account_move mv ON mv.id = account_move_line.move_id
%(fx)s
LEFT JOIN res_company co ON co.id = account_move_line.company_id
LEFT JOIN res_partner prt ON prt.id = account_move_line.partner_id
LEFT JOIN account_journal journal ON journal.id = account_move_line.journal_id
WHERE %(cond)s AND %(direct_filter)s
ORDER BY account_move_line.date, account_move_line.id
''',
extra_cols=extra_cols,
dr=report._currency_table_apply_rate(SQL("account_move_line.debit")),
cr=report._currency_table_apply_rate(SQL("account_move_line.credit")),
bal=report._currency_table_apply_rate(SQL("account_move_line.balance")),
code_f=code_f,
name_f=name_f,
jnl_name=jnl_name,
cg=cg,
tbl=qry.from_clause,
fx=report._currency_table_aml_join(grp_opts),
cond=qry.where_clause,
direct_filter=direct_filter,
))
# Indirect (reconciled with a partner but no partner on the line)
# The selected partner_id is the counterpart's; the amounts come from
# the partial reconcile `pr`, whose id is returned as partial_id.
fragments.append(SQL(
'''
SELECT
account_move_line.id,
account_move_line.date_maturity,
account_move_line.name,
account_move_line.ref,
account_move_line.company_id,
account_move_line.account_id,
account_move_line.payment_id,
linked_aml.partner_id,
account_move_line.currency_id,
account_move_line.amount_currency,
account_move_line.matching_number,
%(extra_cols)s
COALESCE(account_move_line.invoice_date, account_move_line.date) AS invoice_date,
%(dr)s AS debit,
%(cr)s AS credit,
%(bal)s AS amount,
%(bal)s AS balance,
mv.name AS move_name,
mv.move_type AS move_type,
%(code_f)s AS account_code,
%(name_f)s AS account_name,
journal.code AS journal_code,
%(jnl_name)s AS journal_name,
%(cg)s AS column_group_key,
'indirectly_linked_aml' AS key,
pr.id AS partial_id
FROM %(tbl)s
%(fx)s,
account_partial_reconcile pr,
account_move mv,
account_move_line linked_aml,
account_journal journal
WHERE
(account_move_line.id = pr.debit_move_id OR account_move_line.id = pr.credit_move_id)
AND account_move_line.partner_id IS NULL
AND mv.id = account_move_line.move_id
AND (linked_aml.id = pr.debit_move_id OR linked_aml.id = pr.credit_move_id)
AND %(indirect_clause)s
AND journal.id = account_move_line.journal_id
AND %(acct_alias)s.id = account_move_line.account_id
AND %(cond)s
AND pr.max_date BETWEEN %(dt_from)s AND %(dt_to)s
ORDER BY account_move_line.date, account_move_line.id
''',
extra_cols=extra_cols,
dr=report._currency_table_apply_rate(SQL(
"CASE WHEN linked_aml.balance > 0 THEN 0 ELSE pr.amount END"
)),
cr=report._currency_table_apply_rate(SQL(
"CASE WHEN linked_aml.balance < 0 THEN 0 ELSE pr.amount END"
)),
bal=report._currency_table_apply_rate(SQL("-SIGN(linked_aml.balance) * pr.amount")),
code_f=code_f,
name_f=name_f,
jnl_name=jnl_name,
cg=cg,
tbl=qry.from_clause,
fx=report._currency_table_aml_join(grp_opts),
indirect_clause=indirect_clause,
acct_alias=SQL.identifier(acct_a),
cond=qry.where_clause,
dt_from=grp_opts['date']['date_from'],
dt_to=grp_opts['date']['date_to'],
))
# Each fragment is parenthesised so its own ORDER BY stays valid in the union.
combined = SQL(" UNION ALL ").join(SQL("(%s)", f) for f in fragments)
# NOTE(review): OFFSET / LIMIT apply to the whole union, which has no
# outer ORDER BY; confirm the paging order is stable enough.
if offset:
combined = SQL('%s OFFSET %s ', combined, offset)
if limit:
combined = SQL('%s LIMIT %s ', combined, limit)
self.env.cr.execute(combined)
for row in self.env.cr.dictfetchall():
if row['key'] == 'indirectly_linked_aml':
# File the row under the counterpart's partner when it was requested.
if row['partner_id'] in container:
container[row['partner_id']].append(row)
# When the Unknown Partner bucket was requested, it also receives a copy
# of the row with debit and credit swapped and the balance negated.
if None in container:
container[None].append({
**row,
'debit': row['credit'],
'credit': row['debit'],
'amount': row['credit'] - row['debit'],
'balance': -row['balance'],
})
else:
container[row['partner_id']].append(row)
return container
# ------------------------------------------------------------------
# Expand handler
# ------------------------------------------------------------------
def _report_expand_unfoldable_line_partner_ledger(
    self, line_dict_id, groupby, options, progress, offset, unfold_all_batch_data=None,
):
    """Expand a partner line into its opening balance and move lines.

    :param line_dict_id: id of the partner line being expanded.
    :param groupby: not read by this handler.
    :param options: report options.
    :param progress: running balance per column group key, carried over
        from the previous "load more" call.
    :param offset: number of move lines already loaded; the opening
        balance line is only emitted when it is ``0``.
    :param unfold_all_batch_data: optional prefetched data holding
        ``initial_balances`` and ``aml_values`` keyed by partner id.
    :return: dict with ``lines``, ``offset_increment`` (move lines added),
        ``has_more`` and ``progress`` (updated running balance).
    :raises UserError: when the line id does not designate a
        ``res.partner`` line.
    """
    def _running_balance(line_dict):
        # Read back the balance of each column group from a built line.
        return {
            col['column_group_key']: line_col.get('no_format', 0)
            for col, line_col in zip(options['columns'], line_dict['columns'])
            if col['expression_label'] == 'balance'
        }

    report = self.env.ref('fusion_accounting.partner_ledger_report')
    parsed_id = report._parse_line_id(line_dict_id)
    # The markup must not be bound to `_`: that would make `_` a local
    # name and break the translation call in the error below.
    _markup, mdl, rec_id = parsed_id[-1]
    if mdl != 'res.partner':
        raise UserError(_("Invalid line ID for partner ledger expansion: %s", line_dict_id))

    # Each enclosing prefix group shifts the child lines by two levels.
    nesting = sum(
        1 for markup, _model, _value in parsed_id
        if isinstance(markup, dict) and 'groupby_prefix_group' in markup
    )
    depth = nesting * 2

    lines = []

    # Opening balance, only on the first page.
    if offset == 0:
        if unfold_all_batch_data:
            init_by_cg = unfold_all_batch_data['initial_balances'][rec_id]
        else:
            init_by_cg = self._fetch_initial_balances([rec_id], options)[rec_id]
        opening_line = report._get_partner_and_general_ledger_initial_balance_line(
            options, line_dict_id, init_by_cg, level_shift=depth,
        )
        if opening_line:
            lines.append(opening_line)
            progress = _running_balance(opening_line)

    # Fetch one row more than the limit so an overflow can be detected.
    paginate = bool(report.load_more_limit) and options['export_mode'] != 'print'
    page_size = report.load_more_limit + 1 if paginate else None
    if unfold_all_batch_data:
        aml_rows = unfold_all_batch_data['aml_values'][rec_id]
    else:
        aml_rows = self._fetch_aml_data(options, [rec_id], offset=offset, limit=page_size)[rec_id]

    overflow = False
    count = 0
    running = progress
    for row in aml_rows:
        if paginate and count >= report.load_more_limit:
            overflow = True
            break
        new_line = self._build_aml_line(options, row, line_dict_id, running, depth_shift=depth)
        lines.append(new_line)
        running = _running_balance(new_line)
        count += 1

    return {
        'lines': lines,
        'offset_increment': count,
        'has_more': overflow,
        'progress': running,
    }
# ------------------------------------------------------------------
# Line builders
# ------------------------------------------------------------------
def _build_partner_line(self, options, partner, col_data, depth_shift=0):
    """Return the line dict for one partner, or for the Unknown Partner bucket.

    ``col_data`` maps column group key -> field -> amount. The line can
    be unfolded when the first column group has a non-zero debit or
    credit, or when any displayed debit, credit or amount column is
    non-zero.
    """
    currency = self.env.company.currency_id
    first_group = next(iter(col_data.values()))
    unfoldable = not currency.is_zero(
        first_group.get('debit', 0) or first_group.get('credit', 0)
    )

    report = self.env['account.report'].browse(options['report_id'])
    columns = []
    for column in options['columns']:
        label = column['expression_label']
        value = col_data[column['column_group_key']].get(label)
        if not unfoldable and label in ('debit', 'credit', 'amount'):
            unfoldable = not currency.is_zero(value)
        columns.append(report._build_column_dict(value, column, options=options))

    if partner:
        line_id = report._get_generic_line_id('res.partner', partner.id)
        name = (partner.name or '')[:128]
        trust = partner.trust
    else:
        line_id = report._get_generic_line_id('res.partner', None, markup='no_partner')
        name = self._unknown_partner_label()
        trust = None

    return {
        'id': line_id,
        'name': name,
        'columns': columns,
        'level': 1 + depth_shift,
        'trust': trust,
        'unfoldable': unfoldable,
        'unfolded': line_id in options['unfolded_lines'] or options['unfold_all'],
        'expand_function': '_report_expand_unfoldable_line_partner_ledger',
    }
def _unknown_partner_label(self):
    """Return the translated label used for the line without a partner."""
    label = _('Unknown Partner')
    return label
@api.model
def _format_aml_name(self, line_name, move_ref, move_name=None):
    """Return the display name of a move line.

    The work is delegated to ``account.move.line._format_aml_name``.
    """
    move_line_model = self.env['account.move.line']
    return move_line_model._format_aml_name(line_name, move_ref, move_name=move_name)
def _build_aml_line(self, options, row, parent_id, running_bal, depth_shift=0):
    """Return the line dict for one move-line ``row`` under ``parent_id``.

    ``running_bal`` maps column group key -> balance accumulated so far;
    it is added to the row's own balance. Columns that belong to another
    column group than the row's are left empty.

    Raises ``UserError`` when a displayed column is missing from ``row``.
    """
    if row['payment_id']:
        caret = 'account.payment'
    else:
        caret = 'account.move.line'

    report = self.env['account.report'].browse(options['report_id'])
    columns = []
    for column in options['columns']:
        label = column['expression_label']
        if label not in row:
            raise UserError(_("Column '%s' is unavailable for this report.", label))

        group_key = column['column_group_key']
        value = row[label] if group_key == row['column_group_key'] else None
        if value is None:
            columns.append(report._build_column_dict(None, None))
            continue

        currency = False
        if label == 'balance':
            value += running_bal[group_key]
        elif label == 'amount_currency':
            currency = self.env['res.currency'].browse(row['currency_id'])
            # Amounts already in the company currency are shown as blank.
            if currency == self.env.company.currency_id:
                value = ''
        columns.append(
            report._build_column_dict(value, column, options=options, currency=currency)
        )

    line_id = report._get_generic_line_id(
        'account.move.line', row['id'],
        parent_line_id=parent_id, markup=row['partial_id'],
    )
    return {
        'id': line_id,
        'parent_id': parent_id,
        'name': self._format_aml_name(row['name'], row['ref'], row['move_name']),
        'columns': columns,
        'caret_options': caret,
        'level': 3 + depth_shift,
    }
def _build_total_line(self, options, col_totals):
    """Return the closing "Total" line built from the per-column-group totals."""
    report = self.env['account.report'].browse(options['report_id'])
    columns = [
        report._build_column_dict(
            col_totals[column['column_group_key']].get(column['expression_label']),
            column,
            options=options,
        )
        for column in options['columns']
    ]
    total_line = {
        'id': report._get_generic_line_id(None, None, markup='total'),
        'name': _('Total'),
        'level': 1,
        'columns': columns,
    }
    return total_line
def open_journal_items(self, options, params):
    """Return the report's journal-items action, shown in the partner-grouped view.

    ``params['view_ref']`` is set in place before delegating to the
    report, and ``search_default_group_by_account`` is set to ``0`` in
    the action context when the action has one.
    """
    params['view_ref'] = 'account.view_move_line_tree_grouped_partner'
    report = self.env['account.report'].browse(options['report_id'])
    action = report.open_journal_items(options=options, params=params)
    # Without a 'context' key the update lands on a throwaway dict.
    context = action.get('context', {})
    context.update({'search_default_group_by_account': 0})
    return action