Files
Odoo-Modules/fusion_plating/fusion_plating/models/fp_job_step.py
gsinghpal 8c76a16366 chore(plating): de-dash shipped code + intake-neutral customer emails
Replace em-dashes and en-dashes with hyphens across 789 shipped source
files (py/xml/js/scss) so the delivered module reads as human-written;
em-dashes had become a recognizable AI-generated tell. Internal .md dev
notes are excluded. The WO-sticker mojibake strippers keep their dash
search targets (now written — / –). No logic changes: comments
and display strings only; validated with py_compile + lxml parse.

Rewrite the 7 customer notification emails to be intake-neutral
(ship-in / drop-off / pickup) and repair-aware, and fix the Shipped
email documents line (packing slip vs bill of lading; certificate only
when issued). Subjects use a hyphen separator.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 00:16:19 -04:00

567 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# fp.job.step - one operation within a plating job.
#
# Replaces mrp.workorder. Each step instantiates from a recipe
# operation node (recipe_node_id). Container nodes (recipe,
# sub_process) and step nodes (instructions) are NOT rows here -
# they live on the recipe template and are used at view-render time
# to display hierarchy. See spec §5.2 (Option A - operations only).
#
# State machine:
# pending → ready → in_progress → done
# ↓ ↓ ↑
# skipped paused
# ↓
# cancelled
from odoo import _, api, fields, models
from odoo.exceptions import AccessError, UserError
class FpJobStep(models.Model):
_name = 'fp.job.step'
_description = 'Work Order Step'
_inherit = ['mail.thread']
_order = 'job_id, sequence, id'
job_id = fields.Many2one(
'fp.job',
required=True,
ondelete='cascade',
index=True,
)
name = fields.Char(required=True)
sequence = fields.Integer(default=10)
state = fields.Selection(
[
('pending', 'Pending'),
('ready', 'Ready'),
('in_progress', 'In Progress'),
('paused', 'Paused'),
('done', 'Done'),
('skipped', 'Skipped'),
('cancelled', 'Cancelled'),
],
default='pending',
required=True,
tracking=True,
index=True,
)
recipe_node_id = fields.Many2one(
'fusion.plating.process.node',
string='Recipe Operation',
domain=[('node_type', '=', 'operation')],
)
work_centre_id = fields.Many2one('fp.work.centre', index=True)
kind = fields.Selection(
[
('wet', 'Wet'),
('bake', 'Bake'),
('mask', 'Mask'),
('rack', 'Rack'),
('inspect', 'Inspect'),
('other', 'Other'),
],
default='other',
)
assigned_user_id = fields.Many2one('res.users', tracking=True)
started_by_user_id = fields.Many2one('res.users', readonly=True)
finished_by_user_id = fields.Many2one('res.users', readonly=True)
date_started = fields.Datetime(readonly=True)
date_finished = fields.Datetime(readonly=True)
duration_expected = fields.Float(string='Expected Minutes')
duration_actual = fields.Float(string='Actual Minutes', readonly=True)
instructions = fields.Html(string='Step Instructions')
time_log_ids = fields.One2many(
'fp.job.step.timelog',
'step_id',
string='Time Logs',
)
# ------------------------------------------------------------------
# Equipment + audit (Task 1.6)
# oven_id is deferred to a bridge module - fusion.plating.bake.oven
# lives in fusion_plating_shopfloor and core can't depend on it.
# masking_material_id is deferred - fusion.plating.masking.material
# does not yet exist in any installed module; will be added when
# the masking model lands (likely in fusion_plating_process_en
# or a future fusion_plating_masking module).
# ------------------------------------------------------------------
bath_id = fields.Many2one('fusion.plating.bath')
tank_id = fields.Many2one('fusion.plating.tank')
rack_id = fields.Many2one('fusion.plating.rack')
signoff_user_id = fields.Many2one('res.users', readonly=True)
facility_id = fields.Many2one(
'fusion.plating.facility',
related='work_centre_id.facility_id',
store=True,
)
# ------------------------------------------------------------------
# Plating spec (Task 1.6)
# ------------------------------------------------------------------
thickness_target = fields.Float(string='Target Thickness')
thickness_uom = fields.Selection(
[('um', 'µm'), ('mil', 'mil'), ('inch', 'in')],
default='um',
)
dwell_time_minutes = fields.Float()
# Label intentionally has no unit suffix - the unit follows the
# company's `x_fc_default_temp_uom` setting and is surfaced via the
# adjacent `bake_setpoint_temp_uom_display` compute. Hardcoding °C
# in the label was the most visible "Celsius leaks everywhere"
# offender flagged 2026-05-10.
bake_setpoint_temp = fields.Float(string='Bake Setpoint')
bake_setpoint_temp_uom_display = fields.Char(
string='Unit',
compute='_compute_bake_setpoint_temp_uom_display',
help='Temperature unit pulled live from Settings → Fusion Plating → '
'Units of Measure. Updates everywhere the moment the admin '
'flips Fahrenheit ↔ Celsius.',
)
bake_actual_duration = fields.Float(string='Bake Actual Minutes')
bake_chart_recorder_ref = fields.Char(string='Bake Chart Recorder Ref')
@api.depends_context('company')
def _compute_bake_setpoint_temp_uom_display(self):
sym = '°F' if (self.env.company.x_fc_default_temp_uom or 'F') == 'F' else '°C'
for rec in self:
rec.bake_setpoint_temp_uom_display = sym
# ------------------------------------------------------------------
# Recipe-related (Task 1.6)
# ------------------------------------------------------------------
requires_signoff = fields.Boolean(
related='recipe_node_id.requires_signoff',
store=True,
)
auto_complete = fields.Boolean(
related='recipe_node_id.auto_complete',
store=True,
)
is_manual = fields.Boolean(
related='recipe_node_id.is_manual',
store=True,
)
customer_visible = fields.Boolean(
related='recipe_node_id.customer_visible',
store=True,
)
requires_predecessor_done = fields.Boolean(
related='recipe_node_id.requires_predecessor_done',
store=True,
help='LEGACY: per-step opt-in for predecessor enforcement. '
'Still honoured when the parent recipe has '
'enforce_sequential=False (free-flow recipe with one '
'specific step that needs to wait).',
)
# Sub 13 - sequential enforcement (recipe + per-step). New default
# behaviour is "every step waits for predecessors", with two escape
# hatches: enforce_sequential=False on the recipe (free-flow), or
# parallel_start=True on this specific step (explicit parallelism).
# The per-step parallel_start field is on this CORE model because
# it just mirrors a core field (recipe_node_id.parallel_start).
# The runtime gate logic (can_start, _fp_should_block_predecessors)
# lives in fusion_plating_jobs because it reads the recipe-level
# enforce_sequential which only exists when that bridge is loaded.
parallel_start = fields.Boolean(
related='recipe_node_id.parallel_start',
store=True,
help='If True, this step can start while earlier-sequence '
'steps are still in progress. Only meaningful when the '
'parent recipe has enforce_sequential=True.',
)
# ===== Sub 12b - chain-of-custody + rack awareness =====================
# Note: rack_id (line 95 above) already exists - reused as the "current
# rack on this step" pointer. Sub 12b builds the runtime guards on top.
requires_rack_assignment = fields.Boolean(
related='recipe_node_id.requires_rack_assignment',
store=True,
help='If True, the Move Parts dialog requires a rack to be '
'assigned to the parts before the move commits. Snapshot '
'from the recipe step at job creation.',
)
requires_transition_form = fields.Boolean(
related='recipe_node_id.requires_transition_form',
store=True,
)
move_ids = fields.One2many(
'fp.job.step.move', 'from_step_id',
string='Outgoing Moves',
)
incoming_move_ids = fields.One2many(
'fp.job.step.move', 'to_step_id',
string='Incoming Moves',
)
is_racked = fields.Boolean(
string='Racked', compute='_compute_is_racked', store=True,
help='True when rack_id is set - drives the tablet rack-vs-parts '
'button-state guard (Move Parts greys out).',
)
qty_at_step_start = fields.Integer(string='Qty at Step Start')
qty_at_step_finish = fields.Integer(string='Qty at Step Finish')
# Live "qty currently parked at this step" - drives partial-qty
# workflows. = (incoming moves' qty outgoing moves' qty), with a
# first-step seed: the lowest-sequence step on a confirmed job
# implicitly receives the full job qty when the job starts (no
# explicit "kickoff" move record). Without that seed, the first
# step would always show 0 here until the operator manually moved
# parts in, which doesn't match how the floor thinks about it.
qty_at_step = fields.Integer(
string='Qty Here',
compute='_compute_qty_at_step',
help='Quantity currently parked at this step. Drains as moves '
'transfer parts to later steps. The Move dialog defaults '
'to this value and blocks moves above it.',
)
@api.depends('move_ids.qty_moved', 'move_ids.to_step_id',
'incoming_move_ids.qty_moved',
'incoming_move_ids.from_step_id',
'state', 'job_id.qty', 'job_id.step_ids',
'job_id.step_ids.sequence', 'sequence')
def _compute_qty_at_step(self):
for rec in self:
# Terminal states: nothing parked here anymore. Operators
# don't care if "done" steps technically have qty residue -
# surfacing zero keeps the column readable.
if rec.state in ('done', 'cancelled', 'skipped'):
rec.qty_at_step = 0
continue
# Self-loop moves (from_step == to_step, transfer_type='step')
# are how the Record Inputs wizard logs measurements; they
# don't move qty so we exclude them on both sides.
incoming = sum(
m.qty_moved for m in rec.incoming_move_ids
if m.from_step_id != rec
)
outgoing = sum(
m.qty_moved for m in rec.move_ids
if m.to_step_id != rec
)
# First-step seed: the earliest non-terminal step on a job
# implicitly receives the full job qty when the job kicks
# off (no explicit kickoff move). Without this seed, qty
# here would read 0 even when the floor has the full batch.
if not incoming and rec.job_id and rec.job_id.qty:
first_active = rec.job_id.step_ids.filtered(
lambda s: s.state not in ('done', 'cancelled', 'skipped')
).sorted('sequence')[:1]
if rec == first_active:
incoming = int(rec.job_id.qty)
rec.qty_at_step = max(0, incoming - outgoing)
@api.depends('rack_id')
def _compute_is_racked(self):
for rec in self:
rec.is_racked = bool(rec.rack_id)
# ------------------------------------------------------------------
# Cost rollup (Task 1.6)
# cost_per_hour comes from fp.work.centre (Task 1.2 added it there).
# cost_total recomputes when duration_actual or rate changes.
# duration_actual is set by button_finish as the sum of timelog
# row durations (see fp.job.step.timelog).
# ------------------------------------------------------------------
cost_per_hour = fields.Monetary(
related='work_centre_id.cost_per_hour',
currency_field='currency_id',
)
cost_total = fields.Monetary(
compute='_compute_cost_total',
store=True,
currency_field='currency_id',
)
currency_id = fields.Many2one(
'res.currency',
related='work_centre_id.currency_id',
)
@api.depends('duration_actual', 'cost_per_hour')
def _compute_cost_total(self):
for step in self:
step.cost_total = (step.duration_actual / 60.0) * step.cost_per_hour
# ------------------------------------------------------------------
# State machine - actions
# ------------------------------------------------------------------
# Implemented: button_start (ready/paused → in_progress),
# button_finish (in_progress → done).
# Stubs (raise NotImplementedError; wiring deferred):
# button_pause (in_progress → paused)
# button_resume (covered by button_start when state='paused')
# button_skip (pending/ready → skipped)
# button_cancel (any non-done → cancelled)
# Predecessor-driven transition pending → ready will be wired
# alongside first-step / dependency logic in a future task.
# ------------------------------------------------------------------
def button_pause(self):
"""Operator pause / break / end-of-shift. Closes the open timelog
without finishing the step, flips state to 'paused'. button_start
will reopen a fresh timelog when resuming.
"""
for step in self:
if step.state != 'in_progress':
raise UserError(_(
"Step '%s' is in state '%s' - only in-progress steps can pause."
) % (step.name, step.state))
now = fields.Datetime.now()
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
open_log.write({'date_finished': now, 'state': 'paused'})
step.state = 'paused'
step.message_post(body=_('Step paused by %s') % self.env.user.name)
return True
def button_resume(self):
"""Resume a paused step - thin alias over button_start so views
can show distinct labels (Resume vs Start) without duplicating
the state-machine logic."""
for step in self:
if step.state != 'paused':
raise UserError(_(
"Step '%s' is in state '%s' - only paused steps can resume."
) % (step.name, step.state))
return self.button_start()
def button_skip(self):
"""Skip an opt-in step that wasn't activated for this job. Allowed
from pending or ready only - a step that's already running shouldn't
be skipped without an audit narrative (use button_cancel for that).
"""
for step in self:
if step.state not in ('pending', 'ready'):
raise UserError(_(
"Step '%s' is in state '%s' - only pending/ready steps can skip."
) % (step.name, step.state))
step.state = 'skipped'
step.message_post(body=_('Step skipped by %s') % self.env.user.name)
return True
def button_cancel(self):
"""Cancel a single step. Used when an operator realises mid-stream
that a step doesn't apply to this job (e.g. a customer-specific
step that's not needed). Closes any open timelog so labour cost
already incurred is preserved.
"""
for step in self:
if step.state in ('done', 'cancelled'):
raise UserError(_(
"Step '%s' is in state '%s' - cannot cancel."
) % (step.name, step.state))
now = fields.Datetime.now()
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
open_log.write({'date_finished': now, 'state': 'stopped'})
step.state = 'cancelled'
step.message_post(body=_('Step cancelled by %s') % self.env.user.name)
return True
def button_start(self):
for step in self:
if step.state not in ('ready', 'paused'):
raise UserError(_(
"Step '%s' is in state '%s' - only ready/paused steps can start."
) % (step.name, step.state))
now = fields.Datetime.now()
step.state = 'in_progress'
# First-start audit (mirrors button_finish first-finish guard)
if not step.date_started:
step.date_started = now
step.started_by_user_id = self.env.user
# Open a fresh timelog row for this start interval - uses the
# same `now` as the first-start stamp so the step and its
# first log share a single instant.
self.env['fp.job.step.timelog'].create({
'step_id': step.id,
'user_id': self.env.user.id,
'date_started': now,
})
return True
def button_finish(self):
skip_qty_gate = self.env.context.get('fp_skip_qty_gate')
for step in self:
if step.state != 'in_progress':
raise UserError(_(
"Step '%s' is in state '%s' - only in-progress steps can finish."
) % (step.name, step.state))
# Quantity gate: refuses if parts still parked AND there's
# a downstream step to move them into. Last runnable step
# is exempt - parts finishing there complete in place
# (qty_done reconciliation at job close is the catch-net).
#
# Seed-only exemption: the first-step seed in
# _compute_qty_at_step gives the earliest non-terminal step
# a notional qty = job.qty. That's a UI hint, not a real
# parked batch - no incoming move record backs it. Paperwork
# steps (Contract Review, Inspection, etc.) sit on that seed.
# If the step has no REAL incoming moves, skip the gate.
if not skip_qty_gate and step.qty_at_step > 0:
has_downstream = step.job_id.step_ids.filtered(
lambda s: s.sequence > step.sequence
and s.state in ('pending', 'ready')
)
has_real_incoming = bool(
step.incoming_move_ids.filtered(
lambda m: m.from_step_id != step
)
)
if has_downstream and has_real_incoming:
raise UserError(_(
"Step '%(name)s' still has %(n)d part(s) "
"parked - move them to the next step before "
"finishing. Use the row's 'Complete 1 → Next' "
"or 'Move…' button."
) % {'name': step.name, 'n': step.qty_at_step})
now = fields.Datetime.now()
# Close the open timelog (the one with no date_finished)
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
open_log.write({'date_finished': now, 'state': 'stopped'})
step.state = 'done'
# First-finish audit (mirrors button_start first-start guard)
if not step.date_finished:
step.date_finished = now
step.finished_by_user_id = self.env.user
# Sum of all interval durations becomes duration_actual
step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes'))
return True
# ===== Manager-only overrides ===========================================
# Used when an operator skipped or cancelled a step in error, or when
# the actual shop-floor work happened outside the ERP and the manager
# needs to retroactively mark the step complete. Both actions are
# group-gated and post a clear audit entry to the step's chatter.
def button_manager_force_complete(self):
"""Force any non-done state straight to 'done'. Stamps the first-
start / first-finish audit fields if blank so the timeline isn't
broken, and closes any timelog still left open."""
if not self.env.user.has_group(
'fusion_plating.group_fusion_plating_manager'):
raise AccessError(_(
'Only Plating Manager+ can force-complete a step.'
))
for step in self:
if step.state == 'done':
raise UserError(_(
"Step '%s' is already done."
) % step.name)
prev_state = step.state
now = fields.Datetime.now()
# Close any open timelogs first - labour already incurred
# stays in the audit even when we shortcut to done.
open_log = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_log:
open_log.write({'date_finished': now, 'state': 'stopped'})
vals = {'state': 'done'}
if not step.date_started:
vals['date_started'] = now
vals['started_by_user_id'] = self.env.user.id
if not step.date_finished:
vals['date_finished'] = now
vals['finished_by_user_id'] = self.env.user.id
step.write(vals)
step.message_post(body=_(
'Step force-completed by %s (was %s).'
) % (self.env.user.name, prev_state))
return True
def button_manager_reset_to_ready(self):
"""Reset any non-ready step back to 'ready' so the operator can
run it normally. Audited via chatter.
Side-effects, depending on the previous state:
- in_progress / paused → close any open timelog (mirrors
button_cancel) so labour already logged stays in the audit.
- done → also clear date_finished + finished_by_user_id so the
next button_finish writes fresh first-finish stamps instead
of preserving stale ones.
date_started + started_by_user_id are preserved across resets -
they record the first start ever (audit), and duration_actual is
computed from the sum of timelogs, not (finish - start), so the
elapsed math remains correct."""
if not self.env.user.has_group(
'fusion_plating.group_fusion_plating_manager'):
raise AccessError(_(
'Only Plating Manager+ can reset a step state.'
))
now = fields.Datetime.now()
for step in self:
if step.state == 'ready':
raise UserError(_(
"Step '%s' is already in Ready state."
) % step.name)
prev_state = step.state
vals = {'state': 'ready'}
# Close any still-open timelog (defensive - usually only
# in_progress/paused will have one).
open_log = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_log:
open_log.write({'date_finished': now, 'state': 'stopped'})
# If the step had been completed, wipe the finish stamps so
# the next Finish records fresh audit values. Skip this for
# in_progress / paused / skipped / cancelled / pending - they
# either have no finish stamp or shouldn't have one cleared.
if step.state == 'done':
vals['date_finished'] = False
vals['finished_by_user_id'] = False
step.write(vals)
step.message_post(body=_(
'Step state reset to Ready by %s (was %s).'
) % (self.env.user.name, prev_state))
return True
def action_complete_one_to_next(self):
"""One-piece flow shortcut: records move(qty=1) from this
step to the next pending/ready step, drains qty_at_step by 1.
If the drain takes qty_at_step to 0, auto-finishes the source
and starts the destination step (via action_finish_and_advance).
"""
self.ensure_one()
if self.state != 'in_progress':
raise UserError(_(
"Step '%s' must be in progress to complete a part."
) % self.name)
if self.qty_at_step < 1:
raise UserError(_(
"No parts parked at step '%s' - nothing to complete."
) % self.name)
next_step = self.job_id.step_ids.filtered(
lambda s: s.sequence > self.sequence
and s.state in ('pending', 'ready')
).sorted('sequence')[:1]
if not next_step:
raise UserError(_(
"Step '%s' is the last runnable step on the job - "
"no downstream step to move into. Finish the step "
"instead (it will close out the job)."
) % self.name)
self.env['fp.job.step.move'].create({
'job_id': self.job_id.id,
'from_step_id': self.id,
'to_step_id': next_step.id,
'transfer_type': 'step',
'qty_moved': 1,
'moved_by_user_id': self.env.user.id,
})
# qty_at_step is computed from moves; force re-read before
# checking whether this was the last part.
self.invalidate_recordset(['qty_at_step'])
if self.qty_at_step == 0:
return self.action_finish_and_advance()
return True