Files
Odoo-Modules/fusion_plating/fusion_plating_jobs/models/fp_job.py
gsinghpal 32d48ea44d feat(jobs): step sequences are 1, 2, 3, ... not 10, 20, 30, ...
User feedback: operators kept asking why their work order said "Step 10"
for the first row. The 10-spacing was originally there to allow midpoint
inserts (insert sequence 15 between 10 and 20 without renumbering).
Tradeoff is operator confusion, and recipe authors rarely insert in the
middle anyway. Switching to 1-based contiguous sequences.

Files changed (every step-sequence allocation in the codebase):

fusion_plating_jobs/models/fp_job.py
  _generate_steps_from_recipe — seq_counter starts at 1, increments by 1.
  This is the path that builds fp.job.step records, so new jobs now show
  Step 1, 2, 3, ... in the work order.

fusion_plating_bridge_mrp/models/mrp_production.py
  Same change for the legacy MRP bridge so customers still on
  mrp.production also get 1-based numbering.

fusion_plating/controllers/recipe_controller.py
  - create_node: max_seq + 1
  - reorder_nodes: idx + 1
  - swap renumber: i (was i * 10)
  - paste-import renumber: i (was i * 10)
  - move_node: max_seq + 1
  - _copy_subtree (recipe duplicate/import): i (was i * 10)

fusion_plating/controllers/simple_recipe_controller.py
  - _sequence_for_position rewritten — always renumbers siblings to
    keep them contiguous. Returns pos + 1 for the inserted node.
    Old code used midpoint-with-fallback-to-renumber (10/20/30 spacing).
  - step_reorder: i (was i * 10)
  - library_input_add + step_add_input: existing_max + 1

What this DOESN'T do
  Existing fp.job.step records keep their old sequences (10, 20, ...).
  Re-confirm the SO to spawn a fresh job if you want the clean 1-based
  numbering on a current test job. No data migration — we're in dev
  and the user explicitly said test data is disposable.

What this DOES do
  Every NEW job created from this commit forward shows Step 1, 2, 3, ...
  Every NEW recipe step inserted via the simple editor / tree editor
  also gets sequence 1, 2, 3, ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 22:58:21 -04:00

1358 lines
59 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# fp.job extension — cross-module fields that couldn't live in core
# because their target models are in dependent modules. Per spec §5.1
# this module is the umbrella that re-bundles the cross-module
# extensions for the native job flow.
#
# qc_check_id is deferred to Task 2.7 (the underlying QC model still
# lives in fusion_plating_bridge_mrp; we'll address its sourcing then).
import logging
from markupsafe import Markup
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class FpJob(models.Model):
_inherit = 'fp.job'
# Catalog part processed on this job. ondelete='restrict' blocks deleting
# a part record while a job still points at it.
part_catalog_id = fields.Many2one(
    'fp.part.catalog',
    string='Part',
    ondelete='restrict',
)
# Coating configuration for the job. _generate_steps_from_recipe reads its
# thickness_max / thickness_uom (when those fields exist) for plating steps.
coating_config_id = fields.Many2one(
    'fp.coating.config',
    string='Coating Configuration',
    ondelete='restrict',
)
# Customer specification; the link is nulled if the spec is deleted.
customer_spec_id = fields.Many2one(
    'fusion.plating.customer.spec',
    string='Customer Spec',
    ondelete='set null',
)
# Customer-portal mirror record, written by _fp_create_portal_job.
portal_job_id = fields.Many2one(
    'fusion.plating.portal.job',
    string='Portal Job',
    ondelete='set null',
)
# Delivery record linked to this job; nulled if the delivery is deleted.
delivery_id = fields.Many2one(
    'fusion.plating.delivery',
    string='Delivery',
    ondelete='set null',
)
# Per-job include/exclude decisions for opt-in/opt-out recipe nodes
# (inverse field: fp.job.node.override.job_id).
override_ids = fields.One2many(
    'fp.job.node.override',
    'job_id',
    string='Recipe Overrides',
)
# Sub 13 — sequential enforcement. Mirrored from the recipe root so
# button_start on each step can read the policy without walking the
# node tree. Stored so a recipe author flipping the recipe's flag
# AFTER job generation does NOT change behaviour mid-run (jobs
# snapshot the policy at creation, not on the fly).
# NOTE(review): this is declared as a stored *related* field. Stored
# related fields are normally kept in sync when the source field changes,
# which would contradict the "snapshot" intent described here and in the
# help text — confirm, and consider a plain Boolean copied at creation.
enforce_sequential = fields.Boolean(
    related='recipe_id.enforce_sequential',
    string='Enforce Sequential Order',
    store=True,
    readonly=True,
    help='Snapshotted from the recipe at job creation. When True, '
         'every step waits for its predecessors before it can start '
         '(unless the step itself is flagged Parallel Start, or a '
         'manager bypasses via context).',
)
# Phase 7 — migration idempotency key. Populated by
# scripts/migrate_to_fp_jobs.py to mark a fp.job as the mirror of a
# specific mrp.production. Used to skip already-migrated MOs on
# subsequent runs. Cleared after the 2-week shadow period.
# Stored as a plain Integer (not a Many2one), so it carries no ORM
# relation to mrp.production.
legacy_mrp_production_id = fields.Integer(
    string='Legacy MRP Production ID',
    index=True,
    help='Database id of the source mrp.production record this job '
         'was migrated from. Used by the migration script for '
         'idempotency. Cleared post-cutover.',
)
# ------------------------------------------------------------------
# Smart-button counts (Feature A — operator workflow)
#
# Compute counts for each downstream model so the form view can
# render an oe_stat_button row similar to sale.order. Cross-module
# models are runtime-detected so this still works when one of the
# bridge modules is uninstalled.
#
# All eight fields share the single compute method
# _compute_smart_counts; none of them passes store=True.
# ------------------------------------------------------------------
sale_order_count = fields.Integer(compute='_compute_smart_counts')
delivery_count = fields.Integer(compute='_compute_smart_counts')
invoice_count = fields.Integer(compute='_compute_smart_counts')
payment_count = fields.Integer(compute='_compute_smart_counts')
quality_hold_count = fields.Integer(compute='_compute_smart_counts')
certificate_count = fields.Integer(compute='_compute_smart_counts')
timelog_count = fields.Integer(compute='_compute_smart_counts')
portal_job_count = fields.Integer(compute='_compute_smart_counts')
@api.depends(
    'sale_order_id', 'delivery_id', 'portal_job_id', 'step_ids',
    'step_ids.time_log_ids', 'origin', 'partner_id',
)
def _compute_smart_counts(self):
    """Compute every smart-button counter shown on the job form.

    Optional models (accounting, quality hold, certificate) are looked
    up with ``self.env.get`` so a missing module yields a count of 0
    instead of an error.

    Invoices are matched on ``invoice_origin == job.origin`` and
    restricted to customer invoices / refunds. The invoice set is
    searched once per job and reused for both ``invoice_count`` and the
    payment lookup (the previous version ran the same domain twice).
    """
    AccountMove = self.env.get('account.move')
    AccountPayment = self.env.get('account.payment')
    QualityHold = self.env.get('fusion.plating.quality.hold')
    Certificate = self.env.get('fp.certificate')
    for job in self:
        # The three direct links are single Many2one fields → 0 or 1.
        job.sale_order_count = 1 if job.sale_order_id else 0
        job.delivery_count = 1 if job.delivery_id else 0
        job.portal_job_count = 1 if job.portal_job_id else 0

        # Invoices via origin (the SO name).
        invoice_ids = []
        if AccountMove is not None and job.origin:
            invoice_ids = AccountMove.search([
                ('invoice_origin', '=', job.origin),
                ('move_type', 'in', ('out_invoice', 'out_refund')),
            ]).ids
        job.invoice_count = len(invoice_ids)

        # Payments reconciled against those invoices. A non-empty
        # invoice_ids already implies account.move exists and origin
        # is set, so only the payment model needs checking here.
        if AccountPayment is not None and invoice_ids:
            job.payment_count = AccountPayment.search_count([
                ('reconciled_invoice_ids', 'in', invoice_ids),
            ])
        else:
            job.payment_count = 0

        if QualityHold is not None:
            job.quality_hold_count = QualityHold.search_count([
                ('x_fc_job_id', '=', job.id),
            ])
        else:
            job.quality_hold_count = 0

        if Certificate is not None:
            job.certificate_count = Certificate.search_count([
                ('x_fc_job_id', '=', job.id),
            ])
        else:
            job.certificate_count = 0

        # Total time-log rows across all steps of the job.
        job.timelog_count = sum(
            len(s.time_log_ids) for s in job.step_ids
        )
# ------------------------------------------------------------------
# Smart-button actions
# ------------------------------------------------------------------
def action_view_sale_order(self):
    """Open the linked sale order in form view.

    Returns a window-close action when the job has no sale order.
    """
    self.ensure_one()
    order = self.sale_order_id
    if order:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'sale.order',
            'res_id': order.id,
            'view_mode': 'form',
            'name': order.name,
        }
    return {'type': 'ir.actions.act_window_close'}
# All time logs across every step on this job — backs the Time Logs
# tab on the form so the manager sees the full labour audit without
# clicking into each step.
# Relies on fp.job.step.timelog exposing a job_id field as the inverse.
time_log_ids = fields.One2many(
    'fp.job.step.timelog',
    'job_id',
    string='All Time Logs',
    readonly=True,
)
# 2026-04-28 — link to the auto-created Sub 8 racking inspection so
# the job form can show a smart button + the manager can route into
# the inspection without leaving the job screen.
racking_inspection_ids = fields.One2many(
    'fp.racking.inspection',
    'x_fc_job_id',
    string='Racking Inspections',
)
# Convenience single-record view of racking_inspection_ids: the compute
# method takes the first record of the One2many (or empty).
racking_inspection_id = fields.Many2one(
    'fp.racking.inspection',
    string='Racking Inspection',
    compute='_compute_racking_inspection',
    store=False,
    help='The single racking inspection scoped to this job (Sub 8 '
         'enforces uniqueness). Smart button on the form routes here.',
)
# Computed alongside racking_inspection_id so views can render the
# state badge without needing a related-on-non-stored field (which
# the ORM rejects). Selection mirrors fp.racking.inspection.state.
# NOTE(review): the selection list is duplicated by hand here — keep it
# in step with the inspection model's own state values.
racking_inspection_state = fields.Selection(
    [('draft', 'Draft'),
     ('inspecting', 'Inspecting'),
     ('done', 'Done'),
     ('discrepancy_flagged', 'Discrepancy Flagged')],
    string='Racking Inspection Status',
    compute='_compute_racking_inspection',
    store=False,
)
@api.depends('racking_inspection_ids', 'racking_inspection_ids.state')
def _compute_racking_inspection(self):
    """Expose the first linked racking inspection and its state.

    Both computed fields are assigned for every record; when no
    inspection is linked the Many2one is empty and the state is False.
    """
    for record in self:
        inspection = record.racking_inspection_ids[:1]
        record.racking_inspection_id = inspection
        if inspection:
            record.racking_inspection_state = inspection.state
        else:
            record.racking_inspection_state = False
def action_view_racking_inspection(self):
    """Open the job's racking inspection, creating it first if needed.

    The creation helper is always called: it creates a fresh inspection
    when none exists and seeds lines on an empty draft one, so this is
    also the entry point that backfills inspections created before
    line seeding shipped.

    Returns:
        dict: window action on the inspection form.

    Raises:
        UserError: when the fp.racking.inspection model is not
            installed, or when no inspection could be created.
    """
    self.ensure_one()
    # UserError comes from the module-level import; the former
    # function-scope re-imports were redundant.
    if 'fp.racking.inspection' not in self.env:
        raise UserError(_(
            'Sub 8 racking inspection module not installed. '
            'Install fusion_plating_receiving to enable.'
        ))
    # Always call the helper — it short-circuits for already-populated
    # draft inspections and creates fresh ones when missing.
    self._fp_create_racking_inspection()
    # Drop the cached One2many so a just-created inspection is visible.
    self.invalidate_recordset(['racking_inspection_ids'])
    ri = self.racking_inspection_id or self.racking_inspection_ids[:1]
    if not ri:
        raise UserError(_('Could not auto-create racking inspection.'))
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.racking.inspection',
        'res_id': ri.id,
        'view_mode': 'form',
        'target': 'current',
        'name': _('Racking Inspection — %s') % self.name,
    }
def action_view_steps(self):
    """Open the list/form view of the steps that belong to this job.

    New steps created from that view default to this job.
    """
    self.ensure_one()
    job_id = self.id
    title = 'Steps — %s' % self.name
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step',
        'view_mode': 'list,form',
        'domain': [('job_id', '=', job_id)],
        'name': title,
        'context': {'default_job_id': job_id},
    }
def action_finish_current_step(self):
    """Header button: finish the running step, or start the first one.

    If a step is in_progress, delegate to its
    ``action_finish_and_advance`` and return whatever it returns.
    Otherwise start the lowest-sequence step that is pending, ready or
    paused (bypassing the predecessor check via context) and post a
    chatter note.

    Sub 12e v4 — when button_start returns an action dict (e.g. the
    QA-005 redirect for contract_review steps) it is propagated so the
    operator lands on the right page in one click.

    Raises:
        UserError: when the job has no step that can be started.
    """
    self.ensure_one()
    in_progress = self.step_ids.filtered(
        lambda step: step.state == 'in_progress'
    )[:1]
    if in_progress:
        return in_progress.action_finish_and_advance()

    # Nothing is running: pick the earliest step that can be started.
    startable = self.step_ids.filtered(
        lambda step: step.state in ('pending', 'ready', 'paused')
    )
    next_step = startable.sorted('sequence')[:1]
    if not next_step:
        raise UserError(_(
            'No runnable step found on this job — either every step '
            'is done or the job is still in draft.'
        ))

    starter = next_step.with_context(fp_skip_predecessor_check=True)
    outcome = starter.button_start()
    self.message_post(body=_(
        'Started first step "%s".'
    ) % next_step.name)

    # Only an action dict is worth returning to the client; any other
    # result collapses to True.
    return outcome if isinstance(outcome, dict) else True
def action_open_move_wizard(self):
    """Open the Move wizard pre-filled with the job's active step.

    Kept for cross-station moves and rework / scrap transfers; the
    simple "finish current → start next" flow lives in
    action_finish_current_step.

    The from-step is the in-progress step if there is one, otherwise
    the lowest-sequence paused or ready step.

    Raises:
        UserError: when no in-progress, paused or ready step exists.
    """
    self.ensure_one()
    steps = self.step_ids
    from_step = steps.filtered(lambda s: s.state == 'in_progress')[:1]
    if not from_step:
        fallback = steps.filtered(
            lambda s: s.state in ('paused', 'ready')
        )
        from_step = fallback.sorted('sequence')[:1]
    if not from_step:
        raise UserError(_(
            'No in-progress, paused, or ready step found on this job. '
            'Either every step is done or the job is still in draft.'
        ))
    wizard_context = {
        'default_from_step_id': from_step.id,
        'default_job_id': self.id,
    }
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.move.wizard',
        'view_mode': 'form',
        'target': 'new',
        'name': _('Move Step — %s') % from_step.name,
        'context': wizard_context,
    }
def action_print_traveller(self):
    """Return the report action that prints this job's traveller."""
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_traveller'
    )
    return report.report_action(self)
def action_print_wo_detail(self):
    """Return the report action for the Work Order Detail PDF.

    Steelhead-style document: chronological chain-of-custody, per-step
    inputs and a Certified By page. Intended as the AS9100/Nadcap
    shippable audit document.
    """
    self.ensure_one()
    report = self.env.ref(
        'fusion_plating_jobs.action_report_fp_job_wo_detail'
    )
    return report.report_action(self)
def action_view_deliveries(self):
    """Open the linked delivery in form view.

    Returns a window-close action when the job has no delivery.
    """
    self.ensure_one()
    delivery = self.delivery_id
    if delivery:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'fusion.plating.delivery',
            'res_id': delivery.id,
            'view_mode': 'form',
            'name': delivery.name,
        }
    return {'type': 'ir.actions.act_window_close'}
def action_view_invoices(self):
    """List the customer invoices / refunds whose origin is this job's origin.

    Returns a window-close action when the job has no origin.
    """
    self.ensure_one()
    origin = self.origin
    if not origin:
        return {'type': 'ir.actions.act_window_close'}
    invoice_domain = [
        ('invoice_origin', '=', origin),
        ('move_type', 'in', ('out_invoice', 'out_refund')),
    ]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'account.move',
        'view_mode': 'list,form',
        'domain': invoice_domain,
        'name': 'Invoices — %s' % self.name,
    }
def action_view_payments(self):
    """List payments reconciled against this job's invoices.

    Invoices are found by origin (customer invoices and refunds). When
    none exist the action carries an always-empty domain. Returns a
    window-close action when the job has no origin or the account.move
    model is unavailable.
    """
    self.ensure_one()
    if not self.origin:
        return {'type': 'ir.actions.act_window_close'}
    move_model = self.env.get('account.move')
    if move_model is None:
        return {'type': 'ir.actions.act_window_close'}
    invoices = move_model.search([
        ('invoice_origin', '=', self.origin),
        ('move_type', 'in', ('out_invoice', 'out_refund')),
    ])
    invoice_ids = invoices.ids
    if invoice_ids:
        payment_domain = [('reconciled_invoice_ids', 'in', invoice_ids)]
    else:
        # No invoices → show an empty list rather than every payment.
        payment_domain = [('id', '=', 0)]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'account.payment',
        'view_mode': 'list,form',
        'domain': payment_domain,
        'name': 'Payments — %s' % self.name,
    }
def action_view_quality_holds(self):
    """List the quality holds attached to this job.

    The quality-hold model lives in an optional module (the smart-count
    compute already treats it as such), so when it is not installed a
    window-close action is returned instead of an action pointing at a
    model that does not exist.
    """
    self.ensure_one()
    if 'fusion.plating.quality.hold' not in self.env:
        return {'type': 'ir.actions.act_window_close'}
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fusion.plating.quality.hold',
        'view_mode': 'list,form',
        'domain': [('x_fc_job_id', '=', self.id)],
        'name': 'Quality Holds — %s' % self.name,
        # New holds created from the list default to this job.
        'context': {'default_x_fc_job_id': self.id},
    }
def action_view_certificates(self):
    """List the certificates attached to this job.

    fp.certificate is runtime-detected elsewhere in this model, so when
    it is not installed a window-close action is returned instead of an
    action pointing at a model that does not exist.
    """
    self.ensure_one()
    if 'fp.certificate' not in self.env:
        return {'type': 'ir.actions.act_window_close'}
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.certificate',
        'view_mode': 'list,form',
        'domain': [('x_fc_job_id', '=', self.id)],
        'name': 'Certificates — %s' % self.name,
        # New certificates created from the list default to this job.
        'context': {'default_x_fc_job_id': self.id},
    }
def action_view_timelogs(self):
    """List every time log recorded against this job's steps.

    When the job has no steps the action carries an always-empty domain.
    """
    self.ensure_one()
    ids_of_steps = self.step_ids.ids
    if ids_of_steps:
        log_domain = [('step_id', 'in', ids_of_steps)]
    else:
        log_domain = [('id', '=', 0)]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'fp.job.step.timelog',
        'view_mode': 'list,form',
        'domain': log_domain,
        'name': 'Time Logs — %s' % self.name,
    }
def action_view_portal_job(self):
    """Open the linked portal job in form view.

    Returns a window-close action when the job has no portal job.
    """
    self.ensure_one()
    portal = self.portal_job_id
    if portal:
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'fusion.plating.portal.job',
            'res_id': portal.id,
            'view_mode': 'form',
            'name': portal.name,
        }
    return {'type': 'ir.actions.act_window_close'}
def write(self, vals):
    """Write hook: auto-spawn a quality hold when qty_scrapped increases.

    AS9100 / Nadcap need a disposition record per scrap event — without
    this the operator silently bumps qty_scrapped, no paper trail,
    auditor can't reconstruct what happened.

    One hold is created per job whose qty_scrapped increased in this
    write. The operator fills hold_reason + description on the spawned
    record. Hold creation is best-effort: a failure is logged and never
    blocks the write itself.

    Args:
        vals (dict): values being written.

    Returns:
        The result of the parent ``write``.
    """
    # Capture old → new BEFORE the write, while the old value is readable.
    scrap_deltas = {}
    if 'qty_scrapped' in vals:
        new = vals['qty_scrapped'] or 0
        for job in self:
            old = job.qty_scrapped or 0
            if new > old:
                scrap_deltas[job.id] = (old, new)
    result = super().write(vals)
    if not scrap_deltas:
        return result
    # The hold model lives in an optional module.
    if 'fusion.plating.quality.hold' not in self.env:
        return result
    Hold = self.env['fusion.plating.quality.hold']
    Facility = self.env['fusion.plating.facility']
    # When the scrap was bumped from the tablet, the operator was
    # prompted for a reason and we passed it via context as
    # `fp_scrap_reason` (see /fp/shopfloor/bump_qty_scrapped).
    # Read once: it does not vary per job.
    scrap_reason = self.env.context.get('fp_scrap_reason')
    for job in self:
        if job.id not in scrap_deltas:
            continue
        old, new = scrap_deltas[job.id]
        delta = new - old
        # Facility fallback chain: job → first facility of the job's
        # company → any facility.
        facility = job.facility_id or Facility.search([
            ('company_id', '=', job.company_id.id),
        ], limit=1) or Facility.search([], limit=1)
        part_ref = (
            job.part_catalog_id.part_number if job.part_catalog_id
            else job.product_id.default_code or job.name
        )
        # Prepend the operator's reason so the audit row captures what
        # was actually typed instead of the generic placeholder.
        if scrap_reason:
            description = _(
                'Operator reason: %s\n\n'
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g).'
            ) % (scrap_reason, job.name, self.env.user.name, old, new, delta)
        else:
            description = _(
                'Auto-spawned from job %s scrap update by %s: '
                'qty_scrapped went from %g to %g (delta %g). '
                'OPERATOR: replace this text with the actual '
                'reason (drop / contamination / out-of-spec / etc).'
            ) % (job.name, self.env.user.name, old, new, delta)
        try:
            # Savepoint: if the create hits a database error, only the
            # hold is rolled back. Swallowing the exception without one
            # would leave the whole transaction aborted.
            with self.env.cr.savepoint():
                # NOTE(review): the hold is linked through 'job_id'
                # here, while the smart-button count and list action
                # filter on 'x_fc_job_id' — confirm which field the
                # hold model actually exposes.
                hold = Hold.create({
                    'job_id': job.id,
                    'part_ref': (part_ref or job.name)[:64],
                    'qty_on_hold': int(delta),
                    'qty_original': int(job.qty or 0),
                    'mark_for_scrap': True,
                    'hold_reason': 'other',
                    'description': description,
                    'facility_id': facility.id if facility else False,
                })
                job.message_post(body=Markup(_(
                    '⚠️ Scrap auto-Hold spawned: <b>%s</b> for %g part(s). '
                    'Operator must update description with the cause.'
                )) % (hold.name, delta))
        except Exception as e:
            _logger.warning(
                'Job %s: failed to auto-spawn scrap hold: %s',
                job.name, e,
            )
    return result
def action_sync_qty_from_so(self):
    """Pull the sale-order quantity into the job after a mid-job SO edit.

    For each job with a sale order, the new quantity is the sum of
    ``product_uom_qty`` over all of the order's lines. Jobs whose qty
    already matches (within 0.0001) are left untouched. A chatter note
    records who synced and the old → new values.

    Manual action because qty changes mid-job have physical-world
    consequences (rack more parts, stop early, scrap excess) — the
    supervisor must explicitly acknowledge by clicking the button.

    Returns:
        bool: always True.
    """
    for job in self:
        if not job.sale_order_id:
            continue
        so_qty = sum(job.sale_order_id.order_line.mapped('product_uom_qty'))
        old = job.qty
        if abs(old - so_qty) < 0.0001:
            continue
        job.qty = so_qty
        # The previous template ('%g%g%+g).') printed the three numbers
        # glued together with a stray ')'. Markup is applied before the
        # % so the interpolated user name is HTML-escaped.
        job.message_post(body=Markup(_(
            'Job qty synced from SO by <b>%s</b>: %g → %g (%+g). '
            'Operator: confirm physical scope matches.'
        )) % (self.env.user.name, old, so_qty, so_qty - old))
    return True
# ------------------------------------------------------------------
# Recipe → fp.job.step generation (Task 2.4)
#
# Native port of fusion_plating_bridge_mrp's
# _generate_workorders_from_recipe. Walks the recipe tree, creates
# one fp.job.step per 'operation' node, formats child 'step' nodes
# as step instructions on chatter, respects opt-in/out overrides
# from fp.job.node.override.
#
# Adaptations from the original:
# - Creates fp.job.step (not mrp.workorder)
# - Maps fusion.plating.work.center → fp.work.centre via code
# fallback (no forward link exists yet)
# - Uses native field names (job_id, work_centre_id, etc.)
# - Drops work_role_id (not on fp.job.step yet — Task 2.6+)
# - Drops _fp_autofill_default_equipment (not yet on step)
# ------------------------------------------------------------------
def _generate_steps_from_recipe(self):
    """Generate fp.job.step records from the assigned recipe.

    Walks the recipe tree, creates one step per 'operation' node,
    and formats child 'step' nodes as step instructions on the
    chatter. Respects opt-in/out overrides from override_ids.

    Jobs without a recipe, or that already have steps, are skipped, so
    calling this twice does not duplicate steps. Step sequences are
    allocated 1, 2, 3, ... in tree-walk order.

    Returns:
        bool: always True.
    """
    Step = self.env['fp.job.step']
    Node = self.env['fusion.plating.process.node']
    for job in self:
        if not job.recipe_id:
            continue  # No recipe assigned
        if job.step_ids:
            continue  # Steps already exist — don't duplicate
        # Build lookup of overrides keyed by node ID
        override_map = {ov.node_id.id: ov.included for ov in job.override_ids}
        # Start-at-node: if set, the allowed set is the union of:
        # 1. start_node and all its descendants
        # 2. each ancestor of start_node
        # 3. at each ancestor level, any LATER-sequence sibling and
        # all of its descendants
        start_node = job.start_at_node_id
        allowed_ids = None  # None = include everything
        if start_node:
            descendants = Node.search([('id', 'child_of', start_node.id)])
            allowed_ids = set(descendants.ids)
            # Climb from the start node to the root, one level per pass.
            cur = start_node
            while cur.parent_id:
                parent = cur.parent_id
                allowed_ids.add(parent.id)
                # The lambda is consumed immediately by filtered(), so
                # it sees the current value of `cur`.
                later_sibs = parent.child_ids.filtered(
                    lambda n: n.sequence > cur.sequence
                )
                for sib in later_sibs:
                    sib_descendants = Node.search([
                        ('id', 'child_of', sib.id),
                    ])
                    allowed_ids |= set(sib_descendants.ids)
                cur = parent
        step_vals_list = []
        wo_steps = {}  # {sequence: instruction text}
        # Sequences increment by 1 (operator-friendly: Step 1, 2, 3,
        # ...) instead of the legacy 10/20/30 spacing. The 10-spacing
        # was originally there to allow midpoint inserts, but
        # operators kept asking why their work order said "Step 10"
        # for the first row.
        # One-element list so the nested walk_node closure can mutate
        # the counter in place.
        seq_counter = [1]

        def _is_node_included(node):
            """Determine if a node should be included based on
            opt-in/out logic, per-job overrides, and start-at-node
            filter.
            """
            nid = node.id
            # The start-at filter wins over everything else.
            if allowed_ids is not None and nid not in allowed_ids:
                return False
            opt = node.opt_in_out or 'disabled'
            if opt == 'disabled':
                return True
            # An explicit per-job override beats the node's default.
            if nid in override_map:
                return override_map[nid]
            if opt == 'opt_in':
                return False  # Default excluded
            return True  # opt_out → default included

        def _resolve_work_centre(legacy_wc):
            """Map fusion.plating.work.center → fp.work.centre.

            The legacy work-centre model does not (yet) have a forward
            link to the new fp.work.centre. Try a forward link
            (x_fc_fp_work_centre_id) if some bridge module added one;
            otherwise fall back to a code lookup.

            Returns an empty fp.work.centre recordset when nothing
            matches.
            """
            if not legacy_wc:
                return self.env['fp.work.centre']
            # Forward link, if any
            if (
                'x_fc_fp_work_centre_id' in legacy_wc._fields
                and legacy_wc.x_fc_fp_work_centre_id
            ):
                return legacy_wc.x_fc_fp_work_centre_id
            # Code fallback (legacy code is unique-per-facility,
            # native code is globally unique — first match wins)
            if legacy_wc.code:
                found = self.env['fp.work.centre'].search(
                    [('code', '=', legacy_wc.code)], limit=1,
                )
                if found:
                    return found
            return self.env['fp.work.centre']

        def walk_node(node):
            # Depth-first walk. An excluded node prunes its whole
            # subtree because the function returns before recursing.
            if not _is_node_included(node):
                return
            if node.node_type == 'operation':
                work_centre = _resolve_work_centre(node.work_center_id)
                if not work_centre:
                    _logger.warning(
                        'Job %s: operation "%s" has no mapped fp.work.centre — '
                        'creating step without work centre.',
                        job.name, node.name,
                    )
                # Collect step instructions from child 'step' nodes
                instructions = []
                step_num = 1
                for child in node.child_ids.sorted('sequence'):
                    if child.node_type == 'step' and _is_node_included(child):
                        line = '%d. %s' % (step_num, child.name)
                        if child.estimated_duration:
                            line += ' (%.0f min)' % child.estimated_duration
                        instructions.append(line)
                        step_num += 1
                # Map recipe_node.default_kind → step.kind so the
                # downstream gates (Sub 8 racking soft-gate, Policy B
                # contract-review gate) work even when the step gets
                # renamed by the customer (e.g. "Hang on Bar" instead
                # of "Racking"). Without this, gate detection falls
                # back to fragile name matching.
                _NODE_KIND_TO_STEP_KIND = {
                    'cleaning': 'wet',
                    'etch': 'wet',
                    'rinse': 'wet',
                    'plate': 'wet',
                    'dry': 'wet',
                    'wbf_test': 'wet',
                    'bake': 'bake',
                    'mask': 'mask',
                    'demask': 'mask',
                    'racking': 'rack',
                    'derack': 'rack',
                    'inspect': 'inspect',
                    'final_inspect': 'inspect',
                    'contract_review': 'other',
                    'gating': 'other',
                    'ship': 'other',
                }
                # Unknown or missing kinds fall back to 'other'.
                step_kind = 'other'
                node_kind = (
                    node.default_kind
                    if 'default_kind' in node._fields else None
                )
                if node_kind and node_kind in _NODE_KIND_TO_STEP_KIND:
                    step_kind = _NODE_KIND_TO_STEP_KIND[node_kind]
                vals = {
                    'job_id': job.id,
                    'name': node.name,
                    'work_centre_id': work_centre.id if work_centre else False,
                    'duration_expected': node.estimated_duration or 0.0,
                    'sequence': seq_counter[0],
                    'recipe_node_id': node.id,
                    'kind': step_kind,
                }
                if node.estimated_duration:
                    vals['dwell_time_minutes'] = node.estimated_duration
                # Pull thickness target from the coating config when
                # this is a plating step (matched by node name keyword).
                coating = job.coating_config_id
                name_l = (node.name or '').lower()
                is_plating_node = (
                    'plat' in name_l or 'nickel' in name_l
                    or 'chrome' in name_l or 'anodiz' in name_l
                )
                if coating and is_plating_node:
                    if (
                        'thickness_max' in coating._fields
                        and coating.thickness_max
                    ):
                        vals['thickness_target'] = coating.thickness_max
                    if (
                        'thickness_uom' in coating._fields
                        and coating.thickness_uom
                    ):
                        # fp.coating.config uses long-form uom names
                        # (mils / microns / inches); fp.job.step uses
                        # short codes (mil / um / inch). Map between
                        # them. Unknown values fall through to the
                        # step's default ('um').
                        _UOM_MAP = {
                            'mils': 'mil',
                            'mil': 'mil',
                            'microns': 'um',
                            'micron': 'um',
                            'um': 'um',
                            'inches': 'inch',
                            'inch': 'inch',
                            'in': 'inch',
                        }
                        mapped = _UOM_MAP.get(coating.thickness_uom)
                        if mapped:
                            vals['thickness_uom'] = mapped
                step_vals_list.append(vals)
                # Instructions are keyed by the sequence just assigned
                # so they can be matched to the created step below.
                if instructions:
                    wo_steps[seq_counter[0]] = '\n'.join(instructions)
                seq_counter[0] += 1
            elif node.node_type in ('recipe', 'sub_process'):
                for child in node.child_ids.sorted('sequence'):
                    walk_node(child)
            # 'step' nodes at top level are handled by their parent operation

        # Walk from recipe root
        # NOTE(review): recipe_id is passed straight to walk_node, so it
        # is presumably the root process node (node_type 'recipe').
        walk_node(job.recipe_id)
        # Bulk create
        if step_vals_list:
            created = Step.create(step_vals_list)
            for step in created:
                instr_text = wo_steps.get(step.sequence)
                if instr_text:
                    # Markup % escapes instr_text before it is inserted
                    # into the <pre> block.
                    step.message_post(
                        body=Markup(
                            '<b>Recipe steps:</b><br/><pre>%s</pre>'
                        ) % instr_text,
                        subtype_xmlid='mail.mt_note',
                    )
            job.message_post(
                body=('%d steps generated from recipe "%s".') % (
                    len(step_vals_list), job.recipe_id.name,
                ),
            )
    return True
# ------------------------------------------------------------------
# UI — Process Tree client action (Phase 6)
# ------------------------------------------------------------------
def action_open_process_tree(self):
    """Open the OWL process-tree visualization for this job.

    Returns the ``fp_process_tree`` client action (defined in
    fusion_plating_shopfloor) with the job id in its context. The
    component fetches /fp/shopfloor/process_tree and renders the
    recipe -> sub_process -> operation hierarchy as cards with
    per-step state badges.

    Consolidated 2026-04-24: this points at the canonical shopfloor
    client action; the parallel fp_job_process_tree was removed.
    """
    self.ensure_one()
    label = self.name or ''
    action = {
        'type': 'ir.actions.client',
        'tag': 'fp_process_tree',
        'context': {'job_id': self.id},
        'name': 'Process Tree — %s' % (label),
        'target': 'current',
    }
    return action
# ------------------------------------------------------------------
# Lifecycle hooks (Tasks 2.6, 2.7, 2.8)
#
# On confirm: create the portal-job mirror record and (when the
# customer requires QC) a fusion.plating.quality.check.
# On done: create a draft fusion.plating.delivery and best-effort
# trigger fp.certificate auto-generation.
#
# The QC and certificate models live in modules this module does NOT
# depend on by design (bridge_mrp). We runtime-detect those models so
# the hooks degrade gracefully when those modules are absent.
# ------------------------------------------------------------------
def action_confirm(self):
    """Confirm the job, then run the per-job confirm side effects.

    After the parent confirm, each job in turn: generates steps from
    its recipe (when it has a recipe and no steps), promotes pending
    steps to ready, autofills facility/manager, creates the portal
    job, the QC check (if needed) and the racking inspection, and
    fires the 'job_confirmed' notification.

    All side effects are skipped when the ``fp_jobs_migration`` context
    key is set.

    Returns:
        The result of the parent ``action_confirm``.
    """
    result = super().action_confirm()
    # During migration, lifecycle side-effects are skipped — the
    # migration script directly rebinds existing portal/QC/inspection
    # records via x_fc_job_id. See scripts/migrate_to_fp_jobs.py.
    if self.env.context.get('fp_jobs_migration'):
        return result
    for job in self:
        # Auto-generate steps from the recipe — was previously only
        # called by seed scripts, which meant real-life confirmed
        # jobs sat with zero operations. Idempotent: the generator
        # short-circuits when steps already exist.
        if job.recipe_id and not job.step_ids:
            job._generate_steps_from_recipe()
        # Promote freshly-generated 'pending' steps to 'ready' so the
        # operator has a Start button when they open the job. Without
        # this the floor stalls — every step is parked in pending with
        # no UI affordance to move it forward.
        pending_steps = job.step_ids.filtered(
            lambda s: s.state == 'pending'
        )
        if pending_steps:
            pending_steps.write({'state': 'ready'})
        # 2026-04-28 — auto-populate facility_id + manager_id so the
        # job header surfaces them on the form. Page-1 audit found
        # both empty on confirmed jobs.
        # Runs after step generation: the facility lookup reads the
        # steps' work centres.
        job._fp_autofill_facility_and_manager()
        job._fp_create_portal_job()
        job._fp_create_qc_check_if_needed()
        job._fp_create_racking_inspection()
        job._fp_fire_notification('job_confirmed')
    return result
def _fp_autofill_facility_and_manager(self):
"""Populate facility_id + manager_id on confirm if empty.
Resolution order:
facility_id —
1. Already set → leave alone.
2. First step with a work_centre that has a facility → use it.
3. Recipe's process_type → facility (if process_type carries one).
4. Single-facility company → use that one.
manager_id —
1. Already set → leave alone.
2. Confirming user IS in the Plating Manager group → use them.
3. Sale order user_id (the salesperson who confirmed the SO).
4. The customer's account manager (partner.user_id).
5. Leave blank — no sensible default.
"""
self.ensure_one()
# ---- facility_id ----
if not self.facility_id:
facility = False
for s in self.step_ids:
if s.work_centre_id and 'facility_id' in s.work_centre_id._fields:
facility = s.work_centre_id.facility_id
if facility:
break
if not facility and self.recipe_id and 'process_type_id' in self.recipe_id._fields:
pt = self.recipe_id.process_type_id
if pt and 'facility_id' in pt._fields:
facility = pt.facility_id
if not facility:
Facility = self.env.get('fusion.plating.facility')
if Facility is not None:
facilities = Facility.search([
('company_id', '=', self.company_id.id),
])
if len(facilities) == 1:
facility = facilities
if facility:
self.facility_id = facility.id
self.message_post(body=_(
'Facility auto-set on confirm: %s'
) % facility.display_name)
# ---- manager_id ----
if not self.manager_id:
mgr = False
ManagerGroup = self.env.ref(
'fusion_plating.group_fusion_plating_manager',
raise_if_not_found=False,
)
if ManagerGroup and self.env.user in ManagerGroup.user_ids:
mgr = self.env.user
elif self.sale_order_id and self.sale_order_id.user_id:
mgr = self.sale_order_id.user_id
elif self.partner_id and self.partner_id.user_id:
mgr = self.partner_id.user_id
if mgr:
self.manager_id = mgr.id
self.message_post(body=_(
'Plating Manager auto-set on confirm: %s'
) % mgr.name)
def _fp_create_racking_inspection(self):
"""Auto-create a draft racking inspection on job confirm.
Phase 9 — production_id is now optional on fp.racking.inspection,
so we always create one bound by `x_fc_job_id`. When the job is
also linked to an MO (legacy bridge_mrp coexistence), populate
production_id too so legacy reports keep working.
Idempotent — if an inspection already exists for this job, skip.
Either way the inspection's lines are seeded from the SO's
plating order lines so the racker walks into a pre-populated
checklist instead of an empty form.
"""
self.ensure_one()
if 'fp.racking.inspection' not in self.env:
return
Inspection = self.env['fp.racking.inspection'].sudo()
if 'x_fc_job_id' not in Inspection._fields:
# Schema not yet upgraded — skip.
return
existing = Inspection.search([
('x_fc_job_id', '=', self.id),
], limit=1)
if existing:
# Self-heal: pre-existing inspections from before line seeding
# was added show up empty. Top them up now if still empty +
# the inspection isn't already finalised (don't rewrite history).
if not existing.line_ids and existing.state == 'draft':
self._fp_seed_racking_lines(existing)
return
# Phase 6 (Sub 11) — production_id retired; bind by x_fc_job_id only.
vals = {'x_fc_job_id': self.id}
try:
insp = Inspection.create(vals)
self._fp_seed_racking_lines(insp)
except Exception as e:
_logger.warning(
"Job %s: failed to auto-create racking inspection: %s",
self.name, e,
)
def _fp_seed_racking_lines(self, inspection):
    """Populate ``inspection`` with one line per plating order line.

    Source lines are ``sale_order_line_ids`` on the job; when that is
    empty, the linked sale order's ``order_line`` is used instead.
    Only lines that have ``x_fc_part_catalog_id`` set and no
    ``display_type`` are considered. Each created line carries the
    part catalog, the ordered quantity (truncated to an integer) as
    ``qty_expected`` and condition ``'ok'``.

    Does nothing when ``inspection`` is empty or already has lines.
    Lines are numbered 10, 20, 30, ... by their position in the source
    lines. Each line is created independently: a failure is logged as
    a warning and the remaining lines are still attempted.

    :param inspection: fp.racking.inspection record to seed.
    """
    self.ensure_one()
    if not inspection or inspection.line_ids:
        return
    Line = self.env['fp.racking.inspection.line'].sudo()
    # Source preference: explicit M2M of plating lines bound to this
    # job, falling back to the SO header.
    so_lines = self.sale_order_line_ids
    if not so_lines and self.sale_order_id:
        so_lines = self.sale_order_id.order_line
    plating_lines = so_lines.filtered(
        lambda line: line.x_fc_part_catalog_id and not line.display_type
    )
    if not plating_lines:
        return
    for position, sol in enumerate(plating_lines, start=1):
        try:
            # One savepoint per line: a failed line is rolled back on
            # its own and cannot leave the transaction unusable for
            # the lines that follow.
            with self.env.cr.savepoint():
                Line.create({
                    'inspection_id': inspection.id,
                    'sequence': position * 10,
                    'part_catalog_id': sol.x_fc_part_catalog_id.id,
                    'qty_expected': int(sol.product_uom_qty or 0),
                    'condition': 'ok',
                })
        except Exception as e:
            _logger.warning(
                "Job %s: failed to seed racking line for SO line %s: %s",
                self.name, sol.id, e,
            )
def _fp_create_portal_job(self):
    """Create the fusion.plating.portal.job record mirroring this job.

    Idempotent: nothing happens when ``portal_job_id`` is already set.
    The mirror is created with sudo, copies the job's name and
    partner, starts in state ``in_progress`` and points back to the
    job through ``x_fc_job_id``. The new record is then stored on
    ``portal_job_id``.
    """
    self.ensure_one()
    if not self.portal_job_id:
        portal_model = self.env['fusion.plating.portal.job'].sudo()
        mirror_vals = {
            'name': self.name,
            'partner_id': self.partner_id.id,
            'state': 'in_progress',
            'x_fc_job_id': self.id,
        }
        mirror = portal_model.create(mirror_vals)
        self.portal_job_id = mirror.id
def _fp_create_qc_check_if_needed(self):
    """Spawn a QC check for this job when the customer requires one.

    The check is created through
    ``fusion.plating.quality.check.create_for_job(job)``.

    No-op when the partner model has no ``x_fc_requires_qc`` field,
    when the flag is not set on this job's partner, or when the
    ``fusion.plating.quality.check`` model is not in the registry.

    A failure inside ``create_for_job`` is logged as a warning and
    never raised.
    """
    self.ensure_one()
    partner = self.partner_id
    wants_qc = (
        'x_fc_requires_qc' in partner._fields
        and partner.x_fc_requires_qc
    )
    if not wants_qc:
        return
    if 'fusion.plating.quality.check' not in self.env:
        return
    QC = self.env['fusion.plating.quality.check']
    try:
        # Savepoint: if create_for_job fails part-way, its partial
        # writes are rolled back and the caller's transaction stays
        # usable instead of carrying half a QC check forward.
        with self.env.cr.savepoint():
            QC.create_for_job(self)
    except Exception as e:
        _logger.warning(
            "Job %s: create_for_job failed: %s", self.name, e,
        )
# ------------------------------------------------------------------
# button_mark_done — Task 2.8
# ------------------------------------------------------------------
def button_mark_done(self):
    """Transition every job in ``self`` to 'done' and run side effects.

    Jobs already in state 'done' are skipped. For every other job the
    gates below run in this order; each one raises ``UserError`` when
    it fails:

    1. A cancelled job cannot be marked done.
    2. Step gate — every step must be in state done, skipped or
       cancelled. Bypass: context ``fp_skip_step_gate=True``.
    3. Bake-window gate — no fusion.plating.bake.window whose
       ``part_ref`` equals the job name may still be in
       ``awaiting_bake`` / ``bake_in_progress``. Skipped when that
       model is not installed. Bypass: ``fp_skip_bake_gate=True``.
    4. Qty reconciliation — when the job has a qty, qty_done +
       qty_scrapped must equal it. Bypass:
       ``fp_skip_qty_reconcile=True``.
    5. QC gate — when the customer is flagged ``x_fc_requires_qc``
       (and the QC model is installed), the job must have a QC check
       and none may be in a state other than 'passed'. When no check
       exists one is created and the close is refused. Bypass:
       ``fp_skip_qc_gate=True``.

    A job that clears the gates gets ``state='done'`` and
    ``date_finished=now``. Unless the context carries
    ``fp_jobs_migration``, a draft delivery and the certificates are
    then auto-created and the ``job_complete`` notification is fired.

    :return: True
    :raises UserError: when any gate above blocks the close.
    """
    ctx = self.env.context
    # During migration, side-effects are skipped — see action_confirm.
    skip_side_effects = ctx.get('fp_jobs_migration')
    skip_qc_gate = ctx.get('fp_skip_qc_gate')
    skip_step_gate = ctx.get('fp_skip_step_gate')
    skip_bake_gate = ctx.get('fp_skip_bake_gate')
    skip_qty_gate = ctx.get('fp_skip_qty_reconcile')
    # Optional models: None when the providing module is not installed.
    # They must be tested with `is not None` — `self.env[...]` is an
    # EMPTY recordset, which is falsy, so a plain truth test would
    # silently disable the gate.
    QC = (self.env['fusion.plating.quality.check']
          if 'fusion.plating.quality.check' in self.env else None)
    BW = (self.env['fusion.plating.bake.window']
          if 'fusion.plating.bake.window' in self.env else None)

    for job in self:
        if job.state == 'done':
            continue
        if job.state == 'cancelled':
            raise UserError(_(
                "Job %s is cancelled — cannot mark done."
            ) % job.name)

        # Step-completion gate (compliance: AS9100 / Nadcap require
        # evidence that every recipe step ran). `skipped` and
        # `cancelled` count as terminal; only steps still in another
        # state block the close. Jobs without steps pass.
        if not skip_step_gate and job.step_ids:
            undone = job.step_ids.filtered(
                lambda s: s.state not in ('done', 'skipped', 'cancelled')
            )
            if undone:
                # Only the first five offenders are listed.
                raise UserError(_(
                    "Job %s cannot be marked Done — %d/%d step(s) "
                    "are not finished:\n %s\n\nWalk each step on "
                    "the tablet (or skip / cancel opt-in steps)."
                ) % (
                    job.name, len(undone), len(job.step_ids),
                    '\n '.join(
                        f'#{s.sequence} {s.name} ({s.state})'
                        for s in undone[:5]
                    ),
                ))

        # Bake-window gate (compliance — AS9100 / Nadcap): a bake
        # window that is still awaiting_bake or bake_in_progress means
        # the bake has not been documented, so the job cannot close.
        # Windows are matched to the job by name through `part_ref`.
        if not skip_bake_gate and BW is not None:
            pending_bw = BW.sudo().search([
                ('part_ref', '=', job.name),
                ('state', 'in', ('awaiting_bake', 'bake_in_progress')),
            ])
            if pending_bw:
                raise UserError(_(
                    "Job %s cannot be marked Done — bake window "
                    "still pending:\n %s\n\nBake hydrogen "
                    "embrittlement relief on the parts (start + "
                    "end the bake on the bake.window record), then "
                    "close the job. Manager override available for "
                    "documented customer deviation."
                ) % (
                    job.name,
                    '\n '.join(
                        f'{bw.name} (state={bw.state}, '
                        f'required_by={bw.bake_required_by})'
                        for bw in pending_bw[:5]
                    ),
                ))

        # Qty reconciliation gate: qty_done + qty_scrapped must equal
        # qty when the job closes, so every ordered piece is accounted
        # for. A small tolerance absorbs float noise. Jobs with no qty
        # are not checked.
        if not skip_qty_gate and job.qty:
            accounted = (job.qty_done or 0) + (job.qty_scrapped or 0)
            if abs(accounted - job.qty) > 0.0001:
                raise UserError(_(
                    "Job %s qty mismatch — ordered %g, but qty_done "
                    "(%g) + qty_scrapped (%g) = %g. Update Quantity "
                    "Completed and Quantity Scrapped on the job "
                    "header so they sum to %g before closing."
                ) % (
                    job.name, job.qty, job.qty_done or 0,
                    job.qty_scrapped or 0, accounted, job.qty,
                ))

        # QC gate: customers flagged x_fc_requires_qc must have a
        # passed QC before the job closes (AS9100 / Nadcap).
        # NOTE(review): assumes the QC model links to the job through
        # a `job_id` field — confirm against the model definition.
        if (QC is not None and not skip_qc_gate
                and 'x_fc_requires_qc' in job.partner_id._fields
                and job.partner_id.x_fc_requires_qc):
            blocking_qc = QC.search([
                ('job_id', '=', job.id),
                ('state', 'not in', ('passed',)),
            ], order='create_date desc', limit=1)
            if blocking_qc:
                raise UserError(_(
                    "Job %s cannot be marked Done — QC check %s is in "
                    "state '%s'. Pass the QC checklist first, or have "
                    "a manager override via the bypass button."
                ) % (job.name, blocking_qc.name, blocking_qc.state))
            # No QC at all? Spawn one now and require the operator to
            # walk it before retrying.
            # NOTE(review): a UserError normally rolls the request's
            # transaction back, which would discard the check created
            # here — confirm the new check survives in the calling flow.
            no_qc = not QC.search_count([('job_id', '=', job.id)])
            if no_qc:
                QC.create_for_job(job)
                raise UserError(_(
                    "Job %s requires QC. A new check has been created — "
                    "complete it before marking the job Done."
                ) % job.name)

        job.state = 'done'
        job.date_finished = fields.Datetime.now()
        if not skip_side_effects:
            job._fp_create_delivery()
            job._fp_create_certificates()
            job._fp_fire_notification('job_complete')
    return True
# ------------------------------------------------------------------
# Notifications dispatch (Phase 4)
#
# Fires fp.notification.template records whose trigger_event matches
# the given event name. Best-effort: silently skips if the
# fusion_plating_notifications module is not installed (model not
# registered) and logs (without raising) on any send failure so the
# job lifecycle is never blocked by an email problem.
# ------------------------------------------------------------------
def _fp_fire_notification(self, event):
    """Dispatch the notifications for one fp.job lifecycle event.

    Delegates to ``fp.notification.template._dispatch(event, job,
    partner=...)``, called with sudo and with the job's ``partner_id``
    passed explicitly as the partner.

    Best-effort: does nothing when the ``fp.notification.template``
    model is not in the registry, and a dispatch failure is logged as
    a warning instead of being raised, so the job lifecycle is never
    blocked by a notification problem.

    :param event: event name handed to ``_dispatch`` as its first
        argument.
    """
    self.ensure_one()
    if 'fp.notification.template' in self.env:
        dispatcher = self.env['fp.notification.template'].sudo()
        customer = None
        try:
            # partner_id on fp.job is the customer, so it is passed
            # explicitly rather than left for the helper to resolve.
            customer = self.partner_id
            dispatcher._dispatch(event, self, partner=customer)
        except Exception as err:
            _logger.warning(
                "Job %s: notification %s dispatch failed: %s",
                self.name, event, err,
            )
def _fp_create_delivery(self):
    """Create a draft fusion.plating.delivery linked to this job.

    Idempotent: nothing happens when ``delivery_id`` is already set.

    The delivery is created with sudo for the job's partner and is
    linked back to the job through every link field the delivery
    model actually has:
    - ``x_fc_job_id`` (Many2one — strong link), set to the job;
    - ``job_ref`` (Char — soft reference), set to the job name.
    Both are set when both exist, because downstream consumers are
    split between the two. When neither exists a warning is logged
    and the delivery is still created, without a back-reference.

    A creation failure is logged as a warning and never raised.
    """
    self.ensure_one()
    if self.delivery_id:
        return
    Delivery = self.env['fusion.plating.delivery'].sudo()
    candidate_links = {'x_fc_job_id': self.id, 'job_ref': self.name}
    links = {
        fname: value
        for fname, value in candidate_links.items()
        if fname in Delivery._fields
    }
    if not links:
        _logger.warning(
            "Job %s: fusion.plating.delivery has no job link field; "
            "delivery created without job back-reference.", self.name,
        )
    vals = {'partner_id': self.partner_id.id, **links}
    try:
        # Savepoint: a failed create is rolled back on its own, so the
        # swallowed error cannot leave the caller's transaction
        # unusable or half-written.
        with self.env.cr.savepoint():
            delivery = Delivery.create(vals)
            self.delivery_id = delivery.id
    except Exception as e:
        _logger.warning(
            "Job %s: failed to auto-create delivery: %s", self.name, e,
        )
def _fp_create_certificates(self):
    """Auto-create a draft CoC (fp.certificate) for this job.

    Pre-populates the fields an issuer needs, each one only when the
    certificate model has the corresponding field:
    - partner_id from the job
    - certificate_type 'coc', state 'draft'
    - x_fc_job_id (or, failing that, job_id) link to the job
    - sale_order_id link
    - spec_reference from the coating config
    - part_number from the part catalog
    - quantity_shipped: the good-part count (see below)
    - po_number and customer_job_no from the sale order
    - process_description from the coating name
    - entech_wo_number from the job name

    quantity_shipped is ``qty_done`` when that is set, because
    ``qty_done`` already excludes scrap (the close gate reconciles
    qty_done + qty_scrapped against qty). Only when ``qty_done`` is
    empty does it fall back to qty minus qty_scrapped. The value is
    truncated to an integer and never negative.

    No-op when the ``fp.certificate`` model is not in the registry.
    Idempotent: when a certificate already exists for this job
    (matched on x_fc_job_id, else on the sale order) nothing is
    created. Failures are logged as warnings and never raised.
    """
    self.ensure_one()
    if 'fp.certificate' not in self.env:
        return
    Cert = self.env['fp.certificate'].sudo()
    cert_fields = Cert._fields
    sale_order = self.sale_order_id

    # Idempotency: don't double-create on retry.
    existing_dom = []
    if 'x_fc_job_id' in cert_fields:
        existing_dom.append(('x_fc_job_id', '=', self.id))
    elif sale_order and 'sale_order_id' in cert_fields:
        existing_dom.append(('sale_order_id', '=', sale_order.id))
    if existing_dom:
        existing = Cert.search(existing_dom, limit=1)
        if existing:
            _logger.info(
                'Job %s: cert %s already exists, skipping auto-create',
                self.name, existing.name,
            )
            return

    try:
        # Savepoint: if anything below fails, the partial writes are
        # rolled back and the caller's transaction stays usable.
        with self.env.cr.savepoint():
            vals = {'partner_id': self.partner_id.id}
            if 'certificate_type' in cert_fields:
                vals['certificate_type'] = 'coc'
            if 'state' in cert_fields:
                vals['state'] = 'draft'
            # Job + SO links.
            if 'x_fc_job_id' in cert_fields:
                vals['x_fc_job_id'] = self.id
            elif 'job_id' in cert_fields:
                vals['job_id'] = self.id
            if 'sale_order_id' in cert_fields and sale_order:
                vals['sale_order_id'] = sale_order.id
            # Pre-fill from coating so the issuer does not have to
            # edit the cert by hand before issuing.
            coating = self.coating_config_id
            if coating and 'spec_reference' in cert_fields \
                    and getattr(coating, 'spec_reference', False):
                vals['spec_reference'] = coating.spec_reference
            # Pre-fill part_number from the part catalog if we have one.
            if 'part_number' in cert_fields and self.part_catalog_id:
                vals['part_number'] = self.part_catalog_id.part_number or ''
            # Quantity shipped = good parts. qty_done already excludes
            # scrap, so scrap is only subtracted when falling back to
            # the ordered qty.
            if 'quantity_shipped' in cert_fields:
                if self.qty_done:
                    good_qty = self.qty_done
                else:
                    good_qty = (self.qty or 0) - (self.qty_scrapped or 0)
                vals['quantity_shipped'] = max(int(good_qty), 0)
            # PO number from the source SO.
            if 'po_number' in cert_fields and sale_order \
                    and 'x_fc_po_number' in sale_order._fields:
                vals['po_number'] = sale_order.x_fc_po_number or ''
            # Customer job# → cert label.
            if 'customer_job_no' in cert_fields and sale_order \
                    and 'x_fc_customer_job_number' in sale_order._fields:
                vals['customer_job_no'] = (
                    sale_order.x_fc_customer_job_number or ''
                )
            # Process description from coating name.
            if 'process_description' in cert_fields and coating:
                vals['process_description'] = coating.name or ''
            # Job # for shop-side reference.
            if 'entech_wo_number' in cert_fields:
                vals['entech_wo_number'] = self.name or ''
            cert = Cert.create(vals)
            # Markup(...) % value escapes the cert name while keeping
            # the <b> tags of the template.
            self.message_post(body=Markup(_(
                'CoC <b>%s</b> auto-created (draft). Issuer should hit '
                'the Issue button on the certificate when ready to ship.'
            )) % cert.name)
    except Exception as e:
        _logger.warning(
            "Job %s: failed to auto-create cert: %s", self.name, e,
        )
class FpJobStep(models.Model):
    """Extension of fp.job.step carrying the migration idempotency key.

    Phase 7: scripts/migrate_to_fp_jobs.py fills the key in to mark a
    step as the mirror of one specific mrp.workorder, so work orders
    that were already migrated are skipped on subsequent runs.
    """

    _inherit = 'fp.job.step'

    # Plain integer (not a Many2one) holding the source work order's
    # database id; indexed.
    legacy_mrp_workorder_id = fields.Integer(
        index=True,
        string='Legacy MRP Work Order ID',
        help=(
            'Database id of the source mrp.workorder this step was migrated '
            'from. Used by the migration script for idempotency. '
            'Cleared post-cutover.'
        ),
    )