Files
gsinghpal fc3c966484 changes
2026-03-13 12:38:28 -04:00

540 lines
23 KiB
Python

# -*- coding: utf-8 -*-
import json
from odoo import api, fields, models, _
from odoo.exceptions import UserError
class WheelchairConfigFlow(models.Model):
_name = 'fusion.wc.config.flow'
_description = 'Wheelchair Configuration Flow'
_order = 'sequence, id'
name = fields.Char(string='Name', required=True,
help='e.g. "Standard Manual Wheelchair Config"')
active = fields.Boolean(default=True)
sequence = fields.Integer(default=10)
equipment_type = fields.Selection(
selection='_get_equipment_type_selection',
string='Equipment Type', required=True)
@api.model
def _get_equipment_type_selection(self):
types = self.env['fusion.equipment.type'].sudo().search([], order='sequence')
if types:
return [(t.code, t.name) for t in types]
return [
('manual_wheelchair', 'Manual Wheelchair'),
('power_wheelchair', 'Power Wheelchair'),
('walker', 'Walker / Ambulation Aid'),
]
description = fields.Text(string='Description')
# Canvas state — JSON blob for viewport (zoom, pan) preserved across sessions
canvas_data = fields.Text(string='Canvas Data', default='{}',
help='JSON: viewport state for the visual designer')
state = fields.Selection([
('draft', 'Draft'),
('active', 'Active'),
('archived', 'Archived'),
], string='Status', default='draft', tracking=True)
# Relationships
node_ids = fields.One2many('fusion.wc.config.flow.node', 'flow_id',
string='Nodes')
connection_ids = fields.One2many('fusion.wc.config.flow.connection', 'flow_id',
string='Connections')
step_ids = fields.One2many('fusion.wc.config.flow.step', 'flow_id',
string='Form Steps')
# Computed counts
node_count = fields.Integer(string='Nodes', compute='_compute_counts')
connection_count = fields.Integer(string='Connections', compute='_compute_counts')
step_count = fields.Integer(string='Steps', compute='_compute_counts')
@api.depends('node_ids', 'connection_ids', 'step_ids')
def _compute_counts(self):
for rec in self:
rec.node_count = len(rec.node_ids)
rec.connection_count = len(rec.connection_ids)
rec.step_count = len(rec.step_ids)
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def action_open_designer(self):
"""Open the visual flow designer (OWL client action)."""
self.ensure_one()
return {
'type': 'ir.actions.client',
'tag': 'fusion_flow_designer',
'name': _('Flow Designer: %s') % self.name,
'context': {'active_id': self.id},
}
def action_activate(self):
"""Set flow to active. Deactivate other flows for same equipment type.
Auto-creates default steps if flow has no steps defined.
"""
self.ensure_one()
# Auto-create default steps if none exist
if not self.step_ids:
self.action_create_default_steps()
# Deactivate other active flows for the same equipment type
siblings = self.search([
('equipment_type', '=', self.equipment_type),
('state', '=', 'active'),
('id', '!=', self.id),
])
siblings.write({'state': 'draft'})
self.write({'state': 'active'})
def action_archive(self):
self.ensure_one()
self.write({'state': 'archived'})
def action_reset_draft(self):
self.ensure_one()
self.write({'state': 'draft'})
def action_create_default_steps(self):
"""Generate default form steps based on equipment type.
For wheelchair types, creates the standard 6 steps.
For other types, creates a generic 4-step flow.
"""
self.ensure_one()
Step = self.env['fusion.wc.config.flow.step']
# Remove existing steps
self.step_ids.unlink()
wheelchair_types = ('manual_wheelchair', 'power_wheelchair')
if self.equipment_type in wheelchair_types:
# ── Client step config: ADP program fields ──
is_manual = self.equipment_type == 'manual_wheelchair'
client_config = json.dumps({
"show_health_card": True,
"show_dob": True,
"show_adp_fields": True, # client_type + reason_for_application
"show_wheelchair_category": is_manual,
"show_powerchair_category": not is_manual,
})
# ── Measurement fields ──
mw_measurements_json = json.dumps([
{"name": "seat_width", "label": "Seat Width", "type": "float",
"unit_field": "seat_width_unit", "units": ["inches", "cm"], "required": True},
{"name": "seat_depth", "label": "Seat Depth", "type": "float",
"unit_field": "seat_depth_unit", "units": ["inches", "cm"], "required": True},
{"name": "finished_seat_to_floor_height", "label": "Finished Seat to Floor Height",
"type": "float", "unit_field": "seat_to_floor_unit", "units": ["inches", "cm"]},
{"name": "back_cane_height", "label": "Back Cane Height", "type": "float",
"unit_field": "cane_height_unit", "units": ["inches", "cm"]},
{"name": "finished_back_height", "label": "Finished Back Height", "type": "float",
"unit_field": "back_height_unit", "units": ["inches", "cm"]},
{"name": "finished_leg_rest_length", "label": "Finished Leg Rest Length",
"type": "float", "unit_field": "leg_rest_unit", "units": ["inches", "cm"]},
{"name": "client_weight", "label": "Client Weight", "type": "float",
"unit_field": "client_weight_unit", "units": ["lbs", "kg"], "required": True},
])
prefix = 'mw' if is_manual else 'pw'
steps = [
{'sequence': 10, 'name': 'Client', 'step_type': 'client_info',
'icon': 'fa-user', 'fields_json': client_config},
{'sequence': 20, 'name': 'Measurements', 'step_type': 'measurements',
'icon': 'fa-ruler', 'fields_json': mw_measurements_json},
{'sequence': 30, 'name': 'Frame', 'step_type': 'product_select',
'icon': 'fa-wheelchair', 'section_code': f'{prefix}_frame'},
{'sequence': 40, 'name': 'Seating', 'step_type': 'options',
'icon': 'fa-chair', 'section_code': 'seating'},
{'sequence': 50, 'name': 'Options', 'step_type': 'options',
'icon': 'fa-list', 'section_code': f'{prefix}_adp_options,{prefix}_accessories'},
{'sequence': 60, 'name': 'Review', 'step_type': 'review',
'icon': 'fa-check'},
]
elif self.equipment_type == 'walker':
# ── Client step config: ADP program fields (no wheelchair categories) ──
client_config = json.dumps({
"show_health_card": True,
"show_dob": True,
"show_adp_fields": True,
})
walker_measurements_json = json.dumps([
{"name": "walker_seat_height", "label": "Seat Height", "type": "float",
"unit_field": "walker_seat_height_unit", "units": ["inches", "cm", "na"]},
{"name": "push_handle_height", "label": "Push Handle Height", "type": "float",
"unit_field": "push_handle_height_unit", "units": ["inches", "cm"]},
{"name": "width_between_push_handles", "label": "Width Between Push Handles",
"type": "float", "unit_field": "push_handle_width_unit", "units": ["inches", "cm"]},
{"name": "client_weight", "label": "Client Weight", "type": "float",
"unit_field": "client_weight_unit", "units": ["lbs", "kg"], "required": True},
])
steps = [
{'sequence': 10, 'name': 'Client', 'step_type': 'client_info',
'icon': 'fa-user', 'fields_json': client_config},
{'sequence': 20, 'name': 'Measurements', 'step_type': 'measurements',
'icon': 'fa-ruler', 'fields_json': walker_measurements_json},
{'sequence': 30, 'name': 'Equipment', 'step_type': 'product_select',
'icon': 'fa-male', 'section_code': 'walker_frame'},
{'sequence': 40, 'name': 'Options', 'step_type': 'options',
'icon': 'fa-list', 'section_code': 'walker_adp_options,walker_accessories'},
{'sequence': 50, 'name': 'Review', 'step_type': 'review',
'icon': 'fa-check'},
]
else:
# ── Generic flow for new equipment types (stair lift, porch lift, etc.) ──
# No ADP fields, no health card, no DOB — pure quotation tool
steps = [
{'sequence': 10, 'name': 'Client', 'step_type': 'client_info',
'icon': 'fa-user'}, # no fields_json → no optional groups
{'sequence': 20, 'name': 'Equipment', 'step_type': 'product_select',
'icon': 'fa-cog'},
{'sequence': 30, 'name': 'Options', 'step_type': 'options',
'icon': 'fa-list'},
{'sequence': 40, 'name': 'Review', 'step_type': 'review',
'icon': 'fa-check'},
]
for step_vals in steps:
step_vals['flow_id'] = self.id
Step.create(step_vals)
def action_new_assessment_form(self):
"""Open a new portal assessment form pre-selected to this flow's equipment type."""
self.ensure_one()
base = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
return {
'type': 'ir.actions.act_url',
'url': f'{base}/my/quotation/builder/new?equipment_type={self.equipment_type}',
'target': 'new',
}
# ------------------------------------------------------------------
# Load / Save graph for designer
# ------------------------------------------------------------------
def load_flow_graph(self):
"""Return the complete flow graph for the visual designer."""
self.ensure_one()
nodes = []
for n in self.node_ids:
node_data = {
'id': n.id,
'name': n.name,
'node_type': n.node_type,
'pos_x': n.pos_x,
'pos_y': n.pos_y,
'color': n.color,
'icon': n.icon,
'section_id': n.section_id.id if n.section_id else False,
'section_name': n.section_id.name if n.section_id else '',
'decision_field': n.decision_field or '',
'decision_operator': n.decision_operator or '',
'decision_value': n.decision_value or '',
'measurement_field': n.measurement_field or '',
'comparison': n.comparison or '',
'threshold_value': n.threshold_value,
'action_type': n.action_type or '',
'target_option_ids': n.target_option_ids.ids,
'target_step': n.target_step,
'config_json': n.config_json or '{}',
'node_options': [{
'id': opt.id,
'name': opt.name,
'sequence': opt.sequence,
'section_option_id': opt.section_option_id.id if opt.section_option_id else False,
'enables_option_ids': opt.enables_option_ids.ids,
'disables_option_ids': opt.disables_option_ids.ids,
'requires_option_ids': opt.requires_option_ids.ids,
'port_key': opt.port_key,
} for opt in n.node_option_ids],
}
nodes.append(node_data)
connections = []
for c in self.connection_ids:
connections.append({
'id': c.id,
'source_node_id': c.source_node_id.id,
'target_node_id': c.target_node_id.id,
'source_port': c.source_port or 'out',
'label': c.label or '',
'condition_json': c.condition_json or '{}',
'sequence': c.sequence,
})
return {
'id': self.id,
'name': self.name,
'equipment_type': self.equipment_type,
'canvas': json.loads(self.canvas_data or '{}'),
'nodes': nodes,
'connections': connections,
}
def save_flow_graph(self, graph_data):
"""Sync the full graph from the visual designer to ORM records."""
self.ensure_one()
Node = self.env['fusion.wc.config.flow.node']
Connection = self.env['fusion.wc.config.flow.connection']
# Save viewport state
canvas = graph_data.get('canvas', {})
self.write({'canvas_data': json.dumps(canvas)})
# --- Sync Nodes ---
incoming_nodes = graph_data.get('nodes', [])
incoming_node_ids = set()
node_id_map = {} # temp_id -> real_id
for ndata in incoming_nodes:
vals = {
'flow_id': self.id,
'name': ndata.get('name', 'Untitled'),
'node_type': ndata.get('node_type', 'action'),
'pos_x': ndata.get('pos_x', 0),
'pos_y': ndata.get('pos_y', 0),
'color': ndata.get('color', '#3b82f6'),
'icon': ndata.get('icon', 'fa-circle'),
'section_id': ndata.get('section_id') or False,
'decision_field': ndata.get('decision_field') or False,
'decision_operator': ndata.get('decision_operator') or False,
'decision_value': ndata.get('decision_value') or '',
'measurement_field': ndata.get('measurement_field') or False,
'comparison': ndata.get('comparison') or False,
'threshold_value': ndata.get('threshold_value', 0),
'action_type': ndata.get('action_type') or False,
'target_step': ndata.get('target_step', 0),
'config_json': ndata.get('config_json', '{}'),
}
target_ids = ndata.get('target_option_ids', [])
node_id = ndata.get('id')
if isinstance(node_id, int) and node_id > 0:
# Update existing
node = Node.browse(node_id)
if node.exists():
node.write(vals)
if target_ids is not None:
node.write({'target_option_ids': [(6, 0, target_ids)]})
node_id_map[node_id] = node_id
incoming_node_ids.add(node_id)
continue
# Create new
new_node = Node.create(vals)
if target_ids:
new_node.write({'target_option_ids': [(6, 0, target_ids)]})
node_id_map[ndata.get('id', 'new')] = new_node.id
incoming_node_ids.add(new_node.id)
# Delete nodes no longer in the graph
existing_nodes = Node.search([('flow_id', '=', self.id)])
to_delete = existing_nodes.filtered(lambda n: n.id not in incoming_node_ids)
to_delete.unlink()
# --- Sync Connections ---
incoming_conns = graph_data.get('connections', [])
incoming_conn_ids = set()
for cdata in incoming_conns:
src_id = cdata.get('source_node_id')
tgt_id = cdata.get('target_node_id')
# Resolve temp IDs
src_id = node_id_map.get(src_id, src_id)
tgt_id = node_id_map.get(tgt_id, tgt_id)
vals = {
'flow_id': self.id,
'source_node_id': src_id,
'target_node_id': tgt_id,
'source_port': cdata.get('source_port', 'out'),
'label': cdata.get('label', ''),
'condition_json': cdata.get('condition_json', '{}'),
'sequence': cdata.get('sequence', 10),
}
conn_id = cdata.get('id')
if isinstance(conn_id, int) and conn_id > 0:
conn = Connection.browse(conn_id)
if conn.exists():
conn.write(vals)
incoming_conn_ids.add(conn_id)
continue
new_conn = Connection.create(vals)
incoming_conn_ids.add(new_conn.id)
# Delete connections no longer in the graph
existing_conns = Connection.search([('flow_id', '=', self.id)])
to_delete = existing_conns.filtered(lambda c: c.id not in incoming_conn_ids)
to_delete.unlink()
return True
# ------------------------------------------------------------------
# Flow Evaluation Engine
# ------------------------------------------------------------------
def evaluate(self, assessment_data):
"""Walk the flow graph and return option directives.
Args:
assessment_data: dict with keys like equipment_type, seat_width,
selected_option_ids, build_type, etc.
Returns:
dict with enabled/disabled/required option IDs, skip steps, messages.
"""
self.ensure_one()
result = {
'enabled_option_ids': set(),
'disabled_option_ids': set(),
'required_option_ids': set(),
'skip_steps': set(),
'active_nodes': [],
'messages': [],
}
# Find start node
start_nodes = self.node_ids.filtered(lambda n: n.node_type == 'start')
if not start_nodes:
return self._finalize_result(result)
# BFS walk with cycle protection
visited = set()
queue = list(start_nodes)
max_iterations = 200
iteration = 0
while queue and iteration < max_iterations:
iteration += 1
node = queue.pop(0)
if node.id in visited:
continue
visited.add(node.id)
result['active_nodes'].append(node.id)
next_nodes = self._evaluate_node(node, assessment_data, result)
queue.extend(next_nodes)
return self._finalize_result(result)
def _finalize_result(self, result):
"""Convert sets to sorted lists for JSON serialization."""
for key in ('enabled_option_ids', 'disabled_option_ids',
'required_option_ids', 'skip_steps'):
result[key] = sorted(result[key])
return result
def _evaluate_node(self, node, data, result):
"""Evaluate a single node. Returns list of next nodes to visit."""
if node.node_type == 'start':
return self._get_next_nodes(node, 'out')
elif node.node_type == 'end':
return []
elif node.node_type == 'decision':
passed = self._evaluate_decision(node, data)
return self._get_next_nodes(node, 'true' if passed else 'false')
elif node.node_type == 'measurement_check':
passed = self._evaluate_measurement(node, data)
return self._get_next_nodes(node, 'pass' if passed else 'fail')
elif node.node_type == 'option_group':
selected = set(data.get('selected_option_ids', []))
for opt in node.node_option_ids:
if opt.section_option_id and opt.section_option_id.id in selected:
result['enabled_option_ids'].update(opt.enables_option_ids.ids)
result['disabled_option_ids'].update(opt.disables_option_ids.ids)
result['required_option_ids'].update(opt.requires_option_ids.ids)
return self._get_next_nodes(node, 'out')
elif node.node_type == 'action':
if node.action_type == 'enable':
result['enabled_option_ids'].update(node.target_option_ids.ids)
elif node.action_type == 'disable':
result['disabled_option_ids'].update(node.target_option_ids.ids)
elif node.action_type == 'require':
result['required_option_ids'].update(node.target_option_ids.ids)
elif node.action_type == 'skip_step' and node.target_step:
result['skip_steps'].add(node.target_step)
elif node.action_type == 'set_value':
msg = json.loads(node.config_json or '{}').get('message', '')
if msg:
result['messages'].append(msg)
return self._get_next_nodes(node, 'out')
elif node.node_type == 'product_select':
return self._get_next_nodes(node, 'out')
return []
def _evaluate_decision(self, node, data):
"""Evaluate a decision node condition against assessment data."""
field = node.decision_field
op = node.decision_operator
expected = node.decision_value or ''
actual = data.get(field, '')
if isinstance(actual, (int, float)) and expected:
try:
expected_num = float(expected)
actual_num = float(actual)
if op == 'eq':
return actual_num == expected_num
elif op == 'neq':
return actual_num != expected_num
elif op == 'gt':
return actual_num > expected_num
elif op == 'gte':
return actual_num >= expected_num
elif op == 'lt':
return actual_num < expected_num
elif op == 'lte':
return actual_num <= expected_num
except (ValueError, TypeError):
pass
# String comparison
actual_str = str(actual)
if op == 'eq':
return actual_str == expected
elif op == 'neq':
return actual_str != expected
elif op == 'in':
return actual_str in [v.strip() for v in expected.split(',')]
return False
def _evaluate_measurement(self, node, data):
"""Evaluate a measurement check node."""
field = node.measurement_field
if not field:
return False
actual = data.get(field, 0)
try:
actual = float(actual)
except (ValueError, TypeError):
return False
threshold = node.threshold_value
comp = node.comparison
if comp == 'gt':
return actual > threshold
elif comp == 'gte':
return actual >= threshold
elif comp == 'lt':
return actual < threshold
elif comp == 'eq':
return abs(actual - threshold) < 0.001
elif comp == 'neq':
return abs(actual - threshold) >= 0.001
return False
def _get_next_nodes(self, node, port):
"""Get target nodes for outgoing connections from a specific port."""
connections = node.outgoing_connection_ids.filtered(
lambda c: c.source_port == port
)
return connections.mapped('target_node_id')