# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Real implementations for the state-machine action stubs that
# fusion_plating core's fp.job.step shipped as NotImplementedError
# placeholders. Per spec §5.2 state machine.
import logging
import re
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpJobStep(models.Model):
_inherit = 'fp.job.step'
# ===== Sub 13 — sequential enforcement (recipe + per-step) =============
# Decision matrix for whether button_start must verify predecessors:
#
# recipe.enforce_sequential | step.parallel_start | step.req_pred (legacy) | block?
# --------------------------|---------------------|------------------------|------
# True | False | any | YES
# True | True | any | no
# False | any | True | YES
# False | any | False | no
#
# Manager bypass via context fp_skip_predecessor_check=True.
# Encapsulated in _fp_should_block_predecessors() so the same gate
# is reused by button_start and the Move wizard's predecessor check
# (fusion_plating_shopfloor/controllers/move_controller.py).
# ======================================================================
def _fp_should_block_predecessors(self):
"""Return True if this step must verify that every earlier-
sequence step is in a terminal state before it can start.
See decision matrix in the section header above.
"""
self.ensure_one()
# Defensive: jobs without a recipe (manual-build jobs) default
# to enforce-on so freshly-created records don't accidentally
# leak permissive behaviour through a related-field None.
if not self.job_id:
return True
recipe_seq = self.job_id.enforce_sequential
if recipe_seq:
return not self.parallel_start
# Free-flow recipe — only the legacy per-step flag still gates.
return bool(self.requires_predecessor_done)
can_start = fields.Boolean(
string='Can Start',
compute='_compute_can_start',
help='True when this step is in a startable state AND the '
'predecessor gate (if any) is currently clear. Drives the '
'tablet/job form Start button visibility.',
)
@api.depends(
'state',
'sequence',
'parallel_start',
'requires_predecessor_done',
'job_id.enforce_sequential',
'job_id.step_ids.state',
'job_id.step_ids.sequence',
)
def _compute_can_start(self):
for step in self:
if step.state not in ('pending', 'ready', 'paused'):
step.can_start = False
continue
if not step._fp_should_block_predecessors():
step.can_start = True
continue
blocking = step.job_id.step_ids.filtered(
lambda s: s.sequence < step.sequence and s.state not in (
'done', 'skipped', 'cancelled',
)
)
step.can_start = not bool(blocking)
# NOTE: the actual button_start override lives further down (~line
# 876) where it merges Sub 13 predecessor gate + Policy B Contract
# Review auto-open + Sub 8 Racking auto-open + the receiving soft
# check. Keeping ONE definition prevents the Python-overrides-the-
# earlier-method footgun that swallowed Sub 13 enforcement on
# WH/JOB/00342.
def button_pause(self):
"""Pause an in-progress step (operator break, end of shift).
Closes the open timelog row, sums duration_actual, transitions
state to 'paused'. button_start re-opens a fresh timelog when
the operator resumes.
"""
for step in self:
if step.state != 'in_progress':
raise UserError(_(
"Step '%s' is in state '%s' — only in-progress steps can pause."
) % (step.name, step.state))
now = fields.Datetime.now()
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
if open_log:
open_log.write({'date_finished': now})
step.state = 'paused'
step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes'))
return True
def button_skip(self):
"""Skip a pending/ready step (e.g. opt-in step the planner
decided not to activate for this job).
"""
for step in self:
if step.state not in ('pending', 'ready'):
raise UserError(_(
"Step '%s' is in state '%s' — only pending/ready steps can be skipped."
) % (step.name, step.state))
step.state = 'skipped'
return True
def button_cancel(self):
"""Cancel a single step. Use fp.job.action_cancel to cancel
the whole job.
"""
for step in self:
if step.state == 'done':
raise UserError(_(
"Step '%s' is done — cannot cancel."
) % step.name)
if step.state == 'cancelled':
raise UserError(_(
"Step '%s' is already cancelled."
) % step.name)
step.state = 'cancelled'
return True
def write(self, vals):
"""Post a chatter trail on the parent JOB whenever an active
step gets reassigned. The step itself already tracks
assigned_user_id (tracking=True) but supervisors don't open
each step's chatter — they read the job. Without a job-level
post the takeover is invisible.
Only fires for steps in active states (in_progress / paused)
so creating a draft job + assigning a step to someone doesn't
spam the job chatter. Comparing to the OLD assignment so we
don't post on the initial set-from-False either.
"""
post_for = []
if 'assigned_user_id' in vals:
new_uid = vals['assigned_user_id']
for step in self:
if step.state not in ('in_progress', 'paused'):
continue
old_uid = step.assigned_user_id.id
if not old_uid:
continue
if new_uid == old_uid:
continue
post_for.append((step, old_uid, new_uid))
result = super().write(vals)
Users = self.env['res.users']
for step, old_uid, new_uid in post_for:
old_name = Users.browse(old_uid).name if old_uid else '(unassigned)'
new_name = Users.browse(new_uid).name if new_uid else '(unassigned)'
step.job_id.message_post(body=Markup(_(
'Step %s reassigned from %s to %s '
'(state=%s) by %s.'
)) % (step.name, old_name, new_name, step.state,
self.env.user.name))
# Auto-promote parent job: confirmed → in_progress on first
# step start. Without this, fp.job.state never reaches
# 'in_progress' anywhere in the codebase, so the In Progress
# workflow milestone (trigger_on_job_state='in_progress')
# never fires.
if vals.get('state') == 'in_progress':
jobs_to_promote = self.mapped('job_id').filtered(
lambda j: j.state == 'confirmed'
)
if jobs_to_promote:
jobs_to_promote.write({'state': 'in_progress'})
return result
@api.model
def _cron_nudge_stale_paused(self, threshold_hours=24):
"""Daily nudge for steps stuck in `paused` longer than threshold."""
return self._cron_nudge_stale_steps(
states=('paused',),
threshold_hours=threshold_hours,
label='paused',
)
@api.model
def _cron_nudge_stale_in_progress(self, threshold_hours=8):
"""Cron nudge for steps stuck in `in_progress` longer than
threshold. Default 8 hours — operator started, walked away,
timelog accumulating phantom hours.
"""
return self._cron_nudge_stale_steps(
states=('in_progress',),
threshold_hours=threshold_hours,
label='in-progress',
)
@api.model
def _cron_nudge_stale_steps(self, states=('paused',),
threshold_hours=24, label='stale'):
"""Generic stale-step nudger.
Finds every fp.job.step in any of `states` with date_started
older than N hours. Schedules a 'todo' mail.activity on the
parent job for the job's manager_id (falls back to the user
who started the step). Idempotent — won't double-schedule if
an open activity with the same summary already exists.
"""
from datetime import timedelta as _td
cutoff = fields.Datetime.now() - _td(hours=threshold_hours)
stale = self.search([
('state', 'in', list(states)),
('date_started', '<', cutoff),
('date_started', '!=', False),
])
Activity = self.env['mail.activity']
ActivityType = self.env.ref(
'mail.mail_activity_data_todo', raise_if_not_found=False,
)
nudged_count = 0
for step in stale:
job = step.job_id
assignee = (job.manager_id or step.assigned_user_id
or step.started_by_user_id or self.env.user)
summary = _('Stale %s step: %s') % (label, step.name)
existing = Activity.search([
('res_model', '=', job._name),
('res_id', '=', job.id),
('summary', '=', summary),
], limit=1)
if existing:
continue
age_h = (fields.Datetime.now() - step.date_started).total_seconds() / 3600.0
note = _(
'Step "%(step)s" on job %(job)s has been in %(label)s state for '
'%(hours).1f hours (since %(start)s). Investigate: operator '
'reassignment, equipment failure, or finish + close out.'
) % {
'step': step.name, 'job': job.name, 'label': label,
'hours': age_h, 'start': step.date_started,
}
vals = {
'res_model_id': self.env['ir.model']._get(job._name).id,
'res_id': job.id,
'summary': summary,
'note': note,
'user_id': assignee.id,
'date_deadline': fields.Date.context_today(self),
}
if ActivityType:
vals['activity_type_id'] = ActivityType.id
Activity.create(vals)
nudged_count += 1
job.message_post(body=Markup(_(
'Stale %s step: %s has been idle %.1f hours. '
'Activity created for %s.'
)) % (label, step.name, age_h, assignee.name))
if nudged_count:
_logger.info(
'fp.job.step stale-%s cron: nudged %d step(s)',
label, nudged_count,
)
return nudged_count
def action_abort_for_retry(self, reason=None, new_tank_id=None,
new_bath_id=None):
"""Abort an in_progress / paused step so the operator can restart
it (typically after an equipment failure mid-step).
Closes the open timelog (preserves the partial-work record on
the audit trail), posts a clear chatter event on the JOB
explaining why + which tank, optionally moves the step to a
different tank/bath, and resets the step to `ready` so the
operator can hit Start again.
Without this method the operator's only options are
button_cancel (kills the step entirely) or
pause → write tank → start (no failure audit).
"""
if not reason:
reason = _('Equipment failure / abort for retry')
for step in self:
if step.state not in ('in_progress', 'paused'):
raise UserError(_(
"Step '%s' is in state '%s' — only in_progress / "
"paused steps can be aborted for retry."
) % (step.name, step.state))
old_tank = step.tank_id.display_name or '(no tank set)'
old_bath = step.bath_id.display_name or '(no bath set)'
now = fields.Datetime.now()
open_logs = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_logs:
open_logs.write({'date_finished': now})
partial_min = sum(step.time_log_ids.mapped('duration_minutes'))
change_msg = ''
if new_tank_id:
step.tank_id = new_tank_id
change_msg += ' -> tank %s' % step.tank_id.display_name
if new_bath_id:
step.bath_id = new_bath_id
change_msg += ' -> bath %s' % step.bath_id.display_name
step.state = 'ready'
step.duration_actual = partial_min
step.job_id.message_post(body=Markup(_(
'⚠️ Step %s aborted for retry by %s.
'
'Reason: %s
'
'Equipment: tank=%s, bath=%s%s
'
'Partial work captured: %.2f min in %d timelog(s). '
'Step is back in ready state — operator can '
'restart when the issue is resolved.'
)) % (
step.name, self.env.user.name, reason,
old_tank, old_bath, change_msg, partial_min,
len(step.time_log_ids),
))
return True
def action_recompute_duration_from_timelogs(self):
"""Manual button — re-sum duration_actual + post to chatter
for audit. Use case: supervisor adjusts a timelog row and
wants an explicit audit trail of the recompute. The
automatic version called from timelog hooks is
_fp_resum_duration_actual (no chatter).
"""
for step in self:
old = step.duration_actual or 0.0
new = sum(step.time_log_ids.mapped('duration_minutes'))
step.duration_actual = new
if abs(old - new) > 0.001:
step.job_id.message_post(body=Markup(_(
'Step %s duration recomputed from timelog rows: '
'%.2f min → %.2f min (Δ %+.2f). Recomputed by %s.'
)) % (step.name, old, new, new - old, self.env.user.name))
return True
def _fp_resum_duration_actual(self):
"""Quiet re-sum — used by automatic triggers (timelog
create/write/unlink hooks). No chatter post. Skips no-op
updates so writes are minimised."""
for step in self:
new = sum(step.time_log_ids.mapped('duration_minutes'))
if abs((step.duration_actual or 0.0) - new) > 0.001:
step.duration_actual = new
return True
def action_finish_and_advance(self):
"""Steelhead-style "Finish & Next" — finish this step then auto-
start the next pending/ready step in sequence. Single click
replaces the prior Finish-then-Move-wizard dance.
If the step has authored step_input prompts AND none have been
captured yet, we route through the simplified Record Inputs
wizard first; saving the wizard re-enters here with the
`fp_after_inputs=True` context flag so we don't loop.
"""
self.ensure_one()
if self.state != 'in_progress':
raise UserError(_(
"Step '%s' is in state '%s' — start it before clicking Finish."
) % (self.name, self.state))
# Contract Review (QA-005) routing — when the recipe step is
# flagged as a contract-review step, the operator should land on
# the part's QA-005 form rather than the generic measurement
# wizard. Once the review is complete or dismissed we fall
# through to the normal finish path so the step can advance.
cr_action = self._fp_contract_review_redirect()
if cr_action:
return cr_action
# Prompt-first behaviour: show the Record Inputs dialog when the
# recipe step has authored prompts and nothing has been captured
# in this run. Bypass when context flag is set (i.e. we're being
# called BACK from the wizard's commit), or when the operator
# already saved values via the Record Inputs button earlier.
if (not self.env.context.get('fp_after_inputs')
and self._fp_has_uncaptured_step_inputs()):
return self._fp_open_input_wizard(advance_after=True)
# Auto-move shim: for qty_at_step==1 + downstream step,
# silently record a move(qty=1) so the qty gate in
# button_finish passes. Raises for qty>1 (operator must use
# Complete 1 → Next or Move…). Last step is always allowed.
self._fp_record_one_piece_auto_move()
self.button_finish()
next_step = self._fp_next_runnable_step()
if next_step:
next_step.with_context(
fp_skip_predecessor_check=True,
).button_start()
self.job_id.message_post(body=_(
'Step "%(prev)s" finished — auto-started next step "%(next)s".'
) % {'prev': self.name, 'next': next_step.name})
return True
def _fp_next_runnable_step(self):
"""The lowest-sequence step on this job that isn't terminal yet
and isn't this one. Used by action_finish_and_advance."""
self.ensure_one()
candidates = self.job_id.step_ids.filtered(
lambda s: s.id != self.id
and s.state in ('pending', 'ready', 'paused')
).sorted('sequence')
return candidates[:1] or self.env['fp.job.step']
def _fp_has_uncaptured_step_inputs(self):
"""True when the recipe step defines step_input prompts AND
the user hasn't already saved values for this step's current
run via the Record Inputs wizard.
"""
self.ensure_one()
node = self.recipe_node_id
if not node:
return False
prompts = node.input_ids
if 'kind' in prompts._fields:
prompts = prompts.filtered(lambda i: i.kind == 'step_input')
if not prompts:
return False
# Has the operator already recorded values during this run?
# Heuristic: any in-place fp.job.step.move (transfer_type='step')
# for this step since date_started.
Move = self.env['fp.job.step.move']
already = Move.search_count([
('from_step_id', '=', self.id),
('transfer_type', '=', 'step'),
('move_datetime', '>=', self.date_started or fields.Datetime.now()),
])
return already == 0
def _fp_open_input_wizard(self, advance_after=False):
"""Open the Record Inputs OWL dialog (Sub 12e v4).
Replaces the form-view-based wizard with a custom OWL Dialog
component (fp_record_inputs_dialog.js). The dialog renders
each prompt as a proper card with semantic HTML — no more
list-cell-as-card CSS hacks.
When advance_after is True, the dialog's Save button commits
values then dispatches the result of action_finish_and_advance
so the step finishes + auto-starts the next step in one flow.
"""
self.ensure_one()
return {
'type': 'ir.actions.client',
'tag': 'fp_record_inputs_dialog',
'params': {
'step_id': self.id,
'advance_after': bool(advance_after),
},
}
# NB: action_open_input_wizard is defined further down (line ~829)
# — that one stays as the per-row "Record" button entry-point.
# _fp_open_input_wizard above adds the advance_after pathway used
# only by action_finish_and_advance.
def button_finish(self):
"""Override to:
1) Auto-spawn a bake.window when a wet plating step finishes
on a recipe that requires hydrogen-embrittlement relief
(AS9100 / Nadcap compliance). Bake fields live on the
recipe root post-promote-customer-spec.
2) Post a chatter warning when duration_actual exceeds 1.5×
duration_expected — silent overruns are a red flag for
scheduling and costing.
Both actions are idempotent and never block the finish itself.
"""
result = super().button_finish()
BW = self.env['fusion.plating.bake.window']
Bath = self.env['fusion.plating.bath']
for step in self:
if step.state != 'done':
continue
# Duration-overrun chatter alert.
if step.duration_expected and step.duration_actual:
ratio = step.duration_actual / step.duration_expected
if ratio >= 1.5:
step.job_id.message_post(body=Markup(_(
'⚠️ Step "%s" ran %.1fx expected — '
'expected %.0f min, actual %.0f min. Investigate: '
'equipment issue, training gap, or recipe time '
'estimate too tight.'
)) % (step.name, ratio, step.duration_expected,
step.duration_actual))
recipe_root = step.job_id.recipe_id
if not recipe_root:
continue
requires = getattr(recipe_root, 'requires_bake_relief', False)
window_hrs = getattr(recipe_root, 'bake_window_hours', 0.0)
if not requires or not window_hrs:
continue
# Trigger only on the actual plating-out step. We want
# exactly ONE bake.window per job (not one per step that
# happens to have "plate" in the name). Heuristic:
# - step.kind == 'wet' (clean, recipe-authored signal); OR
# - the step name contains "plating" as a word
# Explicit excludes: inspection / bake / mask / rack steps
# whose names might happen to mention plating in passing
# (e.g. "Post-plate Inspection").
name_l = (step.name or '').lower()
kind_match = step.kind == 'wet'
name_match = bool(re.search(r'\bplating\b', name_l))
excluded = any(kw in name_l for kw in (
'inspect', 'inspection', 'bake', 'mask', 'rack',
))
if (not kind_match and not name_match) or excluded:
continue
# Idempotency — only one bake.window per (job, step).
existing = BW.sudo().search([
('part_ref', '=', step.job_id.name),
('lot_ref', '=', f'step-{step.id}'),
], limit=1)
if existing:
continue
# Pick a bath: step.bath_id wins; fall back to the first
# active bath in the facility (best-effort — operator can
# correct on the bake.window record).
bath = step.bath_id or Bath.sudo().search(
[('facility_id', '=', step.facility_id.id)], limit=1,
) if step.facility_id else False
if not bath:
bath = Bath.sudo().search([], limit=1)
if not bath:
_logger.warning(
'Step %s: bake-window auto-spawn skipped — no bath '
'configured.', step.name,
)
continue
bw = BW.sudo().create({
'bath_id': bath.id,
'plate_exit_time': step.date_finished or fields.Datetime.now(),
'window_hours': window_hrs,
'part_ref': step.job_id.name,
'lot_ref': f'step-{step.id}',
'customer_ref': step.job_id.partner_id.display_name or '',
'quantity': int(step.job_id.qty or 0),
})
step.job_id.message_post(body=Markup(_(
'Bake window %s auto-created — %.1fh window from '
'plate exit. Required by %s.'
)) % (bw.name, window_hrs, bw.bake_required_by))
return result
# ==================================================================
# Phase 2 multi-serial — auto-promote serials on step transitions
# ==================================================================
def _fp_promote_serials_on_start(self):
"""When this step transitions to in_progress, lift any serial
attached to the parent SO line out of `received` / `racked` and
into `in_process`. Idempotent — already-promoted serials are
skipped.
"""
for step in self:
job = step.job_id
if not job.sale_order_line_ids:
continue
serials = job.sale_order_line_ids.mapped('x_fc_serial_ids')
to_promote = serials.filtered(
lambda s: s.state in ('received', 'racked')
)
if to_promote:
# Use sudo on the helper so operator-tier users can promote
# serial state without needing direct write on fp.serial.
to_promote.sudo()._set_state('in_process', message=_(
'Promoted to In Process on step "%s" start by %s.'
) % (step.name, self.env.user.name))
def _fp_promote_serials_on_finish(self):
"""When the LAST step of this step's job finishes (sequenced
terminal step OR an explicit inspect/final-inspect kind), bump
in-flight serials to `inspected` so the shipper sees them ready
for packing. Conservative — only promotes from `in_process`."""
for step in self:
job = step.job_id
if not job.sale_order_line_ids:
continue
# Is this the highest-sequence non-cancelled step on the job?
siblings = job.step_ids.filtered(
lambda s: s.state not in ('cancelled', 'skipped')
)
if not siblings:
continue
last_seq = max(siblings.mapped('sequence'))
is_terminal = (step.sequence == last_seq) or (
step.kind == 'inspect' or 'final' in (step.name or '').lower()
)
if not is_terminal:
continue
serials = job.sale_order_line_ids.mapped('x_fc_serial_ids')
to_promote = serials.filtered(lambda s: s.state == 'in_process')
if to_promote:
to_promote.sudo()._set_state('inspected', message=_(
'Promoted to Inspected on step "%s" finish by %s.'
) % (step.name, self.env.user.name))
# ==================================================================
# Policy B (2026-04-28) — Contract Review enforcement
# ==================================================================
# When a recipe author drops a "Contract Review" step into a recipe,
# button_start opens the QA-005 audit form for the linked part (auto-
# creates one if missing) and button_finish blocks completion until
# the form is `complete` AND the current user is on the recipe's
# contract_review_user_ids approver list (when configured).
#
# Detection — case-insensitive match on the step name OR
# recipe_node_id mapped from a step.template with default_kind ==
# 'contract_review' (the simple-editor library entry).
def _fp_is_contract_review_step(self):
self.ensure_one()
if (self.name or '').strip().lower() in ('contract review', 'qa-005'):
return True
node = self.recipe_node_id
if not node:
return False
# Source template kind (when authored via simple editor library)
if 'source_template_id' in node._fields and node.source_template_id:
if node.source_template_id.default_kind == 'contract_review':
return True
if 'default_kind' in node._fields and node.default_kind == 'contract_review':
return True
return False
def _fp_resolve_contract_review_part(self):
"""Find the fp.part.catalog this step's job is for. Used by the
Contract Review hooks to auto-create / look up the QA-005 form.
Falls through to None when no part can be resolved (no SO line,
SO line without x_fc_part_catalog_id, etc.)."""
self.ensure_one()
for so_line in self.job_id.sale_order_line_ids:
if (so_line.x_fc_part_catalog_id
and 'fp.contract.review' in self.env):
return so_line.x_fc_part_catalog_id
return None
def _fp_open_contract_review(self):
"""Auto-create the QA-005 form for this step's part if missing,
return the act_window pointing at it. Called from button_start
on Contract Review steps."""
self.ensure_one()
part = self._fp_resolve_contract_review_part()
if not part:
return None
Review = self.env.get('fp.contract.review')
if Review is None:
return None # quality module not installed — skip
review = part.x_fc_contract_review_id
if not review:
review = Review.sudo().create({
'part_id': part.id,
'state': 'assistant_review',
})
part.sudo().write({
'x_fc_contract_review_id': review.id,
'x_fc_contract_review_dismissed': False,
})
self.job_id.message_post(body=_(
'Contract Review (QA-005) auto-created for %(part)s on '
'Contract Review step start by %(user)s.'
) % {
'part': part.display_name or part.part_number or '',
'user': self.env.user.name,
})
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.contract.review',
'res_id': review.id,
'view_mode': 'form',
'target': 'current',
'name': _('Contract Review — %s') % (
part.display_name or part.part_number or ''
),
}
def _fp_check_contract_review_complete(self):
"""Block button_finish on a Contract Review step until QA-005 is
signed off. Only enforced when the customer has
partner.x_fc_contract_review_required=True. Manager bypass via
context fp_skip_contract_review_gate=True."""
if self.env.context.get('fp_skip_contract_review_gate'):
return
for step in self:
if not step._fp_is_contract_review_step():
continue
part = step._fp_resolve_contract_review_part()
if not part or not part.partner_id.x_fc_contract_review_required:
continue
review = part.x_fc_contract_review_id
if not review or review.state != 'complete':
state_label = (
review.state if review else _('not started')
)
raise UserError(_(
'Contract Review for %(part)s is %(state)s — must be '
'"complete" before this step can finish. Open the '
'QA-005 form (smart button on the part), get both '
'sections signed off, then retry. Manager bypass: '
'fp_skip_contract_review_gate=True in context.'
) % {
'part': part.display_name or part.part_number or '',
'state': state_label,
})
# Approver-list gate (restored from pre-Sub-11). When the
# recipe author named approvers on the recipe root, only those
# users can finish the Contract Review step.
recipe = step.recipe_node_id and step.recipe_node_id.recipe_root_id
approvers = (recipe.contract_review_user_ids
if (recipe and 'contract_review_user_ids' in recipe._fields)
else False)
if approvers and self.env.user not in approvers:
raise UserError(_(
'Only authorised Contract Review approvers can finish '
'this step. Approvers: %s.\n\nContact your Plating '
'Manager to add yourself if this is wrong, or hand '
'the step to one of the approvers.'
) % ', '.join(approvers.mapped('name')))
# ==================================================================
# Sub 8 follow-up (2026-04-28) — Racking Inspection enforcement
# ==================================================================
# When the recipe-side "Racking" step starts, auto-promote the linked
# fp.racking.inspection from draft → inspecting and route the operator
# straight into the inspection form. When the same step finishes,
# block unless the inspection is in `done` or `discrepancy_flagged`
# (operator cleared every line). Manager bypass via context
# `fp_skip_racking_inspection_gate=True`.
def _fp_is_racking_step(self):
self.ensure_one()
if (self.name or '').strip().lower() in ('racking', 'rack'):
return True
node = self.recipe_node_id
if not node:
return False
if 'source_template_id' in node._fields and node.source_template_id:
if node.source_template_id.default_kind == 'racking':
return True
if 'default_kind' in node._fields and node.default_kind == 'racking':
return True
if self.kind == 'rack':
return True
return False
def _fp_open_racking_inspection(self):
"""Auto-promote draft → inspecting + return act_window for the
linked racking inspection. Auto-creates one if missing."""
self.ensure_one()
if 'fp.racking.inspection' not in self.env:
return None
# Reach the job's existing inspection (auto-created on action_confirm)
# or trigger a fresh create if none exists.
ri = self.job_id.racking_inspection_id
if not ri:
self.job_id._fp_create_racking_inspection()
self.job_id.invalidate_recordset(['racking_inspection_ids'])
ri = self.job_id.racking_inspection_id
if not ri:
return None
# Promote draft → inspecting. action_start raises if state isn't
# draft, so guard.
if ri.state == 'draft':
ri.sudo().action_start()
self.job_id.message_post(body=_(
'Racking inspection auto-promoted to "Inspecting" on '
'%(step)s start by %(user)s.'
) % {'step': self.name, 'user': self.env.user.name})
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.racking.inspection',
'res_id': ri.id,
'view_mode': 'form',
'target': 'current',
'name': _('Racking Inspection — %s') % self.job_id.name,
}
def _fp_check_racking_inspection_complete(self):
"""Soft gate — block button_finish on a Racking step until the
linked inspection is in a terminal state. discrepancy_flagged
counts as complete (the operator finished but flagged issues —
the discrepancy activity will route to the manager separately)."""
if self.env.context.get('fp_skip_racking_inspection_gate'):
return
for step in self:
if not step._fp_is_racking_step():
continue
ri = step.job_id.racking_inspection_id
if not ri:
# No inspection at all — still let it finish, but log a
# chatter warning so the manager sees the gap.
step.job_id.message_post(body=_(
'⚠️ Racking step "%s" finished without a racking '
'inspection on file. Sub 8 expected one to be '
'auto-created on job confirm.'
) % step.name)
continue
if ri.state not in ('done', 'discrepancy_flagged'):
state_label = dict(ri._fields['state'].selection).get(
ri.state, ri.state)
raise UserError(_(
'Racking inspection for %(job)s is "%(state)s" — must '
'be Done or Discrepancy Flagged before this step can '
'finish. Click the Racking Insp. smart button on the '
'job, complete the line check-off, then retry. '
'Manager bypass: fp_skip_racking_inspection_gate=True.'
) % {
'job': step.job_id.name,
'state': state_label,
})
def _fp_check_receiving_gate(self):
"""Block step transitions until parts are physically received.
Applied to every step EXCEPT Contract Review (paperwork — doesn't
need parts on the floor). Fires from both button_start and
button_finish so an operator can't begin OR complete physical
work before the receiving record is closed.
Manager bypass: ``fp_skip_receiving_gate=True`` in context. Same
pattern as the qty / QC / bake gates. Audit trail is preserved
via the state-transition tracking on chatter.
Threshold: SO ``x_fc_receiving_status == 'received'``. Post-Sub-8
that's the terminal state (inspection moved into the recipe's
racking step; ``'inspected'`` was dropped in the 2026-05-18
cleanup).
"""
if self.env.context.get('fp_skip_receiving_gate'):
return
for step in self:
if step._fp_is_contract_review_step():
continue
so = step.job_id.sale_order_id
if not so:
# Internal rework / no SO — gate doesn't apply.
continue
if 'x_fc_receiving_status' not in so._fields:
# Defensive: configurator module not installed.
continue
if so.x_fc_receiving_status != 'received':
label = dict(
so._fields['x_fc_receiving_status'].selection
).get(
so.x_fc_receiving_status,
so.x_fc_receiving_status or 'unknown',
)
raise UserError(_(
'Step "%(step)s" cannot proceed — parts not received '
'yet (SO %(so)s receiving status: %(status)s).\n\n'
'Close the receiving record (Sales > %(so)s > '
'Receiving) before starting or finishing work on '
'this step. A manager can bypass this gate for '
'documented exceptions.'
) % {
'step': step.name,
'so': so.name or '?',
'status': label,
})
def button_start(self):
"""Single source of truth for step start:
1. Sub 13 predecessor gate (raise UserError if blocking)
2. Receiving gate (raise UserError if parts not received)
3. Policy B Contract Review auto-open (route to QA-005)
4. Sub 8 Racking auto-open (route to racking inspection)
5. super().button_start() + serial promotion for the standard
path
Manager bypasses available via context:
fp_skip_predecessor_check=True skips the Sub 13 gate
fp_skip_receiving_gate=True skips the receiving gate
"""
# ---- 1. Sub 13 predecessor gate ----------------------------------
skip_pred = self.env.context.get('fp_skip_predecessor_check')
for step in self:
if skip_pred:
continue
if not step._fp_should_block_predecessors():
continue
blocking = step.job_id.step_ids.filtered(
lambda s: s.sequence < step.sequence and s.state not in (
'done', 'skipped', 'cancelled',
)
)
if blocking:
raise UserError(_(
"Step '%s' cannot start until earlier steps are "
"finished, skipped, or cancelled.\n\nBlocking step(s):\n %s\n\n"
"Options:\n"
" * Finish/Skip the blocking step(s) first.\n"
" * If this step legitimately runs in parallel, ask "
"a manager to flag it as Parallel Start on the recipe.\n"
" * Manager override via context fp_skip_predecessor_check=True."
) % (
step.name,
'\n '.join(
f'#{s.sequence} {s.name} ({s.state})'
for s in blocking[:5]
),
))
# ---- 2. Receiving gate -------------------------------------------
# Hard block (replaces the prior soft chatter warning). The
# helper exempts Contract Review steps internally, so contract
# review can still auto-open below regardless of receiving state.
self._fp_check_receiving_gate()
# ---- 3. Policy B Contract Review auto-open -----------------------
for step in self:
if step._fp_is_contract_review_step():
action = step._fp_open_contract_review()
if action:
super(FpJobStep, step).button_start()
if step.state == 'in_progress':
step._fp_promote_serials_on_start()
return action
# ---- 4. Sub 8 Racking auto-open ----------------------------------
for step in self:
if step._fp_is_racking_step():
action = step._fp_open_racking_inspection()
if action:
super(FpJobStep, step).button_start()
if step.state == 'in_progress':
step._fp_promote_serials_on_start()
return action
# ---- 5. Standard path: start + serial promote --------------------
result = super().button_start()
for step in self:
if step.state == 'in_progress':
step._fp_promote_serials_on_start()
return result
def button_finish(self):
# Policy B — block until QA-005 complete (when customer requires it).
self._fp_check_contract_review_complete()
# Receiving gate — same helper as button_start, exempts CR steps.
self._fp_check_receiving_gate()
# NOTE: racking inspection gate removed — racking is now a recipe
# step, not a separate inspection workflow. _fp_check_racking_
# inspection_complete() is kept as a helper for diagnostics but
# no longer enforced from button_finish.
result = super().button_finish()
for step in self:
if step.state == 'done':
step._fp_promote_serials_on_finish()
return result
# ==================================================================
# Per-row shortcut actions used by the job form's inline action column
# ==================================================================
def action_open_move_wizard(self):
"""Open the Move wizard with this step pre-filled as the from-step."""
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step.move.wizard',
'view_mode': 'form',
'target': 'new',
'name': _('Move from %s') % self.name,
'context': {
'default_from_step_id': self.id,
'default_job_id': self.job_id.id,
},
}
def action_open_input_wizard(self):
"""Open the Record Inputs OWL dialog from the per-row Record
button on the job form.
Contract-review steps redirect to the QA-005 form (same gate as
action_finish_and_advance) so the per-row Record button stays
consistent with Finish & Next.
"""
self.ensure_one()
cr_action = self._fp_contract_review_redirect()
if cr_action:
return cr_action
return {
'type': 'ir.actions.client',
'tag': 'fp_record_inputs_dialog',
'params': {
'step_id': self.id,
'advance_after': False,
},
}
def _fp_contract_review_redirect(self):
"""Return an ir.actions.act_window opening the part's QA-005
Contract Review form, or False to indicate "no redirect needed".
Triggers when:
* the recipe node is flagged default_kind='contract_review', AND
* the linked part has no review yet OR the review is still in
a non-terminal state (draft / assistant_review / manager_review).
Once the review reaches state 'complete' or 'dismissed' the step
is allowed to finish through the normal path, which is how the
operator clears the contract-review gate after signing QA-005.
Soft-fail: if the job has no part_catalog_id we cannot route to
a per-part review, so we fall through to the standard wizard
rather than blocking the operator.
"""
self.ensure_one()
node = self.recipe_node_id
if not node or node.default_kind != 'contract_review':
return False
part = self.job_id.part_catalog_id
if not part:
_logger.warning(
"Contract-review step '%s' on job %s has no part_catalog_id "
"— cannot redirect to QA-005 form, falling through to "
"standard wizard.",
self.name, self.job_id.name,
)
return False
review = part.x_fc_contract_review_id
if review and review.state in ('complete', 'dismissed'):
return False
return part.action_start_contract_review()
# ------------------------------------------------------------------
# Live duration helper — view binds to a non-stored compute that
# ticks each time the form re-reads. For a true live ticking clock
# we'd need an OWL widget; this gives "minutes since start" that's
# accurate at every record refresh, which is good enough for a
# backend manager's view.
# ------------------------------------------------------------------
duration_running_minutes = fields.Float(
string='Running Min',
compute='_compute_duration_running',
help='Minutes since the step\'s current open timelog started. '
'Re-reads on every form refresh; equals duration_actual once '
'the step is finished.',
)
@api.depends('state', 'date_started', 'time_log_ids',
'time_log_ids.date_started', 'time_log_ids.date_finished',
'duration_actual')
def _compute_duration_running(self):
now = fields.Datetime.now()
for step in self:
if step.state == 'in_progress':
# Sum closed intervals + (now - open interval start)
closed = sum(step.time_log_ids.mapped('duration_minutes'))
open_log = step.time_log_ids.filtered(
lambda l: not l.date_finished
)[:1]
running = 0.0
if open_log and open_log.date_started:
delta = (now - open_log.date_started).total_seconds() / 60.0
running = max(0.0, delta)
step.duration_running_minutes = closed + running
else:
step.duration_running_minutes = step.duration_actual or 0.0
# ------------------------------------------------------------------
# Sub 12d — Step Details Quick-Look modal
# ------------------------------------------------------------------
# Three computed/related fields that power the read-only manager
# quick-look modal. The modal is bound via context= on the parent
# job form's — no TransientModel needed.
quick_look_instructions = fields.Html(
string='Operator Instructions',
related='recipe_node_id.description',
readonly=True,
)
quick_look_collect_master = fields.Boolean(
string='Collect Measurements',
related='recipe_node_id.collect_measurements',
readonly=True,
)
quick_look_prompt_ids = fields.Many2many(
'fusion.plating.process.node.input',
string='Prompts',
compute='_compute_quick_look_prompt_ids',
)
quick_look_recorded_value_ids = fields.Many2many(
'fp.job.step.move.input.value',
string='Recorded Values',
compute='_compute_quick_look_recorded_value_ids',
)
@api.depends('recipe_node_id', 'recipe_node_id.input_ids')
def _compute_quick_look_prompt_ids(self):
for step in self:
node = step.recipe_node_id
if node:
step.quick_look_prompt_ids = node.input_ids.filtered(
lambda i: (i.kind or 'step_input') == 'step_input'
).sorted('sequence')
else:
step.quick_look_prompt_ids = False
def _compute_quick_look_recorded_value_ids(self):
Value = self.env['fp.job.step.move.input.value']
for step in self:
if not step.id:
step.quick_look_recorded_value_ids = False
continue
step.quick_look_recorded_value_ids = Value.search([
('move_id.from_step_id', '=', step.id),
], order='create_date desc')
def action_open_full_form(self):
"""From the quick-look modal, escape to the full editable form."""
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step',
'res_id': self.id,
'view_mode': 'form',
'target': 'current',
'name': self.name,
}
def action_open_quick_look(self):
"""Open the read-only Step Details quick-look modal.
Bound to the row-button on the embedded step list — explicit
trigger needed because editable="bottom" intercepts row clicks
for inline editing rather than opening the form view.
"""
self.ensure_one()
view = self.env.ref(
'fusion_plating_jobs.view_fp_job_step_quick_look_form'
)
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.job.step',
'res_id': self.id,
'view_mode': 'form',
'view_id': view.id,
'views': [(view.id, 'form')],
'target': 'new',
'name': self.name,
}
def _fp_record_one_piece_auto_move(self):
"""Decide whether to silently record a move before the step
finishes. Six cases:
- qty_at_step == 0: nothing to do (parts already moved).
- last runnable step: parts complete in place; no move.
- SEED-ONLY qty + downstream: bulk-move all parts to next
step in one move. Paperwork / first steps don't physically
hold parts per-piece.
- real qty == 1 + downstream: record move(1).
- real qty > 1 + downstream: raise — operator must use
Complete 1 → Next (streaming) or Move… (batched).
- real qty > 1 + last step: allow (qty_done auto-tick Phase 2).
Called from action_finish_and_advance just before button_finish.
"""
self.ensure_one()
qty = self.qty_at_step
if qty <= 0:
return False
next_step = self.job_id.step_ids.filtered(
lambda s: s.sequence > self.sequence
and s.state in ('pending', 'ready')
).sorted('sequence')[:1]
if not next_step:
return False
# Bulk-move all parts in one shot. Covers three use cases:
# - seed-only qty (paperwork / first step): creates the
# first explicit move record for the batch
# - real incoming, qty == 1 (streaming flow last part):
# same as Complete 1 → Next's tail call
# - real incoming, qty > 1 (batched flow): one click moves
# everything forward — operators with small parts don't
# have to click Complete 1 → Next repeatedly
# Complete 1 → Next is still available for one-by-one flow.
self.env['fp.job.step.move'].create({
'job_id': self.job_id.id,
'from_step_id': self.id,
'to_step_id': next_step.id,
'transfer_type': 'step',
'qty_moved': qty,
'moved_by_user_id': self.env.user.id,
})
return True