This commit is contained in:
gsinghpal
2026-04-27 00:11:18 -04:00
parent d9f58b9851
commit f08f328688
116 changed files with 9891 additions and 359 deletions

View File

@@ -23,3 +23,23 @@ from . import fp_part_catalog
from . import fp_qc_template
from . import fp_thickness_reading
from . import fp_quality_check
# Sub 12 Phase A — native RMA + the inverse fields it hangs off existing
# quality and receiving models.
from . import fp_rma
from . import fp_rma_links
# Sub 12 Phase B — categorisation primitives + cross-model link fields.
from . import fp_quality_tag
from . import fp_quality_reason
from . import fp_quality_team
from . import fp_quality_alert_stage
from . import fp_quality_categorisation_links
# Sub 12 Phase C — trigger-based quality points.
from . import fp_quality_point
from . import fp_quality_point_hooks
# Sub 12 Phase D — smart-button counts + cross-creation actions.
from . import fp_quality_smart_buttons
from . import fp_quality_cross_creation

View File

@@ -44,28 +44,29 @@ class FpPartCatalog(models.Model):
# ---- Computes ------------------------------------------------------------
def _compute_has_confirmed_mo(self):
"""True if this part is referenced by at least one non-draft MO.
"""True if this part is referenced by at least one live fp.job.
Trace: fp.part.catalog → sale.order.line (x_fc_part_catalog_id)
→ sale.order → mrp.production (via origin name match).
Cheap: two bounded search_counts. Kept store=False so MO state
changes don't write-amplify through every part record.
Sub 11 — replaced mrp.production lookup with fp.job. Trace:
fp.part.catalog → sale.order.line (x_fc_part_catalog_id) →
sale.order → fp.job (via origin name match).
"""
SO = self.env['sale.order']
MO = self.env['mrp.production']
live_states = ('confirmed', 'progress', 'to_close', 'done')
live_states = ('confirmed', 'in_progress', 'on_hold', 'done')
for part in self:
part.x_fc_has_confirmed_mo = False
if 'fp.job' not in self.env:
return
Job = self.env['fp.job']
for part in self:
if not part.id:
part.x_fc_has_confirmed_mo = False
continue
so_names = SO.search([
('order_line.x_fc_part_catalog_id', '=', part.id),
('state', 'in', ('sale', 'done')),
]).mapped('name')
if not so_names:
part.x_fc_has_confirmed_mo = False
continue
part.x_fc_has_confirmed_mo = bool(MO.search_count([
part.x_fc_has_confirmed_mo = bool(Job.search_count([
('origin', 'in', so_names),
('state', 'in', live_states),
]))

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase B — quality alert stage.
#
# Shared kanban-stage namespace used by both NCR and RMA. Each model has
# its own state Selection (state machine guards) AND a stage_id Many2one
# (kanban-draggable). The two stay in sync — see fp_quality_categorisation_links.
from odoo import fields, models
class FpQualityAlertStage(models.Model):
_name = 'fp.quality.alert.stage'
_description = 'Fusion Plating — Quality Alert Stage'
_order = 'sequence, id'
name = fields.Char(required=True, translate=True)
sequence = fields.Integer(default=10, index=True)
fold = fields.Boolean(
string='Fold by Default',
help='If checked the stage is collapsed by default in kanban views.',
)
code = fields.Char(
string='Code',
index=True,
help='Stable machine identifier used by the state ↔ stage_id sync. '
'Examples: new / investigating / containment / disposition / '
'awaiting_signoff / closed / cancelled.',
)
description = fields.Text(translate=True)
active = fields.Boolean(default=True)
_sql_constraints = [
('code_uniq', 'unique(code)', 'A stage with that code already exists.'),
]

View File

@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase B — categorisation field extensions.
#
# Adds the cross-cutting tag_ids / reason_id / team_id fields to all five
# quality records (NCR, CAPA, Hold, Check, RMA). Adds stage_id (kanban
# stage) to NCR + RMA with state ↔ stage_id sync.
import logging
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
# ----- helper mapping (keeps stage codes consistent across models) -----
NCR_STATE_TO_STAGE_CODE = {
'draft': 'new',
'open': 'investigating',
'containment': 'containment',
'disposition': 'disposition',
'closed': 'closed',
}
NCR_STAGE_CODE_TO_STATE = {v: k for k, v in NCR_STATE_TO_STAGE_CODE.items()}
RMA_STATE_TO_STAGE_CODE = {
'draft': 'new',
'authorised': 'investigating',
'shipped_to_us': 'investigating',
'received': 'containment',
'triaged': 'disposition',
'resolving': 'disposition',
'resolved': 'awaiting_signoff',
'closed': 'closed',
'cancelled': 'cancelled',
}
def _stage_for_code(env, code):
if not code:
return env['fp.quality.alert.stage']
return env['fp.quality.alert.stage'].sudo().search(
[('code', '=', code)], limit=1,
)
# ============================================================ NCR ===
class FpNcrCategorisation(models.Model):
_inherit = 'fusion.plating.ncr'
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_ncr_tag_rel', 'ncr_id', 'tag_id',
string='Tags',
)
reason_id = fields.Many2one('fp.quality.reason', string='Root-Cause Reason')
team_id = fields.Many2one('fp.quality.team', string='Quality Team',
tracking=True)
stage_id = fields.Many2one(
'fp.quality.alert.stage', string='Stage',
compute='_compute_stage_id', inverse='_inverse_stage_id',
store=True, tracking=True, group_expand='_read_group_stage_ids',
)
@api.depends('state')
def _compute_stage_id(self):
for rec in self:
code = NCR_STATE_TO_STAGE_CODE.get(rec.state)
rec.stage_id = _stage_for_code(self.env, code) if code else False
def _inverse_stage_id(self):
for rec in self:
if not rec.stage_id or not rec.stage_id.code:
continue
new_state = NCR_STAGE_CODE_TO_STATE.get(rec.stage_id.code)
if new_state and new_state != rec.state:
# Use direct write to avoid the action_close UserError
# guards — kanban drag is an explicit user intent.
super(FpNcrCategorisation, rec).write({'state': new_state})
@api.model
def _read_group_stage_ids(self, stages, domain):
return self.env['fp.quality.alert.stage'].sudo().search([])
# ============================================================ CAPA ===
class FpCapaCategorisation(models.Model):
_inherit = 'fusion.plating.capa'
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_capa_tag_rel', 'capa_id', 'tag_id',
string='Tags',
)
reason_id = fields.Many2one('fp.quality.reason', string='Root-Cause Reason')
team_id = fields.Many2one('fp.quality.team', string='Quality Team')
# ============================================================ HOLD ===
class FpQualityHoldCategorisation(models.Model):
_inherit = 'fusion.plating.quality.hold'
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_hold_tag_rel', 'hold_id', 'tag_id',
string='Tags',
)
reason_id = fields.Many2one('fp.quality.reason', string='Root-Cause Reason')
team_id = fields.Many2one('fp.quality.team', string='Quality Team')
# =========================================================== CHECK ===
class FpQualityCheckCategorisation(models.Model):
_inherit = 'fusion.plating.quality.check'
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_check_tag_rel', 'check_id', 'tag_id',
string='Tags',
)
reason_id = fields.Many2one('fp.quality.reason', string='Failure Reason')
team_id = fields.Many2one('fp.quality.team', string='Quality Team')
# ============================================================ RMA ===
class FpRmaCategorisation(models.Model):
_inherit = 'fusion.plating.rma'
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_rma_tag_rel', 'rma_id', 'tag_id',
string='Tags',
)
reason_id = fields.Many2one('fp.quality.reason', string='Root-Cause Reason')
team_id = fields.Many2one('fp.quality.team', string='Quality Team',
tracking=True)
stage_id = fields.Many2one(
'fp.quality.alert.stage', string='Stage',
compute='_compute_stage_id', store=True, tracking=True,
group_expand='_read_group_stage_ids',
help='Computed from state. RMA state machine has guards (use the '
'lifecycle buttons for valid transitions); the stage field is '
'read-mostly here so the unified Quality Dashboard can group '
'NCR + RMA cards in one kanban.',
)
@api.depends('state')
def _compute_stage_id(self):
for rec in self:
code = RMA_STATE_TO_STAGE_CODE.get(rec.state)
rec.stage_id = _stage_for_code(self.env, code) if code else False
@api.model
def _read_group_stage_ids(self, stages, domain):
return self.env['fp.quality.alert.stage'].sudo().search([])

View File

@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase D — cross-creation actions and CAPA closure-loop linkage.
#
# - NCR.action_spawn_capa: creates a draft CAPA pre-filled from the NCR.
# - CAPA.action_mark_not_effective override: auto-creates a follow-up NCR
# linked back to the original NCR. Closes the loop "we said we fixed it
# but it happened again."
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpNcrCrossCreation(models.Model):
_inherit = 'fusion.plating.ncr'
def action_spawn_capa(self):
"""Create a draft CAPA pre-filled from this NCR. Visible from form
when state ∈ {disposition, closed} and severity ≥ medium (gating
lives in the view; this method is a helper)."""
self.ensure_one()
Capa = self.env['fusion.plating.capa']
existing = Capa.search([('ncr_id', '=', self.id)], limit=1)
if existing:
return {
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.capa',
'view_mode': 'form',
'res_id': existing.id,
}
capa = Capa.create({
'ncr_id': self.id,
'description': self.description,
'type': 'corrective',
'state': 'draft',
'team_id': self.team_id.id if self.team_id else False,
'reason_id': self.reason_id.id if self.reason_id else False,
})
self.message_post(
body=Markup('Spawned CAPA <b>%s</b> from this NCR.') % capa.name,
message_type='comment',
subtype_xmlid='mail.mt_note',
)
return {
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.capa',
'view_mode': 'form',
'res_id': capa.id,
}
class FpCapaCrossCreation(models.Model):
_inherit = 'fusion.plating.capa'
follow_up_ncr_id = fields.Many2one(
'fusion.plating.ncr', string='Follow-up NCR',
ondelete='set null',
help='When effectiveness verification fails, a new NCR is auto-spawned '
'linked back to the original. This field tracks that follow-up.',
)
def action_mark_not_effective(self):
"""Override to auto-spawn a follow-up NCR linked to the original.
Closes the closed-loop CAPA discipline: if a fix didn't work, the
next NCR gets a clear lineage back to the failed CAPA, so root-
cause analysis can dig deeper next time.
"""
super().action_mark_not_effective()
Ncr = self.env['fusion.plating.ncr']
for rec in self:
if rec.follow_up_ncr_id:
continue
if not rec.ncr_id:
_logger.info(
'CAPA %s marked not_effective but has no source NCR; '
'no follow-up NCR created.', rec.name,
)
continue
src = rec.ncr_id
ncr = Ncr.create({
'facility_id': src.facility_id.id,
'source': src.source,
'severity': src.severity,
'part_ref': src.part_ref,
'quantity_affected': src.quantity_affected,
'customer_partner_id': src.customer_partner_id.id,
'bath_id': src.bath_id.id if src.bath_id else False,
'description': Markup(
'<p><strong>Follow-up NCR auto-created from CAPA %s '
'(verification failed).</strong></p>'
) % rec.name,
'team_id': rec.team_id.id if rec.team_id else False,
'reason_id': rec.reason_id.id if rec.reason_id else False,
'tag_ids': [(6, 0, src.tag_ids.ids)],
})
rec.follow_up_ncr_id = ncr.id
rec.message_post(
body=Markup(
'Effectiveness verification failed. Spawned follow-up '
'<b>NCR %s</b> for re-investigation.'
) % ncr.name,
message_type='comment',
subtype_xmlid='mail.mt_note',
)
ncr.message_post(
body=Markup(
'Auto-created from <b>CAPA %s</b> after effectiveness '
'verification failed. Original NCR was %s.'
) % (rec.name, src.name),
message_type='comment',
subtype_xmlid='mail.mt_note',
)
def action_verify_effectiveness(self):
"""Schedule a follow-up activity on the originating NCR.
Used from the CAPA form to remind the QA team to come back and
confirm the corrective action actually held.
"""
from datetime import timedelta
self.ensure_one()
if not self.ncr_id:
raise UserError(_(
'CAPA %s has no source NCR — verification activity '
'cannot be scheduled.'
) % self.display_name)
deadline = fields.Date.context_today(self) + timedelta(days=30)
self.ncr_id.activity_schedule(
'mail.mail_activity_data_todo',
date_deadline=deadline,
summary=_('Verify CAPA %s effectiveness') % self.name,
note=_(
'Confirm that the corrective action from CAPA %s is still '
'holding. If issue recurs, mark CAPA as Not Effective '
'(auto-spawns a follow-up NCR).'
) % self.name,
user_id=(self.owner_id.id if self.owner_id else self.env.user.id),
)
self.message_post(
body=Markup(
'Verification activity scheduled on source <b>NCR %s</b> '
'(due %s).'
) % (self.ncr_id.name, deadline),
message_type='comment',
subtype_xmlid='mail.mt_note',
)
return True

View File

@@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase C — trigger-based quality points.
#
# Replaces the old "customer.x_fc_requires_qc + customer.x_fc_qc_template_id"
# direct binding. Now an admin defines fp.quality.point rules with filters
# (partner / part / coating / step kind) and a trigger event; matching
# records spawn fusion.plating.quality.check rows from the chosen template.
import logging
from odoo import _, api, fields, models
_logger = logging.getLogger(__name__)
TRIGGER_TYPES = [
('manual', 'Manual'),
('so_confirmed', 'Sale Order Confirmed'),
('receiving_done', 'Receiving Closed'),
('job_confirmed', 'Job Confirmed'),
('job_step_done', 'Job Step Finished'),
('job_done', 'Job Completed'),
]
STEP_KINDS = [
('wet', 'Wet Process'),
('bake', 'Bake / Cure'),
('inspect', 'Inspection'),
('mask', 'Masking'),
('post', 'Post-Treatment'),
('other', 'Other'),
]
class FpQualityPoint(models.Model):
_name = 'fp.quality.point'
_description = 'Fusion Plating — Quality Point'
_inherit = ['mail.thread']
_order = 'sequence, name'
name = fields.Char(required=True, translate=True, tracking=True)
sequence = fields.Integer(default=10)
active = fields.Boolean(default=True, tracking=True)
description = fields.Text(translate=True)
trigger_type = fields.Selection(
TRIGGER_TYPES, string='Trigger', required=True,
default='job_confirmed', tracking=True,
help='When this point fires. "manual" never auto-fires.',
)
# ----- Filters (all optional; empty == match all) -----
partner_ids = fields.Many2many(
'res.partner', 'fp_quality_point_partner_rel',
'point_id', 'partner_id', string='Customers',
)
part_catalog_ids = fields.Many2many(
'fp.part.catalog', 'fp_quality_point_part_rel',
'point_id', 'part_id', string='Parts',
)
coating_config_ids = fields.Many2many(
'fp.coating.config', 'fp_quality_point_coating_rel',
'point_id', 'coating_id', string='Coatings',
)
step_kind = fields.Selection(STEP_KINDS, string='Step Kind')
template_id = fields.Many2one(
'fp.qc.checklist.template', string='Checklist Template',
required=True, ondelete='restrict',
)
assignee_user_id = fields.Many2one(
'res.users', string='Default Inspector',
help='If set, the auto-spawned QC check is pre-assigned here.',
)
team_id = fields.Many2one('fp.quality.team', string='Quality Team')
tag_ids = fields.Many2many(
'fp.quality.tag', 'fp_quality_point_tag_rel',
'point_id', 'tag_id', string='Tags',
)
# Stats
spawn_count = fields.Integer(
string='Checks Spawned', compute='_compute_spawn_count',
)
@api.depends('template_id')
def _compute_spawn_count(self):
Check = self.env['fusion.plating.quality.check']
for rec in self:
if not rec.template_id:
rec.spawn_count = 0
continue
rec.spawn_count = Check.search_count([
('template_id', '=', rec.template_id.id),
])
# ------------------------------------------------------------------
# Matching + spawning
# ------------------------------------------------------------------
def _matches(self, partner=None, part=None, coating=None, step=None):
"""Return True if this point's filters all pass against the supplied
context. Empty filter == match anything.
"""
self.ensure_one()
if self.partner_ids and (not partner or partner not in self.partner_ids):
return False
if self.part_catalog_ids and (
not part or part not in self.part_catalog_ids):
return False
if self.coating_config_ids and (
not coating or coating not in self.coating_config_ids):
return False
if self.step_kind and step and getattr(step, 'kind', None) \
and step.kind != self.step_kind:
return False
return True
@api.model
def _find_matching(self, trigger, partner=None, part=None, coating=None,
step=None):
"""Return active points whose trigger + filters match the context."""
candidates = self.search([
('active', '=', True),
('trigger_type', '=', trigger),
])
return candidates.filtered(lambda p: p._matches(
partner=partner, part=part, coating=coating, step=step,
))
def _spawn_check_for(self, source, partner=None, job=None, step=None):
"""Create a fusion.plating.quality.check from this point's template.
Idempotent per (point, source): if a check already exists with the
same template_id and the same job/step binding, no new one is
created (returns the existing one).
"""
self.ensure_one()
Check = self.env['fusion.plating.quality.check']
if not self.template_id:
_logger.warning(
'fp.quality.point %s: no template_id set, skipping spawn.',
self.name,
)
return False
domain = [('template_id', '=', self.template_id.id)]
if job:
domain.append(('job_id', '=', job.id))
if step and 'step_id' in Check._fields:
domain.append(('step_id', '=', step.id))
existing = Check.search(domain, limit=1)
if existing:
return existing
vals = {
'template_id': self.template_id.id,
}
# Best-effort field bindings — survives schema variations.
if 'partner_id' in Check._fields and partner:
vals['partner_id'] = partner.id
if 'job_id' in Check._fields and job:
vals['job_id'] = job.id
if 'step_id' in Check._fields and step:
vals['step_id'] = step.id
if 'state' in Check._fields:
vals['state'] = 'pending'
if 'inspector_id' in Check._fields and self.assignee_user_id:
vals['inspector_id'] = self.assignee_user_id.id
if 'team_id' in Check._fields and self.team_id:
vals['team_id'] = self.team_id.id
if 'tag_ids' in Check._fields and self.tag_ids:
vals['tag_ids'] = [(6, 0, self.tag_ids.ids)]
try:
return Check.create(vals)
except Exception as e:
_logger.warning(
'fp.quality.point %s: spawn failed for %s%s',
self.name, source.display_name if source else '?', e,
)
return False
def action_spawn_manual(self):
"""Manual fire — present from the form view button. No source ctx."""
for rec in self:
rec._spawn_check_for(source=rec)
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Quality Point fired'),
'message': _('Spawned %s check(s).') % len(self),
'type': 'success',
},
}

View File

@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase C — trigger-hook overrides on receiving / job / step / SO.
# Each hook walks fp.quality.point with the matching trigger_type and
# spawns a quality check for every match. Best-effort: failures are
# logged but never block the underlying state transition.
import logging
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
# ============================================================ RECEIVING ===
class FpReceivingPointHook(models.Model):
_inherit = 'fp.receiving'
def write(self, vals):
"""When state flips to closed, fire receiving_done points."""
prev_states = {rec.id: rec.state for rec in self}
result = super().write(vals)
if 'state' not in vals or vals.get('state') != 'closed':
return result
Point = self.env['fp.quality.point']
for rec in self:
if prev_states.get(rec.id) == 'closed':
continue
partner = rec.partner_id
points = Point._find_matching(
trigger='receiving_done', partner=partner,
)
for point in points:
point._spawn_check_for(
source=rec, partner=partner,
)
return result
# ================================================================== SO ===
class SaleOrderPointHook(models.Model):
_inherit = 'sale.order'
def action_confirm(self):
result = super().action_confirm()
Point = self.env['fp.quality.point']
for so in self:
partner = so.partner_id
# Walk lines for part / coating context.
parts = so.order_line.mapped('x_fc_part_catalog_id') \
if 'x_fc_part_catalog_id' in so.order_line._fields else False
coatings = so.order_line.mapped('x_fc_coating_config_id') \
if 'x_fc_coating_config_id' in so.order_line._fields else False
points = Point._find_matching(
trigger='so_confirmed', partner=partner,
)
for point in points:
# Filter by part / coating intersection if the point cares.
if point.part_catalog_ids and parts and \
not (point.part_catalog_ids & parts):
continue
if point.coating_config_ids and coatings and \
not (point.coating_config_ids & coatings):
continue
point._spawn_check_for(source=so, partner=partner)
return result
# ================================================================ JOB ===
class FpJobPointHook(models.Model):
_inherit = 'fp.job'
def action_confirm(self):
result = super().action_confirm()
Point = self.env['fp.quality.point']
for job in self:
partner = job.partner_id
part = getattr(job, 'part_catalog_id', False) or False
coating = getattr(job, 'coating_config_id', False) or False
points = Point._find_matching(
trigger='job_confirmed', partner=partner,
part=part or None, coating=coating or None,
)
for point in points:
point._spawn_check_for(
source=job, partner=partner, job=job,
)
return result
def button_mark_done(self):
result = super().button_mark_done()
Point = self.env['fp.quality.point']
for job in self:
if job.state != 'done':
continue
partner = job.partner_id
part = getattr(job, 'part_catalog_id', False) or False
coating = getattr(job, 'coating_config_id', False) or False
points = Point._find_matching(
trigger='job_done', partner=partner,
part=part or None, coating=coating or None,
)
for point in points:
point._spawn_check_for(
source=job, partner=partner, job=job,
)
return result
# =========================================================== JOB STEP ===
class FpJobStepPointHook(models.Model):
_inherit = 'fp.job.step'
def button_finish(self):
result = super().button_finish()
Point = self.env['fp.quality.point']
for step in self:
if step.state != 'done':
continue
job = step.job_id
partner = job.partner_id if job else False
part = getattr(job, 'part_catalog_id', False) or False
coating = getattr(job, 'coating_config_id', False) or False
points = Point._find_matching(
trigger='job_step_done', partner=partner,
part=part or None, coating=coating or None, step=step,
)
for point in points:
point._spawn_check_for(
source=step, partner=partner, job=job, step=step,
)
return result

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase B — quality reason (root-cause classification library).
from odoo import fields, models
class FpQualityReason(models.Model):
_name = 'fp.quality.reason'
_description = 'Fusion Plating — Quality Reason'
_order = 'category, name'
name = fields.Char(required=True, translate=True)
description = fields.Text(translate=True)
category = fields.Selection(
[
('process', 'Process'),
('supplier', 'Supplier / Material Inbound'),
('equipment', 'Equipment / Calibration'),
('human', 'Human Error / Training'),
('material', 'Material Defect'),
('other', 'Other'),
],
string='Category',
default='process',
required=True,
)
active = fields.Boolean(default=True)
_sql_constraints = [
('name_category_uniq', 'unique(name, category)',
'A reason with that name + category combination already exists.'),
]

View File

@@ -0,0 +1,261 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase D — smart-button counts on fp.job, sale.order, res.partner.
#
# Each parent record gets badge counts for: Holds, Checks, NCRs, CAPAs,
# RMAs. Counts always render (zero is acceptable). Action methods open
# the relevant kanban filtered to that record.
from odoo import _, api, fields, models
# ============================================================ FP.JOB ===
class FpJobQualitySmart(models.Model):
_inherit = 'fp.job'
fp_qc_hold_count = fields.Integer(
compute='_compute_fp_quality_counts', string='Holds',
)
fp_qc_check_count = fields.Integer(
compute='_compute_fp_quality_counts', string='Checks',
)
fp_qc_ncr_count = fields.Integer(
compute='_compute_fp_quality_counts', string='NCRs',
)
fp_qc_capa_count = fields.Integer(
compute='_compute_fp_quality_counts', string='CAPAs',
)
fp_qc_rma_count = fields.Integer(
compute='_compute_fp_quality_counts', string='RMAs',
)
def _compute_fp_quality_counts(self):
Hold = self.env['fusion.plating.quality.hold']
Check = self.env['fusion.plating.quality.check']
Ncr = self.env['fusion.plating.ncr']
Capa = self.env['fusion.plating.capa']
Rma = self.env['fusion.plating.rma']
for job in self:
job.fp_qc_hold_count = Hold.search_count(
[('job_id', '=', job.id)])
job.fp_qc_check_count = Check.search_count(
[('job_id', '=', job.id)])
ncr_ids = []
capa_ids = []
rma_ids = []
if job.sale_order_id:
rma_ids = Rma.search(
[('sale_order_id', '=', job.sale_order_id.id)]).ids
if rma_ids:
ncr_ids = Ncr.search([('rma_id', 'in', rma_ids)]).ids
if job.partner_id:
ncr_ids = list(set(ncr_ids + Ncr.search([
('customer_partner_id', '=', job.partner_id.id),
]).ids))
if ncr_ids:
capa_ids = Capa.search([('ncr_id', 'in', ncr_ids)]).ids
job.fp_qc_ncr_count = len(ncr_ids)
job.fp_qc_capa_count = len(capa_ids)
job.fp_qc_rma_count = len(rma_ids)
def action_view_fp_holds(self):
self.ensure_one()
return {
'name': _('Holds'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.hold',
'view_mode': 'list,form',
'domain': [('job_id', '=', self.id)],
'context': {'default_job_id': self.id},
}
def action_view_fp_checks(self):
self.ensure_one()
return {
'name': _('Quality Checks'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.check',
'view_mode': 'list,form',
'domain': [('job_id', '=', self.id)],
'context': {'default_job_id': self.id},
}
def action_view_fp_ncrs(self):
self.ensure_one()
domain = [('customer_partner_id', '=', self.partner_id.id)]
return {
'name': _('NCRs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.ncr',
'view_mode': 'kanban,list,form',
'domain': domain,
'context': {'default_customer_partner_id': self.partner_id.id},
}
def action_view_fp_capas(self):
self.ensure_one()
Ncr = self.env['fusion.plating.ncr']
ncr_ids = Ncr.search([
('customer_partner_id', '=', self.partner_id.id),
]).ids
return {
'name': _('CAPAs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.capa',
'view_mode': 'list,form',
'domain': [('ncr_id', 'in', ncr_ids)],
}
def action_view_fp_rmas(self):
self.ensure_one()
domain = [('partner_id', '=', self.partner_id.id)]
if self.sale_order_id:
domain = ['|', ('sale_order_id', '=', self.sale_order_id.id),
('partner_id', '=', self.partner_id.id)]
return {
'name': _('RMAs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.rma',
'view_mode': 'kanban,list,form',
'domain': domain,
'context': {'default_partner_id': self.partner_id.id},
}
# ============================================================== SO ===
class SaleOrderQualitySmart(models.Model):
_inherit = 'sale.order'
fp_qc_hold_count = fields.Integer(
compute='_compute_fp_qc_counts', string='Holds',
)
fp_qc_check_count = fields.Integer(
compute='_compute_fp_qc_counts', string='Checks',
)
fp_qc_ncr_count_so = fields.Integer(
compute='_compute_fp_qc_counts', string='NCRs',
)
fp_qc_capa_count = fields.Integer(
compute='_compute_fp_qc_counts', string='CAPAs',
)
fp_qc_rma_count = fields.Integer(
compute='_compute_fp_qc_counts', string='RMAs',
)
def _compute_fp_qc_counts(self):
Hold = self.env['fusion.plating.quality.hold']
Check = self.env['fusion.plating.quality.check']
Ncr = self.env['fusion.plating.ncr']
Capa = self.env['fusion.plating.capa']
Rma = self.env['fusion.plating.rma']
Job = self.env['fp.job']
for so in self:
job_ids = Job.search([('sale_order_id', '=', so.id)]).ids
so.fp_qc_hold_count = Hold.search_count(
[('job_id', 'in', job_ids)]) if job_ids else 0
so.fp_qc_check_count = Check.search_count(
[('job_id', 'in', job_ids)]) if job_ids else 0
rma_ids = Rma.search([('sale_order_id', '=', so.id)]).ids
so.fp_qc_rma_count = len(rma_ids)
ncr_ids = []
if rma_ids:
ncr_ids = Ncr.search([('rma_id', 'in', rma_ids)]).ids
if so.partner_id:
ncr_ids = list(set(ncr_ids + Ncr.search([
('customer_partner_id', '=', so.partner_id.id),
]).ids))
so.fp_qc_ncr_count_so = len(ncr_ids)
so.fp_qc_capa_count = Capa.search_count(
[('ncr_id', 'in', ncr_ids)]) if ncr_ids else 0
def action_view_fp_holds(self):
self.ensure_one()
Job = self.env['fp.job']
job_ids = Job.search([('sale_order_id', '=', self.id)]).ids
return {
'name': _('Holds'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.hold',
'view_mode': 'list,form',
'domain': [('job_id', 'in', job_ids)],
}
def action_view_fp_checks(self):
self.ensure_one()
Job = self.env['fp.job']
job_ids = Job.search([('sale_order_id', '=', self.id)]).ids
return {
'name': _('Quality Checks'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.check',
'view_mode': 'list,form',
'domain': [('job_id', 'in', job_ids)],
}
def action_view_fp_ncrs_so(self):
self.ensure_one()
return {
'name': _('NCRs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.ncr',
'view_mode': 'kanban,list,form',
'domain': [('customer_partner_id', '=', self.partner_id.id)],
}
def action_view_fp_capas(self):
self.ensure_one()
Ncr = self.env['fusion.plating.ncr']
ncr_ids = Ncr.search([
('customer_partner_id', '=', self.partner_id.id),
]).ids
return {
'name': _('CAPAs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.capa',
'view_mode': 'list,form',
'domain': [('ncr_id', 'in', ncr_ids)],
}
def action_view_fp_rmas(self):
self.ensure_one()
return {
'name': _('RMAs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.rma',
'view_mode': 'kanban,list,form',
'domain': [('sale_order_id', '=', self.id)],
'context': {
'default_partner_id': self.partner_id.id,
'default_sale_order_id': self.id,
},
}
# ====================================================== RES.PARTNER ===
class ResPartnerQualitySmart(models.Model):
_inherit = 'res.partner'
fp_qc_quality_history_count = fields.Integer(
compute='_compute_fp_qc_history_count', string='Quality History',
)
def _compute_fp_qc_history_count(self):
Ncr = self.env['fusion.plating.ncr']
Rma = self.env['fusion.plating.rma']
for partner in self:
partner.fp_qc_quality_history_count = (
Ncr.search_count([('customer_partner_id', '=', partner.id)])
+ Rma.search_count([('partner_id', '=', partner.id)])
)
def action_view_fp_quality_history(self):
self.ensure_one()
return {
'name': _('Quality History — %s') % self.display_name,
'type': 'ir.actions.client',
'tag': 'fp_quality_dashboard',
'context': {'default_partner_id': self.id},
}

View File

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase B — quality tag.
#
# Cross-cutting tag library reused by NCR, CAPA, Hold, Check, RMA.
from odoo import fields, models
class FpQualityTag(models.Model):
_name = 'fp.quality.tag'
_description = 'Fusion Plating — Quality Tag'
_order = 'name'
name = fields.Char(required=True, translate=True)
color = fields.Integer(string='Colour Index', default=0)
description = fields.Char(string='Description', translate=True)
active = fields.Boolean(default=True)
_sql_constraints = [
('name_uniq', 'unique(name)', 'A tag with that name already exists.'),
]

View File

@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase B — quality team.
#
# Dedicated team model rather than reusing res.groups, per Sub 12 locked
# decision: teams need their own kanban grouping + per-team escalation
# chains.
from odoo import fields, models
class FpQualityTeam(models.Model):
_name = 'fp.quality.team'
_description = 'Fusion Plating — Quality Team'
_order = 'sequence, name'
_inherit = ['mail.thread']
name = fields.Char(required=True, tracking=True, translate=True)
sequence = fields.Integer(default=10)
color = fields.Integer(string='Colour Index', default=0)
description = fields.Text(translate=True)
lead_user_id = fields.Many2one(
'res.users', string='Team Lead',
tracking=True,
help='Owns escalations and weekly review of open NCRs/RMAs.',
)
member_ids = fields.Many2many(
'res.users', 'fp_quality_team_user_rel', 'team_id', 'user_id',
string='Members',
)
escalation_user_id = fields.Many2one(
'res.users', string='Escalation Manager',
tracking=True,
help='Notified when team owns a record that misses its deadline.',
)
active = fields.Boolean(default=True)
_sql_constraints = [
('name_uniq', 'unique(name)', 'A team with that name already exists.'),
]

View File

@@ -0,0 +1,775 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# fp.rma — Return Material Authorisation.
#
# Sub 12 Phase A. Internal-only RMA workflow that ties customer returns to
# the existing NCR / CAPA / Hold stack. Portal submission is deferred to a
# future sub-project; for now an internal user opens the RMA on behalf of
# the customer.
#
# Lifecycle:
# draft -> authorised -> shipped_to_us -> received -> triaged ->
# resolving -> resolved -> closed
# \
# -> cancelled (manager only, any state)
#
# Auto-spawn rules at the `received` transition (driven by fp.receiving):
# - if auto_spawn_ncr (default True) -> create fusion.plating.ncr
# - if auto_spawn_hold (default True) -> create fusion.plating.quality.hold
# A manager can flip either toggle off before saving the RMA.
import base64
import io
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpRma(models.Model):
_name = 'fusion.plating.rma'
_description = 'Fusion Plating — Return Material Authorisation'
_inherit = ['mail.thread', 'mail.activity.mixin']
_order = 'create_date desc, id desc'
_rec_name = 'name'
name = fields.Char(
string='Reference',
required=True,
copy=False,
readonly=True,
default=lambda self: self._default_name(),
tracking=True,
)
state = fields.Selection(
[
('draft', 'Draft'),
('authorised', 'Authorised'),
('shipped_to_us', 'Customer Shipped'),
('received', 'Received at Shop'),
('triaged', 'Triaged'),
('resolving', 'Resolving'),
('resolved', 'Resolved'),
('closed', 'Closed'),
('cancelled', 'Cancelled'),
],
string='Status',
default='draft',
required=True,
tracking=True,
index=True,
)
# ------------------------------------------------------------------
# Customer + originating order
# ------------------------------------------------------------------
partner_id = fields.Many2one(
'res.partner', string='Customer',
required=True, tracking=True,
domain=[('customer_rank', '>', 0)],
)
sale_order_id = fields.Many2one(
'sale.order', string='Original Sale Order',
required=True, tracking=True,
domain="[('partner_id', '=', partner_id)]",
help='The order being returned. Required so cert/part/coating '
'context follows the return through triage and resolution.',
)
sale_order_line_ids = fields.Many2many(
'sale.order.line', 'fp_rma_sol_rel', 'rma_id', 'sol_id',
string='Returned Lines',
domain="[('order_id', '=', sale_order_id)]",
help='Subset of the original SO lines that the customer is '
'returning. Used to pull part/cert context.',
)
original_job_ids = fields.Many2many(
'fp.job', string='Original Jobs',
compute='_compute_original_job_ids', store=False,
help='Jobs derived from the SO. Navigation-only.',
)
company_id = fields.Many2one(
'res.company', default=lambda self: self.env.company,
readonly=True,
)
# ------------------------------------------------------------------
# Why and how bad
# ------------------------------------------------------------------
trigger_source = fields.Selection(
[
('customer_complaint', 'Customer Complaint'),
('qc_fail_post_ship', 'Post-Shipment QC Failure'),
('inspection_post_delivery', 'Customer Inspection Post-Delivery'),
('other', 'Other'),
],
string='Trigger',
default='customer_complaint',
required=True,
tracking=True,
)
severity = fields.Selection(
[
('low', 'Low'),
('medium', 'Medium'),
('high', 'High'),
('critical', 'Critical'),
],
string='Severity',
default='medium',
required=True,
tracking=True,
)
complaint_description = fields.Html(
string='Customer Complaint',
help='What the customer reported.',
)
triage_findings = fields.Html(
string='Triage Findings',
help='What we found on inspection after receiving the parts.',
)
# ------------------------------------------------------------------
# Resolution
# ------------------------------------------------------------------
resolution_type = fields.Selection(
[
('replace', 'Replace'),
('rework', 'Rework'),
('refund', 'Refund'),
('scrap', 'Scrap'),
],
string='Resolution',
tracking=True,
)
resolution_notes = fields.Html(string='Resolution Notes')
replacement_job_id = fields.Many2one(
'fp.job', string='Replacement Job',
ondelete='set null',
help='New plating job created for replace/rework resolutions.',
)
refund_invoice_id = fields.Many2one(
'account.move', string='Refund / Credit Note',
ondelete='set null',
domain="[('move_type', '=', 'out_refund')]",
)
# ------------------------------------------------------------------
# Inbound logistics
# ------------------------------------------------------------------
inbound_receiving_id = fields.Many2one(
'fp.receiving', string='Inbound Receiving',
ondelete='set null',
help='Receiving record auto-created when the carrier delivers '
'the returned parts.',
)
inbound_picking_id = fields.Many2one(
'stock.picking', string='Inbound Picking',
ondelete='set null',
)
qty_returned = fields.Integer(
string='Qty Returned', tracking=True,
help='Total units the customer is returning per the authorisation.',
)
qty_received = fields.Integer(
string='Qty Received', tracking=True,
help='Counted on receipt at our dock.',
)
customer_tracking = fields.Char(
string='Customer Tracking #',
help='Outbound tracking from the customer back to us.',
)
our_tracking = fields.Char(
string='Our Tracking #',
help='Tracking number for the replacement / return shipment '
'from our shop.',
)
carrier_id = fields.Many2one('delivery.carrier', string='Carrier')
# ------------------------------------------------------------------
# QR + auto-spawn toggles
# ------------------------------------------------------------------
qr_code = fields.Binary(
string='QR Code', compute='_compute_qr_code', store=False,
help='Encodes /fp/rma/<id> for the customer authorisation PDF.',
)
auto_spawn_ncr = fields.Boolean(
string='Auto-create NCR on Receipt',
default=True, tracking=True,
help='When the carrier delivers the returned parts and an '
'fp.receiving is created against this RMA, an NCR is '
'spawned automatically. Manager can toggle off — the '
'change is tracked on the chatter for audit.',
)
auto_spawn_hold = fields.Boolean(
string='Auto-place Hold on Receipt',
default=True, tracking=True,
help='Same trigger as auto_spawn_ncr but creates an '
'fusion.plating.quality.hold for the returned qty.',
)
# ------------------------------------------------------------------
# Linked records (cross-domain)
# ------------------------------------------------------------------
linked_ncr_ids = fields.One2many(
'fusion.plating.ncr', 'rma_id', string='NCRs',
)
linked_hold_ids = fields.One2many(
'fusion.plating.quality.hold', 'rma_id', string='Holds',
)
linked_capa_ids = fields.Many2many(
'fusion.plating.capa', string='CAPAs',
compute='_compute_linked_capa_ids', store=False,
)
ncr_count = fields.Integer(compute='_compute_link_counts')
hold_count = fields.Integer(compute='_compute_link_counts')
capa_count = fields.Integer(compute='_compute_link_counts')
active = fields.Boolean(default=True)
# ------------------------------------------------------------------
# Phase B placeholders (categorisation) — added now so views won't
# break when Phase B lands. Kept as M2O/M2M to models added later.
# ------------------------------------------------------------------
# tag_ids, reason_id, team_id, stage_id are added in Phase B.
# ------------------------------------------------------------------
# Defaults / create / name
# ------------------------------------------------------------------
@api.model
def _default_name(self):
seq = self.env['ir.sequence'].next_by_code('fusion.plating.rma')
return seq or '/'
@api.model_create_multi
def create(self, vals_list):
for vals in vals_list:
if not vals.get('name') or vals.get('name') == '/':
vals['name'] = self._default_name()
return super().create(vals_list)
# ------------------------------------------------------------------
# Computes
# ------------------------------------------------------------------
@api.depends('sale_order_id', 'sale_order_line_ids')
def _compute_original_job_ids(self):
Job = self.env['fp.job']
for rec in self:
if not rec.sale_order_id:
rec.original_job_ids = False
continue
rec.original_job_ids = Job.search([
('sale_order_id', '=', rec.sale_order_id.id),
])
@api.depends('linked_ncr_ids.capa_ids')
def _compute_linked_capa_ids(self):
for rec in self:
rec.linked_capa_ids = rec.linked_ncr_ids.mapped('capa_ids')
@api.depends(
'linked_ncr_ids', 'linked_hold_ids', 'linked_capa_ids',
)
def _compute_link_counts(self):
for rec in self:
rec.ncr_count = len(rec.linked_ncr_ids)
rec.hold_count = len(rec.linked_hold_ids)
rec.capa_count = len(rec.linked_capa_ids)
@api.depends('name')
def _compute_qr_code(self):
try:
import qrcode
except ImportError:
for rec in self:
rec.qr_code = False
return
base = self.env['ir.config_parameter'].sudo().get_param(
'web.base.url', '',
)
for rec in self:
if not rec.id:
rec.qr_code = False
continue
url = f'{base}/fp/rma/{rec.id}'
buf = io.BytesIO()
img = qrcode.make(url)
img.save(buf, format='PNG')
rec.qr_code = base64.b64encode(buf.getvalue())
# ------------------------------------------------------------------
# Lifecycle actions
# ------------------------------------------------------------------
def action_authorise(self):
for rec in self:
if not rec.sale_order_line_ids:
raise UserError(_(
'Select at least one returned line on RMA %s before '
'authorising.'
) % rec.display_name)
if rec.qty_returned <= 0:
raise UserError(_(
'RMA %s needs a returned quantity > 0 before '
'authorising.'
) % rec.display_name)
rec.state = 'authorised'
rec._post_state_message('Authorised')
rec._fire_rma_notification('rma_authorised')
def action_mark_shipped_to_us(self):
for rec in self:
if rec.state != 'authorised':
raise UserError(_(
'RMA %s must be Authorised before marking it as '
'shipped by the customer.'
) % rec.display_name)
rec.state = 'shipped_to_us'
rec._post_state_message('Customer Shipped')
def action_mark_received(self):
"""Manual fallback when an inbound fp.receiving was not auto-linked."""
for rec in self:
if rec.state not in ('authorised', 'shipped_to_us'):
raise UserError(_(
'RMA %s must be Authorised or Shipped before being '
'marked Received.'
) % rec.display_name)
rec._enter_received_state(receiving=False)
def _enter_received_state(self, receiving=None):
"""Common receive-side hook. Called either:
- from action_mark_received (manual)
- from fp.receiving.create override when rma_id was set
Flips state to `received` and (optionally) spawns NCR + Hold per
the auto_spawn_* toggles. Idempotent — re-entry on an already-
received RMA is a no-op (no double-spawn on ORM retry / split
deliveries).
"""
for rec in self:
if rec.state == 'received':
continue
rec.state = 'received'
spawned = []
if rec.auto_spawn_ncr:
ncr = rec._spawn_ncr()
if ncr:
spawned.append(_('NCR %s') % ncr.name)
if rec.auto_spawn_hold:
hold = rec._spawn_hold()
if hold:
spawned.append(_('Hold %s') % hold.name)
label = 'Received'
if spawned:
label += ' — auto-spawned ' + ', '.join(spawned)
rec._post_state_message(label)
# Customer notification: parts arrived at the shop.
rec._fire_rma_notification('rma_received')
def _spawn_ncr(self):
self.ensure_one()
Ncr = self.env['fusion.plating.ncr']
# Idempotency: if an NCR for this RMA already exists, return it.
existing = Ncr.search([('rma_id', '=', self.id)], limit=1)
if existing:
return existing
partner = self.partner_id
# Pull a facility — prefer the partner's company facility, fall
# back to the first active facility.
Facility = self.env['fusion.plating.facility']
facility = (
Facility.search([('company_id', '=', self.company_id.id)], limit=1)
or Facility.search([], limit=1)
)
if not facility:
_logger.warning(
'RMA %s: no fusion.plating.facility found, NCR spawn '
'skipped', self.name,
)
return False
part_ref = ', '.join(
self.sale_order_line_ids.mapped('product_id.default_code') or []
) or self.sale_order_line_ids[:1].product_id.display_name or '/'
complaint = self.complaint_description or ''
body = (
Markup('<p><strong>RMA %s — auto-created from customer return.</strong></p>') % self.name
+ Markup(complaint or '<p>(no description)</p>')
)
ncr = Ncr.create({
'facility_id': facility.id,
'source': 'customer',
'severity': self.severity or 'medium',
'part_ref': part_ref[:64],
'quantity_affected': self.qty_received or self.qty_returned or 0,
'description': body,
'customer_partner_id': partner.id,
'rma_id': self.id,
})
return ncr
def _spawn_hold(self):
self.ensure_one()
Hold = self.env['fusion.plating.quality.hold']
# Idempotency: one auto-Hold per RMA.
existing = Hold.search([('rma_id', '=', self.id)], limit=1)
if existing:
return existing
Facility = self.env['fusion.plating.facility']
facility = (
Facility.search([('company_id', '=', self.company_id.id)], limit=1)
or Facility.search([], limit=1)
)
part_ref = (
self.sale_order_line_ids[:1].product_id.default_code
or self.sale_order_line_ids[:1].product_id.display_name
or self.name
)
hold = Hold.create({
'part_ref': part_ref[:64],
'qty_on_hold': self.qty_received or self.qty_returned or 0,
'qty_original': self.qty_returned or 0,
'hold_reason': 'customer_complaint',
'description': (
f'Auto-created from RMA {self.name}. '
f'Returned parts on hold pending triage.'
),
'facility_id': facility.id if facility else False,
'rma_id': self.id,
})
return hold
def action_triage_complete(self):
for rec in self:
if rec.state != 'received':
raise UserError(_(
'RMA %s must be Received before triage can be '
'completed.'
) % rec.display_name)
if not rec.resolution_type:
raise UserError(_(
'Set a Resolution (replace / rework / refund / scrap) '
'on RMA %s before completing triage.'
) % rec.display_name)
rec.state = 'triaged'
rec._post_state_message('Triaged')
def action_start_resolving(self):
for rec in self:
if rec.state != 'triaged':
raise UserError(_(
'RMA %s must be Triaged before resolution work can '
'start.'
) % rec.display_name)
rec.state = 'resolving'
rec._post_state_message('Resolving')
def action_resolve(self):
"""Trigger resolution-specific side-effects then flip to resolved.
For replace/rework/scrap: spawn the side-effect, flip state.
For refund: open the credit-note wizard. State stays at
`resolving` until the wizard runs and the accountant links the
credit note via action_link_refund (or the AccountMove write
hook auto-links by invoice_origin).
"""
for rec in self:
if rec.state not in ('triaged', 'resolving'):
raise UserError(_(
'RMA %s must be Triaged or Resolving before being '
'marked Resolved.'
) % rec.display_name)
# Refund path needs a wizard return — handle separately.
refund_recs = self.filtered(lambda r: r.resolution_type == 'refund')
if len(refund_recs) > 1:
raise UserError(_(
'Resolve refund RMAs one at a time so the credit-note '
'wizard can be filled in.'
))
if refund_recs:
return refund_recs._resolve_refund()
# Non-refund paths: fire side-effect then flip state.
for rec in self:
handler = {
'replace': rec._resolve_replace,
'rework': rec._resolve_rework,
'scrap': rec._resolve_scrap,
}.get(rec.resolution_type)
if not handler:
raise UserError(_(
'No handler for resolution type "%s" on RMA %s.'
) % (rec.resolution_type, rec.display_name))
handler()
rec.state = 'resolved'
rec._post_state_message(
f'Resolved ({rec.resolution_type})',
)
rec._fire_rma_notification('rma_resolved')
def _resolve_replace(self):
return self._spawn_replacement_job(reason='replace')
def _resolve_rework(self):
return self._spawn_replacement_job(reason='rework')
def _spawn_replacement_job(self, reason='replace'):
self.ensure_one()
Job = self.env['fp.job']
if self.replacement_job_id:
return self.replacement_job_id
first = self.original_job_ids[:1]
if not first:
_logger.info(
'RMA %s: no originating fp.job to clone; creating bare '
'replacement job.', self.name,
)
new_job = Job.create({
'partner_id': self.partner_id.id,
'sale_order_id': self.sale_order_id.id,
'origin': self.sale_order_id.name or self.name,
'qty': self.qty_returned or 1,
})
else:
new_job = first.copy({
'origin': f'{self.name} (RMA {reason})',
'qty': self.qty_returned or first.qty,
'state': 'draft',
})
# Drop cloned-from-source steps and regenerate from the
# recipe so the rework starts fresh (every step pending,
# no inherited timelogs / actuals / completion flags).
if hasattr(new_job, 'step_ids') and new_job.step_ids:
new_job.step_ids.unlink()
if hasattr(new_job, '_generate_steps_from_recipe') \
and new_job.recipe_id:
new_job._generate_steps_from_recipe()
self.replacement_job_id = new_job.id
# Auto-confirm so the portal mirror, racking inspection and
# 'job_confirmed' notification all fire — same as a normal job.
if hasattr(new_job, 'action_confirm') and new_job.state == 'draft':
try:
new_job.action_confirm()
except Exception as e:
_logger.warning(
'RMA %s: replacement job %s auto-confirm failed (%s); '
'leaving in draft.', self.name, new_job.name, e,
)
return new_job
def _resolve_refund(self):
self.ensure_one()
if self.refund_invoice_id:
return self.refund_invoice_id
# Open the standard refund wizard pre-filled to the original SO.
# We don't auto-confirm — accountant verifies amounts first.
invoices = self.env['account.move'].search([
('invoice_origin', '=', self.sale_order_id.name),
('move_type', '=', 'out_invoice'),
], limit=1)
if not invoices:
raise UserError(_(
'RMA %s: no posted invoice found for SO %s — cannot '
'create a credit note automatically. Issue refund '
'manually.'
) % (self.display_name, self.sale_order_id.name))
return {
'name': _('Credit Note'),
'type': 'ir.actions.act_window',
'res_model': 'account.move.reversal',
'view_mode': 'form',
'target': 'new',
'context': {
'active_model': 'account.move',
'active_ids': invoices.ids,
'default_reason': f'RMA {self.name}',
'default_journal_id': invoices.journal_id.id,
},
}
def _resolve_scrap(self):
# NB: spec calls for an fp.job.consumption row with source='rma_scrap'
# but fp.job.consumption requires product_id and there's no curated
# "scrap" product yet. Phase E will surface scrap via the Monthly
# Quality Summary report instead. For now, just narrate.
self.ensure_one()
qty = self.qty_received or self.qty_returned or 0
self.message_post(
body=Markup(
'Resolution: <b>scrap</b>. %s units written off via RMA %s.'
) % (qty, self.name),
message_type='comment',
subtype_xmlid='mail.mt_note',
)
for ncr in self.linked_ncr_ids:
ncr.message_post(
body=Markup('Resolution: <b>scrap</b> via RMA %s.') % self.name,
message_type='comment',
subtype_xmlid='mail.mt_note',
)
def action_close(self):
for rec in self:
if rec.state != 'resolved':
raise UserError(_(
'RMA %s must be Resolved before it can be closed.'
) % rec.display_name)
open_ncrs = rec.linked_ncr_ids.filtered(
lambda n: n.state != 'closed'
)
if open_ncrs:
raise UserError(_(
'RMA %s has open NCRs (%s). Close the NCRs first.'
) % (
rec.display_name,
', '.join(open_ncrs.mapped('name')),
))
open_holds = rec.linked_hold_ids.filtered(
lambda h: h.state in ('on_hold', 'under_review')
)
if open_holds:
raise UserError(_(
'RMA %s still has active Holds (%s). Release, scrap, '
'or send to rework before closing the RMA.'
) % (
rec.display_name,
', '.join(open_holds.mapped('name')),
))
rec.state = 'closed'
rec._post_state_message('Closed')
def _fire_rma_notification(self, event):
"""Best-effort notification dispatch via fp.notification.template.
Silently skips if fusion_plating_notifications is absent or no
template is configured for this trigger event. Failures never
block the RMA state machine.
"""
if 'fp.notification.template' not in self.env:
return
Tpl = self.env['fp.notification.template'].sudo()
for rec in self:
partner = rec.partner_id
if not partner:
continue
try:
Tpl._dispatch(
event, rec, partner, sale_order=rec.sale_order_id,
)
except Exception as e:
_logger.warning(
'RMA %s: notification %s failed: %s',
rec.name, event, e,
)
def action_cancel(self):
is_manager = self.env.user.has_group(
'fusion_plating.group_fusion_plating_manager'
)
if not is_manager:
raise UserError(_(
'Only Plating Managers can cancel an RMA.'
))
for rec in self:
if rec.state == 'closed':
raise UserError(_(
'RMA %s is already closed and cannot be cancelled.'
) % rec.display_name)
rec.state = 'cancelled'
rec._post_state_message('Cancelled')
# ------------------------------------------------------------------
# Smart-button actions
# ------------------------------------------------------------------
def action_view_ncrs(self):
self.ensure_one()
return {
'name': _('NCRs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.ncr',
'view_mode': 'list,form',
'domain': [('rma_id', '=', self.id)],
'context': {
'default_rma_id': self.id,
'default_customer_partner_id': self.partner_id.id,
'default_source': 'customer',
},
}
def action_view_holds(self):
self.ensure_one()
return {
'name': _('Holds'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.hold',
'view_mode': 'list,form',
'domain': [('rma_id', '=', self.id)],
'context': {'default_rma_id': self.id},
}
def action_view_capas(self):
self.ensure_one()
return {
'name': _('CAPAs'),
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.capa',
'view_mode': 'list,form',
'domain': [('id', 'in', self.linked_capa_ids.ids)],
}
def action_view_sale_order(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'sale.order',
'view_mode': 'form',
'res_id': self.sale_order_id.id,
}
def action_view_replacement_job(self):
self.ensure_one()
if not self.replacement_job_id:
return False
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job',
'view_mode': 'form',
'res_id': self.replacement_job_id.id,
}
def action_view_refund(self):
self.ensure_one()
if not self.refund_invoice_id:
return False
return {
'type': 'ir.actions.act_window',
'res_model': 'account.move',
'view_mode': 'form',
'res_id': self.refund_invoice_id.id,
}
def action_view_inbound_receiving(self):
self.ensure_one()
if not self.inbound_receiving_id:
return False
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.receiving',
'view_mode': 'form',
'res_id': self.inbound_receiving_id.id,
}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _post_state_message(self, label):
for rec in self:
rec.message_post(
body=Markup('RMA status changed to <b>%s</b>.') % label,
message_type='comment',
subtype_xmlid='mail.mt_note',
)

View File

@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
#
# Sub 12 Phase A. Inverse Many2one fields on NCR, Hold and fp.receiving so
# RMA can hang One2many counterparts off them. Plus a tiny override on
# fp.receiving.create to flip a linked RMA into the `received` state and
# trigger the auto-spawn rules.
import logging
from markupsafe import Markup
from odoo import api, fields, models
_logger = logging.getLogger(__name__)
class FpNcrRmaLink(models.Model):
_inherit = 'fusion.plating.ncr'
rma_id = fields.Many2one(
'fusion.plating.rma', string='RMA',
ondelete='set null', index=True,
help='Return that triggered this NCR (auto-set by RMA receive).',
)
class FpQualityHoldRmaLink(models.Model):
_inherit = 'fusion.plating.quality.hold'
rma_id = fields.Many2one(
'fusion.plating.rma', string='RMA',
ondelete='set null', index=True,
help='Return that placed these parts on hold.',
)
class FpReceivingRmaLink(models.Model):
_inherit = 'fp.receiving'
rma_id = fields.Many2one(
'fusion.plating.rma', string='Linked RMA',
ondelete='set null', index=True,
help='If set, this receiving is the inbound for a customer return. '
'When created, it transitions the RMA to `received` and may '
'auto-spawn an NCR + Hold per the RMA toggles.',
)
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
# Walk new records, mirror back to RMA, walk the receiving's own
# state machine (draft → counted → staged → closed) so the linked
# SO's x_fc_receiving_status updates, then fire the RMA receive
# hook. Without this the receiving sat at draft and the SO read
# 'not_received' even though the parts were physically at the shop.
for rec in records:
if not rec.rma_id:
continue
rma = rec.rma_id.sudo()
# Mirror inbound link both ways.
if not rma.inbound_receiving_id:
rma.inbound_receiving_id = rec.id
if rma.state in ('authorised', 'shipped_to_us'):
# Use received_qty as qty_received fallback if not set.
if not rma.qty_received and rec.received_qty:
rma.qty_received = rec.received_qty
# Walk the receiving's lifecycle to closed so SO status
# updates. RMA receipts don't have a multi-day racking
# delay (parts are already plated and being inspected for
# the complaint, not racked for fresh plating), so we
# fast-forward all three transitions in one shot.
rec.sudo()._fp_rma_fast_close()
rma._enter_received_state(receiving=rec)
else:
_logger.info(
'RMA %s linked to fp.receiving %s but state %s does '
'not trigger auto-receive hook.',
rma.name, rec.name, rma.state,
)
return records
def _fp_rma_fast_close(self):
"""Walk an RMA-bound receiving from draft to closed in one call.
For RMA returns, the receiving's box-count → racking → close walk
is purely administrative — the parts are already plated and the
operator opens them on triage, not on intake. Fast-forwarding
here keeps the SO's x_fc_receiving_status accurate without
forcing the receiver to click three buttons in sequence.
"""
for rec in self:
if not rec.box_count_in:
# Best-effort default: 1 box if unknown. Real qty lives on
# the RMA's qty_returned / qty_received.
rec.box_count_in = 1
if rec.state == 'draft':
rec.action_mark_counted()
if rec.state == 'counted':
rec.action_mark_staged()
if rec.state == 'staged':
rec.action_close()
class AccountMoveRmaLink(models.Model):
"""Auto-link a credit note back to its RMA when the accountant
confirms the reversal wizard. Looks up by invoice_origin matching
an RMA's sale_order_id.name, scoped to RMAs in `resolving` state
with resolution_type='refund' and no refund_invoice_id yet.
Also flips the RMA from `resolving` to `resolved` once the credit
note is linked — mirrors the auto-progression for replace/rework
paths so the RMA doesn't get stuck after a refund.
"""
_inherit = 'account.move'
@api.model_create_multi
def create(self, vals_list):
moves = super().create(vals_list)
moves._fp_link_to_rma()
return moves
def write(self, vals):
result = super().write(vals)
if 'state' in vals and vals.get('state') == 'posted':
self._fp_link_to_rma()
return result
def _fp_link_to_rma(self):
Rma = self.env['fusion.plating.rma'].sudo()
for move in self:
if move.move_type != 'out_refund':
continue
if not move.invoice_origin:
continue
candidate = Rma.search([
('sale_order_id.name', '=', move.invoice_origin),
('resolution_type', '=', 'refund'),
('refund_invoice_id', '=', False),
('state', 'in', ('resolving', 'triaged')),
], limit=1)
if not candidate:
continue
candidate.refund_invoice_id = move.id
candidate.state = 'resolved'
candidate.message_post(
body=Markup(
'Refund credit note <b>%s</b> linked back to this RMA. '
'Marked Resolved.'
) % move.name,
message_type='comment',
subtype_xmlid='mail.mt_note',
)
candidate._fire_rma_notification('rma_resolved')