# -*- coding: utf-8 -*- import base64 import csv import io import logging from datetime import date, datetime from markupsafe import escape as html_escape from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError _logger = logging.getLogger(__name__) # Fuzzy matching aliases for auto-detecting CSV columns EMPLOYEE_FIELD_ALIASES = { 'name': ['employee name', 'full name', 'name', 'employee'], 'first_name': ['first name', 'first', 'given name', 'prenom'], 'last_name': ['last name', 'last', 'surname', 'family name', 'nom'], 'sin_number': ['sin', 's.i.n.', 'social insurance number', 'social insurance', 'social ins #', 'sin number'], 'home_street': ['street', 'address', 'street address', 'address line 1', 'address 1', 'home street'], 'home_street2': ['address line 2', 'address 2', 'apt', 'suite', 'unit'], 'home_city': ['city', 'town', 'home city'], 'home_province': ['province', 'prov', 'state', 'province/territory', 'prov/terr', 'home province'], 'home_postal_code': ['postal code', 'postal', 'zip', 'zip code', 'postcode', 'home postal code'], 'hire_date': ['hire date', 'start date', 'date of hire', 'employment start', 'hired'], 'pay_type': ['pay type', 'type', 'compensation type', 'hourly/salary', 'pay method'], 'hourly_rate': ['hourly rate', 'rate', 'rate per hour', 'hourly wage', 'hourly pay', 'pay rate'], 'salary_amount': ['salary', 'annual salary', 'salary amount', 'yearly salary', 'annual pay'], 'pay_schedule': ['pay frequency', 'pay schedule', 'frequency', 'pay period', 'payroll frequency'], 'federal_td1_amount': ['federal td1', 'td1 federal', 'federal claim', 'federal basic personal', 'fed td1', 'federal personal amount'], 'provincial_claim_amount': ['provincial td1', 'td1 provincial', 'provincial claim', 'provincial basic personal', 'prov td1'], 'federal_additional_tax': ['additional tax', 'additional federal tax', 'extra tax', 'td1 additional'], 'vacation_rate': ['vacation rate', 'vacation %', 'vacation percent', 'vac rate', 'vac %'], 'payment_method': ['payment method', 'pay method', 'direct deposit', 'payment type'], 'employee_number': ['employee id', 'employee number', 'emp id', 'emp #', 'emp number', 'employee #'], 't4_dental_code': ['dental code', 'dental benefits', 't4 dental', 'dental benefits code'], 'email': ['email', 'email address', 'e-mail'], 'phone': ['phone', 'telephone', 'phone number', 'mobile'], } PAYSLIP_FIELD_ALIASES = { 'employee_name': ['employee', 'employee name', 'name', 'full name'], 'employee_id_ref': ['employee id', 'emp id', 'emp #', 'employee number', 'emp number'], 'pay_date': ['pay date', 'cheque date', 'payment date', 'date', 'check date'], 'period_start': ['period start', 'pay period start', 'period from', 'start date', 'from'], 'period_end': ['period end', 'pay period end', 'period to', 'end date', 'to'], 'regular_hours': ['regular hours', 'hours', 'hrs', 'hours worked', 'total hours'], 'BASIC': ['regular pay', 'base pay', 'salary', 'basic pay', 'regular earnings', 'base salary', 'wages'], 'OT_PAY': ['overtime', 'overtime pay', 'ot pay', 'ot', 'overtime earnings'], 'STAT_PAY': ['stat holiday', 'stat pay', 'statutory holiday', 'stat holiday pay', 'public holiday pay'], 'VAC_PAY': ['vacation pay', 'vacation', 'vac pay', 'holiday pay'], 'BONUS_PAY': ['bonus', 'bonus pay', 'bonuses'], 'COMMISSION': ['commission', 'commissions', 'commission pay', 'sales commission'], 'RETRO_PAY': ['retro pay', 'retroactive', 'retroactive pay', 'retro'], 'SHIFT_PREMIUM': ['shift premium', 'shift differential', 'shift diff'], 'GROSS': ['gross pay', 'gross', 'total gross', 'gross earnings', 'total earnings'], 'RRSP': ['rrsp', 'rrsp deduction', 'registered retirement', 'rrsp contribution'], 'UNION_DUES': ['union dues', 'union', 'dues', 'union deduction'], 'CPP_EE': ['cpp', 'canada pension', 'canada pension plan', 'c.p.p.', 'cpp employee', 'cpp deduction'], 'CPP2_EE': ['cpp2', 'second cpp', 'cpp2 employee', 'second canada pension'], 'EI_EE': ['ei', 'employment insurance', 'e.i.', 'ei employee', 'ei premium', 'ei deduction'], 'FED_TAX': ['federal tax', 'fed tax', 'federal income tax', 'federal', 'income tax federal'], 'PROV_TAX': ['provincial tax', 'prov tax', 'provincial income tax', 'provincial', 'ontario tax', 'income tax provincial'], 'OHP': ['ohp', 'ontario health premium', 'health premium', 'ont health'], 'CPP_ER': ['cpp employer', 'employer cpp', 'cpp er'], 'CPP2_ER': ['cpp2 employer', 'employer cpp2', 'cpp2 er'], 'EI_ER': ['ei employer', 'employer ei', 'ei er'], 'NET': ['net pay', 'net', 'take home', 'net earnings', 'total net'], 'cheque_number': ['cheque #', 'cheque number', 'check #', 'check number', 'chq #'], } T4_FIELD_ALIASES = { 'employee_name': ['employee', 'employee name', 'name'], 'sin_number': ['sin', 's.i.n.', 'social insurance number'], 'tax_year': ['year', 'tax year', 'taxation year'], 'box_14': ['box 14', 'employment income', 'box14', 'total income'], 'box_16': ['box 16', 'cpp', 'cpp contributions', 'box16'], 'box_16a': ['box 16a', 'cpp2', 'cpp2 contributions', 'box16a'], 'box_18': ['box 18', 'ei premiums', 'ei', 'box18'], 'box_20': ['box 20', 'rpp', 'rrsp', 'pension', 'box20'], 'box_22': ['box 22', 'income tax', 'tax deducted', 'box22'], 'box_24': ['box 24', 'ei insurable', 'insurable earnings', 'box24'], 'box_26': ['box 26', 'cpp pensionable', 'pensionable earnings', 'box26'], 'box_44': ['box 44', 'union dues', 'union', 'box44'], } def _fuzzy_match_column(header, alias_dict, threshold=0.8): """Match a CSV header to a field using alias dictionary.""" header_lower = header.strip().lower() best_match = None best_score = 0 for field_name, aliases in alias_dict.items(): for alias in aliases: alias_lower = alias.lower() if header_lower == alias_lower: return field_name # Substring containment scoring if alias_lower in header_lower or header_lower in alias_lower: max_len = max(len(header_lower), len(alias_lower)) if max_len == 0: continue length_ratio = min(len(header_lower), len(alias_lower)) / max_len if length_ratio > best_score and length_ratio >= threshold: best_score = length_ratio best_match = field_name return best_match class FusionPayrollMigration(models.Model): _name = 'fusion.payroll.migration' _description = 'Payroll Migration Session' _order = 'create_date desc' _inherit = ['mail.thread'] name = fields.Char( string='Reference', default=lambda self: _('New Migration'), required=True, ) company_id = fields.Many2one( 'res.company', string='Company', required=True, default=lambda self: self.env.company, ) state = fields.Selection([ ('setup', 'Company Setup'), ('employees', 'Employee Import'), ('employee_map', 'Map Employee Columns'), ('employee_preview', 'Preview Employees'), ('payslips', 'Payslip Import'), ('payslip_map', 'Map Payslip Columns'), ('payslip_preview', 'Preview Payslips'), ('ytd', 'YTD Verification'), ('t4', 'T4 History'), ('t4_map', 'Map T4 Columns'), ('reconcile', 'Reconciliation'), ('done', 'Complete'), ], string='Step', default='setup', tracking=True) migration_type = fields.Selection([ ('full', 'Full History (import individual payslips)'), ('ytd_only', 'YTD Only (summary balances, no payslip detail)'), ], string='Migration Type', default='full') cutoff_date = fields.Date( string='Migration Cutoff Date', help='Last payslip date processed in QuickBooks', ) pay_schedule = fields.Selection([ ('weekly', 'Weekly'), ('biweekly', 'Bi-Weekly'), ('semi_monthly', 'Semi-Monthly'), ('monthly', 'Monthly'), ], string='Pay Schedule', default='biweekly') # File uploads employee_csv = fields.Binary(string='Employee CSV File', attachment=False) employee_csv_filename = fields.Char() payslip_csv = fields.Binary(string='Payslip History CSV File', attachment=False) payslip_csv_filename = fields.Char() t4_csv = fields.Binary(string='T4 History CSV File', attachment=False) t4_csv_filename = fields.Char() ytd_csv = fields.Binary(string='YTD Balances CSV File', attachment=False) ytd_csv_filename = fields.Char() # Mapping lines employee_mapping_ids = fields.One2many( 'fusion.payroll.migration.mapping.line', 'migration_id', string='Employee Column Mapping', domain=[('mapping_type', '=', 'employee')], ) payslip_mapping_ids = fields.One2many( 'fusion.payroll.migration.mapping.line', 'migration_id', string='Payslip Column Mapping', domain=[('mapping_type', '=', 'payslip')], ) t4_mapping_ids = fields.One2many( 'fusion.payroll.migration.mapping.line', 'migration_id', string='T4 Column Mapping', domain=[('mapping_type', '=', 't4')], ) # Import results employee_count = fields.Integer(string='Employees Imported', default=0) payslip_count = fields.Integer(string='Payslips Imported', default=0) t4_count = fields.Integer(string='T4 Slips Imported', default=0) warning_count = fields.Integer(string='Warnings', default=0) error_count = fields.Integer(string='Errors', default=0) log_ids = fields.One2many( 'fusion.payroll.migration.log', 'migration_id', string='Import Log', ) # Preview data (stored as text for display) preview_html = fields.Html(string='Preview', sanitize=False) reconciliation_html = fields.Html(string='Reconciliation Report', sanitize=False) # --- CSV PARSING HELPERS --- def _parse_csv(self, csv_binary): """Parse a binary CSV field into a list of dicts.""" if not csv_binary: raise UserError(_('No CSV file uploaded.')) try: csv_text = base64.b64decode(csv_binary).decode('utf-8-sig') except UnicodeDecodeError: csv_text = base64.b64decode(csv_binary).decode('latin-1') reader = csv.DictReader(io.StringIO(csv_text)) rows = list(reader) if not rows: raise UserError(_('The CSV file is empty or has no data rows.')) return rows, list(reader.fieldnames) if reader.fieldnames else [] def _auto_map_columns(self, headers, alias_dict, mapping_type): """Auto-map CSV headers to known fields and create mapping lines.""" self.ensure_one() MappingLine = self.env['fusion.payroll.migration.mapping.line'] # Clear old mapping lines of this type self.env['fusion.payroll.migration.mapping.line'].search([ ('migration_id', '=', self.id), ('mapping_type', '=', mapping_type), ]).unlink() for header in headers: matched_field = _fuzzy_match_column(header, alias_dict) MappingLine.create({ 'migration_id': self.id, 'mapping_type': mapping_type, 'csv_column': header, 'fusion_field': matched_field or '', 'is_mapped': bool(matched_field), 'is_skipped': not bool(matched_field), }) def _get_mapping_dict(self, mapping_type): """Get {csv_column: fusion_field} dict from mapping lines.""" self.ensure_one() lines = self.env['fusion.payroll.migration.mapping.line'].search([ ('migration_id', '=', self.id), ('mapping_type', '=', mapping_type), ('is_skipped', '=', False), ('fusion_field', '!=', ''), ]) return {l.csv_column: l.fusion_field for l in lines} def _log(self, level, message, row_num=0): """Add a log entry.""" self.env['fusion.payroll.migration.log'].create({ 'migration_id': self.id, 'level': level, 'message': message, 'row_number': row_num, }) if level == 'error': self.error_count += 1 elif level == 'warning': self.warning_count += 1 # --- STATE TRANSITIONS --- def action_next_step(self): """Advance to the next step based on current state.""" self.ensure_one() state_flow = [ 'setup', 'employees', 'employee_map', 'employee_preview', 'payslips', 'payslip_map', 'payslip_preview', 'ytd', 't4', 't4_map', 'reconcile', 'done', ] idx = state_flow.index(self.state) if idx < len(state_flow) - 1: self.state = state_flow[idx + 1] return self._reopen() def action_prev_step(self): """Go back one step, respecting skipped states.""" self.ensure_one() skip_map = { 'reconcile': 't4', 'ytd': 'employee_preview' if self.migration_type == 'ytd_only' else 'payslip_preview', 't4': 'ytd', } if self.state in skip_map: self.state = skip_map[self.state] else: state_flow = [ 'setup', 'employees', 'employee_map', 'employee_preview', 'payslips', 'payslip_map', 'payslip_preview', 'ytd', 't4', 't4_map', 'reconcile', 'done', ] idx = state_flow.index(self.state) if idx > 0: self.state = state_flow[idx - 1] return self._reopen() def _reopen(self): """Reopen the form view for this migration.""" return { 'type': 'ir.actions.act_window', 'res_model': self._name, 'res_id': self.id, 'view_mode': 'form', 'target': 'current', } def action_skip_to_ytd(self): """Skip payslip import (YTD-only mode).""" self.ensure_one() self.state = 'ytd' return self._reopen() def action_skip_t4(self): """Skip T4 import step.""" self.ensure_one() self.state = 'reconcile' self._compute_reconciliation() return self._reopen() def action_complete(self): """Mark migration as done.""" self.ensure_one() self.state = 'done' self.message_post(body=_('Migration completed: %d employees, %d payslips, %d T4 slips imported.') % ( self.employee_count, self.payslip_count, self.t4_count, )) action = self.env.ref('fusion_payroll.action_fusion_dashboard_employees', raise_if_not_found=False) if action: return action.read()[0] return self._reopen() # --- STEP 2: EMPLOYEE IMPORT --- def action_upload_employee_csv(self): """Parse uploaded employee CSV and auto-map columns.""" self.ensure_one() rows, headers = self._parse_csv(self.employee_csv) self._auto_map_columns(headers, EMPLOYEE_FIELD_ALIASES, 'employee') self.state = 'employee_map' self._log('info', _('Employee CSV uploaded: %d rows, %d columns detected.') % (len(rows), len(headers))) return self._reopen() def action_preview_employees(self): """Show preview of first 5 employee rows with mapped fields.""" self.ensure_one() rows, headers = self._parse_csv(self.employee_csv) mapping = self._get_mapping_dict('employee') preview_rows = rows[:5] html = '' for fusion_field in mapping.values(): html += '' % fusion_field html += '' for row in preview_rows: html += '' for csv_col, fusion_field in mapping.items(): val = row.get(csv_col, '') html += '' % html_escape(val) html += '' html += '
%s
%s
' html += '

Showing first %d of %d rows.

' % (len(preview_rows), len(rows)) self.preview_html = html self.state = 'employee_preview' return self._reopen() def action_import_employees(self): """Create hr.employee + hr.contract records from CSV.""" self.ensure_one() self.error_count = 0 self.warning_count = 0 rows, _ = self._parse_csv(self.employee_csv) mapping = self._get_mapping_dict('employee') inv_map = {v: k for k, v in mapping.items()} structure = self.env.ref('fusion_payroll.hr_payroll_structure_canada', raise_if_not_found=False) struct_type = self.env.ref('fusion_payroll.structure_type_canada', raise_if_not_found=False) Employee = self.env['hr.employee'] Contract = self.env['hr.contract'] imported = 0 PROVINCE_MAP = { 'alberta': 'AB', 'british columbia': 'BC', 'manitoba': 'MB', 'new brunswick': 'NB', 'newfoundland': 'NL', 'newfoundland and labrador': 'NL', 'nova scotia': 'NS', 'northwest territories': 'NT', 'nunavut': 'NU', 'ontario': 'ON', 'prince edward island': 'PE', 'quebec': 'QC', 'saskatchewan': 'SK', 'yukon': 'YT', } for i, row in enumerate(rows, start=2): try: def g(field): col = inv_map.get(field) return (row.get(col, '') or '').strip() if col else '' # Build employee name name = g('name') if not name: first = g('first_name') last = g('last_name') name = ('%s %s' % (first, last)).strip() if not name: self._log('error', _('Row %d: No employee name found, skipping.') % i, i) continue # SIN sin = g('sin_number').replace('-', '').replace(' ', '') # Check for existing employee by SIN existing = False if sin and len(sin) == 9: existing = Employee.search([('sin_number', '=', sin), ('company_id', '=', self.company_id.id)], limit=1) # Province mapping VALID_PROVS = {'AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE', 'QC', 'SK', 'YT'} prov = g('home_province').strip() if len(prov) > 2: prov = PROVINCE_MAP.get(prov.lower(), '') if prov: prov = prov.upper() if prov not in VALID_PROVS: if prov: self._log('warning', _('Row %d: Unknown province "%s", defaulting to ON.') % (i, prov), i) prov = 'ON' # Pay type pay_type_raw = g('pay_type').lower() pay_type = 'salary' if 'sal' in pay_type_raw else 'hourly' # Pay schedule sched_raw = g('pay_schedule').lower() sched_map = { 'weekly': 'weekly', 'week': 'weekly', 'bi-weekly': 'biweekly', 'biweekly': 'biweekly', 'bi weekly': 'biweekly', 'semi-monthly': 'semi_monthly', 'semi monthly': 'semi_monthly', 'semimonthly': 'semi_monthly', 'monthly': 'monthly', 'month': 'monthly', } pay_schedule = 'biweekly' for key, val in sched_map.items(): if key in sched_raw: pay_schedule = val break # Parse monetary values def parse_money(s): if not s: return 0.0 return float(s.replace('$', '').replace(',', '').replace('(', '-').replace(')', '').strip() or '0') def parse_date(s): if not s: return False for fmt in ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%Y/%m/%d', '%m-%d-%Y'): try: return datetime.strptime(s.strip(), fmt).date() except ValueError: continue return False vals = { 'name': name, 'company_id': self.company_id.id, 'sin_number': sin or False, 'home_street': g('home_street') or False, 'home_street2': g('home_street2') or False, 'home_city': g('home_city') or False, 'home_province': prov or 'ON', 'home_postal_code': g('home_postal_code') or False, 'hire_date': parse_date(g('hire_date')), 'pay_type': pay_type, 'pay_schedule': pay_schedule, 'hourly_rate': parse_money(g('hourly_rate')) if pay_type == 'hourly' else 0, 'salary_amount': parse_money(g('salary_amount')) if pay_type == 'salary' else 0, 'federal_td1_amount': parse_money(g('federal_td1_amount')) or 16452, 'provincial_claim_amount': parse_money(g('provincial_claim_amount')) or 0, 'federal_additional_tax': parse_money(g('federal_additional_tax')), 'vacation_rate': float(g('vacation_rate') or '4'), 'employee_number': g('employee_number') or False, } payment = g('payment_method').lower() if 'direct' in payment or 'deposit' in payment: vals['payment_method'] = 'direct_deposit' elif 'cheque' in payment or 'check' in payment: vals['payment_method'] = 'cheque' dental = g('t4_dental_code') if dental and dental[0] in ('1', '2', '3', '4', '5'): vals['t4_dental_code'] = dental[0] email_val = g('email') if email_val: vals['work_email'] = email_val phone_val = g('phone') if phone_val: vals['work_phone'] = phone_val if existing: existing.write(vals) emp = existing self._log('info', _('Row %d: Updated existing employee %s (SIN match).') % (i, name), i) else: emp = Employee.create(vals) self._log('info', _('Row %d: Created employee %s.') % (i, name), i) # Create contract if none exists if not emp.contract_id: contract_vals = { 'name': _('Contract - %s') % name, 'employee_id': emp.id, 'company_id': self.company_id.id, 'state': 'open', 'date_start': vals.get('hire_date') or date.today(), } if pay_type == 'hourly': contract_vals['wage'] = vals['hourly_rate'] * 2080 / 12 if 'wage_type' in Contract._fields: contract_vals['wage_type'] = 'hourly' if 'hourly_wage' in Contract._fields: contract_vals['hourly_wage'] = vals['hourly_rate'] else: contract_vals['wage'] = vals['salary_amount'] / 12 if vals['salary_amount'] else 0 if struct_type: contract_vals['structure_type_id'] = struct_type.id Contract.create(contract_vals) imported += 1 except Exception as e: self._log('error', _('Row %d: Error importing employee: %s') % (i, str(e)), i) self.employee_count = imported self.state = 'payslips' if self.migration_type == 'full' else 'ytd' self.preview_html = False self._log('info', _('Employee import complete: %d employees imported.') % imported) return self._reopen() # --- STEP 3: PAYSLIP HISTORY IMPORT --- def action_upload_payslip_csv(self): """Parse uploaded payslip CSV and auto-map columns.""" self.ensure_one() rows, headers = self._parse_csv(self.payslip_csv) self._auto_map_columns(headers, PAYSLIP_FIELD_ALIASES, 'payslip') self.state = 'payslip_map' self._log('info', _('Payslip CSV uploaded: %d rows, %d columns.') % (len(rows), len(headers))) return self._reopen() def action_preview_payslips(self): """Show preview of payslip import with per-employee summaries.""" self.ensure_one() rows, _ = self._parse_csv(self.payslip_csv) mapping = self._get_mapping_dict('payslip') inv_map = {v: k for k, v in mapping.items()} employee_totals = {} for row in rows: def g(field): col = inv_map.get(field) return (row.get(col, '') or '').strip() if col else '' emp_name = g('employee_name') or g('employee_id_ref') or 'Unknown' if emp_name not in employee_totals: employee_totals[emp_name] = {'rows': 0, 'gross': 0, 'net': 0, 'cpp': 0, 'ei': 0, 'tax': 0} def pm(f): v = g(f) if not v: return 0.0 return abs(float(v.replace('$', '').replace(',', '').replace('(', '-').replace(')', '').strip() or '0')) employee_totals[emp_name]['rows'] += 1 employee_totals[emp_name]['gross'] += pm('GROSS') employee_totals[emp_name]['net'] += pm('NET') employee_totals[emp_name]['cpp'] += pm('CPP_EE') employee_totals[emp_name]['ei'] += pm('EI_EE') employee_totals[emp_name]['tax'] += pm('FED_TAX') + pm('PROV_TAX') html = '' html += '' html += '' for emp, t in sorted(employee_totals.items()): html += '' % ( emp, t['rows'], f"{t['gross']:,.2f}", f"{t['cpp']:,.2f}", f"{t['ei']:,.2f}", f"{t['tax']:,.2f}", f"{t['net']:,.2f}") html += '
EmployeePayslipsGrossCPPEITaxNet
%s%d$%s$%s$%s$%s$%s
' html += '

%d total payslips for %d employees.

' % (len(rows), len(employee_totals)) self.preview_html = html self.state = 'payslip_preview' return self._reopen() def action_import_payslips(self): """Create hr.payslip + hr.payslip.line records from CSV.""" self.ensure_one() self.error_count = 0 self.warning_count = 0 rows, _ = self._parse_csv(self.payslip_csv) mapping = self._get_mapping_dict('payslip') inv_map = {v: k for k, v in mapping.items()} structure = self.env.ref('fusion_payroll.hr_payroll_structure_canada', raise_if_not_found=False) Employee = self.env['hr.employee'] Payslip = self.env['hr.payslip'] PayslipLine = self.env['hr.payslip.line'] EARNINGS_CODES = {'BASIC', 'OT_PAY', 'STAT_PAY', 'VAC_PAY', 'BONUS_PAY', 'COMMISSION', 'RETRO_PAY', 'SHIFT_PREMIUM'} DEDUCTION_CODES = {'RRSP', 'UNION_DUES', 'CPP_EE', 'CPP2_EE', 'EI_EE', 'FED_TAX', 'PROV_TAX', 'OHP'} EMPLOYER_CODES = {'CPP_ER', 'CPP2_ER', 'EI_ER'} ALL_LINE_CODES = EARNINGS_CODES | DEDUCTION_CODES | EMPLOYER_CODES | {'GROSS', 'NET'} # Build employee lookup by name and employee number employees = Employee.search([('company_id', '=', self.company_id.id)]) emp_by_name = {e.name.lower().strip(): e for e in employees} emp_by_number = {} for e in employees: if e.employee_number: emp_by_number[e.employee_number.strip()] = e # Get salary rules for line creation rules = {} if structure: for rule in self.env['hr.salary.rule'].search([('struct_id', '=', structure.id)]): rules[rule.code] = rule imported = 0 for i, row in enumerate(rows, start=2): try: def g(field): col = inv_map.get(field) return (row.get(col, '') or '').strip() if col else '' def parse_money(s): if not s: return 0.0 s = s.replace('$', '').replace(',', '').replace('(', '-').replace(')', '').strip() return float(s or '0') def parse_date(s): if not s: return False for fmt in ('%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y', '%Y/%m/%d', '%m-%d-%Y'): try: return datetime.strptime(s.strip(), fmt).date() except ValueError: continue return False # Find employee emp_name = g('employee_name') emp_ref = g('employee_id_ref') emp = None if emp_ref and emp_ref in emp_by_number: emp = emp_by_number[emp_ref] elif emp_name: emp = emp_by_name.get(emp_name.lower().strip()) if not emp: self._log('warning', _('Row %d: Employee "%s" not found, skipping.') % (i, emp_name or emp_ref), i) continue pay_date = parse_date(g('pay_date')) period_start = parse_date(g('period_start')) period_end = parse_date(g('period_end')) if not pay_date: self._log('error', _('Row %d: No pay date found, skipping.') % i, i) continue if not period_start: period_start = pay_date.replace(day=1) if not period_end: period_end = pay_date # Create payslip slip_vals = { 'name': _('QB Import - %s - %s') % (emp.name, pay_date), 'employee_id': emp.id, 'company_id': self.company_id.id, 'date_from': period_start, 'date_to': period_end, 'state': 'done', 'contract_id': emp.contract_id.id if emp.contract_id else False, } if structure: slip_vals['struct_id'] = structure.id slip = Payslip.create(slip_vals) # Create payslip lines for each mapped salary rule code for code in ALL_LINE_CODES: amount = parse_money(g(code)) if amount == 0: continue # Deductions should be negative if code in DEDUCTION_CODES and amount > 0: amount = -amount rule = rules.get(code) if not rule: rule = self.env['hr.salary.rule'].search([('code', '=', code)], limit=1) if not rule: self._log('warning', _('Row %d: No salary rule for code %s, skipping line.') % (i, code), i) continue line_vals = { 'slip_id': slip.id, 'name': rule.name, 'code': code, 'amount': abs(amount), 'quantity': 1, 'rate': 100 if amount >= 0 else -100, 'total': amount, 'salary_rule_id': rule.id, 'category_id': rule.category_id.id, } PayslipLine.create(line_vals) # Cheque number chq = g('cheque_number') if chq: slip.write({'memo': _('QB Cheque #%s') % chq}) imported += 1 except Exception as e: self._log('error', _('Row %d: Error importing payslip: %s') % (i, str(e)), i) self.payslip_count = imported self.state = 'ytd' self.preview_html = False self._log('info', _('Payslip import complete: %d payslips imported.') % imported) return self._reopen() # --- STEP 4: YTD VERIFICATION --- def action_compute_ytd_preview(self): """Compute YTD totals from imported payslips and display.""" self.ensure_one() cutoff = self.cutoff_date or date.today() year_start = cutoff.replace(month=1, day=1) employees = self.env['hr.employee'].search([('company_id', '=', self.company_id.id)]) html = '' html += '' html += '' for emp in employees: payslips = self.env['hr.payslip'].search([ ('employee_id', '=', emp.id), ('state', 'in', ['done', 'paid']), ('date_from', '>=', year_start), ('date_to', '<=', cutoff), ]) totals = {'GROSS': 0, 'CPP_EE': 0, 'EI_EE': 0, 'FED_TAX': 0, 'PROV_TAX': 0, 'NET': 0} for slip in payslips: for line in slip.line_ids: if line.code in totals: totals[line.code] += abs(line.total or 0) html += '' % ( emp.name, f"{totals['GROSS']:,.2f}", f"{totals['CPP_EE']:,.2f}", f"{totals['EI_EE']:,.2f}", f"{totals['FED_TAX']:,.2f}", f"{totals['PROV_TAX']:,.2f}", f"{totals['NET']:,.2f}") html += '
EmployeeYTD GrossYTD CPPYTD EIYTD Fed TaxYTD Prov TaxYTD Net
%s$%s$%s$%s$%s$%s$%s
' self.preview_html = html return self._reopen() def action_import_ytd_balances(self): """Import YTD opening balance payslips from CSV (for YTD-only mode).""" self.ensure_one() if not self.ytd_csv: self.state = 't4' return self._reopen() rows, headers = self._parse_csv(self.ytd_csv) structure = self.env.ref('fusion_payroll.hr_payroll_structure_canada', raise_if_not_found=False) Employee = self.env['hr.employee'] employees = Employee.search([('company_id', '=', self.company_id.id)]) emp_by_name = {e.name.lower().strip(): e for e in employees} cutoff = self.cutoff_date or date.today() year_start = cutoff.replace(month=1, day=1) imported = 0 for i, row in enumerate(rows, start=2): try: emp_name = (row.get('Employee', '') or row.get('employee', '') or row.get('Name', '') or row.get('name', '')).strip() emp = emp_by_name.get(emp_name.lower()) if emp_name else None if not emp: self._log('warning', _('YTD Row %d: Employee "%s" not found.') % (i, emp_name), i) continue def pm(key): for k in (key, key.lower(), key.upper(), key.replace('_', ' ')): val = row.get(k, '') if val: return abs(float(str(val).replace('$', '').replace(',', '').strip() or '0')) return 0.0 slip = self.env['hr.payslip'].create({ 'name': _('YTD Opening Balance - %s') % emp.name, 'employee_id': emp.id, 'company_id': self.company_id.id, 'date_from': year_start, 'date_to': cutoff, 'state': 'done', 'struct_id': structure.id if structure else False, 'contract_id': emp.contract_id.id if emp.contract_id else False, }) ytd_lines = { 'GROSS': pm('Gross') or pm('YTD Gross') or pm('GROSS'), 'CPP_EE': -(pm('CPP') or pm('YTD CPP') or pm('CPP_EE')), 'CPP2_EE': -(pm('CPP2') or pm('YTD CPP2') or pm('CPP2_EE')), 'EI_EE': -(pm('EI') or pm('YTD EI') or pm('EI_EE')), 'FED_TAX': -(pm('Federal Tax') or pm('YTD Fed Tax') or pm('FED_TAX')), 'PROV_TAX': -(pm('Provincial Tax') or pm('YTD Prov Tax') or pm('PROV_TAX')), 'NET': pm('Net') or pm('YTD Net') or pm('NET'), } rules = {} if structure: for rule in self.env['hr.salary.rule'].search([('struct_id', '=', structure.id)]): rules[rule.code] = rule for code, amount in ytd_lines.items(): if amount == 0: continue rule = rules.get(code) if not rule: rule = self.env['hr.salary.rule'].search([('code', '=', code)], limit=1) if not rule: self._log('warning', _('YTD: No salary rule for code %s, skipping.') % code) continue self.env['hr.payslip.line'].create({ 'slip_id': slip.id, 'name': rule.name, 'code': code, 'amount': abs(amount), 'quantity': 1, 'rate': 100 if amount >= 0 else -100, 'total': amount, 'salary_rule_id': rule.id, 'category_id': rule.category_id.id, }) imported += 1 except Exception as e: self._log('error', _('YTD Row %d: %s') % (i, str(e)), i) self._log('info', _('YTD opening balances created for %d employees.') % imported) self.state = 't4' return self._reopen() def action_confirm_ytd(self): """Confirm YTD totals and move to T4 step.""" self.ensure_one() self.state = 't4' self.preview_html = False return self._reopen() # --- STEP 5: T4 IMPORT --- def action_upload_t4_csv(self): """Parse uploaded T4 CSV and auto-map columns.""" self.ensure_one() rows, headers = self._parse_csv(self.t4_csv) self._auto_map_columns(headers, T4_FIELD_ALIASES, 't4') self.state = 't4_map' self._log('info', _('T4 CSV uploaded: %d rows, %d columns.') % (len(rows), len(headers))) return self._reopen() def action_import_t4(self): """Create T4 summary + slip records from CSV.""" self.ensure_one() rows, _ = self._parse_csv(self.t4_csv) mapping = self._get_mapping_dict('t4') inv_map = {v: k for k, v in mapping.items()} Employee = self.env['hr.employee'] employees = Employee.search([('company_id', '=', self.company_id.id)]) emp_by_name = {e.name.lower().strip(): e for e in employees} T4Summary = self.env['hr.t4.summary'] T4Slip = self.env['hr.t4.slip'] summaries = {} imported = 0 for i, row in enumerate(rows, start=2): try: def g(f): col = inv_map.get(f) return (row.get(col, '') or '').strip() if col else '' def pm(f): v = g(f) if not v: return 0.0 return float(v.replace('$', '').replace(',', '').strip() or '0') emp_name = g('employee_name') emp = emp_by_name.get(emp_name.lower().strip()) if emp_name else None if not emp: self._log('warning', _('T4 Row %d: Employee "%s" not found.') % (i, emp_name), i) continue year = int(g('tax_year') or '0') if not year: self._log('error', _('T4 Row %d: No tax year.') % i, i) continue if year not in summaries: existing = T4Summary.search([ ('company_id', '=', self.company_id.id), ('tax_year', '=', year), ], limit=1) if existing: summaries[year] = existing else: summaries[year] = T4Summary.create({ 'company_id': self.company_id.id, 'tax_year': year, 'state': 'generated', }) T4Slip.create({ 'summary_id': summaries[year].id, 'employee_id': emp.id, 'employment_income': pm('box_14'), 'cpp_employee': pm('box_16'), 'cpp2_employee': pm('box_16a'), 'ei_employee': pm('box_18'), 'box_20_rpp': pm('box_20'), 'income_tax': pm('box_22'), 'ei_insurable_earnings': pm('box_24'), 'cpp_pensionable_earnings': pm('box_26'), 'box_44_union_dues': pm('box_44'), }) imported += 1 except Exception as e: self._log('error', _('T4 Row %d: %s') % (i, str(e)), i) self.t4_count = imported self._log('info', _('T4 import complete: %d slips imported.') % imported) self.state = 'reconcile' self._compute_reconciliation() return self._reopen() # --- STEP 6: RECONCILIATION --- def _compute_reconciliation(self): """Build reconciliation HTML report.""" self.ensure_one() cutoff = self.cutoff_date or date.today() employees = self.env['hr.employee'].search([('company_id', '=', self.company_id.id)]) payslips = self.env['hr.payslip'].search([ ('company_id', '=', self.company_id.id), ('state', 'in', ['done', 'paid']), ('name', 'like', 'QB Import%'), ]) grand = {'gross': 0, 'cpp': 0, 'ei': 0, 'tax': 0, 'net': 0} for slip in payslips: for line in slip.line_ids: code = line.code or '' amt = abs(line.total or 0) if code == 'GROSS': grand['gross'] += amt elif code == 'CPP_EE': grand['cpp'] += amt elif code == 'EI_EE': grand['ei'] += amt elif code in ('FED_TAX', 'PROV_TAX', 'OHP'): grand['tax'] += amt elif code == 'NET': grand['net'] += amt html = '
' html += '

Migration Summary

' html += '' html += '' % self.employee_count html += '' % self.payslip_count html += '' % self.t4_count html += '' % self.warning_count html += '' % self.error_count html += '
Employees Imported%d
Payslips Imported%d
T4 Slips Imported%d
Warnings%d
Errors%d
' html += '

Grand Totals (All Payslips)

' html += '' html += '' % f"{grand['gross']:,.2f}" html += '' % f"{grand['cpp']:,.2f}" html += '' % f"{grand['ei']:,.2f}" html += '' % f"{grand['tax']:,.2f}" html += '' % f"{grand['net']:,.2f}" html += '
Total Gross$%s
Total CPP$%s
Total EI$%s
Total Tax$%s
Total Net$%s
' self.reconciliation_html = html class FusionPayrollMigrationLog(models.Model): _name = 'fusion.payroll.migration.log' _description = 'Migration Log Entry' _order = 'id desc' migration_id = fields.Many2one('fusion.payroll.migration', required=True, ondelete='cascade') level = fields.Selection([ ('info', 'Info'), ('warning', 'Warning'), ('error', 'Error'), ], string='Level', default='info') message = fields.Text(string='Message') row_number = fields.Integer(string='CSV Row') create_date = fields.Datetime(string='Time') class FusionPayrollMigrationMappingLine(models.Model): _name = 'fusion.payroll.migration.mapping.line' _description = 'Migration Column Mapping Line' _order = 'sequence' migration_id = fields.Many2one('fusion.payroll.migration', required=True, ondelete='cascade') mapping_type = fields.Selection([ ('employee', 'Employee'), ('payslip', 'Payslip'), ('t4', 'T4'), ], string='Type', required=True) sequence = fields.Integer(default=10) csv_column = fields.Char(string='CSV Column', required=True) fusion_field = fields.Char(string='Maps To') is_mapped = fields.Boolean(string='Mapped', default=False) is_skipped = fields.Boolean(string='Skip', default=False)