Files
Odoo-Modules/fusion_projects/fusion_project_portal/controllers/portal.py
gsinghpal a2fe1fcbcc changes
2026-04-29 03:35:33 -04:00

481 lines
19 KiB
Python

from collections import defaultdict
from odoo import http, _
from odoo.exceptions import AccessError, MissingError
from odoo.http import request
from odoo.addons.project.controllers.portal import ProjectCustomerPortal
class FusionProjectCustomerPortal(ProjectCustomerPortal):
def _task_get_searchbar_sortings(self, milestones_allowed, project=False):
    """Extend the inherited task sort options with a manual-order entry.

    Starts from whatever the parent implementation returns and registers one
    additional option under the key ``'sequence, id'``.

    :param milestones_allowed: forwarded unchanged to the parent method.
    :param project: forwarded unchanged to the parent method.
    :return: the parent's sortings mapping with the extra entry added.
    """
    sortings = super()._task_get_searchbar_sortings(milestones_allowed, project)
    manual_order = 'sequence, id'
    sortings[manual_order] = dict(label=_('Manual'), order=manual_order, sequence=5)
    return sortings
def _fp_project_card_data(self, projects):
    """Build the per-project statistics consumed by the project cards.

    Task figures come from a single ``search_read`` over every non-template
    task of the given projects; hours spent come from a single ``read_group``
    over analytic lines. Both queries run through ``sudo()``.

    :param projects: recordset of projects to describe.
    :return: ``{project_id: dict}`` where each dict holds ``open_count``,
        ``done_count``, ``total_count``, ``pct``, ``alloc_hours``,
        ``spent_hours``, ``last_activity``, ``assignees``, ``bucket`` and
        ``partner_name``. Empty dict when ``projects`` is empty.
    """
    if not projects:
        return {}

    def empty_stats():
        # Fresh accumulator; a new one per project so the sets are never shared.
        return {
            'open': 0, 'done': 0, 'total': 0,
            'alloc': 0.0, 'last_activity': False,
            'assignees': set(),
            'task_partners': set(),
        }

    env = request.env
    Task = env['project.task'].sudo()
    AAL = env['account.analytic.line'].sudo()
    done_states = ('1_done', '03_approved')
    # These fields are probed rather than assumed to exist on the task model.
    has_alloc = 'allocated_hours' in Task._fields
    has_effective = 'effective_hours' in Task._fields

    # --- Pass 1: fold every task row into its project's accumulator.
    stats_by_project = defaultdict(empty_stats)
    fields_to_read = ['project_id', 'state', 'write_date', 'user_ids', 'partner_id']
    if has_alloc:
        fields_to_read.append('allocated_hours')
    rows = Task.search_read(
        [('project_id', 'in', projects.ids), ('is_template', '=', False)],
        fields_to_read,
    )
    for row in rows:
        project_ref = row.get('project_id')
        project_id = project_ref[0] if project_ref else False
        if not project_id:
            continue
        stats = stats_by_project[project_id]
        stats['total'] += 1
        counter = 'done' if row['state'] in done_states else 'open'
        stats[counter] += 1
        if has_alloc and row.get('allocated_hours'):
            stats['alloc'] += row['allocated_hours']
        written = row.get('write_date')
        if written and (not stats['last_activity'] or written > stats['last_activity']):
            # Keep the most recent task write date seen so far.
            stats['last_activity'] = written
        stats['assignees'].update(row.get('user_ids') or [])
        customer = row.get('partner_id')
        if customer:
            stats['task_partners'].add((customer[0], customer[1]))

    # --- Pass 2: timesheet hours per project (effective hours come from timesheets).
    spent_by_project = defaultdict(float)
    if has_effective:
        groups = AAL.read_group(
            [('project_id', 'in', projects.ids)],
            ['project_id', 'unit_amount:sum'],
            ['project_id'],
        )
        for group in groups:
            if group.get('project_id'):
                spent_by_project[group['project_id'][0]] = group.get('unit_amount', 0.0) or 0.0

    # --- Resolve assignee names and initials in one batch.
    all_user_ids = set()
    for stats in stats_by_project.values():
        all_user_ids.update(stats['assignees'])
    users_by_id = {}
    if all_user_ids:
        for user in env['res.users'].sudo().browse(list(all_user_ids)):
            name_parts = (user.name or '?').split()[:2]
            users_by_id[user.id] = {
                'id': user.id,
                'name': user.name,
                'initials': ''.join([part[:1].upper() for part in name_parts]) or '?',
            }

    # --- Assemble the card payload for each requested project.
    result = {}
    for project in projects:
        if project.id in stats_by_project:
            stats = stats_by_project[project.id]
        else:
            stats = empty_stats()
        total = stats['total']
        pct = round(100.0 * stats['done'] / total, 0) if total else 0

        if not project.active:
            bucket = 'done'
        else:
            bucket = 'active' if stats['open'] > 0 else 'idle'

        assignees = sorted(
            [users_by_id[uid] for uid in stats['assignees'] if uid in users_by_id],
            key=lambda entry: entry['name'],
        )

        # Customer label: prefer the project's customer; fall back to task
        # customers when the project has none. If multiple distinct task
        # customers, show a count rather than picking one arbitrarily.
        task_partners = stats['task_partners']
        partner_count = len(task_partners)
        if project.partner_id:
            partner_name = project.partner_id.display_name
        elif partner_count == 1:
            partner_name = next(iter(task_partners))[1]
        elif partner_count > 1:
            partner_name = _('%s customers') % partner_count
        else:
            partner_name = ''

        result[project.id] = {
            'open_count': stats['open'],
            'done_count': stats['done'],
            'total_count': total,
            'pct': pct,
            'alloc_hours': stats['alloc'],
            'spent_hours': spent_by_project.get(project.id, 0.0),
            # Projects without any task activity fall back to their own write date.
            'last_activity': stats['last_activity'] or project.write_date,
            'assignees': assignees,
            'bucket': bucket,
            'partner_name': partner_name,
        }
    return result
def _prepare_searchbar_sortings(self):
    """Return the inherited project sort options plus a "Last Activity" one.

    Only an SQL-sortable option is registered here: task_count / pct are
    non-stored compute fields and would break the SQL ORDER BY, so the
    redesigned page sorts on those client-side instead.
    """
    options = super()._prepare_searchbar_sortings()
    options.update(activity={'label': _('Last Activity'), 'order': 'write_date desc'})
    return options
@http.route(['/my/projects', '/my/projects/page/<int:page>'],
            type='http', auth='user', website=True)
def portal_my_projects(self, page=1, date_begin=None, date_end=None, sortby=None, **kw):
    """Render the redesigned "My Projects" portal page.

    :param page: 1-based page number taken from the URL.
    :param date_begin: optional lower bound (exclusive) on ``create_date``;
        only applied when ``date_end`` is given as well.
    :param date_end: optional upper bound (inclusive) on ``create_date``.
    :param sortby: key into the searchbar sortings; unknown or missing
        values fall back to ``'name'``.
    :param kw: extra query parameters; ``groupby`` is read from here and
        defaults to ``'status'``.
    :return: the rendered ``fusion_project_portal.portal_my_projects`` template.
    """
    from odoo.addons.portal.controllers.portal import pager as portal_pager

    Project = request.env['project.project']
    values = self._prepare_portal_layout_values()
    domain = self._prepare_project_domain()

    searchbar_sortings = self._prepare_searchbar_sortings()
    if not sortby or sortby not in searchbar_sortings:
        sortby = 'name'
    order = searchbar_sortings[sortby]['order']

    if date_begin and date_end:
        domain += [('create_date', '>', date_begin), ('create_date', '<=', date_end)]

    project_count = Project.search_count(domain)
    # Bigger page size than core's default — the redesign is meant to scan
    # everything you have at a glance, not paginate two-at-a-time.
    per_page = max(self._items_per_page, 40)

    requested_groupby = kw.get('groupby')
    url_args = {'date_begin': date_begin, 'date_end': date_end, 'sortby': sortby}
    if requested_groupby:
        # Carry the grouping through the pager links; without this the
        # selection silently resets to the default on every page change.
        url_args['groupby'] = requested_groupby
    pager = portal_pager(
        url='/my/projects',
        url_args=url_args,
        total=project_count, page=page, step=per_page,
    )

    projects = Project.search(domain, order=order, limit=per_page, offset=pager['offset'])
    request.session['my_projects_history'] = projects.ids[:100]

    values.update({
        'date': date_begin,
        'date_end': date_end,
        'projects': projects,
        'page_name': 'project',
        'default_url': '/my/projects',
        'pager': pager,
        'searchbar_sortings': searchbar_sortings,
        'sortby': sortby,
        'fp_project_cards': self._fp_project_card_data(projects),
        'fp_groupby': (requested_groupby or 'status').lower(),
    })
    return request.render('fusion_project_portal.portal_my_projects', values)
def _project_get_page_view_values(self, project, access_token, page=1, date_begin=None,
                                  date_end=None, sortby=None, search=None,
                                  search_in='content', groupby=None, **kwargs):
    """Add task-creation rights and hierarchical task ordering to the page values.

    All arguments are forwarded unchanged to the parent implementation. On
    top of its result this sets:

    * ``can_create_task`` — result of ``_fp_can_create_task(project)``;
    * ``grouped_tasks`` — when the parent produced a list, each non-empty
      group is re-browsed in parent-before-children order;
    * ``fp_task_depth`` — ``{task_id: depth}`` filled while reordering
      (stays empty when ``grouped_tasks`` is not a list).
    """
    values = super()._project_get_page_view_values(
        project, access_token, page, date_begin, date_end,
        sortby, search, search_in, groupby, **kwargs,
    )
    values['can_create_task'] = self._fp_can_create_task(project)

    depth_by_task = {}
    grouped = values.get('grouped_tasks')
    if isinstance(grouped, list):
        # Empty groups are passed through untouched; the depth map is shared
        # across groups so the template can look up any task id.
        values['grouped_tasks'] = [
            group.browse(self._fp_hierarchical_order(group, depth_by_task)) if group else group
            for group in grouped
        ]
    values['fp_task_depth'] = depth_by_task
    return values
def _fp_hierarchical_order(self, tasks, depth_map):
    """Return the ids of ``tasks`` in pre-order: each parent before its children.

    A task counts as a root when it has no parent or when its parent is not
    part of ``tasks``. Roots are ordered by ``(sequence, id)``, and so are
    the children of every task.

    :param tasks: the tasks to order.
    :param depth_map: dict updated in place with ``{task_id: depth}``
        (roots are depth 0).
    :return: list of task ids in display order.
    """
    known_ids = set(tasks.ids)
    by_id = {}
    kids = {tid: [] for tid in known_ids}
    top_level = []
    for task in tasks:
        by_id[task.id] = task
        parent = task.parent_id
        if parent and parent.id in known_ids:
            kids[parent.id].append(task.id)
        else:
            top_level.append((task.sequence, task.id))

    top_level.sort()
    for sibling_ids in kids.values():
        sibling_ids.sort(key=lambda cid: (by_id[cid].sequence, cid))

    # Explicit stack instead of recursion; entries are pushed reversed so
    # that popping yields them in their sorted order.
    ordered = []
    stack = [(root_id, 0) for _seq, root_id in reversed(top_level)]
    while stack:
        tid, depth = stack.pop()
        ordered.append(tid)
        depth_map[tid] = depth
        stack.extend((cid, depth + 1) for cid in reversed(kids.get(tid, [])))
    return ordered
def _task_get_page_view_values(self, task, access_token, **kwargs):
    """Extend the task page values with subtask, hours and timesheet data.

    Adds the following ``fp_*`` keys on top of the parent's values:
    the flattened descendant list (pre-order, siblings by ``sequence, id``)
    with a ``{task_id: depth}`` map, progress counters and percentage,
    per-descendant and total allocated / spent hours, the permission flags,
    the priority / state selection options, and the task's timesheet lines
    with their total hours.

    :param task: the task being displayed.
    :param access_token: forwarded to the parent implementation.
    :return: the values dict for the task page template.
    """
    values = super()._task_get_page_view_values(task, access_token, **kwargs)
    Task = request.env['project.task'].sudo()

    # Load the subtree one level per query instead of one query per task.
    # The global 'sequence, id' ordering keeps siblings in that same order
    # inside each parent's bucket.
    children_by_parent = defaultdict(list)
    seen = {task.id}
    frontier = [task.id]
    while frontier:
        level = Task.search([('parent_id', 'in', frontier)], order='sequence, id')
        frontier = []
        for child in level:
            if child.id in seen:
                # Defensive: never revisit a task, so a corrupted parent
                # chain cannot loop forever.
                continue
            seen.add(child.id)
            children_by_parent[child.parent_id.id].append(child)
            frontier.append(child.id)

    # Pre-order flattening with an explicit stack (children pushed reversed
    # so they pop in sorted order). Direct children are depth 0.
    descendants = []
    depth_map = {}
    stack = [(child, 0) for child in reversed(children_by_parent[task.id])]
    while stack:
        node, depth = stack.pop()
        descendants.append(node)
        depth_map[node.id] = depth
        stack.extend((sub, depth + 1) for sub in reversed(children_by_parent[node.id]))

    done_states = ('1_done', '03_approved')
    total = len(descendants)
    done = sum(1 for d in descendants if d.state in done_states)

    # The allocated-hours field is looked up under both names; either may be absent.
    if 'allocated_hours' in Task._fields:
        allocated_field = 'allocated_hours'
    elif 'planned_hours' in Task._fields:
        allocated_field = 'planned_hours'
    else:
        allocated_field = None
    has_effective = 'effective_hours' in Task._fields

    alloc_by_id = {}
    spent_by_id = {}
    for d in descendants:
        alloc_by_id[d.id] = float(getattr(d, allocated_field, 0.0) or 0.0) if allocated_field else 0.0
        spent_by_id[d.id] = float(d.effective_hours or 0.0) if has_effective else 0.0

    # Timesheet lines of the task itself. Guarded like the other timesheet
    # fields above: without a task link on analytic lines the search would
    # raise, so fall back to an empty recordset.
    AAL = request.env['account.analytic.line'].sudo()
    if 'task_id' in AAL._fields:
        timesheets = AAL.search([('task_id', '=', task.id)], order='date desc, id desc')
    else:
        timesheets = AAL.browse()

    state_options = task._fields['state']._description_selection(request.env)

    values['fp_task_descendants'] = descendants
    values['fp_task_depth_inside'] = depth_map
    values['fp_can_create_task'] = self._fp_can_create_task(task.project_id)
    values['fp_can_change_state'] = self._fp_can_change_state(task)
    values['fp_priority_options'] = request.env['project.task']._fields['priority']._description_selection(request.env)
    values['fp_done_states'] = done_states
    values['fp_descendant_total'] = total
    values['fp_descendant_done'] = done
    values['fp_descendant_pct'] = round(100.0 * done / total, 1) if total else 0.0
    values['fp_descendant_alloc_hours'] = sum(alloc_by_id.values())
    values['fp_descendant_spent_hours'] = sum(spent_by_id.values())
    values['fp_alloc_by_id'] = alloc_by_id
    values['fp_spent_by_id'] = spent_by_id
    values['fp_timesheets'] = timesheets
    values['fp_timesheets_total_hours'] = sum(timesheets.mapped('unit_amount'))
    values['fp_state_options'] = state_options
    values['fp_state_label'] = dict(state_options).get(task.state, task.state)
    return values
def _fp_can_create_task(self, project):
    """Tell whether the current user may create tasks in ``project``.

    Returns ``False`` for a missing project and for the public user,
    ``True`` for internal users. Any other user passes when their partner
    is the project's customer, a direct child of it, or shares its
    commercial partner — or, failing that, when their partner follows
    the project.

    :param project: the project to check (may be empty).
    :return: ``True`` when task creation is allowed, else ``False``.
    """
    user = request.env.user
    if not project or not project.exists():
        return False
    if not user or user.id == request.env.ref('base.public_user').id:
        return False
    if user._is_internal():
        return True

    partner = user.partner_id
    if not partner:
        return False

    customer = project.partner_id
    if customer:
        in_customer_family = (
            partner == customer
            or partner.parent_id == customer
            or partner.commercial_partner_id == customer.commercial_partner_id
        )
        if in_customer_family:
            return True

    # Last resort: the user's partner follows the project.
    follower_domain = [
        ('res_model', '=', 'project.project'),
        ('res_id', '=', project.id),
        ('partner_id', '=', partner.id),
    ]
    follower_count = request.env['mail.followers'].sudo().search_count(follower_domain, limit=1)
    return bool(follower_count)
def _fp_can_change_state(self, task):
    """Permissive gate for the customer status actions on a task page.

    Meant to let a portal user who can view the task also flip its state
    (Request Changes / Approve / Mark Done). ``_fp_can_create_task`` alone
    is too strict for that: it only passes the project's customer or a
    project follower, whereas customers are often attached to a single
    task — as task partner or task follower — and never to the project.

    :param task: the task to check (may be empty).
    :return: ``True`` when the current user may change the task's state.
    """
    user = request.env.user
    if not task or not task.exists():
        return False
    if not user or user.id == request.env.ref('base.public_user').id:
        return False
    if user._is_internal():
        return True

    # Project-level rights (covers most customers).
    if self._fp_can_create_task(task.project_id):
        return True

    partner = user.partner_id
    if not partner:
        return False

    # Task-level partner / commercial-partner family.
    task_customer = task.partner_id
    if task_customer:
        in_customer_family = (
            partner == task_customer
            or partner.parent_id == task_customer
            or partner.commercial_partner_id == task_customer.commercial_partner_id
        )
        if in_customer_family:
            return True

    # Task followers.
    follower_domain = [
        ('res_model', '=', 'project.task'),
        ('res_id', '=', task.id),
        ('partner_id', '=', partner.id),
    ]
    follower_count = request.env['mail.followers'].sudo().search_count(follower_domain, limit=1)
    return bool(follower_count)
@http.route(
    ['/my/projects/<int:project_id>/task/<int:task_id>/state'],
    type='http', auth='user', website=True, methods=['POST'],
    csrf=True,
)
def fp_portal_task_set_state(self, project_id, task_id, **post):
    """Change a task's state from the customer portal and log it in the chatter.

    :param project_id: id of the project taken from the URL.
    :param task_id: id of the task taken from the URL; it must belong to
        ``project_id``.
    :param post: form data; ``state`` is the requested state and
        ``comment`` an optional free-text note.
    :return: a redirect — to ``/my`` when the project is not accessible,
        to the project page when the task is not found in it, otherwise
        to the task page (whether or not the state was changed).
    """
    # markupsafe is a dependency of Odoo itself; imported locally so this
    # block does not rely on a file-level import.
    from markupsafe import Markup

    project_url = f'/my/projects/{project_id}'
    task_url = f'{project_url}/task/{task_id}'

    # NOTE(review): this project-level check runs before the task-level
    # gate below; confirm that customers who only have access to the task
    # (not the project) are meant to be turned away here.
    try:
        self._document_check_access('project.project', project_id)
    except (AccessError, MissingError):
        return request.redirect('/my')

    task = request.env['project.task'].sudo().search([
        ('id', '=', task_id),
        ('project_id', '=', project_id),
    ], limit=1)
    if not task:
        return request.redirect(project_url)
    if not self._fp_can_change_state(task):
        return request.redirect(task_url)

    new_state = post.get('state')
    allowed = ('01_in_progress', '02_changes_requested', '03_approved', '1_done')
    if new_state not in allowed:
        return request.redirect(task_url)

    comment = (post.get('comment') or '').strip()
    partner = request.env.user.partner_id
    labels = dict(task._fields['state']._description_selection(request.env))
    label = labels.get(new_state, new_state)

    # Build the body as Markup: the literal tags are kept as HTML while every
    # interpolated value (state label, partner name, user comment) is escaped.
    body = Markup('Status set to <b>%s</b> by %s from the customer portal.') % (label, partner.name)
    if comment:
        # Joining on Markup escapes each line and keeps the user's line breaks.
        comment_html = Markup('<br/>').join(comment.splitlines())
        body += Markup('<br/><br/><b>Comment:</b><br/>%s') % comment_html

    task.write({'state': new_state})
    task.message_post(
        body=body,
        author_id=partner.id,
        message_type='comment',
        subtype_xmlid='mail.mt_comment',
    )
    return request.redirect(task_url)
@http.route(
    [
        '/my/projects/<int:project_id>/task/new',
        '/my/projects/<int:project_id>/task/<int:parent_task_id>/subtask/new',
    ],
    type='http', auth='user', website=True, methods=['GET', 'POST'],
    csrf=True,
)
def fp_portal_task_new(self, project_id, parent_task_id=None, **post):
    """Show the new-task form (GET) or create a task / subtask (POST).

    :param project_id: id of the project taken from the URL.
    :param parent_task_id: id of the parent task when creating a subtask;
        it must belong to ``project_id``.
    :param post: form data; ``name`` is required, ``description`` and
        ``priority`` are optional (an unknown priority falls back to ``'0'``).
    :return: a redirect to ``/my`` when the project is not accessible, to
        the project page when creation is not allowed or the parent task
        is not found; on GET the rendered form; on a valid POST a redirect
        to the parent task (subtask) or to the newly created task.
    """
    try:
        project_sudo = self._document_check_access('project.project', project_id)
    except (AccessError, MissingError):
        return request.redirect('/my')

    project_url = f'/my/projects/{project_id}'
    if not self._fp_can_create_task(project_sudo):
        return request.redirect(project_url)

    parent_task = None
    if parent_task_id:
        parent_task = request.env['project.task'].sudo().search([
            ('id', '=', parent_task_id),
            ('project_id', '=', project_id),
        ], limit=1)
        if not parent_task:
            return request.redirect(project_url)

    priority_selection = request.env['project.task']._fields['priority']._description_selection(request.env)
    render_values = {
        'project': project_sudo,
        'parent_task': parent_task,
        'priority_options': priority_selection,
        'errors': {},
        'values': {},
        'page_name': 'fp_new_task',
    }
    if request.httprequest.method != 'POST':
        return request.render('fusion_project_portal.portal_new_task_form', render_values)

    name = (post.get('name') or '').strip()
    description = (post.get('description') or '').strip()
    priority = post.get('priority') or '0'
    if priority not in dict(priority_selection):
        priority = '0'

    errors = {}
    if not name:
        # User-facing text: translated like the other labels in this file.
        errors['name'] = _('Please enter a title.')
    if errors:
        if parent_task:
            # An invalid subtask submission goes back to the parent task
            # page instead of re-rendering the standalone form.
            return request.redirect(f'{project_url}/task/{parent_task.id}')
        render_values.update({'errors': errors, 'values': post})
        return request.render('fusion_project_portal.portal_new_task_form', render_values)

    partner = request.env.user.partner_id
    vals = {
        'name': name,
        # NOTE(review): the submitted text is written to the description
        # as-is; confirm whether the form sends plain text or HTML.
        'description': description,
        'priority': priority,
        'project_id': project_sudo.id,
        # Prefer the project's customer; fall back to the submitting partner.
        'partner_id': project_sudo.partner_id.id or partner.id,
        # Subscribe the submitter to the new task.
        'message_partner_ids': [(4, partner.id)],
    }
    if parent_task:
        vals['parent_id'] = parent_task.id
        vals['display_in_project'] = False

    # The record comes from a sudo environment, so no further sudo() is
    # needed for the chatter post.
    task = request.env['project.task'].sudo().create(vals)
    task.message_post(
        body=f'Submitted from customer portal by {partner.name}.',
        author_id=partner.id,
        message_type='comment',
        subtype_xmlid='mail.mt_comment',
    )
    if parent_task:
        return request.redirect(f'{project_url}/task/{parent_task.id}')
    return request.redirect(f'{project_url}/task/{task.id}')