feat(fusion_clock): planning -> native data migration [A8]

post-migrate(19.0.5.0.0) -> fusion.clock.schedule._fclk_port_planning_data:
planning.role -> fusion.clock.role, employee default/allowed roles, and
planning.slot -> fusion.clock.schedule (local date+float, role map, posted
if published, open if unassigned). Guarded (no-op on Community), idempotent
(marker), per-row savepoints. Integration test runs on Enterprise clones.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-06-04 20:58:13 -04:00
parent 3376a32143
commit d35d5f4b34
3 changed files with 159 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# One-time port of Odoo Enterprise Planning data into the native Fusion Clock
# models, so a deployment that previously used the `planning` bridge keeps all
# its roles, employee role assignments and shifts after `planning` /
# `fusion_planning` are removed.
#
# Guarded: a no-op on Community / fresh installs where planning data is absent.
# Idempotent: a marker param stops it re-running.
import logging
from odoo import api, SUPERUSER_ID
_logger = logging.getLogger(__name__)
_MARKER = 'fusion_clock.planning_migrated'
def migrate(cr, version):
"""Port Odoo Planning data into the native models, once. The heavy lifting
lives in fusion.clock.schedule._fclk_port_planning_data so it can be unit
tested on an Enterprise clone where planning is installed."""
env = api.Environment(cr, SUPERUSER_ID, {})
ICP = env['ir.config_parameter'].sudo()
if ICP.get_param(_MARKER):
_logger.info("Fusion Clock: planning data already migrated; skipping.")
return
counts = env['fusion.clock.schedule'].sudo()._fclk_port_planning_data()
ICP.set_param(_MARKER, '1')
_logger.info("Fusion Clock: planning -> native migration done: %s", counts)

View File

@@ -516,6 +516,83 @@ class FusionClockSchedule(models.Model):
notified += 1
return posted, notified
@api.model
def _fclk_port_planning_data(self):
"""Port Odoo Planning data (roles, employee roles, slots) into the
native models. Safe no-op when planning is not installed. Returns a
dict of counts. Called by the 19.0.5.0.0 migration and by tests."""
import pytz
counts = {'roles': 0, 'employees': 0, 'slots': 0, 'skipped': 0}
env = self.env
has_roles = 'planning.role' in env
has_slots = 'planning.slot' in env
if not has_roles and not has_slots:
return counts
Role = env['fusion.clock.role'].sudo()
role_map = {}
if has_roles:
for prole in env['planning.role'].sudo().with_context(active_test=False).search([]):
target = Role.with_context(active_test=False).search(
[('name', '=ilike', prole.name)], limit=1) or Role.create({
'name': prole.name, 'color': prole.color or 1, 'active': prole.active})
role_map[prole.id] = target.id
counts['roles'] = len(role_map)
Employee = env['hr.employee'].sudo().with_context(active_test=False)
for emp in Employee.search([]):
vals = {}
if emp._fields.get('default_planning_role_id') and emp.default_planning_role_id:
mapped = role_map.get(emp.default_planning_role_id.id)
if mapped:
vals['x_fclk_default_role_id'] = mapped
if emp._fields.get('planning_role_ids') and emp.planning_role_ids:
mapped_ids = [role_map[r.id] for r in emp.planning_role_ids if r.id in role_map]
if mapped_ids:
vals['x_fclk_role_ids'] = [(6, 0, mapped_ids)]
if vals:
emp.write(vals)
counts['employees'] += 1
if has_slots:
Schedule = self.sudo()
for slot in env['planning.slot'].sudo().search([], order='start_datetime'):
if not slot.start_datetime or not slot.end_datetime:
counts['skipped'] += 1
continue
employee = slot.employee_id if 'employee_id' in slot._fields else False
tz_name = ((employee.tz if employee else False)
or (slot.resource_id.tz if slot.resource_id else False)
or env.company.partner_id.tz or 'UTC')
try:
tz = pytz.timezone(tz_name)
except Exception:
tz = pytz.UTC
local_start = pytz.utc.localize(slot.start_datetime).astimezone(tz)
local_end = pytz.utc.localize(slot.end_datetime).astimezone(tz)
span_hours = (slot.end_datetime - slot.start_datetime).total_seconds() / 3600.0
allocated = slot.allocated_hours if 'allocated_hours' in slot._fields else span_hours
vals = {
'employee_id': employee.id if employee else False,
'is_open': not bool(employee),
'schedule_date': local_start.date(),
'start_time': round(local_start.hour + local_start.minute / 60.0, 2),
'end_time': round(local_end.hour + local_end.minute / 60.0, 2),
'break_minutes': round(max(0.0, span_hours - (allocated or span_hours)) * 60.0, 0),
'role_id': role_map.get(slot.role_id.id) if slot.role_id else False,
'note': slot.name or False,
'state': 'posted' if slot.state == 'published' else 'draft',
}
with env.cr.savepoint():
try:
Schedule.create(vals)
counts['slots'] += 1
except Exception as exc:
counts['skipped'] += 1
_logger.warning("Fusion Clock: skip planning.slot %s (%s).", slot.id, exc)
return counts
class FusionClockScheduleAudit(models.Model):
_name = 'fusion.clock.schedule.audit'

View File

@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# Integration test for the planning -> native port. Runs only where Odoo
# Planning is installed (Enterprise); a no-op skip on Community / local dev.
from datetime import datetime
from odoo.tests import tagged
from odoo.tests.common import TransactionCase
@tagged('-at_install', 'post_install', 'fusion_clock')
class TestPlanningMigration(TransactionCase):
def test_port_planning_data(self):
if 'planning.slot' not in self.env:
self.skipTest('planning not installed (Community / local dev)')
prole = self.env['planning.role'].create({'name': 'PortLead', 'color': 5})
emp = self.env['hr.employee'].create({'name': 'Porty McPort'})
if 'default_planning_role_id' in emp._fields:
emp.default_planning_role_id = prole.id
self.env['planning.slot'].create({
'resource_id': emp.resource_id.id,
'company_id': emp.company_id.id,
'start_datetime': datetime(2026, 6, 1, 14, 0, 0),
'end_datetime': datetime(2026, 6, 1, 22, 0, 0),
'role_id': prole.id,
'state': 'published',
})
counts = self.env['fusion.clock.schedule']._fclk_port_planning_data()
self.assertGreaterEqual(counts['roles'], 1)
self.assertTrue(
self.env['fusion.clock.role'].search([('name', '=ilike', 'PortLead')]),
"planning.role should be ported to a native fusion.clock.role")
emp.invalidate_recordset()
if 'default_planning_role_id' in emp._fields:
self.assertTrue(emp.x_fclk_default_role_id,
"employee default planning role should be ported")
sched = self.env['fusion.clock.schedule'].search([('employee_id', '=', emp.id)])
self.assertTrue(sched, "published planning.slot should become a native schedule row")
self.assertEqual(sched[0].state, 'posted',
"published slots port as posted schedule entries")