# Battle test v2 — re-verify after fixes for: bake-window override, # duration overrun chatter, qty reconciliation, recompute-duration. from datetime import timedelta from odoo import fields W = env['fp.direct.order.wizard'] Line = env['fp.direct.order.line'] P = env['res.partner'] Part = env['fp.part.catalog'] target = P.browse(2529) part = Part.search([('x_fc_default_coating_config_id', '!=', False)], limit=1) def make_job(po_suffix): w = W.create({ 'partner_id': target.id, 'po_pending': True, 'po_number': f'PO-BT2-{po_suffix}', 'invoice_strategy': 'net_terms', }) w._onchange_partner_id() Line.create({ 'wizard_id': w.id, 'part_catalog_id': part.id, 'coating_config_id': part.x_fc_default_coating_config_id.id, 'quantity': 5, 'unit_price': 20.0, }) r = w.action_create_order() so = env['sale.order'].browse(r['res_id']) so.action_confirm() return env['fp.job'].search([('sale_order_id', '=', so.id)], limit=1) # ====================================================================== Fix 1 print('='*72) print('FIX 1 — Bake-window: missed_window blocks, manager override allowed + audited') print('='*72) BW = env['fusion.plating.bake.window'] Bath = env['fusion.plating.bath'] expired = BW.create({ 'bath_id': Bath.search([], limit=1).id, 'plate_exit_time': fields.Datetime.now() - timedelta(hours=10), 'window_hours': 4.0, 'part_ref': 'BT2-EXPIRED', 'quantity': 5, }) BW._cron_update_states() expired.invalidate_recordset() print(f' Window {expired.name} state: {expired.state}') # Naive operator (no override) — should fail try: expired.action_start_bake() print(f' ❌ start_bake worked without override') except Exception as e: print(f' ✓ Blocked: {str(e)[:120]}') # Manager override try: expired.action_force_start_missed() print(f' ✓ Manager override succeeded: state={expired.state}') # Check chatter msgs = expired.message_ids.filtered(lambda m: 'OVERRIDE' in (m.body or '')) print(f' ✓ Chatter audit: {len(msgs)} OVERRIDE message logged') except Exception as e: print(f' ❌ Override failed: {e}') # ====================================================================== Fix 2 print() print('='*72) print('FIX 2 — Duration overrun: > 1.5x expected posts chatter warning') print('='*72) job = make_job('F2-' + fields.Datetime.now().strftime('%H%M%S')) step = job.step_ids.sorted('sequence')[0] step.duration_expected = 30 # 30 min expected step.button_start() # Force a 6h elapsed via timelog backdate step.write({'date_started': fields.Datetime.now() - timedelta(hours=6)}) # Update the open timelog to start 6h ago too open_log = step.time_log_ids.filtered(lambda l: not l.date_finished) open_log.write({'date_started': fields.Datetime.now() - timedelta(hours=6)}) step.button_finish() print(f' duration_expected={step.duration_expected:.0f} min, ' f'duration_actual={step.duration_actual:.0f} min, ' f'ratio={step.duration_actual/step.duration_expected:.1f}x') overrun_msgs = job.message_ids.filtered(lambda m: 'expected' in (m.body or '')) print(f' Chatter overrun warnings on job: {len(overrun_msgs)}') if overrun_msgs: print(f' ✓ Posted: {overrun_msgs[0].body[:100]}...') # ====================================================================== Fix 3 print() print('='*72) print('FIX 3 — Qty reconciliation: job mark-done blocks if qty mismatch') print('='*72) job = make_job('F3-' + fields.Datetime.now().strftime('%H%M%S')) for s in job.step_ids.sorted('sequence'): if s.state in ('pending', 'ready'): s.button_start() if s.state == 'in_progress': s.button_finish() print(f' Job qty={job.qty}, qty_done={job.qty_done}, qty_scrapped={job.qty_scrapped}') # Try mark done with qty_done = 0 try: job.button_mark_done() print(f' ❌ Job closed with qty_done=0!') except Exception as e: print(f' ✓ Blocked: {str(e)[:160]}') # Set qty_done = 4, qty_scrapped = 1, retry job.qty_done = 4 job.qty_scrapped = 1 print(f' Update: qty_done=4, qty_scrapped=1 (sums to qty=5)') try: job.button_mark_done() print(f' ✓ Closed with reconciled qty: state={job.state}') except Exception as e: print(f' ❌ Still blocked: {e}') # ====================================================================== Fix 4 print() print('='*72) print('FIX 4 — Supervisor edits timelog → Recompute Duration action picks it up') print('='*72) job = make_job('F4-' + fields.Datetime.now().strftime('%H%M%S')) step = job.step_ids.sorted('sequence')[0] step.button_start() import time as _t _t.sleep(1) step.button_finish() print(f' Initial: duration_actual={step.duration_actual:.4f} min, ' f'logs={len(step.time_log_ids)}') # Bob backdates the timelog (operator forgot to start; was actually 30 min) log = step.time_log_ids[0] real_start = log.date_finished - timedelta(minutes=30) log.write({'date_started': real_start}) print(f' Bob backdates log: started 30 min before finish') print(f' log.duration_minutes (auto): {log.duration_minutes:.2f} min') print(f' step.duration_actual STILL stale: {step.duration_actual:.2f} min') # Apply recompute step.action_recompute_duration_from_timelogs() print(f' After Recompute: duration_actual={step.duration_actual:.2f} min') recompute_msgs = job.message_ids.filtered(lambda m: 'recomputed' in (m.body or '').lower()) print(f' Chatter audit: {len(recompute_msgs)} recompute entry logged') env.cr.commit() print() print('== Battle test v2 complete ==')