feat(jobs): Phase 7 — migration script + legacy id fields

Adds legacy_mrp_production_id (Integer index) on fp.job and
legacy_mrp_workorder_id on fp.job.step. Used as the idempotency
key during cutover migration.

Three scripts under fusion_plating_jobs/scripts/:
- audit_pre_migration.py   — counts and data-quality concerns BEFORE
- migrate_to_fp_jobs.py    — copies MO->fp.job, WO->fp.job.step, time
                             logs, rebinds cross-refs (batches,
                             holds, certs, readings, portals,
                             inspections, deliveries). Idempotent.
- audit_post_migration.py  — counts and verifies AFTER

Migration is run manually from \`odoo shell\` at cutover (not as
auto post-migration hook, for safety). README explains usage.

Tests verify the legacy id fields exist and the migration script
files are well-formed Python.

Manifest 19.0.1.9.0 -> 19.0.2.0.0.

Part of: native job model migration (spec 2026-04-25)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-04-25 00:15:23 -04:00
parent 71376228cb
commit f9fab699d4
8 changed files with 901 additions and 1 deletions

View File

@@ -3,7 +3,7 @@
# License OPL-1 (Odoo Proprietary License v1.0)
{
'name': 'Fusion Plating — Native Jobs',
'version': '19.0.1.9.0',
'version': '19.0.2.0.0',
'category': 'Manufacturing/Plating',
'summary': 'Native plating job model — replaces mrp.production / mrp.workorder bridge.',
'description': """

View File

@@ -48,6 +48,17 @@ class FpJob(models.Model):
'job_id',
string='Recipe Overrides',
)
# Phase 7 — migration idempotency key. Populated by
# scripts/migrate_to_fp_jobs.py to mark a fp.job as the mirror of a
# specific mrp.production. Used to skip already-migrated MOs on
# subsequent runs. Cleared after the 2-week shadow period.
legacy_mrp_production_id = fields.Integer(
string='Legacy MRP Production ID',
index=True,
help='Database id of the source mrp.production record this job '
'was migrated from. Used by the migration script for '
'idempotency. Cleared post-cutover.',
)
# ------------------------------------------------------------------
# Recipe → fp.job.step generation (Task 2.4)
@@ -485,3 +496,21 @@ class FpJob(models.Model):
_logger.warning(
"Job %s: failed to auto-create cert: %s", self.name, e,
)
class FpJobStep(models.Model):
"""Phase 7 — adds the migration idempotency key on fp.job.step.
Populated by scripts/migrate_to_fp_jobs.py to mark a step as the
mirror of a specific mrp.workorder. Used to skip already-migrated
WOs on subsequent runs.
"""
_inherit = 'fp.job.step'
legacy_mrp_workorder_id = fields.Integer(
string='Legacy MRP Work Order ID',
index=True,
help='Database id of the source mrp.workorder this step was '
'migrated from. Used by the migration script for '
'idempotency. Cleared post-cutover.',
)

View File

@@ -0,0 +1,51 @@
# Native job migration scripts
## migrate_to_fp_jobs.py
Copies live `mrp.production` / `mrp.workorder` records into the native
`fp.job` / `fp.job.step` model. Idempotent — safe to run multiple times.
### Usage
Run from the host (e.g. entech) using `odoo shell`:
```bash
ssh pve-worker5 "pct exec 111 -- bash -c 'su - odoo -s /bin/bash -c \"/usr/bin/odoo shell -c /etc/odoo/odoo.conf -d admin\" < /mnt/extra-addons/custom/fusion_plating_jobs/scripts/migrate_to_fp_jobs.py'"
```
Or interactively from `odoo shell` (Python `exec` builtin, not a shell call):
```python
exec(open('/mnt/extra-addons/custom/fusion_plating_jobs/scripts/migrate_to_fp_jobs.py').read())
```
### What it does
1. For every `mrp.production` record, creates a parallel `fp.job` with the same name and fields. Skips MOs that already have a fp.job mirror (`fp.job.legacy_mrp_production_id == mo.id`).
2. For every `mrp.workorder` record, creates a parallel `fp.job.step`. Skips already-migrated WOs.
3. Migrates `mrp.workorder.time_ids` to `fp.job.step.timelog`.
4. Rebinds cross-references on dependent models (batches, holds, certs, deliveries, portal jobs, racking inspections).
5. Audit log written to `/tmp/fp_jobs_migration.log` and to a chatter post on each migrated job.
### Safety
- Idempotent. Re-running skips already-migrated records.
- Read-only on legacy MO/WO records. Original data untouched.
- Cross-reference rebinds add new x_fc_job_id / x_fc_step_id values without removing legacy production_id / workorder_id values. Both stay populated for the 2-week shadow period.
- Wrap in a transaction (default for `odoo shell`); if anything fails, rollback.
### Pre-migration audit
Run `audit_pre_migration.py` first to see what's about to happen. The
script uses Python's `exec` builtin to load the file inside the running
shell session — no shell exec involved.
Reports counts of MO/WO/dependent records and any data-quality concerns
(MOs with no recipe, WOs with no work centre, etc).
### Post-migration audit
Run `audit_post_migration.py` after to verify counts match.
Reports row counts on fp.job, fp.job.step, and confirms all dependent
records have new x_fc_*_id values.

View File

@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# This package holds standalone migration / audit scripts for the native
# job model rollout. Scripts under this directory are NOT imported at
# module load time — they are invoked manually from `odoo shell` by the
# cutover engineer. See README.md in this directory for usage.

View File

@@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Post-migration audit. Verifies migration counts match expectations.
# Read-only — does NOT modify data. Run from `odoo shell`.
import logging
_logger = logging.getLogger('fp_jobs_migration')
def run(env):
"""Compare row counts between source MRP tables and target fp.job
/ fp.job.step tables, plus dependent-model x_fc_*_id linkage.
"""
cr = env.cr
print('=== Post-migration audit ===')
cr.execute("SELECT COUNT(*) FROM fp_job")
job_total = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_job WHERE legacy_mrp_production_id IS NOT NULL"
)
job_migrated = cr.fetchone()[0]
cr.execute("SELECT COUNT(*) FROM mrp_production")
mo_total = cr.fetchone()[0]
print(
'mrp.production: %d, fp.job: %d (migrated: %d)'
% (mo_total, job_total, job_migrated)
)
if job_migrated < mo_total:
print('WARNING: %d MOs not migrated' % (mo_total - job_migrated))
cr.execute("SELECT COUNT(*) FROM fp_job_step")
step_total = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_job_step WHERE legacy_mrp_workorder_id IS NOT NULL"
)
step_migrated = cr.fetchone()[0]
cr.execute("SELECT COUNT(*) FROM mrp_workorder")
wo_total = cr.fetchone()[0]
print(
'mrp.workorder: %d, fp.job.step: %d (migrated: %d)'
% (wo_total, step_total, step_migrated)
)
if step_migrated < wo_total:
print('WARNING: %d WOs not migrated' % (wo_total - step_migrated))
# Cross-references — for each dependent model, show counts of records
# with the LEGACY production_id set vs the NEW x_fc_job_id set. After
# migration, the second column should match the first (we don't clear
# production_id during shadow period).
if 'fp.quality.hold' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_quality_hold WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_quality_hold WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.quality.hold: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.quality.hold' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_quality_hold "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_quality_hold "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fusion.plating.quality.hold: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fp.certificate' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_certificate WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_certificate WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.certificate: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fp.thickness.reading' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_thickness_reading "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_thickness_reading "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.thickness.reading: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.batch' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_batch "
"WHERE workorder_id IS NOT NULL"
)
with_wo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_batch "
"WHERE x_fc_step_id IS NOT NULL"
)
with_step = cr.fetchone()[0]
print(
'fusion.plating.batch: with workorder_id=%d, with x_fc_step_id=%d'
% (with_wo, with_step)
)
if 'fp.racking.inspection' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_racking_inspection "
"WHERE production_id IS NOT NULL"
)
with_mo = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fp_racking_inspection "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fp.racking.inspection: with production_id=%d, with x_fc_job_id=%d'
% (with_mo, with_job)
)
if 'fusion.plating.delivery' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_delivery "
"WHERE job_ref IS NOT NULL"
)
with_ref = cr.fetchone()[0]
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_delivery "
"WHERE x_fc_job_id IS NOT NULL"
)
with_job = cr.fetchone()[0]
print(
'fusion.plating.delivery: with job_ref=%d, with x_fc_job_id=%d'
% (with_ref, with_job)
)
print('=== End post-migration audit ===')
try:
run(env) # noqa: F821 — `env` is provided by odoo shell
except NameError:
print(
'This script expects to run inside `odoo shell` where `env` is defined.'
)

View File

@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Pre-migration audit. Reports row counts and data-quality concerns
# before running migrate_to_fp_jobs.py. Read-only — does NOT modify data.
#
# Run from `odoo shell` where `env` is in scope. See ./README.md.
import logging
_logger = logging.getLogger('fp_jobs_migration')
def run(env):
"""Print a snapshot of what migrate_to_fp_jobs.py would touch.
All queries are SELECT-only. Safe to run on production at any time.
"""
cr = env.cr
print('=== Pre-migration audit ===')
# Core MRP counts
cr.execute("SELECT COUNT(*) FROM mrp_production")
mo_total = cr.fetchone()[0]
print('mrp.production total:', mo_total)
cr.execute("SELECT state, COUNT(*) FROM mrp_production GROUP BY state ORDER BY 1")
print('mrp.production by state:', cr.fetchall())
cr.execute("SELECT COUNT(*) FROM mrp_workorder")
wo_total = cr.fetchone()[0]
print('mrp.workorder total:', wo_total)
cr.execute("SELECT state, COUNT(*) FROM mrp_workorder GROUP BY state ORDER BY 1")
print('mrp.workorder by state:', cr.fetchall())
# Already migrated?
cr.execute("SELECT COUNT(*) FROM fp_job")
job_total = cr.fetchone()[0]
print('fp.job already exists:', job_total)
cr.execute("SELECT COUNT(*) FROM fp_job_step")
step_total = cr.fetchone()[0]
print('fp.job.step already exists:', step_total)
# Data quality
if 'x_fc_recipe_id' in env['mrp.production']._fields:
cr.execute(
"SELECT COUNT(*) FROM mrp_production WHERE x_fc_recipe_id IS NULL"
)
no_recipe = cr.fetchone()[0]
print('MOs without x_fc_recipe_id:', no_recipe)
cr.execute(
"SELECT COUNT(*) FROM mrp_workorder WHERE workcenter_id IS NULL"
)
no_wc = cr.fetchone()[0]
print('WOs without workcenter_id:', no_wc)
# Dependent records — check by model registry (truthful even when
# the schema names differ from defaults).
if 'fp.quality.hold' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_quality_hold WHERE production_id IS NOT NULL"
)
print('fp.quality.hold rows with production_id:', cr.fetchone()[0])
if 'fp.certificate' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_certificate WHERE production_id IS NOT NULL"
)
print('fp.certificate rows with production_id:', cr.fetchone()[0])
if 'fp.thickness.reading' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_thickness_reading WHERE production_id IS NOT NULL"
)
print(
'fp.thickness.reading rows with production_id:',
cr.fetchone()[0],
)
if 'fusion.plating.batch' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_batch WHERE workorder_id IS NOT NULL"
)
print(
'fusion.plating.batch rows with workorder_id:',
cr.fetchone()[0],
)
if 'fusion.plating.portal.job' in env:
cr.execute("SELECT COUNT(*) FROM fusion_plating_portal_job")
print('fusion.plating.portal.job total:', cr.fetchone()[0])
if 'fp.racking.inspection' in env:
cr.execute(
"SELECT COUNT(*) FROM fp_racking_inspection WHERE production_id IS NOT NULL"
)
print(
'fp.racking.inspection rows with production_id:',
cr.fetchone()[0],
)
if 'fusion.plating.delivery' in env:
cr.execute(
"SELECT COUNT(*) FROM fusion_plating_delivery WHERE job_ref IS NOT NULL"
)
print(
'fusion.plating.delivery rows with job_ref:',
cr.fetchone()[0],
)
print('=== End pre-migration audit ===')
# Run when the script is exec'd from odoo shell (env is in scope).
try:
run(env) # noqa: F821 — `env` is provided by odoo shell
except NameError:
print('This script expects to run inside `odoo shell` where `env` is defined.')

View File

@@ -0,0 +1,466 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Native job migration: copies mrp.production / mrp.workorder records
# into fp.job / fp.job.step. Idempotent. Run from `odoo shell`.
#
# Strategy:
# 1. Verify the legacy_mrp_production_id / legacy_mrp_workorder_id
# idempotency-key fields exist on fp.job / fp.job.step. If missing,
# bail (the user must upgrade fusion_plating_jobs first).
# 2. For each MO: skip if already mirrored; else create fp.job with
# same name, partner, qty, dates, state, etc.
# 3. For each WO under MO: skip if already mirrored; else create
# fp.job.step with same name, work centre (mapped via legacy
# code), sequence, durations, state.
# 4. Time logs: copy mrp.workorder.time_ids if available.
# 5. Rebind cross-references on dependent models (defensive — only
# writes a value when the field exists on both sides AND the
# target field is currently empty).
# 6. Write audit log to /tmp/fp_jobs_migration.log.
#
# This is NOT an Odoo upgrade hook — it is an explicit cutover step.
# Run from `odoo shell -d <db>` so the surrounding transaction can be
# rolled back manually if the operator spots a problem (`env.cr.rollback()`).
# At the end of run() we env.cr.commit() — the operator can comment that
# out if they want to inspect changes before persisting.
#
# See ./README.md for usage.
import logging
from datetime import datetime
_logger = logging.getLogger('fp_jobs_migration')
# Map of mrp.production.state -> fp.job.state.
# fp.job.state values are defined in fusion_plating core (Phase 1 spec).
JOB_STATE_MAP = {
'draft': 'draft',
'confirmed': 'confirmed',
'progress': 'in_progress',
'to_close': 'in_progress',
'done': 'done',
'cancel': 'cancelled',
}
# Map of mrp.workorder.state -> fp.job.step.state
STEP_STATE_MAP = {
'pending': 'pending',
'waiting': 'pending',
'ready': 'ready',
'progress': 'in_progress',
'done': 'done',
'cancel': 'cancelled',
}
def map_work_centre(env, mrp_wc):
"""Find the fp.work.centre that corresponds to a mrp.workcenter.
Strategy: match by code. If no match, return False (the step will
have no work centre — operator can fix manually post-cutover).
"""
if not mrp_wc:
return False
if not mrp_wc.code:
return False
fp_wc = env['fp.work.centre'].search(
[('code', '=', mrp_wc.code)], limit=1,
)
return fp_wc.id if fp_wc else False
def _resolve_partner(env, mo):
"""Best-effort partner lookup for the MO.
Order of preference:
1. mo.x_fc_customer_id (some custom modules add this)
2. partner from sale.order matching mo.origin
3. False (caller decides whether to error)
"""
if 'x_fc_customer_id' in mo._fields and mo.x_fc_customer_id:
return mo.x_fc_customer_id.id
if mo.origin:
so = env['sale.order'].search([('name', '=', mo.origin)], limit=1)
if so:
return so.partner_id.id
return False
def migrate_mo(env, mo, audit):
"""Migrate one mrp.production -> fp.job. Idempotent."""
Job = env['fp.job']
existing = Job.search(
[('legacy_mrp_production_id', '=', mo.id)], limit=1,
)
if existing:
audit['mo_skipped'] += 1
return existing
vals = {
'name': mo.name, # preserve WH/MO/00033 format
'partner_id': _resolve_partner(env, mo),
'product_id': mo.product_id.id if mo.product_id else False,
'qty': mo.product_qty,
'date_deadline': mo.date_deadline,
'date_planned_start': mo.date_start,
'date_finished': mo.date_finished,
'origin': mo.origin,
'state': JOB_STATE_MAP.get(mo.state, 'draft'),
'legacy_mrp_production_id': mo.id,
}
# Optional fields — only set when the source has them
if 'x_fc_facility_id' in mo._fields and mo.x_fc_facility_id:
if 'facility_id' in Job._fields:
vals['facility_id'] = mo.x_fc_facility_id.id
if 'x_fc_manager_id' in mo._fields and mo.x_fc_manager_id:
if 'manager_id' in Job._fields:
vals['manager_id'] = mo.x_fc_manager_id.id
if 'x_fc_recipe_id' in mo._fields and mo.x_fc_recipe_id:
if 'recipe_id' in Job._fields:
vals['recipe_id'] = mo.x_fc_recipe_id.id
if 'x_fc_portal_job_id' in mo._fields and mo.x_fc_portal_job_id:
if 'portal_job_id' in Job._fields:
vals['portal_job_id'] = mo.x_fc_portal_job_id.id
if 'x_fc_part_catalog_id' in mo._fields and mo.x_fc_part_catalog_id:
if 'part_catalog_id' in Job._fields:
vals['part_catalog_id'] = mo.x_fc_part_catalog_id.id
if 'x_fc_coating_config_id' in mo._fields and mo.x_fc_coating_config_id:
if 'coating_config_id' in Job._fields:
vals['coating_config_id'] = mo.x_fc_coating_config_id.id
# Bypass any auto-create lifecycle hooks while migrating — the source
# MO already had its hooks run when it was originally created. We
# don't want a second portal job / racking inspection / etc.
job = Job.with_context(
fp_jobs_migration=True,
tracking_disable=True,
mail_create_nosubscribe=True,
mail_create_nolog=True,
).create(vals)
audit['mo_migrated'] += 1
audit['jobs_created'].append(job.id)
return job
def migrate_wo(env, wo, job, audit):
"""Migrate one mrp.workorder -> fp.job.step. Idempotent."""
Step = env['fp.job.step']
existing = Step.search(
[('legacy_mrp_workorder_id', '=', wo.id)], limit=1,
)
if existing:
audit['wo_skipped'] += 1
return existing
wc_id = map_work_centre(env, wo.workcenter_id)
vals = {
'job_id': job.id,
'name': wo.name,
'sequence': wo.sequence or 10,
'state': STEP_STATE_MAP.get(wo.state, 'pending'),
'work_centre_id': wc_id,
'duration_expected': wo.duration_expected or 0.0,
'duration_actual': wo.duration or 0.0,
'date_started': wo.date_start,
'date_finished': wo.date_finished,
'legacy_mrp_workorder_id': wo.id,
}
if 'x_fc_recipe_node_id' in wo._fields and wo.x_fc_recipe_node_id:
if 'recipe_node_id' in Step._fields:
vals['recipe_node_id'] = wo.x_fc_recipe_node_id.id
if 'x_fc_assigned_user_id' in wo._fields and wo.x_fc_assigned_user_id:
if 'assigned_user_id' in Step._fields:
vals['assigned_user_id'] = wo.x_fc_assigned_user_id.id
if 'x_fc_thickness_target' in wo._fields and wo.x_fc_thickness_target:
if 'thickness_target' in Step._fields:
vals['thickness_target'] = wo.x_fc_thickness_target
if 'x_fc_dwell_time_minutes' in wo._fields and wo.x_fc_dwell_time_minutes:
if 'dwell_time_minutes' in Step._fields:
vals['dwell_time_minutes'] = wo.x_fc_dwell_time_minutes
step = Step.with_context(
fp_jobs_migration=True,
tracking_disable=True,
).create(vals)
audit['wo_migrated'] += 1
# Migrate time logs — only if both sides have a time-log model
if 'time_ids' in wo._fields and wo.time_ids \
and 'fp.job.step.timelog' in env:
TimeLog = env['fp.job.step.timelog']
for tl in wo.time_ids:
try:
TimeLog.create({
'step_id': step.id,
'user_id': tl.user_id.id if tl.user_id else env.user.id,
'date_started': tl.date_start,
'date_finished': tl.date_end,
})
except Exception as e:
_logger.warning(
'Failed to migrate time log %s on WO %s: %s',
tl.id, wo.name, e,
)
return step
def _safe_set(record, fname, value):
"""Set a field only when (a) the field exists and (b) is currently empty.
Returns True if a write happened, False otherwise. Catches exceptions
individually so one bad record doesn't sink the whole batch.
"""
if fname not in record._fields:
return False
current = record[fname]
# Many2one .id is 0 / False when empty; Char/Text empty string also OK
if current:
return False
try:
record[fname] = value
return True
except Exception as e:
_logger.warning(
'Failed to set %s.%s on id=%s: %s',
record._name, fname, record.id, e,
)
return False
def rebind_dependents(env, mo, job, audit):
"""Update cross-references on dependent models.
Only writes when:
- the target model is registered in env
- the target field exists on the model
- the target field is currently empty (idempotent)
Legacy production_id / workorder_id values are LEFT INTACT so the
shadow period can read both old and new linkages.
"""
# Build a step lookup by legacy WO id (used for batches and any other
# WO-scoped dependents).
step_by_wo = {}
if mo.workorder_ids:
Step = env['fp.job.step']
steps = Step.search([
('legacy_mrp_workorder_id', 'in', mo.workorder_ids.ids),
])
for s in steps:
step_by_wo[s.legacy_mrp_workorder_id] = s
# ---- fusion.plating.batch (workorder_id → x_fc_step_id) ----
if 'fusion.plating.batch' in env:
Batch = env['fusion.plating.batch']
for wo in mo.workorder_ids:
step = step_by_wo.get(wo.id)
if not step:
continue
batches = Batch.search([('workorder_id', '=', wo.id)])
for batch in batches:
if _safe_set(batch, 'x_fc_step_id', step.id):
audit['batches_rebound'] += 1
# batch may also have x_fc_job_id (the job-level link)
_safe_set(batch, 'x_fc_job_id', job.id)
# ---- fp.quality.hold (production_id → x_fc_job_id) ----
if 'fp.quality.hold' in env:
Hold = env['fp.quality.hold']
if 'production_id' in Hold._fields:
holds = Hold.search([('production_id', '=', mo.id)])
for h in holds:
if _safe_set(h, 'x_fc_job_id', job.id):
audit['holds_rebound'] += 1
# If the hold also has workorder_id, rebind to step
if 'workorder_id' in Hold._fields and h.workorder_id:
step = step_by_wo.get(h.workorder_id.id)
if step:
_safe_set(h, 'x_fc_step_id', step.id)
# ---- fusion.plating.quality.hold (legacy fallback name) ----
if 'fusion.plating.quality.hold' in env:
Hold2 = env['fusion.plating.quality.hold']
if 'production_id' in Hold2._fields:
holds = Hold2.search([('production_id', '=', mo.id)])
for h in holds:
if _safe_set(h, 'x_fc_job_id', job.id):
audit['holds_rebound'] += 1
if 'workorder_id' in Hold2._fields and h.workorder_id:
step = step_by_wo.get(h.workorder_id.id)
if step:
_safe_set(h, 'x_fc_step_id', step.id)
# ---- fp.certificate (production_id → x_fc_job_id) ----
if 'fp.certificate' in env:
Cert = env['fp.certificate']
if 'production_id' in Cert._fields:
certs = Cert.search([('production_id', '=', mo.id)])
for c in certs:
if _safe_set(c, 'x_fc_job_id', job.id):
audit['certs_rebound'] += 1
# ---- fp.thickness.reading (production_id → x_fc_job_id, optional step) ----
if 'fp.thickness.reading' in env:
TR = env['fp.thickness.reading']
if 'production_id' in TR._fields:
readings = TR.search([('production_id', '=', mo.id)])
for r in readings:
if _safe_set(r, 'x_fc_job_id', job.id):
audit['readings_rebound'] += 1
if 'workorder_id' in TR._fields and r.workorder_id:
step = step_by_wo.get(r.workorder_id.id)
if step:
_safe_set(r, 'x_fc_step_id', step.id)
# ---- fusion.plating.portal.job (mo.x_fc_portal_job_id → x_fc_job_id) ----
if 'fusion.plating.portal.job' in env \
and 'x_fc_portal_job_id' in mo._fields \
and mo.x_fc_portal_job_id:
portal = mo.x_fc_portal_job_id
if _safe_set(portal, 'x_fc_job_id', job.id):
audit['portals_rebound'] += 1
# ---- fp.racking.inspection (production_id → x_fc_job_id) ----
if 'fp.racking.inspection' in env:
Insp = env['fp.racking.inspection']
if 'production_id' in Insp._fields:
insps = Insp.search([('production_id', '=', mo.id)])
for i in insps:
if _safe_set(i, 'x_fc_job_id', job.id):
audit['inspections_rebound'] += 1
# ---- fusion.plating.delivery (job_ref Char → x_fc_job_id Many2one) ----
if 'fusion.plating.delivery' in env:
Delivery = env['fusion.plating.delivery']
if 'job_ref' in Delivery._fields:
deliveries = Delivery.search([('job_ref', '=', mo.name)])
for d in deliveries:
if _safe_set(d, 'x_fc_job_id', job.id):
audit['deliveries_rebound'] += 1
def run(env):
"""Main entry point. Call as `run(env)` from `odoo shell`.
Returns the audit dict (also written to /tmp/fp_jobs_migration.log).
Commits the transaction at the end. To dry-run, comment out
`env.cr.commit()` below or pass `--no-http` and `env.cr.rollback()`
after inspecting the result.
"""
audit = {
'started_at': datetime.now().isoformat(),
'mo_migrated': 0,
'mo_skipped': 0,
'wo_migrated': 0,
'wo_skipped': 0,
'batches_rebound': 0,
'holds_rebound': 0,
'certs_rebound': 0,
'readings_rebound': 0,
'portals_rebound': 0,
'inspections_rebound': 0,
'deliveries_rebound': 0,
'errors': [],
'jobs_created': [],
}
# Verify the idempotency-key fields exist before doing anything.
# If they're missing, the operator forgot to upgrade
# fusion_plating_jobs to v19.0.2.0.0+ and we'd create duplicates on
# every run.
if 'legacy_mrp_production_id' not in env['fp.job']._fields:
msg = (
'fp.job.legacy_mrp_production_id field missing — upgrade '
'fusion_plating_jobs to v19.0.2.0.0+ before running this '
'script.'
)
print(msg)
_logger.error(msg)
return None
if 'legacy_mrp_workorder_id' not in env['fp.job.step']._fields:
msg = (
'fp.job.step.legacy_mrp_workorder_id field missing — upgrade '
'fusion_plating_jobs to v19.0.2.0.0+ before running this '
'script.'
)
print(msg)
_logger.error(msg)
return None
print('=== Migration starting ===')
MO = env['mrp.production']
all_mos = MO.search([])
print('Migrating %d MOs and their WOs...' % len(all_mos))
for mo in all_mos:
try:
job = migrate_mo(env, mo, audit)
for wo in mo.workorder_ids:
try:
migrate_wo(env, wo, job, audit)
except Exception as e:
audit['errors'].append({
'wo': wo.id,
'wo_name': wo.name,
'mo': mo.id,
'error': str(e),
})
_logger.error(
'Migration failed for WO %s (MO %s): %s',
wo.name, mo.name, e,
)
rebind_dependents(env, mo, job, audit)
except Exception as e:
audit['errors'].append({
'mo': mo.id,
'name': mo.name,
'error': str(e),
})
_logger.error('Migration failed for MO %s: %s', mo.name, e)
audit['finished_at'] = datetime.now().isoformat()
print('=== Migration finished ===')
print('MOs migrated:', audit['mo_migrated'],
'(skipped:', audit['mo_skipped'], ')')
print('WOs migrated:', audit['wo_migrated'],
'(skipped:', audit['wo_skipped'], ')')
print('Batches rebound:', audit['batches_rebound'])
print('Holds rebound:', audit['holds_rebound'])
print('Certs rebound:', audit['certs_rebound'])
print('Readings rebound:', audit['readings_rebound'])
print('Portals rebound:', audit['portals_rebound'])
print('Inspections rebound:', audit['inspections_rebound'])
print('Deliveries rebound:', audit['deliveries_rebound'])
print('Errors:', len(audit['errors']))
# Write audit log
try:
with open('/tmp/fp_jobs_migration.log', 'a') as f:
f.write('\n=== Migration run at %s ===\n' % audit['started_at'])
for k, v in audit.items():
if k == 'jobs_created':
f.write('%s: %d records\n' % (k, len(v)))
elif k == 'errors':
f.write('errors: %d\n' % len(v))
for err in v:
f.write(' %s\n' % err)
else:
f.write('%s: %s\n' % (k, v))
except Exception as e:
print('Could not write audit log:', e)
# Commit. Comment this out to dry-run.
env.cr.commit()
return audit
# Run when exec'd from odoo shell
try:
result = run(env) # noqa: F821 — `env` is provided by odoo shell
except NameError:
print(
'This script expects to run inside `odoo shell` where `env` is defined.'
)

View File

@@ -574,3 +574,63 @@ class TestPhase6Controllers(TransactionCase):
step_by_node = {s.recipe_node_id.id: s for s in self.job.step_ids if s.recipe_node_id}
self.assertIn(op.id, step_by_node)
self.assertEqual(step_by_node[op.id].name, 'Op1')
class TestPhase7Migration(TransactionCase):
"""Phase 7 — verify the migration script idempotency-key fields are
in place and the script files are present + parse as valid Python.
We cannot run the migration end-to-end in a unit test (it would need
a populated MO/WO snapshot). Instead we assert the scaffolding is
solid: fields exist, files are well-formed.
"""
def test_legacy_id_field_on_fp_job(self):
self.assertIn(
'legacy_mrp_production_id',
self.env['fp.job']._fields,
)
# Should be Integer (we store the raw db id, not a Many2one — the
# source MO may be archived later without breaking the link).
self.assertEqual(
self.env['fp.job']._fields['legacy_mrp_production_id'].type,
'integer',
)
def test_legacy_id_field_on_fp_job_step(self):
self.assertIn(
'legacy_mrp_workorder_id',
self.env['fp.job.step']._fields,
)
self.assertEqual(
self.env['fp.job.step']._fields['legacy_mrp_workorder_id'].type,
'integer',
)
def test_migration_script_files_exist_and_parse(self):
# Sanity check that the script files we ship are valid Python.
# Catches syntax errors that would otherwise only surface on the
# cutover engineer's screen at the worst possible moment.
import ast
from pathlib import Path
scripts_dir = (
Path(__file__).parent.parent / 'scripts'
)
for script in (
'audit_pre_migration.py',
'migrate_to_fp_jobs.py',
'audit_post_migration.py',
):
path = scripts_dir / script
self.assertTrue(path.exists(), '%s missing' % script)
with open(path) as f:
ast.parse(f.read()) # Will raise SyntaxError if invalid
def test_scripts_dir_is_a_python_package(self):
# __init__.py exists so Odoo's autodiscovery doesn't trip and the
# dir is importable for hypothetical future post-migration hooks.
from pathlib import Path
init = (
Path(__file__).parent.parent / 'scripts' / '__init__.py'
)
self.assertTrue(init.exists())