Files
Odoo-Modules/Fusion Accounting/wizard/account_bank_statement_import_csv.py
2026-02-22 01:22:18 -05:00

199 lines
7.8 KiB
Python

# Fusion Accounting - CSV Bank Statement Import
# Extends the base import wizard to handle bank statement CSV files
# with debit/credit columns, running balances, and date ordering
import contextlib
import psycopg2
from odoo import _, api, fields, models, Command
from odoo.exceptions import UserError, ValidationError
from odoo.addons.base_import.models.base_import import FIELDS_RECURSION_LIMIT
class AccountBankStmtImportCSV(models.TransientModel):
"""Extends the standard CSV import to support bank-statement-specific
columns such as debit, credit, and cumulative balance, automatically
computing net amounts and statement boundaries."""
_inherit = 'base_import.import'
@api.model
def get_fields_tree(self, model, depth=FIELDS_RECURSION_LIMIT):
"""Append virtual monetary columns (balance, debit, credit)
when the import is running in bank statement context."""
available_fields = super().get_fields_tree(model, depth=depth)
if self.env.context.get('bank_stmt_import', False):
extra_columns = [
{
'id': 'balance',
'name': 'balance',
'string': 'Cumulative Balance',
'required': False,
'fields': [],
'type': 'monetary',
'model_name': model,
},
{
'id': 'debit',
'name': 'debit',
'string': 'Debit',
'required': False,
'fields': [],
'type': 'monetary',
'model_name': model,
},
{
'id': 'credit',
'name': 'credit',
'string': 'Credit',
'required': False,
'fields': [],
'type': 'monetary',
'model_name': model,
},
]
available_fields.extend(extra_columns)
return available_fields
def _safe_float(self, raw_value):
"""Safely convert a string or empty value to float, defaulting to 0.0."""
return float(raw_value) if raw_value else 0.0
def _parse_import_data(self, data, import_fields, options):
# EXTENDS base
data = super()._parse_import_data(data, import_fields, options)
target_journal_id = self.env.context.get('default_journal_id')
is_bank_import = options.get('bank_stmt_import')
if not target_journal_id or not is_bank_import:
return data
# Validate that either amount OR both debit and credit are mapped
amount_mapped = 'amount' in import_fields
credit_mapped = 'credit' in import_fields
debit_mapped = 'debit' in import_fields
if (debit_mapped ^ credit_mapped) or not (amount_mapped ^ debit_mapped):
raise ValidationError(
_("Make sure that an Amount or Debit and Credit is in the file.")
)
stmt_metadata = options['statement_vals'] = {}
output_rows = []
import_fields.append('sequence')
balance_col_idx = False
need_amount_conversion = False
# Ensure rows are sorted chronologically (ascending or descending accepted)
if 'date' in import_fields:
date_col = import_fields.index('date')
parsed_dates = [
fields.Date.from_string(row[date_col])
for row in data
if row[date_col]
]
ascending_order = sorted(parsed_dates)
if parsed_dates != ascending_order:
descending_order = ascending_order[::-1]
if parsed_dates == descending_order:
# Flip to ascending for consistent processing
data = data[::-1]
else:
raise UserError(_('Rows must be sorted by date.'))
# Handle debit/credit column conversion to a single amount
if 'debit' in import_fields and 'credit' in import_fields:
debit_col = import_fields.index('debit')
credit_col = import_fields.index('credit')
self._parse_float_from_data(data, debit_col, 'debit', options)
self._parse_float_from_data(data, credit_col, 'credit', options)
import_fields.append('amount')
need_amount_conversion = True
# Extract opening and closing balance from the balance column
if 'balance' in import_fields:
balance_col_idx = import_fields.index('balance')
self._parse_float_from_data(data, balance_col_idx, 'balance', options)
first_row_balance = self._safe_float(data[0][balance_col_idx])
amount_col = import_fields.index('amount')
if not need_amount_conversion:
first_row_amount = self._safe_float(data[0][amount_col])
else:
first_row_amount = (
abs(self._safe_float(data[0][credit_col]))
- abs(self._safe_float(data[0][debit_col]))
)
stmt_metadata['balance_start'] = first_row_balance - first_row_amount
stmt_metadata['balance_end_real'] = data[-1][balance_col_idx]
import_fields.remove('balance')
# Clean up temporary column mappings
if need_amount_conversion:
import_fields.remove('debit')
import_fields.remove('credit')
# Build final row data with sequence numbers, converting debit/credit
for seq_num, row in enumerate(data):
row.append(seq_num)
cols_to_drop = []
if need_amount_conversion:
net_amount = (
abs(self._safe_float(row[credit_col]))
- abs(self._safe_float(row[debit_col]))
)
row.append(net_amount)
cols_to_drop.extend([debit_col, credit_col])
if balance_col_idx:
cols_to_drop.append(balance_col_idx)
# Drop virtual columns in reverse order to preserve indices
for drop_idx in sorted(cols_to_drop, reverse=True):
del row[drop_idx]
# Only include rows that have a non-zero amount
if row[import_fields.index('amount')]:
output_rows.append(row)
return output_rows
def parse_preview(self, options, count=10):
"""Inject bank statement context flag when previewing CSV data."""
if options.get('bank_stmt_import', False):
self = self.with_context(bank_stmt_import=True)
return super().parse_preview(options, count=count)
def execute_import(self, fields, columns, options, dryrun=False):
"""Execute the import, wrapping bank statement rows into a
statement record with computed balance boundaries."""
if options.get('bank_stmt_import'):
with self.env.cr.savepoint(flush=False) as sp:
import_result = super().execute_import(fields, columns, options, dryrun=dryrun)
if 'statement_id' not in fields:
new_statement = self.env['account.bank.statement'].create({
'reference': self.file_name,
'line_ids': [Command.set(import_result.get('ids', []))],
**options.get('statement_vals', {}),
})
if not dryrun:
import_result['messages'].append({
'statement_id': new_statement.id,
'type': 'bank_statement',
})
with contextlib.suppress(psycopg2.InternalError):
sp.close(rollback=dryrun)
return import_result
else:
return super().execute_import(fields, columns, options, dryrun=dryrun)