Files
gsinghpal 091f98e1f9 changes
2026-05-18 22:33:23 -04:00

1200 lines
52 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Real implementations for the state-machine action stubs that
# fusion_plating core's fp.job.step shipped as NotImplementedError
# placeholders. Per spec §5.2 state machine.
import logging
import re
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpJobStep(models.Model):
_inherit = 'fp.job.step'
# ===== Sub 13 — sequential enforcement (recipe + per-step) =============
# Decision matrix for whether button_start must verify predecessors:
#
# recipe.enforce_sequential | step.parallel_start | step.req_pred (legacy) | block?
# --------------------------|---------------------|------------------------|------
# True | False | any | YES
# True | True | any | no
# False | any | True | YES
# False | any | False | no
#
# Manager bypass via context fp_skip_predecessor_check=True.
# Encapsulated in _fp_should_block_predecessors() so the same gate
# is reused by button_start and the Move wizard's predecessor check
# (fusion_plating_shopfloor/controllers/move_controller.py).
# ======================================================================
def _fp_should_block_predecessors(self):
    """Tell whether the predecessor gate applies to this step.

    Returns True when the step may only start once its earlier-
    sequence siblings are terminal; the sibling check itself is done
    by the callers. Follows the decision matrix in the section
    header above.
    """
    self.ensure_one()
    job = self.job_id
    if not job:
        # Fail closed: with no parent job there is no
        # enforce_sequential flag to read, so keep the gate on.
        return True
    if not job.enforce_sequential:
        # Free-flow recipe — only the legacy per-step flag still gates.
        return bool(self.requires_predecessor_done)
    # Sequential recipe — gated unless the step opted into parallel start.
    return not self.parallel_start
# Start-button visibility flag; value comes from _compute_can_start.
can_start = fields.Boolean(
    compute='_compute_can_start',
    string='Can Start',
    help=(
        'True when this step is in a startable state AND the '
        'predecessor gate (if any) is currently clear. Drives the '
        'tablet/job form Start button visibility.'
    ),
)
@api.depends(
    'state',
    'sequence',
    'parallel_start',
    'requires_predecessor_done',
    'job_id.enforce_sequential',
    'job_id.step_ids.state',
    'job_id.step_ids.sequence',
)
def _compute_can_start(self):
    """Compute ``can_start`` for each step.

    A step can start when its state is pending / ready / paused and
    either the predecessor gate does not apply or no lower-sequence
    sibling is still outside done / skipped / cancelled.
    """
    startable_states = ('pending', 'ready', 'paused')
    terminal_states = ('done', 'skipped', 'cancelled')
    for rec in self:
        if rec.state not in startable_states:
            rec.can_start = False
        elif not rec._fp_should_block_predecessors():
            rec.can_start = True
        else:
            # Gate applies: any earlier non-terminal sibling blocks.
            unfinished = rec.job_id.step_ids.filtered(
                lambda s: s.sequence < rec.sequence
                and s.state not in terminal_states
            )
            rec.can_start = not unfinished
# NOTE: the actual button_start override lives further down (~line
# 876) where it merges Sub 13 predecessor gate + Policy B Contract
# Review auto-open + Sub 8 Racking auto-open + the receiving soft
# check. Keeping ONE definition prevents the Python-overrides-the-
# earlier-method footgun that swallowed Sub 13 enforcement on
# WH/JOB/00342.
def button_pause(self):
    """Move in-progress steps to 'paused'.

    For each step: stamp date_finished on any timelog row that has
    none, set the state to 'paused', then refresh duration_actual
    from the sum of the timelog durations.

    Raises:
        UserError: if a step is not in state 'in_progress'.
    """
    for rec in self:
        if rec.state != 'in_progress':
            raise UserError(_(
                "Step '%s' is in state '%s' — only in-progress steps can pause."
            ) % (rec.name, rec.state))
        unfinished_logs = rec.time_log_ids.filtered(
            lambda log: not log.date_finished
        )
        if unfinished_logs:
            unfinished_logs.write({'date_finished': fields.Datetime.now()})
        rec.state = 'paused'
        rec.duration_actual = sum(
            rec.time_log_ids.mapped('duration_minutes')
        )
    return True
def button_skip(self):
    """Mark pending/ready steps as skipped.

    Raises:
        UserError: if a step is in any state other than 'pending'
            or 'ready'.
    """
    skippable = ('pending', 'ready')
    for rec in self:
        if rec.state in skippable:
            rec.state = 'skipped'
            continue
        raise UserError(_(
            "Step '%s' is in state '%s' — only pending/ready steps can be skipped."
        ) % (rec.name, rec.state))
    return True
def button_cancel(self):
    """Cancel individual steps (whole-job cancel is fp.job.action_cancel).

    Raises:
        UserError: if a step is already 'done' or already 'cancelled'.
    """
    for rec in self:
        current = rec.state
        if current == 'done':
            raise UserError(_(
                "Step '%s' is done — cannot cancel."
            ) % rec.name)
        if current == 'cancelled':
            raise UserError(_(
                "Step '%s' is already cancelled."
            ) % rec.name)
        rec.state = 'cancelled'
    return True
def write(self, vals):
    """Write override with two job-level side effects.

    1. Reassignment trail: when ``assigned_user_id`` changes on a step
       that is in_progress / paused and already had an assignee, a
       message is posted on the parent job after the write. Steps in
       other states, first-time assignments and no-op assignments are
       ignored so the job chatter is not spammed.
    2. Job promotion: when the write sets state to 'in_progress', any
       parent job still in 'confirmed' is moved to 'in_progress'.
    """
    reassigned = []
    if 'assigned_user_id' in vals:
        incoming_uid = vals['assigned_user_id']
        active_steps = self.filtered(
            lambda s: s.state in ('in_progress', 'paused')
        )
        for step in active_steps:
            # Snapshot the assignee BEFORE the write replaces it.
            previous_uid = step.assigned_user_id.id
            if previous_uid and incoming_uid != previous_uid:
                reassigned.append((step, previous_uid, incoming_uid))

    result = super().write(vals)

    Users = self.env['res.users']
    for step, previous_uid, incoming_uid in reassigned:
        old_name = (
            Users.browse(previous_uid).name if previous_uid
            else '(unassigned)'
        )
        new_name = (
            Users.browse(incoming_uid).name if incoming_uid
            else '(unassigned)'
        )
        body = Markup(_(
            'Step <b>%s</b> reassigned from <b>%s</b> to <b>%s</b> '
            '(state=%s) by %s.'
        )) % (step.name, old_name, new_name, step.state,
              self.env.user.name)
        step.job_id.message_post(body=body)

    # Auto-promote parent job: confirmed → in_progress on first step
    # start, so the job-level 'in_progress' milestone can fire.
    if vals.get('state') == 'in_progress':
        confirmed_jobs = self.mapped('job_id').filtered(
            lambda j: j.state == 'confirmed'
        )
        if confirmed_jobs:
            confirmed_jobs.write({'state': 'in_progress'})
    return result
@api.model
def _cron_nudge_stale_paused(self, threshold_hours=24):
    """Nudge for steps sitting in `paused` beyond ``threshold_hours``.

    Thin wrapper around _cron_nudge_stale_steps; returns its count.
    """
    nudge_kwargs = {
        'states': ('paused',),
        'threshold_hours': threshold_hours,
        'label': 'paused',
    }
    return self._cron_nudge_stale_steps(**nudge_kwargs)
@api.model
def _cron_nudge_stale_in_progress(self, threshold_hours=8):
    """Nudge for steps sitting in `in_progress` beyond ``threshold_hours``.

    Default is 8 hours — the operator started, walked away, and the
    timelog keeps accumulating phantom hours. Thin wrapper around
    _cron_nudge_stale_steps; returns its count.
    """
    nudge_kwargs = {
        'states': ('in_progress',),
        'threshold_hours': threshold_hours,
        'label': 'in-progress',
    }
    return self._cron_nudge_stale_steps(**nudge_kwargs)
@api.model
def _cron_nudge_stale_steps(self, states=('paused',),
                            threshold_hours=24, label='stale'):
    """Generic stale-step nudger.

    Searches steps whose state is in ``states`` and whose date_started
    is older than ``threshold_hours``. For each one it creates a
    mail.activity on the parent job and posts a chatter note, unless
    an activity with the same summary is already found on that job.

    Assignee preference: job.manager_id, then step.assigned_user_id,
    then step.started_by_user_id, then the current user.

    Args:
        states: step states considered stale candidates.
        threshold_hours: minimum age (hours since date_started).
        label: wording inserted into the summary / note / log line.

    Returns:
        int: number of steps an activity was created for.
    """
    from datetime import timedelta

    cutoff = fields.Datetime.now() - timedelta(hours=threshold_hours)
    stale_steps = self.search([
        ('state', 'in', list(states)),
        ('date_started', '<', cutoff),
        ('date_started', '!=', False),
    ])
    Activity = self.env['mail.activity']
    todo_type = self.env.ref(
        'mail.mail_activity_data_todo', raise_if_not_found=False,
    )
    created = 0
    for step in stale_steps:
        job = step.job_id
        assignee = (
            job.manager_id
            or step.assigned_user_id
            or step.started_by_user_id
            or self.env.user
        )
        summary = _('Stale %s step: %s') % (label, step.name)
        # Idempotency: the summary doubles as the de-duplication key.
        duplicate = Activity.search([
            ('res_model', '=', job._name),
            ('res_id', '=', job.id),
            ('summary', '=', summary),
        ], limit=1)
        if duplicate:
            continue
        elapsed = fields.Datetime.now() - step.date_started
        age_h = elapsed.total_seconds() / 3600.0
        note = _(
            'Step "%(step)s" on job %(job)s has been in %(label)s state for '
            '%(hours).1f hours (since %(start)s). Investigate: operator '
            'reassignment, equipment failure, or finish + close out.'
        ) % {
            'step': step.name,
            'job': job.name,
            'label': label,
            'hours': age_h,
            'start': step.date_started,
        }
        activity_vals = {
            'res_model_id': self.env['ir.model']._get(job._name).id,
            'res_id': job.id,
            'summary': summary,
            'note': note,
            'user_id': assignee.id,
            'date_deadline': fields.Date.context_today(self),
        }
        if todo_type:
            activity_vals['activity_type_id'] = todo_type.id
        Activity.create(activity_vals)
        created += 1
        job.message_post(body=Markup(_(
            'Stale %s step: <b>%s</b> has been idle %.1f hours. '
            'Activity created for %s.'
        )) % (label, step.name, age_h, assignee.name))
    if created:
        _logger.info(
            'fp.job.step stale-%s cron: nudged %d step(s)',
            label, created,
        )
    return created
def action_abort_for_retry(self, reason=None, new_tank_id=None,
                           new_bath_id=None):
    """Abort in_progress / paused steps and put them back in `ready`.

    Per step: closes timelog rows lacking date_finished, optionally
    reassigns tank / bath, sets state to 'ready', stores the summed
    timelog minutes in duration_actual and posts an audit message on
    the parent job naming the reason and the equipment before the
    change.

    Args:
        reason: free text for the chatter post; a default label is
            used when empty.
        new_tank_id: optional value assigned to step.tank_id.
        new_bath_id: optional value assigned to step.bath_id.

    Raises:
        UserError: if a step is not in 'in_progress' or 'paused'.
    """
    reason = reason or _('Equipment failure / abort for retry')
    for step in self:
        if step.state not in ('in_progress', 'paused'):
            raise UserError(_(
                "Step '%s' is in state '%s' — only in_progress / "
                "paused steps can be aborted for retry."
            ) % (step.name, step.state))

        # Capture the equipment labels before any reassignment below.
        old_tank = step.tank_id.display_name or '(no tank set)'
        old_bath = step.bath_id.display_name or '(no bath set)'

        stamp = fields.Datetime.now()
        running_logs = step.time_log_ids.filtered(
            lambda log: not log.date_finished
        )
        if running_logs:
            running_logs.write({'date_finished': stamp})
        partial_min = sum(step.time_log_ids.mapped('duration_minutes'))

        moves = []
        if new_tank_id:
            step.tank_id = new_tank_id
            moves.append(' -> tank %s' % step.tank_id.display_name)
        if new_bath_id:
            step.bath_id = new_bath_id
            moves.append(' -> bath %s' % step.bath_id.display_name)
        change_msg = ''.join(moves)

        step.state = 'ready'
        step.duration_actual = partial_min

        body = Markup(_(
            '⚠️ Step <b>%s</b> aborted for retry by %s.<br/>'
            'Reason: <em>%s</em><br/>'
            'Equipment: tank=%s, bath=%s%s<br/>'
            'Partial work captured: %.2f min in %d timelog(s). '
            'Step is back in <b>ready</b> state — operator can '
            'restart when the issue is resolved.'
        )) % (
            step.name, self.env.user.name, reason,
            old_tank, old_bath, change_msg, partial_min,
            len(step.time_log_ids),
        )
        step.job_id.message_post(body=body)
    return True
def action_recompute_duration_from_timelogs(self):
    """Manual re-sum of duration_actual with a chatter audit post.

    Always rewrites duration_actual from the timelog rows; posts on
    the parent job only when the value moved by more than 0.001 min.
    The silent counterpart is _fp_resum_duration_actual.
    """
    for step in self:
        before = step.duration_actual or 0.0
        after = sum(step.time_log_ids.mapped('duration_minutes'))
        step.duration_actual = after
        if abs(before - after) <= 0.001:
            continue
        step.job_id.message_post(body=Markup(_(
            'Step <b>%s</b> duration recomputed from timelog rows: '
            '%.2f min → %.2f min (Δ %+.2f). Recomputed by %s.'
        )) % (step.name, before, after, after - before,
              self.env.user.name))
    return True
def _fp_resum_duration_actual(self):
    """Silent re-sum of duration_actual from the timelog rows.

    Intended for automatic triggers; posts nothing to chatter and
    only writes when the stored value differs by more than 0.001 min.
    """
    for step in self:
        total = sum(step.time_log_ids.mapped('duration_minutes'))
        stored = step.duration_actual or 0.0
        if abs(stored - total) > 0.001:
            step.duration_actual = total
    return True
def action_finish_and_advance(self):
    """"Finish & Next": finish this step, then start the next one.

    Flow, in order:
      1. Contract-review redirect — if _fp_contract_review_redirect
         returns an action, return it and stop.
      2. Prompt-first — unless the context carries
         ``fp_after_inputs`` (set when re-entering from the Record
         Inputs dialog), open that dialog when the step still has
         uncaptured step_input prompts.
      3. One-piece auto-move shim, then button_finish.
      4. Start the step returned by _fp_next_runnable_step with the
         predecessor check bypassed, and post a job chatter note.

    Returns:
        An action dict when redirecting (steps 1-2), otherwise True.

    Raises:
        UserError: if the step is not in state 'in_progress'.
    """
    self.ensure_one()
    if self.state != 'in_progress':
        raise UserError(_(
            "Step '%s' is in state '%s' — start it before clicking Finish."
        ) % (self.name, self.state))

    # Contract Review (QA-005) routing takes priority over the
    # generic input dialog.
    redirect = self._fp_contract_review_redirect()
    if redirect:
        return redirect

    # The context flag marks the call-back from the dialog's commit,
    # which prevents an endless dialog loop.
    returning_from_dialog = self.env.context.get('fp_after_inputs')
    if not returning_from_dialog and self._fp_has_uncaptured_step_inputs():
        return self._fp_open_input_wizard(advance_after=True)

    # Auto-move shim runs first so the qty gate in button_finish passes.
    self._fp_record_one_piece_auto_move()
    self.button_finish()

    successor = self._fp_next_runnable_step()
    if not successor:
        return True
    successor.with_context(
        fp_skip_predecessor_check=True,
    ).button_start()
    self.job_id.message_post(body=_(
        'Step "%(prev)s" finished — auto-started next step "%(next)s".'
    ) % {'prev': self.name, 'next': successor.name})
    return True
def _fp_next_runnable_step(self):
    """Return the lowest-sequence sibling that can still run.

    Considers every other step on the same job in state pending /
    ready / paused; returns an empty fp.job.step recordset when there
    is none. Used by action_finish_and_advance.
    """
    self.ensure_one()
    runnable_states = ('pending', 'ready', 'paused')
    siblings = self.job_id.step_ids.filtered(
        lambda s: s.id != self.id and s.state in runnable_states
    )
    ordered = siblings.sorted('sequence')
    return ordered[:1] or self.env['fp.job.step']
def _fp_has_uncaptured_step_inputs(self):
    """True when the recipe node has prompts and none were recorded yet.

    "Recorded" is a heuristic: at least one fp.job.step.move with
    transfer_type='step' from this step, dated on or after
    date_started (or now, when the step has no start date).
    """
    self.ensure_one()
    node = self.recipe_node_id
    if not node:
        return False
    prompts = node.input_ids
    # Narrow to step_input prompts only when the model has a `kind` field.
    if 'kind' in prompts._fields:
        prompts = prompts.filtered(lambda p: p.kind == 'step_input')
    if not prompts:
        return False
    run_start = self.date_started or fields.Datetime.now()
    recorded = self.env['fp.job.step.move'].search_count([
        ('from_step_id', '=', self.id),
        ('transfer_type', '=', 'step'),
        ('move_datetime', '>=', run_start),
    ])
    return not recorded
def _fp_open_input_wizard(self, advance_after=False):
    """Build the client action that opens the Record Inputs OWL dialog.

    Args:
        advance_after: forwarded to the dialog as a boolean param;
            action_finish_and_advance passes True so that saving the
            dialog continues into finish + auto-start.

    Returns:
        dict: an ``ir.actions.client`` action tagged
        ``fp_record_inputs_dialog``.
    """
    self.ensure_one()
    dialog_params = {
        'step_id': self.id,
        'advance_after': bool(advance_after),
    }
    return {
        'type': 'ir.actions.client',
        'tag': 'fp_record_inputs_dialog',
        'params': dialog_params,
    }
# NB: action_open_input_wizard is defined further down (line ~829)
# — that one stays as the per-row "Record" button entry-point.
# _fp_open_input_wizard above adds the advance_after pathway used
# only by action_finish_and_advance.
def button_finish(self):
    """Finish override: duration-overrun alert + bake-window auto-spawn.

    After the parent finish, for every step that ended in 'done':
      1. Posts a chatter warning on the job when duration_actual is
         at least 1.5x duration_expected.
      2. Creates one fusion.plating.bake.window per (job, step) when
         the job's recipe requires bake relief and the step looks
         like a plating step (kind 'wet' or the word "plating" in the
         name, minus inspect / bake / mask / rack names).

    NOTE(review): this class body defines ``button_finish`` a second
    time further down. Python keeps only the last binding of a name in
    a class body, so this implementation is unreachable unless the two
    definitions are merged into one.

    Returns:
        Whatever the parent ``button_finish`` returns.
    """
    result = super().button_finish()
    BW = self.env['fusion.plating.bake.window']
    Bath = self.env['fusion.plating.bath']
    for step in self:
        if step.state != 'done':
            continue
        # Duration-overrun chatter alert.
        if step.duration_expected and step.duration_actual:
            ratio = step.duration_actual / step.duration_expected
            if ratio >= 1.5:
                step.job_id.message_post(body=Markup(_(
                    '⚠️ <b>Step "%s" ran %.1fx expected</b> — '
                    'expected %.0f min, actual %.0f min. Investigate: '
                    'equipment issue, training gap, or recipe time '
                    'estimate too tight.'
                )) % (step.name, ratio, step.duration_expected,
                      step.duration_actual))
        recipe_root = step.job_id.recipe_id
        if not recipe_root:
            continue
        # getattr keeps this working when the bake fields are absent
        # from the recipe model.
        requires = getattr(recipe_root, 'requires_bake_relief', False)
        window_hrs = getattr(recipe_root, 'bake_window_hours', 0.0)
        if not requires or not window_hrs:
            continue
        # Trigger only on the plating-out step:
        #   - step.kind == 'wet' (recipe-authored signal); OR
        #   - the step name contains "plating" as a word.
        # Explicit excludes: inspection / bake / mask / rack steps
        # whose names might mention plating in passing.
        name_l = (step.name or '').lower()
        kind_match = step.kind == 'wet'
        name_match = bool(re.search(r'\bplating\b', name_l))
        excluded = any(kw in name_l for kw in (
            'inspect', 'inspection', 'bake', 'mask', 'rack',
        ))
        if (not kind_match and not name_match) or excluded:
            continue
        # Idempotency — only one bake.window per (job, step).
        existing = BW.sudo().search([
            ('part_ref', '=', step.job_id.name),
            ('lot_ref', '=', f'step-{step.id}'),
        ], limit=1)
        if existing:
            continue
        # Pick a bath: step.bath_id wins; then the first bath of the
        # step's facility; then any bath at all. Written as explicit
        # statements because `a or b if c else False` parses as
        # `(a or b) if c else False`, which discarded step.bath_id
        # whenever the step had no facility.
        bath = step.bath_id
        if not bath and step.facility_id:
            bath = Bath.sudo().search(
                [('facility_id', '=', step.facility_id.id)], limit=1,
            )
        if not bath:
            bath = Bath.sudo().search([], limit=1)
        if not bath:
            _logger.warning(
                'Step %s: bake-window auto-spawn skipped — no bath '
                'configured.', step.name,
            )
            continue
        bw = BW.sudo().create({
            'bath_id': bath.id,
            'plate_exit_time': step.date_finished or fields.Datetime.now(),
            'window_hours': window_hrs,
            'part_ref': step.job_id.name,
            'lot_ref': f'step-{step.id}',
            'customer_ref': step.job_id.partner_id.display_name or '',
            'quantity': int(step.job_id.qty or 0),
        })
        step.job_id.message_post(body=Markup(_(
            'Bake window <b>%s</b> auto-created — %.1fh window from '
            'plate exit. Required by %s.'
        )) % (bw.name, window_hrs, bw.bake_required_by))
    return result
# ==================================================================
# Phase 2 multi-serial — auto-promote serials on step transitions
# ==================================================================
def _fp_promote_serials_on_start(self):
    """Move the job's serials from received / racked to in_process.

    Serials are reached through the job's sale order lines
    (``x_fc_serial_ids``). Serials in any other state are left alone,
    so repeated calls are harmless.
    """
    promotable_states = ('received', 'racked')
    for step in self:
        so_lines = step.job_id.sale_order_line_ids
        if not so_lines:
            continue
        pending = so_lines.mapped('x_fc_serial_ids').filtered(
            lambda serial: serial.state in promotable_states
        )
        if not pending:
            continue
        # sudo so operator-tier users can promote serial state
        # without needing direct write access on the serial model.
        pending.sudo()._set_state('in_process', message=_(
            'Promoted to In Process on step "%s" start by %s.'
        ) % (step.name, self.env.user.name))
def _fp_promote_serials_on_finish(self):
    """Move in_process serials to `inspected` when a terminal step ends.

    A step counts as terminal when it has the highest sequence among
    the job's steps that are neither cancelled nor skipped, OR its
    kind is 'inspect', OR its name contains "final". Only serials
    currently in `in_process` are promoted.
    """
    for step in self:
        job = step.job_id
        so_lines = job.sale_order_line_ids
        if not so_lines:
            continue
        live_steps = job.step_ids.filtered(
            lambda s: s.state not in ('cancelled', 'skipped')
        )
        if not live_steps:
            continue
        highest_seq = max(live_steps.mapped('sequence'))
        looks_final = (
            step.kind == 'inspect'
            or 'final' in (step.name or '').lower()
        )
        if step.sequence != highest_seq and not looks_final:
            continue
        in_flight = so_lines.mapped('x_fc_serial_ids').filtered(
            lambda serial: serial.state == 'in_process'
        )
        if in_flight:
            in_flight.sudo()._set_state('inspected', message=_(
                'Promoted to Inspected on step "%s" finish by %s.'
            ) % (step.name, self.env.user.name))
# ==================================================================
# Policy B (2026-04-28) — Contract Review enforcement
# ==================================================================
# When a recipe author drops a "Contract Review" step into a recipe,
# button_start opens the QA-005 audit form for the linked part (auto-
# creates one if missing) and button_finish blocks completion until
# the form is `complete` AND the current user is on the recipe's
# contract_review_user_ids approver list (when configured).
#
# Detection — case-insensitive match on the step name OR
# recipe_node_id mapped from a step.template with default_kind ==
# 'contract_review' (the simple-editor library entry).
def _fp_is_contract_review_step(self):
    """Detect a Contract Review step.

    Matches on the step name ('contract review' / 'qa-005', trimmed,
    case-insensitive), or on default_kind == 'contract_review' read
    from the recipe node's source template or from the node itself
    (each only when the node model has that field).
    """
    self.ensure_one()
    label = (self.name or '').strip().lower()
    if label in ('contract review', 'qa-005'):
        return True
    node = self.recipe_node_id
    if not node:
        return False
    node_fields = node._fields
    # Source template kind (when authored via the simple editor library).
    template = (
        node.source_template_id
        if 'source_template_id' in node_fields else None
    )
    if template and template.default_kind == 'contract_review':
        return True
    return (
        'default_kind' in node_fields
        and node.default_kind == 'contract_review'
    )
def _fp_resolve_contract_review_part(self):
    """Return the part catalog record this step's job is for, or None.

    Takes ``x_fc_part_catalog_id`` from the first of the job's sale
    order lines that has one. Returns None when no line carries a
    part or when the 'fp.contract.review' model is not available.
    """
    self.ensure_one()
    if 'fp.contract.review' not in self.env:
        return None
    for line in self.job_id.sale_order_line_ids:
        part = line.x_fc_part_catalog_id
        if part:
            return part
    return None
def _fp_open_contract_review(self):
    """Return an act_window on the part's QA-005 form, creating it if needed.

    When the part has no linked review, one is created in state
    'assistant_review', linked back onto the part (clearing the
    dismissed flag) and announced on the job chatter.

    Returns:
        dict | None: the window action, or None when no part can be
        resolved or the review model is not available.
    """
    self.ensure_one()
    part = self._fp_resolve_contract_review_part()
    if not part:
        return None
    Review = self.env.get('fp.contract.review')
    if Review is None:
        return None  # quality module not installed — skip
    part_label = part.display_name or part.part_number or ''
    review = part.x_fc_contract_review_id
    if not review:
        review = Review.sudo().create({
            'part_id': part.id,
            'state': 'assistant_review',
        })
        part.sudo().write({
            'x_fc_contract_review_id': review.id,
            'x_fc_contract_review_dismissed': False,
        })
        self.job_id.message_post(body=_(
            'Contract Review (QA-005) auto-created for %(part)s on '
            'Contract Review step start by %(user)s.'
        ) % {
            'part': part_label,
            'user': self.env.user.name,
        })
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.contract.review',
        'res_id': review.id,
        'view_mode': 'form',
        'target': 'current',
        'name': _('Contract Review — %s') % part_label,
    }
def _fp_check_contract_review_complete(self):
    """Gate for finishing Contract Review steps.

    Skipped entirely when the context carries
    ``fp_skip_contract_review_gate``. For each Contract Review step
    whose part's partner has ``x_fc_contract_review_required`` set:
      * the linked review must exist and be in state 'complete';
      * when the recipe root names contract_review_user_ids, the
        current user must be one of them.

    Raises:
        UserError: when either condition fails.
    """
    if self.env.context.get('fp_skip_contract_review_gate'):
        return
    current_user = self.env.user
    for step in self:
        if not step._fp_is_contract_review_step():
            continue
        part = step._fp_resolve_contract_review_part()
        if not part:
            continue
        if not part.partner_id.x_fc_contract_review_required:
            continue

        review = part.x_fc_contract_review_id
        if not review or review.state != 'complete':
            state_label = review.state if review else _('not started')
            raise UserError(_(
                'Contract Review for %(part)s is %(state)s — must be '
                '"complete" before this step can finish. Open the '
                'QA-005 form (smart button on the part), get both '
                'sections signed off, then retry. Manager bypass: '
                'fp_skip_contract_review_gate=True in context.'
            ) % {
                'part': part.display_name or part.part_number or '',
                'state': state_label,
            })

        # Approver-list gate: only enforced when the recipe root
        # model has the field and the list is non-empty.
        node = step.recipe_node_id
        recipe = node and node.recipe_root_id
        approvers = False
        if recipe and 'contract_review_user_ids' in recipe._fields:
            approvers = recipe.contract_review_user_ids
        if approvers and current_user not in approvers:
            raise UserError(_(
                'Only authorised Contract Review approvers can finish '
                'this step. Approvers: %s.\n\nContact your Plating '
                'Manager to add yourself if this is wrong, or hand '
                'the step to one of the approvers.'
            ) % ', '.join(approvers.mapped('name')))
# ==================================================================
# Sub 8 follow-up (2026-04-28) — Racking Inspection enforcement
# ==================================================================
# When the recipe-side "Racking" step starts, auto-promote the linked
# fp.racking.inspection from draft → inspecting and route the operator
# straight into the inspection form. When the same step finishes,
# block unless the inspection is in `done` or `discrepancy_flagged`
# (operator cleared every line). Manager bypass via context
# `fp_skip_racking_inspection_gate=True`.
def _fp_is_racking_step(self):
    """Detect a Racking step.

    Matches on the step name ('racking' / 'rack', trimmed,
    case-insensitive). When the step has a recipe node it also
    matches on default_kind == 'racking' (source template or node,
    each only if the field exists) and on step.kind == 'rack'.
    """
    self.ensure_one()
    label = (self.name or '').strip().lower()
    if label in ('racking', 'rack'):
        return True
    node = self.recipe_node_id
    if not node:
        return False
    node_fields = node._fields
    template = (
        node.source_template_id
        if 'source_template_id' in node_fields else None
    )
    if template and template.default_kind == 'racking':
        return True
    if 'default_kind' in node_fields and node.default_kind == 'racking':
        return True
    return self.kind == 'rack'
def _fp_open_racking_inspection(self):
    """Return an act_window on the job's racking inspection.

    Creates the inspection through the job when none is linked, and
    promotes it from 'draft' via action_start (with a chatter note).

    Returns:
        dict | None: the window action, or None when the
        'fp.racking.inspection' model is unavailable or no inspection
        exists even after the create attempt.
    """
    self.ensure_one()
    if 'fp.racking.inspection' not in self.env:
        return None
    job = self.job_id
    inspection = job.racking_inspection_id
    if not inspection:
        # Nothing linked yet — ask the job to create one, then
        # re-read it after dropping the cached relation.
        job._fp_create_racking_inspection()
        job.invalidate_recordset(['racking_inspection_ids'])
        inspection = job.racking_inspection_id
    if not inspection:
        return None
    # action_start raises if state isn't draft, hence the guard.
    if inspection.state == 'draft':
        inspection.sudo().action_start()
        job.message_post(body=_(
            'Racking inspection auto-promoted to "Inspecting" on '
            '%(step)s start by %(user)s.'
        ) % {'step': self.name, 'user': self.env.user.name})
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.racking.inspection',
        'res_id': inspection.id,
        'view_mode': 'form',
        'target': 'current',
        'name': _('Racking Inspection — %s') % job.name,
    }
def _fp_check_racking_inspection_complete(self):
    """Gate helper: a Racking step needs a finished inspection.

    Skipped when the context carries
    ``fp_skip_racking_inspection_gate``. A missing inspection only
    produces a chatter warning; an inspection in any state other than
    'done' / 'discrepancy_flagged' raises.

    Raises:
        UserError: when the linked inspection is not yet terminal.
    """
    if self.env.context.get('fp_skip_racking_inspection_gate'):
        return
    accepted_states = ('done', 'discrepancy_flagged')
    for step in self:
        if not step._fp_is_racking_step():
            continue
        job = step.job_id
        inspection = job.racking_inspection_id
        if not inspection:
            # No inspection at all — let the step finish, but leave a
            # chatter warning so the manager sees the gap.
            job.message_post(body=_(
                '⚠️ Racking step "%s" finished without a racking '
                'inspection on file. Sub 8 expected one to be '
                'auto-created on job confirm.'
            ) % step.name)
            continue
        if inspection.state in accepted_states:
            continue
        # Show the selection label rather than the technical key.
        state_labels = dict(inspection._fields['state'].selection)
        state_label = state_labels.get(inspection.state, inspection.state)
        raise UserError(_(
            'Racking inspection for %(job)s is "%(state)s" — must '
            'be Done or Discrepancy Flagged before this step can '
            'finish. Click the Racking Insp. smart button on the '
            'job, complete the line check-off, then retry. '
            'Manager bypass: fp_skip_racking_inspection_gate=True.'
        ) % {
            'job': job.name,
            'state': state_label,
        })
def _fp_check_receiving_gate(self):
    """Block step transitions until the SO's parts are received.

    Skipped when the context carries ``fp_skip_receiving_gate``.
    Per step, the gate does not apply when the step is a Contract
    Review step, the job has no sale order, or the sale order model
    has no ``x_fc_receiving_status`` field. Otherwise the status must
    equal 'received'.

    Raises:
        UserError: when the sale order's receiving status is anything
            other than 'received'.
    """
    if self.env.context.get('fp_skip_receiving_gate'):
        return
    status_field = 'x_fc_receiving_status'
    for step in self:
        if step._fp_is_contract_review_step():
            continue
        order = step.job_id.sale_order_id
        # No SO → internal rework; missing field → configurator
        # module not installed. Either way the gate doesn't apply.
        if not order or status_field not in order._fields:
            continue
        status = order.x_fc_receiving_status
        if status == 'received':
            continue
        status_labels = dict(order._fields[status_field].selection)
        label = status_labels.get(status, status or 'unknown')
        raise UserError(_(
            'Step "%(step)s" cannot proceed — parts not received '
            'yet (SO %(so)s receiving status: %(status)s).\n\n'
            'Close the receiving record (Sales > %(so)s > '
            'Receiving) before starting or finishing work on '
            'this step. A manager can bypass this gate for '
            'documented exceptions.'
        ) % {
            'step': step.name,
            'so': order.name or '?',
            'status': label,
        })
def button_start(self):
    """Single source of truth for step start.

    Order of operations:
      1. Sub 13 predecessor gate (UserError if an earlier step is
         still open and the gate applies to the step).
      2. Receiving gate (UserError if parts are not received).
      3. Routing: the first Contract Review step that yields a QA-005
         action wins; otherwise the first Racking step that yields a
         racking-inspection action.
      4. Parent ``button_start`` on the WHOLE recordset, then serial
         promotion for every step that ended up in_progress.

    Previously the routing branches started only the one routed step
    and returned immediately, so a multi-record call silently left
    every other step unstarted. Single-record calls behave exactly as
    before.

    Manager bypasses via context:
      fp_skip_predecessor_check=True skips the Sub 13 gate
      fp_skip_receiving_gate=True skips the receiving gate

    Returns:
        The routing action dict when one was produced, otherwise the
        parent ``button_start`` result.

    Raises:
        UserError: from the predecessor or receiving gate.
    """
    # ---- 1. Sub 13 predecessor gate ----------------------------------
    if not self.env.context.get('fp_skip_predecessor_check'):
        for step in self:
            if not step._fp_should_block_predecessors():
                continue
            blocking = step.job_id.step_ids.filtered(
                lambda s: s.sequence < step.sequence and s.state not in (
                    'done', 'skipped', 'cancelled',
                )
            )
            if blocking:
                raise UserError(_(
                    "Step '%s' cannot start until earlier steps are "
                    "finished, skipped, or cancelled.\n\nBlocking step(s):\n %s\n\n"
                    "Options:\n"
                    " * Finish/Skip the blocking step(s) first.\n"
                    " * If this step legitimately runs in parallel, ask "
                    "a manager to flag it as Parallel Start on the recipe.\n"
                    " * Manager override via context fp_skip_predecessor_check=True."
                ) % (
                    step.name,
                    '\n '.join(
                        f'#{s.sequence} {s.name} ({s.state})'
                        for s in blocking[:5]  # cap the list in the dialog
                    ),
                ))

    # ---- 2. Receiving gate -------------------------------------------
    # Hard block. The helper exempts Contract Review steps internally,
    # so contract review can still auto-open below regardless of
    # receiving state.
    self._fp_check_receiving_gate()

    # ---- 3. Routing action (Contract Review first, then Racking) -----
    routing_action = None
    for step in self:
        if step._fp_is_contract_review_step():
            routing_action = step._fp_open_contract_review()
            if routing_action:
                break
    if not routing_action:
        for step in self:
            if step._fp_is_racking_step():
                routing_action = step._fp_open_racking_inspection()
                if routing_action:
                    break

    # ---- 4. Start every step + serial promote ------------------------
    result = super().button_start()
    for step in self:
        if step.state == 'in_progress':
            step._fp_promote_serials_on_start()
    return routing_action or result
def button_finish(self):
    """Finish the step(s) once the pre-finish gates have passed.

    Runs the contract-review completion check and the receiving gate
    on the whole recordset, delegates to the parent ``button_finish``,
    then calls ``_fp_promote_serials_on_finish`` on every step whose
    state is ``'done'`` afterwards.

    Returns whatever the parent ``button_finish`` returns.
    """
    # Policy B — block until QA-005 complete (when customer requires it).
    self._fp_check_contract_review_complete()
    # Receiving gate — same helper as button_start, exempts CR steps.
    self._fp_check_receiving_gate()
    # NOTE: racking inspection gate removed — racking is now a recipe
    # step, not a separate inspection workflow. _fp_check_racking_
    # inspection_complete() is kept as a helper for diagnostics but
    # no longer enforced from button_finish.
    outcome = super().button_finish()
    for record in self:
        if record.state != 'done':
            # Parent did not finish this step; nothing to promote.
            continue
        record._fp_promote_serials_on_finish()
    return outcome
# ==================================================================
# Per-row shortcut actions used by the job form's inline action column
# ==================================================================
def action_open_move_wizard(self):
    """Return the window action that opens the Move wizard.

    The wizard (``fp.job.step.move.wizard``) opens as a modal with this
    step pre-filled as the from-step and its job as the default job.
    Must be called on a single step.
    """
    self.ensure_one()
    title = _('Move from %s') % self.name
    wizard_defaults = {
        'default_from_step_id': self.id,
        'default_job_id': self.job_id.id,
    }
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.move.wizard',
        'view_mode': 'form',
        'target': 'new',
        'name': title,
        'context': wizard_defaults,
    }
def action_open_input_wizard(self):
    """Open the Record Inputs OWL dialog for this step.

    Used by the per-row Record button on the job form. When
    ``_fp_contract_review_redirect()`` yields an action (contract-review
    step whose QA-005 review is still open), that action is returned
    instead so the operator lands on the review form.

    Otherwise returns the ``fp_record_inputs_dialog`` client action for
    this step with ``advance_after`` disabled.
    """
    self.ensure_one()
    redirect = self._fp_contract_review_redirect()
    if not redirect:
        dialog_params = {
            'step_id': self.id,
            'advance_after': False,
        }
        return {
            'type': 'ir.actions.client',
            'tag': 'fp_record_inputs_dialog',
            'params': dialog_params,
        }
    return redirect
def _fp_contract_review_redirect(self):
"""Return an ir.actions.act_window opening the part's QA-005
Contract Review form, or False to indicate "no redirect needed".
Triggers when:
* the recipe node is flagged default_kind='contract_review', AND
* the linked part has no review yet OR the review is still in
a non-terminal state (draft / assistant_review / manager_review).
Once the review reaches state 'complete' or 'dismissed' the step
is allowed to finish through the normal path, which is how the
operator clears the contract-review gate after signing QA-005.
Soft-fail: if the job has no part_catalog_id we cannot route to
a per-part review, so we fall through to the standard wizard
rather than blocking the operator.
"""
self.ensure_one()
node = self.recipe_node_id
if not node or node.default_kind != 'contract_review':
return False
part = self.job_id.part_catalog_id
if not part:
_logger.warning(
"Contract-review step '%s' on job %s has no part_catalog_id "
"— cannot redirect to QA-005 form, falling through to "
"standard wizard.",
self.name, self.job_id.name,
)
return False
review = part.x_fc_contract_review_id
if review and review.state in ('complete', 'dismissed'):
return False
return part.action_start_contract_review()
# ------------------------------------------------------------------
# Live duration helper — view binds to a non-stored compute that
# ticks each time the form re-reads. For a true live ticking clock
# we'd need an OWL widget; this gives "minutes since start" that's
# accurate at every record refresh, which is good enough for a
# backend manager's view.
# ------------------------------------------------------------------
# Non-stored computed float shown on the step form; see
# _compute_duration_running for how the value is derived.
duration_running_minutes = fields.Float(
    compute='_compute_duration_running',
    string='Running Min',
    help="Minutes since the step's current open timelog started. "
         "Re-reads on every form refresh; equals duration_actual once "
         "the step is finished.",
)
@api.depends('state', 'date_started', 'time_log_ids',
             'time_log_ids.date_started', 'time_log_ids.date_finished',
             'duration_actual')
def _compute_duration_running(self):
    """Compute ``duration_running_minutes`` for each step.

    * Step not ``in_progress``: the value is ``duration_actual``
      (0.0 when unset).
    * Step ``in_progress``: the sum of ``duration_minutes`` over all
      time logs, plus the minutes elapsed since the first time log
      without a ``date_finished`` was started (clamped at 0).

    NOTE(review): this presumes an unfinished time log contributes 0
    to ``duration_minutes``; otherwise its elapsed time would be
    counted twice — confirm against the time-log model.
    """
    current_time = fields.Datetime.now()
    for record in self:
        if record.state != 'in_progress':
            record.duration_running_minutes = record.duration_actual or 0.0
            continue

        logs = record.time_log_ids
        logged_minutes = sum(logs.mapped('duration_minutes'))
        unfinished = logs.filtered(lambda log: not log.date_finished)[:1]

        live_minutes = 0.0
        if unfinished and unfinished.date_started:
            elapsed = current_time - unfinished.date_started
            # Guard against a start timestamp that lies in the future.
            live_minutes = max(0.0, elapsed.total_seconds() / 60.0)
        record.duration_running_minutes = logged_minutes + live_minutes
# ------------------------------------------------------------------
# Sub 12d — Step Details Quick-Look modal
# ------------------------------------------------------------------
# Four computed/related fields that power the read-only manager
# quick-look modal. The modal is bound via context= on the parent
# job form's <field name="step_ids"/> — no TransientModel needed.
# Mirror of the recipe node's description, shown as operator instructions.
quick_look_instructions = fields.Html(
    related='recipe_node_id.description',
    string='Operator Instructions',
    readonly=True,
)
# Mirror of the recipe node's collect_measurements flag.
quick_look_collect_master = fields.Boolean(
    related='recipe_node_id.collect_measurements',
    string='Collect Measurements',
    readonly=True,
)
# Recipe-node inputs filtered by _compute_quick_look_prompt_ids.
quick_look_prompt_ids = fields.Many2many(
    'fusion.plating.process.node.input',
    compute='_compute_quick_look_prompt_ids',
    string='Prompts',
)
# Input values found by _compute_quick_look_recorded_value_ids.
quick_look_recorded_value_ids = fields.Many2many(
    'fp.job.step.move.input.value',
    compute='_compute_quick_look_recorded_value_ids',
    string='Recorded Values',
)
@api.depends('recipe_node_id', 'recipe_node_id.input_ids')
def _compute_quick_look_prompt_ids(self):
    """Fill ``quick_look_prompt_ids`` from the step's recipe node.

    Keeps the node inputs whose ``kind`` is ``'step_input'`` (an empty
    ``kind`` counts as ``'step_input'``), ordered by ``sequence``.
    Steps without a recipe node get an empty value.
    """
    def _is_step_input(prompt):
        return (prompt.kind or 'step_input') == 'step_input'

    for record in self:
        recipe_node = record.recipe_node_id
        if not recipe_node:
            record.quick_look_prompt_ids = False
            continue
        step_prompts = recipe_node.input_ids.filtered(_is_step_input)
        record.quick_look_prompt_ids = step_prompts.sorted('sequence')
def _compute_quick_look_recorded_value_ids(self):
Value = self.env['fp.job.step.move.input.value']
for step in self:
if not step.id:
step.quick_look_recorded_value_ids = False
continue
step.quick_look_recorded_value_ids = Value.search([
('move_id.from_step_id', '=', step.id),
], order='create_date desc')
def action_open_full_form(self):
    """Return the window action opening this step's full form view.

    Used from the quick-look modal to escape to the editable form;
    the form opens in the main content area (``target='current'``).
    Must be called on a single step.
    """
    self.ensure_one()
    return dict(
        type='ir.actions.act_window',
        res_model='fp.job.step',
        res_id=self.id,
        view_mode='form',
        target='current',
        name=self.name,
    )
def action_open_quick_look(self):
    """Return the action opening the read-only Step Details modal.

    Bound to the row-button on the embedded step list — an explicit
    trigger is needed because editable="bottom" intercepts row clicks
    for inline editing rather than opening the form view.

    The action pins the form view
    ``fusion_plating_jobs.view_fp_job_step_quick_look_form`` and opens
    it as a modal (``target='new'``) on this single step.
    """
    self.ensure_one()
    quick_look_view_id = self.env.ref(
        'fusion_plating_jobs.view_fp_job_step_quick_look_form'
    ).id
    action = {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step',
        'res_id': self.id,
        'view_mode': 'form',
    }
    action['view_id'] = quick_look_view_id
    action['views'] = [(quick_look_view_id, 'form')]
    action['target'] = 'new'
    action['name'] = self.name
    return action
def _fp_record_one_piece_auto_move(self):
"""Decide whether to silently record a move before the step
finishes. Six cases:
- qty_at_step == 0: nothing to do (parts already moved).
- last runnable step: parts complete in place; no move.
- SEED-ONLY qty + downstream: bulk-move all parts to next
step in one move. Paperwork / first steps don't physically
hold parts per-piece.
- real qty == 1 + downstream: record move(1).
- real qty > 1 + downstream: raise — operator must use
Complete 1 → Next (streaming) or Move… (batched).
- real qty > 1 + last step: allow (qty_done auto-tick Phase 2).
Called from action_finish_and_advance just before button_finish.
"""
self.ensure_one()
qty = self.qty_at_step
if qty <= 0:
return False
next_step = self.job_id.step_ids.filtered(
lambda s: s.sequence > self.sequence
and s.state in ('pending', 'ready')
).sorted('sequence')[:1]
if not next_step:
return False
# Bulk-move all parts in one shot. Covers three use cases:
# - seed-only qty (paperwork / first step): creates the
# first explicit move record for the batch
# - real incoming, qty == 1 (streaming flow last part):
# same as Complete 1 → Next's tail call
# - real incoming, qty > 1 (batched flow): one click moves
# everything forward — operators with small parts don't
# have to click Complete 1 → Next repeatedly
# Complete 1 → Next is still available for one-by-one flow.
self.env['fp.job.step.move'].create({
'job_id': self.job_id.id,
'from_step_id': self.id,
'to_step_id': next_step.id,
'transfer_type': 'step',
'qty_moved': qty,
'moved_by_user_id': self.env.user.id,
})
return True