feat(jobs): step qty gate + partial-qty + display rename
Three coupled shop-floor corrections: - fp.job._compute_display_name: renders "Work Order # 00011" in form header, breadcrumbs, M2O dropdowns, and error messages. DB name stays as WH/JOB/00011 - existing chatter/cert/delivery references unchanged. - fp.job.step.button_finish: refuses if qty_at_step > 0 AND a downstream pending/ready step exists. Last runnable step is exempt (parts complete in place). Manager bypass via fp_skip_qty_gate=True context key. - fp.job.step.action_complete_one_to_next: new per-row button "Complete 1 -> Next" for streaming flow (large parts going one-by-one). Records move(qty=1) to next step; if drain takes qty_at_step to 0, auto-finishes source + auto-starts destination via existing action_finish_and_advance. - fp.job.step._fp_record_one_piece_auto_move: auto-move shim wired into action_finish_and_advance. qty=1 + downstream => silently record move(1). qty>1 + downstream => raise pointing at Complete 1 -> Next. Last step always allowed. - 16 new TestQtyGate tests covering gate / shim / auto-finish / last-step exemption / display rename / Move wizard zero-qty. Spec: docs/superpowers/specs/2026-05-12-step-qty-gate-and-display-rename-design.md Plan: docs/superpowers/plans/2026-05-12-step-qty-gate-and-display-rename.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -331,3 +331,261 @@ class TestMilestoneCascade(TransactionCase):
|
||||
delivery.action_mark_delivered()
|
||||
except Exception as e:
|
||||
self.assertNotIn('draft certificate', str(e))
|
||||
|
||||
|
||||
class TestQtyGate(TransactionCase):
|
||||
"""Step-level quantity gate + partial-qty handling.
|
||||
|
||||
Covers:
|
||||
- button_finish blocks when qty_at_step > 0 AND downstream
|
||||
steps exist (mid-recipe)
|
||||
- manager bypass via fp_skip_qty_gate=True
|
||||
- last-runnable-step exemption (qty_at_step > 0 allowed)
|
||||
- action_complete_one_to_next (Task 3)
|
||||
- auto-move shim on action_finish_and_advance (Task 4)
|
||||
- display_name rename (Task 5)
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.partner = cls.env['res.partner'].create({'name': 'QtyCust'})
|
||||
cls.product = cls.env['product.product'].create({
|
||||
'name': 'QtyWidget',
|
||||
})
|
||||
|
||||
def _make_job(self, qty=3, **kw):
|
||||
vals = {
|
||||
'partner_id': self.partner.id,
|
||||
'product_id': self.product.id,
|
||||
'qty': qty,
|
||||
}
|
||||
vals.update(kw)
|
||||
return self.env['fp.job'].create(vals)
|
||||
|
||||
def _make_step(self, job, name='Step', sequence=10, state='pending'):
|
||||
return self.env['fp.job.step'].create({
|
||||
'job_id': job.id,
|
||||
'name': name,
|
||||
'sequence': sequence,
|
||||
'state': state,
|
||||
})
|
||||
|
||||
def _make_two_step_chain(self, qty=3):
|
||||
"""Create a job with two steps; the first is in_progress
|
||||
with `qty` parts parked, the second is ready."""
|
||||
from odoo import fields
|
||||
job = self._make_job(qty=qty)
|
||||
step1 = self._make_step(
|
||||
job, name='Plate', sequence=10, state='in_progress',
|
||||
)
|
||||
step2 = self._make_step(
|
||||
job, name='Bake', sequence=20, state='ready',
|
||||
)
|
||||
step1.date_started = fields.Datetime.now()
|
||||
return job, step1, step2
|
||||
|
||||
# ---------------- button_finish gate ----------------------------
|
||||
|
||||
def test_button_finish_blocks_when_qty_at_step(self):
|
||||
from odoo.exceptions import UserError
|
||||
job, step1, step2 = self._make_two_step_chain(qty=3)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 3)
|
||||
with self.assertRaises(UserError) as exc:
|
||||
step1.button_finish()
|
||||
self.assertIn('parts parked', str(exc.exception).replace(
|
||||
'part(s) parked', 'parts parked'))
|
||||
|
||||
def test_button_finish_bypass(self):
|
||||
job, step1, step2 = self._make_two_step_chain(qty=3)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
step1.with_context(fp_skip_qty_gate=True).button_finish()
|
||||
self.assertEqual(step1.state, 'done')
|
||||
|
||||
def test_button_finish_allows_last_step_with_qty(self):
|
||||
"""Last runnable step is exempt — parts complete in place."""
|
||||
from odoo import fields
|
||||
job = self._make_job(qty=5)
|
||||
last = self._make_step(
|
||||
job, name='FinalInspect', sequence=10, state='in_progress',
|
||||
)
|
||||
last.date_started = fields.Datetime.now()
|
||||
last.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(last.qty_at_step, 5)
|
||||
last.button_finish()
|
||||
self.assertEqual(last.state, 'done')
|
||||
|
||||
def test_button_finish_passes_when_qty_zero(self):
|
||||
"""qty_at_step==0 (already moved out) → no gate fires."""
|
||||
job, step1, step2 = self._make_two_step_chain(qty=2)
|
||||
self.env['fp.job.step.move'].create({
|
||||
'job_id': job.id,
|
||||
'from_step_id': step1.id,
|
||||
'to_step_id': step2.id,
|
||||
'transfer_type': 'step',
|
||||
'qty_moved': 2,
|
||||
'moved_by_user_id': self.env.user.id,
|
||||
})
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 0)
|
||||
step1.button_finish()
|
||||
self.assertEqual(step1.state, 'done')
|
||||
|
||||
|
||||
# ---------------- action_complete_one_to_next -------------------
|
||||
|
||||
def test_complete_one_to_next_records_move(self):
|
||||
job, step1, step2 = self._make_two_step_chain(qty=3)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 3)
|
||||
step1.action_complete_one_to_next()
|
||||
moves = self.env['fp.job.step.move'].search([
|
||||
('from_step_id', '=', step1.id),
|
||||
])
|
||||
self.assertEqual(len(moves), 1)
|
||||
self.assertEqual(moves.qty_moved, 1)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.state, 'in_progress')
|
||||
self.assertEqual(step1.qty_at_step, 2)
|
||||
|
||||
def test_complete_one_to_next_auto_finishes_on_last(self):
|
||||
job, step1, step2 = self._make_two_step_chain(qty=1)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 1)
|
||||
step1.action_complete_one_to_next()
|
||||
self.assertEqual(step1.state, 'done')
|
||||
self.assertEqual(step2.state, 'in_progress')
|
||||
|
||||
def test_complete_one_to_next_blocks_when_empty(self):
|
||||
from odoo.exceptions import UserError
|
||||
job, step1, step2 = self._make_two_step_chain(qty=2)
|
||||
self.env['fp.job.step.move'].create({
|
||||
'job_id': job.id,
|
||||
'from_step_id': step1.id,
|
||||
'to_step_id': step2.id,
|
||||
'transfer_type': 'step',
|
||||
'qty_moved': 2,
|
||||
'moved_by_user_id': self.env.user.id,
|
||||
})
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
with self.assertRaises(UserError) as exc:
|
||||
step1.action_complete_one_to_next()
|
||||
self.assertIn('nothing to complete', str(exc.exception))
|
||||
|
||||
def test_complete_one_to_next_blocks_when_no_next_step(self):
|
||||
from odoo.exceptions import UserError
|
||||
from odoo import fields
|
||||
job = self._make_job(qty=3)
|
||||
last = self._make_step(
|
||||
job, name='Inspect', sequence=10, state='in_progress',
|
||||
)
|
||||
last.date_started = fields.Datetime.now()
|
||||
last.invalidate_recordset(['qty_at_step'])
|
||||
with self.assertRaises(UserError) as exc:
|
||||
last.action_complete_one_to_next()
|
||||
self.assertIn('last runnable step', str(exc.exception))
|
||||
|
||||
def test_complete_one_to_next_blocks_when_not_in_progress(self):
|
||||
from odoo.exceptions import UserError
|
||||
job, step1, step2 = self._make_two_step_chain(qty=3)
|
||||
step1.state = 'pending'
|
||||
with self.assertRaises(UserError) as exc:
|
||||
step1.action_complete_one_to_next()
|
||||
self.assertIn('must be in progress', str(exc.exception))
|
||||
|
||||
|
||||
# ---------------- auto-move shim on Finish & Next ---------------
|
||||
|
||||
def test_finish_and_advance_auto_move_for_qty_1(self):
|
||||
job, step1, step2 = self._make_two_step_chain(qty=1)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 1)
|
||||
step1.action_finish_and_advance()
|
||||
moves = self.env['fp.job.step.move'].search([
|
||||
('from_step_id', '=', step1.id),
|
||||
])
|
||||
self.assertEqual(len(moves), 1)
|
||||
self.assertEqual(moves.qty_moved, 1)
|
||||
self.assertEqual(step1.state, 'done')
|
||||
self.assertEqual(step2.state, 'in_progress')
|
||||
|
||||
def test_finish_and_advance_blocks_for_qty_gt_1(self):
|
||||
from odoo.exceptions import UserError
|
||||
job, step1, step2 = self._make_two_step_chain(qty=3)
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(step1.qty_at_step, 3)
|
||||
with self.assertRaises(UserError) as exc:
|
||||
step1.action_finish_and_advance()
|
||||
self.assertIn("Complete 1", str(exc.exception))
|
||||
self.assertEqual(step1.state, 'in_progress')
|
||||
|
||||
def test_finish_and_advance_passes_for_qty_0(self):
|
||||
job, step1, step2 = self._make_two_step_chain(qty=2)
|
||||
self.env['fp.job.step.move'].create({
|
||||
'job_id': job.id,
|
||||
'from_step_id': step1.id,
|
||||
'to_step_id': step2.id,
|
||||
'transfer_type': 'step',
|
||||
'qty_moved': 2,
|
||||
'moved_by_user_id': self.env.user.id,
|
||||
})
|
||||
step1.invalidate_recordset(['qty_at_step'])
|
||||
before = self.env['fp.job.step.move'].search_count([
|
||||
('from_step_id', '=', step1.id),
|
||||
])
|
||||
step1.action_finish_and_advance()
|
||||
after = self.env['fp.job.step.move'].search_count([
|
||||
('from_step_id', '=', step1.id),
|
||||
])
|
||||
self.assertEqual(after, before)
|
||||
self.assertEqual(step1.state, 'done')
|
||||
|
||||
def test_finish_and_advance_allows_last_step_with_qty_gt_1(self):
|
||||
from odoo import fields
|
||||
job = self._make_job(qty=5)
|
||||
last = self._make_step(
|
||||
job, name='FinalInspect', sequence=10, state='in_progress',
|
||||
)
|
||||
last.date_started = fields.Datetime.now()
|
||||
last.invalidate_recordset(['qty_at_step'])
|
||||
self.assertEqual(last.qty_at_step, 5)
|
||||
before = self.env['fp.job.step.move'].search_count([])
|
||||
last.action_finish_and_advance()
|
||||
after = self.env['fp.job.step.move'].search_count([])
|
||||
self.assertEqual(after, before)
|
||||
self.assertEqual(last.state, 'done')
|
||||
|
||||
|
||||
# ---------------- display_name rename ----------------------------
|
||||
|
||||
def test_display_name_format(self):
|
||||
job = self._make_job(qty=1)
|
||||
self.assertTrue(job.name.startswith('WH/JOB/'))
|
||||
self.assertTrue(job.display_name.startswith('Work Order # '))
|
||||
suffix = job.name.rsplit('/', 1)[-1]
|
||||
self.assertEqual(job.display_name, 'Work Order # %s' % suffix)
|
||||
|
||||
def test_display_name_no_slash_passthrough(self):
|
||||
"""Manually-named jobs without the sequence prefix display
|
||||
as-is (no rewrite)."""
|
||||
job = self._make_job(qty=1)
|
||||
job.name = 'SmokeJob42'
|
||||
job.invalidate_recordset(['display_name'])
|
||||
self.assertEqual(job.display_name, 'SmokeJob42')
|
||||
|
||||
# ---------------- Move wizard zero-qty regression ----------------
|
||||
|
||||
def test_move_wizard_blocks_zero_qty(self):
|
||||
from odoo.exceptions import UserError
|
||||
job, step1, step2 = self._make_two_step_chain(qty=2)
|
||||
wiz = self.env['fp.job.step.move.wizard'].create({
|
||||
'job_id': job.id,
|
||||
'from_step_id': step1.id,
|
||||
'to_step_id': step2.id,
|
||||
'qty_moved': 0,
|
||||
'transfer_type': 'step',
|
||||
})
|
||||
with self.assertRaises(UserError) as exc:
|
||||
wiz.action_commit()
|
||||
self.assertIn('at least 1', str(exc.exception))
|
||||
|
||||
Reference in New Issue
Block a user