Files
gsinghpal f08f328688 changes
2026-04-27 00:11:18 -04:00

83 lines
2.6 KiB
Python

# Verify action_abort_for_retry on a fresh job.
import time
from odoo import fields
W = env['fp.direct.order.wizard']
Line = env['fp.direct.order.line']
P = env['res.partner']
Part = env['fp.part.catalog']
target = P.browse(2529)
part = Part.search([('x_fc_default_coating_config_id', '!=', False)], limit=1)
Tank = env['fusion.plating.tank']
tanks = Tank.search([], limit=2)
w = W.create({
'partner_id': target.id, 'po_pending': True,
'po_number': 'PO-S11V-' + fields.Datetime.now().strftime('%H%M%S'),
'invoice_strategy': 'net_terms',
})
w._onchange_partner_id()
Line.create({
'wizard_id': w.id, 'part_catalog_id': part.id,
'coating_config_id': part.x_fc_default_coating_config_id.id,
'quantity': 5, 'unit_price': 20.0,
})
r = w.action_create_order()
so = env['sale.order'].browse(r['res_id'])
so.action_confirm()
job = env['fp.job'].search([('sale_order_id', '=', so.id)], limit=1)
step = job.step_ids.sorted('sequence')[3] # plating
step.tank_id = tanks[0].id
step.button_start()
print(f' [Carlos] Started {step.name} on tank {step.tank_id.name}')
time.sleep(2)
# Equipment fails
print(f' ⚡ Rectifier dies on tank {step.tank_id.name}')
print()
before_msgs = len(job.message_ids)
step.action_abort_for_retry(
reason='Rectifier #3 tripped breaker; sparking on bus bar',
new_tank_id=tanks[1].id if len(tanks) > 1 else False,
)
print(f' After abort:')
print(f' state={step.state}')
print(f' tank_id={step.tank_id.name}')
print(f' duration_actual (partial work)={step.duration_actual:.4f} min')
print(f' timelogs={len(step.time_log_ids)}, all closed: '
f'{all(l.date_finished for l in step.time_log_ids)}')
print()
after_msgs = len(job.message_ids)
print(f' Job chatter: {before_msgs}{after_msgs} (delta {after_msgs - before_msgs})')
abort_msg = job.message_ids[0]
print(f' Latest message:')
print(f' {abort_msg.body[:300]}...')
# Operator restarts on the new tank
print()
print(f' [Carlos] Restarts the step on the new tank')
step.button_start()
time.sleep(2)
step.button_finish()
print(f' Final state={step.state}, total duration_actual={step.duration_actual:.4f} min')
print(f' Total timelogs={len(step.time_log_ids)} (1 from abort + 1 from retry)')
# Failure case: try to abort a step in 'ready' state
print()
print(f' Failure test: try abort on a ready (not in_progress) step')
ready_step = job.step_ids.filtered(lambda s: s.state == 'ready')[:1]
if ready_step:
try:
ready_step.action_abort_for_retry(reason='test')
print(f' ❌ Allowed abort on ready step')
except Exception as e:
print(f' ✓ Blocked: {str(e)[:100]}')
env.cr.commit()