# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# fp.job extension — cross-module fields that couldn't live in core
# because their target models are in dependent modules. Per spec §5.1
# this module is the umbrella that re-bundles the cross-module
# extensions for the native job flow.
#
# qc_check_id is deferred to Task 2.7 (the underlying QC model still
# lives in fusion_plating_bridge_mrp; we'll address its sourcing then).
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
# Extension of the existing fp.job model (via _inherit). The fields below
# link a job to records whose models are declared outside the core module;
# the methods further down in the class add smart buttons, the scrap-hold
# write hook, recipe step generation and the confirm/done lifecycle hooks.
class FpJob(models.Model):
_inherit = 'fp.job'
# Part being processed. ondelete='restrict': the catalog entry cannot be
# deleted while a job still points at it.
part_catalog_id = fields.Many2one(
'fp.part.catalog',
string='Part',
ondelete='restrict',
)
# Coating configuration; read by _generate_steps_from_recipe to seed the
# thickness target / uom on plating steps. Also delete-protected.
coating_config_id = fields.Many2one(
'fp.coating.config',
string='Coating Configuration',
ondelete='restrict',
)
# Optional links below use ondelete='set null': removing the target record
# simply clears the reference on the job.
customer_spec_id = fields.Many2one(
'fusion.plating.customer.spec',
string='Customer Spec',
ondelete='set null',
)
# Portal mirror record, created by _fp_create_portal_job on confirm.
portal_job_id = fields.Many2one(
'fusion.plating.portal.job',
string='Portal Job',
ondelete='set null',
)
# Draft delivery, created by _fp_create_delivery when the job is marked done.
delivery_id = fields.Many2one(
'fusion.plating.delivery',
string='Delivery',
ondelete='set null',
)
# Per-job opt-in / opt-out decisions for recipe nodes; consumed as
# {node_id: included} by _generate_steps_from_recipe.
override_ids = fields.One2many(
'fp.job.node.override',
'job_id',
string='Recipe Overrides',
)
# Phase 7 — migration idempotency key. Populated by
# scripts/migrate_to_fp_jobs.py to mark a fp.job as the mirror of a
# specific mrp.production. Used to skip already-migrated MOs on
# subsequent runs. Cleared after the 2-week shadow period.
# Stored as a plain Integer (not a Many2one), so no ORM-level link to
# mrp.production is declared here.
legacy_mrp_production_id = fields.Integer(
string='Legacy MRP Production ID',
index=True,
help='Database id of the source mrp.production record this job '
'was migrated from. Used by the migration script for '
'idempotency. Cleared post-cutover.',
)
# ------------------------------------------------------------------
# Smart-button counts (Feature A — operator workflow)
#
# Compute counts for each downstream model so the form view can
# render an oe_stat_button row similar to sale.order. Cross-module
# models are runtime-detected so this still works when one of the
# bridge modules is uninstalled.
#
# All eight fields share the single compute method
# _compute_smart_counts and declare no store=True, so they are
# computed on read rather than persisted.
# ------------------------------------------------------------------
# 0 or 1 — mirrors whether sale_order_id is set.
sale_order_count = fields.Integer(compute='_compute_smart_counts')
# 0 or 1 — mirrors whether delivery_id is set.
delivery_count = fields.Integer(compute='_compute_smart_counts')
# Customer invoices / refunds whose invoice_origin equals the job origin.
invoice_count = fields.Integer(compute='_compute_smart_counts')
# Payments reconciled against those invoices.
payment_count = fields.Integer(compute='_compute_smart_counts')
# Quality holds whose x_fc_job_id is this job.
quality_hold_count = fields.Integer(compute='_compute_smart_counts')
# Certificates whose x_fc_job_id is this job.
certificate_count = fields.Integer(compute='_compute_smart_counts')
# Total number of time-log rows across all steps of the job.
timelog_count = fields.Integer(compute='_compute_smart_counts')
# 0 or 1 — mirrors whether portal_job_id is set.
portal_job_count = fields.Integer(compute='_compute_smart_counts')
@api.depends(
    'sale_order_id', 'delivery_id', 'portal_job_id', 'step_ids',
    'step_ids.time_log_ids', 'origin', 'partner_id',
)
def _compute_smart_counts(self):
    """Compute every smart-button counter shown on the job form.

    Optional models (accounting, quality holds, certificates) are looked
    up with ``self.env.get`` so a missing module yields a count of 0
    instead of an error.

    Invoices are matched on ``invoice_origin == job.origin`` and searched
    once per job; the resulting ids feed both ``invoice_count`` and the
    payment lookup (the same domain used to be queried twice).
    """
    AccountMove = self.env.get('account.move')
    AccountPayment = self.env.get('account.payment')
    QualityHold = self.env.get('fusion.plating.quality.hold')
    Certificate = self.env.get('fp.certificate')
    for job in self:
        # Single-record links: the button shows 1 when the link is set.
        job.sale_order_count = 1 if job.sale_order_id else 0
        job.delivery_count = 1 if job.delivery_id else 0
        job.portal_job_count = 1 if job.portal_job_id else 0

        # Invoices via origin (the SO name).
        invoice_ids = []
        if AccountMove is not None and job.origin:
            invoice_ids = AccountMove.search([
                ('invoice_origin', '=', job.origin),
                ('move_type', 'in', ('out_invoice', 'out_refund')),
            ]).ids
        job.invoice_count = len(invoice_ids)

        # Payments reconciled against those invoices. A non-empty
        # invoice_ids already implies account.move exists and origin is set.
        if AccountPayment is not None and invoice_ids:
            job.payment_count = AccountPayment.search_count([
                ('reconciled_invoice_ids', 'in', invoice_ids),
            ])
        else:
            job.payment_count = 0

        job_domain = [('x_fc_job_id', '=', job.id)]
        job.quality_hold_count = (
            QualityHold.search_count(job_domain)
            if QualityHold is not None else 0
        )
        job.certificate_count = (
            Certificate.search_count(job_domain)
            if Certificate is not None else 0
        )

        job.timelog_count = sum(
            len(step.time_log_ids) for step in job.step_ids
        )
# ------------------------------------------------------------------
# Smart-button actions
# ------------------------------------------------------------------
def action_view_sale_order(self):
    """Open the linked sale order in form view.

    Returns a window-close action when the job has no sale order.
    """
    self.ensure_one()
    order = self.sale_order_id
    if order:
        return dict(
            type='ir.actions.act_window',
            res_model='sale.order',
            res_id=order.id,
            view_mode='form',
            name=order.name,
        )
    return {'type': 'ir.actions.act_window_close'}
# All time logs across every step on this job — backs the Time Logs
# tab on the form so the manager sees the full labour audit without
# clicking into each step.
# Declared readonly, with job_id on the time-log model as the inverse.
time_log_ids = fields.One2many(
'fp.job.step.timelog',
'job_id',
string='All Time Logs',
readonly=True,
)
# 2026-04-28 — link to the auto-created Sub 8 racking inspection so
# the job form can show a smart button + the manager can route into
# the inspection without leaving the job screen.
# Inverse field on the inspection model is x_fc_job_id.
racking_inspection_ids = fields.One2many(
'fp.racking.inspection',
'x_fc_job_id',
string='Racking Inspections',
)
# Non-stored convenience pointer: _compute_racking_inspection sets it to
# the first record of racking_inspection_ids (empty when there is none).
racking_inspection_id = fields.Many2one(
'fp.racking.inspection',
string='Racking Inspection',
compute='_compute_racking_inspection',
store=False,
help='The single racking inspection scoped to this job (Sub 8 '
'enforces uniqueness). Smart button on the form routes here.',
)
# Computed alongside racking_inspection_id so views can render the
# state badge without needing a related-on-non-stored field (which
# the ORM rejects). Selection mirrors fp.racking.inspection.state.
# NOTE(review): the selection list is duplicated by hand here — confirm it
# is kept in sync with the inspection model when states change.
racking_inspection_state = fields.Selection(
[('draft', 'Draft'),
('inspecting', 'Inspecting'),
('done', 'Done'),
('discrepancy_flagged', 'Discrepancy Flagged')],
string='Racking Inspection Status',
compute='_compute_racking_inspection',
store=False,
)
@api.depends('racking_inspection_ids', 'racking_inspection_ids.state')
def _compute_racking_inspection(self):
    """Expose the first racking inspection of each job and its state.

    Both computed fields are set together: the inspection pointer is the
    first record of ``racking_inspection_ids`` (possibly empty) and the
    state is that record's ``state`` or ``False`` when there is none.
    """
    for job in self:
        inspection = job.racking_inspection_ids[:1]
        state = False
        if inspection:
            state = inspection.state
        job.racking_inspection_id = inspection
        job.racking_inspection_state = state
def action_view_racking_inspection(self):
    """Open the job's racking inspection, creating it first if needed.

    Auto-creation covers jobs confirmed before Sub 8 shipped and cases
    where the create at confirm time failed silently.

    Raises:
        UserError: when the inspection model is not installed, or when
            no inspection exists even after the auto-create attempt.
    """
    self.ensure_one()
    if 'fp.racking.inspection' not in self.env:
        raise UserError(_(
            'Sub 8 racking inspection module not installed. '
            'Install fusion_plating_receiving to enable.'
        ))

    if not self.racking_inspection_id:
        self._fp_create_racking_inspection()
        # Drop the cached one2many so the new record is read back.
        self.invalidate_recordset(['racking_inspection_ids'])

    inspection = self.racking_inspection_id or self.racking_inspection_ids[:1]
    if not inspection:
        raise UserError(_('Could not auto-create racking inspection.'))

    action = {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.racking.inspection',
        'res_id': inspection.id,
        'view_mode': 'form',
        'target': 'current',
        'name': _('Racking Inspection — %s') % self.name,
    }
    return action
def action_view_steps(self):
    """List the steps of this job (list + form views).

    The context pre-fills ``job_id`` for steps created from that list.
    """
    self.ensure_one()
    job_id = self.id
    return dict(
        type='ir.actions.act_window',
        res_model='fp.job.step',
        view_mode='list,form',
        domain=[('job_id', '=', job_id)],
        name='Steps — %s' % self.name,
        context={'default_job_id': job_id},
    )
def action_open_move_wizard(self):
    """Open the Move wizard from the job header.

    The from-step is the first in-progress step; failing that, the
    lowest-sequence step that is paused or ready. This lets the manager
    move the job forward without opening a specific step row.

    Raises:
        UserError: when no in-progress, paused or ready step exists.
    """
    self.ensure_one()
    steps = self.step_ids
    active_step = steps.filtered(lambda s: s.state == 'in_progress')[:1]
    if not active_step:
        waiting = steps.filtered(lambda s: s.state in ('paused', 'ready'))
        active_step = waiting.sorted('sequence')[:1]
    if not active_step:
        raise UserError(_(
            'No in-progress, paused, or ready step found on this job. '
            'Either every step is done or the job is still in draft.'
        ))

    wizard_context = {
        'default_from_step_id': active_step.id,
        'default_job_id': self.id,
    }
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.move.wizard',
        'view_mode': 'form',
        'target': 'new',
        'name': _('Move Step — %s') % active_step.name,
        'context': wizard_context,
    }
def action_print_traveller(self):
    """Return the report action that prints the traveller for this job."""
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_traveller'
    )
    return report.report_action(self)
def action_print_wo_detail(self):
    """Return the report action for the Work Order Detail PDF.

    This is the Steelhead-style document — chronological chain of
    custody, per-step inputs and a Certified By page — intended as the
    AS9100/Nadcap shippable audit document.
    """
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_wo_detail'
    )
    return report.report_action(self)
def action_view_deliveries(self):
    """Open the linked delivery in form view, or close if none is set."""
    self.ensure_one()
    delivery = self.delivery_id
    if delivery:
        return dict(
            type='ir.actions.act_window',
            res_model='fusion.plating.delivery',
            res_id=delivery.id,
            view_mode='form',
            name=delivery.name,
        )
    return {'type': 'ir.actions.act_window_close'}
def action_view_invoices(self):
    """List customer invoices and refunds whose origin matches the job.

    Returns a window-close action when the job has no origin.
    """
    self.ensure_one()
    origin = self.origin
    if not origin:
        return {'type': 'ir.actions.act_window_close'}
    invoice_domain = [
        ('invoice_origin', '=', origin),
        ('move_type', 'in', ('out_invoice', 'out_refund')),
    ]
    return dict(
        type='ir.actions.act_window',
        res_model='account.move',
        view_mode='list,form',
        domain=invoice_domain,
        name='Invoices — %s' % self.name,
    )
def action_view_payments(self):
    """List payments reconciled against this job's invoices.

    Invoices are found via ``invoice_origin == self.origin``. The action
    closes when the job has no origin or accounting is not installed;
    when no invoice matches, the list opens with a domain that matches
    nothing.
    """
    self.ensure_one()
    if not self.origin:
        return {'type': 'ir.actions.act_window_close'}
    AccountMove = self.env.get('account.move')
    if AccountMove is None:
        return {'type': 'ir.actions.act_window_close'}

    invoices = AccountMove.search([
        ('invoice_origin', '=', self.origin),
        ('move_type', 'in', ('out_invoice', 'out_refund')),
    ])
    invoice_ids = invoices.ids
    if invoice_ids:
        payment_domain = [('reconciled_invoice_ids', 'in', invoice_ids)]
    else:
        # No invoice: force an empty list.
        payment_domain = [('id', '=', 0)]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'account.payment',
        'view_mode': 'list,form',
        'domain': payment_domain,
        'name': 'Payments — %s' % self.name,
    }
def action_view_quality_holds(self):
    """List the quality holds linked to this job via ``x_fc_job_id``.

    The context pre-fills ``x_fc_job_id`` for holds created from the list.
    """
    self.ensure_one()
    job_id = self.id
    return dict(
        type='ir.actions.act_window',
        res_model='fusion.plating.quality.hold',
        view_mode='list,form',
        domain=[('x_fc_job_id', '=', job_id)],
        name='Quality Holds — %s' % self.name,
        context={'default_x_fc_job_id': job_id},
    )
def action_view_certificates(self):
    """List the certificates linked to this job via ``x_fc_job_id``.

    The context pre-fills ``x_fc_job_id`` for certificates created from
    the list.
    """
    self.ensure_one()
    job_id = self.id
    return dict(
        type='ir.actions.act_window',
        res_model='fp.certificate',
        view_mode='list,form',
        domain=[('x_fc_job_id', '=', job_id)],
        name='Certificates — %s' % self.name,
        context={'default_x_fc_job_id': job_id},
    )
def action_view_timelogs(self):
    """List the time logs recorded on any step of this job.

    When the job has no steps the domain is one that matches nothing.
    """
    self.ensure_one()
    step_ids = self.step_ids.ids
    if step_ids:
        log_domain = [('step_id', 'in', step_ids)]
    else:
        log_domain = [('id', '=', 0)]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.timelog',
        'view_mode': 'list,form',
        'domain': log_domain,
        'name': 'Time Logs — %s' % self.name,
    }
def action_view_portal_job(self):
    """Open the linked portal job in form view, or close if none is set."""
    self.ensure_one()
    portal_job = self.portal_job_id
    if portal_job:
        return dict(
            type='ir.actions.act_window',
            res_model='fusion.plating.portal.job',
            res_id=portal_job.id,
            view_mode='form',
            name=portal_job.name,
        )
    return {'type': 'ir.actions.act_window_close'}
def write(self, vals):
    """Write hook: auto-spawn a quality hold when qty_scrapped increases.

    AS9100 / Nadcap need a disposition record per scrap event — without
    this the operator silently bumps qty_scrapped, there is no paper
    trail and the auditor can't reconstruct what happened.

    One hold is created per job whose qty_scrapped goes up in this call;
    the operator fills hold_reason + description on the spawned record.
    The hold is best-effort: a failure is logged and never blocks the
    write itself. Nothing is spawned when the quality-hold model is not
    installed.

    Args:
        vals: field values, as for the standard ORM ``write``.

    Returns:
        The result of the parent ``write``.
    """
    # Snapshot (old, new) per job *before* super() overwrites the value.
    scrap_deltas = {}
    if 'qty_scrapped' in vals:
        new = vals['qty_scrapped'] or 0
        for job in self:
            old = job.qty_scrapped or 0
            if new > old:
                scrap_deltas[job.id] = (old, new)

    result = super().write(vals)

    if not scrap_deltas or 'fusion.plating.quality.hold' not in self.env:
        return result

    Hold = self.env['fusion.plating.quality.hold']
    Facility = self.env['fusion.plating.facility']
    # When the scrap was bumped from the tablet, the operator was
    # prompted for a reason and it is passed via context as
    # `fp_scrap_reason` (see /fp/shopfloor/bump_qty_scrapped).
    scrap_reason = self.env.context.get('fp_scrap_reason')

    for job in self:
        if job.id not in scrap_deltas:
            continue
        old, new = scrap_deltas[job.id]
        delta = new - old

        # Facility: the job's own, else the first one of the job's
        # company, else any facility at all.
        facility = job.facility_id or Facility.search([
            ('company_id', '=', job.company_id.id),
        ], limit=1) or Facility.search([], limit=1)
        part_ref = (
            job.part_catalog_id.part_number if job.part_catalog_id
            else job.product_id.default_code or job.name
        )

        # Prepend the operator's reason so the audit row captures what
        # was actually typed instead of the generic placeholder.
        if scrap_reason:
            description = _(
                'Operator reason: %s\n\n'
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g).'
            ) % (scrap_reason, job.name, self.env.user.name, old, new, delta)
        else:
            description = _(
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g). '
                'OPERATOR: replace this text with the actual '
                'reason (drop / contamination / out-of-spec / etc).'
            ) % (job.name, self.env.user.name, old, new, delta)

        try:
            # Savepoint: if the create or the chatter post fails at the
            # database level, only this block is rolled back. Without it
            # the swallowed error would leave the whole transaction
            # aborted and the write would still fail later on.
            with self.env.cr.savepoint():
                hold = Hold.create({
                    'job_id': job.id,
                    'part_ref': (part_ref or job.name)[:64],
                    'qty_on_hold': int(delta),
                    'qty_original': int(job.qty or 0),
                    'mark_for_scrap': True,
                    'hold_reason': 'other',
                    'description': description,
                    'facility_id': facility.id if facility else False,
                })
                job.message_post(body=Markup(_(
                    '⚠️ Scrap auto-Hold spawned: %s for %g part(s). '
                    'Operator must update description with the cause.'
                )) % (hold.name, delta))
        except Exception as e:
            _logger.warning(
                'Job %s: failed to auto-spawn scrap hold: %s',
                job.name, e, exc_info=True,
            )
    return result
def action_sync_qty_from_so(self):
    """Copy the sale-order quantity onto the job after a mid-job SO edit.

    The new quantity is the sum of ``product_uom_qty`` over all lines of
    the linked sale order. Jobs without a sale order, or whose quantity
    already matches (within 0.0001), are left untouched. Each change is
    posted to the chatter with the user, old and new values.

    This is a manual action on purpose: changing quantity mid-job has
    physical consequences (rack more parts, stop early, scrap excess),
    so the supervisor must acknowledge it by clicking the button.

    Returns:
        True.
    """
    user_name = self.env.user.name
    for job in self:
        order = job.sale_order_id
        if not order:
            continue
        so_qty = sum(order.order_line.mapped('product_uom_qty'))
        previous = job.qty
        unchanged = abs(previous - so_qty) < 0.0001
        if unchanged:
            continue
        job.qty = so_qty
        note = Markup(_(
            'Job qty synced from SO by %s: %g → %g (Δ %+g). '
            'Operator: confirm physical scope matches.'
        )) % (user_name, previous, so_qty, so_qty - previous)
        job.message_post(body=note)
    return True
# ------------------------------------------------------------------
# Recipe → fp.job.step generation (Task 2.4)
#
# Native port of fusion_plating_bridge_mrp's
# _generate_workorders_from_recipe. Walks the recipe tree, creates
# one fp.job.step per 'operation' node, formats child 'step' nodes
# as step instructions on chatter, respects opt-in/out overrides
# from fp.job.node.override.
#
# Adaptations from the original:
# - Creates fp.job.step (not mrp.workorder)
# - Maps fusion.plating.work.center → fp.work.centre via code
# fallback (no forward link exists yet)
# - Uses native field names (job_id, work_centre_id, etc.)
# - Drops work_role_id (not on fp.job.step yet — Task 2.6+)
# - Drops _fp_autofill_default_equipment (not yet on step)
# ------------------------------------------------------------------
def _generate_steps_from_recipe(self):
"""Generate fp.job.step records from the assigned recipe.
Walks the recipe tree, creates one step per 'operation' node,
and formats child 'step' nodes as step instructions on the
chatter. Respects opt-in/out overrides from override_ids.
"""
Step = self.env['fp.job.step']
Node = self.env['fusion.plating.process.node']
for job in self:
if not job.recipe_id:
continue # No recipe assigned
if job.step_ids:
continue # Steps already exist — don't duplicate
# Build lookup of overrides keyed by node ID
override_map = {ov.node_id.id: ov.included for ov in job.override_ids}
# Start-at-node: if set, the allowed set is the union of:
# 1. start_node and all its descendants
# 2. each ancestor of start_node
# 3. at each ancestor level, any LATER-sequence sibling and
# all of its descendants
start_node = job.start_at_node_id
allowed_ids = None # None = include everything
if start_node:
descendants = Node.search([('id', 'child_of', start_node.id)])
allowed_ids = set(descendants.ids)
cur = start_node
while cur.parent_id:
parent = cur.parent_id
allowed_ids.add(parent.id)
later_sibs = parent.child_ids.filtered(
lambda n: n.sequence > cur.sequence
)
for sib in later_sibs:
sib_descendants = Node.search([
('id', 'child_of', sib.id),
])
allowed_ids |= set(sib_descendants.ids)
cur = parent
step_vals_list = []
wo_steps = {} # {sequence: instruction text}
seq_counter = [10]
def _is_node_included(node):
"""Determine if a node should be included based on
opt-in/out logic, per-job overrides, and start-at-node
filter.
"""
nid = node.id
if allowed_ids is not None and nid not in allowed_ids:
return False
opt = node.opt_in_out or 'disabled'
if opt == 'disabled':
return True
if nid in override_map:
return override_map[nid]
if opt == 'opt_in':
return False # Default excluded
return True # opt_out → default included
def _resolve_work_centre(legacy_wc):
"""Map fusion.plating.work.center → fp.work.centre.
The legacy work-centre model does not (yet) have a forward
link to the new fp.work.centre. Try a forward link
(x_fc_fp_work_centre_id) if some bridge module added one;
otherwise fall back to a code lookup.
"""
if not legacy_wc:
return self.env['fp.work.centre']
# Forward link, if any
if (
'x_fc_fp_work_centre_id' in legacy_wc._fields
and legacy_wc.x_fc_fp_work_centre_id
):
return legacy_wc.x_fc_fp_work_centre_id
# Code fallback (legacy code is unique-per-facility,
# native code is globally unique — first match wins)
if legacy_wc.code:
found = self.env['fp.work.centre'].search(
[('code', '=', legacy_wc.code)], limit=1,
)
if found:
return found
return self.env['fp.work.centre']
def walk_node(node):
if not _is_node_included(node):
return
if node.node_type == 'operation':
work_centre = _resolve_work_centre(node.work_center_id)
if not work_centre:
_logger.warning(
'Job %s: operation "%s" has no mapped fp.work.centre — '
'creating step without work centre.',
job.name, node.name,
)
# Collect step instructions from child 'step' nodes
instructions = []
step_num = 1
for child in node.child_ids.sorted('sequence'):
if child.node_type == 'step' and _is_node_included(child):
line = '%d. %s' % (step_num, child.name)
if child.estimated_duration:
line += ' (%.0f min)' % child.estimated_duration
instructions.append(line)
step_num += 1
# Map recipe_node.default_kind → step.kind so the
# downstream gates (Sub 8 racking soft-gate, Policy B
# contract-review gate) work even when the step gets
# renamed by the customer (e.g. "Hang on Bar" instead
# of "Racking"). Without this, gate detection falls
# back to fragile name matching.
_NODE_KIND_TO_STEP_KIND = {
'cleaning': 'wet',
'etch': 'wet',
'rinse': 'wet',
'plate': 'wet',
'dry': 'wet',
'wbf_test': 'wet',
'bake': 'bake',
'mask': 'mask',
'demask': 'mask',
'racking': 'rack',
'derack': 'rack',
'inspect': 'inspect',
'final_inspect': 'inspect',
'contract_review': 'other',
'gating': 'other',
'ship': 'other',
}
step_kind = 'other'
node_kind = (
node.default_kind
if 'default_kind' in node._fields else None
)
if node_kind and node_kind in _NODE_KIND_TO_STEP_KIND:
step_kind = _NODE_KIND_TO_STEP_KIND[node_kind]
vals = {
'job_id': job.id,
'name': node.name,
'work_centre_id': work_centre.id if work_centre else False,
'duration_expected': node.estimated_duration or 0.0,
'sequence': seq_counter[0],
'recipe_node_id': node.id,
'kind': step_kind,
}
if node.estimated_duration:
vals['dwell_time_minutes'] = node.estimated_duration
# Pull thickness target from the coating config when
# this is a plating step (matched by node name keyword).
coating = job.coating_config_id
name_l = (node.name or '').lower()
is_plating_node = (
'plat' in name_l or 'nickel' in name_l
or 'chrome' in name_l or 'anodiz' in name_l
)
if coating and is_plating_node:
if (
'thickness_max' in coating._fields
and coating.thickness_max
):
vals['thickness_target'] = coating.thickness_max
if (
'thickness_uom' in coating._fields
and coating.thickness_uom
):
# fp.coating.config uses long-form uom names
# (mils / microns / inches); fp.job.step uses
# short codes (mil / um / inch). Map between
# them. Unknown values fall through to the
# step's default ('um').
_UOM_MAP = {
'mils': 'mil',
'mil': 'mil',
'microns': 'um',
'micron': 'um',
'um': 'um',
'inches': 'inch',
'inch': 'inch',
'in': 'inch',
}
mapped = _UOM_MAP.get(coating.thickness_uom)
if mapped:
vals['thickness_uom'] = mapped
step_vals_list.append(vals)
if instructions:
wo_steps[seq_counter[0]] = '\n'.join(instructions)
seq_counter[0] += 10
elif node.node_type in ('recipe', 'sub_process'):
for child in node.child_ids.sorted('sequence'):
walk_node(child)
# 'step' nodes at top level are handled by their parent operation
# Walk from recipe root
walk_node(job.recipe_id)
# Bulk create
if step_vals_list:
created = Step.create(step_vals_list)
for step in created:
instr_text = wo_steps.get(step.sequence)
if instr_text:
step.message_post(
body=Markup(
'Recipe steps:
%s' ) % instr_text, subtype_xmlid='mail.mt_note', ) job.message_post( body=('%d steps generated from recipe "%s".') % ( len(step_vals_list), job.recipe_id.name, ), ) return True # ------------------------------------------------------------------ # UI — Process Tree client action (Phase 6) # ------------------------------------------------------------------ def action_open_process_tree(self): """Open the OWL process-tree visualization for this job. Launches the fp_process_tree client action (defined in fusion_plating_shopfloor) with job_id in context. The component fetches /fp/shopfloor/process_tree and renders the recipe -> sub_process -> operation hierarchy as cards with per-step state badges. Consolidated 2026-04-24: this points at the canonical shopfloor client action; the parallel fp_job_process_tree was removed. """ self.ensure_one() return { 'type': 'ir.actions.client', 'tag': 'fp_process_tree', 'context': {'job_id': self.id}, 'name': 'Process Tree — %s' % (self.name or ''), 'target': 'current', } # ------------------------------------------------------------------ # Lifecycle hooks (Tasks 2.6, 2.7, 2.8) # # On confirm: create the portal-job mirror record and (when the # customer requires QC) a fusion.plating.quality.check. # On done: create a draft fusion.plating.delivery and best-effort # trigger fp.certificate auto-generation. # # The QC and certificate models live in modules this module does NOT # depend on by design (bridge_mrp). We runtime-detect those models so # the hooks degrade gracefully when those modules are absent. # ------------------------------------------------------------------ def action_confirm(self): result = super().action_confirm() # During migration, lifecycle side-effects are skipped — the # migration script directly rebinds existing portal/QC/inspection # records via x_fc_job_id. See scripts/migrate_to_fp_jobs.py. 
if self.env.context.get('fp_jobs_migration'): return result for job in self: # Auto-generate steps from the recipe — was previously only # called by seed scripts, which meant real-life confirmed # jobs sat with zero operations. Idempotent: the generator # short-circuits when steps already exist. if job.recipe_id and not job.step_ids: job._generate_steps_from_recipe() # Promote freshly-generated 'pending' steps to 'ready' so the # operator has a Start button when they open the job. Without # this the floor stalls — every step is parked in pending with # no UI affordance to move it forward. pending_steps = job.step_ids.filtered( lambda s: s.state == 'pending' ) if pending_steps: pending_steps.write({'state': 'ready'}) # 2026-04-28 — auto-populate facility_id + manager_id so the # job header surfaces them on the form. Page-1 audit found # both empty on confirmed jobs. job._fp_autofill_facility_and_manager() job._fp_create_portal_job() job._fp_create_qc_check_if_needed() job._fp_create_racking_inspection() job._fp_fire_notification('job_confirmed') return result def _fp_autofill_facility_and_manager(self): """Populate facility_id + manager_id on confirm if empty. Resolution order: facility_id — 1. Already set → leave alone. 2. First step with a work_centre that has a facility → use it. 3. Recipe's process_type → facility (if process_type carries one). 4. Single-facility company → use that one. manager_id — 1. Already set → leave alone. 2. Confirming user IS in the Plating Manager group → use them. 3. Sale order user_id (the salesperson who confirmed the SO). 4. The customer's account manager (partner.user_id). 5. Leave blank — no sensible default. 
""" self.ensure_one() # ---- facility_id ---- if not self.facility_id: facility = False for s in self.step_ids: if s.work_centre_id and 'facility_id' in s.work_centre_id._fields: facility = s.work_centre_id.facility_id if facility: break if not facility and self.recipe_id and 'process_type_id' in self.recipe_id._fields: pt = self.recipe_id.process_type_id if pt and 'facility_id' in pt._fields: facility = pt.facility_id if not facility: Facility = self.env.get('fusion.plating.facility') if Facility is not None: facilities = Facility.search([ ('company_id', '=', self.company_id.id), ]) if len(facilities) == 1: facility = facilities if facility: self.facility_id = facility.id self.message_post(body=_( 'Facility auto-set on confirm: %s' ) % facility.display_name) # ---- manager_id ---- if not self.manager_id: mgr = False ManagerGroup = self.env.ref( 'fusion_plating.group_fusion_plating_manager', raise_if_not_found=False, ) if ManagerGroup and self.env.user in ManagerGroup.user_ids: mgr = self.env.user elif self.sale_order_id and self.sale_order_id.user_id: mgr = self.sale_order_id.user_id elif self.partner_id and self.partner_id.user_id: mgr = self.partner_id.user_id if mgr: self.manager_id = mgr.id self.message_post(body=_( 'Plating Manager auto-set on confirm: %s' ) % mgr.name) def _fp_create_racking_inspection(self): """Auto-create a draft racking inspection on job confirm. Phase 9 — production_id is now optional on fp.racking.inspection, so we always create one bound by `x_fc_job_id`. When the job is also linked to an MO (legacy bridge_mrp coexistence), populate production_id too so legacy reports keep working. Idempotent — if an inspection already exists for this job, skip. """ self.ensure_one() if 'fp.racking.inspection' not in self.env: return Inspection = self.env['fp.racking.inspection'].sudo() if 'x_fc_job_id' not in Inspection._fields: # Schema not yet upgraded — skip. 
return existing = Inspection.search([ ('x_fc_job_id', '=', self.id), ], limit=1) if existing: return # Phase 6 (Sub 11) — production_id retired; bind by x_fc_job_id only. vals = {'x_fc_job_id': self.id} try: Inspection.create(vals) except Exception as e: _logger.warning( "Job %s: failed to auto-create racking inspection: %s", self.name, e, ) def _fp_create_portal_job(self): """Create the fusion.plating.portal.job mirror record.""" self.ensure_one() if self.portal_job_id: return # already exists — idempotent Portal = self.env['fusion.plating.portal.job'].sudo() portal = Portal.create({ 'name': self.name, 'partner_id': self.partner_id.id, 'state': 'in_progress', 'x_fc_job_id': self.id, }) self.portal_job_id = portal.id def _fp_create_qc_check_if_needed(self): """If customer has x_fc_requires_qc=True, spawn a QC check via the canonical fp.quality.check.create_for_job() entry point. Sub 11 — model relocated from bridge_mrp to fusion_plating_quality. create_for_job resolves the template (customer-specific or default), clones every template line, returns an existing record if one is already open, and posts a chatter trail. """ self.ensure_one() partner = self.partner_id wants_qc = ( 'x_fc_requires_qc' in partner._fields and partner.x_fc_requires_qc ) if not wants_qc: return if 'fusion.plating.quality.check' not in self.env: return QC = self.env['fusion.plating.quality.check'] try: QC.create_for_job(self) except Exception as e: _logger.warning( "Job %s: create_for_job failed: %s", self.name, e, ) # ------------------------------------------------------------------ # button_mark_done — Task 2.8 # ------------------------------------------------------------------ def button_mark_done(self): """Transition the job to 'done' and trigger downstream side effects. - Blocks if any step is not done/skipped (manager bypass via context key `fp_skip_step_gate=True`). Compliance: AS9100 / Nadcap require evidence that every recipe step ran. 
Without this guard an operator could close a job with zero work. - Blocks if customer requires QC and the QC check isn't passed (manager bypass via context key `fp_skip_qc_gate=True`) - Sets state='done', date_finished=now - Auto-creates a draft fusion.plating.delivery - Triggers certificate auto-generation (best-effort) """ # During migration, side-effects are skipped — see action_confirm. skip_side_effects = self.env.context.get('fp_jobs_migration') skip_qc_gate = self.env.context.get('fp_skip_qc_gate') skip_step_gate = self.env.context.get('fp_skip_step_gate') QC = self.env['fusion.plating.quality.check'] \ if 'fusion.plating.quality.check' in self.env else None for job in self: if job.state == 'done': continue if job.state == 'cancelled': raise UserError( "Job %s is cancelled — cannot mark done." % job.name ) # Step-completion gate: every step must be done (or explicitly # skipped, once button_skip is implemented). Without this # guard operators can close a recipe-driven job with zero # actual work logged. Manager bypass via context. if not skip_step_gate and job.step_ids: # `skipped` and `cancelled` count as terminal — operator # explicitly opted those out (skipped) or killed them # (cancelled). Only steps still in pending/ready/in_progress/ # paused block job close. undone = job.step_ids.filtered( lambda s: s.state not in ('done', 'skipped', 'cancelled') ) if undone: raise UserError(_( "Job %s cannot be marked Done — %d/%d step(s) " "are not finished:\n %s\n\nWalk each step on " "the tablet (or skip / cancel opt-in steps)." ) % ( job.name, len(undone), len(job.step_ids), '\n '.join( f'#{s.sequence} {s.name} ({s.state})' for s in undone[:5] ), )) # Bake-window gate (compliance — AS9100 / Nadcap): if any # auto-spawned bake.window is still awaiting_bake OR # bake_in_progress, the bake hasn't been documented and # parts cannot ship. 
Without this guard a careless # operator closes the job, parts ship, three weeks later # a field failure surfaces and the auditor asks for the # bake record that doesn't exist. Manager bypass via # fp_skip_bake_gate=True for documented customer deviation. skip_bake_gate = self.env.context.get('fp_skip_bake_gate') BW = (self.env['fusion.plating.bake.window'] if 'fusion.plating.bake.window' in self.env else None) if not skip_bake_gate and BW is not None: pending_bw = BW.sudo().search([ ('part_ref', '=', job.name), ('state', 'in', ('awaiting_bake', 'bake_in_progress')), ]) if pending_bw: raise UserError(_( "Job %s cannot be marked Done — bake window " "still pending:\n %s\n\nBake hydrogen " "embrittlement relief on the parts (start + " "end the bake on the bake.window record), then " "close the job. Manager override available for " "documented customer deviation." ) % ( job.name, '\n '.join( f'{bw.name} (state={bw.state}, ' f'required_by={bw.bake_required_by})' for bw in pending_bw[:5] ), )) # Qty reconciliation gate: qty_done + qty_scrapped must # equal qty when the job closes. Without this an operator # can ship "5 of 5" while only 4 are actually plated + # 1 contaminated, with no record of the missing piece. # Manager bypass via fp_skip_qty_reconcile=True (e.g. when # qty tracking truly doesn't apply). skip_qty_gate = self.env.context.get('fp_skip_qty_reconcile') if not skip_qty_gate and job.qty: accounted = (job.qty_done or 0) + (job.qty_scrapped or 0) if abs(accounted - job.qty) > 0.0001: raise UserError(_( "Job %s qty mismatch — ordered %g, but qty_done " "(%g) + qty_scrapped (%g) = %g. Update Quantity " "Completed and Quantity Scrapped on the job " "header so they sum to %g before closing." ) % ( job.name, job.qty, job.qty_done or 0, job.qty_scrapped or 0, accounted, job.qty, )) # QC gate: customers flagged x_fc_requires_qc must have a # passed QC before the job closes. AS9100 / Nadcap compliance. 
if QC and not skip_qc_gate \ and 'x_fc_requires_qc' in job.partner_id._fields \ and job.partner_id.x_fc_requires_qc: blocking_qc = QC.search([ ('job_id', '=', job.id), ('state', 'not in', ('passed',)), ], order='create_date desc', limit=1) if blocking_qc: raise UserError(_( "Job %s cannot be marked Done — QC check %s is in " "state '%s'. Pass the QC checklist first, or have " "a manager override via the bypass button." ) % (job.name, blocking_qc.name, blocking_qc.state)) # No QC at all? Spawn one now (idempotent) and require # the operator to walk it before retrying. no_qc = not QC.search_count([('job_id', '=', job.id)]) if no_qc: QC.create_for_job(job) raise UserError(_( "Job %s requires QC. A new check has been created — " "complete it before marking the job Done." ) % job.name) job.state = 'done' job.date_finished = fields.Datetime.now() if not skip_side_effects: job._fp_create_delivery() job._fp_create_certificates() job._fp_fire_notification('job_complete') return True # ------------------------------------------------------------------ # Notifications dispatch (Phase 4) # # Fires fp.notification.template records whose trigger_event matches # the given event name. Best-effort: silently skips if the # fusion_plating_notifications module is not installed (model not # registered) and logs (without raising) on any send failure so the # job lifecycle is never blocked by an email problem. # ------------------------------------------------------------------ def _fp_fire_notification(self, event): """Best-effort notification dispatch for fp.job lifecycle events. Looks up fp.notification.template records with the matching trigger_event and dispatches via the central _dispatch helper provided by fusion_plating_notifications. Silently no-ops when that module isn't installed. 
""" self.ensure_one() if 'fp.notification.template' not in self.env: return Template = self.env['fp.notification.template'].sudo() try: # The notifications module exposes a model-level _dispatch # helper that handles template lookup, recipient resolution # (Sub 6 contact routing), attachment rendering, and audit # logging in one go. Pass partner explicitly since fp.job's # partner_id is the customer. Template._dispatch(event, self, partner=self.partner_id) except Exception as e: _logger.warning( "Job %s: notification %s dispatch failed: %s", self.name, event, e, ) def _fp_create_delivery(self): """Create a draft fusion.plating.delivery linked to this job. Sets BOTH x_fc_job_id (Many2one — strong link) AND job_ref (Char — soft reference). Downstream code is split: smart-button navigation reads x_fc_job_id, but the box-parity check, RMA refund auto-link, and the legacy notification dispatch all look up by job_ref. Setting both ends keeps every consumer happy. """ self.ensure_one() if self.delivery_id: return Delivery = self.env['fusion.plating.delivery'].sudo() vals = {'partner_id': self.partner_id.id} if 'x_fc_job_id' in Delivery._fields: vals['x_fc_job_id'] = self.id if 'job_ref' in Delivery._fields: vals['job_ref'] = self.name if 'x_fc_job_id' not in Delivery._fields \ and 'job_ref' not in Delivery._fields: _logger.warning( "Job %s: fusion.plating.delivery has no job link field; " "delivery created without job back-reference.", self.name, ) try: delivery = Delivery.create(vals) self.delivery_id = delivery.id except Exception as e: _logger.warning( "Job %s: failed to auto-create delivery: %s", self.name, e, ) def _fp_create_certificates(self): """Trigger cert auto-create on job done. 
Pre-populates ALL the fields a CoC issuer needs so Tom can hit Issue without filling 6 fields first: - partner_id from job - spec_reference from coating (required by action_issue) - part_number from part_catalog - quantity_shipped from job qty (minus scrap) - po_number from sale_order - sale_order_id link - x_fc_job_id link if the field exists Idempotent — if a cert already exists for this job, skip (prevents dupes when button_mark_done is re-run after a manager bypass). """ self.ensure_one() if 'fp.certificate' not in self.env: return Cert = self.env['fp.certificate'].sudo() # Idempotency: don't double-create on retry. existing_dom = [] if 'x_fc_job_id' in Cert._fields: existing_dom.append(('x_fc_job_id', '=', self.id)) elif self.sale_order_id and 'sale_order_id' in Cert._fields: existing_dom.append(('sale_order_id', '=', self.sale_order_id.id)) if existing_dom: existing = Cert.search(existing_dom, limit=1) if existing: _logger.info( 'Job %s: cert %s already exists, skipping auto-create', self.name, existing.name, ) return try: vals = {'partner_id': self.partner_id.id} if 'certificate_type' in Cert._fields: vals['certificate_type'] = 'coc' if 'state' in Cert._fields: vals['state'] = 'draft' # Job + SO links. if 'x_fc_job_id' in Cert._fields: vals['x_fc_job_id'] = self.id elif 'job_id' in Cert._fields: vals['job_id'] = self.id if 'sale_order_id' in Cert._fields and self.sale_order_id: vals['sale_order_id'] = self.sale_order_id.id # Pre-fill from coating: the spec_reference is what action_issue # blocks on — without this every cert needs a manual edit. coating = self.coating_config_id if coating and 'spec_reference' in Cert._fields \ and getattr(coating, 'spec_reference', False): vals['spec_reference'] = coating.spec_reference # Pre-fill part_number from the part catalog if we have one. if 'part_number' in Cert._fields and self.part_catalog_id: vals['part_number'] = self.part_catalog_id.part_number or '' # Quantity shipped = job qty minus scrap. 
AS9100 wants the # actual count that left the shop, not the order count. if 'quantity_shipped' in Cert._fields: vals['quantity_shipped'] = int( (self.qty_done or self.qty or 0) - (self.qty_scrapped or 0) ) # PO number from the source SO. if 'po_number' in Cert._fields and self.sale_order_id \ and 'x_fc_po_number' in self.sale_order_id._fields: vals['po_number'] = self.sale_order_id.x_fc_po_number or '' # Customer job# → cert label (helps customer search). if 'customer_job_no' in Cert._fields and self.sale_order_id \ and 'x_fc_customer_job_number' in self.sale_order_id._fields: vals['customer_job_no'] = ( self.sale_order_id.x_fc_customer_job_number or '' ) # Process description from coating name. if 'process_description' in Cert._fields and coating: vals['process_description'] = coating.name or '' # Job # for shop-side reference. if 'entech_wo_number' in Cert._fields: vals['entech_wo_number'] = self.name or '' cert = Cert.create(vals) self.message_post(body=Markup(_( 'CoC %s auto-created (draft). Issuer should hit ' 'the Issue button on the certificate when ready to ship.' )) % cert.name) except Exception as e: _logger.warning( "Job %s: failed to auto-create cert: %s", self.name, e, ) class FpJobStep(models.Model): """Phase 7 — adds the migration idempotency key on fp.job.step. Populated by scripts/migrate_to_fp_jobs.py to mark a step as the mirror of a specific mrp.workorder. Used to skip already-migrated WOs on subsequent runs. """ _inherit = 'fp.job.step' legacy_mrp_workorder_id = fields.Integer( string='Legacy MRP Work Order ID', index=True, help='Database id of the source mrp.workorder this step was ' 'migrated from. Used by the migration script for ' 'idempotency. Cleared post-cutover.', )