244 lines
8.4 KiB
Python
244 lines
8.4 KiB
Python
import base64
|
|
import io
|
|
import logging
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import UserError
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
# ofxparse is an optional dependency: when it is missing the module still
# loads, OfxParser is set to None, and a warning is logged so the wizard can
# report the problem to the user instead of failing at import time.
try:
    from ofxparse import OfxParser
except ImportError:
    _logger.warning("ofxparse library not installed — OFX import disabled.")
    OfxParser = None
|
|
|
|
|
|
class FusionStatementImportLine(models.TransientModel):
    """One parsed transaction shown in the import wizard's preview list.

    Lines belong to a ``fusion.statement.import`` wizard and are removed with
    it (``ondelete='cascade'``). Only ``selected`` is declared editable; all
    other fields are read-only.
    """

    _name = "fusion.statement.import.line"
    _description = "Statement Import Preview Line"
    _order = "date desc, id desc"

    # Owning wizard.
    wizard_id = fields.Many2one("fusion.statement.import", ondelete="cascade")

    # Whether the user wants this transaction imported (ticked by default).
    selected = fields.Boolean(default=True)
    # Flag marking the transaction as a duplicate; filled in by the wizard.
    is_duplicate = fields.Boolean(readonly=True)

    # Transaction data as shown in the preview.
    fitid = fields.Char(readonly=True, string="Transaction ID")
    date = fields.Date(readonly=True)
    payment_ref = fields.Char(readonly=True, string="Description")
    amount = fields.Float(readonly=True, digits=(16, 2))
|
|
|
|
|
|
class FusionStatementImport(models.TransientModel):
    """Two-step wizard importing an OFX/QFX/QBO bank statement file.

    * Step ``upload``: the user picks a bank journal and a file.
      :meth:`action_parse` reads the first account of the file and fills
      ``line_ids`` with a preview, flagging transactions whose FITID is
      already present in ``fusion.statement.import.log`` for the journal.
    * Step ``review``: the user adjusts the selection. :meth:`action_import`
      creates one ``account.bank.statement`` with one line per selected
      transaction and records every imported FITID in the import log.
    """

    _name = 'fusion.statement.import'
    _description = 'Import Bank Statement'

    step = fields.Selection([
        ('upload', 'Upload'),
        ('review', 'Review'),
    ], default='upload', readonly=True)

    journal_id = fields.Many2one(
        'account.journal', string='Bank Journal', required=True,
        domain="[('type', '=', 'bank')]",
    )
    data_file = fields.Binary(string='Statement File', attachment=False)
    filename = fields.Char()

    line_ids = fields.One2many('fusion.statement.import.line', 'wizard_id')

    total_new = fields.Integer(compute='_compute_counts')
    total_duplicate = fields.Integer(compute='_compute_counts')
    total_selected = fields.Integer(compute='_compute_counts')

    # Values read from the parsed file, filled in by action_parse().
    balance_start = fields.Float(digits=(16, 2), readonly=True)
    balance_end = fields.Float(digits=(16, 2), readonly=True)
    currency_code = fields.Char(readonly=True)
    account_number = fields.Char(readonly=True)

    @api.depends('line_ids.selected', 'line_ids.is_duplicate')
    def _compute_counts(self):
        """Compute the new / duplicate / selected counters of the preview."""
        for rec in self:
            lines = rec.line_ids
            duplicate_count = len(lines.filtered(lambda l: l.is_duplicate))
            rec.total_duplicate = duplicate_count
            # Every line is either a duplicate or new.
            rec.total_new = len(lines) - duplicate_count
            rec.total_selected = len(lines.filtered(lambda l: l.selected))

    # ------------------------------------------------------------------
    # Step 1 → Step 2: Parse file
    # ------------------------------------------------------------------

    @staticmethod
    def _build_payment_ref(tx):
        """Return the label for an OFX transaction.

        The label has the form ``<payee> <checknum> : <memo>``; empty parts
        are left out so that a missing payee does not produce a label that
        starts with a stray ``:`` separator.
        """
        head = ' '.join(part for part in (tx.payee, tx.checknum) if part)
        return ' : '.join(part for part in (head, tx.memo) if part).strip()

    def _parse_ofx_account(self):
        """Decode and parse ``data_file`` and return its first account.

        :raises UserError: if the file cannot be decoded/parsed or contains
            no account.
        """
        try:
            # Decoding is inside the try so that a corrupt upload produces
            # the same friendly message as an invalid OFX document.
            raw = base64.b64decode(self.data_file)
            ofx = OfxParser.parse(io.BytesIO(raw))
        except Exception as e:
            raise UserError(_(
                "Could not parse the file. Make sure it is a valid "
                "OFX/QFX/QBO file.\n\nError: %s"
            ) % str(e)) from e

        if not ofx.accounts:
            raise UserError(_("No accounts found in the file."))
        # Only the first account of the file is imported.
        return ofx.accounts[0]

    def action_parse(self):
        """Parse the uploaded file and switch the wizard to the review step.

        Replaces ``line_ids`` with one preview line per transaction of the
        first account in the file. Transactions whose FITID is already logged
        for the selected journal are flagged as duplicates and unticked.

        :return: window action re-opening this wizard.
        :raises UserError: if no file is uploaded, ``ofxparse`` is missing,
            the file cannot be parsed, or it holds no account/transaction.
        """
        self.ensure_one()
        if not self.data_file:
            raise UserError(_("Please upload a statement file."))
        if not OfxParser:
            raise UserError(_(
                "The 'ofxparse' Python library is not installed. "
                "Ask your administrator to run: pip install ofxparse"
            ))

        account = self._parse_ofx_account()
        statement = account.statement
        # An account without a statement is handled like an empty one.
        transactions = statement.transactions if statement else []
        if not transactions:
            raise UserError(_("No transactions found in the file."))

        fitids = [str(tx.id).strip() for tx in transactions]

        # Only look up the FITIDs present in the file instead of loading
        # the whole import history of the journal.
        ImportLog = self.env['fusion.statement.import.log']
        existing_fitids = set(
            ImportLog.search([
                ('journal_id', '=', self.journal_id.id),
                ('fitid', 'in', [fitid for fitid in fitids if fitid]),
            ]).mapped('fitid')
        )

        lines = []
        for tx, fitid in zip(transactions, fitids):
            # A transaction without FITID cannot be matched reliably, so it
            # is never reported as a duplicate.
            is_dup = bool(fitid) and fitid in existing_fitids
            lines.append((0, 0, {
                'fitid': fitid,
                'date': tx.date.date() if hasattr(tx.date, 'date') else tx.date,
                'payment_ref': self._build_payment_ref(tx),
                'amount': float(tx.amount),
                'is_duplicate': is_dup,
                'selected': not is_dup,
            }))

        total_amt = sum(float(tx.amount) for tx in transactions)
        # NOTE(review): ofxparse appears to set ``statement.balance`` only
        # when the file carries a ledger balance — confirm against the
        # library. Without it, fall back to an opening balance of zero so
        # the parse does not crash and the statement still balances.
        ledger_balance = getattr(statement, 'balance', None)
        if ledger_balance is None:
            balance_end = total_amt
        else:
            balance_end = float(ledger_balance)

        self.write({
            'step': 'review',
            # (5, 0, 0) drops the lines of any previous parse first.
            'line_ids': [(5, 0, 0)] + lines,
            'balance_end': balance_end,
            'balance_start': balance_end - total_amt,
            'currency_code': statement.currency or '',
            'account_number': account.number or '',
        })

        return self._reopen()

    # ------------------------------------------------------------------
    # Step 2: Import selected lines
    # ------------------------------------------------------------------

    def action_import(self):
        """Create the bank statement from the selected preview lines.

        One ``account.bank.statement`` is created, with one statement line
        and one ``fusion.statement.import.log`` entry per selected preview
        line, in ascending date order.

        :return: client action showing a success notification, followed by
            the form view of the created statement.
        :raises UserError: if no line is selected or a selected line has no
            date.
        """
        self.ensure_one()
        selected = self.line_ids.filtered(lambda l: l.selected)
        if not selected:
            raise UserError(_("No transactions selected for import."))
        # action_parse() stores the parsed date as-is, so it may be empty;
        # such a line can neither be sorted nor summarised below.
        if selected.filtered(lambda l: not l.date):
            raise UserError(_(
                "Some selected transactions have no date and cannot be "
                "imported."
            ))

        journal = self.journal_id

        # NOTE: the balances are those computed from the whole file by
        # action_parse(), whatever subset of lines is selected here.
        statement = self.env['account.bank.statement'].create({
            'name': self.filename or 'OFX Import',
            'reference': self.filename or '',
            'journal_id': journal.id,
            'balance_start': self.balance_start,
            'balance_end_real': self.balance_end,
        })

        ImportLog = self.env['fusion.statement.import.log']
        StatementLine = self.env['account.bank.statement.line']

        for line in selected.sorted('date'):
            st_line = StatementLine.create({
                'journal_id': journal.id,
                'date': line.date,
                'payment_ref': line.payment_ref,
                'amount': line.amount,
                'statement_id': statement.id,
            })

            # Remember the FITID so later imports can flag it as duplicate.
            ImportLog.create({
                'journal_id': journal.id,
                'fitid': line.fitid,
                'date': line.date,
                'amount': line.amount,
                'payment_ref': line.payment_ref,
                'statement_line_id': st_line.id,
                'company_id': journal.company_id.id,
            })

        all_lines = self.line_ids
        dup_count = len(all_lines.filtered(lambda l: l.is_duplicate))
        manual_skip = len(all_lines.filtered(
            lambda l: not l.selected and not l.is_duplicate
        ))
        dates = selected.mapped('date')

        # Message parts are translated like the notification title.
        parts = [_('%d transactions imported.') % len(selected)]
        if dup_count:
            parts.append(_('%d duplicates detected.') % dup_count)
        if manual_skip:
            parts.append(_('%d manually excluded.') % manual_skip)
        parts.append(_('Date range: %s to %s') % (min(dates), max(dates)))

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('Bank Statement Imported'),
                'message': ' '.join(parts),
                'type': 'success',
                'sticky': False,
                'next': {
                    'type': 'ir.actions.act_window',
                    'name': _('Imported Statement'),
                    'res_model': 'account.bank.statement',
                    'res_id': statement.id,
                    'views': [(False, 'form')],
                },
            },
        }

    # ------------------------------------------------------------------
    # Navigation helpers
    # ------------------------------------------------------------------

    def action_back(self):
        """Return to the upload step, discarding the preview lines."""
        self.ensure_one()
        self.write({'step': 'upload', 'line_ids': [(5, 0, 0)]})
        return self._reopen()

    def action_select_all_new(self):
        """Tick every non-duplicate line and untick every duplicate."""
        self.ensure_one()
        duplicates = self.line_ids.filtered(lambda l: l.is_duplicate)
        # Two batched writes instead of one write per line.
        duplicates.write({'selected': False})
        (self.line_ids - duplicates).write({'selected': True})
        return self._reopen()

    def action_select_none(self):
        """Untick every preview line."""
        self.ensure_one()
        self.line_ids.write({'selected': False})
        return self._reopen()

    def action_select_all(self):
        """Tick every preview line, duplicates included."""
        self.ensure_one()
        self.line_ids.write({'selected': True})
        return self._reopen()

    def _reopen(self):
        """Return the window action that re-opens this wizard in a dialog."""
        return {
            'type': 'ir.actions.act_window',
            'res_model': self._name,
            'res_id': self.id,
            'views': [(False, 'form')],
            'target': 'new',
        }
|