Files
gsinghpal fc3c966484 changes
2026-03-13 12:38:28 -04:00

389 lines
15 KiB
Python

# -*- coding: utf-8 -*-
from odoo import http
from odoo.http import request
import json
import logging
_logger = logging.getLogger(__name__)
class QuotationAPI(http.Controller):
    """Authenticated JSON endpoints used by the quotation form.

    Every route is ``type='json'``, ``auth='user'`` and POST-only. Records are
    accessed through ``sudo()``, so the checks made inside each route are the
    only filtering applied to what a logged-in caller can read or write.
    """

    # Bounds applied to the caller-supplied ``limit`` of the search routes.
    # Without them a request could pass 0/None/a huge number and pull every
    # record through the sudo()'d search.
    _DEFAULT_SEARCH_LIMIT = 20
    _MAX_SEARCH_LIMIT = 100

    # ------------------------------------------------------------------
    # Input helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_int(value, default=None):
        """Return ``int(value)``, or *default* when *value* is not convertible."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _to_float(value):
        """Return *value* as a finite float; empty or invalid input gives 0.0."""
        try:
            number = float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        # NaN is the only float unequal to itself; NaN/inf would poison every
        # later comparison, so treat them like a missing measurement.
        if number != number or number in (float('inf'), float('-inf')):
            return 0.0
        return number

    @classmethod
    def _clamp_limit(cls, limit):
        """Coerce a requested page size into 1.._MAX_SEARCH_LIMIT."""
        value = cls._to_int(limit, cls._DEFAULT_SEARCH_LIMIT)
        if value < 1:
            return cls._DEFAULT_SEARCH_LIMIT
        return min(value, cls._MAX_SEARCH_LIMIT)

    @staticmethod
    def _clean_query(query):
        """Return the stripped search text, or '' for non-string input."""
        return query.strip() if isinstance(query, str) else ''

    @staticmethod
    def _variant_payload(variant):
        """Serialize a single product.product record for the client."""
        return {
            'id': variant.id,
            'name': variant.display_name,
            'default_code': variant.default_code or '',
            'list_price': variant.lst_price,
        }

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------
    @http.route('/my/quotation/api/search_clients', type='json',
                auth='user', methods=['POST'])
    def search_clients(self, query='', limit=20, **kw):
        """Search existing clients by name or phone.

        Only ``name`` and ``phone`` are part of the search domain; no
        health-card field is queried here.

        :param query: search text; fewer than 2 characters (after stripping)
            returns an empty list.
        :param limit: maximum number of results, clamped to
            ``_MAX_SEARCH_LIMIT``.
        :return: list of dicts with id, name, phone, email, street, city.
        """
        query = self._clean_query(query)
        if len(query) < 2:
            return []

        partners = request.env['res.partner'].sudo().search(
            ['|', ('name', 'ilike', query), ('phone', 'ilike', query)],
            limit=self._clamp_limit(limit),
            order='name',
        )
        return [{
            'id': partner.id,
            'name': partner.name,
            'phone': partner.phone or '',
            'email': partner.email or '',
            'street': partner.street or '',
            'city': partner.city or '',
        } for partner in partners]

    @http.route('/my/quotation/api/search_products', type='json',
                auth='user', methods=['POST'])
    def search_products(self, query='', section_code=None, limit=20, **kw):
        """Search sellable product templates, optionally filtered by section.

        Matches on name, internal reference or ADP device code. Each result
        carries ``variant_count`` so the UI can show a configurator when
        multiple variants exist.

        :param query: search text; fewer than 2 characters (after stripping)
            returns an empty list.
        :param section_code: optional ``fusion.wc.section`` code; when the
            section has a product category, results are restricted to that
            category and its children.
        :param limit: maximum number of results, clamped to
            ``_MAX_SEARCH_LIMIT``.
        :return: list of template dicts.
        """
        query = self._clean_query(query)
        if len(query) < 2:
            return []

        domain = [
            ('sale_ok', '=', True),
            '|', '|',
            ('name', 'ilike', query),
            ('default_code', 'ilike', query),
            ('x_fc_adp_device_code', 'ilike', query),
        ]

        # If a section is specified, filter by its product category.
        if section_code:
            section = request.env['fusion.wc.section'].sudo().search(
                [('code', '=', section_code)], limit=1)
            if section and section.product_category_id:
                domain.append(
                    ('categ_id', 'child_of', section.product_category_id.id))

        templates = request.env['product.template'].sudo().search(
            domain, limit=self._clamp_limit(limit), order='name')

        results = []
        for template in templates:
            variants = template.product_variant_ids
            variant_count = len(variants)
            results.append({
                'id': template.id,
                'name': template.name,
                'default_code': template.default_code or '',
                'adp_device_code': template.x_fc_adp_device_code or '',
                'adp_price': template.x_fc_adp_price or 0.0,
                'list_price': template.list_price,
                'variant_count': variant_count,
                'has_configurable_attributes': bool(
                    template.attribute_line_ids and variant_count > 1),
                # Single-variant templates expose the variant ID directly so
                # the client can skip the configurator.
                'single_variant_id': (
                    variants[0].id if variant_count == 1 else False),
            })
        return results

    @http.route('/my/quotation/api/get_product_attributes', type='json',
                auth='user', methods=['POST'])
    def get_product_attributes(self, template_id=None, **kw):
        """Return the attribute lines of a product template.

        :param template_id: ID of the ``product.template``.
        :return: list of ``{'attribute_id', 'attribute_name', 'values'}``
            dicts, where ``values`` is a list of ``{'id', 'name'}``; an empty
            list when the ID is missing, malformed or unknown.
        """
        tmpl_id = self._to_int(template_id)
        if not tmpl_id:
            return []

        tmpl = request.env['product.template'].sudo().browse(tmpl_id)
        if not tmpl.exists():
            return []

        return [{
            'attribute_id': line.attribute_id.id,
            'attribute_name': line.attribute_id.name,
            'values': [
                {'id': value.id, 'name': value.name}
                for value in line.value_ids
            ],
        } for line in tmpl.attribute_line_ids]

    @http.route('/my/quotation/api/resolve_variant', type='json',
                auth='user', methods=['POST'])
    def resolve_variant(self, template_id=None, attribute_value_ids=None, **kw):
        """Find the product.product variant matching the selected values.

        :param template_id: ID of the ``product.template``.
        :param attribute_value_ids: iterable of selected attribute value IDs.
        :return: variant dict (id, name, default_code, list_price). When no
            variant matches exactly, the first variant is returned with an
            extra ``warning`` key. ``{'error': ...}`` on bad input or when the
            template has no variants.
        """
        if not template_id or not attribute_value_ids:
            return {'error': 'Missing template_id or attribute_value_ids'}

        tmpl_id = self._to_int(template_id)
        if tmpl_id is None:
            return {'error': 'Template not found'}
        tmpl = request.env['product.template'].sudo().browse(tmpl_id)
        if not tmpl.exists():
            return {'error': 'Template not found'}

        try:
            selected = {int(value_id) for value_id in attribute_value_ids}
        except (TypeError, ValueError):
            return {'error': 'Invalid attribute_value_ids'}

        # A variant matches only when its attribute values are exactly the
        # selected ones (no extra, none missing).
        for variant in tmpl.product_variant_ids:
            variant_values = set(
                variant.product_template_attribute_value_ids
                .mapped('product_attribute_value_id').ids
            )
            if variant_values == selected:
                return self._variant_payload(variant)

        # No exact match found — return first variant as fallback.
        fallback = tmpl.product_variant_ids[:1]
        if fallback:
            payload = self._variant_payload(fallback)
            payload['warning'] = (
                'No exact variant match found, using default variant.')
            return payload
        return {'error': 'No variants found for this template'}

    @http.route('/my/quotation/api/get_section_options', type='json',
                auth='user', methods=['POST'])
    def get_section_options(self, section_code=None, build_type=None, **kw):
        """Get the standard options of a section.

        :param section_code: ``fusion.wc.section`` code.
        :param build_type: when ``'modular'`` or ``'custom_fabricated'``, only
            options available for that build type (or ``'both'``) are
            returned; any other value applies no build-type filter.
        :return: list of option dicts ordered by ``sequence``; empty when the
            section code is missing or unknown.
        """
        if not section_code:
            return []

        section = request.env['fusion.wc.section'].sudo().search(
            [('code', '=', section_code)], limit=1)
        if not section:
            return []

        domain = [('section_id', '=', section.id), ('is_standard', '=', True)]
        if build_type in ('modular', 'custom_fabricated'):
            domain.append(('available_build_types', 'in', [build_type, 'both']))

        options = request.env['fusion.wc.section.option'].sudo().search(
            domain, order='sequence')
        return [{
            'id': option.id,
            'product_tmpl_id': option.product_tmpl_id.id,
            'product_name': option.product_tmpl_id.display_name,
            'variant_count': option.variant_count,
            'adp_device_code': option.adp_device_code or '',
            'adp_price': option.adp_price or 0.0,
            'list_price': option.list_price or 0.0,
            'requires_clinical_rationale': option.requires_clinical_rationale,
            'available_build_types': option.available_build_types,
        } for option in options]

    @http.route('/my/quotation/api/check_upcharges', type='json',
                auth='user', methods=['POST'])
    def check_upcharges(self, assessment_data=None, **kw):
        """Check which upcharge rules would trigger for given measurements.

        Returns the list of upcharges WITHOUT creating any records. Active
        rules are evaluated in ``sequence`` order; within a mutually exclusive
        group only the first matching rule is reported.

        :param assessment_data: dict of form values (measurements with their
            unit fields, ``client_weight``, ``equipment_type``, ...).
        :return: list of dicts describing each triggered rule; empty when the
            payload is missing or not a dict.
        """
        if not assessment_data or not isinstance(assessment_data, dict):
            return []

        equipment_type = assessment_data.get(
            'equipment_type', 'manual_wheelchair')
        measurements, client_weight = self._normalize_measurements(
            assessment_data)

        rules = request.env['fusion.wc.upcharge.rule'].sudo().search([
            ('active', '=', True),
        ])
        DeviceCode = request.env['fusion.adp.device.code'].sudo()

        price_by_code = {}      # device code -> ADP price, one lookup per code
        claimed_groups = set()  # exclusive groups that already triggered
        triggered = []

        for rule in rules.sorted('sequence'):
            if rule.equipment_type not in (equipment_type, 'both'):
                continue
            group = rule.mutually_exclusive_group
            if group and group in claimed_groups:
                continue
            if not self._rule_matches(rule, measurements, client_weight):
                continue

            # Look up the ADP price of the rule's device code.
            code = rule.adp_device_code
            if code not in price_by_code:
                device = DeviceCode.search(
                    [('device_code', '=', code), ('active', '=', True)],
                    limit=1,
                )
                price_by_code[code] = device.adp_price if device else 0

            triggered.append({
                'rule_id': rule.id,
                'name': rule.name,
                'adp_device_code': code,
                'adp_price': price_by_code[code],
                'description': rule.description or '',
                'measurement_field': rule.measurement_field or '',
            })
            if group:
                claimed_groups.add(group)

        return triggered

    def _normalize_measurements(self, assessment_data):
        """Return ``(measurement_map, client_weight)`` in inches and lbs.

        Values flagged with unit ``'cm'`` are divided by 2.54 and weights
        flagged ``'kg'`` are multiplied by 2.20462. Missing or non-numeric
        values become 0.
        """
        def length(field, unit_field, default_unit='inches'):
            value = self._to_float(assessment_data.get(field))
            if value and assessment_data.get(unit_field, default_unit) == 'cm':
                value = value / 2.54
            return value

        seat_width = length('seat_width', 'seat_width_unit')
        seat_depth = length('seat_depth', 'seat_depth_unit')
        back_height = length('finished_back_height', 'back_height_unit')

        client_weight = self._to_float(assessment_data.get('client_weight'))
        if client_weight and assessment_data.get(
                'client_weight_unit', 'lbs') == 'kg':
            client_weight = client_weight * 2.20462

        # Backrest width defaults to the seat width when not provided. No unit
        # field is read for it, so the value is used as given.
        back_width = self._to_float(assessment_data.get('back_width'))
        if not back_width:
            back_width = seat_width

        measurement_map = {
            'seat_width': seat_width,
            'seat_depth': seat_depth,
            'back_width': back_width,
            'back_height': back_height,
        }
        return measurement_map, client_weight

    def _rule_matches(self, rule, measurements, client_weight):
        """Tell whether one upcharge rule triggers for the normalized values."""
        trigger = rule.trigger_type

        if trigger == 'measurement':
            value = measurements.get(rule.measurement_field, 0)
            # A zero/missing measurement never triggers a rule.
            if not value:
                return False
            return self._compare(value, rule.comparison, rule.threshold_value)

        if trigger == 'weight':
            # Lower bound is exclusive, upper bound inclusive; a falsy
            # weight_max means no upper bound.
            if client_weight > rule.weight_min:
                return not rule.weight_max or client_weight <= rule.weight_max
            return False

        if trigger == 'dimension_mismatch':
            first = measurements.get(rule.compare_field_1, 0)
            second = measurements.get(rule.compare_field_2, 0)
            return bool(first and second and abs(first - second) > 0.01)

        return False

    @http.route('/my/quotation/api/save_step', type='json',
                auth='user', methods=['POST'])
    def save_step(self, assessment_id=None, step=None, data=None, **kw):
        """Auto-save current step data as JSON for resume.

        Only the assessment's sales rep (``sales_rep_id``) may save.

        :param assessment_id: ID of the ``fusion.wc.assessment``.
        :param step: step number to store; defaults to 1 when empty.
        :param data: JSON-serializable form data; stored only when truthy.
        :return: ``{'success': True}`` or ``{'success': False, 'error': ...}``.
        """
        if not assessment_id:
            return {'success': False, 'error': 'No assessment ID'}

        record_id = self._to_int(assessment_id)
        if record_id is None:
            return {'success': False, 'error': 'Access denied'}

        assessment = request.env['fusion.wc.assessment'].sudo().browse(record_id)
        if (not assessment.exists()
                or assessment.sales_rep_id.id != request.env.user.id):
            return {'success': False, 'error': 'Access denied'}

        current_step = self._to_int(step) if step else 1
        if current_step is None:
            # Refuse rather than silently resetting the saved progress.
            return {'success': False, 'error': 'Invalid step'}

        vals = {'current_step': current_step}
        if data:
            vals['form_data_json'] = json.dumps(data)
        assessment.write(vals)
        return {'success': True}

    @staticmethod
    def _compare(value, comparison, threshold):
        """Evaluate ``value <comparison> threshold``.

        Supported codes: ``gt``, ``gte``, ``lt``, ``lte``, ``eq``, ``neq``.
        Equality uses a 0.01 tolerance. Unknown codes return False.
        """
        if comparison == 'gt':
            return value > threshold
        if comparison == 'gte':
            return value >= threshold
        if comparison == 'lt':
            return value < threshold
        if comparison == 'lte':
            return value <= threshold
        if comparison == 'eq':
            return abs(value - threshold) < 0.01
        if comparison == 'neq':
            return abs(value - threshold) >= 0.01
        return False
class QuotationPublicAPI(http.Controller):
    """Public (token-based) JSON API — mirrors QuotationAPI for unauthenticated access.

    Every route carries an assessment access token in its URL. The token is
    checked against ``fusion.wc.assessment.access_token`` before anything is
    forwarded to the authenticated API implementation.
    """

    # Bounds for a caller-supplied ``limit`` on the public search routes.
    # These routes are reachable without a login and run sudo()'d searches,
    # so the page size must not be left to the caller.
    _DEFAULT_SEARCH_LIMIT = 20
    _MAX_SEARCH_LIMIT = 100

    # Shared instance of the authenticated controller the routes delegate to.
    _api = QuotationAPI()

    def _validate_token(self, token):
        """Return the assessment owning *token*, or None when there is none."""
        if not token or not isinstance(token, str):
            return None
        assessment = request.env['fusion.wc.assessment'].sudo().search([
            ('access_token', '=', token),
        ], limit=1)
        return assessment if assessment else None

    @classmethod
    def _bounded(cls, kw):
        """Return *kw* with any ``limit`` coerced into 1.._MAX_SEARCH_LIMIT."""
        if 'limit' not in kw:
            return kw
        try:
            limit = int(kw['limit'])
        except (TypeError, ValueError, OverflowError):
            limit = cls._DEFAULT_SEARCH_LIMIT
        if limit < 1:
            limit = cls._DEFAULT_SEARCH_LIMIT
        return dict(kw, limit=min(limit, cls._MAX_SEARCH_LIMIT))

    def _forward(self, token, handler, kw):
        """Validate *token*, then call *handler* with the request params."""
        if not self._validate_token(token):
            return {'error': 'invalid_token'}
        return handler(**kw)

    # --- delegates ---------------------------------------------------------
    # Each route validates the token then forwards to the existing API class.

    @http.route('/quotation/api/<string:token>/search_clients',
                type='json', auth='public', methods=['POST'])
    def public_search_clients(self, token, **kw):
        """Token-guarded client search."""
        return self._forward(token, self._api.search_clients, self._bounded(kw))

    @http.route('/quotation/api/<string:token>/search_products',
                type='json', auth='public', methods=['POST'])
    def public_search_products(self, token, **kw):
        """Token-guarded product template search."""
        return self._forward(token, self._api.search_products, self._bounded(kw))

    @http.route('/quotation/api/<string:token>/get_product_attributes',
                type='json', auth='public', methods=['POST'])
    def public_get_product_attributes(self, token, **kw):
        """Token-guarded attribute listing for a product template."""
        return self._forward(token, self._api.get_product_attributes, kw)

    @http.route('/quotation/api/<string:token>/resolve_variant',
                type='json', auth='public', methods=['POST'])
    def public_resolve_variant(self, token, **kw):
        """Token-guarded variant resolution."""
        return self._forward(token, self._api.resolve_variant, kw)

    @http.route('/quotation/api/<string:token>/get_section_options',
                type='json', auth='public', methods=['POST'])
    def public_get_section_options(self, token, **kw):
        """Token-guarded section option listing."""
        return self._forward(token, self._api.get_section_options, kw)

    @http.route('/quotation/api/<string:token>/check_upcharges',
                type='json', auth='public', methods=['POST'])
    def public_check_upcharges(self, token, **kw):
        """Token-guarded upcharge preview."""
        return self._forward(token, self._api.check_upcharges, kw)

    @http.route('/quotation/api/<string:token>/save_step',
                type='json', auth='public', methods=['POST'])
    def public_save_step(self, token, **kw):
        """Auto-save step data on the assessment identified by the token.

        The write is done here instead of being delegated, because the
        authenticated route requires the caller to be the assessment's sales
        rep. Any ``assessment_id`` sent by the client is ignored: the target
        is always the token-linked assessment.

        :return: ``{'success': True}``, ``{'error': 'invalid_token'}``, or
            ``{'success': False, 'error': 'Invalid step'}``.
        """
        assessment = self._validate_token(token)
        if not assessment:
            return {'error': 'invalid_token'}

        # Same defaulting as the authenticated route: empty step -> 1.
        step = kw.get('step')
        try:
            current_step = int(step) if step else 1
        except (TypeError, ValueError, OverflowError):
            return {'success': False, 'error': 'Invalid step'}

        vals = {'current_step': current_step}
        if kw.get('data'):
            vals['form_data_json'] = json.dumps(kw['data'])
        assessment.sudo().write(vals)
        return {'success': True}