Per-step audit caught real enforcement bugs across all 9 WO kinds.
Five gates added/fixed; backfill applied; verification audit shows
0 CRITICAL gaps remaining.
**1. Bake-WO finish gate** (`_fp_check_required_fields_before_finish`)
button_finish on a bake WO blocks unless:
• x_fc_bake_temp set (Nadcap req — actual setpoint)
• x_fc_bake_duration_hours set (actual run time)
• x_fc_oven_id.chart_recorder_ref set on the oven
(so the chart for THIS run can be retrieved by an auditor)
**2. Rack-WO start gate** added to button_start.
**3. Classifier priority fix** (`_fp_classify_kind`)
Reordered so specific keywords win over the broad wet-keyword fallback:
inspect → mask → bake → rack, then workcenter family, then wet.
"Post-plate Inspection" now → inspect (was wrongly → wet).
"Oven bake (Post de-rack)" now → bake (was wrongly → rack).
**4. Auto-populate** target_thickness + dwell_time at WO generation.
Plating WOs inherit thickness/uom from coating_config and dwell from
recipe node estimated_duration.
**5. Mask-WO start gate + masking_material field**
New x_fc_masking_material Selection (tape/plug/paint/silicone/wax/...).
Required to start mask/de-mask WO. Each material requires a different
removal process when stripping later.
**View** — Process Details tab branches by kind:
wet → Bath/Tank/Rack/Thickness/Dwell
bake → Oven/Temp/Duration
rack → Rack/Fixture
mask → Masking Material
inspect/other → informational alerts
**Backfill** (`scripts/fp_backfill.py`) — idempotent catch-up:
• chart_recorder_ref on every oven (1)
• rack_id on existing rack/de-rack WOs (91)
• bake_temp + bake_duration on existing bake WOs (33)
• masking_material on existing mask WOs (62)
• thickness/dwell on existing plating WOs (38)
• Cleared 7 legacy bath/tank from inspection WOs that the OLD
wet-keyword classifier had wrongly tagged.
**Per-step audit** (`scripts/fp_per_step_audit.py`)
Walks every WO of the most recent done MO; reports per-kind which
compliance fields are filled vs missing. Re-runnable for regressions.
**Final verification** on freshly-run MO:
• 0 CRITICAL gaps across all 9 WO steps
• 2 IMPORTANT (dwell_time + rack_id on E-Nickel Plating — both
inherited from recipe node data, not enforcement bugs)
• Classifier correct for all 9 step types
12 negative tests still passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
937 lines
37 KiB
Python
937 lines
37 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""Comprehensive E2E simulator — workforce edition.
|
||
|
||
Role-plays each employee touching a job from quote → invoice. For
|
||
each work order:
|
||
• The assigned operator clocks in (button_start)
|
||
• Real time elapses (time.sleep)
|
||
• Chemistry / quality data is logged where relevant
|
||
• The operator clocks out (button_finish)
|
||
|
||
Then audits:
|
||
• Per-WO duration captured (mrp.workorder.duration)
|
||
• mrp.workcenter.productivity records exist with operator user
|
||
• Chemistry log entries on bath
|
||
• Certificate state, attachment, thickness readings
|
||
• Chain-of-custody entries on delivery
|
||
• Notification log with attachment names
|
||
• Portal job final state + SO workflow_stage
|
||
|
||
Findings printed at the end as PASS/FAIL/WARN — each FAIL/WARN is a
|
||
gap that needs fixing before this can ship to a real shop floor.
|
||
"""
|
||
from datetime import datetime
|
||
import time
|
||
import base64
|
||
|
||
env = env # noqa injected by odoo shell
|
||
from odoo import fields # noqa
|
||
|
||
|
||
def banner(label):
|
||
print(f'\n{"="*76}\n {label}\n{"="*76}')
|
||
|
||
|
||
def step(actor, action):
|
||
print(f' → [{actor:<14}] {action}')
|
||
|
||
|
||
def show(label, value):
|
||
print(f' {label:<32} {value}')
|
||
|
||
|
||
FINDINGS = []
|
||
|
||
|
||
def finding(level, area, msg):
|
||
"""level: PASS | WARN | FAIL"""
|
||
FINDINGS.append((level, area, msg))
|
||
sym = {'PASS': '✓', 'WARN': '⚠', 'FAIL': '✗'}[level]
|
||
print(f' {sym} {level:<5} [{area}] {msg}')
|
||
|
||
|
||
stamp = datetime.now().strftime('%y%m%d-%H%M%S')
|
||
|
||
# =====================================================================
|
||
banner(f'PHASE 0 — Set up cast of employees ({stamp})')
|
||
# =====================================================================
|
||
|
||
# Reuse existing users when present so we don't bloat the DB on reruns.
|
||
# Each persona gets a real res.users so with_user() exercises permission
|
||
# checks the way an operator would experience them on the iPad.
|
||
PERSONAS = {
|
||
'sandra': ('Sandra Kim', 'Sales rep / estimator'),
|
||
'carlos': ('Carlos Reyes', 'Receiving clerk'),
|
||
'hannah': ('Hannah Patel', 'Production planner / manager'),
|
||
'john': ('John Murphy', 'Masking operator'),
|
||
'maria': ('Maria Lopez', 'Rack / handler'),
|
||
'tom': ('Tom Wright', 'Plater'),
|
||
'ana': ('Ana Silva', 'De-mask / clean'),
|
||
'frank': ('Frank Bauer', 'QC / inspector'),
|
||
'dave': ('Dave Chen', 'Driver'),
|
||
'linda': ('Linda Brown', 'Accounting'),
|
||
}
|
||
|
||
users = {}
|
||
mgr_group = env.ref('fusion_plating.group_fusion_plating_manager', raise_if_not_found=False)
|
||
op_group = env.ref('fusion_plating.group_fusion_plating_operator', raise_if_not_found=False)
|
||
internal_group = env.ref('base.group_user')
|
||
for key, (name, desc) in PERSONAS.items():
|
||
login = f'fp_{key}'
|
||
u = env['res.users'].search([('login', '=', login)], limit=1)
|
||
if not u:
|
||
u = env['res.users'].sudo().create({
|
||
'name': name,
|
||
'login': login,
|
||
'email': f'{login}@enplating.example',
|
||
'group_ids': [(6, 0, [internal_group.id])],
|
||
})
|
||
# Put managers in the manager group, operators in the operator group
|
||
extra = mgr_group if key in ('hannah',) else op_group
|
||
if extra and extra not in u.group_ids:
|
||
u.sudo().write({'group_ids': [(4, extra.id)]})
|
||
users[key] = u
|
||
# Make sure each has an hr.employee record (proficiency tracking
|
||
# writes to employee records).
|
||
emp = env['hr.employee'].search([('user_id', '=', u.id)], limit=1)
|
||
if not emp:
|
||
emp = env['hr.employee'].sudo().create({
|
||
'name': name,
|
||
'user_id': u.id,
|
||
})
|
||
show(f'{key:<8}', f'{u.name} ({desc}) — uid={u.id}, emp={emp.id}')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 1 — Sandra builds a quote (estimator)')
|
||
# =====================================================================
|
||
|
||
customer = env['res.partner'].sudo().create({
|
||
'name': f'Beacon Aerospace {stamp}',
|
||
'company_type': 'company',
|
||
'email': f'orders-{stamp}@beacon.example',
|
||
'phone': '+1-416-555-0199',
|
||
'street': '500 University Ave',
|
||
'city': 'Toronto', 'zip': 'M5G 1V7',
|
||
'country_id': env.ref('base.ca').id,
|
||
})
|
||
# Net-30 default so invoices created later inherit the right schedule.
|
||
net30 = env.ref('account.account_payment_term_30days', raise_if_not_found=False)
|
||
if net30:
|
||
customer.sudo().property_payment_term_id = net30.id
|
||
|
||
# Make sure the company has a default facility so MO confirm succeeds.
|
||
co = env.company
|
||
if not co.x_fc_default_facility_id:
|
||
f = env['fusion.plating.facility'].search([('active', '=', True)], limit=1)
|
||
if f:
|
||
co.sudo().x_fc_default_facility_id = f.id
|
||
show('company default facility set', f.name)
|
||
|
||
step('SANDRA', f'Receives RFQ from {customer.name}')
|
||
|
||
rfq = env['fusion.plating.quote.request'].with_user(users['sandra']).sudo().create({
|
||
'partner_id': customer.id,
|
||
'contact_name': 'Procurement',
|
||
'contact_email': customer.email,
|
||
'company_name': customer.name,
|
||
'part_description': '<p>40 housings, AMS 2404, 50µin ENP, rush.</p>',
|
||
'quantity': 40,
|
||
'state': 'new',
|
||
})
|
||
show('RFQ', f'{rfq.name}')
|
||
|
||
step('SANDRA', 'Builds configurator quote with PO# and override price')
|
||
coating = env['fp.coating.config'].search([], limit=1)
|
||
part_cat = env['fp.part.catalog'].search([], limit=1)
|
||
po_number = f'PO-BCN-{stamp}'
|
||
quote = env['fp.quote.configurator'].with_user(users['sandra']).sudo().create({
|
||
'partner_id': customer.id,
|
||
'part_catalog_id': part_cat.id,
|
||
'coating_config_id': coating.id,
|
||
'quantity': 40,
|
||
'po_number_preliminary': po_number,
|
||
'estimator_override_price': 3200.00,
|
||
'rush_order': True,
|
||
})
|
||
result = quote.with_user(users['sandra']).sudo().action_create_quotation()
|
||
so = env['sale.order'].browse(result.get('res_id'))
|
||
show('SO', f'{so.name} ({so.amount_total:,.2f})')
|
||
finding('PASS' if so.client_order_ref == po_number else 'FAIL',
|
||
'quote→SO PO#', f'client_order_ref="{so.client_order_ref}"')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 2 — Customer accepts → SO confirm → auto-MO + portal job')
|
||
# =====================================================================
|
||
|
||
step('CUSTOMER', 'Accepts quote — Sandra confirms SO')
|
||
so.with_user(users['sandra']).sudo().action_confirm()
|
||
finding('PASS' if so.state == 'sale' else 'FAIL', 'SO confirm', f'state={so.state}')
|
||
|
||
mo = env['mrp.production'].search([('origin', '=', so.name)], limit=1)
|
||
finding('PASS' if mo else 'FAIL', 'auto-MO', mo.name if mo else 'MISSING')
|
||
if mo and mo.state == 'draft':
|
||
mo.with_user(users['hannah']).sudo().action_confirm()
|
||
finding('PASS' if mo and mo.state == 'confirmed' else 'WARN',
|
||
'MO confirm', f'state={mo.state if mo else "n/a"}')
|
||
|
||
job = mo.x_fc_portal_job_id if mo else False
|
||
finding('PASS' if job else 'FAIL', 'portal job', job.name if job else 'MISSING')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 3 — Carlos receives parts')
|
||
# =====================================================================
|
||
|
||
step('CARLOS', 'Logs receiving — 40 housings in 2 boxes from FedEx')
|
||
recv = env['fp.receiving'].with_user(users['carlos']).sudo().create({
|
||
'partner_id': customer.id,
|
||
'sale_order_id': so.id,
|
||
'received_date': fields.Datetime.now(),
|
||
'expected_qty': 40,
|
||
'carrier_name': 'FedEx',
|
||
'carrier_tracking': f'FX{stamp}',
|
||
'line_ids': [(0, 0, {
|
||
'description': '40 stainless aero housings',
|
||
'expected_qty': 40,
|
||
'received_qty': 40,
|
||
})],
|
||
})
|
||
finding('PASS' if recv.received_qty == 40 else 'FAIL',
|
||
'receiving prefill', f'expected={recv.expected_qty} received={recv.received_qty}')
|
||
|
||
step('CARLOS', 'Inspects → accepts')
|
||
recv.with_user(users['carlos']).sudo().action_start_inspection()
|
||
recv.with_user(users['carlos']).sudo().action_accept()
|
||
finding('PASS' if recv.state == 'accepted' else 'FAIL',
|
||
'receiving accept', f'state={recv.state}')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 4 — Hannah plans the job')
|
||
# =====================================================================
|
||
|
||
step('HANNAH', 'Assigns recipe + generates work orders')
|
||
recipe = env['fusion.plating.process.node'].search(
|
||
[('node_type', '=', 'recipe')], limit=1)
|
||
mo_h = mo.with_user(users['hannah']).sudo()
|
||
if not mo_h.x_fc_recipe_id:
|
||
mo_h.x_fc_recipe_id = recipe.id
|
||
mo_h._generate_workorders_from_recipe()
|
||
n_wos = len(mo.workorder_ids)
|
||
finding('PASS' if n_wos > 0 else 'FAIL', 'WOs generated', f'{n_wos} work orders from {recipe.name}')
|
||
|
||
# Map operations to operators by station/role hints
|
||
WO_OPERATORS = {
|
||
'masking': 'john',
|
||
'racking': 'maria',
|
||
'ready': 'maria',
|
||
'plating': 'tom',
|
||
'enickel': 'tom',
|
||
'nickel': 'tom',
|
||
'demask': 'ana',
|
||
'de-mask': 'ana',
|
||
'clean': 'ana',
|
||
'rinse': 'ana',
|
||
'inspect': 'frank',
|
||
'qc': 'frank',
|
||
}
|
||
|
||
step('HANNAH', 'Assigns each WO to a specific operator')
|
||
# Pick a bath + a tank for any WO that needs wet-process traceability
|
||
test_bath = env['fusion.plating.bath'].search([], limit=1)
|
||
test_tank = env['fusion.plating.tank'].search([], limit=1)
|
||
test_oven = env['fusion.plating.bake.oven'].search([], limit=1)
|
||
if not test_oven:
|
||
f0 = env['fusion.plating.facility'].search([], limit=1)
|
||
test_oven = env['fusion.plating.bake.oven'].sudo().create({
|
||
'name': 'Bake Oven 1', 'code': 'OVEN-1',
|
||
'facility_id': f0.id if f0 else False,
|
||
'target_temp_min': 350.0, 'target_temp_max': 380.0,
|
||
'chart_recorder_ref': 'CR-OVEN1-2026',
|
||
})
|
||
# Make sure the oven has a chart_recorder_ref (new gate requirement)
|
||
if test_oven and not test_oven.chart_recorder_ref:
|
||
test_oven.sudo().chart_recorder_ref = f'CR-{test_oven.code}-2026'
|
||
|
||
# Issue operator certifications for the bath's process type so the cert
|
||
# gate doesn't block legitimate operators (in real life the manager
|
||
# tracks training + issues certs; for a clean E2E we pre-issue).
|
||
Cert = env.get('fp.operator.certification')
|
||
if Cert is not None and test_bath and test_bath.process_type_id:
|
||
pt = test_bath.process_type_id
|
||
for op_key in ('john', 'maria', 'tom', 'ana', 'frank'):
|
||
emp = env['hr.employee'].search(
|
||
[('user_id', '=', users[op_key].id)], limit=1)
|
||
if not emp:
|
||
continue
|
||
existing = Cert.sudo().search([
|
||
('employee_id', '=', emp.id),
|
||
('process_type_id', '=', pt.id),
|
||
('revoked', '=', False),
|
||
], limit=1)
|
||
if not existing:
|
||
Cert.sudo().create({
|
||
'employee_id': emp.id,
|
||
'process_type_id': pt.id,
|
||
'issued_by_id': users['hannah'].id,
|
||
'notes': 'Auto-issued for E2E workforce simulation',
|
||
})
|
||
show(' certifications', f'pre-issued for {pt.name} → 5 operators')
|
||
show(' test bath', f'{test_bath.name}' if test_bath else '(none — wet-WO assignment will fail)')
|
||
show(' test tank', f'{test_tank.name}' if test_tank else '(none — wet-WO assignment will fail)')
|
||
|
||
assignments = []
|
||
wet_assignments = []
|
||
for wo in mo.workorder_ids:
|
||
name_l = (wo.name or '').lower()
|
||
operator_key = None
|
||
for kw, k in WO_OPERATORS.items():
|
||
if kw in name_l:
|
||
operator_key = k
|
||
break
|
||
operator_key = operator_key or 'john'
|
||
op_user = users[operator_key]
|
||
wo.sudo().x_fc_assigned_user_id = op_user.id
|
||
|
||
# Pin per-kind equipment using the new classifier (post inspect/mask/
|
||
# rack/bake priority fix), so Post-plate Inspection no longer gets
|
||
# bath assigned just because its name contains "plat".
|
||
kind = wo._fp_classify_kind() if hasattr(wo, '_fp_classify_kind') else 'other'
|
||
extras = f' [{kind}]'
|
||
if kind == 'wet' and test_bath and test_tank:
|
||
wo.sudo().write({
|
||
'x_fc_bath_id': test_bath.id,
|
||
'x_fc_tank_id': test_tank.id,
|
||
})
|
||
wet_assignments.append(wo)
|
||
extras = f' [WET — bath={test_bath.name}, tank={test_tank.name}]'
|
||
elif kind == 'bake' and test_oven:
|
||
wo.sudo().x_fc_oven_id = test_oven.id
|
||
extras = f' [BAKE — oven={test_oven.name}]'
|
||
elif kind == 'rack':
|
||
rack = env['fusion.plating.rack'].search([], limit=1)
|
||
if rack:
|
||
wo.sudo().x_fc_rack_id = rack.id
|
||
extras = f' [RACK — fixture={rack.name}]'
|
||
elif kind == 'mask':
|
||
wo.sudo().x_fc_masking_material = 'tape'
|
||
extras = ' [MASK — material=tape]'
|
||
|
||
assignments.append((wo, op_user, operator_key))
|
||
show(f' WO {wo.id}', f'"{wo.name}" → {op_user.name}{extras}')
|
||
|
||
assigned_count = sum(1 for w, _, _ in assignments if w.x_fc_assigned_user_id)
|
||
finding('PASS' if assigned_count == n_wos else 'FAIL',
|
||
'WO assignment', f'{assigned_count}/{n_wos} have x_fc_assigned_user_id')
|
||
|
||
wet_with_bath = sum(1 for w in wet_assignments if w.x_fc_bath_id and w.x_fc_tank_id)
|
||
finding('PASS' if (not wet_assignments) or (wet_with_bath == len(wet_assignments)) else 'FAIL',
|
||
'wet-WO bath+tank set',
|
||
f'{wet_with_bath}/{len(wet_assignments)} wet WOs have both bath + tank')
|
||
|
||
# ===== Negative tests: validation MUST block bad starts =====
|
||
banner('PHASE 4b — Negative tests: validation gates fire correctly')
|
||
|
||
# Test 1: try to start a WO with operator stripped → expect UserError
|
||
step('SYSTEM', 'Test 1 — un-assigning operator and trying to start')
|
||
test_wo = mo.workorder_ids[0]
|
||
saved_op = test_wo.x_fc_assigned_user_id.id
|
||
test_wo.sudo().x_fc_assigned_user_id = False
|
||
gate_fired = False
|
||
try:
|
||
test_wo.sudo().button_start()
|
||
except Exception as e:
|
||
gate_fired = 'Assigned Operator' in str(e) or 'required' in str(e).lower()
|
||
show(' blocked with', str(e).splitlines()[0][:120])
|
||
finding('PASS' if gate_fired else 'FAIL',
|
||
'gate: missing operator',
|
||
'blocked' if gate_fired else 'NOT blocked — validation broken')
|
||
test_wo.sudo().x_fc_assigned_user_id = saved_op
|
||
|
||
# Test 2: try to start a WET WO without bath/tank → expect UserError
|
||
if wet_assignments:
|
||
step('SYSTEM', 'Test 2 — wet WO with bath/tank stripped')
|
||
wet_wo = wet_assignments[0]
|
||
saved_bath = wet_wo.x_fc_bath_id.id
|
||
saved_tank = wet_wo.x_fc_tank_id.id
|
||
wet_wo.sudo().write({'x_fc_bath_id': False, 'x_fc_tank_id': False})
|
||
gate_fired = False
|
||
try:
|
||
wet_wo.sudo().button_start()
|
||
except Exception as e:
|
||
msg = str(e)
|
||
gate_fired = ('Bath' in msg and 'Tank' in msg) or 'required' in msg.lower()
|
||
show(' blocked with', msg.splitlines()[0][:120])
|
||
finding('PASS' if gate_fired else 'FAIL',
|
||
'gate: missing bath/tank on wet WO',
|
||
'blocked' if gate_fired else 'NOT blocked — validation broken')
|
||
wet_wo.sudo().write({
|
||
'x_fc_bath_id': saved_bath,
|
||
'x_fc_tank_id': saved_tank,
|
||
})
|
||
|
||
# ===== Negative tests for the 6 new gates (wrapped in savepoints
|
||
# so an SQL-level constraint failure doesn't abort the txn) =====
|
||
banner('PHASE 4c — Negative tests for the new compliance gates')
|
||
|
||
|
||
def neg_test(label, fn, expect_keywords):
|
||
"""Run fn() inside a savepoint; check the raised error mentions
|
||
one of `expect_keywords`. Always rolls back."""
|
||
sp_name = f'neg_{abs(hash(label))}'
|
||
env.cr.execute(f'SAVEPOINT {sp_name}')
|
||
fired = False
|
||
msg = ''
|
||
try:
|
||
fn()
|
||
except Exception as e:
|
||
msg = str(e)
|
||
low = msg.lower()
|
||
fired = any(k.lower() in low for k in expect_keywords)
|
||
finally:
|
||
env.cr.execute(f'ROLLBACK TO SAVEPOINT {sp_name}')
|
||
if msg:
|
||
show(' blocked with', msg.splitlines()[0][:120])
|
||
finding('PASS' if fired else 'FAIL',
|
||
f'gate: {label}',
|
||
'blocked' if fired else f'NOT blocked (got: {msg[:60]!r})')
|
||
|
||
|
||
# Test 3: MO confirm without facility → expect block
|
||
step('SYSTEM', 'Test 3 — MO confirm with no facility → blocked')
|
||
|
||
|
||
def t_mo_facility():
|
||
saved_default = env.company.x_fc_default_facility_id
|
||
env.company.sudo().x_fc_default_facility_id = False
|
||
fac0 = env['fusion.plating.facility'].search([('active', '=', True)])
|
||
fac0.sudo().write({'active': False})
|
||
try:
|
||
m = env['mrp.production'].sudo().create({
|
||
'product_id': mo.product_id.id,
|
||
'product_qty': 1,
|
||
'company_id': env.company.id,
|
||
})
|
||
m.action_confirm() # should raise — no facility resolvable
|
||
finally:
|
||
fac0.sudo().write({'active': True})
|
||
env.company.sudo().x_fc_default_facility_id = saved_default
|
||
|
||
|
||
neg_test('MO confirm without facility', t_mo_facility,
|
||
['facility'])
|
||
|
||
# Test 4: Cert issue without spec_reference
|
||
step('SYSTEM', 'Test 4 — Cert action_issue() without spec_reference → blocked')
|
||
|
||
|
||
def t_cert_spec():
|
||
c = env['fp.certificate'].sudo().create({
|
||
'partner_id': customer.id,
|
||
'production_id': mo.id,
|
||
'certificate_type': 'coc',
|
||
'spec_reference': False,
|
||
})
|
||
c.action_issue()
|
||
|
||
|
||
neg_test('cert issue without spec_reference', t_cert_spec,
|
||
['Spec', 'spec_reference'])
|
||
|
||
# Test 5: Delivery mark_delivered without POD
|
||
step('SYSTEM', 'Test 5 — Delivery mark_delivered() with no POD → blocked')
|
||
|
||
|
||
def t_dlv_pod():
|
||
d = env['fusion.plating.delivery'].sudo().create({
|
||
'partner_id': customer.id,
|
||
'state': 'en_route',
|
||
'company_id': env.company.id,
|
||
})
|
||
d.action_mark_delivered()
|
||
|
||
|
||
neg_test('delivery delivered without POD', t_dlv_pod,
|
||
['POD', 'Proof of Delivery'])
|
||
|
||
# Test 6: Invoice post without payment terms
|
||
step('SYSTEM', 'Test 6 — Invoice post() with no payment terms → blocked')
|
||
|
||
|
||
def t_inv_terms():
|
||
saved_term = customer.property_payment_term_id
|
||
customer.sudo().property_payment_term_id = False
|
||
try:
|
||
i = env['account.move'].sudo().create({
|
||
'move_type': 'out_invoice',
|
||
'partner_id': customer.id,
|
||
'invoice_date': fields.Date.today(),
|
||
'invoice_line_ids': [(0, 0, {
|
||
'name': 'Test plating service',
|
||
'quantity': 1,
|
||
'price_unit': 100.0,
|
||
})],
|
||
})
|
||
i.invoice_payment_term_id = False
|
||
i.action_post()
|
||
finally:
|
||
customer.sudo().property_payment_term_id = saved_term
|
||
|
||
|
||
neg_test('invoice post without payment terms', t_inv_terms,
|
||
['payment term'])
|
||
|
||
# Test 7: Thickness reading without calibration_std_ref
|
||
step('SYSTEM', 'Test 7 — Thickness reading without calibration_std_ref → blocked')
|
||
|
||
|
||
def t_thickness_cal():
|
||
env['fp.thickness.reading'].sudo().create({
|
||
'production_id': mo.id,
|
||
'reading_number': 99,
|
||
'nip_mils': 0.05,
|
||
'calibration_std_ref': False,
|
||
})
|
||
|
||
|
||
neg_test('thickness reading without cal std', t_thickness_cal,
|
||
['calibration', 'required', 'not-null', 'null value'])
|
||
|
||
# Test 8: NCR close without root cause / containment / disposition
|
||
step('SYSTEM', 'Test 8 — NCR close() with missing fields → blocked')
|
||
|
||
|
||
def t_ncr_close():
|
||
f = env['fusion.plating.facility'].search([], limit=1)
|
||
n = env['fusion.plating.ncr'].sudo().create({
|
||
'facility_id': f.id,
|
||
'description': '',
|
||
'containment': '',
|
||
'root_cause': '',
|
||
'disposition': False,
|
||
})
|
||
n.action_close()
|
||
|
||
|
||
neg_test('NCR close without RC/containment/disposition', t_ncr_close,
|
||
['Root Cause', 'Containment', 'Disposition'])
|
||
|
||
# Test 9: CAPA close without root cause analysis / action plan / verification
|
||
step('SYSTEM', 'Test 9 — CAPA close() with missing fields → blocked')
|
||
|
||
|
||
def t_capa_close():
|
||
c = env['fusion.plating.capa'].sudo().create({
|
||
'description': '',
|
||
'root_cause_analysis': '',
|
||
'action_plan': '',
|
||
})
|
||
c.action_close()
|
||
|
||
|
||
neg_test('CAPA close without analysis/plan/verification', t_capa_close,
|
||
['Root Cause Analysis', 'Action Plan', 'Verification'])
|
||
|
||
# Test 10: Discharge sample close without lab evidence
|
||
step('SYSTEM', 'Test 10 — Discharge sample close() with no lab evidence → blocked')
|
||
|
||
|
||
def t_discharge_close():
|
||
f = env['fusion.plating.facility'].search([], limit=1)
|
||
s = env['fusion.plating.discharge.sample'].sudo().create({
|
||
'facility_id': f.id,
|
||
})
|
||
s.action_close()
|
||
|
||
|
||
neg_test('discharge sample close without lab evidence', t_discharge_close,
|
||
['Lab Report', 'Results Received', 'parameter', 'Lab certificate'])
|
||
|
||
# Test 11: Invoice ref auto-fill from SO at create time
|
||
step('SYSTEM', 'Test 11 — Invoice ref auto-fills from SO.client_order_ref')
|
||
test_inv2 = env['account.move'].sudo().create({
|
||
'move_type': 'out_invoice',
|
||
'partner_id': customer.id,
|
||
'invoice_date': fields.Date.today(),
|
||
'invoice_origin': so.name,
|
||
'invoice_line_ids': [(0, 0, {
|
||
'name': 'Test', 'quantity': 1, 'price_unit': 1.0,
|
||
})],
|
||
})
|
||
finding('PASS' if test_inv2.ref == so.client_order_ref else 'FAIL',
|
||
'invoice ref auto-fills from SO',
|
||
f'ref={test_inv2.ref!r} (expected {so.client_order_ref!r})')
|
||
test_inv2.sudo().unlink()
|
||
|
||
# =====================================================================
|
||
banner('PHASE 5 — Operators run their work orders (REAL-TIME timers)')
|
||
# =====================================================================
|
||
|
||
# Pick a bath for the plating step so chemistry logging has somewhere
|
||
# to land.
|
||
bath = env['fusion.plating.bath'].search([], limit=1)
|
||
if bath:
|
||
show('test bath', f'{bath.name} (id={bath.id})')
|
||
|
||
batch = None # will hold the rack batch if batch model is present
|
||
FpBatch = env.get('fusion.plating.batch')
|
||
if FpBatch is not None and recipe:
|
||
step('HANNAH', 'Creates a rack batch for the plating step')
|
||
batch_vals = {'production_id': mo.id, 'part_count': 40}
|
||
if bath:
|
||
batch_vals['bath_id'] = bath.id
|
||
facility = env['fusion.plating.facility'].search([], limit=1)
|
||
if facility:
|
||
batch_vals['facility_id'] = facility.id
|
||
try:
|
||
batch = FpBatch.with_user(users['hannah']).sudo().create(batch_vals)
|
||
show('batch', f'{batch.name}')
|
||
except Exception as e:
|
||
finding('WARN', 'batch create', str(e))
|
||
batch = None
|
||
|
||
WO_DURATIONS_BEFORE = {wo.id: wo.duration for wo in mo.workorder_ids}
|
||
|
||
for wo, op_user, op_key in assignments:
|
||
actor = PERSONAS[op_key][0].split()[0].upper()
|
||
step(actor, f'Picks up "{wo.name}" on iPad — taps START')
|
||
wo_op = wo.with_user(op_user).sudo()
|
||
started_state = wo_op.state
|
||
try:
|
||
if wo_op.state in ('pending', 'waiting', 'ready'):
|
||
wo_op.button_start()
|
||
except Exception as e:
|
||
finding('WARN', f'WO start ({op_key})', f'{wo.name}: {e}')
|
||
continue
|
||
show(f' state', f'{started_state} → {wo_op.state}')
|
||
|
||
# Real-time work — sleep 2s for non-plating, 4s for plating
|
||
work_seconds = 4 if 'plating' in (wo.name or '').lower() else 2
|
||
show(f' working...', f'{work_seconds}s elapsed')
|
||
time.sleep(work_seconds)
|
||
|
||
# Tom logs chemistry mid-bath
|
||
if 'plating' in (wo.name or '').lower() and bath and op_key == 'tom':
|
||
step(actor, 'Logs bath chemistry while plating')
|
||
params = env['fusion.plating.bath.parameter'].search([], limit=2)
|
||
if params:
|
||
log = env['fusion.plating.bath.log'].with_user(op_user).sudo().create({
|
||
'bath_id': bath.id,
|
||
'shift': 'day',
|
||
'notes': 'Mid-bath check during E2E run',
|
||
'line_ids': [
|
||
(0, 0, {'parameter_id': p.id, 'value': 5.5})
|
||
for p in params
|
||
],
|
||
})
|
||
show(' chemistry log', f'{log.id} ({len(log.line_ids)} readings)')
|
||
else:
|
||
finding('WARN', 'chemistry', 'no fusion.plating.bath.parameter records — log skipped')
|
||
|
||
# Frank logs Fischerscope thickness readings during inspection
|
||
if 'inspect' in (wo.name or '').lower() and op_key == 'frank':
|
||
step(actor, 'Records 5 Fischerscope thickness readings')
|
||
Reading = env.get('fp.thickness.reading')
|
||
if Reading is not None:
|
||
for n, (pos, nip) in enumerate([
|
||
('Top edge', 0.0512),
|
||
('Mid surface', 0.0498),
|
||
('Bottom rim', 0.0521),
|
||
('Inner bore', 0.0489),
|
||
('Outer flange', 0.0507),
|
||
], 1):
|
||
Reading.with_user(op_user).sudo().create({
|
||
'production_id': mo.id,
|
||
'reading_number': n,
|
||
'nip_mils': nip,
|
||
'ni_percent': 90.5,
|
||
'p_percent': 9.5,
|
||
'position_label': pos,
|
||
'operator_id': op_user.id,
|
||
})
|
||
n_readings = Reading.search_count([('production_id', '=', mo.id)])
|
||
show(' thickness readings', f'{n_readings} logged for {mo.name}')
|
||
|
||
# Bake operator records actuals BEFORE pressing finish (new gate)
|
||
if hasattr(wo, '_fp_classify_kind') and wo._fp_classify_kind() == 'bake':
|
||
wo.sudo().write({
|
||
'x_fc_bake_temp': 365.0,
|
||
'x_fc_bake_duration_hours': 4.0,
|
||
})
|
||
show(' bake actuals', '365°F × 4h recorded')
|
||
|
||
step(actor, 'Taps FINISH')
|
||
try:
|
||
if wo_op.state == 'progress':
|
||
wo_op.button_finish()
|
||
except Exception as e:
|
||
finding('WARN', f'WO finish ({op_key})', f'{wo.name}: {e}')
|
||
continue
|
||
show(f' state', wo_op.state)
|
||
show(f' duration', f'{wo.duration:.2f} min')
|
||
|
||
# Tally results per WO
|
||
nonzero = sum(1 for wo in mo.workorder_ids if wo.duration > 0)
|
||
finding('PASS' if nonzero == n_wos else 'WARN',
|
||
'time tracking', f'{nonzero}/{n_wos} WOs have duration > 0')
|
||
|
||
# Check Odoo's underlying productivity records
|
||
prod_recs = env['mrp.workcenter.productivity'].sudo().search([
|
||
('workorder_id', 'in', mo.workorder_ids.ids),
|
||
])
|
||
finding('PASS' if len(prod_recs) > 0 else 'WARN',
|
||
'productivity records', f'{len(prod_recs)} mrp.workcenter.productivity rows logged')
|
||
|
||
# Per-operator productivity
|
||
distinct_operators_logged = len(set(prod_recs.mapped('user_id')))
|
||
finding('PASS' if distinct_operators_logged > 1 else 'WARN',
|
||
'per-operator productivity',
|
||
f'{distinct_operators_logged} distinct operators recorded')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 6 — Hannah closes the MO')
|
||
# =====================================================================
|
||
|
||
step('HANNAH', 'Marks MO done')
|
||
try:
|
||
mo_h.button_mark_done()
|
||
except Exception as e:
|
||
print(f' [info] mark_done: {e} — falling back')
|
||
try:
|
||
mo_h.qty_producing = mo.product_qty
|
||
mo_h._action_done()
|
||
except Exception as e2:
|
||
print(f' [info] _action_done: {e2}')
|
||
finding('PASS' if mo.state == 'done' else 'FAIL', 'MO done', f'state={mo.state}')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 7 — Frank inspects + CoC')
|
||
# =====================================================================
|
||
|
||
certs = env['fp.certificate'].search([('production_id', '=', mo.id)])
|
||
coc = certs.filtered(lambda c: c.certificate_type == 'coc')[:1]
|
||
finding('PASS' if coc else 'FAIL', 'CoC auto-create', coc.name if coc else 'MISSING')
|
||
if coc:
|
||
finding('PASS' if coc.state == 'issued' else 'WARN',
|
||
'CoC issued', f'state={coc.state}')
|
||
finding('PASS' if coc.attachment_id else 'FAIL',
|
||
'CoC PDF attached', coc.attachment_id.name if coc.attachment_id else 'MISSING')
|
||
if coc.attachment_id:
|
||
kb = len(base64.b64decode(coc.attachment_id.datas)) / 1024
|
||
finding('PASS' if kb >= 100 else 'FAIL',
|
||
'CoC PDF rich (>=100KB)', f'{kb:.1f} KB')
|
||
# Thickness readings on cert
|
||
if 'thickness_reading_ids' in coc._fields:
|
||
n_readings = len(coc.thickness_reading_ids)
|
||
finding('PASS' if n_readings > 0 else 'WARN',
|
||
'thickness readings', f'{n_readings} reading rows')
|
||
|
||
step('FRANK', 'Reviews + signs CoC (already auto-issued)')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 8 — Dave drives the delivery')
|
||
# =====================================================================
|
||
|
||
dlv = env['fusion.plating.delivery'].search(
|
||
[('partner_id', '=', customer.id)], order='id desc', limit=1)
|
||
finding('PASS' if dlv else 'FAIL', 'delivery auto-create', dlv.name if dlv else 'MISSING')
|
||
if dlv:
|
||
finding('PASS' if dlv.scheduled_date else 'WARN',
|
||
'delivery scheduled prefill', str(dlv.scheduled_date or 'empty'))
|
||
finding('PASS' if dlv.assigned_driver_id else 'WARN',
|
||
'delivery driver prefill',
|
||
dlv.assigned_driver_id.name if dlv.assigned_driver_id else 'empty')
|
||
finding('PASS' if dlv.coc_attachment_id else 'WARN',
|
||
'CoC linked to delivery',
|
||
dlv.coc_attachment_id.name if dlv.coc_attachment_id else 'missing')
|
||
|
||
step('DAVE', 'Schedules → start route → mark delivered')
|
||
try:
|
||
if dlv.state == 'draft': dlv.with_user(users['dave']).sudo().action_schedule()
|
||
if dlv.state == 'scheduled': dlv.with_user(users['dave']).sudo().action_start_route()
|
||
# POD must be captured BEFORE marking delivered (new gate)
|
||
if dlv.state == 'en_route' and not dlv.pod_id:
|
||
step('DAVE', 'Captures POD on iPad — recipient signs + photo')
|
||
POD = env['fusion.plating.proof.of.delivery']
|
||
pod = POD.with_user(users['dave']).sudo().create({
|
||
'delivery_id': dlv.id,
|
||
'partner_id': dlv.partner_id.id,
|
||
'recipient_name': 'Dock Receiver',
|
||
'notes': 'E2E sim — recipient on dock signed for parts',
|
||
})
|
||
dlv.sudo().pod_id = pod.id
|
||
show(' POD captured', f'{pod.name} (id={pod.id})')
|
||
if dlv.state == 'en_route': dlv.with_user(users['dave']).sudo().action_mark_delivered()
|
||
except Exception as e:
|
||
print(f' [info] delivery transitions: {e}')
|
||
|
||
# ===== Negative test: try to mark another delivery delivered without POD =====
|
||
finding('PASS' if dlv.pod_id else 'FAIL',
|
||
'POD captured before delivery',
|
||
f'pod_id={dlv.pod_id.name if dlv.pod_id else "NONE"}')
|
||
finding('PASS' if dlv.state == 'delivered' else 'FAIL',
|
||
'delivery final state', dlv.state)
|
||
coc_logs = env['fusion.plating.chain.of.custody'].search(
|
||
[('delivery_id', '=', dlv.id)])
|
||
finding('PASS' if len(coc_logs) >= 2 else 'WARN',
|
||
'chain of custody', f'{len(coc_logs)} entries')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 9 — Linda creates + posts invoice')
|
||
# =====================================================================
|
||
|
||
step('LINDA', 'Creates invoice from SO')
|
||
try:
|
||
inv_act = so.with_user(users['linda']).sudo()._create_invoices()
|
||
inv = inv_act if hasattr(inv_act, '_name') else env['account.move'].browse(
|
||
inv_act.get('res_id') if isinstance(inv_act, dict) else inv_act)
|
||
except Exception as e:
|
||
print(f' [info] _create_invoices: {e}')
|
||
inv = env['account.move'].search([('invoice_origin', '=', so.name)], limit=1)
|
||
|
||
if inv:
|
||
inv.invoice_date = fields.Date.today()
|
||
try:
|
||
inv.with_user(users['linda']).sudo().action_post()
|
||
except Exception as e:
|
||
finding('FAIL', 'invoice post', str(e))
|
||
finding('PASS' if inv.state == 'posted' else 'FAIL',
|
||
'invoice posted', f'state={inv.state}, payment_state={inv.payment_state}')
|
||
|
||
# =====================================================================
|
||
banner('PHASE 10 — Compliance + notification audit')
|
||
# =====================================================================
|
||
|
||
# Notification log
|
||
logs = env['fp.notification.log'].search(
|
||
[('sale_order_id', '=', so.id)], order='create_date')
|
||
events = logs.mapped('trigger_event')
|
||
EXPECTED_EVENTS = {'so_confirmed', 'parts_received', 'mo_complete',
|
||
'shipped', 'invoice_posted'}
|
||
seen = set(events)
|
||
missing = EXPECTED_EVENTS - seen
|
||
finding('PASS' if not missing else 'FAIL',
|
||
'notifications fired',
|
||
f'sent={sorted(seen)}; missing={sorted(missing) if missing else "none"}')
|
||
|
||
# Each notification has the right attachment?
|
||
for ev_log in logs:
|
||
needed = {
|
||
'so_confirmed': 'Quotation',
|
||
'shipped': 'CoC',
|
||
'invoice_posted': 'Invoice',
|
||
}
|
||
expected_in_attachments = needed.get(ev_log.trigger_event)
|
||
if expected_in_attachments:
|
||
att_names = ev_log.attachment_names or ''
|
||
ok = expected_in_attachments.lower() in att_names.lower()
|
||
finding('PASS' if ok else 'WARN',
|
||
f'{ev_log.trigger_event} attachment',
|
||
f'expected "{expected_in_attachments}" in: {att_names!r}')
|
||
|
||
# Workflow stage
|
||
finding('PASS' if so.x_fc_workflow_stage in ('complete', 'invoicing', 'paid') else 'WARN',
|
||
'final SO workflow stage', so.x_fc_workflow_stage)
|
||
|
||
# Portal job state
|
||
job_now = env['fusion.plating.portal.job'].browse(job.id) if job else None
|
||
if job_now:
|
||
finding('PASS' if job_now.state in ('shipped', 'complete') else 'WARN',
|
||
'final portal job state', job_now.state)
|
||
|
||
# Bath chemistry logged?
|
||
bath_logs_during = env['fusion.plating.bath.log'].search(
|
||
[('bath_id', '=', bath.id), ('id', '>=', max([0] + prod_recs.ids))],
|
||
limit=10) if bath else env['fusion.plating.bath.log']
|
||
recent_bath_log = env['fusion.plating.bath.log'].search([], order='id desc', limit=1)
|
||
finding('PASS' if recent_bath_log and recent_bath_log.create_date else 'WARN',
|
||
'chemistry log persisted', f'most-recent log id={recent_bath_log.id if recent_bath_log else "none"}')
|
||
|
||
# Bake window auto-created after plating? Bake-window links via lot_ref (portal job name)
|
||
BakeWin = env.get('fusion.plating.bake.window')
|
||
if BakeWin is not None and job:
|
||
bw = BakeWin.search([('lot_ref', '=', job.name)])
|
||
finding('PASS' if bw else 'WARN',
|
||
'bake window auto-created',
|
||
f'{len(bw)} record(s) for {job.name}')
|
||
|
||
# First-piece gate auto-created?
|
||
FPG = env.get('fusion.plating.first.piece.gate')
|
||
if FPG is not None:
|
||
# FPG model may not have production_id either; try common link fields
|
||
fpg = FPG.search([]) # take any recent
|
||
fpg_for_mo = fpg.filtered(
|
||
lambda g: getattr(g, 'production_id', False) and g.production_id.id == mo.id
|
||
) if 'production_id' in FPG._fields else fpg.browse([])
|
||
finding('PASS' if fpg_for_mo else 'WARN',
|
||
'first-piece gate',
|
||
f'{len(fpg_for_mo)} for MO (coating-driven; OK if 0)')
|
||
|
||
# Each operator can see their OWN assigned WOs via the tablet
|
||
# (queue is a TransientModel; tablet calls build_for_user on load)
|
||
# Reset MO to make some WOs ready/progress for queue test BEFORE this is run
|
||
# would be needed — but the queue should still work for any in-progress WOs
|
||
# elsewhere in the system that match the user.
|
||
OpQueue = env.get('fusion.plating.operator.queue')
|
||
if OpQueue is not None:
|
||
# Create a second test MO so there's a WO in 'ready' state to queue
|
||
test_mo = env['mrp.production'].search(
|
||
[('state', 'in', ('confirmed', 'progress'))], limit=1)
|
||
if test_mo and test_mo.workorder_ids:
|
||
# Force-assign a ready WO to John so we have something to surface
|
||
ready_wo = test_mo.workorder_ids.filtered(lambda w: w.state in ('ready', 'progress'))[:1]
|
||
if ready_wo:
|
||
ready_wo.sudo().x_fc_assigned_user_id = users['john'].id
|
||
for op_key, op_user in [('john', users['john']), ('tom', users['tom']),
|
||
('frank', users['frank'])]:
|
||
rows = OpQueue.with_user(op_user).sudo().build_for_user(user_id=op_user.id)
|
||
finding('PASS' if rows else 'WARN',
|
||
f'tablet queue for {op_key}',
|
||
f'{len(rows)} queue rows visible to {op_user.name}')
|
||
# Verify NONE of the rows are someone else's assigned WO
|
||
if rows:
|
||
wo_rows = rows.filtered(lambda r: r.source_model == 'mrp.workorder')
|
||
wrong = []
|
||
for r in wo_rows:
|
||
wo = env['mrp.workorder'].browse(r.source_id)
|
||
if wo.exists() and wo.x_fc_assigned_user_id and wo.x_fc_assigned_user_id != op_user:
|
||
wrong.append(wo.name)
|
||
finding('PASS' if not wrong else 'FAIL',
|
||
f'queue isolation for {op_key}',
|
||
f'leaked rows assigned to others: {wrong}' if wrong else 'no leak')
|
||
|
||
# Worker proficiency advanced for completed roles?
|
||
prof_records = env['fp.operator.proficiency'].search([
|
||
('employee_id', 'in',
|
||
env['hr.employee'].search([('user_id', 'in', list(u.id for u in users.values()))]).ids),
|
||
]) if env.get('fp.operator.proficiency') is not None else None
|
||
if prof_records is not None:
|
||
finding('PASS' if len(prof_records) > 0 else 'WARN',
|
||
'operator proficiency tracked',
|
||
f'{len(prof_records)} (employee,role) proficiency rows')
|
||
|
||
# =====================================================================
|
||
banner('SUMMARY')
|
||
# =====================================================================
|
||
|
||
passed = sum(1 for l, _, _ in FINDINGS if l == 'PASS')
|
||
warns = sum(1 for l, _, _ in FINDINGS if l == 'WARN')
|
||
fails = sum(1 for l, _, _ in FINDINGS if l == 'FAIL')
|
||
|
||
print(f' {passed} PASS / {warns} WARN / {fails} FAIL (out of {len(FINDINGS)} checks)')
|
||
print(f' customer: {customer.name}')
|
||
print(f' SO : {so.name}')
|
||
print(f' MO : {mo.name} → {mo.state}')
|
||
print(f' WOs : {n_wos}, total time = {sum(mo.workorder_ids.mapped("duration")):.2f} min')
|
||
print(f' CoC : {coc.name if coc else "(none)"}')
|
||
print(f' delivery : {dlv.name if dlv else "(none)"} → {dlv.state if dlv else "n/a"}')
|
||
print(f' invoice : {inv.name if inv else "(none)"}')
|
||
print(f' portal : {job.name if job else "(none)"} → final {job_now.state if job_now else "n/a"}')
|
||
|
||
if warns or fails:
|
||
print(f'\n ── GAPS / FAILS ──')
|
||
for level, area, msg in FINDINGS:
|
||
if level in ('WARN', 'FAIL'):
|
||
print(f' {level} [{area}] {msg}')
|
||
|
||
env.cr.commit()
|
||
print('\n → committed.\n')
|