# -*- coding: utf-8 -*-
"""Comprehensive E2E simulator — workforce edition.

Role-plays each employee touching a job from quote → invoice. For each
work order:
  • The assigned operator clocks in (button_start)
  • Real time elapses (time.sleep)
  • Chemistry / quality data is logged where relevant
  • The operator clocks out (button_finish)

Then audits:
  • Per-WO duration captured (mrp.workorder.duration)
  • mrp.workcenter.productivity records exist with operator user
  • Chemistry log entries on bath
  • Certificate state, attachment, thickness readings
  • Chain-of-custody entries on delivery
  • Notification log with attachment names
  • Portal job final state + SO workflow_stage

Findings printed at the end as PASS/FAIL/WARN — each FAIL/WARN is a gap
that needs fixing before this can ship to a real shop floor.
"""
from datetime import datetime
import time
import base64

env = env  # noqa  injected by odoo shell
from odoo import fields  # noqa


def banner(label):
    """Print `label` framed by two 76-character '=' rules."""
    print(f'\n{"="*76}\n {label}\n{"="*76}')


def step(actor, action):
    """Print one narrated step: who (`actor`) does what (`action`)."""
    print(f' → [{actor:<14}] {action}')


def show(label, value):
    """Print a `label` / `value` pair, label left-aligned to 32 columns."""
    print(f' {label:<32} {value}')


# Every audit result recorded by finding(), as (level, area, msg) tuples.
FINDINGS = []


def finding(level, area, msg):
    """Record an audit result in FINDINGS and print it.

    level: PASS | WARN | FAIL (any other value raises KeyError on the
           symbol lookup, after the tuple has already been appended)
    area:  short tag naming the thing that was checked
    msg:   human-readable detail
    """
    FINDINGS.append((level, area, msg))
    sym = {'PASS': '✓', 'WARN': '⚠', 'FAIL': '✗'}[level]
    print(f' {sym} {level:<5} [{area}] {msg}')


# Per-run suffix (local wall-clock time) used to make record names unique.
stamp = datetime.now().strftime('%y%m%d-%H%M%S')

# =====================================================================
banner(f'PHASE 0 — Set up cast of employees ({stamp})')
# =====================================================================
# Reuse existing users when present so we don't bloat the DB on reruns.
# Each persona gets a real res.users so with_user() exercises permission
# checks the way an operator would experience them on the iPad.
# login suffix → (display name, role description)
PERSONAS = {
    'sandra': ('Sandra Kim', 'Sales rep / estimator'),
    'carlos': ('Carlos Reyes', 'Receiving clerk'),
    'hannah': ('Hannah Patel', 'Production planner / manager'),
    'john': ('John Murphy', 'Masking operator'),
    'maria': ('Maria Lopez', 'Rack / handler'),
    'tom': ('Tom Wright', 'Plater'),
    'ana': ('Ana Silva', 'De-mask / clean'),
    'frank': ('Frank Bauer', 'QC / inspector'),
    'dave': ('Dave Chen', 'Driver'),
    'linda': ('Linda Brown', 'Accounting'),
}

# persona key → res.users record, filled by the loop below
users = {}
mgr_group = env.ref('fusion_plating.group_fusion_plating_manager',
                    raise_if_not_found=False)
op_group = env.ref('fusion_plating.group_fusion_plating_operator',
                   raise_if_not_found=False)
internal_group = env.ref('base.group_user')

for key, (name, desc) in PERSONAS.items():
    login = f'fp_{key}'
    u = env['res.users'].search([('login', '=', login)], limit=1)
    if not u:
        u = env['res.users'].sudo().create({
            'name': name,
            'login': login,
            'email': f'{login}@enplating.example',
            'group_ids': [(6, 0, [internal_group.id])],
        })
    # Put managers in the manager group, operators in the operator group
    extra = mgr_group if key == 'hannah' else op_group
    if extra and extra not in u.group_ids:
        u.sudo().write({'group_ids': [(4, extra.id)]})
    users[key] = u
    # Make sure each has an hr.employee record (proficiency tracking
    # writes to employee records).
    emp = env['hr.employee'].search([('user_id', '=', u.id)], limit=1)
    if not emp:
        emp = env['hr.employee'].sudo().create({
            'name': name,
            'user_id': u.id,
        })
    show(f'{key:<8}', f'{u.name} ({desc}) — uid={u.id}, emp={emp.id}')

# =====================================================================
banner('PHASE 1 — Sandra builds a quote (estimator)')
# =====================================================================
customer = env['res.partner'].sudo().create({
    'name': f'Beacon Aerospace {stamp}',
    'company_type': 'company',
    'email': f'orders-{stamp}@beacon.example',
    'phone': '+1-416-555-0199',
    'street': '500 University Ave',
    'city': 'Toronto',
    'zip': 'M5G 1V7',
    'country_id': env.ref('base.ca').id,
})
# Net-30 default so invoices created later inherit the right schedule.
net30 = env.ref('account.account_payment_term_30days',
                raise_if_not_found=False)
if net30:
    customer.sudo().property_payment_term_id = net30.id

# Make sure the company has a default facility so MO confirm succeeds.
co = env.company
if not co.x_fc_default_facility_id:
    f = env['fusion.plating.facility'].search(
        [('active', '=', True)], limit=1)
    if f:
        co.sudo().x_fc_default_facility_id = f.id
        show('company default facility set', f.name)

step('SANDRA', f'Receives RFQ from {customer.name}')
rfq = env['fusion.plating.quote.request'].with_user(users['sandra']).sudo().create({
    'partner_id': customer.id,
    'contact_name': 'Procurement',
    'contact_email': customer.email,
    'company_name': customer.name,
    # The literal was split across physical lines in the source (an
    # unterminated string); rejoined here as a single-line literal.
    'part_description': '40 housings, AMS 2404, 50µin ENP, rush.',
    'quantity': 40,
    'state': 'new',
})
show('RFQ', f'{rfq.name}')

step('SANDRA', 'Builds configurator quote with PO# and override price')
# Any coating / catalog part will do; the first record found is used.
coating = env['fp.coating.config'].search([], limit=1)
part_cat = env['fp.part.catalog'].search([], limit=1)
po_number = f'PO-BCN-{stamp}'
quote = env['fp.quote.configurator'].with_user(users['sandra']).sudo().create({
    'partner_id': customer.id,
    'part_catalog_id': part_cat.id,
    'coating_config_id': coating.id,
    'quantity': 40,
    'po_number_preliminary': po_number,
    'estimator_override_price': 3200.00,
    'rush_order': True,
})
# action_create_quotation() is read as returning a dict carrying 'res_id'.
result = quote.with_user(users['sandra']).sudo().action_create_quotation()
so = env['sale.order'].browse(result.get('res_id'))
show('SO', f'{so.name} ({so.amount_total:,.2f})')
finding('PASS' if so.client_order_ref == po_number else 'FAIL',
        'quote→SO PO#', f'client_order_ref="{so.client_order_ref}"')

# =====================================================================
banner('PHASE 2 — Customer accepts → SO confirm → auto-MO + portal job')
# =====================================================================
step('CUSTOMER', 'Accepts quote — Sandra confirms SO')
so.with_user(users['sandra']).sudo().action_confirm()
finding('PASS' if so.state == 'sale' else 'FAIL',
        'SO confirm', f'state={so.state}')

# The MO is looked up by origin == SO name.
mo = env['mrp.production'].search([('origin', '=', so.name)], limit=1)
finding('PASS' if mo else 'FAIL', 'auto-MO', mo.name if mo else 'MISSING')
if mo and mo.state == 'draft':
    mo.with_user(users['hannah']).sudo().action_confirm()
finding('PASS' if mo and mo.state == 'confirmed' else 'WARN',
        'MO confirm', f'state={mo.state if mo else "n/a"}')

job = mo.x_fc_portal_job_id if mo else False
finding('PASS' if job else 'FAIL', 'portal job',
        job.name if job else 'MISSING')

# =====================================================================
banner('PHASE 3 — Carlos receives parts')
# =====================================================================
step('CARLOS', 'Logs receiving — 40 housings in 2 boxes from FedEx')
recv = env['fp.receiving'].with_user(users['carlos']).sudo().create({
    'partner_id': customer.id,
    'sale_order_id': so.id,
    'received_date': fields.Datetime.now(),
    'expected_qty': 40,
    'carrier_name': 'FedEx',
    'carrier_tracking': f'FX{stamp}',
    'line_ids': [(0, 0, {
        'description': '40 stainless aero housings',
        'expected_qty': 40,
        'received_qty': 40,
    })],
})
finding('PASS' if recv.received_qty == 40 else 'FAIL',
        'receiving prefill',
        f'expected={recv.expected_qty} received={recv.received_qty}')

step('CARLOS', 'Inspects → accepts')
recv.with_user(users['carlos']).sudo().action_start_inspection()
recv.with_user(users['carlos']).sudo().action_accept()
finding('PASS' if recv.state == 'accepted' else 'FAIL',
        'receiving accept', f'state={recv.state}')

# =====================================================================
banner('PHASE 4 — Hannah plans the job')
# =====================================================================
step('HANNAH', 'Assigns recipe + generates work orders')
recipe = env['fusion.plating.process.node'].search(
    [('node_type', '=', 'recipe')], limit=1)
mo_h = mo.with_user(users['hannah']).sudo()
if not mo_h.x_fc_recipe_id:
    mo_h.x_fc_recipe_id = recipe.id
# NOTE(review): indentation was lost in the source; the generate call is
# treated as unconditional (not nested under the `if` above) — confirm.
mo_h._generate_workorders_from_recipe()
n_wos = len(mo.workorder_ids)
finding('PASS' if n_wos > 0 else 'FAIL', 'WOs generated',
        f'{n_wos} work orders from {recipe.name}')

# Map operations to operators by station/role hints. Keys are matched as
# substrings of the lower-cased WO name; the first matching key (in dict
# order) wins.
WO_OPERATORS = {
    'masking': 'john',
    'racking': 'maria',
    'ready': 'maria',
    'plating': 'tom',
    'enickel': 'tom',
    'nickel': 'tom',
    'demask': 'ana',
    'de-mask': 'ana',
    'clean': 'ana',
    'rinse': 'ana',
    'inspect': 'frank',
    'qc': 'frank',
}

step('HANNAH', 'Assigns each WO to a specific operator')
# Pick a bath + a tank for any WO that needs wet-process traceability
test_bath = env['fusion.plating.bath'].search([], limit=1)
test_tank = env['fusion.plating.tank'].search([], limit=1)
test_oven = env['fusion.plating.bake.oven'].search([], limit=1)
if not test_oven:
    f0 = env['fusion.plating.facility'].search([], limit=1)
    test_oven = env['fusion.plating.bake.oven'].sudo().create({
        'name': 'Bake Oven 1',
        'code': 'OVEN-1',
        'facility_id': f0.id if f0 else False,
        'target_temp_min': 350.0,
        'target_temp_max': 380.0,
        'chart_recorder_ref': 'CR-OVEN1-2026',
    })
# Make sure the oven has a chart_recorder_ref (new gate requirement)
if test_oven and not test_oven.chart_recorder_ref:
    test_oven.sudo().chart_recorder_ref = f'CR-{test_oven.code}-2026'

# Issue operator certifications for the bath's process type so the cert
# gate doesn't block legitimate operators (in real life the manager
# tracks training + issues certs; for a clean E2E we pre-issue).
Cert = env.get('fp.operator.certification')
if Cert is not None and test_bath and test_bath.process_type_id:
    pt = test_bath.process_type_id
    for op_key in ('john', 'maria', 'tom', 'ana', 'frank'):
        emp = env['hr.employee'].search(
            [('user_id', '=', users[op_key].id)], limit=1)
        if not emp:
            continue
        # Skip operators who already hold a non-revoked cert for this type.
        existing = Cert.sudo().search([
            ('employee_id', '=', emp.id),
            ('process_type_id', '=', pt.id),
            ('revoked', '=', False),
        ], limit=1)
        if not existing:
            Cert.sudo().create({
                'employee_id': emp.id,
                'process_type_id': pt.id,
                'issued_by_id': users['hannah'].id,
                'notes': 'Auto-issued for E2E workforce simulation',
            })
    show(' certifications', f'pre-issued for {pt.name} → 5 operators')

show(' test bath',
     f'{test_bath.name}' if test_bath else '(none — wet-WO assignment will fail)')
show(' test tank',
     f'{test_tank.name}' if test_tank else '(none — wet-WO assignment will fail)')

# assignments: (work order, operator user, persona key) for every WO.
# wet_assignments: the subset of WOs that were given a bath + tank.
assignments = []
wet_assignments = []
for wo in mo.workorder_ids:
    name_l = (wo.name or '').lower()
    operator_key = None
    for kw, k in WO_OPERATORS.items():
        if kw in name_l:
            operator_key = k
            break
    # No keyword matched → default to John.
    operator_key = operator_key or 'john'
    op_user = users[operator_key]
    wo.sudo().x_fc_assigned_user_id = op_user.id

    # Pin per-kind equipment using the new classifier (post inspect/mask/
    # rack/bake priority fix), so Post-plate Inspection no longer gets
    # bath assigned just because its name contains "plat".
    kind = wo._fp_classify_kind() if hasattr(wo, '_fp_classify_kind') else 'other'
    extras = f' [{kind}]'
    if kind == 'wet' and test_bath and test_tank:
        wo.sudo().write({
            'x_fc_bath_id': test_bath.id,
            'x_fc_tank_id': test_tank.id,
        })
        wet_assignments.append(wo)
        extras = f' [WET — bath={test_bath.name}, tank={test_tank.name}]'
    elif kind == 'bake' and test_oven:
        wo.sudo().x_fc_oven_id = test_oven.id
        extras = f' [BAKE — oven={test_oven.name}]'
    elif kind == 'rack':
        rack = env['fusion.plating.rack'].search([], limit=1)
        if rack:
            wo.sudo().x_fc_rack_id = rack.id
            extras = f' [RACK — fixture={rack.name}]'
    elif kind == 'mask':
        wo.sudo().x_fc_masking_material = 'tape'
        extras = ' [MASK — material=tape]'

    assignments.append((wo, op_user, operator_key))
    show(f' WO {wo.id}', f'"{wo.name}" → {op_user.name}{extras}')

assigned_count = sum(1 for w, _, _ in assignments if w.x_fc_assigned_user_id)
finding('PASS' if assigned_count == n_wos else 'FAIL', 'WO assignment',
        f'{assigned_count}/{n_wos} have x_fc_assigned_user_id')
wet_with_bath = sum(1 for w in wet_assignments
                    if w.x_fc_bath_id and w.x_fc_tank_id)
finding('PASS' if (not wet_assignments) or (wet_with_bath == len(wet_assignments)) else 'FAIL',
        'wet-WO bath+tank set',
        f'{wet_with_bath}/{len(wet_assignments)} wet WOs have both bath + tank')

# ===== Negative tests: validation MUST block bad starts =====
banner('PHASE 4b — Negative tests: validation gates fire correctly')

# Test 1: try to start a WO with operator stripped → expect UserError
step('SYSTEM', 'Test 1 — un-assigning operator and trying to start')
if mo.workorder_ids:
    test_wo = mo.workorder_ids[0]
    saved_op = test_wo.x_fc_assigned_user_id.id
    test_wo.sudo().x_fc_assigned_user_id = False
    gate_fired = False
    try:
        test_wo.sudo().button_start()
    except Exception as e:
        gate_fired = 'Assigned Operator' in str(e) or 'required' in str(e).lower()
        # `or ['']` keeps an empty exception message from raising IndexError.
        show(' blocked with', (str(e).splitlines() or [''])[0][:120])
    finding('PASS' if gate_fired else 'FAIL', 'gate: missing operator',
            'blocked' if gate_fired else 'NOT blocked — validation broken')
    # Restore the operator so Phase 5 can run this WO normally.
    test_wo.sudo().x_fc_assigned_user_id = saved_op
else:
    # Without this guard, indexing [0] on an empty recordset would abort
    # the whole run; the missing WOs are already reported above.
    finding('WARN', 'gate: missing operator',
            'skipped — no work orders to test against')

# Test 2: try to start a WET WO without bath/tank → expect UserError
if wet_assignments:
    step('SYSTEM', 'Test 2 — wet WO with bath/tank stripped')
    wet_wo = wet_assignments[0]
    saved_bath = wet_wo.x_fc_bath_id.id
    saved_tank = wet_wo.x_fc_tank_id.id
    wet_wo.sudo().write({'x_fc_bath_id': False, 'x_fc_tank_id': False})
    gate_fired = False
    try:
        wet_wo.sudo().button_start()
    except Exception as e:
        msg = str(e)
        gate_fired = ('Bath' in msg and 'Tank' in msg) or 'required' in msg.lower()
        show(' blocked with', (msg.splitlines() or [''])[0][:120])
    finding('PASS' if gate_fired else 'FAIL',
            'gate: missing bath/tank on wet WO',
            'blocked' if gate_fired else 'NOT blocked — validation broken')
    wet_wo.sudo().write({
        'x_fc_bath_id': saved_bath,
        'x_fc_tank_id': saved_tank,
    })

# ===== Negative tests for the 6 new gates (wrapped in savepoints
# so an SQL-level constraint failure doesn't abort the txn) =====
banner('PHASE 4c — Negative tests for the new compliance gates')


def neg_test(label, fn, expect_keywords):
    """Run fn() inside a savepoint and record whether it was blocked.

    label:           name used for the savepoint and for the finding
    fn:              zero-argument callable expected to raise
    expect_keywords: the test passes when the raised error's message
                     contains at least one of these (case-insensitive)

    Always rolls back to the savepoint, whether or not fn() raised.
    Records PASS/FAIL through finding(); returns nothing.
    """
    # NOTE(review): the rollback is raw SQL, so ORM cache entries for
    # records created inside fn() are presumably left behind — confirm
    # whether cache invalidation is needed after the rollback.
    sp_name = f'neg_{abs(hash(label))}'
    env.cr.execute(f'SAVEPOINT {sp_name}')
    fired = False
    msg = ''
    try:
        fn()
    except Exception as e:
        msg = str(e)
        low = msg.lower()
        fired = any(k.lower() in low for k in expect_keywords)
    finally:
        env.cr.execute(f'ROLLBACK TO SAVEPOINT {sp_name}')
    if msg:
        show(' blocked with', msg.splitlines()[0][:120])
    finding('PASS' if fired else 'FAIL', f'gate: {label}',
            'blocked' if fired else f'NOT blocked (got: {msg[:60]!r})')


# Test 3: MO confirm without facility → expect block
step('SYSTEM', 'Test 3 — MO confirm with no facility → blocked')


def t_mo_facility():
    """Confirm an MO with no company default and no active facility."""
    saved_default = env.company.x_fc_default_facility_id
    env.company.sudo().x_fc_default_facility_id = False
    fac0 = env['fusion.plating.facility'].search([('active', '=', True)])
    fac0.sudo().write({'active': False})
    try:
        m = env['mrp.production'].sudo().create({
            'product_id': mo.product_id.id,
            'product_qty': 1,
            'company_id': env.company.id,
        })
        m.action_confirm()  # should raise — no facility resolvable
    finally:
        # Put the facilities and the company default back as they were.
        fac0.sudo().write({'active': True})
        env.company.sudo().x_fc_default_facility_id = saved_default


neg_test('MO confirm without facility', t_mo_facility, ['facility'])

# Test 4: Cert issue without spec_reference
step('SYSTEM', 'Test 4 — Cert action_issue() without spec_reference → blocked')


def t_cert_spec():
    """Issue a CoC certificate whose spec_reference is empty."""
    c = env['fp.certificate'].sudo().create({
        'partner_id': customer.id,
        'production_id': mo.id,
        'certificate_type': 'coc',
        'spec_reference': False,
    })
    c.action_issue()


neg_test('cert issue without spec_reference', t_cert_spec,
         ['Spec', 'spec_reference'])

# Test 5: Delivery mark_delivered without POD
step('SYSTEM', 'Test 5 — Delivery mark_delivered() with no POD → blocked')


def t_dlv_pod():
    """Mark an en-route delivery delivered with no proof of delivery."""
    d = env['fusion.plating.delivery'].sudo().create({
        'partner_id': customer.id,
        'state': 'en_route',
        'company_id': env.company.id,
    })
    d.action_mark_delivered()


neg_test('delivery delivered without POD', t_dlv_pod,
         ['POD', 'Proof of Delivery'])

# Test 6: Invoice post without payment terms
step('SYSTEM', 'Test 6 — Invoice post() with no payment terms → blocked')


def t_inv_terms():
    """Post a customer invoice with payment terms cleared everywhere."""
    saved_term = customer.property_payment_term_id
    customer.sudo().property_payment_term_id = False
    try:
        i = env['account.move'].sudo().create({
            'move_type': 'out_invoice',
            'partner_id': customer.id,
            'invoice_date': fields.Date.today(),
            'invoice_line_ids': [(0, 0, {
                'name': 'Test plating service',
                'quantity': 1,
                'price_unit': 100.0,
            })],
        })
        i.invoice_payment_term_id = False
        i.action_post()
    finally:
        customer.sudo().property_payment_term_id = saved_term


neg_test('invoice post without payment terms', t_inv_terms, ['payment term'])

# Test 7: Thickness reading without calibration_std_ref
step('SYSTEM', 'Test 7 — Thickness reading without calibration_std_ref → blocked')


def t_thickness_cal():
    """Create a thickness reading with calibration_std_ref left empty."""
    env['fp.thickness.reading'].sudo().create({
        'production_id': mo.id,
        'reading_number': 99,
        'nip_mils': 0.05,
        'calibration_std_ref': False,
    })


neg_test('thickness reading without cal std', t_thickness_cal,
         ['calibration', 'required', 'not-null', 'null value'])

# Test 8: NCR close without root cause / containment / disposition
step('SYSTEM', 'Test 8 — NCR close() with missing fields → blocked')


def t_ncr_close():
    """Close an NCR whose cause / containment / disposition are empty."""
    f = env['fusion.plating.facility'].search([], limit=1)
    n = env['fusion.plating.ncr'].sudo().create({
        'facility_id': f.id,
        'description': '',
        'containment': '',
        'root_cause': '',
        'disposition': False,
    })
    n.action_close()


neg_test('NCR close without RC/containment/disposition', t_ncr_close,
         ['Root Cause', 'Containment', 'Disposition'])

# Test 9: CAPA close without root cause analysis / action plan / verification
step('SYSTEM', 'Test 9 — CAPA close() with missing fields → blocked')


def t_capa_close():
    """Close a CAPA whose analysis and action plan are empty."""
    c = env['fusion.plating.capa'].sudo().create({
        'description': '',
        'root_cause_analysis': '',
        'action_plan': '',
    })
    c.action_close()


neg_test('CAPA close without analysis/plan/verification', t_capa_close,
         ['Root Cause Analysis', 'Action Plan', 'Verification'])

# Test 10: Discharge sample close without lab evidence
step('SYSTEM', 'Test 10 — Discharge sample close() with no lab evidence → blocked')


def t_discharge_close():
    """Close a discharge sample that has only a facility set."""
    f = env['fusion.plating.facility'].search([], limit=1)
    s = env['fusion.plating.discharge.sample'].sudo().create({
        'facility_id': f.id,
    })
    s.action_close()


neg_test('discharge sample close without lab evidence', t_discharge_close,
         ['Lab Report', 'Results Received', 'parameter', 'Lab certificate'])

# Test 11: Invoice ref auto-fill from SO at create time
# (positive check, so it runs outside neg_test and cleans up after itself)
step('SYSTEM', 'Test 11 — Invoice ref auto-fills from SO.client_order_ref')
test_inv2 = env['account.move'].sudo().create({
    'move_type': 'out_invoice',
    'partner_id': customer.id,
    'invoice_date': fields.Date.today(),
    'invoice_origin': so.name,
    'invoice_line_ids': [(0, 0, {
        'name': 'Test',
        'quantity': 1,
        'price_unit': 1.0,
    })],
})
finding('PASS' if test_inv2.ref == so.client_order_ref else 'FAIL',
        'invoice ref auto-fills from SO',
        f'ref={test_inv2.ref!r} (expected {so.client_order_ref!r})')
test_inv2.sudo().unlink()

# =====================================================================
banner('PHASE 5 — Operators run their work orders (REAL-TIME timers)')
# =====================================================================
# Pick a bath for the plating step so chemistry logging has somewhere
# to land.
bath = env['fusion.plating.bath'].search([], limit=1)
if bath:
    show('test bath', f'{bath.name} (id={bath.id})')

batch = None  # will hold the rack batch if batch model is present
FpBatch = env.get('fusion.plating.batch')
if FpBatch is not None and recipe:
    step('HANNAH', 'Creates a rack batch for the plating step')
    batch_vals = {'production_id': mo.id, 'part_count': 40}
    if bath:
        batch_vals['bath_id'] = bath.id
    facility = env['fusion.plating.facility'].search([], limit=1)
    if facility:
        batch_vals['facility_id'] = facility.id
    try:
        batch = FpBatch.with_user(users['hannah']).sudo().create(batch_vals)
        show('batch', f'{batch.name}')
    except Exception as e:
        # Batch creation is best-effort; the run continues without one.
        finding('WARN', 'batch create', str(e))
        batch = None

# Snapshot of each WO's duration before any operator touches it.
WO_DURATIONS_BEFORE = {wo.id: wo.duration for wo in mo.workorder_ids}

for wo, op_user, op_key in assignments:
    # First name of the persona, upper-cased, e.g. 'TOM'.
    actor = PERSONAS[op_key][0].split()[0].upper()
    wo_name_l = (wo.name or '').lower()
    step(actor, f'Picks up "{wo.name}" on iPad — taps START')
    wo_op = wo.with_user(op_user).sudo()
    started_state = wo_op.state
    try:
        if wo_op.state in ('pending', 'waiting', 'ready'):
            wo_op.button_start()
    except Exception as e:
        finding('WARN', f'WO start ({op_key})', f'{wo.name}: {e}')
        continue
    show(' state', f'{started_state} → {wo_op.state}')

    # Real-time work — sleep 2s for non-plating, 4s for plating
    work_seconds = 4 if 'plating' in wo_name_l else 2
    show(' working...', f'{work_seconds}s elapsed')
    time.sleep(work_seconds)

    # Tom logs chemistry mid-bath
    if 'plating' in wo_name_l and bath and op_key == 'tom':
        step(actor, 'Logs bath chemistry while plating')
        params = env['fusion.plating.bath.parameter'].search([], limit=2)
        if params:
            log = env['fusion.plating.bath.log'].with_user(op_user).sudo().create({
                'bath_id': bath.id,
                'shift': 'day',
                'notes': 'Mid-bath check during E2E run',
                'line_ids': [
                    (0, 0, {'parameter_id': p.id, 'value': 5.5})
                    for p in params
                ],
            })
            show(' chemistry log', f'{log.id} ({len(log.line_ids)} readings)')
        else:
            finding('WARN', 'chemistry',
                    'no fusion.plating.bath.parameter records — log skipped')

    # Frank logs Fischerscope thickness readings during inspection
    if 'inspect' in wo_name_l and op_key == 'frank':
        step(actor, 'Records 5 Fischerscope thickness readings')
        Reading = env.get('fp.thickness.reading')
        if Reading is not None:
            # NOTE(review): these readings carry no calibration_std_ref,
            # while Test 7 expects a reading without one to be blocked —
            # confirm the model supplies a default. A failure here is
            # reported as a finding instead of aborting the whole audit.
            try:
                for n, (pos, nip) in enumerate([
                    ('Top edge', 0.0512),
                    ('Mid surface', 0.0498),
                    ('Bottom rim', 0.0521),
                    ('Inner bore', 0.0489),
                    ('Outer flange', 0.0507),
                ], 1):
                    Reading.with_user(op_user).sudo().create({
                        'production_id': mo.id,
                        'reading_number': n,
                        'nip_mils': nip,
                        'ni_percent': 90.5,
                        'p_percent': 9.5,
                        'position_label': pos,
                        'operator_id': op_user.id,
                    })
            except Exception as e:
                finding('WARN', 'thickness readings', f'{wo.name}: {e}')
            n_readings = Reading.search_count([('production_id', '=', mo.id)])
            show(' thickness readings', f'{n_readings} logged for {mo.name}')

    # Bake operator records actuals BEFORE pressing finish (new gate)
    if hasattr(wo, '_fp_classify_kind') and wo._fp_classify_kind() == 'bake':
        wo.sudo().write({
            'x_fc_bake_temp': 365.0,
            'x_fc_bake_duration_hours': 4.0,
        })
        show(' bake actuals', '365°F × 4h recorded')

    step(actor, 'Taps FINISH')
    try:
        if wo_op.state == 'progress':
            wo_op.button_finish()
    except Exception as e:
        finding('WARN', f'WO finish ({op_key})', f'{wo.name}: {e}')
        continue
    show(' state', wo_op.state)
    show(' duration', f'{wo.duration:.2f} min')

# Tally results per WO
nonzero = sum(1 for wo in mo.workorder_ids if wo.duration > 0)
finding('PASS' if nonzero == n_wos else 'WARN', 'time tracking',
        f'{nonzero}/{n_wos} WOs have duration > 0')

# Check Odoo's underlying productivity records
prod_recs = env['mrp.workcenter.productivity'].sudo().search([
    ('workorder_id', 'in', mo.workorder_ids.ids),
])
finding('PASS' if len(prod_recs) > 0 else 'WARN', 'productivity records',
        f'{len(prod_recs)} mrp.workcenter.productivity rows logged')

# Per-operator productivity. mapped() on a relational field already
# returns each user once, so its length is the distinct-operator count.
distinct_operators_logged = len(prod_recs.mapped('user_id'))
finding('PASS' if distinct_operators_logged > 1 else 'WARN',
        'per-operator productivity',
        f'{distinct_operators_logged} distinct operators recorded')

# =====================================================================
banner('PHASE 6 — Hannah closes the MO')
# =====================================================================
step('HANNAH', 'Marks MO done')
try:
    mo_h.button_mark_done()
except Exception as e:
    print(f' [info] mark_done: {e} — falling back')
    # Fallback: set the produced quantity and call _action_done directly.
    try:
        mo_h.qty_producing = mo.product_qty
        mo_h._action_done()
    except Exception as e2:
        print(f' [info] _action_done: {e2}')
finding('PASS' if mo.state == 'done' else 'FAIL', 'MO done',
        f'state={mo.state}')

# =====================================================================
banner('PHASE 7 — Frank inspects + CoC')
# =====================================================================
certs = env['fp.certificate'].search([('production_id', '=', mo.id)])
coc = certs.filtered(lambda c: c.certificate_type == 'coc')[:1]
finding('PASS' if coc else 'FAIL', 'CoC auto-create',
        coc.name if coc else 'MISSING')
if coc:
    finding('PASS' if coc.state == 'issued' else 'WARN', 'CoC issued',
            f'state={coc.state}')
    finding('PASS' if coc.attachment_id else 'FAIL', 'CoC PDF attached',
            coc.attachment_id.name if coc.attachment_id else 'MISSING')
    if coc.attachment_id:
        # `or b''` — an attachment with no stored content decodes to
        # 0 KB (and fails the size check) instead of raising TypeError.
        kb = len(base64.b64decode(coc.attachment_id.datas or b'')) / 1024
        finding('PASS' if kb >= 100 else 'FAIL', 'CoC PDF rich (>=100KB)',
                f'{kb:.1f} KB')
    # Thickness readings on cert
    # NOTE(review): placed under `if coc:` — indentation was lost in the
    # source; confirm this check was not meant to run without a CoC.
    if 'thickness_reading_ids' in coc._fields:
        n_readings = len(coc.thickness_reading_ids)
        finding('PASS' if n_readings > 0 else 'WARN', 'thickness readings',
                f'{n_readings} reading rows')

step('FRANK', 'Reviews + signs CoC (already auto-issued)')

# =====================================================================
banner('PHASE 8 — Dave drives the delivery')
# =====================================================================
# Most recent delivery for this run's customer.
dlv = env['fusion.plating.delivery'].search(
    [('partner_id', '=', customer.id)], order='id desc', limit=1)
finding('PASS' if dlv else 'FAIL', 'delivery auto-create',
        dlv.name if dlv else 'MISSING')
if dlv:
    finding('PASS' if dlv.scheduled_date else 'WARN',
            'delivery scheduled prefill',
            str(dlv.scheduled_date or 'empty'))
    finding('PASS' if dlv.assigned_driver_id else 'WARN',
            'delivery driver prefill',
            dlv.assigned_driver_id.name if dlv.assigned_driver_id else 'empty')
    finding('PASS' if dlv.coc_attachment_id else 'WARN',
            'CoC linked to delivery',
            dlv.coc_attachment_id.name if dlv.coc_attachment_id else 'missing')

    step('DAVE', 'Schedules → start route → mark delivered')
    try:
        if dlv.state == 'draft':
            dlv.with_user(users['dave']).sudo().action_schedule()
        if dlv.state == 'scheduled':
            dlv.with_user(users['dave']).sudo().action_start_route()
        # POD must be captured BEFORE marking delivered (new gate)
        if dlv.state == 'en_route' and not dlv.pod_id:
            step('DAVE', 'Captures POD on iPad — recipient signs + photo')
            POD = env['fusion.plating.proof.of.delivery']
            pod = POD.with_user(users['dave']).sudo().create({
                'delivery_id': dlv.id,
                'partner_id': dlv.partner_id.id,
                'recipient_name': 'Dock Receiver',
                'notes': 'E2E sim — recipient on dock signed for parts',
            })
            dlv.sudo().pod_id = pod.id
            show(' POD captured', f'{pod.name} (id={pod.id})')
        if dlv.state == 'en_route':
            dlv.with_user(users['dave']).sudo().action_mark_delivered()
    except Exception as e:
        print(f' [info] delivery transitions: {e}')

    # Audit the outcome of the transitions above. (The "no POD" negative
    # case is exercised by Test 5 in Phase 4c, not here.)
    # NOTE(review): these checks are placed under `if dlv:` — indentation
    # was lost in the source; confirm.
    finding('PASS' if dlv.pod_id else 'FAIL',
            'POD captured before delivery',
            f'pod_id={dlv.pod_id.name if dlv.pod_id else "NONE"}')
    finding('PASS' if dlv.state == 'delivered' else 'FAIL',
            'delivery final state', dlv.state)
    coc_logs = env['fusion.plating.chain.of.custody'].search(
        [('delivery_id', '=', dlv.id)])
    finding('PASS' if len(coc_logs) >= 2 else 'WARN', 'chain of custody',
            f'{len(coc_logs)} entries')

# =====================================================================
banner('PHASE 9 — Linda creates + posts invoice')
# =====================================================================
step('LINDA', 'Creates invoice from SO')
try:
    inv_act = so.with_user(users['linda']).sudo()._create_invoices()
    # Accept a recordset, an action dict carrying 'res_id', or a bare id.
    inv = inv_act if hasattr(inv_act, '_name') else env['account.move'].browse(
        inv_act.get('res_id') if isinstance(inv_act, dict) else inv_act)
except Exception as e:
    print(f' [info] _create_invoices: {e}')
    inv = env['account.move'].search(
        [('invoice_origin', '=', so.name)], limit=1)
if inv:
    inv.invoice_date = fields.Date.today()
    try:
        inv.with_user(users['linda']).sudo().action_post()
    except Exception as e:
        finding('FAIL', 'invoice post', str(e))
    finding('PASS' if inv.state == 'posted' else 'FAIL', 'invoice posted',
            f'state={inv.state}, payment_state={inv.payment_state}')
else:
    # A missing invoice used to produce no finding at all.
    finding('FAIL', 'invoice posted', 'MISSING — no invoice for the SO')

# =====================================================================
banner('PHASE 10 — Compliance + notification audit')
# =====================================================================
# Notification log
logs = env['fp.notification.log'].search(
    [('sale_order_id', '=', so.id)], order='create_date')
events = logs.mapped('trigger_event')
EXPECTED_EVENTS = {'so_confirmed', 'parts_received', 'mo_complete',
                   'shipped', 'invoice_posted'}
seen = set(events)
missing = EXPECTED_EVENTS - seen
finding('PASS' if not missing else 'FAIL', 'notifications fired',
        f'sent={sorted(seen)}; missing={sorted(missing) if missing else "none"}')

# Each notification has the right attachment?
# trigger event → text that must appear in the log's attachment names
needed = {
    'so_confirmed': 'Quotation',
    'shipped': 'CoC',
    'invoice_posted': 'Invoice',
}
for ev_log in logs:
    expected_in_attachments = needed.get(ev_log.trigger_event)
    if expected_in_attachments:
        att_names = ev_log.attachment_names or ''
        ok = expected_in_attachments.lower() in att_names.lower()
        finding('PASS' if ok else 'WARN',
                f'{ev_log.trigger_event} attachment',
                f'expected "{expected_in_attachments}" in: {att_names!r}')

# Workflow stage
finding('PASS' if so.x_fc_workflow_stage in ('complete', 'invoicing', 'paid') else 'WARN',
        'final SO workflow stage', so.x_fc_workflow_stage)

# Portal job state
job_now = env['fusion.plating.portal.job'].browse(job.id) if job else None
if job_now:
    finding('PASS' if job_now.state in ('shipped', 'complete') else 'WARN',
            'final portal job state', job_now.state)

# Bath chemistry logged?
# Only logs on the test bath created no earlier than this run's MO count.
# The previous check passed on any bath log in the database, and its
# intended filter compared bath-log ids with productivity-record ids.
BathLog = env['fusion.plating.bath.log']
if bath and mo:
    bath_logs_during = BathLog.search(
        [('bath_id', '=', bath.id),
         ('create_date', '>=', mo.create_date)],
        order='id desc', limit=10)
else:
    bath_logs_during = BathLog.browse()
recent_bath_log = bath_logs_during[:1]
finding('PASS' if recent_bath_log else 'WARN', 'chemistry log persisted',
        f'most-recent log id={recent_bath_log.id if recent_bath_log else "none"}')

# Bake window auto-created after plating? Bake-window links via lot_ref (portal job name)
BakeWin = env.get('fusion.plating.bake.window')
if BakeWin is not None and job:
    bw = BakeWin.search([('lot_ref', '=', job.name)])
    finding('PASS' if bw else 'WARN', 'bake window auto-created',
            f'{len(bw)} record(s) for {job.name}')

# First-piece gate auto-created?
FPG = env.get('fusion.plating.first.piece.gate')
if FPG is not None:
    # FPG model may not have production_id; without that link field (or
    # without an MO) nothing can be attributed to this run. Filtering in
    # the search domain avoids loading every gate in the database.
    if mo and 'production_id' in FPG._fields:
        fpg_for_mo = FPG.search([('production_id', '=', mo.id)])
    else:
        fpg_for_mo = FPG.browse()
    finding('PASS' if fpg_for_mo else 'WARN', 'first-piece gate',
            f'{len(fpg_for_mo)} for MO (coating-driven; OK if 0)')

# Each operator can see their OWN assigned WOs via the tablet
# (queue is a TransientModel; tablet calls build_for_user on load)
# Reset MO to make some WOs ready/progress for queue test BEFORE this is run
# would be needed — but the queue should still work for any in-progress WOs
# elsewhere in the system that match the user.
OpQueue = env.get('fusion.plating.operator.queue')
if OpQueue is not None:
    # Borrow any existing confirmed / in-progress MO so there's a WO in
    # 'ready' or 'progress' state to queue.
    # NOTE(review): this reassigns a WO on an MO that may not belong to
    # this run, and the change is committed at the end — confirm intended.
    test_mo = env['mrp.production'].search(
        [('state', 'in', ('confirmed', 'progress'))], limit=1)
    if test_mo and test_mo.workorder_ids:
        # Force-assign a ready WO to John so we have something to surface
        ready_wo = test_mo.workorder_ids.filtered(
            lambda w: w.state in ('ready', 'progress'))[:1]
        if ready_wo:
            ready_wo.sudo().x_fc_assigned_user_id = users['john'].id

    for op_key, op_user in [('john', users['john']),
                            ('tom', users['tom']),
                            ('frank', users['frank'])]:
        rows = OpQueue.with_user(op_user).sudo().build_for_user(
            user_id=op_user.id)
        finding('PASS' if rows else 'WARN', f'tablet queue for {op_key}',
                f'{len(rows)} queue rows visible to {op_user.name}')
        # Verify NONE of the rows are someone else's assigned WO
        if rows:
            wo_rows = rows.filtered(
                lambda r: r.source_model == 'mrp.workorder')
            wrong = []
            for r in wo_rows:
                wo = env['mrp.workorder'].browse(r.source_id)
                if (wo.exists() and wo.x_fc_assigned_user_id
                        and wo.x_fc_assigned_user_id != op_user):
                    wrong.append(wo.name)
            finding('PASS' if not wrong else 'FAIL',
                    f'queue isolation for {op_key}',
                    f'leaked rows assigned to others: {wrong}' if wrong else 'no leak')

# Worker proficiency advanced for completed roles?
prof_records = env['fp.operator.proficiency'].search([
    ('employee_id', 'in', env['hr.employee'].search(
        [('user_id', 'in', [u.id for u in users.values()])]).ids),
]) if env.get('fp.operator.proficiency') is not None else None
if prof_records is not None:
    finding('PASS' if len(prof_records) > 0 else 'WARN',
            'operator proficiency tracked',
            f'{len(prof_records)} (employee,role) proficiency rows')

# =====================================================================
banner('SUMMARY')
# =====================================================================
passed = sum(1 for level, _, _ in FINDINGS if level == 'PASS')
warns = sum(1 for level, _, _ in FINDINGS if level == 'WARN')
fails = sum(1 for level, _, _ in FINDINGS if level == 'FAIL')
print(f' {passed} PASS / {warns} WARN / {fails} FAIL (out of {len(FINDINGS)} checks)')
print(f' customer: {customer.name}')
print(f' SO : {so.name}')
print(f' MO : {mo.name} → {mo.state}')
print(f' WOs : {n_wos}, total time = {sum(mo.workorder_ids.mapped("duration")):.2f} min')
print(f' CoC : {coc.name if coc else "(none)"}')
print(f' delivery : {dlv.name if dlv else "(none)"} → {dlv.state if dlv else "n/a"}')
print(f' invoice : {inv.name if inv else "(none)"}')
print(f' portal : {job.name if job else "(none)"} → final {job_now.state if job_now else "n/a"}')

if warns or fails:
    print('\n ── GAPS / FAILS ──')
    for level, area, msg in FINDINGS:
        if level in ('WARN', 'FAIL'):
            print(f' {level} [{area}] {msg}')

# Persist everything the run created (the odoo shell does not auto-commit).
env.cr.commit()
print('\n → committed.\n')