import base64 import io import logging from odoo import _, api, fields, models from odoo.exceptions import UserError _logger = logging.getLogger(__name__) try: from ofxparse import OfxParser except ImportError: OfxParser = None _logger.warning("ofxparse library not installed — OFX import disabled.") class FusionStatementImportLine(models.TransientModel): _name = 'fusion.statement.import.line' _description = 'Statement Import Preview Line' _order = 'date desc, id desc' wizard_id = fields.Many2one('fusion.statement.import', ondelete='cascade') selected = fields.Boolean(default=True) is_duplicate = fields.Boolean(readonly=True) fitid = fields.Char(string='Transaction ID', readonly=True) date = fields.Date(readonly=True) payment_ref = fields.Char(string='Description', readonly=True) amount = fields.Float(digits=(16, 2), readonly=True) class FusionStatementImport(models.TransientModel): _name = 'fusion.statement.import' _description = 'Import Bank Statement' step = fields.Selection([ ('upload', 'Upload'), ('review', 'Review'), ], default='upload', readonly=True) journal_id = fields.Many2one( 'account.journal', string='Bank Journal', required=True, domain="[('type', '=', 'bank')]", ) data_file = fields.Binary(string='Statement File', attachment=False) filename = fields.Char() line_ids = fields.One2many('fusion.statement.import.line', 'wizard_id') total_new = fields.Integer(compute='_compute_counts') total_duplicate = fields.Integer(compute='_compute_counts') total_selected = fields.Integer(compute='_compute_counts') balance_start = fields.Float(digits=(16, 2), readonly=True) balance_end = fields.Float(digits=(16, 2), readonly=True) currency_code = fields.Char(readonly=True) account_number = fields.Char(readonly=True) @api.depends('line_ids.selected', 'line_ids.is_duplicate') def _compute_counts(self): for rec in self: lines = rec.line_ids rec.total_new = len(lines.filtered(lambda l: not l.is_duplicate)) rec.total_duplicate = len(lines.filtered(lambda l: l.is_duplicate)) rec.total_selected = len(lines.filtered(lambda l: l.selected)) # ------------------------------------------------------------------ # Step 1 → Step 2: Parse file # ------------------------------------------------------------------ def action_parse(self): self.ensure_one() if not self.data_file: raise UserError(_("Please upload a statement file.")) if not OfxParser: raise UserError(_( "The 'ofxparse' Python library is not installed. " "Ask your administrator to run: pip install ofxparse" )) raw = base64.b64decode(self.data_file) try: ofx = OfxParser.parse(io.BytesIO(raw)) except Exception as e: raise UserError(_( "Could not parse the file. Make sure it is a valid " "OFX/QFX/QBO file.\n\nError: %s" ) % str(e)) from e if not ofx.accounts: raise UserError(_("No accounts found in the file.")) account = ofx.accounts[0] transactions = account.statement.transactions if not transactions: raise UserError(_("No transactions found in the file.")) ImportLog = self.env['fusion.statement.import.log'] existing_fitids = set( ImportLog.search([ ('journal_id', '=', self.journal_id.id), ]).mapped('fitid') ) lines = [] for tx in transactions: fitid = str(tx.id).strip() payee = tx.payee or '' if tx.checknum: payee += ' ' + tx.checknum if tx.memo: payee += ' : ' + tx.memo is_dup = fitid in existing_fitids lines.append((0, 0, { 'fitid': fitid, 'date': tx.date.date() if hasattr(tx.date, 'date') else tx.date, 'payment_ref': payee.strip(), 'amount': float(tx.amount), 'is_duplicate': is_dup, 'selected': not is_dup, })) balance = float(account.statement.balance) total_amt = sum(float(tx.amount) for tx in transactions) self.write({ 'step': 'review', 'line_ids': [(5, 0, 0)] + lines, 'balance_end': balance, 'balance_start': balance - total_amt, 'currency_code': account.statement.currency or '', 'account_number': account.number or '', }) return self._reopen() # ------------------------------------------------------------------ # Step 2: Import selected lines # ------------------------------------------------------------------ def action_import(self): self.ensure_one() selected = self.line_ids.filtered(lambda l: l.selected) if not selected: raise UserError(_("No transactions selected for import.")) journal = self.journal_id statement = self.env['account.bank.statement'].create({ 'name': self.filename or 'OFX Import', 'reference': self.filename or '', 'journal_id': journal.id, 'balance_start': self.balance_start, 'balance_end_real': self.balance_end, }) ImportLog = self.env['fusion.statement.import.log'] created_lines = self.env['account.bank.statement.line'] for line in selected.sorted('date'): st_line = self.env['account.bank.statement.line'].create({ 'journal_id': journal.id, 'date': line.date, 'payment_ref': line.payment_ref, 'amount': line.amount, 'statement_id': statement.id, }) created_lines |= st_line ImportLog.create({ 'journal_id': journal.id, 'fitid': line.fitid, 'date': line.date, 'amount': line.amount, 'payment_ref': line.payment_ref, 'statement_line_id': st_line.id, 'company_id': journal.company_id.id, }) all_lines = self.line_ids dup_count = len(all_lines.filtered(lambda l: l.is_duplicate)) manual_skip = len(all_lines.filtered(lambda l: not l.selected and not l.is_duplicate)) date_min = min(selected.mapped('date')) date_max = max(selected.mapped('date')) parts = ['%d transactions imported.' % len(selected)] if dup_count: parts.append('%d duplicates detected.' % dup_count) if manual_skip: parts.append('%d manually excluded.' % manual_skip) parts.append('Date range: %s to %s' % (date_min, date_max)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('Bank Statement Imported'), 'message': ' '.join(parts), 'type': 'success', 'sticky': False, 'next': { 'type': 'ir.actions.act_window', 'name': _('Imported Statement'), 'res_model': 'account.bank.statement', 'res_id': statement.id, 'views': [(False, 'form')], }, }, } # ------------------------------------------------------------------ # Navigation helpers # ------------------------------------------------------------------ def action_back(self): self.ensure_one() self.write({'step': 'upload', 'line_ids': [(5, 0, 0)]}) return self._reopen() def action_select_all_new(self): self.ensure_one() for line in self.line_ids: line.selected = not line.is_duplicate return self._reopen() def action_select_none(self): self.ensure_one() self.line_ids.write({'selected': False}) return self._reopen() def action_select_all(self): self.ensure_one() self.line_ids.write({'selected': True}) return self._reopen() def _reopen(self): return { 'type': 'ir.actions.act_window', 'res_model': self._name, 'res_id': self.id, 'views': [(False, 'form')], 'target': 'new', }