# -*- coding: utf-8 -*- from odoo import models, fields, api, _ from odoo.exceptions import UserError import re class SaleOrder(models.Model): _inherit = 'sale.order' fusion_purchase_count = fields.Integer( string='Purchase', compute='_compute_fusion_purchase_count', ) def _compute_fusion_purchase_count(self): for so in self: so.fusion_purchase_count = self.env['purchase.order'].search_count([ ('fusion_sale_ids', 'in', so.id), ]) def action_view_fusion_purchases(self): purchases = self.env['purchase.order'].search([ ('fusion_sale_ids', 'in', self.id), ]) return { 'name': _('Purchase Orders'), 'view_mode': 'list,form', 'res_model': 'purchase.order', 'domain': [('id', 'in', purchases.ids)], 'type': 'ir.actions.act_window', } class PurchaseOrder(models.Model): _inherit = 'purchase.order' fusion_sale_ids = fields.Many2many( 'sale.order', 'fusion_po_so_rel', 'purchase_order_id', 'sale_order_id', string='Sale Orders', help='Sale orders linked to this purchase order', ) fusion_marked_for_ids = fields.Many2many( 'res.partner', 'fusion_po_marked_for_rel', 'purchase_order_id', 'partner_id', string='Marked For', help='Customers this purchase order is marked for', ) x_marked_for = fields.Many2one( 'res.partner', string='Marked For (Legacy)', help='Legacy single-customer marked-for field (migrated from Studio)', index=True, ) fusion_sale_count = fields.Integer( string='Sales', compute='_compute_fusion_sale_count', ) def _compute_fusion_sale_count(self): for po in self: po.fusion_sale_count = len(po.fusion_sale_ids) def action_view_fusion_sale_order(self): self.ensure_one() if not self.fusion_sale_ids: raise UserError(_("No Sale Orders are linked to this Purchase Order.")) if len(self.fusion_sale_ids) == 1: return { 'name': _('Sale Order'), 'view_mode': 'form', 'res_model': 'sale.order', 'res_id': self.fusion_sale_ids.id, 'type': 'ir.actions.act_window', } return { 'name': _('Sale Orders'), 'view_mode': 'list,form', 'res_model': 'sale.order', 'domain': [('id', 'in', self.fusion_sale_ids.ids)], 'type': 'ir.actions.act_window', } def _open_fusion_match_wizard(self, search_hint=''): wizard = self.env['fusion.match.so.wiz'].create({ 'fusion_po_id': self.id, 'fusion_search_hint': search_hint, }) return { 'name': _('Match Sale Order'), 'type': 'ir.actions.act_window', 'res_model': 'fusion.match.so.wiz', 'res_id': wizard.id, 'view_mode': 'form', 'target': 'new', } def action_fusion_match_sale_order(self): """Match this PO to a Sale Order based on x_marked_for field.""" self.ensure_one() marked_for_value = getattr(self, 'x_marked_for', None) if not marked_for_value: return self._open_fusion_match_wizard('') marked_for_str = str(marked_for_value) search_hint = marked_for_str matching_partners = None partner_id_match = re.search(r'res\.partner\((\d+)', marked_for_str) if partner_id_match: partner_id = int(partner_id_match.group(1)) partner = self.env['res.partner'].browse(partner_id).exists() if partner: matching_partners = partner search_hint = partner.name if not matching_partners: matching_partners = self.env['res.partner'].search([ '|', ('name', 'ilike', marked_for_str), ('display_name', 'ilike', marked_for_str), ]) if not matching_partners: return self._open_fusion_match_wizard(search_hint) matching_sales = self.env['sale.order'].search([ ('partner_id', 'in', matching_partners.ids), ]) if not matching_sales or len(matching_sales) > 1: hint = matching_partners[0].name if matching_partners else search_hint return self._open_fusion_match_wizard(hint) self.write({ 'fusion_sale_ids': [(4, matching_sales.id)], 'fusion_marked_for_ids': [(4, matching_partners[0].id)], }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('Success'), 'message': _('Linked to Sale Order: %s') % matching_sales.name, 'type': 'success', 'sticky': False, }, } def action_fusion_batch_match(self): """Batch match multiple POs to Sale Orders based on x_marked_for field.""" matched = 0 skipped = 0 errors = [] for po in self: marked_for_value = getattr(po, 'x_marked_for', None) if not marked_for_value: skipped += 1 continue marked_for_str = str(marked_for_value) matching_partners = None partner_id_match = re.search(r'res\.partner\((\d+)', marked_for_str) if partner_id_match: partner_id = int(partner_id_match.group(1)) matching_partners = self.env['res.partner'].browse(partner_id).exists() if not matching_partners: matching_partners = self.env['res.partner'].search([ '|', ('name', 'ilike', marked_for_str), ('display_name', 'ilike', marked_for_str), ]) if not matching_partners: errors.append(_("PO %s: No customer found for '%s'") % (po.name, marked_for_str)) continue matching_sales = self.env['sale.order'].search([ ('partner_id', 'in', matching_partners.ids), ]) if not matching_sales: errors.append(_("PO %s: No SO found for '%s'") % (po.name, matching_partners[0].name)) continue if len(matching_sales) > 1: errors.append( _("PO %s: Multiple SOs (%d) for '%s'") % (po.name, len(matching_sales), matching_partners[0].name) ) continue po.write({ 'fusion_sale_ids': [(4, matching_sales.id)], 'fusion_marked_for_ids': [(4, matching_partners[0].id)], }) matched += 1 message = _("Matched: %d, Skipped: %d") % (matched, skipped) if errors: message += "\n" + "\n".join(errors[:5]) if len(errors) > 5: message += _("\n... and %d more errors") % (len(errors) - 5) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('Batch Match Complete'), 'message': message, 'type': 'info' if matched > 0 else 'warning', 'sticky': True, }, } class ResPartner(models.Model): _inherit = 'res.partner' x_fc_account_number = fields.Char( string='Account Number', tracking=True, help='Vendor/supplier account number', )