Files
gsinghpal f9fab699d4 feat(jobs): Phase 7 — migration script + legacy id fields
Adds legacy_mrp_production_id (Integer index) on fp.job and
legacy_mrp_workorder_id on fp.job.step. Used as the idempotency
key during cutover migration.

Three scripts under fusion_plating_jobs/scripts/:
- audit_pre_migration.py   — counts and data-quality concerns BEFORE
- migrate_to_fp_jobs.py    — copies MO->fp.job, WO->fp.job.step, time
                             logs, rebinds cross-refs (batches,
                             holds, certs, readings, portals,
                             inspections, deliveries). Idempotent.
- audit_post_migration.py  — counts and verifies AFTER

Migration is run manually from \`odoo shell\` at cutover (not as
auto post-migration hook, for safety). README explains usage.

Tests verify the legacy id fields exist and the migration script
files are well-formed Python.

Manifest 19.0.1.9.0 -> 19.0.2.0.0.

Part of: native job model migration (spec 2026-04-25)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 00:15:23 -04:00

171 lines
5.5 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Post-migration audit. Verifies migration counts match expectations.
# Read-only — does NOT modify data. Run from `odoo shell`.
import logging
_logger = logging.getLogger('fp_jobs_migration')
def run(env):
"""Compare row counts between source MRP tables and target fp.job
/ fp.job.step tables, plus dependent-model x_fc_*_id linkage.
"""
cr = env.cr
print('=== Post-migration audit ===')
cr.execute("SELECT COUNT(*) FROM fp_job")
job_total = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_job WHERE legacy_mrp_production_id IS NOT NULL"
)
job_migrated = cr.fetchone()[0]
cr.execute("SELECT COUNT(*) FROM mrp_production")
mo_total = cr.fetchone()[0]
print(
'mrp.production: %d, fp.job: %d (migrated: %d)'
% (mo_total, job_total, job_migrated)
)
if job_migrated < mo_total:
print('WARNING: %d MOs not migrated' % (mo_total - job_migrated))
cr.execute("SELECT COUNT(*) FROM fp_job_step")
step_total = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_job_step WHERE legacy_mrp_workorder_id IS NOT NULL"
)
step_migrated = cr.fetchone()[0]
cr.execute("SELECT COUNT(*) FROM mrp_workorder")
wo_total = cr.fetchone()[0]
print(
'mrp.workorder: %d, fp.job.step: %d (migrated: %d)'
% (wo_total, step_total, step_migrated)
)
if step_migrated < wo_total:
print('WARNING: %d WOs not migrated' % (wo_total - step_migrated))
# Cross-references — for each dependent model, show counts of records
# with the LEGACY production_id set vs the NEW x_fc_job_id set. After
# migration, the second column should match the first (we don't clear
# production_id during shadow period).
if 'fp.quality.hold' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_quality_hold WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_quality_hold WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.quality.hold: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.quality.hold' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_quality_hold "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_quality_hold "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fusion.plating.quality.hold: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fp.certificate' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_certificate WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_certificate WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.certificate: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fp.thickness.reading' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_thickness_reading "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_thickness_reading "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.thickness.reading: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.batch' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_batch "
"WHERE workorder_id IS NOT NULL"
)
with_wo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_batch "
"WHERE x_fc_step_id IS NOT NULL"
)
with_step = cr.fetchone()[0]
print(
'fusion.plating.batch: with workorder_id=%d, with x_fc_step_id=%d'
% (with_wo, with_step)
)
if 'fp.racking.inspection' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_racking_inspection "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_racking_inspection "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.racking.inspection: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.delivery' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_delivery "
"WHERE job_ref IS NOT NULL"
)
with_ref = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_delivery "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fusion.plating.delivery: with job_ref=%d, with x_fc_job_id=%d'
% (with_ref, with_job)
)
print('=== End post-migration audit ===')
try:
run(env) # noqa: F821 — `env` is provided by odoo shell
except NameError:
print(
'This script expects to run inside `odoo shell` where `env` is defined.'
)