Files
Odoo-Modules/fusion_plating/fusion_plating_jobs/models/fp_job.py
gsinghpal a6186120b2 fix(plating): self-heal work orders missing recipe/steps + repair existing
Root cause: jobs auto-created at SO confirm resolve the recipe once; if the
SO line's process variant is not set at that instant (new parts, or the
copy-from-quote path), the WO is created with no recipe and no steps, and
setting the recipe on the line afterward never propagates (idempotency
guard + no line-to-job sync).

Fix (fusion_plating_jobs 19.0.12.6.0):
- fp.job._fp_resync_recipe_from_so(): re-resolve recipe from the SO line(s)
  and build steps (mirrors action_confirm: generate, promote pending to
  ready, express overrides). Acts only on not-started jobs; idempotent.
- action_fp_resync_recipe_from_so(): "Re-sync Recipe from SO" header button.
- sale.order.line.write: when x_fc_process_variant_id changes, auto-heal the
  linked not-yet-started WO.

Verified on entech: WO-30071-01/02 healed (recipe + 8 steps each); auto-
propagation confirmed in a rolled-back transaction. Deployed in place
(fusion_plating_jobs is ahead of git); same change recorded here.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 01:14:57 -04:00

3043 lines
134 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# fp.job extension - cross-module fields that couldn't live in core
# because their target models are in dependent modules. Per spec §5.1
# this module is the umbrella that re-bundles the cross-module
# extensions for the native job flow.
#
# qc_check_id is deferred to Task 2.7 (the underlying QC model still
# lives in fusion_plating_bridge_mrp; we'll address its sourcing then).
import datetime
import json
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
# Plant-view kanban - fixed 9-column sequence (spec §4.1). The order
# here is the visual order on the board AND the order in the
# mini-timeline strip. Never reorder; columns are first-class identity.
_COLUMN_SEQUENCE = [
'receiving', 'masking', 'blasting', 'racking', 'plating',
'baking', 'de_racking', 'inspection', 'shipping',
]
class FpJob(models.Model):
_inherit = 'fp.job'
# ===== Post-shop state extension (spec 2026-05-25) =================
# Two intermediate states between in_progress and done so completed
# jobs awaiting cert + shipping stay visible on the Shop Floor board.
# See docs/superpowers/specs/2026-05-25-post-shop-cert-shipping-job-states-design.md
state = fields.Selection(
selection_add=[
('awaiting_cert', 'Awaiting Cert'),
('awaiting_ship', 'Awaiting Ship'),
],
ondelete={
'awaiting_cert': 'set default',
'awaiting_ship': 'set default',
},
)
# ---- Tier 3 mirrors from sale.order -----------------------------
# Related (not stored) - pure display mirrors. Values may change on
# the SO after job confirm (e.g. customer changes carrier preference)
# and the WO should reflect the latest; related auto-follows.
x_fc_delivery_method = fields.Selection(
related='sale_order_id.x_fc_delivery_method',
string='Delivery Method',
readonly=True,
)
x_fc_ship_via = fields.Char(
related='sale_order_id.x_fc_ship_via',
string='Ship Via',
readonly=True,
)
x_fc_invoice_strategy = fields.Selection(
related='sale_order_id.x_fc_invoice_strategy',
string='Invoice Strategy',
readonly=True,
)
part_catalog_id = fields.Many2one(
'fp.part.catalog',
string='Part',
ondelete='restrict',
)
customer_spec_id = fields.Many2one(
'fusion.plating.customer.spec',
string='Specification',
ondelete='set null',
help='Customer / industry spec the job ships under. Auto-filled '
'from the SO line at job creation.',
)
portal_job_id = fields.Many2one(
'fusion.plating.portal.job',
string='Portal Job',
ondelete='set null',
)
delivery_id = fields.Many2one(
'fusion.plating.delivery',
string='Delivery',
ondelete='set null',
)
override_ids = fields.One2many(
'fp.job.node.override',
'job_id',
string='Recipe Overrides',
)
# Sub 13 - sequential enforcement. Mirrored from the recipe root so
# button_start on each step can read the policy without walking the
# node tree. Stored so a recipe author flipping the recipe's flag
# AFTER job generation does NOT change behaviour mid-run (jobs
# snapshot the policy at creation, not on the fly).
enforce_sequential = fields.Boolean(
related='recipe_id.enforce_sequential',
string='Enforce Sequential Order',
store=True,
readonly=True,
help='Snapshotted from the recipe at job creation. When True, '
'every step waits for its predecessors before it can start '
'(unless the step itself is flagged Parallel Start, or a '
'manager bypasses via context).',
)
# Phase 7 - migration idempotency key. Populated by
# scripts/migrate_to_fp_jobs.py to mark a fp.job as the mirror of a
# specific mrp.production. Used to skip already-migrated MOs on
# subsequent runs. Cleared after the 2-week shadow period.
legacy_mrp_production_id = fields.Integer(
string='Legacy MRP Production ID',
index=True,
help='Database id of the source mrp.production record this job '
'was migrated from. Used by the migration script for '
'idempotency. Cleared post-cutover.',
)
# Display formatter - "WO # 00001" used everywhere on tablet/dashboard.
# The underlying `name` field stays untouched (WH/JOB/00001) so reports,
# emails, and back-office forms continue using their canonical name.
# System-wide sequence rename is a separate decision (see spec
# 2026-05-22-shopfloor-tablet-redesign-design §6.5).
display_wo_name = fields.Char(
compute='_compute_display_wo_name',
string='WO #',
help='Tablet/dashboard formatter - "WO # 00001" derived from name. '
'Underlying name field is unchanged.',
)
@api.depends('name')
def _compute_display_wo_name(self):
for job in self:
raw = (job.name or '').strip()
if not raw:
job.display_wo_name = ''
continue
# Take the last "/"-separated segment as the number portion.
# WH/JOB/00001 → 00001 ; WH/JOB/2026/00042 → 00042 ; 00123 → 00123
tail = raw.rsplit('/', 1)[-1]
job.display_wo_name = f'WO # {tail}'
# Phase 2 - At-Risk view + Workspace landing focus.
late_risk_ratio = fields.Float(
compute='_compute_late_risk_ratio',
store=True,
string='Late-risk Ratio',
help='remaining_planned_minutes / minutes_to_deadline. '
'>1.0 means the job will be late if nothing changes. '
'Drives the At-Risk view on the manager dashboard.',
)
active_step_id = fields.Many2one(
'fp.job.step',
compute='_compute_active_step_id',
store=True,
index=True,
string='Active Step',
help='Currently the live step under the priority chain '
'(in_progress > paused > ready > pending). Drives '
'JobWorkspace landing focus + card_state. Stored so '
'card_state\'s `active_step_id.area_kind` dependency '
'can search back to dependent jobs without erroring.',
)
# ===== 2026-05-23 Plant-view kanban - card_state + mini_timeline ====
card_state = fields.Char(
string='Card State (plant view)',
compute='_compute_card_state',
store=True,
index=True,
help='One of 13 mutually-exclusive states driving the plant-view '
'kanban card chrome. See spec §6 for the catalog and the '
'explicit precedence dispatch. Stored for fast filter '
'queries (count by state, filter "blocked", etc.).',
)
mini_timeline_json = fields.Text(
string='Mini-Timeline (JSON)',
compute='_compute_mini_timeline_json',
help='Serialized 9-element array, one per Shop Floor column, '
'each {area, state, variant?}. Card UI reads this to render '
'the bottom timeline strip without knowing recipe shape.',
)
# ----- Precedence helpers (spec §6.2 + §9.4) -----------------------
# Each returns a bool. _compute_card_state calls them in precedence
# order and the first truthy one wins. Centralized here so future
# audit-found states can be added by writing one new helper + one new
# rule in the dispatcher.
def _fp_inbound_not_received(self):
"""no_parts - job confirmed, customer's parts in transit."""
self.ensure_one()
if self.state != 'confirmed':
return False
so = self.sale_order_id
if not so or 'x_fc_receiving_ids' not in so._fields:
return False
return any(r.state == 'draft' for r in so.x_fc_receiving_ids)
def _fp_has_open_hold(self):
"""on_hold - fusion.plating.quality.hold open on this job."""
self.ensure_one()
if 'fusion.plating.quality.hold' not in self.env:
return False
Hold = self.env['fusion.plating.quality.hold']
return bool(Hold.search_count([
('job_id', '=', self.id),
('state', '=', 'open'),
]))
def _fp_has_pending_qc(self):
"""awaiting_qc - quality check in draft / in_progress on this job."""
self.ensure_one()
if 'fusion.plating.quality.check' not in self.env:
return False
QC = self.env['fusion.plating.quality.check']
return bool(QC.search_count([
('job_id', '=', self.id),
('state', 'in', ('draft', 'in_progress')),
]))
def _fp_bake_window_due_soon(self, threshold_hours=1):
"""bake_due - bake.window awaiting_bake with deadline < threshold."""
self.ensure_one()
if 'fusion.plating.bake.window' not in self.env:
return False
Window = self.env['fusion.plating.bake.window']
cutoff = fields.Datetime.now() + datetime.timedelta(hours=threshold_hours)
domain = [
('state', '=', 'awaiting_bake'),
('bake_required_by', '<=', cutoff),
]
# bake.window's link to a job varies across installs - fall back
# to SO when no direct fp.job link exists.
if 'job_id' in Window._fields:
domain.append(('job_id', '=', self.id))
elif self.sale_order_id and 'sale_order_id' in Window._fields:
domain.append(('sale_order_id', '=', self.sale_order_id.id))
else:
return False
return bool(Window.search_count(domain))
def _fp_is_mine(self, user=None):
"""*_mine variants - active step's work centre is in operator's
paired stations. MVP holds 1 row in paired_work_centre_ids; Phase 2
multi-station picker can populate multiple."""
self.ensure_one()
user = user or self.env.user
step = self.active_step_id
if not step or not step.work_centre_id:
return False
if 'paired_work_centre_ids' not in user._fields:
return False
return step.work_centre_id.id in user.paired_work_centre_ids.ids
# ----- card_state compute -------------------------------------------
@api.depends(
'state',
'active_step_id',
'active_step_id.state',
'active_step_id.requires_signoff',
'active_step_id.signoff_user_id',
'active_step_id.last_activity_at',
'active_step_id.area_kind',
'active_step_id.recipe_node_id.default_kind',
)
def _compute_card_state(self):
"""Dispatch matching spec §6.2 / §9.3 explicit precedence list."""
for job in self:
# Edge: no live step (all steps done OR no steps at all).
# - job.state='done' → 'done' (defensive - done jobs are
# filtered off the Shop Floor board upstream, but the
# field still needs a value).
# - confirmed + parts not yet received → 'no_parts'.
# - else → 'ready' (job awaiting work, no steps yet OR
# recipe not assigned).
if not job.active_step_id:
if job.state == 'done':
job.card_state = 'done'
elif job.state == 'awaiting_cert':
# Spec 2026-05-25 - state drives card_state for
# post-shop jobs (active_step_id is False because
# every step is terminal).
job.card_state = 'awaiting_cert'
elif job.state == 'awaiting_ship':
job.card_state = 'awaiting_ship'
elif (job.state == 'confirmed'
and job._fp_inbound_not_received()):
job.card_state = 'no_parts'
else:
job.card_state = 'ready'
continue
step = job.active_step_id
# Rule 1 - no_parts
if job._fp_inbound_not_received():
job.card_state = 'no_parts'
continue
# Rule 2 - on_hold
if job._fp_has_open_hold():
job.card_state = 'on_hold'
continue
# Rule 3 - awaiting_signoff (S22)
if (step.requires_signoff and step.state == 'done'
and not step.signoff_user_id):
job.card_state = 'awaiting_signoff'
continue
# Rule 4 - awaiting_qc
if job._fp_has_pending_qc():
job.card_state = 'awaiting_qc'
continue
# Rule 5 - bake_due
if job._fp_bake_window_due_soon():
job.card_state = 'bake_due'
continue
# Rule 6 - predecessor_locked
if (step._fp_should_block_predecessors()
and step._fp_has_unfinished_predecessors()):
job.card_state = 'predecessor_locked'
continue
# Rule 7 - idle_warning (S16)
if (step.state == 'in_progress'
and step._fp_is_idle(threshold_hours=8)):
job.card_state = 'idle_warning'
continue
# Rule 7.5 - awaiting_cert + awaiting_ship (spec 2026-05-25)
# State drives card_state regardless of step state. Inserted
# BEFORE the done rule because state='done' jobs are filtered
# off the board upstream so the done rule is unreachable
# from cards we'd actually render.
if job.state == 'awaiting_cert':
job.card_state = 'awaiting_cert'
continue
if job.state == 'awaiting_ship':
job.card_state = 'awaiting_ship'
continue
# Rule 8 - done
if step.area_kind == 'shipping' and job.state == 'done':
job.card_state = 'done'
continue
# Rule 9 - contract_review
if (step.recipe_node_id
and step.recipe_node_id.default_kind == 'contract_review'):
job.card_state = 'contract_review'
continue
# Rules 10/12 - running (mine vs not)
if step.state == 'in_progress':
job.card_state = ('running_mine' if job._fp_is_mine()
else 'running')
continue
# Rules 11/13 - ready (mine vs not)
if step.state == 'ready':
job.card_state = ('ready_mine' if job._fp_is_mine()
else 'ready')
continue
# Safe default
job.card_state = 'ready'
# ----- mini-timeline compute ----------------------------------------
@api.depends(
'step_ids.state',
'step_ids.area_kind',
'active_step_id',
'card_state',
'state',
)
def _compute_mini_timeline_json(self):
"""9-element JSON array, one per Shop Floor column.
For awaiting_cert / awaiting_ship (spec 2026-05-25): the
Final-inspection or Shipping dot renders as 'current' with the
state-named variant; all earlier dots render 'done'. Lets the
QM see at a glance "this card has cleared the line, just
waiting on paperwork/shipping".
"""
for job in self:
# Post-shop state override (spec 2026-05-25): visually walk
# the card across the two right-most columns even though
# the recipe may not have steps with those area_kinds.
if job.state == 'awaiting_cert':
timeline = []
for area in _COLUMN_SEQUENCE:
if area == 'inspection':
timeline.append({
'area': area,
'state': 'current',
'variant': 'awaiting_cert',
})
elif area == 'shipping':
timeline.append({'area': area, 'state': 'upcoming'})
else:
timeline.append({'area': area, 'state': 'done'})
job.mini_timeline_json = json.dumps(timeline)
continue
if job.state == 'awaiting_ship':
timeline = []
for area in _COLUMN_SEQUENCE:
if area == 'shipping':
timeline.append({
'area': area,
'state': 'current',
'variant': 'awaiting_ship',
})
else:
timeline.append({'area': area, 'state': 'done'})
job.mini_timeline_json = json.dumps(timeline)
continue
# Standard path - pre-existing logic.
active_area = (job.active_step_id.area_kind
if job.active_step_id else None)
timeline = []
for area in _COLUMN_SEQUENCE:
steps_in_area = job.step_ids.filtered(
lambda s: s.area_kind == area,
)
if not steps_in_area:
# Recipe doesn't visit this area - show as upcoming
# to keep visual alignment across cards
timeline.append({'area': area, 'state': 'upcoming'})
continue
if all(s.state in ('done', 'skipped') for s in steps_in_area):
timeline.append({'area': area, 'state': 'done'})
elif area == active_area:
timeline.append({
'area': area,
'state': 'current',
'variant': job.card_state or '',
})
else:
timeline.append({'area': area, 'state': 'upcoming'})
job.mini_timeline_json = json.dumps(timeline)
@api.depends(
'date_deadline',
'step_ids.state',
'step_ids.duration_expected',
)
def _compute_late_risk_ratio(self):
from datetime import datetime
for job in self:
if not job.date_deadline:
job.late_risk_ratio = 0.0
continue
open_steps = job.step_ids.filtered(
lambda s: s.state not in ('done', 'skipped', 'cancelled')
)
remaining_planned = sum(open_steps.mapped('duration_expected') or [0])
if remaining_planned <= 0:
job.late_risk_ratio = 0.0
continue
now = datetime.now()
# date_deadline is naive UTC in Odoo; compare directly
minutes_to_deadline = max(
1.0,
(job.date_deadline - now).total_seconds() / 60.0,
)
job.late_risk_ratio = remaining_planned / minutes_to_deadline
@api.depends('step_ids.state', 'step_ids.sequence')
def _compute_active_step_id(self):
"""Pick the "live" step - first match by priority then sequence.
Priority order:
in_progress > paused > ready > first pending
in_progress is the most informative (someone is actively
working on it). paused means someone was working and stopped -
the card belongs at that station so the next operator can
pick it up. ready is the next-up step waiting for an operator.
The first pending after a done step is the "next gate" -
where the card visually waits between steps.
Returns False only when every step is `done` (job finished)
or when there are no steps at all (recipe not assigned).
See spec 2026-05-24-shopfloor-live-step-fix-design.md Change 1.
"""
PRIORITY_STATES = ('in_progress', 'paused', 'ready', 'pending')
for job in self:
ordered = job.step_ids.sorted('sequence')
live = job.env['fp.job.step']
for state in PRIORITY_STATES:
live = ordered.filtered(lambda s: s.state == state)
if live:
break
job.active_step_id = live[:1].id if live else False
# ------------------------------------------------------------------
# Sub 14 - Configurable workflow state (status bar milestone)
# ------------------------------------------------------------------
# workflow_state_id auto-advances along the highest passed milestone
# in fp.job.workflow.state's sequence order. Replaces the hardcoded
# state Selection on the form's statusbar.
workflow_state_id = fields.Many2one(
'fp.job.workflow.state',
string='Workflow Stage',
compute='_compute_workflow_state_id',
store=True,
readonly=True,
help='Highest workflow milestone this job has passed, computed '
'from step states + per-state trigger conditions. Updates '
'automatically - the operator never sets it.',
)
@api.depends(
'state',
'step_ids',
'step_ids.state',
'step_ids.kind',
'step_ids.recipe_node_id',
'step_ids.recipe_node_id.default_kind',
'step_ids.recipe_node_id.triggers_workflow_state_id',
'quality_hold_count',
'delivery_id',
'delivery_id.state',
'sale_order_id',
'sale_order_id.x_fc_receiving_status',
)
def _compute_workflow_state_id(self):
WS = self.env['fp.job.workflow.state']
all_states = WS.search([], order='sequence, id')
for job in self:
passed = WS.browse()
for ws in all_states:
# Highest-passed semantics: untagged / not-applicable
# states don't block the cascade. The bar reflects
# the furthest milestone the job has actually reached.
if ws._fp_is_passed_for_job(job):
passed = ws
job.workflow_state_id = passed
# ------------------------------------------------------------------
# Smart-button counts (Feature A - operator workflow)
#
# Compute counts for each downstream model so the form view can
# render an oe_stat_button row similar to sale.order. Cross-module
# models are runtime-detected so this still works when one of the
# bridge modules is uninstalled.
# ------------------------------------------------------------------
sale_order_count = fields.Integer(compute='_compute_smart_counts')
delivery_count = fields.Integer(compute='_compute_smart_counts')
invoice_count = fields.Integer(compute='_compute_smart_counts')
payment_count = fields.Integer(compute='_compute_smart_counts')
quality_hold_count = fields.Integer(compute='_compute_smart_counts')
certificate_count = fields.Integer(compute='_compute_smart_counts')
timelog_count = fields.Integer(compute='_compute_smart_counts')
portal_job_count = fields.Integer(compute='_compute_smart_counts')
# ------------------------------------------------------------------
# Milestone cascade (Phase 1) - drives the header-button replacement
# that fires when every recipe step reaches a terminal state. See
# docs/superpowers/specs/2026-05-12-job-milestone-cascade-design.md.
# ------------------------------------------------------------------
all_steps_terminal = fields.Boolean(
compute='_compute_all_steps_terminal',
store=True,
help='True ⇔ at least one step exists AND every step is in '
'done/skipped/cancelled. Used to swap the per-step '
'Finish & Next button for a milestone-advance button.',
)
@api.depends('step_ids', 'step_ids.state')
def _compute_all_steps_terminal(self):
for job in self:
if not job.step_ids:
job.all_steps_terminal = False
else:
job.all_steps_terminal = all(
s.state in ('done', 'skipped', 'cancelled')
for s in job.step_ids
)
def _resolve_required_cert_types(self):
"""Set of cert types this job must produce.
Three-step resolution (spec 2026-05-27 - see
docs/superpowers/specs/2026-05-27-recipe-cert-toggles-design.md):
Step 1 - Start from partner + part flags. The existing logic,
extended to read 3 new orphan-type partner toggles
(Nadcap / Mill Test / Customer Specific).
Step 2 - Apply recipe suppression. Recipe-level requires_*
Booleans on fusion.plating.process.node REMOVE cert
types from the set but never add them. This is the
"passivation = no thickness even if customer asked"
case. Locked decision Q1: recipe suppresses only.
Step 3 - Bundling rule preserved. When CoC AND thickness are
both in the set, thickness collapses into the CoC PDF
as page 2 (see _fp_merge_thickness_into_pdf). The
returned set holds just {'coc'} in that case.
Field-existence guards on partner / recipe attribute reads
defend against installs where fusion_plating_certificates or
fusion_plating's latest schema bump hasn't landed yet -
matches the defensive pattern used elsewhere in this file.
"""
self.ensure_one()
# ---- Step 1 - partner + part baseline ----
req = (
self.part_catalog_id
and self.part_catalog_id.certificate_requirement
) or 'inherit'
if req == 'inherit':
wanted = set()
p = self.partner_id
if p:
if p.x_fc_send_coc:
wanted.add('coc')
if p.x_fc_send_thickness_report:
wanted.add('thickness_report')
# Three aerospace/defence partner toggles. Field guards
# let this module load even if fusion_plating_certificates
# is at an older version that pre-dates the new fields.
if ('x_fc_send_nadcap_cert' in p._fields
and p.x_fc_send_nadcap_cert):
wanted.add('nadcap_cert')
if ('x_fc_send_mill_test' in p._fields
and p.x_fc_send_mill_test):
wanted.add('mill_test')
if ('x_fc_send_customer_specific' in p._fields
and p.x_fc_send_customer_specific):
wanted.add('customer_specific')
else:
wanted = {
'none': set(),
'coc': {'coc'},
'coc_thickness': {'coc', 'thickness_report'},
}.get(req, {'coc'})
# ---- Step 2 - recipe suppression (suppress-only) ----
recipe = self.recipe_id
if recipe:
if ('requires_coc' in recipe._fields
and not recipe.requires_coc):
wanted.discard('coc')
if ('requires_thickness_report' in recipe._fields
and not recipe.requires_thickness_report):
wanted.discard('thickness_report')
if ('requires_nadcap_cert' in recipe._fields
and not recipe.requires_nadcap_cert):
wanted.discard('nadcap_cert')
if ('requires_mill_test' in recipe._fields
and not recipe.requires_mill_test):
wanted.discard('mill_test')
if ('requires_customer_specific' in recipe._fields
and not recipe.requires_customer_specific):
wanted.discard('customer_specific')
# ---- Step 3 - CoC + thickness bundling ----
# Thickness data is merged as page 2 of the CoC PDF by
# _fp_merge_thickness_into_pdf, so we return ONE cert
# instead of two. action_issue's thickness-data gate enforces
# actual readings or a Fischerscope PDF on the merged CoC.
if 'coc' in wanted and 'thickness_report' in wanted:
wanted.discard('thickness_report')
return wanted
next_milestone_action = fields.Selection(
[
('mark_done', 'Mark Job Done'),
('issue_certs', 'Issue Certs'),
('schedule_delivery', 'Schedule Delivery'),
('mark_shipped', 'Mark Shipped'),
('closed', 'Closed'),
],
compute='_compute_next_milestone_action',
help='What the manager should click next once steps complete. '
'Drives the milestone-advance buttons on the form header. '
'False/empty while steps are still running.',
)
next_milestone_label = fields.Char(
compute='_compute_next_milestone_action',
help='Human label for the next-action button.',
)
@api.depends(
'all_steps_terminal',
'state',
'delivery_id',
'delivery_id.state',
)
def _compute_next_milestone_action(self):
"""Resolve next action in priority order:
1. NOT all_steps_terminal → False (Finish & Next stays)
2. state != 'done' → mark_done
3. ANY required draft cert → issue_certs
4. NO delivery or draft → schedule_delivery
5. delivery scheduled/transit → mark_shipped
6. otherwise (delivered) → closed
"""
labels = dict(self._fields['next_milestone_action'].selection)
for job in self:
if not job.all_steps_terminal:
job.next_milestone_action = False
job.next_milestone_label = ''
continue
# New state machine (spec 2026-05-25). The auto-advance
# helper normally fires button_finish post-super, so we
# rarely see state='in_progress' here. When we do (e.g.
# historical jobs caught mid-migration, or jobs whose
# cert/delivery infra failed mid-transition), surface
# mark_done as a manual fallback.
if job.state == 'in_progress':
job.next_milestone_action = 'mark_done'
elif job.state == 'awaiting_cert':
job.next_milestone_action = 'issue_certs'
elif job.state == 'awaiting_ship':
job.next_milestone_action = 'mark_shipped'
elif job.state == 'done':
# Legacy path - historical jobs that closed before the
# new state machine landed. Preserve the old cascade
# so their milestone buttons keep working.
if job._fp_has_draft_required_certs():
job.next_milestone_action = 'issue_certs'
elif (not job.delivery_id
or job.delivery_id.state == 'draft'):
job.next_milestone_action = 'schedule_delivery'
elif job.delivery_id.state in ('scheduled', 'in_transit'):
job.next_milestone_action = 'mark_shipped'
else:
job.next_milestone_action = 'closed'
else:
job.next_milestone_action = False
job.next_milestone_label = labels.get(
job.next_milestone_action, ''
)
def _fp_has_draft_required_certs(self):
"""True if at least one cert of a required type is still 'draft'.
Returns False when no certs are required (commercial customers).
"""
self.ensure_one()
if 'fp.certificate' not in self.env:
return False
required = self._resolve_required_cert_types()
if not required:
return False
Cert = self.env['fp.certificate']
dom = [
('certificate_type', 'in', list(required)),
('state', '=', 'draft'),
]
if 'x_fc_job_id' in Cert._fields:
dom.append(('x_fc_job_id', '=', self.id))
elif self.sale_order_id and 'sale_order_id' in Cert._fields:
dom.append(('sale_order_id', '=', self.sale_order_id.id))
else:
return False # can't link safely → don't block the cascade
return bool(Cert.search_count(dom))
def action_advance_next_milestone(self):
"""Single entry point bound to all four milestone header buttons.
Branches on next_milestone_action and delegates to the existing
business-logic method. Never invents new logic - just routes."""
self.ensure_one()
action_map = {
'mark_done': self.button_mark_done,
'issue_certs': self._action_open_draft_certs,
'schedule_delivery': self._action_open_draft_delivery,
# Spec 2026-05-25: dispatch between the new state-machine
# path (state=awaiting_ship → button_mark_shipped) and the
# legacy delivery path (state=done + scheduled delivery).
'mark_shipped': self._action_mark_shipped_dispatch,
}
fn = action_map.get(self.next_milestone_action)
if not fn:
raise UserError(_(
'No milestone action available for job %(j)s '
'(next=%(a)s).'
) % {
'j': self.name,
'a': self.next_milestone_action or 'none',
})
return fn()
def _action_open_draft_certs(self):
"""Open the Issue Certs wizard for this job's draft certs.
The wizard prompts for a Fischerscope upload + readings per cert
that needs thickness data (bundled CoC or standalone thickness
report). Pure CoC certs (no thickness needed) appear in the
wizard too and just need a Confirm click. Cleaner than the old
"list view → open each cert → click Issue" flow.
Falls back to the cert list view if the wizard model isn't
installed (defensive - should always exist when this module is).
"""
self.ensure_one()
Wizard = self.env.get('fp.cert.issue.wizard')
if Wizard is not None:
try:
return Wizard.open_for_job(self)
except UserError:
raise
except Exception as e:
_logger.warning(
"Job %s: cert issue wizard failed (%s) - "
"falling back to cert list.", self.name, e,
)
return {
'type': 'ir.actions.act_window',
'name': _('Draft Certificates - %s') % self.name,
'res_model': 'fp.certificate',
'view_mode': 'list,form',
'domain': [
('x_fc_job_id', '=', self.id),
('state', '=', 'draft'),
],
'target': 'current',
}
def _action_open_draft_delivery(self):
"""Open the linked delivery if it's still in draft state.
Falls back to the delivery list filtered to this job's
delivery if the state isn't draft (defensive)."""
self.ensure_one()
if self.delivery_id and self.delivery_id.state == 'draft':
return {
'type': 'ir.actions.act_window',
'name': _('Schedule Delivery - %s') % self.name,
'res_model': 'fusion.plating.delivery',
'res_id': self.delivery_id.id,
'view_mode': 'form',
'target': 'current',
}
return {
'type': 'ir.actions.act_window',
'name': _('Deliveries - %s') % self.name,
'res_model': 'fusion.plating.delivery',
'view_mode': 'list,form',
'domain': [('job_ref', '=', self.name)],
'target': 'current',
}
def _action_mark_active_delivery_delivered(self):
"""Call action_mark_delivered on the linked delivery if it's
in scheduled / in_transit. Posts to job chatter on success."""
self.ensure_one()
if (not self.delivery_id
or self.delivery_id.state not in ('scheduled', 'in_transit')):
raise UserError(_(
'No scheduled or in-transit delivery to mark shipped '
'for %s.'
) % self.name)
self.delivery_id.action_mark_delivered()
self.message_post(body=_(
'Delivery %s marked shipped via milestone cascade.'
) % self.delivery_id.name)
return True
def _action_mark_shipped_dispatch(self):
"""Dispatch the milestone-cascade 'Mark Shipped' button to the
right handler based on job state. Spec 2026-05-25:
- awaiting_ship → button_mark_shipped (new state machine)
- done + active delivery → _action_mark_active_delivery_delivered
(legacy historical path)
"""
self.ensure_one()
if self.state == 'awaiting_ship':
return self.button_mark_shipped()
return self._action_mark_active_delivery_delivered()
@api.depends(
'sale_order_id', 'delivery_id', 'portal_job_id', 'step_ids',
'step_ids.time_log_ids', 'origin', 'partner_id',
)
def _compute_smart_counts(self):
AccountMove = self.env.get('account.move')
AccountPayment = self.env.get('account.payment')
QualityHold = self.env.get('fusion.plating.quality.hold')
Certificate = self.env.get('fp.certificate')
for job in self:
job.sale_order_count = 1 if job.sale_order_id else 0
job.delivery_count = 1 if job.delivery_id else 0
job.portal_job_count = 1 if job.portal_job_id else 0
# Invoices via origin (the SO name)
if AccountMove is not None and job.origin:
job.invoice_count = AccountMove.search_count([
('invoice_origin', '=', job.origin),
('move_type', 'in', ('out_invoice', 'out_refund')),
])
else:
job.invoice_count = 0
# Payments - find invoices for this SO, then payments
# reconciled against them.
if (AccountMove is not None and AccountPayment is not None
and job.origin):
inv_ids = AccountMove.search([
('invoice_origin', '=', job.origin),
('move_type', 'in', ('out_invoice', 'out_refund')),
]).ids
if inv_ids:
job.payment_count = AccountPayment.search_count([
('reconciled_invoice_ids', 'in', inv_ids),
])
else:
job.payment_count = 0
else:
job.payment_count = 0
if QualityHold is not None:
job.quality_hold_count = QualityHold.search_count([
('x_fc_job_id', '=', job.id),
])
else:
job.quality_hold_count = 0
if Certificate is not None:
job.certificate_count = Certificate.search_count([
('x_fc_job_id', '=', job.id),
])
else:
job.certificate_count = 0
job.timelog_count = sum(
len(s.time_log_ids) for s in job.step_ids
)
# ------------------------------------------------------------------
# Smart-button actions
# ------------------------------------------------------------------
def action_view_sale_order(self):
self.ensure_one()
if not self.sale_order_id:
return {'type': 'ir.actions.act_window_close'}
return {
'type': 'ir.actions.act_window',
'res_model': 'sale.order',
'res_id': self.sale_order_id.id,
'view_mode': 'form',
'name': self.sale_order_id.name,
}
# All time logs across every step on this job - backs the Time Logs
# tab on the form so the manager sees the full labour audit without
# clicking into each step.
time_log_ids = fields.One2many(
'fp.job.step.timelog',
'job_id',
string='All Time Logs',
readonly=True,
)
# 2026-04-28 - link to the auto-created Sub 8 racking inspection so
# the job form can show a smart button + the manager can route into
# the inspection without leaving the job screen.
racking_inspection_ids = fields.One2many(
'fp.racking.inspection',
'x_fc_job_id',
string='Racking Inspections',
)
racking_inspection_id = fields.Many2one(
'fp.racking.inspection',
string='Racking Inspection',
compute='_compute_racking_inspection',
store=False,
help='The single racking inspection scoped to this job (Sub 8 '
'enforces uniqueness). Smart button on the form routes here.',
)
# Computed alongside racking_inspection_id so views can render the
# state badge without needing a related-on-non-stored field (which
# the ORM rejects). Selection mirrors fp.racking.inspection.state.
racking_inspection_state = fields.Selection(
[('draft', 'Draft'),
('inspecting', 'Inspecting'),
('done', 'Done'),
('discrepancy_flagged', 'Discrepancy Flagged')],
string='Racking Inspection Status',
compute='_compute_racking_inspection',
store=False,
)
@api.depends('racking_inspection_ids', 'racking_inspection_ids.state')
def _compute_racking_inspection(self):
for job in self:
ri = job.racking_inspection_ids[:1]
job.racking_inspection_id = ri
job.racking_inspection_state = ri.state if ri else False
def action_view_racking_inspection(self):
"""Open the racking inspection. Auto-create if missing, or seed
lines from the SO if it exists but was created before line auto-
seeding shipped (the helper handles both cases idempotently)."""
self.ensure_one()
if 'fp.racking.inspection' not in self.env:
from odoo.exceptions import UserError
raise UserError(_(
'Sub 8 racking inspection module not installed. '
'Install fusion_plating_receiving to enable.'
))
# Always call the helper - it short-circuits for already-populated
# draft inspections and creates fresh ones when missing. This is
# also the entry point that backfills lines on inspections that
# pre-date the line-seeding feature.
self._fp_create_racking_inspection()
self.invalidate_recordset(['racking_inspection_ids'])
ri = self.racking_inspection_id or self.racking_inspection_ids[:1]
if not ri:
from odoo.exceptions import UserError
raise UserError(_('Could not auto-create racking inspection.'))
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.racking.inspection',
'res_id': ri.id,
'view_mode': 'form',
'target': 'current',
'name': _('Racking Inspection - %s') % self.name,
}
def action_view_steps(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step',
'view_mode': 'list,form',
'domain': [('job_id', '=', self.id)],
'name': 'Steps - %s' % self.name,
'context': {'default_job_id': self.id},
}
def action_open_workspace(self):
"""Open the JobWorkspace OWL client action focused on this job.
Spec: 2026-05-22-shopfloor-tablet-redesign - Phase 1 deliverable.
Used as the smart-button entry point before the Landing kanban
(Phase 3) is shipped, and stays as a back-office shortcut after.
"""
self.ensure_one()
return {
'type': 'ir.actions.client',
'tag': 'fp_job_workspace',
'name': self.display_wo_name or self.name,
'params': {'job_id': self.id},
'target': 'current',
}
def action_finish_current_step(self):
"""Steelhead-style header button: finish whatever's currently
in_progress and auto-start the next pending/ready step. If
nothing is running yet, start the lowest-sequence pending step
instead - operator's first click on a fresh job just begins
the line.
Sub 12e v4 - when button_start returns an action (e.g. the
QA-005 redirect for contract_review steps), propagate it so
the operator lands on the right page in ONE click instead of
two.
"""
self.ensure_one()
running = self.step_ids.filtered(lambda s: s.state == 'in_progress')[:1]
if running:
return running.action_finish_and_advance()
# No running step - kick off the first pending/ready one.
first = self.step_ids.filtered(
lambda s: s.state in ('pending', 'ready', 'paused')
).sorted('sequence')[:1]
if not first:
raise UserError(_(
'No runnable step found on this job - either every step '
'is done or the job is still in draft.'
))
result = first.with_context(
fp_skip_predecessor_check=True,
).button_start()
self.message_post(body=_(
'Started first step "%s".'
) % first.name)
# Propagate any action returned by button_start (e.g. the
# QA-005 redirect on a contract_review step). If it's just
# True/False (the normal case), fall back to True.
if isinstance(result, dict):
return result
return True
def action_open_move_wizard(self):
"""Original Move wizard - kept available for cross-station moves
and rework / scrap transfers. The simple "finish current → start
next" flow is now action_finish_current_step (header button).
Opens the wizard pre-filled with the currently in-progress (or
most recently in-progress) step as the from-step.
"""
self.ensure_one()
active_step = self.step_ids.filtered(
lambda s: s.state == 'in_progress'
)[:1]
if not active_step:
active_step = self.step_ids.filtered(
lambda s: s.state in ('paused', 'ready')
).sorted('sequence')[:1]
if not active_step:
raise UserError(_(
'No in-progress, paused, or ready step found on this job. '
'Either every step is done or the job is still in draft.'
))
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step.move.wizard',
'view_mode': 'form',
'target': 'new',
'name': _('Move Step - %s') % active_step.name,
'context': {
'default_from_step_id': active_step.id,
'default_job_id': self.id,
},
}
def action_print_traveller(self):
self.ensure_one()
return self.env.ref(
'fusion_plating_jobs.action_report_fp_job_traveller'
).report_action(self)
def action_print_sticker(self):
"""Print the 6x4" job-box identification sticker (logo + WO# + QR
+ part / customer / thickness / notes). Used at receiving and at
every move so the box is always identifiable on the floor."""
self.ensure_one()
return self.env.ref(
'fusion_plating_jobs.action_report_fp_job_sticker'
).report_action(self)
def action_print_wo_detail(self):
"""Print the Steelhead-style Work Order Detail PDF - chronological
chain-of-custody + per-step inputs + Certified By page. Use this
as the AS9100/Nadcap shippable audit document.
"""
self.ensure_one()
return self.env.ref(
'fusion_plating_jobs.action_report_fp_job_wo_detail'
).report_action(self)
def action_view_deliveries(self):
self.ensure_one()
if not self.delivery_id:
return {'type': 'ir.actions.act_window_close'}
return {
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.delivery',
'res_id': self.delivery_id.id,
'view_mode': 'form',
'name': self.delivery_id.name,
}
def action_view_invoices(self):
self.ensure_one()
if not self.origin:
return {'type': 'ir.actions.act_window_close'}
return {
'type': 'ir.actions.act_window',
'res_model': 'account.move',
'view_mode': 'list,form',
'domain': [
('invoice_origin', '=', self.origin),
('move_type', 'in', ('out_invoice', 'out_refund')),
],
'name': 'Invoices - %s' % self.name,
}
def action_view_payments(self):
self.ensure_one()
if not self.origin:
return {'type': 'ir.actions.act_window_close'}
AccountMove = self.env.get('account.move')
if AccountMove is None:
return {'type': 'ir.actions.act_window_close'}
inv_ids = AccountMove.search([
('invoice_origin', '=', self.origin),
('move_type', 'in', ('out_invoice', 'out_refund')),
]).ids
return {
'type': 'ir.actions.act_window',
'res_model': 'account.payment',
'view_mode': 'list,form',
'domain': (
[('reconciled_invoice_ids', 'in', inv_ids)]
if inv_ids else [('id', '=', 0)]
),
'name': 'Payments - %s' % self.name,
}
def action_view_quality_holds(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.quality.hold',
'view_mode': 'list,form',
'domain': [('x_fc_job_id', '=', self.id)],
'name': 'Quality Holds - %s' % self.name,
'context': {'default_x_fc_job_id': self.id},
}
def action_view_certificates(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.certificate',
'view_mode': 'list,form',
'domain': [('x_fc_job_id', '=', self.id)],
'name': 'Certificates - %s' % self.name,
'context': {'default_x_fc_job_id': self.id},
}
def action_view_timelogs(self):
self.ensure_one()
step_ids = self.step_ids.ids
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step.timelog',
'view_mode': 'list,form',
'domain': (
[('step_id', 'in', step_ids)]
if step_ids else [('id', '=', 0)]
),
'name': 'Time Logs - %s' % self.name,
}
def action_view_portal_job(self):
self.ensure_one()
if not self.portal_job_id:
return {'type': 'ir.actions.act_window_close'}
return {
'type': 'ir.actions.act_window',
'res_model': 'fusion.plating.portal.job',
'res_id': self.portal_job_id.id,
'view_mode': 'form',
'name': self.portal_job_id.name,
}
# Sub-portal state sync - see fusion_plating_portal/.../fp_portal_job.py
# `_fp_recompute_portal_state` for the rules. The mapping table that
# used to live here was replaced by the helper so shipment / invoice
# signals can't drift away from the WO state any more.
def write(self, vals):
"""Write hook: (a) when qty_scrapped INCREASES, auto-spawn a
fusion.plating.quality.hold for the scrapped delta - AS9100 /
Nadcap need a disposition record per scrap event. (b) when state
transitions, mirror to the linked fusion.plating.portal.job so
the customer-facing portal stays in sync with the shop floor.
Idempotent per write: one hold per increase event. Operator
fills hold_reason + description on the spawned record.
"""
from markupsafe import Markup as _Markup
scrap_deltas = {}
if 'qty_scrapped' in vals:
new = vals['qty_scrapped'] or 0
for job in self:
old = job.qty_scrapped or 0
if new > old:
scrap_deltas[job.id] = (old, new)
# Capture state changes before super().write() so we know which
# records actually transitioned (vs no-op writes).
state_changed_ids = set()
if 'state' in vals:
new_state = vals['state']
for job in self:
if job.state != new_state:
state_changed_ids.add(job.id)
result = super().write(vals)
# Mirror state to portal_job via the central recompute helper, so
# the portal state always derives from the WO + shipment + invoice
# together rather than the most-recent event flag.
if state_changed_ids:
for job in self.filtered(lambda j: j.id in state_changed_ids):
if job.portal_job_id:
job.portal_job_id._fp_recompute_portal_state()
if not scrap_deltas:
return result
Hold = (self.env['fusion.plating.quality.hold']
if 'fusion.plating.quality.hold' in self.env else None)
if Hold is None:
return result
Facility = self.env['fusion.plating.facility']
for job in self:
if job.id not in scrap_deltas:
continue
old, new = scrap_deltas[job.id]
delta = new - old
facility = job.facility_id or Facility.search([
('company_id', '=', job.company_id.id),
], limit=1) or Facility.search([], limit=1)
part_ref = (
job.part_catalog_id.part_number if job.part_catalog_id
else job.product_id.default_code or job.name
)
# When the scrap was bumped from the tablet, the operator
# was prompted for a reason and we passed it via context as
# `fp_scrap_reason` (see /fp/shopfloor/bump_qty_scrapped).
# Prepend that reason to the description so the audit row
# captures what the operator actually typed instead of the
# generic "OPERATOR: replace this text..." placeholder.
scrap_reason = self.env.context.get('fp_scrap_reason')
if scrap_reason:
description = _(
'Operator reason: %s\n\n'
'Auto-spawned from job %s scrap update by %s: '
'qty_scrapped went from %g to %g (delta %g).'
) % (scrap_reason, job.name, self.env.user.name, old, new, delta)
else:
description = _(
'Auto-spawned from job %s scrap update by %s: '
'qty_scrapped went from %g to %g (delta %g). '
'OPERATOR: replace this text with the actual '
'reason (drop / contamination / out-of-spec / etc).'
) % (job.name, self.env.user.name, old, new, delta)
try:
hold = Hold.create({
'job_id': job.id,
'part_ref': (part_ref or job.name)[:64],
'qty_on_hold': int(delta),
'qty_original': int(job.qty or 0),
'mark_for_scrap': True,
'hold_reason': 'other',
'description': description,
'facility_id': facility.id if facility else False,
})
job.message_post(body=_Markup(_(
'⚠️ Scrap auto-Hold spawned: <b>%s</b> for %g part(s). '
'Operator must update description with the cause.'
)) % (hold.name, delta))
except Exception as e:
_logger.warning(
'Job %s: failed to auto-spawn scrap hold: %s',
job.name, e,
)
return result
def action_sync_qty_from_so(self):
"""Pull the SO qty into the job's qty field after a mid-job
SO line edit. Posts chatter so the audit trail captures who
synced + what the previous value was.
Manual action because qty changes mid-job have physical-world
consequences (rack more parts, stop early, scrap excess) - the
supervisor must explicitly acknowledge by clicking the button.
"""
from markupsafe import Markup
for job in self:
if not job.sale_order_id:
continue
so_qty = sum(job.sale_order_id.order_line.mapped('product_uom_qty'))
old = job.qty
if abs(old - so_qty) < 0.0001:
continue
job.qty = so_qty
job.message_post(body=Markup(_(
'Job qty synced from SO by <b>%s</b>: %g%g%+g). '
'Operator: confirm physical scope matches.'
)) % (self.env.user.name, old, so_qty, so_qty - old))
return True
# ------------------------------------------------------------------
# Recipe → fp.job.step generation (Task 2.4)
#
# Native port of fusion_plating_bridge_mrp's
# _generate_workorders_from_recipe. Walks the recipe tree, creates
# one fp.job.step per 'operation' node, formats child 'step' nodes
# as step instructions on chatter, respects opt-in/out overrides
# from fp.job.node.override.
#
# Adaptations from the original:
# - Creates fp.job.step (not mrp.workorder)
# - Maps fusion.plating.work.center → fp.work.centre via code
# fallback (no forward link exists yet)
# - Uses native field names (job_id, work_centre_id, etc.)
# - Drops work_role_id (not on fp.job.step yet - Task 2.6+)
# - Drops _fp_autofill_default_equipment (not yet on step)
# ------------------------------------------------------------------
def _fp_resync_recipe_from_so(self):
"""Re-resolve the recipe from this job's SO line(s) and build its
steps. Heals work orders created before the recipe was set on the
SO line (new parts, or the copy-from-quote path): once the
estimator sets the process variant on the line, the WO can pull it
in. Acts only on jobs with NO steps yet and not terminal, so
in-progress work is never disturbed. Idempotent. Returns True when
steps were generated.
"""
self.ensure_one()
if self.step_ids or self.state in ('done', 'cancelled'):
return False
so = self.sale_order_id
if not so:
return False
recipe = False
for line in self.sale_order_line_ids:
recipe = so._fp_resolve_recipe_for_line(line)
if recipe:
break
if not recipe:
return False
if self.recipe_id != recipe:
self.recipe_id = recipe.id
# Mirror action_confirm's post-recipe sequence so a healed WO
# matches a normally-confirmed one (steps, ready, express text).
self._generate_steps_from_recipe()
pending = self.step_ids.filtered(lambda s: s.state == 'pending')
if pending:
pending.write({'state': 'ready'})
if (self.recipe_id and self.step_ids
and 'x_fc_masking_enabled' in self.env['sale.order.line']._fields):
for sol in self.sale_order_line_ids:
if hasattr(sol, '_fp_apply_express_overrides_to_job'):
sol._fp_apply_express_overrides_to_job(self)
self.message_post(body=_(
'Recipe re-synced from the sale order (%(recipe)s); %(n)d '
'step(s) generated.'
) % {'recipe': recipe.display_name, 'n': len(self.step_ids)})
return True
def action_fp_resync_recipe_from_so(self):
"""Header button: re-pull the recipe from the SO and build steps
for work orders that came out empty because the recipe was set on
the SO line after the WO was created. Safe and idempotent.
"""
healed = self.env['fp.job']
for job in self:
if job._fp_resync_recipe_from_so():
healed |= job
if not healed:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'warning',
'message': _('Nothing to re-sync: no resolvable recipe '
'on the sale order, or the job already has '
'steps.'),
'sticky': False,
},
}
return True
def _generate_steps_from_recipe(self):
"""Generate fp.job.step records from the assigned recipe.
Walks the recipe tree, creates one step per 'operation' node,
and formats child 'step' nodes as step instructions on the
chatter. Respects opt-in/out overrides from override_ids.
"""
Step = self.env['fp.job.step']
Node = self.env['fusion.plating.process.node']
for job in self:
if not job.recipe_id:
continue # No recipe assigned
if job.step_ids:
continue # Steps already exist - don't duplicate
# Build lookup of overrides keyed by node ID
override_map = {ov.node_id.id: ov.included for ov in job.override_ids}
# Start-at-node: if set, the allowed set is the union of:
# 1. start_node and all its descendants
# 2. each ancestor of start_node
# 3. at each ancestor level, any LATER-sequence sibling and
# all of its descendants
start_node = job.start_at_node_id
allowed_ids = None # None = include everything
if start_node:
descendants = Node.search([('id', 'child_of', start_node.id)])
allowed_ids = set(descendants.ids)
cur = start_node
while cur.parent_id:
parent = cur.parent_id
allowed_ids.add(parent.id)
later_sibs = parent.child_ids.filtered(
lambda n: n.sequence > cur.sequence
)
for sib in later_sibs:
sib_descendants = Node.search([
('id', 'child_of', sib.id),
])
allowed_ids |= set(sib_descendants.ids)
cur = parent
step_vals_list = []
wo_steps = {} # {sequence: instruction text}
seq_counter = [10]
def _is_node_included(node):
"""Determine if a node should be included based on
opt-in/out logic, per-job overrides, and start-at-node
filter.
Override map (per-job override rows) ALWAYS wins
regardless of the node's opt_in_out setting. This is the
escape hatch the Express Orders flow relies on to opt
out of 'disabled' (mandatory) nodes like masking + bake.
Without this, overrides on disabled nodes were silently
ignored.
"""
nid = node.id
if allowed_ids is not None and nid not in allowed_ids:
return False
# Explicit per-job override wins over any default behaviour.
if nid in override_map:
return override_map[nid]
opt = node.opt_in_out or 'disabled'
if opt == 'disabled':
return True
if opt == 'opt_in':
return False # Default excluded
return True # opt_out → default included
def _resolve_work_centre(legacy_wc):
"""Map fusion.plating.work.center → fp.work.centre.
The legacy work-centre model does not (yet) have a forward
link to the new fp.work.centre. Try a forward link
(x_fc_fp_work_centre_id) if some bridge module added one;
otherwise fall back to a code lookup.
"""
if not legacy_wc:
return self.env['fp.work.centre']
# Forward link, if any
if (
'x_fc_fp_work_centre_id' in legacy_wc._fields
and legacy_wc.x_fc_fp_work_centre_id
):
return legacy_wc.x_fc_fp_work_centre_id
# Code fallback (legacy code is unique-per-facility,
# native code is globally unique - first match wins)
if legacy_wc.code:
found = self.env['fp.work.centre'].search(
[('code', '=', legacy_wc.code)], limit=1,
)
if found:
return found
return self.env['fp.work.centre']
def walk_node(node):
if not _is_node_included(node):
return
if node.node_type == 'operation':
work_centre = _resolve_work_centre(node.work_center_id)
if not work_centre:
_logger.warning(
'Job %s: operation "%s" has no mapped fp.work.centre - '
'creating step without work centre.',
job.name, node.name,
)
# Collect step instructions from child 'step' nodes
instructions = []
step_num = 1
for child in node.child_ids.sorted('sequence'):
if child.node_type == 'step' and _is_node_included(child):
line = '%d. %s' % (step_num, child.name)
if child.estimated_duration:
line += ' (%.0f min)' % child.estimated_duration
instructions.append(line)
step_num += 1
# Map recipe_node.default_kind → step.kind so the
# downstream gates (Sub 8 racking soft-gate, Policy B
# contract-review gate) work even when the step gets
# renamed by the customer (e.g. "Hang on Bar" instead
# of "Racking"). Without this, gate detection falls
# back to fragile name matching.
_NODE_KIND_TO_STEP_KIND = {
'cleaning': 'wet',
'etch': 'wet',
'rinse': 'wet',
'plate': 'wet',
'dry': 'wet',
'wbf_test': 'wet',
'bake': 'bake',
'mask': 'mask',
'demask': 'mask',
'racking': 'rack',
'derack': 'rack',
'inspect': 'inspect',
'final_inspect': 'inspect',
'contract_review': 'other',
'gating': 'other',
'ship': 'other',
}
step_kind = 'other'
node_kind = (
node.default_kind
if 'default_kind' in node._fields else None
)
if node_kind and node_kind in _NODE_KIND_TO_STEP_KIND:
step_kind = _NODE_KIND_TO_STEP_KIND[node_kind]
vals = {
'job_id': job.id,
'name': node.name,
'work_centre_id': work_centre.id if work_centre else False,
'duration_expected': node.estimated_duration or 0.0,
'sequence': seq_counter[0],
'recipe_node_id': node.id,
'kind': step_kind,
}
if node.estimated_duration:
vals['dwell_time_minutes'] = node.estimated_duration
# Pull thickness target from the recipe root when this
# is a plating step (matched by node name keyword).
# Recipe-root carries thickness fields post-promote-spec.
recipe_root = job.recipe_id
name_l = (node.name or '').lower()
is_plating_node = (
'plat' in name_l or 'nickel' in name_l
or 'chrome' in name_l or 'anodiz' in name_l
)
if recipe_root and is_plating_node:
if (
'thickness_max' in recipe_root._fields
and recipe_root.thickness_max
):
vals['thickness_target'] = recipe_root.thickness_max
if (
'thickness_uom' in recipe_root._fields
and recipe_root.thickness_uom
):
# Recipe uses long-form uom names (mils /
# microns / inches); fp.job.step uses short
# codes (mil / um / inch). Map between them.
_UOM_MAP = {
'mils': 'mil',
'mil': 'mil',
'microns': 'um',
'micron': 'um',
'um': 'um',
'inches': 'inch',
'inch': 'inch',
'in': 'inch',
}
mapped = _UOM_MAP.get(recipe_root.thickness_uom)
if mapped:
vals['thickness_uom'] = mapped
step_vals_list.append(vals)
if instructions:
wo_steps[seq_counter[0]] = '\n'.join(instructions)
seq_counter[0] += 10
elif node.node_type in ('recipe', 'sub_process'):
for child in node.child_ids.sorted('sequence'):
walk_node(child)
# 'step' nodes at top level are handled by their parent operation
# Walk from recipe root
walk_node(job.recipe_id)
# Bulk create
if step_vals_list:
created = Step.create(step_vals_list)
for step in created:
instr_text = wo_steps.get(step.sequence)
if instr_text:
step.message_post(
body=Markup(
'<b>Recipe steps:</b><br/><pre>%s</pre>'
) % instr_text,
subtype_xmlid='mail.mt_note',
)
job.message_post(
body=('%d steps generated from recipe "%s".') % (
len(step_vals_list), job.recipe_id.name,
),
)
# Rule 4 - repeat-order contract-review auto-complete.
# Runs after step creation so the contract-review step shows
# as already done on the operator's first view of the job.
job._fp_autocomplete_repeat_order_contract_review()
return True
def _fp_autocomplete_repeat_order_contract_review(self):
"""Rule 4 of the contract-review flow - when a job's part already
carries a complete fp.contract.review (i.e. the part has been
through QA-005 on a prior order), mark every contract-review
step in this job's recipe as 'done' immediately on job creation.
Copies the reviewer identity + timestamp from the review's
Section 3.0 sign-off (falling back to Section 2.0) so the Print
WO Detail report shows the original audit trail - Reviewer
initials, date reviewed, "QA-005 Approved" - not the operator
who would have hit Finish.
Skips:
* jobs whose part has no contract review or it isn't complete
(rule 5 still applies - the WO step gate will block finish)
* steps not detected as contract-review steps via
fp.job.step._fp_is_contract_review_step
* steps already in a terminal state (defensive idempotency)
"""
for job in self:
part = (
('part_catalog_id' in job._fields and job.part_catalog_id)
or False
)
if not part:
continue
review = (
('x_fc_contract_review_id' in part._fields
and part.x_fc_contract_review_id)
or False
)
if not review or review.state != 'complete':
continue
signer = review.s30_signed_by or review.s20_signed_by
signed_at = review.s30_signed_date or review.s20_signed_date
if not signer or not signed_at:
continue
steps_to_complete = job.step_ids.filtered(
lambda s: s.state not in ('done', 'skipped', 'cancelled')
and s._fp_is_contract_review_step()
)
if not steps_to_complete:
continue
steps_to_complete.write({
'state': 'done',
'started_by_user_id': signer.id,
'finished_by_user_id': signer.id,
'date_started': signed_at,
'date_finished': signed_at,
})
for step in steps_to_complete:
step.message_post(body=_(
'Contract Review step auto-completed from existing '
'QA-005 for %(part)s. Reviewer: %(user)s on %(date)s.'
) % {
'part': part.display_name or part.part_number or '',
'user': signer.name,
'date': fields.Datetime.to_string(signed_at),
})
return True
# ------------------------------------------------------------------
# UI - Process Tree client action (Phase 6)
# ------------------------------------------------------------------
def action_open_process_tree(self):
"""Open the OWL process-tree visualization for this job.
Launches the fp_process_tree client action (defined in
fusion_plating_shopfloor) with job_id in context. The component
fetches /fp/shopfloor/process_tree and renders the recipe ->
sub_process -> operation hierarchy as cards with per-step state
badges.
Consolidated 2026-04-24: this points at the canonical shopfloor
client action; the parallel fp_job_process_tree was removed.
"""
self.ensure_one()
return {
'type': 'ir.actions.client',
'tag': 'fp_process_tree',
'context': {
'job_id': self.id,
'back_job_id': self.id,
},
'name': 'Process Tree - %s' % (self.name or ''),
'target': 'current',
}
# ------------------------------------------------------------------
# Lifecycle hooks (Tasks 2.6, 2.7, 2.8)
#
# On confirm: create the portal-job mirror record and (when the
# customer requires QC) a fusion.plating.quality.check.
# On done: create a draft fusion.plating.delivery and best-effort
# trigger fp.certificate auto-generation.
#
# The QC and certificate models live in modules this module does NOT
# depend on by design (bridge_mrp). We runtime-detect those models so
# the hooks degrade gracefully when those modules are absent.
# ------------------------------------------------------------------
def action_confirm(self):
result = super().action_confirm()
# During migration, lifecycle side-effects are skipped - the
# migration script directly rebinds existing portal/QC/inspection
# records via x_fc_job_id. See scripts/migrate_to_fp_jobs.py.
if self.env.context.get('fp_jobs_migration'):
return result
for job in self:
# Auto-generate steps from the recipe - was previously only
# called by seed scripts, which meant real-life confirmed
# jobs sat with zero operations. Idempotent: the generator
# short-circuits when steps already exist.
if job.recipe_id and not job.step_ids:
job._generate_steps_from_recipe()
# Promote freshly-generated 'pending' steps to 'ready' so the
# operator has a Start button when they open the job. Without
# this the floor stalls - every step is parked in pending with
# no UI affordance to move it forward.
pending_steps = job.step_ids.filtered(
lambda s: s.state == 'pending'
)
if pending_steps:
pending_steps.write({'state': 'ready'})
# Express Orders (2026-05-26) - second pass of the override
# helper. The first pass (in sale_order._fp_auto_create_job)
# creates override rows. Now that steps exist (just generated
# above), this pass writes Express bake_instructions to the
# bake step's `instructions` field. Idempotent: pre-deletes
# prior masking/bake overrides then recreates identical rows.
if (job.recipe_id and job.step_ids
and 'x_fc_masking_enabled' in self.env['sale.order.line']._fields):
for sol in job.sale_order_line_ids:
if hasattr(sol, '_fp_apply_express_overrides_to_job'):
sol._fp_apply_express_overrides_to_job(job)
# 2026-04-28 - auto-populate facility_id + manager_id so the
# job header surfaces them on the form. Page-1 audit found
# both empty on confirmed jobs.
job._fp_autofill_facility_and_manager()
job._fp_create_portal_job()
job._fp_create_qc_check_if_needed()
job._fp_create_racking_inspection()
job._fp_fire_notification('job_confirmed')
return result
def _fp_autofill_facility_and_manager(self):
"""Populate facility_id + manager_id on confirm if empty.
Resolution order:
facility_id -
1. Already set → leave alone.
2. First step with a work_centre that has a facility → use it.
3. Recipe's process_type → facility (if process_type carries one).
4. Single-facility company → use that one.
manager_id -
1. Already set → leave alone.
2. Confirming user IS in the Plating Manager group → use them.
3. Sale order user_id (the salesperson who confirmed the SO).
4. The customer's account manager (partner.user_id).
5. Leave blank - no sensible default.
"""
self.ensure_one()
# ---- facility_id ----
if not self.facility_id:
facility = False
for s in self.step_ids:
if s.work_centre_id and 'facility_id' in s.work_centre_id._fields:
facility = s.work_centre_id.facility_id
if facility:
break
if not facility and self.recipe_id and 'process_type_id' in self.recipe_id._fields:
pt = self.recipe_id.process_type_id
if pt and 'facility_id' in pt._fields:
facility = pt.facility_id
if not facility:
Facility = self.env.get('fusion.plating.facility')
if Facility is not None:
facilities = Facility.search([
('company_id', '=', self.company_id.id),
])
if len(facilities) == 1:
facility = facilities
if facility:
self.facility_id = facility.id
self.message_post(body=_(
'Facility auto-set on confirm: %s'
) % facility.display_name)
# ---- manager_id ----
if not self.manager_id:
mgr = False
ManagerGroup = self.env.ref(
'fusion_plating.group_fusion_plating_manager',
raise_if_not_found=False,
)
if ManagerGroup and self.env.user in ManagerGroup.user_ids:
mgr = self.env.user
elif self.sale_order_id and self.sale_order_id.user_id:
mgr = self.sale_order_id.user_id
elif self.partner_id and self.partner_id.user_id:
mgr = self.partner_id.user_id
if mgr:
self.manager_id = mgr.id
self.message_post(body=_(
'Plating Manager auto-set on confirm: %s'
) % mgr.name)
def _fp_create_racking_inspection(self):
"""Auto-create a draft racking inspection on job confirm.
Phase 9 - production_id is now optional on fp.racking.inspection,
so we always create one bound by `x_fc_job_id`. When the job is
also linked to an MO (legacy bridge_mrp coexistence), populate
production_id too so legacy reports keep working.
Idempotent - if an inspection already exists for this job, skip.
Either way the inspection's lines are seeded from the SO's
plating order lines so the racker walks into a pre-populated
checklist instead of an empty form.
"""
self.ensure_one()
if 'fp.racking.inspection' not in self.env:
return
Inspection = self.env['fp.racking.inspection'].sudo()
if 'x_fc_job_id' not in Inspection._fields:
# Schema not yet upgraded - skip.
return
existing = Inspection.search([
('x_fc_job_id', '=', self.id),
], limit=1)
if existing:
# Self-heal: pre-existing inspections from before line seeding
# was added show up empty. Top them up now if still empty +
# the inspection isn't already finalised (don't rewrite history).
if not existing.line_ids and existing.state == 'draft':
self._fp_seed_racking_lines(existing)
return
# Phase 6 (Sub 11) - production_id retired; bind by x_fc_job_id only.
vals = {'x_fc_job_id': self.id}
try:
insp = Inspection.create(vals)
self._fp_seed_racking_lines(insp)
except Exception as e:
_logger.warning(
"Job %s: failed to auto-create racking inspection: %s",
self.name, e,
)
def _fp_seed_racking_lines(self, inspection):
"""Populate the inspection with one line per SO plating order line.
Walks sale_order_line_ids (the M2M of SO lines tied to this job),
falling back to the linked SO's order_line. Each line carries the
part_catalog and the quoted qty as the expected count - the
racker confirms or amends on the floor.
"""
self.ensure_one()
if not inspection or inspection.line_ids:
return
Line = self.env['fp.racking.inspection.line'].sudo()
# Source preference: explicit M2M of plating lines bound to this
# job (fast-order multi-part jobs), falling back to the SO header.
so_lines = self.sale_order_line_ids
if not so_lines and self.sale_order_id:
so_lines = self.sale_order_id.order_line
plating_lines = so_lines.filtered(
lambda l: l.x_fc_part_catalog_id and not l.display_type
)
if not plating_lines:
return
seq = 10
for sol in plating_lines:
try:
Line.create({
'inspection_id': inspection.id,
'sequence': seq,
'part_catalog_id': sol.x_fc_part_catalog_id.id,
'qty_expected': int(sol.product_uom_qty or 0),
'condition': 'ok',
})
except Exception as e:
_logger.warning(
"Job %s: failed to seed racking line for SO line %s: %s",
self.name, sol.id, e,
)
seq += 10
def _fp_create_portal_job(self):
"""Create the fusion.plating.portal.job mirror record.
Seeded with 'received' then handed to
`fusion.plating.portal.job._fp_recompute_portal_state` - that
helper is the single source of truth for portal state and
derives it from the WO + shipment + invoice signals, so a
catch-up create on an already-in-progress job lands on the
right state rather than stuck on 'received'.
"""
self.ensure_one()
if self.portal_job_id:
return # already exists - idempotent
Portal = self.env['fusion.plating.portal.job'].sudo()
portal = Portal.create({
'name': self.name,
'partner_id': self.partner_id.id,
'state': 'received',
'x_fc_job_id': self.id,
})
self.portal_job_id = portal.id
if hasattr(portal, '_fp_recompute_portal_state'):
portal._fp_recompute_portal_state()
def _fp_create_qc_check_if_needed(self):
"""If customer has x_fc_requires_qc=True, spawn a QC check via
the canonical fp.quality.check.create_for_job() entry point.
Sub 11 - model relocated from bridge_mrp to fusion_plating_quality.
create_for_job resolves the template (customer-specific or default),
clones every template line, returns an existing record if one is
already open, and posts a chatter trail.
"""
self.ensure_one()
partner = self.partner_id
wants_qc = (
'x_fc_requires_qc' in partner._fields
and partner.x_fc_requires_qc
)
if not wants_qc:
return
if 'fusion.plating.quality.check' not in self.env:
return
QC = self.env['fusion.plating.quality.check']
try:
QC.create_for_job(self)
except Exception as e:
# F7 - surface silent failures on the job's chatter so the
# operator sees the gap and creates the QC manually. Logging
# to /var/log/odoo/odoo-server.log alone meant nobody noticed
# (2CM's WH/JOB/00002 silently lost its QC check this way).
_logger.warning(
"Job %s: create_for_job failed: %s", self.name, e,
)
self.message_post(body=_(
'QC check auto-create failed: %(e)s. '
'Create the QC check manually from the Quality menu.'
) % {'e': e})
# ------------------------------------------------------------------
# button_mark_done - Task 2.8
# ------------------------------------------------------------------
def button_mark_done(self):
"""Transition the job to 'done' and trigger downstream side effects.
- Blocks if any step is not done/skipped (manager bypass via
context key `fp_skip_step_gate=True`). Compliance: AS9100 /
Nadcap require evidence that every recipe step ran. Without
this guard an operator could close a job with zero work.
- Blocks if customer requires QC and the QC check isn't passed
(manager bypass via context key `fp_skip_qc_gate=True`)
- Sets state='done', date_finished=now
- Auto-creates a draft fusion.plating.delivery
- Triggers certificate auto-generation (best-effort)
"""
# During migration, side-effects are skipped - see action_confirm.
skip_side_effects = self.env.context.get('fp_jobs_migration')
skip_qc_gate = self.env.context.get('fp_skip_qc_gate')
skip_step_gate = self.env.context.get('fp_skip_step_gate')
QC = self.env['fusion.plating.quality.check'] \
if 'fusion.plating.quality.check' in self.env else None
for job in self:
if job.state == 'done':
continue
if job.state == 'cancelled':
raise UserError(
"Job %s is cancelled - cannot mark done." % job.name
)
# Step-completion gate: every step must be done (or explicitly
# skipped, once button_skip is implemented). Without this
# guard operators can close a recipe-driven job with zero
# actual work logged. Manager bypass via context.
if not skip_step_gate and job.step_ids:
# `skipped` and `cancelled` count as terminal - operator
# explicitly opted those out (skipped) or killed them
# (cancelled). Only steps still in pending/ready/in_progress/
# paused block job close.
undone = job.step_ids.filtered(
lambda s: s.state not in ('done', 'skipped', 'cancelled')
)
if undone:
raise UserError(_(
"Job %s cannot be marked Done - %d/%d step(s) "
"are not finished:\n %s\n\nWalk each step on "
"the tablet (or skip / cancel opt-in steps)."
) % (
job.name, len(undone), len(job.step_ids),
'\n '.join(
f'#{s.sequence} {s.name} ({s.state})'
for s in undone[:5]
),
))
# Bake-window gate (compliance - AS9100 / Nadcap): if any
# auto-spawned bake.window is still awaiting_bake OR
# bake_in_progress, the bake hasn't been documented and
# parts cannot ship. Without this guard a careless
# operator closes the job, parts ship, three weeks later
# a field failure surfaces and the auditor asks for the
# bake record that doesn't exist. Manager bypass via
# fp_skip_bake_gate=True for documented customer deviation.
skip_bake_gate = self.env.context.get('fp_skip_bake_gate')
BW = (self.env['fusion.plating.bake.window']
if 'fusion.plating.bake.window' in self.env else None)
if not skip_bake_gate and BW is not None:
pending_bw = BW.sudo().search([
('part_ref', '=', job.name),
('state', 'in', ('awaiting_bake', 'bake_in_progress')),
])
if pending_bw:
raise UserError(_(
"Job %s cannot be marked Done - bake window "
"still pending:\n %s\n\nBake hydrogen "
"embrittlement relief on the parts (start + "
"end the bake on the bake.window record), then "
"close the job. Manager override available for "
"documented customer deviation."
) % (
job.name,
'\n '.join(
f'{bw.name} (state={bw.state}, '
f'required_by={bw.bake_required_by})'
for bw in pending_bw[:5]
),
))
# Qty reconciliation gate: qty_done + qty_scrapped must
# equal qty when the job closes. Without this an operator
# can ship "5 of 5" while only 4 are actually plated +
# 1 contaminated, with no record of the missing piece.
# Manager bypass via fp_skip_qty_reconcile=True (e.g. when
# qty tracking truly doesn't apply).
skip_qty_gate = self.env.context.get('fp_skip_qty_reconcile')
if not skip_qty_gate and job.qty:
# Smooth the typical "clean close" case so the operator
# doesn't have to manually type qty_done = ordered_qty
# every time. Conditions for safe auto-fill:
# - operator has NOT recorded any scrap or done qty
# (so we're not overriding their explicit entry)
# - the receiving closed with matching qty (parts
# physically came in as expected)
# - no visual-inspection rejects recorded
# When any of those fail, fall through to the gate so
# the operator reconciles by hand. Mirrors the receiving
# `_update_job_qty_received` pattern: server fills the
# obvious default, operator owns the edge cases.
# Partial-order handling (2026-06-02): surface scrap that
# was recorded through the Move log (transfer_type='scrap')
# into qty_scrapped, so the reconciliation + cert qty stay
# honest even when scrap was done from the tablet Move
# dialog rather than the qty_scrapped field. Only when the
# field hasn't been set by hand.
scrap_moves = job._fp_scrapped_via_moves()
if scrap_moves and not job.qty_scrapped:
job.qty_scrapped = scrap_moves
# Clean-close auto-fill: derive the good (done) count from
# what physically came in minus scrap, instead of blindly
# assuming the whole order completed (which over-counts when
# parts were scrapped mid-line). Skips when the operator
# already typed qty_done, or when visual rejects make the
# split non-obvious - then the gate below makes them
# reconcile by hand.
if (not job.qty_done
and not (job.qty_visual_inspection_rejects or 0)
and job.qty_received
and abs(job.qty_received - job.qty) < 0.0001):
job.qty_done = job.qty - (job.qty_scrapped or 0)
accounted = (job.qty_done or 0) + (job.qty_scrapped or 0)
if abs(accounted - job.qty) > 0.0001:
raise UserError(_(
"Job %s qty mismatch - ordered %g, but qty_done "
"(%g) + qty_scrapped (%g) = %g. Update Quantity "
"Completed and Quantity Scrapped on the job "
"header so they sum to %g before closing."
) % (
job.name, job.qty, job.qty_done or 0,
job.qty_scrapped or 0, accounted, job.qty,
))
# Receiving reconciliation: parts must be physically
# received before the job can close, and the count must
# match what came out (done + scrapped + visual rejects).
# Without this guard a job ships with the wrong cert qty,
# or worse, with no closed receiving for the auditor to
# trace back to. Same bypass flag covers both checks.
if not job.qty_received:
raise UserError(_(
"Job %s cannot be marked Done - Quantity Received "
"is blank. Close the receiving record for SO %s "
"before completing this job."
) % (
job.name,
job.sale_order_id.name if job.sale_order_id else '?',
))
rejects = job.qty_visual_inspection_rejects or 0
accounted_out = (
(job.qty_done or 0)
+ (job.qty_scrapped or 0)
+ rejects
)
if abs(job.qty_received - accounted_out) > 0.0001:
raise UserError(_(
"Job %s qty mismatch - received %g, but qty_done "
"(%g) + qty_scrapped (%g) + visual rejects (%g) "
"= %g. Reconcile before closing."
) % (
job.name, job.qty_received,
job.qty_done or 0, job.qty_scrapped or 0,
rejects, accounted_out,
))
# QC gate: customers flagged x_fc_requires_qc must have a
# passed QC before the job closes. AS9100 / Nadcap compliance.
if QC and not skip_qc_gate \
and 'x_fc_requires_qc' in job.partner_id._fields \
and job.partner_id.x_fc_requires_qc:
blocking_qc = QC.search([
('job_id', '=', job.id),
('state', 'not in', ('passed',)),
], order='create_date desc', limit=1)
if blocking_qc:
raise UserError(_(
"Job %s cannot be marked Done - QC check %s is in "
"state '%s'. Pass the QC checklist first, or have "
"a manager override via the bypass button."
) % (job.name, blocking_qc.name, blocking_qc.state))
# No QC at all? Spawn one now (idempotent) and require
# the operator to walk it before retrying.
no_qc = not QC.search_count([('job_id', '=', job.id)])
if no_qc:
QC.create_for_job(job)
raise UserError(_(
"Job %s requires QC. A new check has been created - "
"complete it before marking the job Done."
) % job.name)
# When called as a gate-check from fp.job.step.button_finish
# (per spec 2026-05-25 D12), exit BEFORE flipping state -
# the post-shop advance helper handles the actual transition.
if self.env.context.get('fp_check_gates_only'):
continue
job.state = 'done'
job.date_finished = fields.Datetime.now()
if not skip_side_effects:
job._fp_create_delivery()
job._fp_create_certificates()
job._fp_fire_notification('job_complete')
return True
def button_mark_shipped(self):
"""Manual transition awaiting_ship → done. Operator-facing
button on the job form; restricted to Manager / Owner via
groups= on the view button.
Does NOT re-run the bake/qty/QC gates - those passed when the
job first transitioned out of in_progress. This is just the
"yes, shipped" stamp.
Future hook: delivery.action_mark_delivered will call this
automatically - out of scope for this iteration (spec 2026-05-25).
"""
for job in self:
if job.state != 'awaiting_ship':
raise UserError(_(
'Job %s cannot be marked Shipped - state is "%s" '
'(expected "awaiting_ship").'
) % (job.name, job.state))
job.state = 'done'
job.date_finished = fields.Datetime.now()
job._fp_fire_notification('job_shipped')
job.message_post(body=_(
'Marked shipped by %s.'
) % self.env.user.name)
return True
# ------------------------------------------------------------------
# Order-level ship readiness (tablet receiving+shipping, 2026-05-29)
#
# An order can split into several jobs (one per part/recipe) but has
# ONE outbound shipment (the physical boxes). Spec D4 = "ship
# together": the order can ship only when EVERY active job on it is
# awaiting_ship or done, with at least one awaiting_ship to act on.
# Both the tablet endpoints and /fp/workspace/load read this.
# ------------------------------------------------------------------
def _fp_order_ship_state(self):
"""Return ship-readiness for the whole order this job belongs to.
{ready, not_ready:[{wo_name, state_label}], awaiting_ship_jobs,
order_jobs, order_receiving}
Runs in the caller's env: call on a sudo job for display, on a
user job (the tablet tech) when you want real write attribution.
"""
self.ensure_one()
empty_job = self.browse()
empty_rcv = (self.env['fp.receiving'].browse()
if 'fp.receiving' in self.env else empty_job)
so = self.sale_order_id
if not so:
return {'ready': False, 'not_ready': [],
'awaiting_ship_jobs': empty_job, 'order_jobs': self,
'order_receiving': empty_rcv}
jobs = self.search([
('sale_order_id', '=', so.id),
('state', '!=', 'cancelled'),
])
not_ready = jobs.filtered(lambda j: j.state not in ('awaiting_ship', 'done'))
awaiting = jobs.filtered(lambda j: j.state == 'awaiting_ship')
ready = bool(jobs) and not not_ready and bool(awaiting)
state_sel = dict(self._fields['state'].selection)
rcv = empty_rcv
if 'fp.receiving' in self.env:
rcv = self.env['fp.receiving'].search(
[('sale_order_id', '=', so.id)], order='id desc', limit=1)
return {
'ready': ready,
'not_ready': [{'wo_name': j.display_wo_name,
'state_label': state_sel.get(j.state, j.state)}
for j in not_ready],
'awaiting_ship_jobs': awaiting,
'order_jobs': jobs,
'order_receiving': rcv,
}
def _fp_mark_order_shipped(self):
"""Mark every awaiting_ship job on the order as shipped (done).
Gated on _fp_order_ship_state['ready']; raises UserError naming
the unfinished jobs otherwise. Returns the recordset marked.
"""
self.ensure_one()
info = self._fp_order_ship_state()
if not info['ready']:
names = ', '.join(n['wo_name'] for n in info['not_ready']) or _('none')
raise UserError(_(
'Cannot ship yet - these jobs on the order are not '
'finished: %s'
) % names)
awaiting = info['awaiting_ship_jobs']
awaiting.button_mark_shipped()
return awaiting
# ------------------------------------------------------------------
# Notifications dispatch (Phase 4)
#
# Fires fp.notification.template records whose trigger_event matches
# the given event name. Best-effort: silently skips if the
# fusion_plating_notifications module is not installed (model not
# registered) and logs (without raising) on any send failure so the
# job lifecycle is never blocked by an email problem.
# ------------------------------------------------------------------
def _fp_fire_notification(self, event):
"""Best-effort notification dispatch for fp.job lifecycle events.
Looks up fp.notification.template records with the matching
trigger_event and dispatches via the central _dispatch helper
provided by fusion_plating_notifications. Silently no-ops when
that module isn't installed.
"""
self.ensure_one()
if 'fp.notification.template' not in self.env:
return
Template = self.env['fp.notification.template'].sudo()
try:
# The notifications module exposes a model-level _dispatch
# helper that handles template lookup, recipient resolution
# (Sub 6 contact routing), attachment rendering, and audit
# logging in one go. Pass partner explicitly since fp.job's
# partner_id is the customer.
Template._dispatch(event, self, partner=self.partner_id)
except Exception as e:
# F7 - surface on chatter. A missed customer notification
# (e.g. "your parts have shipped") is invisible to the
# operator until the customer complains; the chatter post
# gives accounting / sales a recoverable signal.
_logger.warning(
"Job %s: notification %s dispatch failed: %s",
self.name, event, e,
)
self.message_post(body=_(
'Notification dispatch failed for event "%(ev)s": %(e)s. '
'Send manually if the customer expected an update.'
) % {'ev': event, 'e': e})
def _fp_create_delivery(self):
"""Create a draft fusion.plating.delivery linked to this job.
Sets BOTH x_fc_job_id (Many2one - strong link) AND job_ref
(Char - soft reference). Downstream code is split: smart-button
navigation reads x_fc_job_id, but the box-parity check, RMA
refund auto-link, and the legacy notification dispatch all
look up by job_ref. Setting both ends keeps every consumer
happy.
Auto-populates everything we can resolve from upstream
records so the shipping crew doesn't have to re-type
addresses / contacts / dates that already exist on the SO:
- delivery_address_id, contact_name, contact_phone - SO's
partner_shipping_id (falls back to partner_id)
- scheduled_date - SO.commitment_date
- source_facility_id - job.facility_id
- x_fc_carrier_id, x_fc_outbound_shipment_id - from the
SO's first receiving record (set at receive time)
- coc_attachment_id - issued cert.attachment_id for this
job (if a CoC is already issued before delivery exists;
otherwise the cert's action_issue back-fills it later)
Everything skips silently when the source field doesn't
exist or the source value is blank, so older install
topologies and partially-configured jobs still get a
delivery - just less pre-filled.
"""
self.ensure_one()
if self.delivery_id:
return
Delivery = self.env['fusion.plating.delivery'].sudo()
vals = self._fp_resolve_delivery_defaults(Delivery)
try:
delivery = Delivery.create(vals)
self.delivery_id = delivery.id
except Exception as e:
# F7 - surface on chatter. Without this, the operator sees
# "Job marked done" but no delivery record exists, and the
# next milestone advance fails silently.
_logger.warning(
"Job %s: failed to auto-create delivery: %s", self.name, e,
)
self.message_post(body=_(
'Delivery auto-create failed: %(e)s. '
'Create the delivery manually from the Logistics menu.'
) % {'e': e})
def _fp_resolve_delivery_defaults(self, Delivery):
"""Build the create-vals for a fresh delivery, OR the
write-vals for refreshing an existing one. Centralised so
the create path, the per-cert post-issue sync, and any
future 'Refresh from Source' button all stay consistent.
"""
self.ensure_one()
vals = {'partner_id': self.partner_id.id}
if 'x_fc_job_id' in Delivery._fields:
vals['x_fc_job_id'] = self.id
if 'job_ref' in Delivery._fields:
vals['job_ref'] = self.name
# Delivery address + contact details from the SO. shipping
# partner is preferred (that's where parts physically go);
# fall back to the SO's main partner when no separate ship-to.
so = self.sale_order_id
ship_to = (so.partner_shipping_id or so.partner_id) if so else False
if ship_to:
if 'delivery_address_id' in Delivery._fields:
vals['delivery_address_id'] = ship_to.id
if 'contact_name' in Delivery._fields and ship_to.name:
vals['contact_name'] = ship_to.name
if 'contact_phone' in Delivery._fields:
# res.partner has no `mobile` field in this Odoo 19 build -
# guard it so the read can't AttributeError (and still picks
# up mobile on instances that do define it).
vals['contact_phone'] = (
ship_to.phone
or (ship_to.mobile if 'mobile' in ship_to._fields else '')
or ''
)
# Scheduled date - operator can adjust; this just primes it
# so they're not staring at a blank field.
if so and so.commitment_date and 'scheduled_date' in Delivery._fields:
vals['scheduled_date'] = so.commitment_date
# Source facility comes from the job (where it was plated).
if self.facility_id and 'source_facility_id' in Delivery._fields:
vals['source_facility_id'] = self.facility_id.id
# Outbound carrier + shipment mirrored from the SO's first
# receiving record (the crew chose these at receipt time).
if (so and 'x_fc_receiving_ids' in so._fields
and so.x_fc_receiving_ids):
recv = so.x_fc_receiving_ids[:1]
if 'x_fc_carrier_id' in Delivery._fields \
and 'x_fc_carrier_id' in recv._fields \
and recv.x_fc_carrier_id:
vals['x_fc_carrier_id'] = recv.x_fc_carrier_id.id
if 'x_fc_outbound_shipment_id' in Delivery._fields \
and 'x_fc_outbound_shipment_id' in recv._fields \
and recv.x_fc_outbound_shipment_id:
vals['x_fc_outbound_shipment_id'] = (
recv.x_fc_outbound_shipment_id.id
)
# CoC PDF - if a cert for this job is already issued and
# the delivery field accepts an attachment, link it. The
# cert's action_issue also calls _fp_sync_to_delivery for
# the case where the cert issues AFTER the delivery exists.
Cert = self.env.get('fp.certificate')
if Cert is not None and 'coc_attachment_id' in Delivery._fields:
issued_cert = Cert.sudo().search([
('x_fc_job_id', '=', self.id),
('certificate_type', '=', 'coc'),
('state', '=', 'issued'),
('attachment_id', '!=', False),
], order='issue_date desc, id desc', limit=1)
if issued_cert and issued_cert.attachment_id:
vals['coc_attachment_id'] = issued_cert.attachment_id.id
return vals
# ==================================================================
# Post-shop auto-advance helpers (spec 2026-05-25)
# ------------------------------------------------------------------
# When the last open recipe step finishes, the job auto-advances to
# awaiting_cert (if any cert is required) or awaiting_ship (if not).
# Cert issue auto-advances awaiting_cert → awaiting_ship. Cert void
# regresses awaiting_ship → awaiting_cert. All helpers are
# idempotent - safe to call from any hook.
# ==================================================================
def _fp_check_finish_gates(self):
"""Run the bake-window / qty-reconciliation / QC gates that used
to live in button_mark_done. Called from
fp.job.step.button_finish when the operator is finishing the
LAST open step on the job (spec D12).
Raises UserError on failure - operator stays on the step, fixes
the issue, retries the click. Manager bypass via the same
context flags as button_mark_done (fp_skip_bake_gate,
fp_skip_qty_reconcile, fp_skip_qc_gate).
The trick: re-uses button_mark_done's gate logic but short-
circuits BEFORE the state flip via the fp_check_gates_only
context flag.
IMPORTANT: pass fp_skip_step_gate=True. At pre-super time the
current step is STILL in_progress (we're about to finish it
but super().button_finish hasn't fired yet), so button_mark_done's
step-completion gate would always fail with "1/1 step is not
finished". The step gate is structurally wrong for this caller;
the bake/qty/QC gates are not. Bit us on entech smoke test.
"""
self.ensure_one()
self.with_context(
fp_check_gates_only=True,
fp_skip_step_gate=True,
).button_mark_done()
def _fp_scrapped_via_moves(self):
"""Total parts scrapped through the Move log (transfer_type=
'scrap') for this job. Lets button_mark_done's reconciliation
count scrap done via the tablet Move dialog, not just the
qty_scrapped field (partial-order handling, 2026-06-02)."""
self.ensure_one()
Move = self.env['fp.job.step.move']
moves = Move.sudo().search([
('job_id', '=', self.id),
('transfer_type', '=', 'scrap'),
])
return int(sum(m.qty_moved or 0 for m in moves))
def _fp_check_advance_post_shop(self):
"""Auto-advance in_progress jobs whose recipe steps are all
terminal. Called from fp.job.step.button_finish post-super().
Does NOT raise - gate failures (bake/qty/QC) are surfaced by
fp.job.step.button_finish BEFORE this is called (per spec D12).
At this point the step IS finished and the transition is safe.
Idempotent: re-running on a job already past in_progress is a
no-op.
"""
for job in self:
if job.state != 'in_progress':
continue
if not job.step_ids:
continue
if any(s.state not in ('done', 'skipped', 'cancelled')
for s in job.step_ids):
continue
required = job._resolve_required_cert_types() or set()
new_state = 'awaiting_cert' if required else 'awaiting_ship'
job.state = new_state
# Side effects that used to run in button_mark_done - still
# need to fire here so cert + delivery records exist.
if new_state == 'awaiting_cert':
job._fp_create_certificates()
job._fp_fire_notification('cert_awaiting_issuance')
# Forward reference - _fp_schedule_cert_activity is
# defined in Task 20. hasattr guard keeps this safe
# during incremental rollout.
if hasattr(job, '_fp_schedule_cert_activity'):
job._fp_schedule_cert_activity()
else:
job._fp_create_delivery()
job._fp_fire_notification('job_complete')
def _fp_check_advance_after_cert_issue(self):
"""Called from fp.certificate.action_issue. If every required
cert TYPE for this job has at least one `issued` cert, advance
awaiting_cert → awaiting_ship. Idempotent - safe to call
repeatedly.
Semantics chosen: per-TYPE coverage, not per-CERT exhaustion.
A previously-voided cert (state='voided') of the same type is
irrelevant - the operator's intent on void was "this attempt
is invalid"; a fresh `issued` cert of that type satisfies the
requirement. Counting voided certs as outstanding would block
the advance after a void+re-issue cycle (caught on entech
smoke test 2026-05-25).
"""
for job in self:
if job.state != 'awaiting_cert':
continue
if 'fp.certificate' not in self.env:
continue
required = job._resolve_required_cert_types() or set()
if not required:
# Edge case: required set went empty after creation
# (e.g. partner flag toggled). Treat as "ready to ship".
job.state = 'awaiting_ship'
job._fp_create_delivery()
if hasattr(job, '_fp_resolve_cert_activities'):
job._fp_resolve_cert_activities()
continue
Cert = self.env['fp.certificate'].sudo()
# Per-type coverage: every required cert type must have at
# least ONE cert in state=issued. Voided certs are ignored.
covered_types = set(Cert.search([
('x_fc_job_id', '=', job.id),
('certificate_type', 'in', list(required)),
('state', '=', 'issued'),
]).mapped('certificate_type'))
if required.issubset(covered_types):
job.state = 'awaiting_ship'
job._fp_create_delivery()
if hasattr(job, '_fp_resolve_cert_activities'):
job._fp_resolve_cert_activities()
def _fp_check_regress_after_cert_void(self):
"""Called from fp.certificate.write when state=voided. If a
required cert TYPE has lost coverage (no remaining issued
cert), slide the job back to awaiting_cert so it reappears in
Final Inspection and the QM is re-notified.
Per-type coverage (mirror of _fp_check_advance_after_cert_issue):
voiding ONE cert only regresses if it was the only issued cert
of its type. If a sibling issued cert still covers the type,
coverage holds and no regress fires.
"""
for job in self:
if job.state != 'awaiting_ship':
continue
if 'fp.certificate' not in self.env:
continue
required = job._resolve_required_cert_types() or set()
if not required:
continue
Cert = self.env['fp.certificate'].sudo()
covered_types = set(Cert.search([
('x_fc_job_id', '=', job.id),
('certificate_type', 'in', list(required)),
('state', '=', 'issued'),
]).mapped('certificate_type'))
if not required.issubset(covered_types):
job.state = 'awaiting_cert'
job._fp_fire_notification('cert_voided_re_notify')
if hasattr(job, '_fp_schedule_cert_activity'):
job._fp_schedule_cert_activity()
def _fp_schedule_cert_activity(self):
"""Schedule an Issue-CoC mail.activity for one QM. Round-robin
by oldest login_date (least recently active QM, likely least
busy). Idempotent - re-firing while an open activity already
exists is a no-op.
Spec 2026-05-25 §mail.activity belt + suspenders.
"""
self.ensure_one()
activity_type = self.env.ref(
'fusion_plating_jobs.activity_type_issue_coc',
raise_if_not_found=False,
)
if not activity_type:
return
# Idempotency: skip if an open activity of this type exists.
existing = self.activity_ids.filtered(
lambda a: a.activity_type_id == activity_type
)
if existing:
return
# env.get('model.name') returns an EMPTY recordset when the
# model exists but has no records - empty recordsets are
# falsy in Python, so `if not Template: return` exits early
# even when the model IS registered. Use the membership check
# instead. Bit us on entech smoke test 2026-05-25.
if 'fp.notification.template' not in self.env:
return
Template = self.env['fp.notification.template']
if not hasattr(Template, '_fp_resolve_cert_authority_users'):
return
qms = Template.sudo()._fp_resolve_cert_authority_users(self)
if not qms:
return
# Round-robin: pick the QM who logged in least recently (likely
# least busy). NULL login_date sorts first.
qm = qms.sorted(
lambda u: u.login_date or fields.Datetime.from_string(
'1970-01-01 00:00:00'
)
)[:1]
try:
self.activity_schedule(
activity_type_id=activity_type.id,
user_id=qm.id,
summary=_('Issue CoC for %s') % (
self.display_wo_name or self.name or 'job'
),
)
except Exception as e:
_logger.warning(
"Job %s: schedule cert activity failed: %s", self.name, e,
)
def _fp_resolve_cert_activities(self):
"""Auto-resolve all open Issue-CoC activities on this job.
Called from _fp_check_advance_after_cert_issue when the job
transitions awaiting_cert → awaiting_ship. Spec 2026-05-25.
"""
self.ensure_one()
activity_type = self.env.ref(
'fusion_plating_jobs.activity_type_issue_coc',
raise_if_not_found=False,
)
if not activity_type:
return
open_activities = self.activity_ids.filtered(
lambda a: a.activity_type_id == activity_type
)
for act in open_activities:
try:
act.action_feedback(feedback=_('Cert issued - auto-resolved.'))
except Exception as e:
_logger.warning(
"Job %s: auto-resolve cert activity failed: %s",
self.name, e,
)
def _fp_create_certificates(self):
"""Auto-create one draft fp.certificate per type returned by
_resolve_required_cert_types. Idempotent per type - re-running
on a job that already has a CoC won't create another one.
Each cert is pre-populated with everything action_issue needs
(partner, spec_reference, process_description, certified_by,
contact_partner, part_number, quantity_shipped, NC qty, PO,
SO link, job link) so the manager just reviews and clicks Issue.
Resolution sources for the new prefill fields:
- process_description ← recipe.name (the job's process root)
- certified_by_id ← customer_spec.signer_user_id ONLY
(a per-spec override). Left empty
otherwise so the LIVE company owner
resolves at render / issue time.
- contact_partner_ids ← partner.x_fc_default_coc_contact_ids (all)
- nc_quantity ← qty_scrapped + qty_visual_insp_rejects
Honours part.certificate_requirement (coc / coc_thickness /
none / inherit) and partner-level send_coc /
send_thickness_report flags. Closes spec gap C-G1.
"""
self.ensure_one()
if 'fp.certificate' not in self.env:
return
Cert = self.env['fp.certificate'].sudo()
required = self._resolve_required_cert_types()
if not required:
return
has_job_link = 'x_fc_job_id' in Cert._fields
# Spec drives the cert spec_reference. The customer.spec was
# auto-filled onto the job at confirm time (sale_order.py).
spec = self.customer_spec_id
# Recipe drives the process description on the cert. Was previously
# sourced from sale_order.x_fc_coating_config_id (since retired);
# recipe.name is the human-readable replacement.
recipe = self.recipe_id
# Signer resolution (2026-05-28): snapshot ONLY a deliberate
# per-spec signer here. Do NOT freeze the company owner into
# certified_by_id - leaving it empty lets the CoC report and
# action_issue resolve the LIVE company owner (res.company
# .x_fc_owner_user_id / Settings "Certificate Owner") at render /
# issue time. That way changing the Settings signer flows through
# to existing draft certs instead of being frozen to whoever was
# the owner when the cert was created.
signer = False
if spec and 'signer_user_id' in spec._fields:
signer = spec.signer_user_id
# Contact: per-customer default; blank means manager picks at issue.
contact = False
if 'x_fc_default_coc_contact_ids' in self.partner_id._fields:
# ALL the customer's CoC contacts -> the cert's Customer Contact
# M2m. First is the primary (printed on the CoC); every contact
# is emailed at send (fp.certificate.action_send_to_customer).
contact = self.partner_id.x_fc_default_coc_contact_ids
# NC qty: scrapped + visual rejects. Both NULL-safe.
nc_qty = int(
(self.qty_scrapped or 0)
+ (self.qty_visual_inspection_rejects or 0)
)
for cert_type in sorted(required):
# Idempotency per type.
existing_dom = [('certificate_type', '=', cert_type)]
if has_job_link:
existing_dom.append(('x_fc_job_id', '=', self.id))
elif self.sale_order_id and 'sale_order_id' in Cert._fields:
existing_dom.append(
('sale_order_id', '=', self.sale_order_id.id),
)
else:
continue # can't safely identify - skip
if Cert.search_count(existing_dom):
continue
try:
vals = {
'partner_id': self.partner_id.id,
'certificate_type': cert_type,
}
if 'state' in Cert._fields:
vals['state'] = 'draft'
if has_job_link:
vals['x_fc_job_id'] = self.id
elif 'job_id' in Cert._fields:
vals['job_id'] = self.id
if 'sale_order_id' in Cert._fields and self.sale_order_id:
vals['sale_order_id'] = self.sale_order_id.id
# spec_reference is what action_issue blocks on.
# Format spec.code + revision for the cert text.
if spec and 'spec_reference' in Cert._fields:
ref = spec.code or ''
if spec.revision:
ref = (f'{ref} Rev {spec.revision}'
if ref else f'Rev {spec.revision}')
if ref:
vals['spec_reference'] = ref
if 'customer_spec_id' in Cert._fields:
vals['customer_spec_id'] = spec.id
if 'part_number' in Cert._fields and self.part_catalog_id:
vals['part_number'] = (
self.part_catalog_id.part_number or ''
)
if 'quantity_shipped' in Cert._fields:
vals['quantity_shipped'] = int(
(self.qty_done or self.qty or 0)
- (self.qty_scrapped or 0)
)
if 'nc_quantity' in Cert._fields:
vals['nc_quantity'] = nc_qty
if 'po_number' in Cert._fields and self.sale_order_id \
and 'x_fc_po_number' in self.sale_order_id._fields:
vals['po_number'] = (
self.sale_order_id.x_fc_po_number or ''
)
if 'customer_job_no' in Cert._fields \
and self.sale_order_id \
and 'x_fc_customer_job_number' \
in self.sale_order_id._fields:
vals['customer_job_no'] = (
self.sale_order_id.x_fc_customer_job_number or ''
)
if 'process_description' in Cert._fields and recipe:
vals['process_description'] = recipe.name or ''
if 'certified_by_id' in Cert._fields and signer:
vals['certified_by_id'] = signer.id
if 'contact_partner_ids' in Cert._fields and contact:
vals['contact_partner_ids'] = [(6, 0, contact.ids)]
if 'entech_wo_number' in Cert._fields:
vals['entech_wo_number'] = self.name or ''
cert = Cert.create(vals)
self.message_post(body=Markup(_(
'%(t)s <b>%(n)s</b> auto-created (draft). Issuer '
'should hit Issue when ready to ship.'
)) % {
't': dict(
Cert._fields['certificate_type'].selection
).get(cert_type, cert_type),
'n': cert.name,
})
except Exception as e:
_logger.warning(
"Job %s: failed to auto-create cert (%s): %s",
self.name, cert_type, e,
)
self.message_post(body=_(
'Cert auto-create (%(t)s) failed: %(e)s. '
'Create manually.'
) % {'t': cert_type, 'e': e})
# ------------------------------------------------------------------
# Backfill - closed jobs missing certs, plus cleanup of legacy
# duplicate thickness_report certs created before the bundling rule.
# ------------------------------------------------------------------
# One-shot management action for jobs that closed BEFORE the
# _fp_create_certificates bug fix (e.g. WO-30040). Two passes:
# 1. CREATE any missing draft cert per the (updated) resolver
# 2. VOID legacy duplicate thickness_report certs that have a
# paired CoC on the same job - the bundling rule says the
# CoC carries the thickness data on page 2
# Both passes are idempotent - safe to re-run.
@api.model
def action_backfill_missing_certs(self):
Cert = self.env.get('fp.certificate')
if Cert is None:
raise UserError(_(
'fp.certificate model is not installed. Install '
'fusion_plating_certificates before running this action.'
))
candidate_jobs = self.search([('state', '=', 'done')])
scanned = 0
backfilled_jobs = self.env['fp.job']
created_count = 0
voided_count = 0
has_job_link = 'x_fc_job_id' in Cert._fields
for job in candidate_jobs:
required = job._resolve_required_cert_types()
if not required:
continue
scanned += 1
existing_certs = (
Cert.sudo().search([('x_fc_job_id', '=', job.id)])
if has_job_link else
(Cert.sudo().search([
('sale_order_id', '=', job.sale_order_id.id),
]) if job.sale_order_id else Cert.browse())
)
existing_types = set(existing_certs.mapped('certificate_type'))
# ---- Pass 1: create missing certs --------------------------
missing = required - existing_types
if missing:
before = len(existing_certs)
job._fp_create_certificates()
# Re-read to get the freshly-created ones for pass 2.
existing_certs = (
Cert.sudo().search([('x_fc_job_id', '=', job.id)])
if has_job_link else existing_certs
)
delta = max(len(existing_certs) - before, 0)
if delta:
backfilled_jobs |= job
created_count += delta
# ---- Pass 2: void duplicate thickness_report certs ---------
# Bundling rule (CLAUDE.md): when CoC + thickness are both
# wanted, the CoC absorbs the thickness data. A leftover
# draft thickness_report cert on the same job is now noise
# and should not be issued. Void it with a clear reason so
# the audit trail tells the story.
if 'coc' in required and 'coc' in existing_types:
dup_thickness = existing_certs.filtered(
lambda c: (c.certificate_type == 'thickness_report'
and c.state == 'draft')
)
for cert in dup_thickness:
cert.sudo().write({
'state': 'voided',
'void_reason': (
'Auto-voided: bundling rule - thickness '
'data is delivered as page 2 of the paired '
'CoC, not as a separate cert.'
),
})
cert.message_post(body=_(
'Auto-voided by cleanup: bundling rule routes '
'thickness data to the CoC.'
))
voided_count += 1
backfilled_jobs |= job
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Cert backfill + cleanup complete'),
'message': _(
'Scanned %(s)d closed jobs. Created %(c)d draft '
'cert(s); voided %(v)d duplicate thickness_report '
'cert(s) across %(j)d job(s).'
) % {
's': scanned,
'c': created_count,
'v': voided_count,
'j': len(backfilled_jobs),
},
'sticky': True,
'type': 'success' if (created_count or voided_count) else 'warning',
},
}
class FpJobStep(models.Model):
"""Phase 7 - adds the migration idempotency key on fp.job.step.
Populated by scripts/migrate_to_fp_jobs.py to mark a step as the
mirror of a specific mrp.workorder. Used to skip already-migrated
WOs on subsequent runs.
"""
_inherit = 'fp.job.step'
legacy_mrp_workorder_id = fields.Integer(
string='Legacy MRP Work Order ID',
index=True,
help='Database id of the source mrp.workorder this step was '
'migrated from. Used by the migration script for '
'idempotency. Cleared post-cutover.',
)
# ==========================================================================
# Sub 14 - Recipe-side trigger field
# ==========================================================================
# Adds an optional Many2one on every recipe operation node so the recipe
# author can explicitly map "completion of this step triggers workflow
# state X". Wins over the default-kind matching defined on the workflow
# state itself. Lives here (not core) because the target model
# (fp.job.workflow.state) is defined in this module.
class FusionPlatingProcessNodeWorkflow(models.Model):
_inherit = 'fusion.plating.process.node'
triggers_workflow_state_id = fields.Many2one(
'fp.job.workflow.state',
string='Triggers Workflow State',
ondelete='set null',
help='When a job step generated from this recipe node finishes '
'(or is skipped/cancelled), the job advances to this '
'workflow state. Leave blank to fall back to default-kind '
'matching defined on the workflow state catalog.',
)
class FpStepTemplateWorkflow(models.Model):
"""Sub 14 - workflow milestone trigger on the library step template.
Declared here (jobs module) instead of fusion_plating core because
the target model (fp.job.workflow.state) lives in this module -
core can't reference it without a cyclic dependency.
When the template lands in a recipe via simple_recipe_controller
drag-drop, the value is snapshot-copied to the new process_node
via _SNAPSHOT_FIELDS.
"""
_inherit = 'fp.step.template'
triggers_workflow_state_id = fields.Many2one(
'fp.job.workflow.state',
string='Triggers Workflow State',
ondelete='set null',
help='Sub 14. When a recipe step generated from this template '
'finishes (or is skipped/cancelled), the parent job '
'advances to this workflow state on its status bar. Leave '
'blank to fall back to default-kind matching defined on '
'the workflow state catalog.',
)