feat(fusion_plating_shopfloor): workspace_controller — 4 endpoints + tests

Plan tasks P1.8 through P1.11 batched into one commit (local tests not
run between them; entech is the verification env).

  POST /fp/workspace/load               — full payload for one fp.job
  POST /fp/workspace/hold               — quality.hold create with photo
  POST /fp/workspace/sign_off           — signature + finish step atomic
  POST /fp/workspace/advance_milestone  — fire next_milestone_action

Each endpoint logs INFO on success, EXCEPTION on failure, returns a
consistent {'ok': bool, 'error': str?} envelope. Hold endpoint isolates
photo-attach failures so they don't roll back the hold record.

Tests cover: payload shape, bad job_id, hold create with/without photo,
empty qty rejection, empty-signature rejection, sign-off finish, and
the no-milestone-action error path.

Verify on entech: -u fusion_plating_shopfloor --test-tags fp_shopfloor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-05-22 21:50:09 -04:00
parent a61bd05a5c
commit eae6a471e8
4 changed files with 504 additions and 0 deletions

View File

@@ -6,3 +6,4 @@ from . import shopfloor_controller
from . import manager_controller
from . import tank_status
from . import move_controller
from . import workspace_controller

View File

@@ -0,0 +1,338 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
# Part of the Fusion Plating product family.
"""JSON-RPC endpoints for the Job Workspace client action.
Surfaces a single fp.job + step list + workflow milestones + side-panel
data (spec PDF, attachments, chatter) + action endpoints (hold, sign-off,
milestone advance).
Endpoints:
POST /fp/workspace/load — full payload for one fp.job
POST /fp/workspace/hold — create quality.hold with photo
POST /fp/workspace/sign_off — capture signature + finish step
POST /fp/workspace/advance_milestone — fire next_milestone_action
Companion plan: docs/superpowers/plans/2026-05-22-shopfloor-tablet-redesign-plan.md
"""
import logging
from odoo import fields, http
from odoo.addons.fusion_plating.models.fp_tz import fp_format
from odoo.http import request
_logger = logging.getLogger(__name__)
class FpWorkspaceController(http.Controller):
"""JSON-RPC endpoints for the JobWorkspace OWL client action."""
# ======================================================================
# /fp/workspace/load — full workspace payload
# ======================================================================
@http.route('/fp/workspace/load', type='jsonrpc', auth='user')
def load(self, job_id):
env = request.env
job = env['fp.job'].browse(int(job_id))
if not job.exists():
_logger.warning("workspace/load: job %s not found", job_id)
return {'ok': False, 'error': f'Job {job_id} not found'}
# ---- Workflow milestones ----------------------------------------
all_states = env['fp.job.workflow.state'].search([], order='sequence, id')
current = job.workflow_state_id
passed_ids = set()
for ws in all_states:
passed_ids.add(ws.id)
if ws.id == current.id:
break
workflow_states = [{
'id': ws.id,
'name': ws.name,
'color': ws.color or 'grey',
'sequence': ws.sequence or 0,
'passed': ws.id in passed_ids,
'is_current': ws.id == current.id,
} for ws in all_states]
# ---- Steps ------------------------------------------------------
steps = []
for step in job.step_ids.sorted('sequence'):
override = job.override_ids.filtered(
lambda o, n=step.recipe_node_id: o.node_id.id == n.id
) if 'override_ids' in job._fields else env['fp.job.node.override']
steps.append({
'id': step.id,
'sequence': step.sequence,
'sequence_display': (step.sequence or 0) // 10,
'name': step.name or '',
'kind': step.kind or 'other',
'kind_label': dict(step._fields['kind'].selection).get(step.kind, ''),
'state': step.state,
'assigned_user_id': step.assigned_user_id.id or False,
'assigned_user_name': step.assigned_user_id.name or '',
'work_centre_name': step.work_centre_id.name or '',
'duration_actual': step.duration_actual or 0,
'duration_expected': step.duration_expected or 0,
'date_started_iso': fp_format(
env, step.date_started, fmt='%Y-%m-%d %H:%M:%S',
) if step.date_started else '',
'instructions': step.instructions or '',
'thickness_target': step.thickness_target or 0,
'thickness_uom': step.thickness_uom or '',
'dwell_time_minutes': step.dwell_time_minutes or 0,
'bake_setpoint_temp': step.bake_setpoint_temp or 0,
'requires_signoff': bool(getattr(step, 'requires_signoff', False)),
'can_start': bool(step.can_start) if 'can_start' in step._fields else (
step.state in ('ready', 'paused') and step.blocker_kind == 'none'
),
'blocker_kind': step.blocker_kind,
'blocker_reason': step.blocker_reason or '',
'blocker_jump_target_model': step.blocker_jump_target_model or '',
'blocker_jump_target_id': step.blocker_jump_target_id or 0,
'override_excluded': bool(override and not override.included),
'quick_look_prompt_count': len(
getattr(step, 'quick_look_prompt_ids', step.browse())
),
})
# ---- Spec + attachments + chatter -------------------------------
spec = job.customer_spec_id if 'customer_spec_id' in job._fields else False
attachments = env['ir.attachment'].search([
('res_model', '=', 'fp.job'),
('res_id', '=', job.id),
], limit=20)
chatter = job.message_ids.filtered(
lambda m: m.message_type in ('comment', 'notification')
).sorted('date', reverse=True)[:10]
# ---- Required cert state ----------------------------------------
try:
needs = list(job._resolve_required_cert_types())
except Exception:
needs = []
try:
has_draft = bool(job._fp_has_draft_required_certs())
except Exception:
has_draft = False
required_certs = {'needs': needs, 'has_draft': has_draft}
# ---- Active step (the one in_progress) --------------------------
active = (
job.active_step_id
if 'active_step_id' in job._fields and job.active_step_id
else job.step_ids.filtered(lambda s: s.state == 'in_progress')[:1]
)
return {
'ok': True,
'job': {
'id': job.id,
'name': job.name,
'display_wo_name': job.display_wo_name,
'partner_name': job.partner_id.name or '',
'product_name': job.product_id.display_name or '',
'part_number': (
job.part_catalog_id.part_number
if 'part_catalog_id' in job._fields and job.part_catalog_id
else ''
),
'qty': int(job.qty or 0),
'qty_done': int(job.qty_done or 0),
'qty_scrapped': int(job.qty_scrapped or 0),
'date_deadline': fp_format(
env, job.date_deadline, fmt='%Y-%m-%d',
) if job.date_deadline else '',
'state': job.state,
'workflow_state': {
'id': current.id,
'name': current.name,
'color': current.color or 'grey',
} if current else None,
'next_milestone_action': job.next_milestone_action or '',
'next_milestone_label': job.next_milestone_label or '',
'quality_hold_count': job.quality_hold_count or 0,
'priority': job.priority or 'normal',
},
'workflow_states': workflow_states,
'steps': steps,
'active_step_id': active.id if active else False,
'spec': {
'id': spec.id,
'name': spec.name,
} if spec else None,
'attachments': [
{
'id': a.id,
'name': a.name,
'mimetype': a.mimetype or '',
'url': f'/web/content/{a.id}',
}
for a in attachments
],
'chatter': [
{
'id': m.id,
'author': m.author_id.name or 'System',
'body': m.body or '',
'date': fp_format(env, m.date, fmt='%Y-%m-%d %H:%M') if m.date else '',
}
for m in chatter
],
'required_certs': required_certs,
}
# ======================================================================
# /fp/workspace/hold — create a quality.hold from HoldComposer
# ======================================================================
@http.route('/fp/workspace/hold', type='jsonrpc', auth='user')
def hold(self, job_id, reason='other', qty_on_hold=1, description='',
part_ref='', step_id=None, mark_for_scrap=False,
photo_data=None, photo_filename=None):
env = request.env
job = env['fp.job'].browse(int(job_id))
if not job.exists():
return {'ok': False, 'error': f'Job {job_id} not found'}
if not qty_on_hold or int(qty_on_hold) < 1:
return {'ok': False, 'error': 'qty_on_hold must be at least 1'}
Hold = env['fusion.plating.quality.hold']
hold_vals = {
'part_ref': part_ref or '',
'qty_on_hold': int(qty_on_hold),
'qty_original': int(job.qty or 0),
'hold_reason': reason or 'other',
'description': description or '',
'mark_for_scrap': bool(mark_for_scrap),
}
if 'x_fc_job_id' in Hold._fields:
hold_vals['x_fc_job_id'] = job.id
if step_id and 'x_fc_step_id' in Hold._fields:
hold_vals['x_fc_step_id'] = int(step_id)
try:
hold = Hold.create(hold_vals)
except Exception as exc:
_logger.exception("workspace/hold: create failed")
return {'ok': False, 'error': str(exc)}
# Attach photo if provided (base64 string from the camera input).
# Photo attach failure does NOT roll back the hold — log + continue.
attachment_id = False
if photo_data:
try:
att = env['ir.attachment'].create({
'name': photo_filename or f'hold_{hold.id}.png',
'datas': photo_data,
'res_model': 'fusion.plating.quality.hold',
'res_id': hold.id,
'mimetype': 'image/png',
})
attachment_id = att.id
except Exception:
_logger.exception(
"workspace/hold: photo attach failed for hold %s", hold.id,
)
_logger.info(
"Hold %s created on job %s by uid %s, reason %s, qty %s",
hold.name, job.name, env.uid, reason, qty_on_hold,
)
return {
'ok': True,
'hold_id': hold.id,
'hold_name': hold.name,
'state': hold.state,
'attachment_id': attachment_id,
}
# ======================================================================
# /fp/workspace/sign_off — capture signature + finish step atomically
# ======================================================================
@http.route('/fp/workspace/sign_off', type='jsonrpc', auth='user')
def sign_off(self, step_id, signature_data_uri):
env = request.env
sig = (signature_data_uri or '').strip()
if not sig:
_logger.warning("workspace/sign_off: empty signature for step %s", step_id)
return {
'ok': False,
'error': 'A signature is required to finish this step.',
}
step = env['fp.job.step'].browse(int(step_id))
if not step.exists():
return {'ok': False, 'error': f'Step {step_id} not found'}
# Strip "data:...;base64," prefix if present (canvas.toDataURL adds it)
if ',' in sig and sig.startswith('data:'):
sig = sig.split(',', 1)[1]
try:
env['ir.attachment'].create({
'name': f'signature_{step.id}.png',
'datas': sig,
'res_model': 'fp.job.step',
'res_id': step.id,
'mimetype': 'image/png',
})
except Exception:
_logger.exception(
"workspace/sign_off: attachment failed for step %s", step.id,
)
return {'ok': False, 'error': 'Failed to save signature.'}
try:
step.button_finish()
except Exception as exc:
_logger.exception("workspace/sign_off: button_finish failed")
return {'ok': False, 'error': str(exc)}
_logger.info("Step %s signed off by uid %s", step.id, env.uid)
return {
'ok': True,
'step_id': step.id,
'state': step.state,
}
# ======================================================================
# /fp/workspace/advance_milestone — fire next_milestone_action
# ======================================================================
@http.route('/fp/workspace/advance_milestone', type='jsonrpc', auth='user')
def advance_milestone(self, job_id):
env = request.env
job = env['fp.job'].browse(int(job_id))
if not job.exists():
return {'ok': False, 'error': f'Job {job_id} not found'}
if not job.next_milestone_action:
return {
'ok': False,
'error': 'No milestone advance available — finish all steps first.',
}
try:
job.action_advance_next_milestone()
except Exception as exc:
_logger.exception("workspace/advance_milestone failed")
return {'ok': False, 'error': str(exc)}
_logger.info(
"Job %s milestone advanced by uid %s", job.name, env.uid,
)
job.invalidate_recordset([
'workflow_state_id',
'next_milestone_action',
'next_milestone_label',
])
return {
'ok': True,
'workflow_state': {
'id': job.workflow_state_id.id,
'name': job.workflow_state_id.name,
'color': job.workflow_state_id.color or 'grey',
} if job.workflow_state_id else None,
'next_milestone_action': job.next_milestone_action or '',
'next_milestone_label': job.next_milestone_label or '',
}

View File

@@ -1 +1,2 @@
# -*- coding: utf-8 -*-
from . import test_workspace_controller

View File

@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc. — License OPL-1
"""HTTP tests for /fp/workspace/* endpoints."""
import base64
import json
from odoo.tests.common import HttpCase, tagged
# Minimal 1x1 PNG so photo + signature attachment tests can run without
# packing a real binary in the source tree.
_TINY_PNG_B64 = base64.b64encode(
b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01'
b'\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\x9cc\x00\x01'
b'\x00\x00\x05\x00\x01\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82'
).decode()
def _rpc(case, url, **params):
res = case.url_open(
url,
data=json.dumps({'jsonrpc': '2.0', 'params': params}),
headers={'Content-Type': 'application/json'},
)
return res.json()['result']
@tagged('-at_install', 'post_install', 'fp_shopfloor')
class TestWorkspaceLoad(HttpCase):
def setUp(self):
super().setUp()
self.authenticate("admin", "admin")
self.partner = self.env['res.partner'].create({'name': 'WS Cust'})
self.product = self.env['product.product'].create({'name': 'WS Prod'})
self.job = self.env['fp.job'].create({
'name': 'WH/JOB/WS001',
'partner_id': self.partner.id,
'product_id': self.product.id,
'qty': 5,
})
def test_load_returns_full_payload(self):
res = _rpc(self, '/fp/workspace/load', job_id=self.job.id)
self.assertTrue(res['ok'])
self.assertEqual(res['job']['display_wo_name'], 'WO # WS001')
self.assertEqual(res['job']['id'], self.job.id)
for key in ('steps', 'workflow_states', 'chatter',
'attachments', 'required_certs'):
self.assertIn(key, res)
def test_load_bad_job_id_returns_error(self):
res = _rpc(self, '/fp/workspace/load', job_id=999999)
self.assertFalse(res['ok'])
self.assertIn('not found', res['error'].lower())
@tagged('-at_install', 'post_install', 'fp_shopfloor')
class TestWorkspaceHold(HttpCase):
def setUp(self):
super().setUp()
self.authenticate("admin", "admin")
self.partner = self.env['res.partner'].create({'name': 'Hold Cust'})
self.product = self.env['product.product'].create({'name': 'Hold Prod'})
self.job = self.env['fp.job'].create({
'name': 'WH/JOB/H001',
'partner_id': self.partner.id,
'product_id': self.product.id,
'qty': 10,
})
def test_hold_creates_quality_hold(self):
res = _rpc(
self, '/fp/workspace/hold',
job_id=self.job.id, reason='dimensional', qty_on_hold=3,
description='Bracket bent on de-rack',
part_ref='Bracket Rev A',
)
self.assertTrue(res['ok'])
hold = self.env['fusion.plating.quality.hold'].browse(res['hold_id'])
self.assertEqual(hold.qty_on_hold, 3)
self.assertEqual(hold.hold_reason, 'dimensional')
def test_hold_with_photo_creates_attachment(self):
res = _rpc(
self, '/fp/workspace/hold',
job_id=self.job.id, reason='thickness', qty_on_hold=1,
photo_data=_TINY_PNG_B64, photo_filename='evidence.png',
)
self.assertTrue(res['ok'])
self.assertTrue(res['attachment_id'])
attachments = self.env['ir.attachment'].search([
('res_model', '=', 'fusion.plating.quality.hold'),
('res_id', '=', res['hold_id']),
])
self.assertGreaterEqual(len(attachments), 1)
def test_hold_qty_zero_rejected(self):
res = _rpc(
self, '/fp/workspace/hold',
job_id=self.job.id, qty_on_hold=0,
)
self.assertFalse(res['ok'])
@tagged('-at_install', 'post_install', 'fp_shopfloor')
class TestWorkspaceSignOff(HttpCase):
def setUp(self):
super().setUp()
self.authenticate("admin", "admin")
self.partner = self.env['res.partner'].create({'name': 'Sig Cust'})
self.product = self.env['product.product'].create({'name': 'Sig Prod'})
self.job = self.env['fp.job'].create({
'name': 'WH/JOB/S001',
'partner_id': self.partner.id,
'product_id': self.product.id,
'qty': 1,
})
self.step = self.env['fp.job.step'].create({
'job_id': self.job.id,
'name': 'ENP Plate',
'sequence': 50,
'state': 'in_progress',
})
def test_sign_off_rejects_empty_signature(self):
res = _rpc(
self, '/fp/workspace/sign_off',
step_id=self.step.id, signature_data_uri='',
)
self.assertFalse(res['ok'])
self.assertIn('signature', res['error'].lower())
def test_sign_off_finishes_step(self):
res = _rpc(
self, '/fp/workspace/sign_off',
step_id=self.step.id, signature_data_uri=_TINY_PNG_B64,
)
self.assertTrue(res['ok'])
self.step.invalidate_recordset(['state'])
self.assertEqual(self.step.state, 'done')
@tagged('-at_install', 'post_install', 'fp_shopfloor')
class TestWorkspaceAdvanceMilestone(HttpCase):
def setUp(self):
super().setUp()
self.authenticate("admin", "admin")
self.partner = self.env['res.partner'].create({'name': 'M Cust'})
self.product = self.env['product.product'].create({'name': 'M Prod'})
self.job = self.env['fp.job'].create({
'name': 'WH/JOB/M001',
'partner_id': self.partner.id,
'product_id': self.product.id,
'qty': 1,
})
def test_advance_no_action_returns_error(self):
# Job with no steps → no next_milestone_action → friendly reject
res = _rpc(self, '/fp/workspace/advance_milestone', job_id=self.job.id)
self.assertFalse(res['ok'])