Files
Odoo-Modules/fusion_plating/fusion_plating_jobs/models/fp_job_step.py
gsinghpal f08f328688 changes
2026-04-27 00:11:18 -04:00

416 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Real implementations for the state-machine action stubs that
# fusion_plating core's fp.job.step shipped as NotImplementedError
# placeholders. Per spec §5.2 state machine.
import logging
import re
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpJobStep(models.Model):
_inherit = 'fp.job.step'
def button_start(self):
"""Override — soft gate when parts haven't been received yet,
plus hard predecessor gate for steps flagged
requires_predecessor_done by the recipe author.
Receiving check is soft (logs to chatter) — manager wants the
shop to start prep regardless when parts are in-transit late.
Predecessor check IS hard-blocking — if the recipe author
marked this step as serial-required, every earlier-sequence
step must be terminal (done / skipped / cancelled) before
Start fires. Manager bypass via fp_skip_predecessor_check=True.
"""
skip_pred = self.env.context.get('fp_skip_predecessor_check')
for step in self:
if not step.requires_predecessor_done or skip_pred:
continue
blocking = step.job_id.step_ids.filtered(
lambda s: s.sequence < step.sequence and s.state not in (
'done', 'skipped', 'cancelled',
)
)
if blocking:
raise UserError(_(
"Step '%s' requires predecessors done first. "
"Blocking earlier step(s):\n %s\n\nFinish or skip "
"those before starting this one (manager can "
"override via context fp_skip_predecessor_check=True)."
) % (
step.name,
'\n '.join(
f'#{s.sequence} {s.name} ({s.state})'
for s in blocking[:5]
),
))
result = super().button_start()
for step in self:
so = step.job_id.sale_order_id
if not so:
continue
recv = so.x_fc_receiving_status if (
'x_fc_receiving_status' in so._fields
) else None
if recv in (False, None, 'not_received'):
step.job_id.message_post(body=_(
'Step "%(step)s" started before parts were received '
'(SO %(so)s — receiving status: %(status)s). '
'Confirm the parts are physically on the floor before '
'continuing.'
) % {
'step': step.name,
'so': so.name or '',
'status': recv or 'unknown',
})
return result
def button_pause(self):
"""Pause an in-progress step (operator break, end of shift).
Closes the open timelog row, sums duration_actual, transitions
state to 'paused'. button_start re-opens a fresh timelog when
the operator resumes.
"""
for step in self:
if step.state != 'in_progress':
raise UserError(_(
"Step '%s' is in state '%s' — only in-progress steps can pause."
) % (step.name, step.state))
now = fields.Datetime.now()
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
if open_log:
open_log.write({'date_finished': now})
step.state = 'paused'
step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes'))
return True
def button_skip(self):
"""Skip a pending/ready step (e.g. opt-in step the planner
decided not to activate for this job).
"""
for step in self:
if step.state not in ('pending', 'ready'):
raise UserError(_(
"Step '%s' is in state '%s' — only pending/ready steps can be skipped."
) % (step.name, step.state))
step.state = 'skipped'
return True
def button_cancel(self):
"""Cancel a single step. Use fp.job.action_cancel to cancel
the whole job.
"""
for step in self:
if step.state == 'done':
raise UserError(_(
"Step '%s' is done — cannot cancel."
) % step.name)
if step.state == 'cancelled':
raise UserError(_(
"Step '%s' is already cancelled."
) % step.name)
step.state = 'cancelled'
return True
def write(self, vals):
"""Post a chatter trail on the parent JOB whenever an active
step gets reassigned. The step itself already tracks
assigned_user_id (tracking=True) but supervisors don't open
each step's chatter — they read the job. Without a job-level
post the takeover is invisible.
Only fires for steps in active states (in_progress / paused)
so creating a draft job + assigning a step to someone doesn't
spam the job chatter. Comparing to the OLD assignment so we
don't post on the initial set-from-False either.
"""
post_for = []
if 'assigned_user_id' in vals:
new_uid = vals['assigned_user_id']
for step in self:
if step.state not in ('in_progress', 'paused'):
continue
old_uid = step.assigned_user_id.id
if not old_uid:
continue
if new_uid == old_uid:
continue
post_for.append((step, old_uid, new_uid))
result = super().write(vals)
Users = self.env['res.users']
for step, old_uid, new_uid in post_for:
old_name = Users.browse(old_uid).name if old_uid else '(unassigned)'
new_name = Users.browse(new_uid).name if new_uid else '(unassigned)'
step.job_id.message_post(body=Markup(_(
'Step <b>%s</b> reassigned from <b>%s</b> to <b>%s</b> '
'(state=%s) by %s.'
)) % (step.name, old_name, new_name, step.state,
self.env.user.name))
return result
@api.model
def _cron_nudge_stale_paused(self, threshold_hours=24):
"""Daily nudge for steps stuck in `paused` longer than threshold."""
return self._cron_nudge_stale_steps(
states=('paused',),
threshold_hours=threshold_hours,
label='paused',
)
@api.model
def _cron_nudge_stale_in_progress(self, threshold_hours=8):
"""Cron nudge for steps stuck in `in_progress` longer than
threshold. Default 8 hours — operator started, walked away,
timelog accumulating phantom hours.
"""
return self._cron_nudge_stale_steps(
states=('in_progress',),
threshold_hours=threshold_hours,
label='in-progress',
)
@api.model
def _cron_nudge_stale_steps(self, states=('paused',),
threshold_hours=24, label='stale'):
"""Generic stale-step nudger.
Finds every fp.job.step in any of `states` with date_started
older than N hours. Schedules a 'todo' mail.activity on the
parent job for the job's manager_id (falls back to the user
who started the step). Idempotent — won't double-schedule if
an open activity with the same summary already exists.
"""
from datetime import timedelta as _td
cutoff = fields.Datetime.now() - _td(hours=threshold_hours)
stale = self.search([
('state', 'in', list(states)),
('date_started', '<', cutoff),
('date_started', '!=', False),
])
Activity = self.env['mail.activity']
ActivityType = self.env.ref(
'mail.mail_activity_data_todo', raise_if_not_found=False,
)
nudged_count = 0
for step in stale:
job = step.job_id
assignee = (job.manager_id or step.assigned_user_id
or step.started_by_user_id or self.env.user)
summary = _('Stale %s step: %s') % (label, step.name)
existing = Activity.search([
('res_model', '=', job._name),
('res_id', '=', job.id),
('summary', '=', summary),
], limit=1)
if existing:
continue
age_h = (fields.Datetime.now() - step.date_started).total_seconds() / 3600.0
note = _(
'Step "%(step)s" on job %(job)s has been in %(label)s state for '
'%(hours).1f hours (since %(start)s). Investigate: operator '
'reassignment, equipment failure, or finish + close out.'
) % {
'step': step.name, 'job': job.name, 'label': label,
'hours': age_h, 'start': step.date_started,
}
vals = {
'res_model_id': self.env['ir.model']._get(job._name).id,
'res_id': job.id,
'summary': summary,
'note': note,
'user_id': assignee.id,
'date_deadline': fields.Date.context_today(self),
}
if ActivityType:
vals['activity_type_id'] = ActivityType.id
Activity.create(vals)
nudged_count += 1
job.message_post(body=Markup(_(
'Stale %s step: <b>%s</b> has been idle %.1f hours. '
'Activity created for %s.'
)) % (label, step.name, age_h, assignee.name))
if nudged_count:
_logger.info(
'fp.job.step stale-%s cron: nudged %d step(s)',
label, nudged_count,
)
return nudged_count
def action_abort_for_retry(self, reason=None, new_tank_id=None,
new_bath_id=None):
"""Abort an in_progress / paused step so the operator can restart
it (typically after an equipment failure mid-step).
Closes the open timelog (preserves the partial-work record on
the audit trail), posts a clear chatter event on the JOB
explaining why + which tank, optionally moves the step to a
different tank/bath, and resets the step to `ready` so the
operator can hit Start again.
Without this method the operator's only options are
button_cancel (kills the step entirely) or
pause → write tank → start (no failure audit).
"""
if not reason:
reason = _('Equipment failure / abort for retry')
for step in self:
if step.state not in ('in_progress', 'paused'):
raise UserError(_(
"Step '%s' is in state '%s' — only in_progress / "
"paused steps can be aborted for retry."
) % (step.name, step.state))
old_tank = step.tank_id.display_name or '(no tank set)'
old_bath = step.bath_id.display_name or '(no bath set)'
now = fields.Datetime.now()
open_logs = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_logs:
open_logs.write({'date_finished': now})
partial_min = sum(step.time_log_ids.mapped('duration_minutes'))
change_msg = ''
if new_tank_id:
step.tank_id = new_tank_id
change_msg += ' -> tank %s' % step.tank_id.display_name
if new_bath_id:
step.bath_id = new_bath_id
change_msg += ' -> bath %s' % step.bath_id.display_name
step.state = 'ready'
step.duration_actual = partial_min
step.job_id.message_post(body=Markup(_(
'⚠️ Step <b>%s</b> aborted for retry by %s.<br/>'
'Reason: <em>%s</em><br/>'
'Equipment: tank=%s, bath=%s%s<br/>'
'Partial work captured: %.2f min in %d timelog(s). '
'Step is back in <b>ready</b> state — operator can '
'restart when the issue is resolved.'
)) % (
step.name, self.env.user.name, reason,
old_tank, old_bath, change_msg, partial_min,
len(step.time_log_ids),
))
return True
def action_recompute_duration_from_timelogs(self):
"""Re-sum duration_actual from the step's timelog rows.
Use case: supervisor adjusts a timelog row (back-date a forgotten
click, fix wrong operator, delete a stale entry that was left
open over a shift change) and needs the step's duration_actual
to reflect the corrected reality. Without this, edits to time_log_ids
rows don't propagate into duration_actual (which is set once
by button_finish).
Posts the before/after to chatter for audit.
"""
for step in self:
old = step.duration_actual or 0.0
new = sum(step.time_log_ids.mapped('duration_minutes'))
step.duration_actual = new
if abs(old - new) > 0.001:
step.job_id.message_post(body=Markup(_(
'Step <b>%s</b> duration recomputed from timelog rows: '
'%.2f min → %.2f min (Δ %+.2f). Recomputed by %s.'
)) % (step.name, old, new, new - old, self.env.user.name))
return True
def button_finish(self):
"""Override to:
1) Auto-spawn a bake.window when a wet plating step finishes
on a coating that requires hydrogen-embrittlement relief
(AS9100 / Nadcap compliance);
2) Post a chatter warning when duration_actual exceeds 1.5×
duration_expected — silent overruns are a red flag for
scheduling and costing.
Both actions are idempotent and never block the finish itself.
"""
result = super().button_finish()
BW = self.env['fusion.plating.bake.window']
Bath = self.env['fusion.plating.bath']
for step in self:
if step.state != 'done':
continue
# Duration-overrun chatter alert.
if step.duration_expected and step.duration_actual:
ratio = step.duration_actual / step.duration_expected
if ratio >= 1.5:
step.job_id.message_post(body=Markup(_(
'⚠️ <b>Step "%s" ran %.1fx expected</b> — '
'expected %.0f min, actual %.0f min. Investigate: '
'equipment issue, training gap, or recipe time '
'estimate too tight.'
)) % (step.name, ratio, step.duration_expected,
step.duration_actual))
coating = step.job_id.coating_config_id \
if 'coating_config_id' in step.job_id._fields else False
if not coating:
continue
requires = getattr(coating, 'requires_bake_relief', False)
window_hrs = getattr(coating, 'bake_window_hours', 0.0)
if not requires or not window_hrs:
continue
# Trigger only on the actual plating-out step. We want
# exactly ONE bake.window per job (not one per step that
# happens to have "plate" in the name). Heuristic:
# - step.kind == 'wet' (clean, recipe-authored signal); OR
# - the step name contains "plating" as a word
# Explicit excludes: inspection / bake / mask / rack steps
# whose names might happen to mention plating in passing
# (e.g. "Post-plate Inspection").
name_l = (step.name or '').lower()
kind_match = step.kind == 'wet'
name_match = bool(re.search(r'\bplating\b', name_l))
excluded = any(kw in name_l for kw in (
'inspect', 'inspection', 'bake', 'mask', 'rack',
))
if (not kind_match and not name_match) or excluded:
continue
# Idempotency — only one bake.window per (job, step).
existing = BW.sudo().search([
('part_ref', '=', step.job_id.name),
('lot_ref', '=', f'step-{step.id}'),
], limit=1)
if existing:
continue
# Pick a bath: step.bath_id wins; fall back to the first
# active bath in the facility (best-effort — operator can
# correct on the bake.window record).
bath = step.bath_id or Bath.sudo().search(
[('facility_id', '=', step.facility_id.id)], limit=1,
) if step.facility_id else False
if not bath:
bath = Bath.sudo().search([], limit=1)
if not bath:
_logger.warning(
'Step %s: bake-window auto-spawn skipped — no bath '
'configured.', step.name,
)
continue
bw = BW.sudo().create({
'bath_id': bath.id,
'plate_exit_time': step.date_finished or fields.Datetime.now(),
'window_hours': window_hrs,
'part_ref': step.job_id.name,
'lot_ref': f'step-{step.id}',
'customer_ref': step.job_id.partner_id.display_name or '',
'quantity': int(step.job_id.qty or 0),
})
step.job_id.message_post(body=Markup(_(
'Bake window <b>%s</b> auto-created — %.1fh window from '
'plate exit. Required by %s.'
)) % (bw.name, window_hrs, bw.bake_required_by))
return result