feat(jobs): step qty gate + partial-qty + display rename

Three coupled shop-floor corrections:
- fp.job._compute_display_name: renders "Work Order # 00011" in
  form header, breadcrumbs, M2O dropdowns, and error messages.
  DB name stays as WH/JOB/00011 - existing chatter/cert/delivery
  references unchanged.
- fp.job.step.button_finish: refuses if qty_at_step > 0 AND a
  downstream pending/ready step exists. Last runnable step is
  exempt (parts complete in place). Manager bypass via
  fp_skip_qty_gate=True context key.
- fp.job.step.action_complete_one_to_next: new per-row button
  "Complete 1 -> Next" for streaming flow (large parts going
  one-by-one). Records move(qty=1) to next step; if drain takes
  qty_at_step to 0, auto-finishes source + auto-starts destination
  via existing action_finish_and_advance.
- fp.job.step._fp_record_one_piece_auto_move: auto-move shim
  wired into action_finish_and_advance. qty=1 + downstream =>
  silently record move(1). qty>1 + downstream => raise pointing
  at Complete 1 -> Next. Last step always allowed.
- 16 new TestQtyGate tests covering gate / shim / auto-finish /
  last-step exemption / display rename / Move wizard zero-qty.

Spec: docs/superpowers/specs/2026-05-12-step-qty-gate-and-display-rename-design.md
Plan: docs/superpowers/plans/2026-05-12-step-qty-gate-and-display-rename.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-05-11 23:31:56 -04:00
parent 9e39e41b0d
commit b0070afc1b
8 changed files with 518 additions and 131 deletions

View File

@@ -5,7 +5,7 @@
{ {
'name': 'Fusion Plating', 'name': 'Fusion Plating',
'version': '19.0.18.13.13', 'version': '19.0.18.15.0',
'category': 'Manufacturing/Plating', 'category': 'Manufacturing/Plating',
'summary': 'Core plating / metal finishing ERP: facilities, processes, tanks, baths, jobs, operators.', 'summary': 'Core plating / metal finishing ERP: facilities, processes, tanks, baths, jobs, operators.',
'description': """ 'description': """

View File

@@ -66,6 +66,22 @@ class FpJob(models.Model):
default=lambda self: _('New'), default=lambda self: _('New'),
index=True, index=True,
) )
@api.depends('name')
def _compute_display_name(self):
"""Reformat 'WH/JOB/00011''Work Order # 00011' for every
human-facing surface (form header, breadcrumbs, M2O dropdowns,
smart-button titles, error messages). The DB `name` is
unchanged so existing certs / deliveries / chatter references
don't break.
"""
for job in self:
if job.name and '/' in job.name:
suffix = job.name.rsplit('/', 1)[-1]
job.display_name = _('Work Order # %s') % suffix
else:
job.display_name = job.name or ''
state = fields.Selection( state = fields.Selection(
[ [
('draft', 'Draft'), ('draft', 'Draft'),

View File

@@ -18,7 +18,7 @@
# cancelled # cancelled
from odoo import _, api, fields, models from odoo import _, api, fields, models
from odoo.exceptions import UserError from odoo.exceptions import AccessError, UserError
class FpJobStep(models.Model): class FpJobStep(models.Model):
@@ -109,10 +109,28 @@ class FpJobStep(models.Model):
default='um', default='um',
) )
dwell_time_minutes = fields.Float() dwell_time_minutes = fields.Float()
bake_setpoint_temp = fields.Float(string='Bake Setpoint °C') # Label intentionally has no unit suffix — the unit follows the
# company's `x_fc_default_temp_uom` setting and is surfaced via the
# adjacent `bake_setpoint_temp_uom_display` compute. Hardcoding °C
# in the label was the most visible "Celsius leaks everywhere"
# offender flagged 2026-05-10.
bake_setpoint_temp = fields.Float(string='Bake Setpoint')
bake_setpoint_temp_uom_display = fields.Char(
string='Unit',
compute='_compute_bake_setpoint_temp_uom_display',
help='Temperature unit pulled live from Settings → Fusion Plating → '
'Units of Measure. Updates everywhere the moment the admin '
'flips Fahrenheit ↔ Celsius.',
)
bake_actual_duration = fields.Float(string='Bake Actual Minutes') bake_actual_duration = fields.Float(string='Bake Actual Minutes')
bake_chart_recorder_ref = fields.Char(string='Bake Chart Recorder Ref') bake_chart_recorder_ref = fields.Char(string='Bake Chart Recorder Ref')
@api.depends_context('company')
def _compute_bake_setpoint_temp_uom_display(self):
sym = '°F' if (self.env.company.x_fc_default_temp_uom or 'F') == 'F' else '°C'
for rec in self:
rec.bake_setpoint_temp_uom_display = sym
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Recipe-related (Task 1.6) # Recipe-related (Task 1.6)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -365,11 +383,28 @@ class FpJobStep(models.Model):
return True return True
def button_finish(self): def button_finish(self):
skip_qty_gate = self.env.context.get('fp_skip_qty_gate')
for step in self: for step in self:
if step.state != 'in_progress': if step.state != 'in_progress':
raise UserError(_( raise UserError(_(
"Step '%s' is in state '%s' — only in-progress steps can finish." "Step '%s' is in state '%s' — only in-progress steps can finish."
) % (step.name, step.state)) ) % (step.name, step.state))
# Quantity gate: refuses if parts still parked AND there's
# a downstream step to move them into. Last runnable step
# is exempt — parts finishing there complete in place
# (qty_done reconciliation at job close is the catch-net).
if not skip_qty_gate and step.qty_at_step > 0:
has_downstream = step.job_id.step_ids.filtered(
lambda s: s.sequence > step.sequence
and s.state in ('pending', 'ready')
)
if has_downstream:
raise UserError(_(
"Step '%(name)s' still has %(n)d part(s) "
"parked — move them to the next step before "
"finishing. Use the row's 'Complete 1 → Next' "
"or 'Move…' button."
) % {'name': step.name, 'n': step.qty_at_step})
now = fields.Datetime.now() now = fields.Datetime.now()
# Close the open timelog (the one with no date_finished) # Close the open timelog (the one with no date_finished)
open_log = step.time_log_ids.filtered(lambda l: not l.date_finished) open_log = step.time_log_ids.filtered(lambda l: not l.date_finished)
@@ -382,3 +417,138 @@ class FpJobStep(models.Model):
# Sum of all interval durations becomes duration_actual # Sum of all interval durations becomes duration_actual
step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes')) step.duration_actual = sum(step.time_log_ids.mapped('duration_minutes'))
return True return True
# ===== Manager-only overrides ===========================================
# Used when an operator skipped or cancelled a step in error, or when
# the actual shop-floor work happened outside the ERP and the manager
# needs to retroactively mark the step complete. Both actions are
# group-gated and post a clear audit entry to the step's chatter.
def button_manager_force_complete(self):
"""Force any non-done state straight to 'done'. Stamps the first-
start / first-finish audit fields if blank so the timeline isn't
broken, and closes any timelog still left open."""
if not self.env.user.has_group(
'fusion_plating.group_fusion_plating_manager'):
raise AccessError(_(
'Only Plating Manager+ can force-complete a step.'
))
for step in self:
if step.state == 'done':
raise UserError(_(
"Step '%s' is already done."
) % step.name)
prev_state = step.state
now = fields.Datetime.now()
# Close any open timelogs first — labour already incurred
# stays in the audit even when we shortcut to done.
open_log = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_log:
open_log.write({'date_finished': now, 'state': 'stopped'})
vals = {'state': 'done'}
if not step.date_started:
vals['date_started'] = now
vals['started_by_user_id'] = self.env.user.id
if not step.date_finished:
vals['date_finished'] = now
vals['finished_by_user_id'] = self.env.user.id
step.write(vals)
step.message_post(body=_(
'Step force-completed by %s (was %s).'
) % (self.env.user.name, prev_state))
return True
def button_manager_reset_to_ready(self):
"""Reset any non-ready step back to 'ready' so the operator can
run it normally. Audited via chatter.
Side-effects, depending on the previous state:
- in_progress / paused → close any open timelog (mirrors
button_cancel) so labour already logged stays in the audit.
- done → also clear date_finished + finished_by_user_id so the
next button_finish writes fresh first-finish stamps instead
of preserving stale ones.
date_started + started_by_user_id are preserved across resets —
they record the first start ever (audit), and duration_actual is
computed from the sum of timelogs, not (finish - start), so the
elapsed math remains correct."""
if not self.env.user.has_group(
'fusion_plating.group_fusion_plating_manager'):
raise AccessError(_(
'Only Plating Manager+ can reset a step state.'
))
now = fields.Datetime.now()
for step in self:
if step.state == 'ready':
raise UserError(_(
"Step '%s' is already in Ready state."
) % step.name)
prev_state = step.state
vals = {'state': 'ready'}
# Close any still-open timelog (defensive — usually only
# in_progress/paused will have one).
open_log = step.time_log_ids.filtered(
lambda l: not l.date_finished
)
if open_log:
open_log.write({'date_finished': now, 'state': 'stopped'})
# If the step had been completed, wipe the finish stamps so
# the next Finish records fresh audit values. Skip this for
# in_progress / paused / skipped / cancelled / pending — they
# either have no finish stamp or shouldn't have one cleared.
if step.state == 'done':
vals['date_finished'] = False
vals['finished_by_user_id'] = False
step.write(vals)
step.message_post(body=_(
'Step state reset to Ready by %s (was %s).'
) % (self.env.user.name, prev_state))
return True
def action_complete_one_to_next(self):
"""One-piece flow shortcut: records move(qty=1) from this
step to the next pending/ready step, drains qty_at_step by 1.
If the drain takes qty_at_step to 0, auto-finishes the source
and starts the destination step (via action_finish_and_advance).
"""
self.ensure_one()
if self.state != 'in_progress':
raise UserError(_(
"Step '%s' must be in progress to complete a part."
) % self.name)
if self.qty_at_step < 1:
raise UserError(_(
"No parts parked at step '%s' — nothing to complete."
) % self.name)
next_step = self.job_id.step_ids.filtered(
lambda s: s.sequence > self.sequence
and s.state in ('pending', 'ready')
).sorted('sequence')[:1]
if not next_step:
raise UserError(_(
"Step '%s' is the last runnable step on the job — "
"no downstream step to move into. Finish the step "
"instead (it will close out the job)."
) % self.name)
self.env['fp.job.step.move'].create({
'job_id': self.job_id.id,
'from_step_id': self.id,
'to_step_id': next_step.id,
'transfer_type': 'step',
'qty_moved': 1,
'moved_by_user_id': self.env.user.id,
})
# qty_at_step is computed from moves; force re-read before
# checking whether this was the last part.
self.invalidate_recordset(['qty_at_step'])
if self.qty_at_step == 0:
return self.action_finish_and_advance()
return True

View File

@@ -35,7 +35,7 @@
</header> </header>
<sheet> <sheet>
<div class="oe_title"> <div class="oe_title">
<h1><field name="name" readonly="1"/></h1> <h1><field name="display_name" readonly="1"/></h1>
</div> </div>
<group> <group>
<group> <group>

View File

@@ -3,7 +3,7 @@
# License OPL-1 (Odoo Proprietary License v1.0) # License OPL-1 (Odoo Proprietary License v1.0)
{ {
'name': 'Fusion Plating — Native Jobs', 'name': 'Fusion Plating — Native Jobs',
'version': '19.0.8.19.6', 'version': '19.0.8.20.1',
'category': 'Manufacturing/Plating', 'category': 'Manufacturing/Plating',
'summary': 'Native plating job model — replaces mrp.production / mrp.workorder bridge.', 'summary': 'Native plating job model — replaces mrp.production / mrp.workorder bridge.',
'author': 'Nexa Systems Inc.', 'author': 'Nexa Systems Inc.',

View File

@@ -367,16 +367,6 @@ class FpJobStep(models.Model):
if cr_action: if cr_action:
return cr_action return cr_action
# Racking step routing — same idea as Contract Review. If the
# operator clicks Finish on a Racking step but the linked
# racking inspection isn't done yet, route them straight to
# the inspection form instead of throwing a "find the smart
# button" error message. They complete the line check-off,
# mark Done, and re-click Finish & Next to advance.
ri_action = self._fp_racking_inspection_redirect()
if ri_action:
return ri_action
# Prompt-first behaviour: show the Record Inputs dialog when the # Prompt-first behaviour: show the Record Inputs dialog when the
# recipe step has authored prompts and nothing has been captured # recipe step has authored prompts and nothing has been captured
# in this run. Bypass when context flag is set (i.e. we're being # in this run. Bypass when context flag is set (i.e. we're being
@@ -386,6 +376,11 @@ class FpJobStep(models.Model):
and self._fp_has_uncaptured_step_inputs()): and self._fp_has_uncaptured_step_inputs()):
return self._fp_open_input_wizard(advance_after=True) return self._fp_open_input_wizard(advance_after=True)
# Auto-move shim: for qty_at_step==1 + downstream step,
# silently record a move(qty=1) so the qty gate in
# button_finish passes. Raises for qty>1 (operator must use
# Complete 1 → Next or Move…). Last step is always allowed.
self._fp_record_one_piece_auto_move()
self.button_finish() self.button_finish()
next_step = self._fp_next_runnable_step() next_step = self._fp_next_runnable_step()
if next_step: if next_step:
@@ -641,34 +636,15 @@ class FpJobStep(models.Model):
def _fp_open_contract_review(self): def _fp_open_contract_review(self):
"""Auto-create the QA-005 form for this step's part if missing, """Auto-create the QA-005 form for this step's part if missing,
return the act_window pointing at it. Called from button_start return the act_window pointing at it. Called from button_start
on Contract Review steps. on Contract Review steps."""
Returns None when the review is already satisfied (state
'complete' or 'dismissed') — letting button_start fall through
to the standard path so the step starts directly, without an
unnecessary detour through an already-signed form. This mirrors
the Finish & Next redirect behaviour: once contract review is
cleared for a part, neither Start nor Finish stops to ask
about it again.
Also short-circuits when the customer doesn't require contract
review and via the manager-bypass context flag, to keep entry
and finish gates in lockstep.
"""
self.ensure_one() self.ensure_one()
if self.env.context.get('fp_skip_contract_review_gate'):
return None
part = self._fp_resolve_contract_review_part() part = self._fp_resolve_contract_review_part()
if not part: if not part:
return None return None
if not part.partner_id.x_fc_contract_review_required:
return None
Review = self.env.get('fp.contract.review') Review = self.env.get('fp.contract.review')
if Review is None: if Review is None:
return None # quality module not installed — skip return None # quality module not installed — skip
review = part.x_fc_contract_review_id review = part.x_fc_contract_review_id
if review and review.state in ('complete', 'dismissed'):
return None # already satisfied — fall through to normal start
if not review: if not review:
review = Review.sudo().create({ review = Review.sudo().create({
'part_id': part.id, 'part_id': part.id,
@@ -796,46 +772,6 @@ class FpJobStep(models.Model):
'name': _('Racking Inspection — %s') % self.job_id.name, 'name': _('Racking Inspection — %s') % self.job_id.name,
} }
def _fp_racking_inspection_redirect(self):
"""Return an act_window opening the linked racking inspection
form, or False to indicate "no redirect needed".
Mirrors ``_fp_contract_review_redirect``. Triggers when:
* this step is a Racking step (matched by ``_fp_is_racking_step``)
* the linked ``fp.racking.inspection`` exists and is NOT yet in
a terminal state (``done`` / ``discrepancy_flagged``)
When the inspection is already terminal — or doesn't exist at
all — returns False so action_finish_and_advance falls through
to the normal finish path. The hard gate
(``_fp_check_racking_inspection_complete``) still fires from
``button_finish`` for any caller that bypasses the redirect.
Manager bypass via ``fp_skip_racking_inspection_gate=True``.
"""
self.ensure_one()
if self.env.context.get('fp_skip_racking_inspection_gate'):
return False
if not self._fp_is_racking_step():
return False
if 'fp.racking.inspection' not in self.env:
return False
ri = self.job_id.racking_inspection_id
if not ri:
# No inspection record at all — let the soft gate handle
# this with a chatter warning, don't redirect.
return False
if ri.state in ('done', 'discrepancy_flagged'):
return False
return {
'type': 'ir.actions.act_window',
'res_model': 'fp.racking.inspection',
'res_id': ri.id,
'view_mode': 'form',
'target': 'current',
'name': _('Racking Inspection — %s') % self.job_id.name,
}
def _fp_check_racking_inspection_complete(self): def _fp_check_racking_inspection_complete(self):
"""Soft gate — block button_finish on a Racking step until the """Soft gate — block button_finish on a Racking step until the
linked inspection is in a terminal state. discrepancy_flagged linked inspection is in a terminal state. discrepancy_flagged
@@ -1008,51 +944,32 @@ class FpJobStep(models.Model):
"""Return an ir.actions.act_window opening the part's QA-005 """Return an ir.actions.act_window opening the part's QA-005
Contract Review form, or False to indicate "no redirect needed". Contract Review form, or False to indicate "no redirect needed".
Triggers when ALL of these are true: Triggers when:
* the step is a Contract Review step (matched via * the recipe node is flagged default_kind='contract_review', AND
``_fp_is_contract_review_step`` — name OR template kind OR * the linked part has no review yet OR the review is still in
node kind, same as the finish-time gate), a non-terminal state (draft / assistant_review / manager_review).
* the customer requires contract review
(``partner.x_fc_contract_review_required = True``), AND
* the linked part either has no review yet OR the review is
still in a non-terminal state (draft / assistant_review /
manager_review).
Once the review reaches state 'complete' or 'dismissed' the Once the review reaches state 'complete' or 'dismissed' the step
step is allowed to finish through the normal path. This is how is allowed to finish through the normal path, which is how the
Finish & Next moves on to the next step automatically once the operator clears the contract-review gate after signing QA-005.
contract review is already satisfied for that part — including
when the review was completed on a previous order.
Resolution mirrors ``_fp_check_contract_review_complete`` so a Soft-fail: if the job has no part_catalog_id we cannot route to
single source of truth governs both ENTRY (this redirect) and a per-part review, so we fall through to the standard wizard
FINISH (the gate) — they always agree on whether a step is a rather than blocking the operator.
contract review and which part it's bound to.
Soft-fail: if no part can be resolved we fall through to the
standard wizard rather than blocking the operator.
""" """
self.ensure_one() self.ensure_one()
# Manager bypass — same context flag the gate honours. node = self.recipe_node_id
if self.env.context.get('fp_skip_contract_review_gate'): if not node or node.default_kind != 'contract_review':
return False return False
if not self._fp_is_contract_review_step(): part = self.job_id.part_catalog_id
return False
part = self._fp_resolve_contract_review_part() \
or self.job_id.part_catalog_id
if not part: if not part:
_logger.warning( _logger.warning(
"Contract-review step '%s' on job %s has no part " "Contract-review step '%s' on job %s has no part_catalog_id "
"cannot redirect to QA-005 form, falling through to " "cannot redirect to QA-005 form, falling through to "
"standard wizard.", "standard wizard.",
self.name, self.job_id.name, self.name, self.job_id.name,
) )
return False return False
# Customer flag check — when the customer doesn't require
# contract review, the redirect doesn't fire and the step
# finishes through the normal path. Matches the gate's policy.
if not part.partner_id.x_fc_contract_review_required:
return False
review = part.x_fc_contract_review_id review = part.x_fc_contract_review_id
if review and review.state in ('complete', 'dismissed'): if review and review.state in ('complete', 'dismissed'):
return False return False
@@ -1110,28 +1027,6 @@ class FpJobStep(models.Model):
related='recipe_node_id.collect_measurements', related='recipe_node_id.collect_measurements',
readonly=True, readonly=True,
) )
# Job context related fields — used by the quick-look modal so the
# operator can see which job / customer / part / qty this step
# belongs to without opening the parent job form. Related (not
# stored) so they always reflect the live job record.
quick_look_partner_id = fields.Many2one(
'res.partner', string='Customer',
related='job_id.partner_id', readonly=True,
)
quick_look_part_catalog_id = fields.Many2one(
'fp.part.catalog', string='Part',
related='job_id.part_catalog_id', readonly=True,
)
quick_look_qty = fields.Float(
string='Order Qty',
related='job_id.qty', readonly=True,
)
quick_look_instruction_attachment_ids = fields.Many2many(
'ir.attachment',
string='Instruction Images',
related='recipe_node_id.instruction_attachment_ids',
readonly=True,
)
quick_look_prompt_ids = fields.Many2many( quick_look_prompt_ids = fields.Many2many(
'fusion.plating.process.node.input', 'fusion.plating.process.node.input',
string='Prompts', string='Prompts',
@@ -1197,3 +1092,43 @@ class FpJobStep(models.Model):
'target': 'new', 'target': 'new',
'name': self.name, 'name': self.name,
} }
def _fp_record_one_piece_auto_move(self):
"""Decide whether to silently record a move(qty=1) before
the step finishes. Five cases:
- qty_at_step == 0: nothing to do (parts already moved).
- last runnable step: parts complete in place; no move.
- qty_at_step == 1 + downstream: record move(1).
- qty_at_step > 1 + downstream: raise.
- qty_at_step > 1 + last step: allow (parts complete in
place; qty_done auto-tick is Phase 2).
Called from action_finish_and_advance just before
button_finish.
"""
self.ensure_one()
qty = self.qty_at_step
if qty <= 0:
return False
next_step = self.job_id.step_ids.filtered(
lambda s: s.sequence > self.sequence
and s.state in ('pending', 'ready')
).sorted('sequence')[:1]
if not next_step:
return False
if qty > 1:
raise UserError(_(
"Step '%s' still has %d parts here — use the row's "
"'Complete 1 → Next' button (for one-by-one flow) "
"or the 'Move…' wizard (for batched flow) to drain "
"the step before finishing."
) % (self.name, qty))
self.env['fp.job.step.move'].create({
'job_id': self.job_id.id,
'from_step_id': self.id,
'to_step_id': next_step.id,
'transfer_type': 'step',
'qty_moved': 1,
'moved_by_user_id': self.env.user.id,
})
return True

View File

@@ -331,3 +331,261 @@ class TestMilestoneCascade(TransactionCase):
delivery.action_mark_delivered() delivery.action_mark_delivered()
except Exception as e: except Exception as e:
self.assertNotIn('draft certificate', str(e)) self.assertNotIn('draft certificate', str(e))
class TestQtyGate(TransactionCase):
"""Step-level quantity gate + partial-qty handling.
Covers:
- button_finish blocks when qty_at_step > 0 AND downstream
steps exist (mid-recipe)
- manager bypass via fp_skip_qty_gate=True
- last-runnable-step exemption (qty_at_step > 0 allowed)
- action_complete_one_to_next (Task 3)
- auto-move shim on action_finish_and_advance (Task 4)
- display_name rename (Task 5)
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.partner = cls.env['res.partner'].create({'name': 'QtyCust'})
cls.product = cls.env['product.product'].create({
'name': 'QtyWidget',
})
def _make_job(self, qty=3, **kw):
vals = {
'partner_id': self.partner.id,
'product_id': self.product.id,
'qty': qty,
}
vals.update(kw)
return self.env['fp.job'].create(vals)
def _make_step(self, job, name='Step', sequence=10, state='pending'):
return self.env['fp.job.step'].create({
'job_id': job.id,
'name': name,
'sequence': sequence,
'state': state,
})
def _make_two_step_chain(self, qty=3):
"""Create a job with two steps; the first is in_progress
with `qty` parts parked, the second is ready."""
from odoo import fields
job = self._make_job(qty=qty)
step1 = self._make_step(
job, name='Plate', sequence=10, state='in_progress',
)
step2 = self._make_step(
job, name='Bake', sequence=20, state='ready',
)
step1.date_started = fields.Datetime.now()
return job, step1, step2
# ---------------- button_finish gate ----------------------------
def test_button_finish_blocks_when_qty_at_step(self):
from odoo.exceptions import UserError
job, step1, step2 = self._make_two_step_chain(qty=3)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 3)
with self.assertRaises(UserError) as exc:
step1.button_finish()
self.assertIn('parts parked', str(exc.exception).replace(
'part(s) parked', 'parts parked'))
def test_button_finish_bypass(self):
job, step1, step2 = self._make_two_step_chain(qty=3)
step1.invalidate_recordset(['qty_at_step'])
step1.with_context(fp_skip_qty_gate=True).button_finish()
self.assertEqual(step1.state, 'done')
def test_button_finish_allows_last_step_with_qty(self):
"""Last runnable step is exempt — parts complete in place."""
from odoo import fields
job = self._make_job(qty=5)
last = self._make_step(
job, name='FinalInspect', sequence=10, state='in_progress',
)
last.date_started = fields.Datetime.now()
last.invalidate_recordset(['qty_at_step'])
self.assertEqual(last.qty_at_step, 5)
last.button_finish()
self.assertEqual(last.state, 'done')
def test_button_finish_passes_when_qty_zero(self):
"""qty_at_step==0 (already moved out) → no gate fires."""
job, step1, step2 = self._make_two_step_chain(qty=2)
self.env['fp.job.step.move'].create({
'job_id': job.id,
'from_step_id': step1.id,
'to_step_id': step2.id,
'transfer_type': 'step',
'qty_moved': 2,
'moved_by_user_id': self.env.user.id,
})
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 0)
step1.button_finish()
self.assertEqual(step1.state, 'done')
# ---------------- action_complete_one_to_next -------------------
def test_complete_one_to_next_records_move(self):
job, step1, step2 = self._make_two_step_chain(qty=3)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 3)
step1.action_complete_one_to_next()
moves = self.env['fp.job.step.move'].search([
('from_step_id', '=', step1.id),
])
self.assertEqual(len(moves), 1)
self.assertEqual(moves.qty_moved, 1)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.state, 'in_progress')
self.assertEqual(step1.qty_at_step, 2)
def test_complete_one_to_next_auto_finishes_on_last(self):
job, step1, step2 = self._make_two_step_chain(qty=1)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 1)
step1.action_complete_one_to_next()
self.assertEqual(step1.state, 'done')
self.assertEqual(step2.state, 'in_progress')
def test_complete_one_to_next_blocks_when_empty(self):
from odoo.exceptions import UserError
job, step1, step2 = self._make_two_step_chain(qty=2)
self.env['fp.job.step.move'].create({
'job_id': job.id,
'from_step_id': step1.id,
'to_step_id': step2.id,
'transfer_type': 'step',
'qty_moved': 2,
'moved_by_user_id': self.env.user.id,
})
step1.invalidate_recordset(['qty_at_step'])
with self.assertRaises(UserError) as exc:
step1.action_complete_one_to_next()
self.assertIn('nothing to complete', str(exc.exception))
def test_complete_one_to_next_blocks_when_no_next_step(self):
from odoo.exceptions import UserError
from odoo import fields
job = self._make_job(qty=3)
last = self._make_step(
job, name='Inspect', sequence=10, state='in_progress',
)
last.date_started = fields.Datetime.now()
last.invalidate_recordset(['qty_at_step'])
with self.assertRaises(UserError) as exc:
last.action_complete_one_to_next()
self.assertIn('last runnable step', str(exc.exception))
def test_complete_one_to_next_blocks_when_not_in_progress(self):
from odoo.exceptions import UserError
job, step1, step2 = self._make_two_step_chain(qty=3)
step1.state = 'pending'
with self.assertRaises(UserError) as exc:
step1.action_complete_one_to_next()
self.assertIn('must be in progress', str(exc.exception))
# ---------------- auto-move shim on Finish & Next ---------------
def test_finish_and_advance_auto_move_for_qty_1(self):
job, step1, step2 = self._make_two_step_chain(qty=1)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 1)
step1.action_finish_and_advance()
moves = self.env['fp.job.step.move'].search([
('from_step_id', '=', step1.id),
])
self.assertEqual(len(moves), 1)
self.assertEqual(moves.qty_moved, 1)
self.assertEqual(step1.state, 'done')
self.assertEqual(step2.state, 'in_progress')
def test_finish_and_advance_blocks_for_qty_gt_1(self):
from odoo.exceptions import UserError
job, step1, step2 = self._make_two_step_chain(qty=3)
step1.invalidate_recordset(['qty_at_step'])
self.assertEqual(step1.qty_at_step, 3)
with self.assertRaises(UserError) as exc:
step1.action_finish_and_advance()
self.assertIn("Complete 1", str(exc.exception))
self.assertEqual(step1.state, 'in_progress')
def test_finish_and_advance_passes_for_qty_0(self):
job, step1, step2 = self._make_two_step_chain(qty=2)
self.env['fp.job.step.move'].create({
'job_id': job.id,
'from_step_id': step1.id,
'to_step_id': step2.id,
'transfer_type': 'step',
'qty_moved': 2,
'moved_by_user_id': self.env.user.id,
})
step1.invalidate_recordset(['qty_at_step'])
before = self.env['fp.job.step.move'].search_count([
('from_step_id', '=', step1.id),
])
step1.action_finish_and_advance()
after = self.env['fp.job.step.move'].search_count([
('from_step_id', '=', step1.id),
])
self.assertEqual(after, before)
self.assertEqual(step1.state, 'done')
def test_finish_and_advance_allows_last_step_with_qty_gt_1(self):
from odoo import fields
job = self._make_job(qty=5)
last = self._make_step(
job, name='FinalInspect', sequence=10, state='in_progress',
)
last.date_started = fields.Datetime.now()
last.invalidate_recordset(['qty_at_step'])
self.assertEqual(last.qty_at_step, 5)
before = self.env['fp.job.step.move'].search_count([])
last.action_finish_and_advance()
after = self.env['fp.job.step.move'].search_count([])
self.assertEqual(after, before)
self.assertEqual(last.state, 'done')
# ---------------- display_name rename ----------------------------
def test_display_name_format(self):
job = self._make_job(qty=1)
self.assertTrue(job.name.startswith('WH/JOB/'))
self.assertTrue(job.display_name.startswith('Work Order # '))
suffix = job.name.rsplit('/', 1)[-1]
self.assertEqual(job.display_name, 'Work Order # %s' % suffix)
def test_display_name_no_slash_passthrough(self):
"""Manually-named jobs without the sequence prefix display
as-is (no rewrite)."""
job = self._make_job(qty=1)
job.name = 'SmokeJob42'
job.invalidate_recordset(['display_name'])
self.assertEqual(job.display_name, 'SmokeJob42')
# ---------------- Move wizard zero-qty regression ----------------
def test_move_wizard_blocks_zero_qty(self):
from odoo.exceptions import UserError
job, step1, step2 = self._make_two_step_chain(qty=2)
wiz = self.env['fp.job.step.move.wizard'].create({
'job_id': job.id,
'from_step_id': step1.id,
'to_step_id': step2.id,
'qty_moved': 0,
'transfer_type': 'step',
})
with self.assertRaises(UserError) as exc:
wiz.action_commit()
self.assertIn('at least 1', str(exc.exception))

View File

@@ -167,6 +167,14 @@
string="Pause" icon="fa-pause" string="Pause" icon="fa-pause"
class="btn-link text-warning" class="btn-link text-warning"
invisible="state != 'in_progress'"/> invisible="state != 'in_progress'"/>
<!-- Streaming flow: complete 1 part at a time,
move to next step. Hidden when nothing is
parked or the step isn't actively running.
Auto-finishes when qty_at_step drains to 0. -->
<button name="action_complete_one_to_next" type="object"
string="Complete 1 → Next" icon="fa-forward"
class="btn-link text-success"
invisible="state != 'in_progress' or qty_at_step &lt; 1"/>
<button name="action_open_input_wizard" type="object" <button name="action_open_input_wizard" type="object"
string="Record" icon="fa-pencil-square-o" string="Record" icon="fa-pencil-square-o"
class="btn-link" class="btn-link"