Files
Odoo-Modules/fusion_plating/fusion_plating_jobs/models/fp_job.py
gsinghpal 3fdbeed813 feat(fusion_plating_jobs): Open Workspace smart button on fp.job form
Plan task P1.16. Header button on the fp.job form that opens the
JobWorkspace OWL client action focused on the current WO. Primary
entry point for techs before the Landing kanban (Phase 3) ships;
remains as a back-office shortcut after.

Hidden when state == 'draft' (no steps to work yet).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 21:53:19 -04:00

2109 lines
93 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# fp.job extension — cross-module fields that couldn't live in core
# because their target models are in dependent modules. Per spec §5.1
# this module is the umbrella that re-bundles the cross-module
# extensions for the native job flow.
#
# qc_check_id is deferred to Task 2.7 (the underlying QC model still
# lives in fusion_plating_bridge_mrp; we'll address its sourcing then).
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpJob(models.Model):
_inherit = 'fp.job'
# ---- Tier 3 mirrors from sale.order -----------------------------
# Related (not stored) — pure display mirrors. Values may change on
# the SO after job confirm (e.g. customer changes carrier preference)
# and the WO should reflect the latest; related auto-follows.
x_fc_delivery_method = fields.Selection(
    related='sale_order_id.x_fc_delivery_method',
    string='Delivery Method',
    readonly=True,
)
x_fc_ship_via = fields.Char(
    related='sale_order_id.x_fc_ship_via',
    string='Ship Via',
    readonly=True,
)
x_fc_invoice_strategy = fields.Selection(
    related='sale_order_id.x_fc_invoice_strategy',
    string='Invoice Strategy',
    readonly=True,
)
# ---- Links to records owned by dependent modules ------------------
# The part link uses ondelete='restrict'; the spec / portal job /
# delivery links use 'set null', so removing the target only clears
# the field on the job.
part_catalog_id = fields.Many2one(
    'fp.part.catalog',
    string='Part',
    ondelete='restrict',
)
customer_spec_id = fields.Many2one(
    'fusion.plating.customer.spec',
    string='Specification',
    ondelete='set null',
    help='Customer / industry spec the job ships under. Auto-filled '
    'from the SO line at job creation.',
)
portal_job_id = fields.Many2one(
    'fusion.plating.portal.job',
    string='Portal Job',
    ondelete='set null',
)
delivery_id = fields.Many2one(
    'fusion.plating.delivery',
    string='Delivery',
    ondelete='set null',
)
# Per-job include/exclude decisions for recipe nodes (inverse: job_id).
override_ids = fields.One2many(
    'fp.job.node.override',
    'job_id',
    string='Recipe Overrides',
)
# Sub 13 — sequential enforcement. Mirrored from the recipe root so
# button_start on each step can read the policy without walking the
# node tree. Stored so a recipe author flipping the recipe's flag
# AFTER job generation does NOT change behaviour mid-run (jobs
# snapshot the policy at creation, not on the fly).
# NOTE(review): this is declared as related=... with store=True. A
# stored related field is normally recomputed whenever the source
# field (recipe_id.enforce_sequential) changes, which would contradict
# the "snapshot" intent described above and in the help text — confirm
# the actual behaviour, and consider a plain Boolean copied at job
# creation if a true snapshot is required.
enforce_sequential = fields.Boolean(
    related='recipe_id.enforce_sequential',
    string='Enforce Sequential Order',
    store=True,
    readonly=True,
    help='Snapshotted from the recipe at job creation. When True, '
    'every step waits for its predecessors before it can start '
    '(unless the step itself is flagged Parallel Start, or a '
    'manager bypasses via context).',
)
# Phase 7 — migration idempotency key. Populated by
# scripts/migrate_to_fp_jobs.py to mark a fp.job as the mirror of a
# specific mrp.production. Used to skip already-migrated MOs on
# subsequent runs. Cleared after the 2-week shadow period.
# Plain Integer (not a Many2one), so there is no ORM-level link to
# mrp.production; indexed for the lookup.
legacy_mrp_production_id = fields.Integer(
    string='Legacy MRP Production ID',
    index=True,
    help='Database id of the source mrp.production record this job '
    'was migrated from. Used by the migration script for '
    'idempotency. Cleared post-cutover.',
)
# Display formatter — "WO # 00001" used everywhere on tablet/dashboard.
# The underlying `name` field stays untouched (WH/JOB/00001) so reports,
# emails, and back-office forms continue using their canonical name.
# System-wide sequence rename is a separate decision (see spec
# 2026-05-22-shopfloor-tablet-redesign-design §6.5).
# No store=True here: the value is derived on read by
# _compute_display_wo_name.
display_wo_name = fields.Char(
    compute='_compute_display_wo_name',
    string='WO #',
    help='Tablet/dashboard formatter — "WO # 00001" derived from name. '
    'Underlying name field is unchanged.',
)
@api.depends('name')
def _compute_display_wo_name(self):
    """Build the "WO # <number>" label from each job's ``name``.

    The number is the text after the last "/" in the stripped name:
    WH/JOB/00001 → 00001 ; WH/JOB/2026/00042 → 00042 ; 00123 → 00123.
    A missing or blank name produces an empty label.
    """
    for record in self:
        cleaned = (record.name or '').strip()
        if cleaned:
            # rpartition puts everything after the final "/" in slot 2;
            # without any "/" the whole string ends up there.
            number = cleaned.rpartition('/')[2]
            record.display_wo_name = f'WO # {number}'
        else:
            record.display_wo_name = ''
# ------------------------------------------------------------------
# Sub 14 — Configurable workflow state (status bar milestone)
# ------------------------------------------------------------------
# workflow_state_id auto-advances along the highest passed milestone
# in fp.job.workflow.state's sequence order. Replaces the hardcoded
# state Selection on the form's statusbar.
# Stored + readonly: the value is only ever written by
# _compute_workflow_state_id.
workflow_state_id = fields.Many2one(
    'fp.job.workflow.state',
    string='Workflow Stage',
    compute='_compute_workflow_state_id',
    store=True,
    readonly=True,
    help='Highest workflow milestone this job has passed, computed '
    'from step states + per-state trigger conditions. Updates '
    'automatically — the operator never sets it.',
)
@api.depends(
    'state',
    'step_ids',
    'step_ids.state',
    'step_ids.kind',
    'step_ids.recipe_node_id',
    'step_ids.recipe_node_id.default_kind',
    'step_ids.recipe_node_id.triggers_workflow_state_id',
    'quality_hold_count',
    'delivery_id',
    'delivery_id.state',
    'sale_order_id',
    'sale_order_id.x_fc_receiving_status',
)
def _compute_workflow_state_id(self):
    """Assign each job the last workflow state it has passed.

    All ``fp.job.workflow.state`` records are loaded once, ordered by
    ``sequence, id``. For every job each state is asked
    ``_fp_is_passed_for_job(job)``; the last one answering truthy wins.
    A state that is not passed does not stop the scan, so later states
    can still be selected. If none pass, the field is left empty.
    """
    StateModel = self.env['fp.job.workflow.state']
    ordered_states = StateModel.search([], order='sequence, id')
    for job in self:
        furthest = StateModel.browse()
        for candidate in ordered_states:
            # Keep scanning after a miss: the bar shows the furthest
            # milestone reached, not the first gap.
            if candidate._fp_is_passed_for_job(job):
                furthest = candidate
        job.workflow_state_id = furthest
# ------------------------------------------------------------------
# Smart-button counts (Feature A — operator workflow)
#
# Compute counts for each downstream model so the form view can
# render an oe_stat_button row similar to sale.order. Cross-module
# models are runtime-detected so this still works when one of the
# bridge modules is uninstalled.
# ------------------------------------------------------------------
# All eight counters are filled by the same method,
# _compute_smart_counts; none of them is stored.
sale_order_count = fields.Integer(compute='_compute_smart_counts')
delivery_count = fields.Integer(compute='_compute_smart_counts')
invoice_count = fields.Integer(compute='_compute_smart_counts')
payment_count = fields.Integer(compute='_compute_smart_counts')
quality_hold_count = fields.Integer(compute='_compute_smart_counts')
certificate_count = fields.Integer(compute='_compute_smart_counts')
timelog_count = fields.Integer(compute='_compute_smart_counts')
portal_job_count = fields.Integer(compute='_compute_smart_counts')
# ------------------------------------------------------------------
# Milestone cascade (Phase 1) — drives the header-button replacement
# that fires when every recipe step reaches a terminal state. See
# docs/superpowers/specs/2026-05-12-job-milestone-cascade-design.md.
# ------------------------------------------------------------------
# Stored flag maintained by _compute_all_steps_terminal; it is the
# first thing _compute_next_milestone_action checks.
all_steps_terminal = fields.Boolean(
    compute='_compute_all_steps_terminal',
    store=True,
    help='True ⇔ at least one step exists AND every step is in '
    'done/skipped/cancelled. Used to swap the per-step '
    'Finish & Next button for a milestone-advance button.',
)
@api.depends('step_ids', 'step_ids.state')
def _compute_all_steps_terminal(self):
    """Flag jobs whose steps have all reached a terminal state.

    The flag is True only when the job has at least one step and every
    step's state is one of done / skipped / cancelled. A job without
    steps is always False.
    """
    terminal_states = ('done', 'skipped', 'cancelled')
    for job in self:
        steps = job.step_ids
        if steps:
            job.all_steps_terminal = all(
                step.state in terminal_states for step in steps
            )
        else:
            job.all_steps_terminal = False
def _resolve_required_cert_types(self):
    """Return the set of certificate types this job must produce.

    The part's ``certificate_requirement`` takes priority; a missing
    part or empty requirement is treated as ``'inherit'``.

    * ``'inherit'`` — fall back to the partner flags: ``x_fc_send_coc``
      gives ``{'coc'}``; otherwise ``x_fc_send_thickness_report`` gives
      ``{'thickness_report'}``; otherwise an empty set.
    * ``'none'`` — empty set (no paperwork).
    * ``'coc'`` / ``'coc_thickness'`` — ``{'coc'}``.
    * any other code — ``{'coc'}`` as a safety net.

    Bundling rule (2026-05-18 — Entech workflow): when both a CoC and
    thickness data are wanted, thickness is delivered as page 2 of the
    CoC PDF (see _fp_merge_thickness_into_pdf), so a single ``'coc'``
    cert is returned. A standalone ``'thickness_report'`` only appears
    when thickness is wanted without a CoC.
    """
    self.ensure_one()
    part = self.part_catalog_id
    requirement = (part and part.certificate_requirement) or 'inherit'

    if requirement != 'inherit':
        if requirement == 'none':
            return set()
        # 'coc', the bundled 'coc_thickness', and unknown codes all
        # resolve to a single CoC.
        return {'coc'}

    partner = self.partner_id
    wants_coc = bool(partner.x_fc_send_coc)
    wants_thickness = bool(partner.x_fc_send_thickness_report)
    if wants_coc:
        return {'coc'}  # thickness gets merged in
    return {'thickness_report'} if wants_thickness else set()
# Key of the next milestone button to offer; left False by
# _compute_next_milestone_action while steps are still running.
# Not stored.
next_milestone_action = fields.Selection(
    [
        ('mark_done', 'Mark Job Done'),
        ('issue_certs', 'Issue Certs'),
        ('schedule_delivery', 'Schedule Delivery'),
        ('mark_shipped', 'Mark Shipped'),
        ('closed', 'Closed'),
    ],
    compute='_compute_next_milestone_action',
    help='What the manager should click next once steps complete. '
    'Drives the milestone-advance buttons on the form header. '
    'False/empty while steps are still running.',
)
# Label of the selected key above, taken from the Selection list by
# the same compute method.
next_milestone_label = fields.Char(
    compute='_compute_next_milestone_action',
    help='Human label for the next-action button.',
)
@api.depends(
    'all_steps_terminal',
    'state',
    'delivery_id',
    'delivery_id.state',
)
def _compute_next_milestone_action(self):
    """Pick the next milestone action and its label for each job.

    Rules, first match wins:
      1. steps not all terminal            → False, empty label
      2. job state is not 'done'           → mark_done
      3. a required cert is still draft    → issue_certs
      4. no delivery, or delivery is draft → schedule_delivery
      5. delivery scheduled / in_transit   → mark_shipped
      6. anything else                     → closed

    The label is looked up from the Selection definition of
    ``next_milestone_action``.
    """
    label_by_key = dict(self._fields['next_milestone_action'].selection)
    for job in self:
        if not job.all_steps_terminal:
            # Steps still running: no milestone button yet.
            job.next_milestone_action = False
            job.next_milestone_label = ''
            continue

        if job.state != 'done':
            chosen = 'mark_done'
        elif job._fp_has_draft_required_certs():
            chosen = 'issue_certs'
        elif not job.delivery_id or job.delivery_id.state == 'draft':
            chosen = 'schedule_delivery'
        elif job.delivery_id.state in ('scheduled', 'in_transit'):
            chosen = 'mark_shipped'
        else:
            chosen = 'closed'

        job.next_milestone_action = chosen
        job.next_milestone_label = label_by_key.get(
            job.next_milestone_action, ''
        )
def _fp_has_draft_required_certs(self):
    """Tell whether a required certificate is still in 'draft'.

    Returns False when the ``fp.certificate`` model is not available,
    when no certificate type is required, or when the certificate
    model offers neither an ``x_fc_job_id`` field nor (with a sale
    order on the job) a ``sale_order_id`` field to link on.
    Otherwise returns True if at least one matching draft cert exists.
    """
    self.ensure_one()
    if 'fp.certificate' not in self.env:
        return False

    wanted_types = self._resolve_required_cert_types()
    if not wanted_types:
        return False

    Certificate = self.env['fp.certificate']
    # Prefer the direct job link; fall back to the sale order link.
    if 'x_fc_job_id' in Certificate._fields:
        link_leaf = ('x_fc_job_id', '=', self.id)
    elif self.sale_order_id and 'sale_order_id' in Certificate._fields:
        link_leaf = ('sale_order_id', '=', self.sale_order_id.id)
    else:
        return False  # can't link safely → don't block the cascade

    domain = [
        ('certificate_type', 'in', list(wanted_types)),
        ('state', '=', 'draft'),
        link_leaf,
    ]
    return bool(Certificate.search_count(domain))
def action_advance_next_milestone(self):
    """Run the handler matching ``next_milestone_action``.

    Single entry point for the milestone header buttons. It only
    routes to an existing method and returns whatever that returns.

    Raises:
        UserError: when ``next_milestone_action`` has no handler
            (empty, or 'closed').
    """
    self.ensure_one()
    handlers = {
        'mark_done': self.button_mark_done,
        'issue_certs': self._action_open_draft_certs,
        'schedule_delivery': self._action_open_draft_delivery,
        'mark_shipped': self._action_mark_active_delivery_delivered,
    }
    handler = handlers.get(self.next_milestone_action)
    if handler:
        return handler()
    raise UserError(_(
        'No milestone action available for job %(j)s '
        '(next=%(a)s).'
    ) % {
        'j': self.name,
        'a': self.next_milestone_action or 'none',
    })
def _action_open_draft_certs(self):
    """Open the Issue Certs wizard, or the draft-cert list as fallback.

    When the ``fp.cert.issue.wizard`` model is available, its
    ``open_for_job(self)`` result is returned. A ``UserError`` from the
    wizard propagates unchanged; any other exception is logged as a
    warning and the method falls through to the fallback.

    The fallback (also used when the wizard model is missing) is a
    list/form window on ``fp.certificate`` limited to this job's draft
    certificates.
    """
    self.ensure_one()
    wizard_model = self.env.get('fp.cert.issue.wizard')
    if wizard_model is not None:
        try:
            return wizard_model.open_for_job(self)
        except UserError:
            # Business errors are meant for the user — don't mask them.
            raise
        except Exception as exc:
            _logger.warning(
                "Job %s: cert issue wizard failed (%s) — "
                "falling back to cert list.", self.name, exc,
            )

    window_title = _('Draft Certificates — %s') % self.name
    draft_domain = [
        ('x_fc_job_id', '=', self.id),
        ('state', '=', 'draft'),
    ]
    return {
        'type': 'ir.actions.act_window',
        'name': window_title,
        'res_model': 'fp.certificate',
        'view_mode': 'list,form',
        'domain': draft_domain,
        'target': 'current',
    }
def _action_open_draft_delivery(self):
    """Open the linked draft delivery, else a filtered delivery list.

    If the job has a delivery in state 'draft', its form is returned.
    In every other case a list/form window on
    ``fusion.plating.delivery`` is returned, filtered on
    ``job_ref`` equal to this job's name.
    """
    self.ensure_one()
    delivery = self.delivery_id
    has_draft = bool(delivery) and delivery.state == 'draft'
    if not has_draft:
        return {
            'type': 'ir.actions.act_window',
            'name': _('Deliveries — %s') % self.name,
            'res_model': 'fusion.plating.delivery',
            'view_mode': 'list,form',
            'domain': [('job_ref', '=', self.name)],
            'target': 'current',
        }
    return {
        'type': 'ir.actions.act_window',
        'name': _('Schedule Delivery — %s') % self.name,
        'res_model': 'fusion.plating.delivery',
        'res_id': delivery.id,
        'view_mode': 'form',
        'target': 'current',
    }
def _action_mark_active_delivery_delivered(self):
    """Mark the linked delivery as delivered and log it on the job.

    Calls ``action_mark_delivered`` on the job's delivery, then posts
    a chatter message on the job. Returns True.

    Raises:
        UserError: when there is no linked delivery, or it is not in
            state 'scheduled' or 'in_transit'.
    """
    self.ensure_one()
    delivery = self.delivery_id
    shippable = bool(delivery) and delivery.state in (
        'scheduled', 'in_transit',
    )
    if not shippable:
        raise UserError(_(
            'No scheduled or in-transit delivery to mark shipped '
            'for %s.'
        ) % self.name)
    delivery.action_mark_delivered()
    note = _(
        'Delivery %s marked shipped via milestone cascade.'
    ) % self.delivery_id.name
    self.message_post(body=note)
    return True
@api.depends(
    'sale_order_id', 'delivery_id', 'portal_job_id', 'step_ids',
    'step_ids.time_log_ids', 'origin', 'partner_id',
)
def _compute_smart_counts(self):
    """Fill every smart-button counter on the job form.

    Optional models (account.move, account.payment, quality hold,
    certificate) are looked up with ``env.get`` so a missing module
    gives a count of 0 instead of an error.

    * sale order / delivery / portal job: 1 when linked, else 0.
    * invoices: customer invoices and refunds whose ``invoice_origin``
      equals the job's ``origin``.
    * payments: payments whose ``reconciled_invoice_ids`` include one
      of those invoices.
    * quality holds / certificates: records with ``x_fc_job_id`` equal
      to the job.
    * time logs: total time-log lines across the job's steps.

    The invoice set is searched once per job and reused for both the
    invoice count and the payment lookup.
    """
    AccountMove = self.env.get('account.move')
    AccountPayment = self.env.get('account.payment')
    QualityHold = self.env.get('fusion.plating.quality.hold')
    Certificate = self.env.get('fp.certificate')
    for job in self:
        job.sale_order_count = 1 if job.sale_order_id else 0
        job.delivery_count = 1 if job.delivery_id else 0
        job.portal_job_count = 1 if job.portal_job_id else 0

        # Invoices via origin (the SO name). One search feeds both the
        # invoice counter and the payment domain below.
        invoice_ids = []
        if AccountMove is not None and job.origin:
            invoice_ids = AccountMove.search([
                ('invoice_origin', '=', job.origin),
                ('move_type', 'in', ('out_invoice', 'out_refund')),
            ]).ids
        job.invoice_count = len(invoice_ids)

        # Payments reconciled against those invoices.
        if invoice_ids and AccountPayment is not None:
            job.payment_count = AccountPayment.search_count([
                ('reconciled_invoice_ids', 'in', invoice_ids),
            ])
        else:
            job.payment_count = 0

        if QualityHold is not None:
            job.quality_hold_count = QualityHold.search_count([
                ('x_fc_job_id', '=', job.id),
            ])
        else:
            job.quality_hold_count = 0

        if Certificate is not None:
            job.certificate_count = Certificate.search_count([
                ('x_fc_job_id', '=', job.id),
            ])
        else:
            job.certificate_count = 0

        job.timelog_count = sum(
            len(step.time_log_ids) for step in job.step_ids
        )
# ------------------------------------------------------------------
# Smart-button actions
# ------------------------------------------------------------------
def action_view_sale_order(self):
    """Open the linked sale order form; close the window if none."""
    self.ensure_one()
    order = self.sale_order_id
    if order:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'sale.order',
            'res_id': order.id,
            'view_mode': 'form',
            'name': order.name,
        }
    return {'type': 'ir.actions.act_window_close'}
# All time logs across every step on this job — backs the Time Logs
# tab on the form so the manager sees the full labour audit without
# clicking into each step.
# Read-only One2many through the time log's own job_id field.
time_log_ids = fields.One2many(
    'fp.job.step.timelog',
    'job_id',
    string='All Time Logs',
    readonly=True,
)
# 2026-04-28 — link to the auto-created Sub 8 racking inspection so
# the job form can show a smart button + the manager can route into
# the inspection without leaving the job screen.
racking_inspection_ids = fields.One2many(
    'fp.racking.inspection',
    'x_fc_job_id',
    string='Racking Inspections',
)
# Not stored: _compute_racking_inspection picks the first record of
# racking_inspection_ids.
racking_inspection_id = fields.Many2one(
    'fp.racking.inspection',
    string='Racking Inspection',
    compute='_compute_racking_inspection',
    store=False,
    help='The single racking inspection scoped to this job (Sub 8 '
    'enforces uniqueness). Smart button on the form routes here.',
)
# Computed alongside racking_inspection_id so views can render the
# state badge without needing a related-on-non-stored field (which
# the ORM rejects). Selection mirrors fp.racking.inspection.state.
# The option list is duplicated by hand here, so it must be kept in
# step with the inspection model's own states.
racking_inspection_state = fields.Selection(
    [('draft', 'Draft'),
     ('inspecting', 'Inspecting'),
     ('done', 'Done'),
     ('discrepancy_flagged', 'Discrepancy Flagged')],
    string='Racking Inspection Status',
    compute='_compute_racking_inspection',
    store=False,
)
@api.depends('racking_inspection_ids', 'racking_inspection_ids.state')
def _compute_racking_inspection(self):
    """Expose the first racking inspection and its state on the job.

    ``racking_inspection_id`` becomes the first record of
    ``racking_inspection_ids`` (possibly empty);
    ``racking_inspection_state`` mirrors its state, or False when
    there is no inspection.
    """
    for job in self:
        first = job.racking_inspection_ids[:1]
        job.racking_inspection_id = first
        if first:
            job.racking_inspection_state = first.state
        else:
            job.racking_inspection_state = False
def action_view_racking_inspection(self):
    """Open this job's racking inspection form.

    Always calls ``_fp_create_racking_inspection()`` first. Per the
    original notes, that helper is idempotent: it creates the
    inspection when missing and backfills lines on inspections created
    before line auto-seeding shipped. The One2many cache is then
    invalidated so the fresh record is visible.

    Raises:
        UserError: when the ``fp.racking.inspection`` model is not
            installed, or when no inspection exists after the helper
            ran.
    """
    self.ensure_one()
    # UserError comes from the module-level import; no local import
    # needed.
    if 'fp.racking.inspection' not in self.env:
        raise UserError(_(
            'Sub 8 racking inspection module not installed. '
            'Install fusion_plating_receiving to enable.'
        ))
    # Always call the helper — it short-circuits for already-populated
    # draft inspections and creates fresh ones when missing. This is
    # also the entry point that backfills lines on inspections that
    # pre-date the line-seeding feature.
    self._fp_create_racking_inspection()
    self.invalidate_recordset(['racking_inspection_ids'])
    # The computed Many2one may still be cached from before the helper
    # ran, hence the fallback to the freshly re-read One2many.
    ri = self.racking_inspection_id or self.racking_inspection_ids[:1]
    if not ri:
        raise UserError(_('Could not auto-create racking inspection.'))
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.racking.inspection',
        'res_id': ri.id,
        'view_mode': 'form',
        'target': 'current',
        'name': _('Racking Inspection — %s') % self.name,
    }
def action_view_steps(self):
    """Open the list/form view of this job's steps.

    The window is filtered on ``job_id`` and new steps created from it
    default to this job. The title is wrapped in ``_()`` so it is
    translatable like the other action titles in this model.
    """
    self.ensure_one()
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step',
        'view_mode': 'list,form',
        'domain': [('job_id', '=', self.id)],
        'name': _('Steps — %s') % self.name,
        'context': {'default_job_id': self.id},
    }
def action_open_workspace(self):
    """Return the client action that opens the JobWorkspace for this job.

    Spec: 2026-05-22-shopfloor-tablet-redesign — Phase 1 deliverable.
    Smart-button entry point until the Landing kanban (Phase 3) ships;
    kept afterwards as a back-office shortcut.

    The action title is ``display_wo_name`` when set, otherwise the
    job's ``name``; the job id is passed in ``params``.
    """
    self.ensure_one()
    title = self.display_wo_name or self.name
    params = {'job_id': self.id}
    return {
        'type': 'ir.actions.client',
        'tag': 'fp_job_workspace',
        'name': title,
        'params': params,
        'target': 'current',
    }
def action_finish_current_step(self):
    """Finish the running step, or start the first runnable one.

    * If a step is 'in_progress', the result of its
      ``action_finish_and_advance()`` is returned.
    * Otherwise the lowest-sequence step in 'pending', 'ready' or
      'paused' is started via ``button_start()`` with the
      ``fp_skip_predecessor_check`` context flag set, and a chatter
      note is posted.

    Sub 12e v4 — when ``button_start`` returns an action dict (e.g.
    the QA-005 redirect for contract_review steps) it is passed
    through so the operator lands on the right page in one click;
    any other return value becomes True.

    Raises:
        UserError: when no step is running and none can be started.
    """
    self.ensure_one()
    steps = self.step_ids

    active = steps.filtered(lambda st: st.state == 'in_progress')[:1]
    if active:
        return active.action_finish_and_advance()

    # Nothing running — pick the earliest step that can be started.
    startable_states = ('pending', 'ready', 'paused')
    candidate = steps.filtered(
        lambda st: st.state in startable_states
    ).sorted('sequence')[:1]
    if not candidate:
        raise UserError(_(
            'No runnable step found on this job — either every step '
            'is done or the job is still in draft.'
        ))

    outcome = candidate.with_context(
        fp_skip_predecessor_check=True,
    ).button_start()
    self.message_post(body=_(
        'Started first step "%s".'
    ) % candidate.name)
    # Only a dict is a real action; True/False collapse to True.
    return outcome if isinstance(outcome, dict) else True
def action_open_move_wizard(self):
    """Open the Move Step wizard pre-filled with a from-step.

    The original Move wizard, kept for cross-station moves and
    rework / scrap transfers; the simple "finish current → start next"
    flow is action_finish_current_step.

    The from-step is the 'in_progress' step if there is one, otherwise
    the lowest-sequence step in 'paused' or 'ready'.

    Raises:
        UserError: when no such step exists on the job.
    """
    self.ensure_one()
    from_step = self.step_ids.filtered(
        lambda st: st.state == 'in_progress'
    )[:1]
    if not from_step:
        # Fall back to the earliest paused / ready step.
        waiting = self.step_ids.filtered(
            lambda st: st.state in ('paused', 'ready')
        )
        from_step = waiting.sorted('sequence')[:1]
    if not from_step:
        raise UserError(_(
            'No in-progress, paused, or ready step found on this job. '
            'Either every step is done or the job is still in draft.'
        ))
    wizard_defaults = {
        'default_from_step_id': from_step.id,
        'default_job_id': self.id,
    }
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.move.wizard',
        'view_mode': 'form',
        'target': 'new',
        'name': _('Move Step — %s') % from_step.name,
        'context': wizard_defaults,
    }
def action_print_traveller(self):
    """Return the report action that prints this job's traveller."""
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_traveller'
    )
    return report.report_action(self)
def action_print_sticker(self):
    """Return the report action for the 6x4" job-box sticker.

    The sticker carries logo + WO# + QR + part / customer / thickness
    / notes, and is used at receiving and at every move so the box
    stays identifiable on the floor.
    """
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_sticker'
    )
    return report.report_action(self)
def action_print_wo_detail(self):
    """Return the report action for the Work Order Detail PDF.

    Steelhead-style document: chronological chain-of-custody, per-step
    inputs and a Certified By page. Intended as the AS9100/Nadcap
    shippable audit document.
    """
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_wo_detail'
    )
    return report.report_action(self)
def action_view_deliveries(self):
    """Open the linked delivery form; close the window if none."""
    self.ensure_one()
    delivery = self.delivery_id
    if delivery:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'fusion.plating.delivery',
            'res_id': delivery.id,
            'view_mode': 'form',
            'name': delivery.name,
        }
    return {'type': 'ir.actions.act_window_close'}
def action_view_invoices(self):
    """Open customer invoices and refunds raised for this job's origin.

    Closes the window when the job has no ``origin``. Uses the same
    domain as the invoice counter: ``invoice_origin`` equal to the
    job's origin, move types out_invoice / out_refund. The title is
    wrapped in ``_()`` so it is translatable.
    """
    self.ensure_one()
    if not self.origin:
        return {'type': 'ir.actions.act_window_close'}
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'account.move',
        'view_mode': 'list,form',
        'domain': [
            ('invoice_origin', '=', self.origin),
            ('move_type', 'in', ('out_invoice', 'out_refund')),
        ],
        'name': _('Invoices — %s') % self.name,
    }
def action_view_payments(self):
    """Open payments reconciled against this job's invoices.

    Closes the window when the job has no ``origin`` or the
    ``account.move`` model is unavailable. When no invoice matches,
    the window opens with a domain that matches nothing. The title is
    wrapped in ``_()`` so it is translatable.
    """
    self.ensure_one()
    if not self.origin:
        return {'type': 'ir.actions.act_window_close'}
    AccountMove = self.env.get('account.move')
    if AccountMove is None:
        return {'type': 'ir.actions.act_window_close'}
    inv_ids = AccountMove.search([
        ('invoice_origin', '=', self.origin),
        ('move_type', 'in', ('out_invoice', 'out_refund')),
    ]).ids
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'account.payment',
        'view_mode': 'list,form',
        'domain': (
            [('reconciled_invoice_ids', 'in', inv_ids)]
            # id = 0 never matches: shows an empty list, not all payments.
            if inv_ids else [('id', '=', 0)]
        ),
        'name': _('Payments — %s') % self.name,
    }
def action_view_quality_holds(self):
    """Open the quality holds linked to this job via ``x_fc_job_id``.

    New holds created from the window default to this job. The title
    is wrapped in ``_()`` so it is translatable.
    """
    self.ensure_one()
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fusion.plating.quality.hold',
        'view_mode': 'list,form',
        'domain': [('x_fc_job_id', '=', self.id)],
        'name': _('Quality Holds — %s') % self.name,
        'context': {'default_x_fc_job_id': self.id},
    }
def action_view_certificates(self):
    """Open the certificates linked to this job via ``x_fc_job_id``.

    New certificates created from the window default to this job. The
    title is wrapped in ``_()`` so it is translatable.
    """
    self.ensure_one()
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.certificate',
        'view_mode': 'list,form',
        'domain': [('x_fc_job_id', '=', self.id)],
        'name': _('Certificates — %s') % self.name,
        'context': {'default_x_fc_job_id': self.id},
    }
def action_view_timelogs(self):
    """Open the time logs recorded on any of this job's steps.

    When the job has no steps the window opens with a domain that
    matches nothing. The title is wrapped in ``_()`` so it is
    translatable.
    """
    self.ensure_one()
    step_ids = self.step_ids.ids
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.timelog',
        'view_mode': 'list,form',
        'domain': (
            [('step_id', 'in', step_ids)]
            # id = 0 never matches: shows an empty list.
            if step_ids else [('id', '=', 0)]
        ),
        'name': _('Time Logs — %s') % self.name,
    }
def action_view_portal_job(self):
    """Open the linked portal job form; close the window if none."""
    self.ensure_one()
    portal_job = self.portal_job_id
    if portal_job:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'fusion.plating.portal.job',
            'res_id': portal_job.id,
            'view_mode': 'form',
            'name': portal_job.name,
        }
    return {'type': 'ir.actions.act_window_close'}
# Sub-portal state sync — see fusion_plating_portal/.../fp_portal_job.py
# `_fp_recompute_portal_state` for the rules. The mapping table that
# used to live here was replaced by the helper so shipment / invoice
# signals can't drift away from the WO state any more.
def write(self, vals):
    """Write hook with two side effects.

    (a) When ``qty_scrapped`` INCREASES, auto-spawn a
        fusion.plating.quality.hold for the scrapped delta — AS9100 /
        Nadcap need a disposition record per scrap event. One hold per
        increase event; the operator completes hold_reason and
        description on the spawned record.
    (b) When ``state`` actually changes, ask the linked portal job to
        recompute its state so the customer-facing portal stays in
        sync with the shop floor.

    Hold creation is best-effort: a failure is logged and the write
    still succeeds. It runs inside a savepoint so that a failure rolls
    back only the partially created hold and leaves the surrounding
    transaction usable.

    Returns:
        The result of the parent ``write``.
    """
    # Jobs whose scrap count is about to go up: {job.id: (old, new)}.
    # Captured BEFORE super().write() while the old value is readable.
    scrap_deltas = {}
    if 'qty_scrapped' in vals:
        new = vals['qty_scrapped'] or 0
        for job in self:
            old = job.qty_scrapped or 0
            if new > old:
                scrap_deltas[job.id] = (old, new)

    # Capture state changes before super().write() so we know which
    # records actually transitioned (vs no-op writes).
    state_changed_ids = set()
    if 'state' in vals:
        new_state = vals['state']
        state_changed_ids = {
            job.id for job in self if job.state != new_state
        }

    result = super().write(vals)

    # Mirror state to portal_job via the central recompute helper, so
    # the portal state always derives from the WO + shipment + invoice
    # together rather than the most-recent event flag.
    if state_changed_ids:
        for job in self.filtered(lambda j: j.id in state_changed_ids):
            if job.portal_job_id:
                job.portal_job_id._fp_recompute_portal_state()

    if not scrap_deltas:
        return result
    # Same optional-model lookup style as the smart-count compute.
    Hold = self.env.get('fusion.plating.quality.hold')
    if Hold is None:
        return result

    Facility = self.env['fusion.plating.facility']
    for job in self:
        if job.id not in scrap_deltas:
            continue
        old, new = scrap_deltas[job.id]
        delta = new - old
        # Facility fallback chain: job's own → first facility of the
        # job's company → any facility.
        facility = job.facility_id or Facility.search([
            ('company_id', '=', job.company_id.id),
        ], limit=1) or Facility.search([], limit=1)
        part_ref = (
            job.part_catalog_id.part_number if job.part_catalog_id
            else job.product_id.default_code or job.name
        )
        # When the scrap was bumped from the tablet, the operator
        # was prompted for a reason and we passed it via context as
        # `fp_scrap_reason` (see /fp/shopfloor/bump_qty_scrapped).
        # Prepend that reason to the description so the audit row
        # captures what the operator actually typed instead of the
        # generic "OPERATOR: replace this text..." placeholder.
        scrap_reason = self.env.context.get('fp_scrap_reason')
        if scrap_reason:
            description = _(
                'Operator reason: %s\n\n'
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g).'
            ) % (scrap_reason, job.name, self.env.user.name, old, new, delta)
        else:
            description = _(
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g). '
                'OPERATOR: replace this text with the actual '
                'reason (drop / contamination / out-of-spec / etc).'
            ) % (job.name, self.env.user.name, old, new, delta)
        try:
            # Savepoint: if the create or the chatter post fails at the
            # database level, only this unit is rolled back and the
            # cursor stays usable for the rest of the request.
            with self.env.cr.savepoint():
                # NOTE(review): the hold is linked through 'job_id'
                # here, while the counters and smart buttons in this
                # file filter holds on 'x_fc_job_id' — confirm which
                # field the hold model actually uses.
                hold = Hold.create({
                    'job_id': job.id,
                    'part_ref': (part_ref or job.name)[:64],
                    'qty_on_hold': int(delta),
                    'qty_original': int(job.qty or 0),
                    'mark_for_scrap': True,
                    'hold_reason': 'other',
                    'description': description,
                    'facility_id': facility.id if facility else False,
                })
                job.message_post(body=Markup(_(
                    '⚠️ Scrap auto-Hold spawned: <b>%s</b> for %g part(s). '
                    'Operator must update description with the cause.'
                )) % (hold.name, delta))
        except Exception as e:
            # Deliberate best-effort: never block the write itself.
            _logger.warning(
                'Job %s: failed to auto-spawn scrap hold: %s',
                job.name, e,
            )
    return result
def action_sync_qty_from_so(self):
    """Copy the sale order quantity onto the job after an SO edit.

    For each job with a sale order, the sum of ``product_uom_qty``
    over all order lines is written to ``qty`` when it differs from
    the current value by at least 0.0001. A chatter note records who
    synced and the old → new values. Jobs without a sale order, or
    already in sync, are skipped.

    Manual action because qty changes mid-job have physical-world
    consequences (rack more parts, stop early, scrap excess) — the
    supervisor must explicitly acknowledge by clicking the button.

    Returns:
        True.
    """
    for job in self:
        if not job.sale_order_id:
            continue
        so_qty = sum(job.sale_order_id.order_line.mapped('product_uom_qty'))
        old = job.qty
        if abs(old - so_qty) < 0.0001:
            continue
        job.qty = so_qty
        # The previous format string ran the three numbers together
        # ("%g%g%+g).") and left a dangling ")". Render them as
        # "old → new (signed delta)".
        job.message_post(body=Markup(_(
            'Job qty synced from SO by <b>%s</b>: %g → %g (%+g). '
            'Operator: confirm physical scope matches.'
        )) % (self.env.user.name, old, so_qty, so_qty - old))
    return True
# ------------------------------------------------------------------
# Recipe → fp.job.step generation (Task 2.4)
#
# Native port of fusion_plating_bridge_mrp's
# _generate_workorders_from_recipe. Walks the recipe tree, creates
# one fp.job.step per 'operation' node, formats child 'step' nodes
# as step instructions on chatter, respects opt-in/out overrides
# from fp.job.node.override.
#
# Adaptations from the original:
# - Creates fp.job.step (not mrp.workorder)
# - Maps fusion.plating.work.center → fp.work.centre via code
# fallback (no forward link exists yet)
# - Uses native field names (job_id, work_centre_id, etc.)
# - Drops work_role_id (not on fp.job.step yet — Task 2.6+)
# - Drops _fp_autofill_default_equipment (not yet on step)
# ------------------------------------------------------------------
def _generate_steps_from_recipe(self):
    """Generate fp.job.step records from the assigned recipe.

    Walks the recipe tree, creates one step per 'operation' node,
    and formats child 'step' nodes as step instructions on the
    chatter. Respects opt-in/out overrides from override_ids.

    Jobs without a recipe, and jobs that already have steps, are
    skipped, so calling this twice on the same job does not duplicate
    steps. After step creation the repeat-order contract-review
    auto-complete hook is invoked for the job.

    :return: always True
    """
    Step = self.env['fp.job.step']
    Node = self.env['fusion.plating.process.node']
    for job in self:
        if not job.recipe_id:
            continue  # No recipe assigned
        if job.step_ids:
            continue  # Steps already exist — don't duplicate
        # Build lookup of overrides keyed by node ID
        override_map = {ov.node_id.id: ov.included for ov in job.override_ids}
        # Start-at-node: if set, the allowed set is the union of:
        # 1. start_node and all its descendants
        # 2. each ancestor of start_node
        # 3. at each ancestor level, any LATER-sequence sibling and
        #    all of its descendants
        start_node = job.start_at_node_id
        allowed_ids = None  # None = include everything
        if start_node:
            descendants = Node.search([('id', 'child_of', start_node.id)])
            allowed_ids = set(descendants.ids)
            cur = start_node
            # Climb towards the root, one ancestor level per iteration.
            while cur.parent_id:
                parent = cur.parent_id
                allowed_ids.add(parent.id)
                # Strictly greater sequence only: siblings sharing the
                # current node's sequence are not treated as "later".
                later_sibs = parent.child_ids.filtered(
                    lambda n: n.sequence > cur.sequence
                )
                for sib in later_sibs:
                    sib_descendants = Node.search([
                        ('id', 'child_of', sib.id),
                    ])
                    allowed_ids |= set(sib_descendants.ids)
                cur = parent
        step_vals_list = []
        wo_steps = {}  # {sequence: instruction text}
        # Single-element list so the nested walk_node closure can
        # advance the counter without a `nonlocal` declaration.
        seq_counter = [10]

        def _is_node_included(node):
            """Determine if a node should be included based on
            opt-in/out logic, per-job overrides, and start-at-node
            filter.
            """
            nid = node.id
            if allowed_ids is not None and nid not in allowed_ids:
                return False
            opt = node.opt_in_out or 'disabled'
            if opt == 'disabled':
                # Opt-in/out disabled on this node: overrides are not
                # consulted and the node is always included.
                return True
            if nid in override_map:
                return override_map[nid]
            if opt == 'opt_in':
                return False  # Default excluded
            return True  # opt_out → default included

        def _resolve_work_centre(legacy_wc):
            """Map fusion.plating.work.center → fp.work.centre.

            The legacy work-centre model does not (yet) have a forward
            link to the new fp.work.centre. Try a forward link
            (x_fc_fp_work_centre_id) if some bridge module added one;
            otherwise fall back to a code lookup. Returns an empty
            fp.work.centre recordset when nothing matches.
            """
            if not legacy_wc:
                return self.env['fp.work.centre']
            # Forward link, if any
            if (
                'x_fc_fp_work_centre_id' in legacy_wc._fields
                and legacy_wc.x_fc_fp_work_centre_id
            ):
                return legacy_wc.x_fc_fp_work_centre_id
            # Code fallback (legacy code is unique-per-facility,
            # native code is globally unique — first match wins)
            if legacy_wc.code:
                found = self.env['fp.work.centre'].search(
                    [('code', '=', legacy_wc.code)], limit=1,
                )
                if found:
                    return found
            return self.env['fp.work.centre']

        def walk_node(node):
            """Depth-first walk that queues create-vals for every
            included 'operation' node, in sequence order.
            """
            if not _is_node_included(node):
                return
            if node.node_type == 'operation':
                work_centre = _resolve_work_centre(node.work_center_id)
                if not work_centre:
                    # Not fatal: the step is still created, just
                    # without a work centre.
                    _logger.warning(
                        'Job %s: operation "%s" has no mapped fp.work.centre — '
                        'creating step without work centre.',
                        job.name, node.name,
                    )
                # Collect step instructions from child 'step' nodes
                instructions = []
                step_num = 1
                for child in node.child_ids.sorted('sequence'):
                    if child.node_type == 'step' and _is_node_included(child):
                        line = '%d. %s' % (step_num, child.name)
                        if child.estimated_duration:
                            line += ' (%.0f min)' % child.estimated_duration
                        instructions.append(line)
                        step_num += 1
                # Map recipe_node.default_kind → step.kind so the
                # downstream gates (Sub 8 racking soft-gate, Policy B
                # contract-review gate) work even when the step gets
                # renamed by the customer (e.g. "Hang on Bar" instead
                # of "Racking"). Without this, gate detection falls
                # back to fragile name matching.
                _NODE_KIND_TO_STEP_KIND = {
                    'cleaning': 'wet',
                    'etch': 'wet',
                    'rinse': 'wet',
                    'plate': 'wet',
                    'dry': 'wet',
                    'wbf_test': 'wet',
                    'bake': 'bake',
                    'mask': 'mask',
                    'demask': 'mask',
                    'racking': 'rack',
                    'derack': 'rack',
                    'inspect': 'inspect',
                    'final_inspect': 'inspect',
                    'contract_review': 'other',
                    'gating': 'other',
                    'ship': 'other',
                }
                # Unknown or missing kinds fall back to 'other'.
                step_kind = 'other'
                node_kind = (
                    node.default_kind
                    if 'default_kind' in node._fields else None
                )
                if node_kind and node_kind in _NODE_KIND_TO_STEP_KIND:
                    step_kind = _NODE_KIND_TO_STEP_KIND[node_kind]
                vals = {
                    'job_id': job.id,
                    'name': node.name,
                    'work_centre_id': work_centre.id if work_centre else False,
                    'duration_expected': node.estimated_duration or 0.0,
                    'sequence': seq_counter[0],
                    'recipe_node_id': node.id,
                    'kind': step_kind,
                }
                if node.estimated_duration:
                    vals['dwell_time_minutes'] = node.estimated_duration
                # Pull thickness target from the recipe root when this
                # is a plating step (matched by node name keyword).
                # Recipe-root carries thickness fields post-promote-spec.
                recipe_root = job.recipe_id
                name_l = (node.name or '').lower()
                is_plating_node = (
                    'plat' in name_l or 'nickel' in name_l
                    or 'chrome' in name_l or 'anodiz' in name_l
                )
                if recipe_root and is_plating_node:
                    if (
                        'thickness_max' in recipe_root._fields
                        and recipe_root.thickness_max
                    ):
                        vals['thickness_target'] = recipe_root.thickness_max
                    if (
                        'thickness_uom' in recipe_root._fields
                        and recipe_root.thickness_uom
                    ):
                        # Recipe uses long-form uom names (mils /
                        # microns / inches); fp.job.step uses short
                        # codes (mil / um / inch). Map between them.
                        _UOM_MAP = {
                            'mils': 'mil',
                            'mil': 'mil',
                            'microns': 'um',
                            'micron': 'um',
                            'um': 'um',
                            'inches': 'inch',
                            'inch': 'inch',
                            'in': 'inch',
                        }
                        # Unrecognised units are left unset rather than
                        # written through verbatim.
                        mapped = _UOM_MAP.get(recipe_root.thickness_uom)
                        if mapped:
                            vals['thickness_uom'] = mapped
                step_vals_list.append(vals)
                if instructions:
                    # Keyed by the sequence just assigned to this step so
                    # the text can be matched to the created record below.
                    wo_steps[seq_counter[0]] = '\n'.join(instructions)
                seq_counter[0] += 10
            elif node.node_type in ('recipe', 'sub_process'):
                for child in node.child_ids.sorted('sequence'):
                    walk_node(child)
            # 'step' nodes at top level are handled by their parent operation

        # Walk from recipe root
        walk_node(job.recipe_id)
        # Bulk create
        if step_vals_list:
            created = Step.create(step_vals_list)
            for step in created:
                instr_text = wo_steps.get(step.sequence)
                if instr_text:
                    # Markup % str escapes the instruction text, so node
                    # names cannot inject HTML into the chatter note.
                    step.message_post(
                        body=Markup(
                            '<b>Recipe steps:</b><br/><pre>%s</pre>'
                        ) % instr_text,
                        subtype_xmlid='mail.mt_note',
                    )
            job.message_post(
                body=('%d steps generated from recipe "%s".') % (
                    len(step_vals_list), job.recipe_id.name,
                ),
            )
        # Rule 4 — repeat-order contract-review auto-complete.
        # Runs after step creation so the contract-review step shows
        # as already done on the operator's first view of the job.
        job._fp_autocomplete_repeat_order_contract_review()
    return True
def _fp_autocomplete_repeat_order_contract_review(self):
    """Auto-complete contract-review steps on repeat orders (rule 4).

    When the job's part already carries a contract review in state
    'complete', every open contract-review step of the job is written
    to 'done' using the review's sign-off identity and timestamp
    (Section 3.0 values first, Section 2.0 as fallback), and a chatter
    note is posted on each affected step.

    A job is skipped when:
      * it has no part, or the part has no complete contract review;
      * the review carries no signer or no sign-off date;
      * it has no step that is both non-terminal and recognised by
        fp.job.step._fp_is_contract_review_step.

    :return: always True
    """
    terminal_states = ('done', 'skipped', 'cancelled')
    for job in self:
        # Both links are optional schema: guard on _fields first.
        if 'part_catalog_id' not in job._fields:
            continue
        part = job.part_catalog_id
        if not part:
            continue
        if 'x_fc_contract_review_id' not in part._fields:
            continue
        review = part.x_fc_contract_review_id
        if not review or review.state != 'complete':
            continue

        signer = review.s30_signed_by or review.s20_signed_by
        signed_at = review.s30_signed_date or review.s20_signed_date
        if not (signer and signed_at):
            continue

        open_review_steps = job.step_ids.filtered(
            lambda step: step.state not in terminal_states
            and step._fp_is_contract_review_step()
        )
        if not open_review_steps:
            continue

        # Start and finish both carry the original sign-off so the audit
        # trail shows the reviewer rather than whoever confirmed the job.
        open_review_steps.write({
            'state': 'done',
            'started_by_user_id': signer.id,
            'finished_by_user_id': signer.id,
            'date_started': signed_at,
            'date_finished': signed_at,
        })

        note_values = {
            'part': part.display_name or part.part_number or '',
            'user': signer.name,
            'date': fields.Datetime.to_string(signed_at),
        }
        for review_step in open_review_steps:
            review_step.message_post(body=_(
                'Contract Review step auto-completed from existing '
                'QA-005 for %(part)s. Reviewer: %(user)s on %(date)s.'
            ) % note_values)
    return True
# ------------------------------------------------------------------
# UI — Process Tree client action (Phase 6)
# ------------------------------------------------------------------
def action_open_process_tree(self):
    """Return the client action that opens this job's process tree.

    The action targets the ``fp_process_tree`` client tag and passes
    the job id in the context twice: as ``job_id`` (the job to render)
    and as ``back_job_id``. Must be called on a single record.

    :return: an ``ir.actions.client`` action dictionary
    """
    self.ensure_one()
    current_job_id = self.id
    window_title = 'Process Tree — %s' % (self.name or '')
    action_context = {
        'job_id': current_job_id,
        'back_job_id': current_job_id,
    }
    return {
        'type': 'ir.actions.client',
        'tag': 'fp_process_tree',
        'context': action_context,
        'name': window_title,
        'target': 'current',
    }
# ------------------------------------------------------------------
# Lifecycle hooks (Tasks 2.6, 2.7, 2.8)
#
# On confirm: create the portal-job mirror record and (when the
# customer requires QC) a fusion.plating.quality.check.
# On done: create a draft fusion.plating.delivery and best-effort
# trigger fp.certificate auto-generation.
#
# The QC and certificate models live in modules this module does NOT
# depend on by design (bridge_mrp). We runtime-detect those models so
# the hooks degrade gracefully when those modules are absent.
# ------------------------------------------------------------------
def action_confirm(self):
    """Confirm the job(s), then run the confirm-time side effects.

    After the parent confirm, each job gets (in this order): recipe
    step generation when it has a recipe and no steps, promotion of
    'pending' steps to 'ready', facility/manager auto-fill, portal-job
    mirror, QC check (when required), racking inspection, and the
    'job_confirmed' notification.

    When the ``fp_jobs_migration`` context key is set, none of the
    side effects run and only the parent result is returned.

    :return: whatever the parent ``action_confirm`` returned
    """
    result = super().action_confirm()
    # Migration rebinds existing portal/QC/inspection records itself
    # (see scripts/migrate_to_fp_jobs.py), so skip every hook below.
    migrating = self.env.context.get('fp_jobs_migration')
    if migrating:
        return result
    for job in self:
        # Real-life jobs used to sit with zero operations because only
        # the seed scripts generated steps. The generator is a no-op
        # when steps already exist.
        has_recipe_without_steps = job.recipe_id and not job.step_ids
        if has_recipe_without_steps:
            job._generate_steps_from_recipe()
        # Without promotion every step stays 'pending' and the operator
        # has no Start button to move the job forward.
        waiting_steps = job.step_ids.filtered(
            lambda step: step.state == 'pending'
        )
        if waiting_steps:
            waiting_steps.write({'state': 'ready'})
        # Surface facility + manager on the job header.
        job._fp_autofill_facility_and_manager()
        job._fp_create_portal_job()
        job._fp_create_qc_check_if_needed()
        job._fp_create_racking_inspection()
        job._fp_fire_notification('job_confirmed')
    return result
def _fp_autofill_facility_and_manager(self):
    """Fill in facility_id and manager_id when they are still empty.

    Each field is resolved independently; a value already on the job
    is never overwritten. A chatter note is posted for every value
    that gets set.

    facility_id candidates, first hit wins:
      1. the first step whose work centre carries a facility;
      2. the facility of the recipe's process type;
      3. the company's facility, when the company has exactly one.

    manager_id candidates, first hit wins:
      1. the current user, when in the Plating Manager group;
      2. the sale order's salesperson (user_id);
      3. the customer's account manager (partner.user_id).
    If none applies the field stays blank.
    """
    self.ensure_one()

    # ---- facility_id ----
    if not self.facility_id:
        chosen_facility = False
        for job_step in self.step_ids:
            work_centre = job_step.work_centre_id
            if not work_centre or 'facility_id' not in work_centre._fields:
                continue
            chosen_facility = work_centre.facility_id
            if chosen_facility:
                break

        recipe = self.recipe_id
        if (
            not chosen_facility
            and recipe
            and 'process_type_id' in recipe._fields
        ):
            process_type = recipe.process_type_id
            if process_type and 'facility_id' in process_type._fields:
                chosen_facility = process_type.facility_id

        if not chosen_facility:
            FacilityModel = self.env.get('fusion.plating.facility')
            if FacilityModel is not None:
                company_facilities = FacilityModel.search([
                    ('company_id', '=', self.company_id.id),
                ])
                # Only unambiguous when the company owns a single site.
                if len(company_facilities) == 1:
                    chosen_facility = company_facilities

        if chosen_facility:
            self.facility_id = chosen_facility.id
            self.message_post(body=_(
                'Facility auto-set on confirm: %s'
            ) % chosen_facility.display_name)

    # ---- manager_id ----
    if not self.manager_id:
        chosen_manager = False
        manager_group = self.env.ref(
            'fusion_plating.group_fusion_plating_manager',
            raise_if_not_found=False,
        )
        sale_order = self.sale_order_id
        customer = self.partner_id
        if manager_group and self.env.user in manager_group.user_ids:
            chosen_manager = self.env.user
        elif sale_order and sale_order.user_id:
            chosen_manager = sale_order.user_id
        elif customer and customer.user_id:
            chosen_manager = customer.user_id

        if chosen_manager:
            self.manager_id = chosen_manager.id
            self.message_post(body=_(
                'Plating Manager auto-set on confirm: %s'
            ) % chosen_manager.name)
def _fp_create_racking_inspection(self):
    """Auto-create a draft racking inspection on job confirm.

    The inspection is bound to the job through ``x_fc_job_id`` only
    (production_id is retired and no longer populated here). Its
    lines are seeded from the job's plating sale-order lines so the
    racker opens a pre-populated checklist instead of an empty form.

    Idempotent: when an inspection already exists for this job, no
    new one is created. An existing inspection that is still in draft
    and has no lines is topped up with seeded lines; finalised ones
    are left untouched.

    Best-effort: does nothing when the fp.racking.inspection model or
    its ``x_fc_job_id`` field is unavailable, and any failure during
    creation is logged as a warning instead of being raised.
    """
    self.ensure_one()
    if 'fp.racking.inspection' not in self.env:
        return
    Inspection = self.env['fp.racking.inspection'].sudo()
    if 'x_fc_job_id' not in Inspection._fields:
        # Schema not yet upgraded — skip.
        return
    existing = Inspection.search([
        ('x_fc_job_id', '=', self.id),
    ], limit=1)
    if existing:
        # Self-heal: pre-existing inspections from before line seeding
        # was added show up empty. Top them up now if still empty +
        # the inspection isn't already finalised (don't rewrite history).
        if not existing.line_ids and existing.state == 'draft':
            self._fp_seed_racking_lines(existing)
        return
    # Phase 6 (Sub 11) — production_id retired; bind by x_fc_job_id only.
    vals = {'x_fc_job_id': self.id}
    try:
        # The savepoint keeps a database-level failure (constraint,
        # missing column, ...) from poisoning the surrounding
        # transaction: without it the error is swallowed here but every
        # later query of the confirm flow fails on the aborted cursor.
        with self.env.cr.savepoint():
            insp = Inspection.create(vals)
            self._fp_seed_racking_lines(insp)
    except Exception as e:
        _logger.warning(
            "Job %s: failed to auto-create racking inspection: %s",
            self.name, e,
        )
def _fp_seed_racking_lines(self, inspection):
    """Populate the inspection with one line per SO plating order line.

    Source lines come from ``sale_order_line_ids`` (the SO lines tied
    to this job), falling back to the linked sale order's
    ``order_line``. Only lines that carry a part catalog entry and are
    not display rows (sections / notes) are used. Each created line
    gets the part, the ordered quantity truncated to an integer as the
    expected count, and condition 'ok' — the racker confirms or
    amends on the floor.

    Does nothing when ``inspection`` is empty or already has lines.
    A line that fails to create is logged and skipped; the remaining
    lines are still attempted.

    :param inspection: fp.racking.inspection record to seed
    """
    self.ensure_one()
    if not inspection or inspection.line_ids:
        return
    Line = self.env['fp.racking.inspection.line'].sudo()
    # Source preference: explicit M2M of plating lines bound to this
    # job (fast-order multi-part jobs), falling back to the SO header.
    so_lines = self.sale_order_line_ids
    if not so_lines and self.sale_order_id:
        so_lines = self.sale_order_id.order_line
    plating_lines = so_lines.filtered(
        lambda l: l.x_fc_part_catalog_id and not l.display_type
    )
    if not plating_lines:
        return
    seq = 10
    for sol in plating_lines:
        try:
            # One savepoint per line: a database error on this line is
            # rolled back on its own, so the transaction stays usable
            # for the following lines and for the caller.
            with self.env.cr.savepoint():
                Line.create({
                    'inspection_id': inspection.id,
                    'sequence': seq,
                    'part_catalog_id': sol.x_fc_part_catalog_id.id,
                    'qty_expected': int(sol.product_uom_qty or 0),
                    'condition': 'ok',
                })
        except Exception as e:
            _logger.warning(
                "Job %s: failed to seed racking line for SO line %s: %s",
                self.name, sol.id, e,
            )
        # Advance even after a failure so sequences stay aligned with
        # the position of the source SO line.
        seq += 10
def _fp_create_portal_job(self):
"""Create the fusion.plating.portal.job mirror record.
Initial state derived from the fp.job state via the same map
used by write() — so a job that's already 'in_progress' when
the portal mirror is created (e.g. a manual catch-up create)
doesn't reset to 'received'.
"""
self.ensure_one()
if self.portal_job_id:
return # already exists — idempotent
Portal = self.env['fusion.plating.portal.job'].sudo()
initial_state = self._FP_JOB_STATE_TO_PORTAL_STATE.get(
self.state, 'received',
)
portal = Portal.create({
'name': self.name,
'partner_id': self.partner_id.id,
'state': initial_state,
'x_fc_job_id': self.id,
})
self.portal_job_id = portal.id
def _fp_create_qc_check_if_needed(self):
    """Spawn a QC check when the customer requires one.

    Applies only when the partner has the ``x_fc_requires_qc`` field
    and it is set, and the ``fusion.plating.quality.check`` model is
    installed. Creation is delegated to that model's
    ``create_for_job`` entry point.

    Best-effort: a failure inside ``create_for_job`` is logged as a
    warning and never blocks the caller.
    """
    self.ensure_one()
    partner = self.partner_id
    wants_qc = (
        'x_fc_requires_qc' in partner._fields
        and partner.x_fc_requires_qc
    )
    if not wants_qc:
        return
    if 'fusion.plating.quality.check' not in self.env:
        return
    QC = self.env['fusion.plating.quality.check']
    try:
        # Savepoint: if the check creation fails at database level the
        # partial work is rolled back and the cursor stays usable for
        # the remaining confirm hooks, instead of the swallowed error
        # leaving the whole transaction aborted.
        with self.env.cr.savepoint():
            QC.create_for_job(self)
    except Exception as e:
        _logger.warning(
            "Job %s: create_for_job failed: %s", self.name, e,
        )
# ------------------------------------------------------------------
# button_mark_done — Task 2.8
# ------------------------------------------------------------------
def button_mark_done(self):
    """Transition the job to 'done' and trigger downstream side effects.

    Jobs already in 'done' are skipped. For every other job the
    following gates run in order; each raises UserError when it fails:

    - Cancelled jobs cannot be marked done.
    - Step gate: every step must be done / skipped / cancelled
      (bypass: context key ``fp_skip_step_gate``).
    - Bake-window gate: no bake window referencing the job may still
      be awaiting_bake or bake_in_progress (bypass:
      ``fp_skip_bake_gate``; skipped when the bake-window model is not
      installed).
    - Quantity gate, only when the job has a qty (bypass:
      ``fp_skip_qty_reconcile``): qty_done + qty_scrapped must equal
      qty, qty_received must be set, and qty_received must equal
      qty_done + qty_scrapped + visual rejects. On a clean close
      (nothing recorded, received == ordered) qty_done is auto-filled
      with qty first.
    - QC gate, only for customers flagged ``x_fc_requires_qc``
      (bypass: ``fp_skip_qc_gate``; skipped when the QC model is not
      installed): the latest non-passed QC check blocks the close, and
      a missing QC check is created before blocking.

    Then state is set to 'done' and date_finished to now. Unless the
    ``fp_jobs_migration`` context key is set, a draft delivery and
    the certificates are created and the 'job_complete' notification
    is fired.

    :raises UserError: when any gate above fails
    :return: always True
    """
    # Context flags and optional models are the same for every job in
    # the recordset, so resolve them once up front.
    ctx = self.env.context
    # During migration, side-effects are skipped — see action_confirm.
    skip_side_effects = ctx.get('fp_jobs_migration')
    skip_qc_gate = ctx.get('fp_skip_qc_gate')
    skip_step_gate = ctx.get('fp_skip_step_gate')
    skip_bake_gate = ctx.get('fp_skip_bake_gate')
    skip_qty_gate = ctx.get('fp_skip_qty_reconcile')
    QC = (self.env['fusion.plating.quality.check']
          if 'fusion.plating.quality.check' in self.env else None)
    BW = (self.env['fusion.plating.bake.window']
          if 'fusion.plating.bake.window' in self.env else None)
    for job in self:
        if job.state == 'done':
            continue
        if job.state == 'cancelled':
            raise UserError(_(
                "Job %s is cancelled — cannot mark done."
            ) % job.name)
        # Step-completion gate: every step must be done (or explicitly
        # skipped, once button_skip is implemented). Without this
        # guard operators can close a recipe-driven job with zero
        # actual work logged. Manager bypass via context.
        if not skip_step_gate and job.step_ids:
            # `skipped` and `cancelled` count as terminal — operator
            # explicitly opted those out (skipped) or killed them
            # (cancelled). Only steps still in pending/ready/in_progress/
            # paused block job close.
            undone = job.step_ids.filtered(
                lambda s: s.state not in ('done', 'skipped', 'cancelled')
            )
            if undone:
                # Only the first five offenders are listed in the message.
                raise UserError(_(
                    "Job %s cannot be marked Done — %d/%d step(s) "
                    "are not finished:\n %s\n\nWalk each step on "
                    "the tablet (or skip / cancel opt-in steps)."
                ) % (
                    job.name, len(undone), len(job.step_ids),
                    '\n '.join(
                        f'#{s.sequence} {s.name} ({s.state})'
                        for s in undone[:5]
                    ),
                ))
        # Bake-window gate (compliance — AS9100 / Nadcap): if any
        # auto-spawned bake.window is still awaiting_bake OR
        # bake_in_progress, the bake hasn't been documented and
        # parts cannot ship. Manager bypass via
        # fp_skip_bake_gate=True for documented customer deviation.
        if not skip_bake_gate and BW is not None:
            # Bake windows are matched to the job by name (part_ref).
            pending_bw = BW.sudo().search([
                ('part_ref', '=', job.name),
                ('state', 'in', ('awaiting_bake', 'bake_in_progress')),
            ])
            if pending_bw:
                raise UserError(_(
                    "Job %s cannot be marked Done — bake window "
                    "still pending:\n %s\n\nBake hydrogen "
                    "embrittlement relief on the parts (start + "
                    "end the bake on the bake.window record), then "
                    "close the job. Manager override available for "
                    "documented customer deviation."
                ) % (
                    job.name,
                    '\n '.join(
                        f'{bw.name} (state={bw.state}, '
                        f'required_by={bw.bake_required_by})'
                        for bw in pending_bw[:5]
                    ),
                ))
        # Qty reconciliation gate: qty_done + qty_scrapped must
        # equal qty when the job closes. Without this an operator
        # can ship "5 of 5" while only 4 are actually plated +
        # 1 contaminated, with no record of the missing piece.
        # Manager bypass via fp_skip_qty_reconcile=True (e.g. when
        # qty tracking truly doesn't apply).
        if not skip_qty_gate and job.qty:
            # Smooth the typical "clean close" case so the operator
            # doesn't have to manually type qty_done = ordered_qty
            # every time. Conditions for safe auto-fill:
            # - operator has NOT recorded any scrap or done qty
            #   (so we're not overriding their explicit entry)
            # - the receiving closed with matching qty (parts
            #   physically came in as expected)
            # - no visual-inspection rejects recorded
            # When any of those fail, fall through to the gate so
            # the operator reconciles by hand.
            if (not job.qty_done and not job.qty_scrapped
                    and not (job.qty_visual_inspection_rejects or 0)
                    and job.qty_received
                    and abs(job.qty_received - job.qty) < 0.0001):
                job.qty_done = job.qty
            accounted = (job.qty_done or 0) + (job.qty_scrapped or 0)
            if abs(accounted - job.qty) > 0.0001:
                raise UserError(_(
                    "Job %s qty mismatch — ordered %g, but qty_done "
                    "(%g) + qty_scrapped (%g) = %g. Update Quantity "
                    "Completed and Quantity Scrapped on the job "
                    "header so they sum to %g before closing."
                ) % (
                    job.name, job.qty, job.qty_done or 0,
                    job.qty_scrapped or 0, accounted, job.qty,
                ))
            # Receiving reconciliation: parts must be physically
            # received before the job can close, and the count must
            # match what came out (done + scrapped + visual rejects).
            # Same bypass flag covers both checks.
            if not job.qty_received:
                raise UserError(_(
                    "Job %s cannot be marked Done — Quantity Received "
                    "is blank. Close the receiving record for SO %s "
                    "before completing this job."
                ) % (
                    job.name,
                    job.sale_order_id.name if job.sale_order_id else '?',
                ))
            rejects = job.qty_visual_inspection_rejects or 0
            accounted_out = (
                (job.qty_done or 0)
                + (job.qty_scrapped or 0)
                + rejects
            )
            if abs(job.qty_received - accounted_out) > 0.0001:
                raise UserError(_(
                    "Job %s qty mismatch — received %g, but qty_done "
                    "(%g) + qty_scrapped (%g) + visual rejects (%g) "
                    "= %g. Reconcile before closing."
                ) % (
                    job.name, job.qty_received,
                    job.qty_done or 0, job.qty_scrapped or 0,
                    rejects, accounted_out,
                ))
        # QC gate: customers flagged x_fc_requires_qc must have a
        # passed QC before the job closes. AS9100 / Nadcap compliance.
        #
        # The model handle is compared with `is not None` on purpose:
        # env[...] yields an EMPTY recordset, which is falsy, so a plain
        # truthiness test would disable this gate for every job.
        if (QC is not None and not skip_qc_gate
                and 'x_fc_requires_qc' in job.partner_id._fields
                and job.partner_id.x_fc_requires_qc):
            # NOTE(review): the domain filters on `job_id`; other models
            # in this file link to the job via `x_fc_job_id` — confirm
            # the field name on the QC model.
            blocking_qc = QC.search([
                ('job_id', '=', job.id),
                ('state', 'not in', ('passed',)),
            ], order='create_date desc', limit=1)
            if blocking_qc:
                raise UserError(_(
                    "Job %s cannot be marked Done — QC check %s is in "
                    "state '%s'. Pass the QC checklist first, or have "
                    "a manager override via the bypass button."
                ) % (job.name, blocking_qc.name, blocking_qc.state))
            # No QC at all? Spawn one now (idempotent) and require
            # the operator to walk it before retrying.
            no_qc = not QC.search_count([('job_id', '=', job.id)])
            if no_qc:
                # NOTE(review): if the caller rolls the transaction back
                # on UserError (the usual behaviour for a button call),
                # the check created here is rolled back too — verify the
                # operator really ends up with a check to complete.
                QC.create_for_job(job)
                raise UserError(_(
                    "Job %s requires QC. A new check has been created — "
                    "complete it before marking the job Done."
                ) % job.name)
        job.state = 'done'
        job.date_finished = fields.Datetime.now()
        if not skip_side_effects:
            job._fp_create_delivery()
            job._fp_create_certificates()
            job._fp_fire_notification('job_complete')
    return True
# ------------------------------------------------------------------
# Notifications dispatch (Phase 4)
#
# Fires fp.notification.template records whose trigger_event matches
# the given event name. Best-effort: silently skips if the
# fusion_plating_notifications module is not installed (model not
# registered) and logs (without raising) on any send failure so the
# job lifecycle is never blocked by an email problem.
# ------------------------------------------------------------------
def _fp_fire_notification(self, event):
    """Best-effort notification dispatch for fp.job lifecycle events.

    Hands the event to the ``_dispatch`` helper of
    fp.notification.template, passing this job and its customer
    (``partner_id``) as the partner. Silently no-ops when the
    fp.notification.template model is not registered. Any dispatch
    failure is logged as a warning and never raised, so the job
    lifecycle is not blocked by a notification problem.

    :param event: trigger event name, e.g. 'job_confirmed'
    """
    self.ensure_one()
    if 'fp.notification.template' not in self.env:
        return
    Template = self.env['fp.notification.template'].sudo()
    try:
        # The notifications module exposes a model-level _dispatch
        # helper that handles template lookup, recipient resolution
        # (Sub 6 contact routing), attachment rendering, and audit
        # logging in one go. Pass partner explicitly since fp.job's
        # partner_id is the customer.
        #
        # Run it inside a savepoint: swallowing a database-level error
        # without one would leave the transaction aborted, and the
        # lifecycle transition this call is meant to protect would
        # fail on its next query anyway.
        with self.env.cr.savepoint():
            Template._dispatch(event, self, partner=self.partner_id)
    except Exception as e:
        _logger.warning(
            "Job %s: notification %s dispatch failed: %s",
            self.name, event, e,
        )
def _fp_create_delivery(self):
    """Create a draft fusion.plating.delivery linked to this job.

    The create-vals come from ``_fp_resolve_delivery_defaults``, which
    sets both the strong link (``x_fc_job_id``) and the soft reference
    (``job_ref``) when the delivery model has them, and pre-fills
    whatever can be resolved from upstream records (ship-to address
    and contact, scheduled date, source facility, carrier / outbound
    shipment, CoC attachment) so the shipping crew does not re-type
    data that already exists on the sale order.

    Idempotent: nothing happens when the job already has a delivery.
    Best-effort: a failure while creating the delivery is logged as a
    warning and not raised.
    """
    self.ensure_one()
    if self.delivery_id:
        return
    Delivery = self.env['fusion.plating.delivery'].sudo()
    vals = self._fp_resolve_delivery_defaults(Delivery)
    try:
        # Savepoint so a failed create is rolled back on its own; the
        # swallowed error must not leave the job-close transaction in
        # an aborted state.
        with self.env.cr.savepoint():
            delivery = Delivery.create(vals)
            self.delivery_id = delivery.id
    except Exception as e:
        _logger.warning(
            "Job %s: failed to auto-create delivery: %s", self.name, e,
        )
def _fp_resolve_delivery_defaults(self, Delivery):
"""Build the create-vals for a fresh delivery, OR the
write-vals for refreshing an existing one. Centralised so
the create path, the per-cert post-issue sync, and any
future 'Refresh from Source' button all stay consistent.
"""
self.ensure_one()
vals = {'partner_id': self.partner_id.id}
if 'x_fc_job_id' in Delivery._fields:
vals['x_fc_job_id'] = self.id
if 'job_ref' in Delivery._fields:
vals['job_ref'] = self.name
# Delivery address + contact details from the SO. shipping
# partner is preferred (that's where parts physically go);
# fall back to the SO's main partner when no separate ship-to.
so = self.sale_order_id
ship_to = (so.partner_shipping_id or so.partner_id) if so else False
if ship_to:
if 'delivery_address_id' in Delivery._fields:
vals['delivery_address_id'] = ship_to.id
if 'contact_name' in Delivery._fields and ship_to.name:
vals['contact_name'] = ship_to.name
if 'contact_phone' in Delivery._fields:
vals['contact_phone'] = ship_to.phone or ship_to.mobile or ''
# Scheduled date — operator can adjust; this just primes it
# so they're not staring at a blank field.
if so and so.commitment_date and 'scheduled_date' in Delivery._fields:
vals['scheduled_date'] = so.commitment_date
# Source facility comes from the job (where it was plated).
if self.facility_id and 'source_facility_id' in Delivery._fields:
vals['source_facility_id'] = self.facility_id.id
# Outbound carrier + shipment mirrored from the SO's first
# receiving record (the crew chose these at receipt time).
if (so and 'x_fc_receiving_ids' in so._fields
and so.x_fc_receiving_ids):
recv = so.x_fc_receiving_ids[:1]
if 'x_fc_carrier_id' in Delivery._fields \
and 'x_fc_carrier_id' in recv._fields \
and recv.x_fc_carrier_id:
vals['x_fc_carrier_id'] = recv.x_fc_carrier_id.id
if 'x_fc_outbound_shipment_id' in Delivery._fields \
and 'x_fc_outbound_shipment_id' in recv._fields \
and recv.x_fc_outbound_shipment_id:
vals['x_fc_outbound_shipment_id'] = (
recv.x_fc_outbound_shipment_id.id
)
# CoC PDF — if a cert for this job is already issued and
# the delivery field accepts an attachment, link it. The
# cert's action_issue also calls _fp_sync_to_delivery for
# the case where the cert issues AFTER the delivery exists.
Cert = self.env.get('fp.certificate')
if Cert is not None and 'coc_attachment_id' in Delivery._fields:
issued_cert = Cert.sudo().search([
('x_fc_job_id', '=', self.id),
('certificate_type', '=', 'coc'),
('state', '=', 'issued'),
('attachment_id', '!=', False),
], order='issue_date desc, id desc', limit=1)
if issued_cert and issued_cert.attachment_id:
vals['coc_attachment_id'] = issued_cert.attachment_id.id
return vals
def _fp_create_certificates(self):
    """Auto-create one draft fp.certificate per required type.

    The required types come from ``_resolve_required_cert_types``.
    Creation is idempotent per type: a type that already has a
    certificate for this job (or, on schemas without ``x_fc_job_id``,
    for the job's sale order) is skipped. When neither link is
    available the type is skipped because an existing certificate
    cannot be identified safely.

    Each certificate is pre-filled with whatever the certificate model
    has fields for:
      - spec_reference / customer_spec_id ← job.customer_spec_id
        (code plus " Rev <revision>")
      - process_description ← recipe.name
      - certified_by_id ← spec.signer_user_id, falling back to
        company.x_fc_owner_user_id
      - contact_partner_id ← partner.x_fc_default_coc_contact_id
      - nc_quantity ← qty_scrapped + qty_visual_inspection_rejects
      - part number, shipped quantity, PO number, customer job number,
        sale order and job links, WO number

    Does nothing when the fp.certificate model is not installed or no
    type is required. A failure for one type is logged, reported on
    the job chatter, and does not stop the remaining types.
    """
    self.ensure_one()
    if 'fp.certificate' not in self.env:
        return
    Cert = self.env['fp.certificate'].sudo()
    required = self._resolve_required_cert_types()
    if not required:
        return
    has_job_link = 'x_fc_job_id' in Cert._fields
    # Spec drives the cert spec_reference.
    spec = self.customer_spec_id
    # Recipe drives the process description on the cert.
    recipe = self.recipe_id
    # Signer resolution: per-spec override wins, company default fills.
    signer = False
    if spec and 'signer_user_id' in spec._fields:
        signer = spec.signer_user_id
    if not signer and 'x_fc_owner_user_id' in self.company_id._fields:
        signer = self.company_id.x_fc_owner_user_id
    # Contact: per-customer default; blank means manager picks at issue.
    contact = False
    if 'x_fc_default_coc_contact_id' in self.partner_id._fields:
        contact = self.partner_id.x_fc_default_coc_contact_id
    # NC qty: scrapped + visual rejects. Both NULL-safe.
    nc_qty = int(
        (self.qty_scrapped or 0)
        + (self.qty_visual_inspection_rejects or 0)
    )
    # Sorted so certificates are created in a deterministic order.
    for cert_type in sorted(required):
        # Idempotency per type.
        existing_dom = [('certificate_type', '=', cert_type)]
        if has_job_link:
            existing_dom.append(('x_fc_job_id', '=', self.id))
        elif self.sale_order_id and 'sale_order_id' in Cert._fields:
            existing_dom.append(
                ('sale_order_id', '=', self.sale_order_id.id),
            )
        else:
            continue  # can't safely identify — skip
        if Cert.search_count(existing_dom):
            continue
        try:
            # Savepoint: if the create fails at database level, the
            # failure note posted in the except branch (and the next
            # cert type) still need a usable transaction. Without the
            # savepoint the swallowed error leaves the cursor aborted.
            with self.env.cr.savepoint():
                vals = {
                    'partner_id': self.partner_id.id,
                    'certificate_type': cert_type,
                }
                if 'state' in Cert._fields:
                    vals['state'] = 'draft'
                if has_job_link:
                    vals['x_fc_job_id'] = self.id
                elif 'job_id' in Cert._fields:
                    vals['job_id'] = self.id
                if 'sale_order_id' in Cert._fields and self.sale_order_id:
                    vals['sale_order_id'] = self.sale_order_id.id
                # spec_reference is what action_issue blocks on.
                # Format spec.code + revision for the cert text.
                if spec and 'spec_reference' in Cert._fields:
                    ref = spec.code or ''
                    if spec.revision:
                        ref = (f'{ref} Rev {spec.revision}'
                               if ref else f'Rev {spec.revision}')
                    if ref:
                        vals['spec_reference'] = ref
                    if 'customer_spec_id' in Cert._fields:
                        vals['customer_spec_id'] = spec.id
                if 'part_number' in Cert._fields and self.part_catalog_id:
                    vals['part_number'] = (
                        self.part_catalog_id.part_number or ''
                    )
                if 'quantity_shipped' in Cert._fields:
                    # NOTE(review): the close-out gate requires
                    # qty_done + qty_scrapped == qty, which suggests
                    # qty_done already excludes scrap; subtracting
                    # qty_scrapped again here may under-report shipped
                    # quantity. Left unchanged — confirm with the owner.
                    vals['quantity_shipped'] = int(
                        (self.qty_done or self.qty or 0)
                        - (self.qty_scrapped or 0)
                    )
                if 'nc_quantity' in Cert._fields:
                    vals['nc_quantity'] = nc_qty
                if ('po_number' in Cert._fields and self.sale_order_id
                        and 'x_fc_po_number'
                        in self.sale_order_id._fields):
                    vals['po_number'] = (
                        self.sale_order_id.x_fc_po_number or ''
                    )
                if ('customer_job_no' in Cert._fields
                        and self.sale_order_id
                        and 'x_fc_customer_job_number'
                        in self.sale_order_id._fields):
                    vals['customer_job_no'] = (
                        self.sale_order_id.x_fc_customer_job_number or ''
                    )
                if 'process_description' in Cert._fields and recipe:
                    vals['process_description'] = recipe.name or ''
                if 'certified_by_id' in Cert._fields and signer:
                    vals['certified_by_id'] = signer.id
                if 'contact_partner_id' in Cert._fields and contact:
                    vals['contact_partner_id'] = contact.id
                if 'entech_wo_number' in Cert._fields:
                    vals['entech_wo_number'] = self.name or ''
                cert = Cert.create(vals)
                # Markup % dict escapes the substituted values; only
                # the template's own <b> tag is rendered as HTML.
                self.message_post(body=Markup(_(
                    '%(t)s <b>%(n)s</b> auto-created (draft). Issuer '
                    'should hit Issue when ready to ship.'
                )) % {
                    't': dict(
                        Cert._fields['certificate_type'].selection
                    ).get(cert_type, cert_type),
                    'n': cert.name,
                })
        except Exception as e:
            _logger.warning(
                "Job %s: failed to auto-create cert (%s): %s",
                self.name, cert_type, e,
            )
            self.message_post(body=_(
                'Cert auto-create (%(t)s) failed: %(e)s. '
                'Create manually.'
            ) % {'t': cert_type, 'e': e})
# ------------------------------------------------------------------
# Backfill — closed jobs missing certs, plus cleanup of legacy
# duplicate thickness_report certs created before the bundling rule.
# ------------------------------------------------------------------
# One-shot management action for jobs that closed BEFORE the
# _fp_create_certificates bug fix (e.g. WO-30040). Two passes:
# 1. CREATE any missing draft cert per the (updated) resolver
# 2. VOID legacy duplicate thickness_report certs that have a
# paired CoC on the same job — the bundling rule says the
# CoC carries the thickness data on page 2
# Both passes are idempotent — safe to re-run.
@api.model
def action_backfill_missing_certs(self):
    """Backfill missing certs on closed jobs and void duplicate reports.

    Every job in state ``done`` whose ``_resolve_required_cert_types()``
    is non-empty goes through two passes:

    1. If any required certificate type has no existing cert, call
       ``job._fp_create_certificates()`` and count how many certs
       appeared as a result.
    2. If a CoC is required and one exists (including one created in
       pass 1 of this same run), void every *draft*
       ``thickness_report`` cert found for the job.

    Certs are looked up by ``x_fc_job_id`` when fp.certificate has
    that field; otherwise by the job's sale order. A job with neither
    link yields an empty recordset.

    :return: an ``ir.actions.client`` / ``display_notification``
        action dict summarising scanned jobs, created certs and
        voided certs.
    :raises UserError: if the ``fp.certificate`` model is not
        available in the environment.
    """
    Cert = self.env.get('fp.certificate')
    if Cert is None:
        raise UserError(_(
            'fp.certificate model is not installed. Install '
            'fusion_plating_certificates before running this action.'
        ))
    has_job_link = 'x_fc_job_id' in Cert._fields

    def find_certs(job):
        """Return the certs linked to *job*, or to its sale order."""
        if has_job_link:
            domain = [('x_fc_job_id', '=', job.id)]
        elif job.sale_order_id:
            domain = [('sale_order_id', '=', job.sale_order_id.id)]
        else:
            return Cert.browse()
        return Cert.sudo().search(domain)

    scanned = 0
    created_count = 0
    voided_count = 0
    backfilled_jobs = self.env['fp.job']
    for job in self.search([('state', '=', 'done')]):
        required = job._resolve_required_cert_types()
        if not required:
            continue
        scanned += 1
        existing_certs = find_certs(job)
        existing_types = set(existing_certs.mapped('certificate_type'))
        # ---- Pass 1: create missing certs --------------------------
        if required - existing_types:
            before = len(existing_certs)
            job._fp_create_certificates()
            # Re-read through the SAME lookup used above, whichever
            # link field is in play, so the created-count is right on
            # the sale-order fallback path too. The type set is
            # refreshed as well so pass 2 sees a CoC created just now.
            existing_certs = find_certs(job)
            existing_types = set(
                existing_certs.mapped('certificate_type')
            )
            delta = max(len(existing_certs) - before, 0)
            if delta:
                backfilled_jobs |= job
                created_count += delta
        # ---- Pass 2: void duplicate thickness_report certs ---------
        # Bundling rule: when a CoC is wanted and present, the CoC
        # carries the thickness data, so a leftover draft
        # thickness_report cert on the same job is voided with an
        # explicit reason for the audit trail.
        if 'coc' in required and 'coc' in existing_types:
            dup_thickness = existing_certs.filtered(
                lambda c: (c.certificate_type == 'thickness_report'
                           and c.state == 'draft')
            )
            for cert in dup_thickness:
                cert.sudo().write({
                    'state': 'voided',
                    'void_reason': (
                        'Auto-voided: bundling rule — thickness '
                        'data is delivered as page 2 of the paired '
                        'CoC, not as a separate cert.'
                    ),
                })
                cert.message_post(body=_(
                    'Auto-voided by cleanup: bundling rule routes '
                    'thickness data to the CoC.'
                ))
                voided_count += 1
                backfilled_jobs |= job
    return {
        'type': 'ir.actions.client',
        'tag': 'display_notification',
        'params': {
            'title': _('Cert backfill + cleanup complete'),
            'message': _(
                'Scanned %(s)d closed jobs. Created %(c)d draft '
                'cert(s); voided %(v)d duplicate thickness_report '
                'cert(s) across %(j)d job(s).'
            ) % {
                's': scanned,
                'c': created_count,
                'v': voided_count,
                'j': len(backfilled_jobs),
            },
            'sticky': True,
            'type': 'success' if (created_count or voided_count) else 'warning',
        },
    }
class FpJobStep(models.Model):
    """Phase 7 extension of fp.job.step: migration idempotency key.

    scripts/migrate_to_fp_jobs.py fills the field below to record
    which mrp.workorder a step mirrors, so that later runs of the
    script can skip work orders that were already migrated.
    """

    _inherit = 'fp.job.step'

    # Plain Integer holding the source record's database id.
    legacy_mrp_workorder_id = fields.Integer(
        index=True,
        string='Legacy MRP Work Order ID',
        help=(
            'Database id of the source mrp.workorder this step '
            'was migrated from. Used by the migration script '
            'for idempotency. Cleared post-cutover.'
        ),
    )
# ==========================================================================
# Sub 14 — Recipe-side trigger field
# ==========================================================================
# Adds an optional Many2one on every recipe operation node so the recipe
# author can explicitly map "completion of this step triggers workflow
# state X". Wins over the default-kind matching defined on the workflow
# state itself. Lives here (not core) because the target model
# (fp.job.workflow.state) is defined in this module.
class FusionPlatingProcessNodeWorkflow(models.Model):
    """Sub 14 — optional workflow-state trigger on recipe process nodes.

    Extends fusion.plating.process.node with a link to
    fp.job.workflow.state so a recipe author can name the workflow
    state that a step built from the node should trigger.
    """

    _inherit = 'fusion.plating.process.node'

    # Optional link; ondelete='set null' clears it if the target
    # workflow state is deleted.
    triggers_workflow_state_id = fields.Many2one(
        comodel_name='fp.job.workflow.state',
        ondelete='set null',
        string='Triggers Workflow State',
        help=(
            'When a job step generated from this recipe node '
            'finishes (or is skipped/cancelled), the job advances '
            'to this workflow state. Leave blank to fall back to '
            'default-kind matching defined on the workflow state '
            'catalog.'
        ),
    )
class FpStepTemplateWorkflow(models.Model):
    """Sub 14 — workflow milestone trigger for library step templates.

    The field is declared in the jobs module, not in fusion_plating
    core, because its target model (fp.job.workflow.state) lives in
    this module and core cannot reference it without a cyclic
    dependency.

    When a template is dropped into a recipe through
    simple_recipe_controller drag-drop, the value is snapshot-copied
    onto the new process_node via _SNAPSHOT_FIELDS.
    """

    _inherit = 'fp.step.template'

    # Optional link; ondelete='set null' clears it if the target
    # workflow state is deleted.
    triggers_workflow_state_id = fields.Many2one(
        comodel_name='fp.job.workflow.state',
        ondelete='set null',
        string='Triggers Workflow State',
        help=(
            'Sub 14. When a recipe step generated from this '
            'template finishes (or is skipped/cancelled), the '
            'parent job advances to this workflow state on its '
            'status bar. Leave blank to fall back to default-kind '
            'matching defined on the workflow state catalog.'
        ),
    )