fix(fp.job): post-shop state machine entech smoke fixes (Task 23)

Three bugs caught + fixed during entech battle test:

1. _fp_check_finish_gates calling button_mark_done triggered the
   step-completion gate prematurely (step still in_progress at
   pre-super time). Pass fp_skip_step_gate=True alongside
   fp_check_gates_only — we know the operator is about to finish
   the last open step.

2. _fp_schedule_cert_activity used env.get('fp.notification.template')
   for presence check. env.get returns an EMPTY recordset (falsy),
   not None — 'if not Template: return' silently exited and no
   activity was ever scheduled. Switch to 'in self.env' check
   pattern + explicit indexing. CLAUDE.md Rule 24.

3. _fp_check_advance_after_cert_issue + _fp_check_regress_after_cert_void
   used 'state != issued' as outstanding-cert count. This made
   voided certs count as outstanding forever, so void+re-issue
   cycles never re-advanced. Switch to per-type coverage check:
   each required cert TYPE needs at least one issued cert.
   Regress mirrors: only fire if a type loses all issued certs.

CLAUDE.md gains Rule 24 (env.get falsy empty recordset trap).
Rule 25 (mail.template parse-time validation) renumbered.

Battle test ALL PASS on entech admin DB:
  10/10 steps green — auto-advance, kanban placement, activity
  schedule + auto-resolve, ACL guard, cert issue advance, void
  regress, re-issue advance, manual ship.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-05-25 10:45:35 -04:00
parent 8b14466da2
commit a2e254b934
2 changed files with 57 additions and 22 deletions

View File

@@ -2286,15 +2286,19 @@ class FpJob(models.Model):
The trick: re-uses button_mark_done's gate logic but short-
circuits BEFORE the state flip via the fp_check_gates_only
context flag (honoured in button_mark_done below).
context flag.
IMPORTANT: pass fp_skip_step_gate=True. At pre-super time the
current step is STILL in_progress (we're about to finish it
but super().button_finish hasn't fired yet), so button_mark_done's
step-completion gate would always fail with "1/1 step is not
finished". The step gate is structurally wrong for this caller;
the bake/qty/QC gates are not. Bit us on entech smoke test.
"""
self.ensure_one()
# Pass through with context flag; button_mark_done will run all
# its gates and then exit before flipping state. The actual
# state transition (in_progress → awaiting_cert/ship) is owned
# by _fp_check_advance_post_shop running AFTER super().button_finish.
self.with_context(
fp_check_gates_only=True,
fp_skip_step_gate=True,
).button_mark_done()
def _fp_check_advance_post_shop(self):
@@ -2335,8 +2339,17 @@ class FpJob(models.Model):
def _fp_check_advance_after_cert_issue(self):
"""Called from fp.certificate.action_issue. If every required
cert for this job is now `issued`, advance awaiting_cert →
awaiting_ship. Idempotent — safe to call repeatedly.
cert TYPE for this job has at least one `issued` cert, advance
awaiting_cert → awaiting_ship. Idempotent — safe to call
repeatedly.
Semantics chosen: per-TYPE coverage, not per-CERT exhaustion.
A previously-voided cert (state='voided') of the same type is
irrelevant — the operator's intent on void was "this attempt
is invalid"; a fresh `issued` cert of that type satisfies the
requirement. Counting voided certs as outstanding would block
the advance after a void+re-issue cycle (caught on entech
smoke test 2026-05-25).
"""
for job in self:
if job.state != 'awaiting_cert':
@@ -2353,12 +2366,14 @@ class FpJob(models.Model):
job._fp_resolve_cert_activities()
continue
Cert = self.env['fp.certificate'].sudo()
outstanding = Cert.search_count([
# Per-type coverage: every required cert type must have at
# least ONE cert in state=issued. Voided certs are ignored.
covered_types = set(Cert.search([
('x_fc_job_id', '=', job.id),
('certificate_type', 'in', list(required)),
('state', '!=', 'issued'),
])
if outstanding == 0:
('state', '=', 'issued'),
]).mapped('certificate_type'))
if required.issubset(covered_types):
job.state = 'awaiting_ship'
job._fp_create_delivery()
if hasattr(job, '_fp_resolve_cert_activities'):
@@ -2366,9 +2381,14 @@ class FpJob(models.Model):
def _fp_check_regress_after_cert_void(self):
"""Called from fp.certificate.write when state=voided. If a
previously-issued cert is no longer issued, slide the job back
to awaiting_cert so it reappears in Final Inspection and the
QM is re-notified.
required cert TYPE has lost coverage (no remaining issued
cert), slide the job back to awaiting_cert so it reappears in
Final Inspection and the QM is re-notified.
Per-type coverage (mirror of _fp_check_advance_after_cert_issue):
voiding ONE cert only regresses if it was the only issued cert
of its type. If a sibling issued cert still covers the type,
coverage holds and no regress fires.
"""
for job in self:
if job.state != 'awaiting_ship':
@@ -2379,12 +2399,12 @@ class FpJob(models.Model):
if not required:
continue
Cert = self.env['fp.certificate'].sudo()
outstanding = Cert.search_count([
covered_types = set(Cert.search([
('x_fc_job_id', '=', job.id),
('certificate_type', 'in', list(required)),
('state', '!=', 'issued'),
])
if outstanding > 0:
('state', '=', 'issued'),
]).mapped('certificate_type'))
if not required.issubset(covered_types):
job.state = 'awaiting_cert'
job._fp_fire_notification('cert_voided_re_notify')
if hasattr(job, '_fp_schedule_cert_activity'):
@@ -2411,9 +2431,15 @@ class FpJob(models.Model):
)
if existing:
return
Template = self.env.get('fp.notification.template')
if not Template or not hasattr(
Template, '_fp_resolve_cert_authority_users'):
# env.get('model.name') returns an EMPTY recordset when the
# model exists but has no records — empty recordsets are
# falsy in Python, so `if not Template: return` exits early
# even when the model IS registered. Use the membership check
# instead. Bit us on entech smoke test 2026-05-25.
if 'fp.notification.template' not in self.env:
return
Template = self.env['fp.notification.template']
if not hasattr(Template, '_fp_resolve_cert_authority_users'):
return
qms = Template.sudo()._fp_resolve_cert_authority_users(self)
if not qms: