# -*- coding: utf-8 -*- # Copyright 2026 Nexa Systems Inc. # License OPL-1 (Odoo Proprietary License v1.0) import logging import re from odoo import models, fields, api _logger = logging.getLogger(__name__) class FusionInventoryDiscrepancy(models.Model): _name = 'fusion.inventory.discrepancy' _description = 'Inventory Discrepancy' _inherit = ['mail.thread'] _order = 'scan_date desc' _rec_name = 'display_name' display_name = fields.Char(compute='_compute_display_name') product_id = fields.Many2one( 'product.product', string='Product', required=True, ondelete='cascade', index=True) expected_qty = fields.Float(string='Expected Quantity') actual_qty = fields.Float(string='Actual Quantity') difference = fields.Float( string='Difference', compute='_compute_difference', store=True) discrepancy_type = fields.Selection([ ('qty_mismatch', 'Quantity Mismatch'), ('missing_serial', 'Missing Serial Number'), ('orphan_serial', 'Orphaned Serial (no quant)'), ('untracked_serial', 'Serial in Notes (not in system)'), ], string='Type', required=True, index=True) missing_serials = fields.Text( string='Serial Numbers', help='Serial numbers that were found/missing') source = fields.Char( string='Source', help='Where the discrepancy was detected') state = fields.Selection([ ('detected', 'Detected'), ('reviewed', 'Reviewed'), ('resolved', 'Resolved'), ('ignored', 'Ignored'), ], string='Status', default='detected', required=True, tracking=True, index=True) scan_date = fields.Datetime( string='Scan Date', default=fields.Datetime.now, required=True) reviewed_by = fields.Many2one('res.users', string='Reviewed By') resolution_notes = fields.Text(string='Resolution Notes') @api.depends('product_id', 'discrepancy_type') def _compute_display_name(self): for rec in self: product = rec.product_id.name or 'Unknown' dtype = dict(rec._fields['discrepancy_type'].selection).get( rec.discrepancy_type, '') rec.display_name = f'{product} - {dtype}' @api.depends('expected_qty', 'actual_qty') def _compute_difference(self): for rec in self: rec.difference = rec.actual_qty - rec.expected_qty def action_mark_reviewed(self): self.write({ 'state': 'reviewed', 'reviewed_by': self.env.uid, }) def action_mark_resolved(self): self.write({'state': 'resolved'}) def action_ignore(self): self.write({'state': 'ignored'}) @api.model def _cron_scan_discrepancies(self): """Scheduled scan: detect inventory discrepancies and missing serials.""" _logger.info('Starting inventory discrepancy scan...') count = 0 count += self._scan_serial_discrepancies() count += self._scan_quantity_discrepancies() _logger.info('Discrepancy scan complete: %d issues found', count) def _scan_serial_discrepancies(self): """Check for serial numbers in SO/invoice notes that don't exist in stock.lot.""" count = 0 serial_pattern = re.compile(r'(?=', fields.Date.today()), ], limit=200) for move in recent_moves: for line in move.invoice_line_ids: if not line.product_id: continue text_sources = [] if line.name: text_sources.append(line.name) for text in text_sources: clean = re.sub(r'<[^>]+>', ' ', text) candidates = serial_pattern.findall(clean) for candidate in candidates: if len(candidate) < 5: continue if candidate.lower() in ('total', 'price', 'quantity', 'subtotal', 'discount', 'amount', 'invoice', 'order', 'product'): continue existing = self.env['stock.lot'].search([ ('name', '=ilike', candidate), ('product_id', '=', line.product_id.id), ], limit=1) if not existing: already_reported = self.search([ ('product_id', '=', line.product_id.id), ('missing_serials', 'ilike', candidate), ('state', 'in', ('detected', 'reviewed')), ], limit=1) if not already_reported: self.create({ 'product_id': line.product_id.id, 'discrepancy_type': 'untracked_serial', 'missing_serials': candidate, 'source': f'Invoice {move.name}, line: {line.name[:80]}', 'expected_qty': 0, 'actual_qty': 0, }) count += 1 return count def _scan_quantity_discrepancies(self): """Compare stock.quant quantities against expected levels.""" count = 0 tracked_products = self.env['product.product'].search([ ('type', '=', 'product'), ('tracking', '!=', 'none'), ], limit=500) for product in tracked_products: lots = self.env['stock.lot'].search([ ('product_id', '=', product.id), ]) quants = self.env['stock.quant'].search([ ('product_id', '=', product.id), ('location_id.usage', '=', 'internal'), ]) lot_ids_with_quant = set(quants.mapped('lot_id').ids) for lot in lots: if lot.id not in lot_ids_with_quant: already = self.search([ ('product_id', '=', product.id), ('discrepancy_type', '=', 'orphan_serial'), ('missing_serials', '=', lot.name), ('state', 'in', ('detected', 'reviewed')), ], limit=1) if not already: self.create({ 'product_id': product.id, 'discrepancy_type': 'orphan_serial', 'missing_serials': lot.name, 'source': 'Automated scan: lot exists without stock quant', 'expected_qty': 1, 'actual_qty': 0, }) count += 1 return count