# -*- coding: utf-8 -*- # Copyright 2026 Nexa Systems Inc. # License OPL-1 (Odoo Proprietary License v1.0) # # fp.job.step — one operation within a plating job. # # Replaces mrp.workorder. Each step instantiates from a recipe # operation node (recipe_node_id). Container nodes (recipe, # sub_process) and step nodes (instructions) are NOT rows here — # they live on the recipe template and are used at view-render time # to display hierarchy. See spec §5.2 (Option A — operations only). # # State machine: # pending → ready → in_progress → done # ↓ ↓ ↑ # skipped paused # ↓ # cancelled from odoo import _, api, fields, models from odoo.exceptions import AccessError, UserError class FpJobStep(models.Model): _name = 'fp.job.step' _description = 'Work Order Step' _inherit = ['mail.thread'] _order = 'job_id, sequence, id' job_id = fields.Many2one( 'fp.job', required=True, ondelete='cascade', index=True, ) name = fields.Char(required=True) sequence = fields.Integer(default=10) state = fields.Selection( [ ('pending', 'Pending'), ('ready', 'Ready'), ('in_progress', 'In Progress'), ('paused', 'Paused'), ('done', 'Done'), ('skipped', 'Skipped'), ('cancelled', 'Cancelled'), ], default='pending', required=True, tracking=True, index=True, ) recipe_node_id = fields.Many2one( 'fusion.plating.process.node', string='Recipe Operation', domain=[('node_type', '=', 'operation')], ) work_centre_id = fields.Many2one('fp.work.centre', index=True) kind = fields.Selection( [ ('wet', 'Wet'), ('bake', 'Bake'), ('mask', 'Mask'), ('rack', 'Rack'), ('inspect', 'Inspect'), ('other', 'Other'), ], default='other', ) assigned_user_id = fields.Many2one('res.users', tracking=True) started_by_user_id = fields.Many2one('res.users', readonly=True) finished_by_user_id = fields.Many2one('res.users', readonly=True) date_started = fields.Datetime(readonly=True) date_finished = fields.Datetime(readonly=True) duration_expected = fields.Float(string='Expected Minutes') duration_actual = fields.Float(string='Actual Minutes', readonly=True) instructions = fields.Html(string='Step Instructions') time_log_ids = fields.One2many( 'fp.job.step.timelog', 'step_id', string='Time Logs', ) # ------------------------------------------------------------------ # Equipment + audit (Task 1.6) # oven_id is deferred to a bridge module — fusion.plating.bake.oven # lives in fusion_plating_shopfloor and core can't depend on it. # masking_material_id is deferred — fusion.plating.masking.material # does not yet exist in any installed module; will be added when # the masking model lands (likely in fusion_plating_process_en # or a future fusion_plating_masking module). # ------------------------------------------------------------------ bath_id = fields.Many2one('fusion.plating.bath') tank_id = fields.Many2one('fusion.plating.tank') rack_id = fields.Many2one('fusion.plating.rack') signoff_user_id = fields.Many2one('res.users', readonly=True) facility_id = fields.Many2one( 'fusion.plating.facility', related='work_centre_id.facility_id', store=True, ) # ------------------------------------------------------------------ # Plating spec (Task 1.6) # ------------------------------------------------------------------ thickness_target = fields.Float(string='Target Thickness') thickness_uom = fields.Selection( [('um', 'µm'), ('mil', 'mil'), ('inch', 'in')], default='um', ) dwell_time_minutes = fields.Float() # Label intentionally has no unit suffix — the unit follows the # company's `x_fc_default_temp_uom` setting and is surfaced via the # adjacent `bake_setpoint_temp_uom_display` compute. Hardcoding °C # in the label was the most visible "Celsius leaks everywhere" # offender flagged 2026-05-10. bake_setpoint_temp = fields.Float(string='Bake Setpoint') bake_setpoint_temp_uom_display = fields.Char( string='Unit', compute='_compute_bake_setpoint_temp_uom_display', help='Temperature unit pulled live from Settings → Fusion Plating → ' 'Units of Measure. Updates everywhere the moment the admin ' 'flips Fahrenheit ↔ Celsius.', ) bake_actual_duration = fields.Float(string='Bake Actual Minutes') bake_chart_recorder_ref = fields.Char(string='Bake Chart Recorder Ref') @api.depends_context('company') def _compute_bake_setpoint_temp_uom_display(self): sym = '°F' if (self.env.company.x_fc_default_temp_uom or 'F') == 'F' else '°C' for rec in self: rec.bake_setpoint_temp_uom_display = sym # ------------------------------------------------------------------ # Recipe-related (Task 1.6) # ------------------------------------------------------------------ requires_signoff = fields.Boolean( related='recipe_node_id.requires_signoff', store=True, ) auto_complete = fields.Boolean( related='recipe_node_id.auto_complete', store=True, ) is_manual = fields.Boolean( related='recipe_node_id.is_manual', store=True, ) customer_visible = fields.Boolean( related='recipe_node_id.customer_visible', store=True, ) requires_predecessor_done = fields.Boolean( related='recipe_node_id.requires_predecessor_done', store=True, help='LEGACY: per-step opt-in for predecessor enforcement. ' 'Still honoured when the parent recipe has ' 'enforce_sequential=False (free-flow recipe with one ' 'specific step that needs to wait).', ) # Sub 13 — sequential enforcement (recipe + per-step). New default # behaviour is "every step waits for predecessors", with two escape # hatches: enforce_sequential=False on the recipe (free-flow), or # parallel_start=True on this specific step (explicit parallelism). # The per-step parallel_start field is on this CORE model because # it just mirrors a core field (recipe_node_id.parallel_start). # The runtime gate logic (can_start, _fp_should_block_predecessors) # lives in fusion_plating_jobs because it reads the recipe-level # enforce_sequential which only exists when that bridge is loaded. parallel_start = fields.Boolean( related='recipe_node_id.parallel_start', store=True, help='If True, this step can start while earlier-sequence ' 'steps are still in progress. Only meaningful when the ' 'parent recipe has enforce_sequential=True.', ) # ===== Sub 12b — chain-of-custody + rack awareness ===================== # Note: rack_id (line 95 above) already exists — reused as the "current # rack on this step" pointer. Sub 12b builds the runtime guards on top. requires_rack_assignment = fields.Boolean( related='recipe_node_id.requires_rack_assignment', store=True, help='If True, the Move Parts dialog requires a rack to be ' 'assigned to the parts before the move commits. Snapshot ' 'from the recipe step at job creation.', ) requires_transition_form = fields.Boolean( related='recipe_node_id.requires_transition_form', store=True, ) move_ids = fields.One2many( 'fp.job.step.move', 'from_step_id', string='Outgoing Moves', ) incoming_move_ids = fields.One2many( 'fp.job.step.move', 'to_step_id', string='Incoming Moves', ) is_racked = fields.Boolean( string='Racked', compute='_compute_is_racked', store=True, help='True when rack_id is set — drives the tablet rack-vs-parts ' 'button-state guard (Move Parts greys out).', ) qty_at_step_start = fields.Integer(string='Qty at Step Start') qty_at_step_finish = fields.Integer(string='Qty at Step Finish') # Live "qty currently parked at this step" — drives partial-qty # workflows. = (incoming moves' qty − outgoing moves' qty), with a # first-step seed: the lowest-sequence step on a confirmed job # implicitly receives the full job qty when the job starts (no # explicit "kickoff" move record). Without that seed, the first # step would always show 0 here until the operator manually moved # parts in, which doesn't match how the floor thinks about it. qty_at_step = fields.Integer( string='Qty Here', compute='_compute_qty_at_step', help='Quantity currently parked at this step. Drains as moves ' 'transfer parts to later steps. The Move dialog defaults ' 'to this value and blocks moves above it.', ) @api.depends('move_ids.qty_moved', 'move_ids.to_step_id', 'incoming_move_ids.qty_moved', 'incoming_move_ids.from_step_id', 'state', 'job_id.qty', 'job_id.step_ids', 'job_id.step_ids.sequence', 'sequence') def _compute_qty_at_step(self): for rec in self: # Terminal states: nothing parked here anymore. Operators # don't care if "done" steps technically have qty residue — # surfacing zero keeps the column readable. if rec.state in ('done', 'cancelled', 'skipped'): rec.qty_at_step = 0 continue # Self-loop moves (from_step == to_step, transfer_type='step') # are how the Record Inputs wizard logs measurements; they # don't move qty so we exclude them on both sides. incoming = sum( m.qty_moved for m in rec.incoming_move_ids if m.from_step_id != rec ) outgoing = sum( m.qty_moved for m in rec.move_ids if m.to_step_id != rec ) # First-step seed: the earliest non-terminal step on a job # implicitly receives the full job qty when the job kicks # off (no explicit kickoff move). Without this seed, qty # here would read 0 even when the floor has the full batch. if not incoming and rec.job_id and rec.job_id.qty: first_active = rec.job_id.step_ids.filtered( lambda s: s.state not in ('done', 'cancelled', 'skipped') ).sorted('sequence')[:1] if rec == first_active: incoming = int(rec.job_id.qty) rec.qty_at_step = max(0, incoming - outgoing) @api.depends('rack_id') def _compute_is_racked(self): for rec in self: rec.is_racked = bool(rec.rack_id) # ------------------------------------------------------------------ # Cost rollup (Task 1.6) # cost_per_hour comes from fp.work.centre (Task 1.2 added it there). # cost_total recomputes when duration_actual or rate changes. # duration_actual is set by button_finish as the sum of timelog # row durations (see fp.job.step.timelog). # ------------------------------------------------------------------ cost_per_hour = fields.Monetary( related='work_centre_id.cost_per_hour', currency_field='currency_id', ) cost_total = fields.Monetary( compute='_compute_cost_total', store=True, currency_field='currency_id', ) currency_id = fields.Many2one( 'res.currency', related='work_centre_id.currency_id', ) @api.depends('duration_actual', 'cost_per_hour') def _compute_cost_total(self): for step in self: step.cost_total = (step.duration_actual / 60.0) * step.cost_per_hour # ------------------------------------------------------------------ # State machine — actions # ------------------------------------------------------------------ # Implemented: button_start (ready/paused → in_progress), # button_finish (in_progress → done). # Stubs (raise NotImplementedError; wiring deferred): # button_pause (in_progress → paused) # button_resume (covered by button_start when state='paused') # button_skip (pending/ready → skipped) # button_cancel (any non-done → cancelled) # Predecessor-driven transition pending → ready will be wired # alongside first-step / dependency logic in a future task. # ------------------------------------------------------------------ def button_pause(self): """Operator pause / break / end-of-shift. Closes the open timelog without finishing the step, flips state to 'paused'. button_start will reopen a fresh timelog when resuming. """ for step in self: if step.state != 'in_progress': raise UserError(_( "Step '%s' is in state '%s' — only in-progress steps can pause." ) % (step.name, step.state)) now = fields.Datetime.now() open_log = step.time_log_ids.filtered(lambda l: not l.date_finished) open_log.write({'date_finished': now, 'state': 'paused'}) step.state = 'paused' step.message_post(body=_('Step paused by %s') % self.env.user.name) return True def button_resume(self): """Resume a paused step — thin alias over button_start so views can show distinct labels (Resume vs Start) without duplicating the state-machine logic.""" for step in self: if step.state != 'paused': raise UserError(_( "Step '%s' is in state '%s' — only paused steps can resume." ) % (step.name, step.state)) return self.button_start() def button_skip(self): """Skip an opt-in step that wasn't activated for this job. Allowed from pending or ready only — a step that's already running shouldn't be skipped without an audit narrative (use button_cancel for that). """ for step in self: if step.state not in ('pending', 'ready'): raise UserError(_( "Step '%s' is in state '%s' — only pending/ready steps can skip." ) % (step.name, step.state)) step.state = 'skipped' step.message_post(body=_('Step skipped by %s') % self.env.user.name) return True def button_cancel(self): """Cancel a single step. Used when an operator realises mid-stream that a step doesn't apply to this job (e.g. a customer-specific step that's not needed). Closes any open timelog so labour cost already incurred is preserved. """ for step in self: if step.state in ('done', 'cancelled'): raise UserError(_( "Step '%s' is in state '%s' — cannot cancel." ) % (step.name, step.state)) now = fields.Datetime.now() open_log = step.time_log_ids.filtered(lambda l: not l.date_finished) open_log.write({'date_finished': now, 'state': 'stopped'}) step.state = 'cancelled' step.message_post(body=_('Step cancelled by %s') % self.env.user.name) return True def button_start(self): for step in self: if step.state not in ('ready', 'paused'): raise UserError(_( "Step '%s' is in state '%s' — only ready/paused steps can start." ) % (step.name, step.state)) now = fields.Datetime.now() step.state = 'in_progress' # First-start audit (mirrors button_finish first-finish guard) if not step.date_started: step.date_started = now step.started_by_user_id = self.env.user # Open a fresh timelog row for this start interval — uses the # same `now` as the first-start stamp so the step and its # first log share a single instant. self.env['fp.job.step.timelog'].create({ 'step_id': step.id, 'user_id': self.env.user.id, 'date_started': now, }) return True def button_finish(self): skip_qty_gate = self.env.context.get('fp_skip_qty_gate') for step in self: if step.state != 'in_progress': raise UserError(_( "Step '%s' is in state '%s' — only in-progress steps can finish." ) % (step.name, step.state)) # Quantity gate: refuses if parts still parked AND there's # a downstream step to move them into. Last runnable step # is exempt — parts finishing there complete in place # (qty_done reconciliation at job close is the catch-net). # # Seed-only exemption: the first-step seed in # _compute_qty_at_step gives the earliest non-terminal step # a notional qty = job.qty. That's a UI hint, not a real # parked batch — no incoming move record backs it. Paperwork # steps (Contract Review, Inspection, etc.) sit on that seed. # If the step has no REAL incoming moves, skip the gate. if not skip_qty_gate and step.qty_at_step > 0: has_downstream = step.job_id.step_ids.filtered( lambda s: s.sequence > step.sequence and s.state in ('pending', 'ready') ) has_real_incoming = bool( step.incoming_move_ids.filtered( lambda m: m.from_step_id != step ) ) if has_downstream and has_real_incoming: raise UserError(_( "Step '%(name)s' still has %(n)d part(s) " "parked — move them to the next step before " "finishing. Use the row's 'Complete 1 → Next' " "or 'Move…' button." ) % {'name': step.name, 'n': step.qty_at_step}) now = fields.Datetime.now() # Close the open timelog (the one with no date_finished) open_log = step.time_log_ids.filtered(lambda l: not l.date_finished) open_log.write({'date_finished': now, 'state': 'stopped'}) step.state = 'done' # First-finish audit (mirrors button_start first-start guard) if not step.date_finished: step.date_finished = now step.finished_by_user_id = self.env.user # Sum of all interval durations becomes duration_actual step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes')) return True # ===== Manager-only overrides =========================================== # Used when an operator skipped or cancelled a step in error, or when # the actual shop-floor work happened outside the ERP and the manager # needs to retroactively mark the step complete. Both actions are # group-gated and post a clear audit entry to the step's chatter. def button_manager_force_complete(self): """Force any non-done state straight to 'done'. Stamps the first- start / first-finish audit fields if blank so the timeline isn't broken, and closes any timelog still left open.""" if not self.env.user.has_group( 'fusion_plating.group_fusion_plating_manager'): raise AccessError(_( 'Only Plating Manager+ can force-complete a step.' )) for step in self: if step.state == 'done': raise UserError(_( "Step '%s' is already done." ) % step.name) prev_state = step.state now = fields.Datetime.now() # Close any open timelogs first — labour already incurred # stays in the audit even when we shortcut to done. open_log = step.time_log_ids.filtered( lambda l: not l.date_finished ) if open_log: open_log.write({'date_finished': now, 'state': 'stopped'}) vals = {'state': 'done'} if not step.date_started: vals['date_started'] = now vals['started_by_user_id'] = self.env.user.id if not step.date_finished: vals['date_finished'] = now vals['finished_by_user_id'] = self.env.user.id step.write(vals) step.message_post(body=_( 'Step force-completed by %s (was %s).' ) % (self.env.user.name, prev_state)) return True def button_manager_reset_to_ready(self): """Reset any non-ready step back to 'ready' so the operator can run it normally. Audited via chatter. Side-effects, depending on the previous state: - in_progress / paused → close any open timelog (mirrors button_cancel) so labour already logged stays in the audit. - done → also clear date_finished + finished_by_user_id so the next button_finish writes fresh first-finish stamps instead of preserving stale ones. date_started + started_by_user_id are preserved across resets — they record the first start ever (audit), and duration_actual is computed from the sum of timelogs, not (finish - start), so the elapsed math remains correct.""" if not self.env.user.has_group( 'fusion_plating.group_fusion_plating_manager'): raise AccessError(_( 'Only Plating Manager+ can reset a step state.' )) now = fields.Datetime.now() for step in self: if step.state == 'ready': raise UserError(_( "Step '%s' is already in Ready state." ) % step.name) prev_state = step.state vals = {'state': 'ready'} # Close any still-open timelog (defensive — usually only # in_progress/paused will have one). open_log = step.time_log_ids.filtered( lambda l: not l.date_finished ) if open_log: open_log.write({'date_finished': now, 'state': 'stopped'}) # If the step had been completed, wipe the finish stamps so # the next Finish records fresh audit values. Skip this for # in_progress / paused / skipped / cancelled / pending — they # either have no finish stamp or shouldn't have one cleared. if step.state == 'done': vals['date_finished'] = False vals['finished_by_user_id'] = False step.write(vals) step.message_post(body=_( 'Step state reset to Ready by %s (was %s).' ) % (self.env.user.name, prev_state)) return True def action_complete_one_to_next(self): """One-piece flow shortcut: records move(qty=1) from this step to the next pending/ready step, drains qty_at_step by 1. If the drain takes qty_at_step to 0, auto-finishes the source and starts the destination step (via action_finish_and_advance). """ self.ensure_one() if self.state != 'in_progress': raise UserError(_( "Step '%s' must be in progress to complete a part." ) % self.name) if self.qty_at_step < 1: raise UserError(_( "No parts parked at step '%s' — nothing to complete." ) % self.name) next_step = self.job_id.step_ids.filtered( lambda s: s.sequence > self.sequence and s.state in ('pending', 'ready') ).sorted('sequence')[:1] if not next_step: raise UserError(_( "Step '%s' is the last runnable step on the job — " "no downstream step to move into. Finish the step " "instead (it will close out the job)." ) % self.name) self.env['fp.job.step.move'].create({ 'job_id': self.job_id.id, 'from_step_id': self.id, 'to_step_id': next_step.id, 'transfer_type': 'step', 'qty_moved': 1, 'moved_by_user_id': self.env.user.id, }) # qty_at_step is computed from moves; force re-read before # checking whether this was the last part. self.invalidate_recordset(['qty_at_step']) if self.qty_at_step == 0: return self.action_finish_and_advance() return True