diff --git a/fusion_clock/migrations/19.0.5.0.0/post-migrate.py b/fusion_clock/migrations/19.0.5.0.0/post-migrate.py new file mode 100644 index 00000000..c5005e5a --- /dev/null +++ b/fusion_clock/migrations/19.0.5.0.0/post-migrate.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 Nexa Systems Inc. +# License OPL-1 (Odoo Proprietary License v1.0) +# +# One-time port of Odoo Enterprise Planning data into the native Fusion Clock +# models, so a deployment that previously used the `planning` bridge keeps all +# its roles, employee role assignments and shifts after `planning` / +# `fusion_planning` are removed. +# +# Guarded: a no-op on Community / fresh installs where planning data is absent. +# Idempotent: a marker param stops it re-running. + +import logging + +from odoo import api, SUPERUSER_ID + +_logger = logging.getLogger(__name__) + +_MARKER = 'fusion_clock.planning_migrated' + + +def migrate(cr, version): + """Port Odoo Planning data into the native models, once. The heavy lifting + lives in fusion.clock.schedule._fclk_port_planning_data so it can be unit + tested on an Enterprise clone where planning is installed.""" + env = api.Environment(cr, SUPERUSER_ID, {}) + ICP = env['ir.config_parameter'].sudo() + if ICP.get_param(_MARKER): + _logger.info("Fusion Clock: planning data already migrated; skipping.") + return + counts = env['fusion.clock.schedule'].sudo()._fclk_port_planning_data() + ICP.set_param(_MARKER, '1') + _logger.info("Fusion Clock: planning -> native migration done: %s", counts) diff --git a/fusion_clock/models/clock_schedule.py b/fusion_clock/models/clock_schedule.py index 65d6b67f..5e15e7c0 100644 --- a/fusion_clock/models/clock_schedule.py +++ b/fusion_clock/models/clock_schedule.py @@ -516,6 +516,83 @@ class FusionClockSchedule(models.Model): notified += 1 return posted, notified + @api.model + def _fclk_port_planning_data(self): + """Port Odoo Planning data (roles, employee roles, slots) into the + native models. Safe no-op when planning is not installed. Returns a + dict of counts. Called by the 19.0.5.0.0 migration and by tests.""" + import pytz + + counts = {'roles': 0, 'employees': 0, 'slots': 0, 'skipped': 0} + env = self.env + has_roles = 'planning.role' in env + has_slots = 'planning.slot' in env + if not has_roles and not has_slots: + return counts + + Role = env['fusion.clock.role'].sudo() + role_map = {} + if has_roles: + for prole in env['planning.role'].sudo().with_context(active_test=False).search([]): + target = Role.with_context(active_test=False).search( + [('name', '=ilike', prole.name)], limit=1) or Role.create({ + 'name': prole.name, 'color': prole.color or 1, 'active': prole.active}) + role_map[prole.id] = target.id + counts['roles'] = len(role_map) + + Employee = env['hr.employee'].sudo().with_context(active_test=False) + for emp in Employee.search([]): + vals = {} + if emp._fields.get('default_planning_role_id') and emp.default_planning_role_id: + mapped = role_map.get(emp.default_planning_role_id.id) + if mapped: + vals['x_fclk_default_role_id'] = mapped + if emp._fields.get('planning_role_ids') and emp.planning_role_ids: + mapped_ids = [role_map[r.id] for r in emp.planning_role_ids if r.id in role_map] + if mapped_ids: + vals['x_fclk_role_ids'] = [(6, 0, mapped_ids)] + if vals: + emp.write(vals) + counts['employees'] += 1 + + if has_slots: + Schedule = self.sudo() + for slot in env['planning.slot'].sudo().search([], order='start_datetime'): + if not slot.start_datetime or not slot.end_datetime: + counts['skipped'] += 1 + continue + employee = slot.employee_id if 'employee_id' in slot._fields else False + tz_name = ((employee.tz if employee else False) + or (slot.resource_id.tz if slot.resource_id else False) + or env.company.partner_id.tz or 'UTC') + try: + tz = pytz.timezone(tz_name) + except Exception: + tz = pytz.UTC + local_start = pytz.utc.localize(slot.start_datetime).astimezone(tz) + local_end = pytz.utc.localize(slot.end_datetime).astimezone(tz) + span_hours = (slot.end_datetime - slot.start_datetime).total_seconds() / 3600.0 + allocated = slot.allocated_hours if 'allocated_hours' in slot._fields else span_hours + vals = { + 'employee_id': employee.id if employee else False, + 'is_open': not bool(employee), + 'schedule_date': local_start.date(), + 'start_time': round(local_start.hour + local_start.minute / 60.0, 2), + 'end_time': round(local_end.hour + local_end.minute / 60.0, 2), + 'break_minutes': round(max(0.0, span_hours - (allocated or span_hours)) * 60.0, 0), + 'role_id': role_map.get(slot.role_id.id) if slot.role_id else False, + 'note': slot.name or False, + 'state': 'posted' if slot.state == 'published' else 'draft', + } + with env.cr.savepoint(): + try: + Schedule.create(vals) + counts['slots'] += 1 + except Exception as exc: + counts['skipped'] += 1 + _logger.warning("Fusion Clock: skip planning.slot %s (%s).", slot.id, exc) + return counts + class FusionClockScheduleAudit(models.Model): _name = 'fusion.clock.schedule.audit' diff --git a/fusion_clock/tests/test_planning_migration.py b/fusion_clock/tests/test_planning_migration.py new file mode 100644 index 00000000..e8fba4b4 --- /dev/null +++ b/fusion_clock/tests/test_planning_migration.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright 2026 Nexa Systems Inc. +# License OPL-1 (Odoo Proprietary License v1.0) +# +# Integration test for the planning -> native port. Runs only where Odoo +# Planning is installed (Enterprise); a no-op skip on Community / local dev. + +from datetime import datetime + +from odoo.tests import tagged +from odoo.tests.common import TransactionCase + + +@tagged('-at_install', 'post_install', 'fusion_clock') +class TestPlanningMigration(TransactionCase): + + def test_port_planning_data(self): + if 'planning.slot' not in self.env: + self.skipTest('planning not installed (Community / local dev)') + + prole = self.env['planning.role'].create({'name': 'PortLead', 'color': 5}) + emp = self.env['hr.employee'].create({'name': 'Porty McPort'}) + if 'default_planning_role_id' in emp._fields: + emp.default_planning_role_id = prole.id + self.env['planning.slot'].create({ + 'resource_id': emp.resource_id.id, + 'company_id': emp.company_id.id, + 'start_datetime': datetime(2026, 6, 1, 14, 0, 0), + 'end_datetime': datetime(2026, 6, 1, 22, 0, 0), + 'role_id': prole.id, + 'state': 'published', + }) + + counts = self.env['fusion.clock.schedule']._fclk_port_planning_data() + + self.assertGreaterEqual(counts['roles'], 1) + self.assertTrue( + self.env['fusion.clock.role'].search([('name', '=ilike', 'PortLead')]), + "planning.role should be ported to a native fusion.clock.role") + + emp.invalidate_recordset() + if 'default_planning_role_id' in emp._fields: + self.assertTrue(emp.x_fclk_default_role_id, + "employee default planning role should be ported") + + sched = self.env['fusion.clock.schedule'].search([('employee_id', '=', emp.id)]) + self.assertTrue(sched, "published planning.slot should become a native schedule row") + self.assertEqual(sched[0].state, 'posted', + "published slots port as posted schedule entries")