# Fusion Accounting - EC Sales / Tax Report Handler from collections import defaultdict from odoo import _, api, fields, models from odoo.tools import SQL class ECSalesReportCustomHandler(models.AbstractModel): """Produces the EC Sales List report. Lists intra-community transactions broken down by goods, triangular operations and services, with per-partner VAT details and optional country-specific operation codes. """ _name = 'account.ec.sales.report.handler' _inherit = 'account.report.custom.handler' _description = 'EC Sales Report Custom Handler' # ------------------------------------------------------------------ # Display # ------------------------------------------------------------------ def _get_custom_display_config(self): return { 'components': { 'AccountReportFilters': 'fusion_accounting.SalesReportFilters', }, } # ------------------------------------------------------------------ # Dynamic lines # ------------------------------------------------------------------ def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None): """Generate one line per partner per operation category (vertical layout).""" output = [] col_totals = { cg: { 'balance': 0.0, 'goods': 0.0, 'triangular': 0.0, 'services': 0.0, 'vat_number': '', 'country_code': '', 'sales_type_code': '', } for cg in options['column_groups'] } category_config = options['sales_report_taxes'].get('operation_category', {}) filter_map = { item['id']: item.get('selected') for item in options.get('ec_tax_filter_selection', []) } for partner_rec, data in self._query_partner_amounts(report, options, warnings): for category in ('goods', 'triangular', 'services'): if not filter_map.get(category): continue per_col = defaultdict(dict) override_code = category_config.get(category) found_entries = False for cg in options['column_groups']: psum = data.get(cg, {}) per_col[cg]['vat_number'] = psum.get('vat_number', 'UNKNOWN') per_col[cg]['country_code'] = psum.get('country_code', 'UNKNOWN') per_col[cg]['sales_type_code'] = [] per_col[cg]['balance'] = psum.get(category, 0.0) col_totals[cg]['balance'] += psum.get(category, 0.0) for idx, elem_id in enumerate(psum.get('tax_element_id', [])): if elem_id in options['sales_report_taxes'][category]: found_entries = True code_val = ( override_code or (psum.get('sales_type_code') and psum['sales_type_code'][idx]) or None ) per_col[cg]['sales_type_code'].append(code_val) per_col[cg]['sales_type_code'] = ', '.join( set(per_col[cg]['sales_type_code']) ) if found_entries: output.append(( 0, self._render_partner_line(report, options, partner_rec, per_col, markup=category), )) if output: output.append((0, self._render_total_line(report, options, col_totals))) return output # ------------------------------------------------------------------ # Caret # ------------------------------------------------------------------ def _caret_options_initializer(self): return { 'ec_sales': [ {'name': _("View Partner"), 'action': 'caret_option_open_record_form'}, ], } # ------------------------------------------------------------------ # Options # ------------------------------------------------------------------ def _custom_options_initializer(self, report, options, previous_options): """Set up EC tax filter selections and partner-country domain.""" super()._custom_options_initializer(report, options, previous_options=previous_options) self._setup_core_options(report, options, previous_options) # Populate tax identifiers for each category. # In the generic case (no country tax report), fall back to tax IDs. options['sales_report_taxes'] = { 'goods': tuple(self.env['account.tax'].search([ *self.env['account.tax']._check_company_domain(self.env.company), ('amount', '=', 0.0), ('amount_type', '=', 'percent'), ('type_tax_use', '=', 'sale'), ]).ids), 'services': tuple(), 'triangular': tuple(), 'use_taxes_instead_of_tags': True, } ec_codes = self._ec_country_code_set(options) ec_country_ids = self.env['res.country'].search([ ('code', 'in', tuple(ec_codes)), ]).ids foreign_ids = tuple( set(ec_country_ids) - {self.env.company.account_fiscal_country_id.id} ) options.setdefault('forced_domain', []).extend([ '|', ('move_id.partner_shipping_id.country_id', 'in', foreign_ids), '&', ('move_id.partner_shipping_id', '=', False), ('partner_id.country_id', 'in', foreign_ids), ]) report._init_options_journals(options, previous_options=previous_options) self._enable_export_buttons_for_common_vat_groups_in_branches(options) def _setup_core_options(self, report, options, previous_options): """Initialise the EC tax category filter (goods / triangular / services).""" default_filters = [ {'id': 'goods', 'name': _('Goods'), 'selected': True}, {'id': 'triangular', 'name': _('Triangular'), 'selected': True}, {'id': 'services', 'name': _('Services'), 'selected': True}, ] saved = previous_options.get('ec_tax_filter_selection', default_filters) if saved != default_filters: valid_ids = {f['id'] for f in default_filters} saved = [item for item in saved if item['id'] in valid_ids] options['ec_tax_filter_selection'] = saved # ------------------------------------------------------------------ # Line renderers # ------------------------------------------------------------------ def _render_partner_line(self, report, options, partner, col_data, markup=''): """Format a single partner / category row.""" cols = [] for col_def in options['columns']: raw = col_data[col_def['column_group_key']].get(col_def['expression_label']) cols.append(report._build_column_dict(raw, col_def, options=options)) return { 'id': report._get_generic_line_id('res.partner', partner.id, markup=markup), 'name': (partner.name or '')[:128] if partner else _('Unknown Partner'), 'columns': cols, 'level': 2, 'trust': partner.trust if partner else None, 'caret_options': 'ec_sales', } def _render_total_line(self, report, options, col_totals): cols = [] for col_def in options['columns']: raw = col_totals[col_def['column_group_key']].get(col_def['expression_label']) display = raw if col_def['figure_type'] == 'monetary' else '' cols.append(report._build_column_dict(display, col_def, options=options)) return { 'id': report._get_generic_line_id(None, None, markup='total'), 'name': _('Total'), 'class': 'total', 'level': 1, 'columns': cols, } # ------------------------------------------------------------------ # SQL # ------------------------------------------------------------------ def _query_partner_amounts(self, report, options, warnings=None): """Execute the main query and return ``[(partner, values), ...]``.""" by_partner = {} comp_cur = self.env.company.currency_id def _store_row(row): if comp_cur.is_zero(row['balance']): return by_partner.setdefault(row['groupby'], defaultdict(lambda: defaultdict(float))) bucket = by_partner[row['groupby']][row['column_group_key']] elem_id = row['tax_element_id'] if elem_id in options['sales_report_taxes']['goods']: bucket['goods'] += row['balance'] elif elem_id in options['sales_report_taxes']['triangular']: bucket['triangular'] += row['balance'] elif elem_id in options['sales_report_taxes']['services']: bucket['services'] += row['balance'] bucket.setdefault('tax_element_id', []).append(elem_id) bucket.setdefault('sales_type_code', []).append(row['sales_type_code']) vat_raw = row['vat_number'] or '' country_prefix = vat_raw[:2] if vat_raw[:2].isalpha() else None bucket.setdefault('vat_number', vat_raw if not country_prefix else vat_raw[2:]) bucket.setdefault('full_vat_number', vat_raw) bucket.setdefault('country_code', country_prefix or row.get('country_code')) if warnings is not None: ec_codes = self._ec_country_code_set(options) if row['country_code'] not in ec_codes: warnings['fusion_accounting.sales_report_warning_non_ec_country'] = {'alert_type': 'warning'} elif not row.get('vat_number'): warnings['fusion_accounting.sales_report_warning_missing_vat'] = {'alert_type': 'warning'} if row.get('same_country') and row['country_code']: warnings['fusion_accounting.sales_report_warning_same_country'] = {'alert_type': 'warning'} sql = self._build_sums_sql(report, options) self.env.cr.execute(sql) for rec in self.env.cr.dictfetchall(): _store_row(rec) if by_partner: partners = self.env['res.partner'].with_context(active_test=False).browse(by_partner.keys()) else: partners = self.env['res.partner'] return [(p, by_partner[p.id]) for p in partners.sorted()] def _build_sums_sql(self, report, options) -> SQL: """Build the main aggregation query, joining either tax tags or tax records depending on configuration.""" parts = [] allowed = self._filtered_element_ids(options) use_tax_fallback = options.get('sales_report_taxes', {}).get('use_taxes_instead_of_tags') if use_tax_fallback: elem_table = SQL('account_tax') elem_id_col = SQL('account_tax_id') rel_table = SQL('account_move_line_account_tax_rel') elem_name_sql = self.env['account.tax']._field_to_sql('account_tax', 'name') else: elem_table = SQL('account_account_tag') elem_id_col = SQL('account_account_tag_id') rel_table = SQL('account_account_tag_account_move_line_rel') elem_name_sql = self.env['account.account.tag']._field_to_sql('account_account_tag', 'name') for cg, cg_opts in report._split_options_per_column_group(options).items(): qry = report._get_report_query(cg_opts, 'strict_range') if allowed: qry.add_where(SQL('%s.id IN %s', elem_table, tuple(allowed))) parts.append(SQL( """ SELECT %(cg)s AS column_group_key, account_move_line.partner_id AS groupby, rp.vat AS vat_number, rc.code AS country_code, -SUM(%(bal)s) AS balance, %(elem_name)s AS sales_type_code, %(elem_tbl)s.id AS tax_element_id, (comp_p.country_id = rp.country_id) AS same_country FROM %(tbl)s %(fx)s JOIN %(rel)s ON %(rel)s.account_move_line_id = account_move_line.id JOIN %(elem_tbl)s ON %(rel)s.%(elem_id)s = %(elem_tbl)s.id JOIN res_partner rp ON account_move_line.partner_id = rp.id JOIN res_country rc ON rp.country_id = rc.id JOIN res_company rco ON rco.id = account_move_line.company_id JOIN res_partner comp_p ON comp_p.id = rco.partner_id WHERE %(cond)s GROUP BY %(elem_tbl)s.id, %(elem_tbl)s.name, account_move_line.partner_id, rp.vat, rc.code, comp_p.country_id, rp.country_id """, cg=cg, elem_name=elem_name_sql, elem_tbl=elem_table, tbl=qry.from_clause, bal=report._currency_table_apply_rate(SQL("account_move_line.balance")), fx=report._currency_table_aml_join(cg_opts), rel=rel_table, elem_id=elem_id_col, cond=qry.where_clause, )) return SQL(' UNION ALL ').join(parts) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ @api.model def _filtered_element_ids(self, options): """Collect the set of tax / tag IDs selected via the filter toggles.""" selected = set() for toggle in options.get('ec_tax_filter_selection', []): if toggle.get('selected'): selected.update(options['sales_report_taxes'][toggle['id']]) return selected @api.model def _ec_country_code_set(self, options): """Return the set of EU member-state country codes applicable to the report period.""" members = { 'AT', 'BE', 'BG', 'HR', 'CY', 'CZ', 'DK', 'EE', 'FI', 'FR', 'DE', 'GR', 'HU', 'IE', 'IT', 'LV', 'LT', 'LU', 'MT', 'NL', 'PL', 'PT', 'RO', 'SK', 'SI', 'ES', 'SE', 'XI', } # Great Britain left the EU on 2021-01-01 period_start = fields.Date.from_string(options['date']['date_from']) if period_start < fields.Date.from_string('2021-01-01'): members.add('GB') # Monaco participates in the French VAT area if self.env.company.account_fiscal_country_id.code != 'FR': members.add('MC') return members # ------------------------------------------------------------------ # Warning action # ------------------------------------------------------------------ def get_warning_act_window(self, options, params): """Open a window showing the problematic entries for a given warning.""" action = {'type': 'ir.actions.act_window', 'context': {}} warning_type = params['type'] if warning_type == 'no_vat': aml_filter = [ ('partner_id.vat', '=', None), ('partner_id.country_id.code', 'in', tuple(self._ec_country_code_set(options))), ] action.update({ 'name': _("Entries with partners missing VAT numbers"), 'context': {'search_default_group_by_partner': 1, 'expand': 1}, }) elif warning_type == 'non_ec_country': aml_filter = [ ('partner_id.country_id.code', 'not in', tuple(self._ec_country_code_set(options))), ] action['name'] = _("EC tax applied to non-EC countries") else: aml_filter = [ ('partner_id.country_id.code', '=', options.get('same_country_warning')), ] action['name'] = _("EC tax applied to same country") use_tax_fallback = options.get('sales_report_taxes', {}).get('use_taxes_instead_of_tags') lookup_field = 'tax_ids.id' if use_tax_fallback else 'tax_tag_ids.id' matching_lines = self.env['account.move.line'].search([ *aml_filter, *self.env['account.report']._get_options_date_domain(options, 'strict_range'), (lookup_field, 'in', tuple(self._filtered_element_ids(options))), ]) if params['model'] == 'move': action.update({ 'views': [[self.env.ref('account.view_move_tree').id, 'list'], (False, 'form')], 'res_model': 'account.move', 'domain': [('id', 'in', matching_lines.move_id.ids)], }) else: action.update({ 'views': [(False, 'list'), (False, 'form')], 'res_model': 'res.partner', 'domain': [('id', 'in', matching_lines.move_id.partner_id.ids)], }) return action