feat(jobs): seed orders via fp.direct.order.wizard (estimator path)

Adds 8-12 orders that originate from the direct order entry wizard
(used by estimators for bulk entry without quotation flow) instead
of plain sale.order create. Exercises the wizard's
action_create_order() method which builds the SO with all the
x_fc_* header fields, then we confirm to fire _fp_auto_create_job
in one step.

Each wizard creates 1-3 lines with realistic part/coating combos,
treatments, surface area, deadlines, and the wo_group_tag flag
(30% chance) to test multi-line job collapsing. Mixes po_pending
(30%) and PO-doc orders, plus a spread of invoice strategies
(deposit / progress / net_terms / cod_prepay).

Orders distribute across confirmed / in_progress_mid / delivered
/ invoiced / paid states, reusing the state-advancement pattern
from seed_workflow_states.py.

Verified on entech: 10/11 orders created (one invoice failure on a
SO with no invoiceable lines, handled gracefully via savepoint).
22 fp.job records generated across confirmed / in_progress / done.

Part of: native job model migration (spec 2026-04-25)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gsinghpal
2026-04-25 10:02:52 -04:00
parent 128d51755d
commit 009a0b5e10

View File

@@ -0,0 +1,777 @@
# -*- coding: utf-8 -*-
# Copyright 2026 Nexa Systems Inc.
# License OPL-1 (Odoo Proprietary License v1.0)
#
# seed_direct_orders.py
# =====================
# Creates 8-12 sale orders that originate from the estimator's
# direct-order-entry path -- i.e. via fp.direct.order.wizard.action_create_order
# -- instead of plain sale.order.create. This exercises the wizard
# code path which currently has zero seeded data.
#
# The wizard:
# - Validates PO# / PO doc OR po_pending flag (we use po_pending for some)
# - Creates the SO in DRAFT state with one SO line per wizard line
# - Returns an action with res_id pointing at the new SO
# - Does NOT auto-confirm (Sub 1 deliberately removed auto-confirm)
#
# So this script:
# 1. Builds a wizard with realistic header fields + 1-3 lines
# 2. Calls action_create_order() to materialise the draft SO
# 3. Calls so.action_confirm() to fire job creation (the ON-confirm
# _fp_auto_create_job hook builds the fp.job + steps)
# 4. Optionally advances the resulting job/SO across workflow states,
# reusing the helpers from seed_workflow_states.py
#
# Distribution of 8-12 orders across states (matches client request):
# - 3 stay at "Confirmed / Job just generated steps"
# - 3 advance to "Job In Progress (mid)"
# - 2 advance to "Job Done / Delivery Scheduled"
# - 2 advance all the way to "Delivered + Invoice Posted"
# - 1-2 advance to "Paid"
# Total = 11-12 orders.
#
# Each order is wrapped in its own savepoint -- failure on one doesn't
# nuke the whole run. Savepoint names are alphanumeric only because
# Postgres rejects parens/dots in identifiers.
#
# Usage: see scripts/README.md.
from datetime import datetime, timedelta
import base64
import random
import logging
random.seed(2027)
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------- #
# Combo / context helpers (mirror seed_workflow_states.py) #
# ---------------------------------------------------------------------- #
def _build_combos(env):
"""List of (partner, part, coating) tuples that have a recipe."""
combos = []
parts = env["fp.part.catalog"].search([
("x_fc_default_coating_config_id", "!=", False),
("x_fc_default_coating_config_id.recipe_id", "!=", False),
("partner_id", "!=", False),
])
for p in parts:
combos.append((p.partner_id, p, p.x_fc_default_coating_config_id))
random.shuffle(combos)
return combos
def _operators(env):
g = env.ref("fusion_plating.group_fusion_plating_operator",
raise_if_not_found=False)
if not g:
return env["res.users"]
return env["res.users"].search([("all_group_ids", "in", g.id)])
def _managers(env):
g = env.ref("fusion_plating.group_fusion_plating_manager",
raise_if_not_found=False)
if not g:
return env["res.users"]
return env["res.users"].search([("all_group_ids", "in", g.id)])
def _employees(env):
return env["hr.employee"].search([])
def _resolve_payment_term(env):
pt = env["account.payment.term"].search(
[("name", "=", "30 Days")], limit=1)
if not pt:
pt = env["account.payment.term"].search([], limit=1)
return pt
def _resolve_journals(env):
sales = env["account.journal"].search([("type", "=", "sale")], limit=1)
bank = env["account.journal"].search([("type", "=", "bank")], limit=1)
return sales, bank
def _resolve_facility(env):
return env["fusion.plating.facility"].search([], limit=1)
def _selection_values(model, fname):
"""Return the list of valid keys for a Selection field, or []."""
fld = model._fields.get(fname)
if not fld or fld.type != "selection":
return []
sel = fld.selection
if callable(sel):
try:
sel = sel(model)
except Exception:
return []
return [k for (k, _label) in sel] if sel else []
# ---------------------------------------------------------------------- #
# Wizard build #
# ---------------------------------------------------------------------- #
def _pick_treatments(env, n):
"""Return a recordset of n treatments (or empty if treatments missing)."""
Treatment = env.get("fp.treatment")
if Treatment is None:
return env["fp.treatment"].browse([]) if "fp.treatment" in env else None
pool = env["fp.treatment"].search([], limit=20)
if not pool or n <= 0:
return env["fp.treatment"].browse([])
n = min(n, len(pool))
picked = random.sample(list(pool), n)
return env["fp.treatment"].browse([t.id for t in picked])
def _build_wizard(env, partner, lines_data, ctx, idx):
"""Build a fp.direct.order.wizard with realistic header + lines.
`lines_data` is a list of dicts, one per line, each with keys:
part, coating, quantity, unit_price, ...
"""
Wizard = env["fp.direct.order.wizard"].sudo()
WLine = env["fp.direct.order.line"].sudo()
addrs = partner.address_get(["invoice", "delivery"])
cust_dl = (datetime.now() + timedelta(days=random.randint(7, 30))).date()
int_dl = cust_dl - timedelta(days=3)
plan_start = (datetime.now() + timedelta(days=random.randint(1, 5))).date()
po_exp = cust_dl - timedelta(days=random.randint(2, 7))
delivery_methods = _selection_values(Wizard, "delivery_method")
invoice_strategies = _selection_values(Wizard, "invoice_strategy")
notes_pool = [
"Direct entry by estimator -- repeat customer, standard ENP per "
"AMS-2404. Rush capacity if available.",
"Customer phoned in PO -- bulk re-order of last month's run. Use "
"same recipe, same masking. Confirm thickness on first piece.",
"Standing order -- expedite if over 10% of capacity is free. "
"Mask threads as before. CoC + thickness report required.",
"Estimator entry, customer requires same-day acknowledgment. "
"Watch for hex / barrel mix on the racks.",
"Direct re-order, shipper to call ahead before pickup. Pack in "
"original boxes. No partial shipments.",
]
po_pending = (random.random() < 0.30)
has_po_doc = (random.random() < 0.40) if not po_pending else False
wiz_vals = {
"partner_id": partner.id,
"partner_invoice_id": addrs.get("invoice") or partner.id,
"partner_shipping_id": addrs.get("delivery") or partner.id,
"customer_job_number": "CJN-D%04d" % idx,
"planned_start_date": plan_start,
"internal_deadline": int_dl,
"customer_deadline": cust_dl,
"is_blanket_order": (random.random() < 0.20),
"block_partial_shipments": (random.random() < 0.30),
"po_pending": po_pending,
"po_expected_date": po_exp if po_pending else False,
"po_number": False if po_pending else "PO-D%04d" % idx,
"notes": random.choice(notes_pool),
}
if delivery_methods:
wiz_vals["delivery_method"] = random.choice(delivery_methods)
if invoice_strategies:
strat = random.choice(invoice_strategies)
wiz_vals["invoice_strategy"] = strat
if strat == "deposit":
wiz_vals["deposit_percent"] = random.choice([15.0, 25.0, 33.0])
elif strat == "progress":
wiz_vals["progress_initial_percent"] = random.choice(
[40.0, 50.0, 60.0])
# Attach a fake PO doc if we need one
if has_po_doc and not po_pending:
fake_pdf = b"%PDF-1.4 fake po placeholder for seed data\n%%EOF\n"
wiz_vals["po_attachment_file"] = base64.b64encode(fake_pdf).decode()
wiz_vals["po_attachment_filename"] = "po_seed_%04d.pdf" % idx
elif not po_pending and not has_po_doc:
# Wizard requires either a PO doc OR po_pending -- force a doc
# if we got here with neither. Better to attach than to fail.
fake_pdf = b"%PDF-1.4 fake po placeholder for seed data\n%%EOF\n"
wiz_vals["po_attachment_file"] = base64.b64encode(fake_pdf).decode()
wiz_vals["po_attachment_filename"] = "po_seed_%04d.pdf" % idx
wizard = Wizard.create(wiz_vals)
# Build a shared wo_group_tag for ~30% of orders so multiple lines
# roll up into one job (tests the multi-line-collapse path)
use_group_tag = (len(lines_data) > 1) and (random.random() < 0.30)
group_tag = "G%d" % random.randint(1, 9) if use_group_tag else False
surface_area_uoms = _selection_values(WLine, "surface_area_uom")
line_descs = [
"Mask threads, ENP per AMS-2404 Class 4. Pack in vendor boxes.",
"Standard ENP, 0.0005-0.001 inch thickness. Bake 4hr @ 400F.",
"Re-work job: strip + replate. Verify base before activation.",
"Heavy duty ENP, mid-phos. Mask all threaded holes per drawing.",
"Light ENP barrier, mil-spec. Customer requires CoC + thickness.",
]
int_descs = [
"Mask 1/4-20 threads. ENP per AMS-2404 Class 4 mid-phos. "
"Watch for racking marks.",
"Standard alkaline EN bath. Target 0.0005 in. Spot-check 5 pcs "
"with Fischerscope before bake.",
"Strip in nitric, neutralise, activate. Replate to drawing spec. "
"First-piece check required.",
"Heavy ENP -- expect 6+ hr in tank. Mask blind holes per "
"engineering note. Bake 4hr @ 400F.",
"Light barrier coat for corrosion. CoC + thickness report on "
"delivery. No exceptions on cleanliness.",
]
wo_descs = [
"ENP plating, mask threads, pack in vendor boxes.",
"Standard ENP run, mid-phos, bake 4hr.",
"Strip + replate, verify base material first.",
"Heavy ENP, 6+hr tank, mask all blind holes.",
"Light ENP barrier, full QC pack-out.",
]
for ld in lines_data:
part = ld["part"]
coating = ld["coating"]
qty = ld["quantity"]
price = ld["unit_price"]
treatments = _pick_treatments(env, random.randint(0, 2))
line_vals = {
"wizard_id": wizard.id,
"part_catalog_id": part.id,
"coating_config_id": coating.id,
"quantity": qty,
"unit_price": price,
"line_description": random.choice(line_descs),
"internal_description": random.choice(int_descs),
"part_wo_description": random.choice(wo_descs),
"rush_order": (random.random() < 0.15),
"is_one_off": False,
"push_to_defaults": False,
}
if treatments:
line_vals["treatment_ids"] = [(6, 0, treatments.ids)]
if group_tag:
line_vals["wo_group_tag"] = group_tag
# Per-line deadline within the order window
line_vals["part_deadline"] = (
cust_dl - timedelta(days=random.randint(0, 3))
)
WLine.create(line_vals)
return wizard
def _create_so_via_wizard(env, partner, combos_for_partner, n_lines, idx):
"""Build wizard, run action_create_order, return the SO record."""
if not combos_for_partner:
return None
# Allow up to n_lines distinct combos for this partner; if the partner
# only has one, just repeat it (different qty / price).
chosen = []
pool = list(combos_for_partner)
random.shuffle(pool)
while len(chosen) < n_lines and pool:
chosen.append(pool.pop())
while len(chosen) < n_lines:
chosen.append(random.choice(combos_for_partner))
lines_data = []
for (_partner, part, coating) in chosen:
lines_data.append({
"part": part,
"coating": coating,
"quantity": random.randint(5, 100),
"unit_price": round(random.uniform(50.0, 300.0), 2),
})
wizard = _build_wizard(env, partner, lines_data, None, idx)
action = wizard.action_create_order()
if not action or not action.get("res_id"):
return None
return env["sale.order"].browse(action["res_id"])
# ---------------------------------------------------------------------- #
# State-advancement helpers (adapted from seed_workflow_states.py) #
# ---------------------------------------------------------------------- #
def _ensure_steps(env, job):
if not job or not job.recipe_id or job.step_ids:
return
try:
job._generate_steps_from_recipe()
except Exception as e:
_logger.warning("Step gen failed for %s: %s", job.name, e)
def _populate_job(env, job, ctx):
if not job:
return
vals = {}
if ctx["facility"] and not job.facility_id:
vals["facility_id"] = ctx["facility"].id
if ctx["managers"] and not job.manager_id:
vals["manager_id"] = random.choice(ctx["managers"]).id
if not job.priority or job.priority == "normal":
vals["priority"] = random.choices(
["low", "normal", "high", "rush"],
weights=[10, 70, 15, 5],
)[0]
if vals:
job.write(vals)
def _assign_step_users(env, job, ctx, n_done=0, current_idx=None):
operators = ctx["operators"]
steps = job.step_ids.sorted("sequence")
if not steps:
return
for s in steps:
if operators and not s.assigned_user_id:
s.assigned_user_id = operators[
random.randrange(len(operators))]
base = datetime.now() - timedelta(hours=len(steps) * 2)
for i, s in enumerate(steps[:n_done]):
start = base + timedelta(hours=i * 2)
finish = start + timedelta(minutes=random.randint(20, 90))
uid = (s.assigned_user_id.id
if s.assigned_user_id else env.user.id)
s.write({
"state": "done",
"date_started": start,
"date_finished": finish,
"duration_actual": (finish - start).total_seconds() / 60.0,
"started_by_user_id": uid,
"finished_by_user_id": uid,
})
env["fp.job.step.timelog"].sudo().create({
"step_id": s.id,
"user_id": uid,
"date_started": start,
"date_finished": finish,
"duration_minutes": (finish - start).total_seconds() / 60.0,
})
if current_idx is not None and current_idx < len(steps):
cur = steps[current_idx]
if cur.state != "done":
start = datetime.now() - timedelta(
minutes=random.randint(5, 90))
uid = (cur.assigned_user_id.id
if cur.assigned_user_id else env.user.id)
cur.write({
"state": "in_progress",
"date_started": start,
"started_by_user_id": uid,
})
env["fp.job.step.timelog"].sudo().create({
"step_id": cur.id,
"user_id": uid,
"date_started": start,
})
if current_idx is not None and current_idx + 1 < len(steps):
nxt = steps[current_idx + 1]
if nxt.state == "pending":
nxt.write({"state": "ready"})
def _make_delivery_full(env, delivery, partner, ctx, state,
scheduled_offset_days=1):
if not delivery:
return
employees = ctx["employees"]
facility = ctx["facility"]
vals = {
"delivery_address_id": partner.id,
"contact_name": partner.name,
"contact_phone": partner.phone or (
"555-%04d" % random.randint(1000, 9999)),
"scheduled_date": datetime.now() + timedelta(
days=scheduled_offset_days),
}
if "x_fc_box_count_out" in delivery._fields:
vals["x_fc_box_count_out"] = random.randint(1, 5)
if employees and "assigned_driver_id" in delivery._fields:
vals["assigned_driver_id"] = employees[
random.randrange(len(employees))].id
if facility and "source_facility_id" in delivery._fields:
vals["source_facility_id"] = facility.id
if "notes" in delivery._fields:
vals["notes"] = (
"<p>Direct-order delivery -- pack in original boxes per "
"customer SOP.</p>")
delivery.write(vals)
delivery.write({"state": state})
if state == "delivered":
delivery.write({"delivered_at": datetime.now() - timedelta(
hours=random.randint(1, 48))})
def _issue_certificate(env, job, so, part, ctx):
cert = env["fp.certificate"].search(
[("x_fc_job_id", "=", job.id)], limit=1)
if not cert:
cert = env["fp.certificate"].sudo().create({
"partner_id": job.partner_id.id,
"certificate_type": "coc",
"state": "draft",
"x_fc_job_id": job.id,
"sale_order_id": so.id if so else False,
})
vals = {
"state": "issued",
"issue_date": datetime.now().date(),
"issued_by_id": env.user.id,
"entech_wo_number": job.name,
"customer_job_no": (so.x_fc_customer_job_number
if so and "x_fc_customer_job_number" in so._fields
else (so.client_order_ref if so else "")),
"po_number": (so.x_fc_po_number
if so and "x_fc_po_number" in so._fields else ""),
"quantity_shipped": int(job.qty or 1),
"part_number": part.part_number or part.name,
"process_description": "Electroless Nickel Plating, MIL-C-26074",
}
if "spec_min_mils" in cert._fields and part.x_fc_default_coating_config_id:
c = part.x_fc_default_coating_config_id
if c.thickness_min:
vals["spec_min_mils"] = c.thickness_min
if c.thickness_max:
vals["spec_max_mils"] = c.thickness_max
vals["spec_reference"] = c.spec_reference or "AMS-2404"
cert.write(vals)
for i in range(5):
env["fp.thickness.reading"].sudo().create({
"certificate_id": cert.id,
"reading_number": i + 1,
"nip_mils": round(random.uniform(0.95, 1.15), 3),
"ni_percent": round(random.uniform(88.0, 92.0), 2),
"p_percent": round(random.uniform(8.0, 12.0), 2),
"position_label": "Pos %d" % (i + 1),
"reading_datetime": datetime.now() - timedelta(
minutes=30 - i * 5),
"operator_id": env.user.id,
"x_fc_job_id": job.id,
"equipment_model": "Fischerscope X-Ray XDV-SD",
"calibration_std_ref": "CAL-2026-04-01",
})
return cert
def _create_invoice(env, so, ctx, post=False):
inv_recordset = so._create_invoices()
if not inv_recordset:
return env["account.move"]
inv = (inv_recordset[0] if hasattr(inv_recordset, "ids")
else env["account.move"].browse(inv_recordset))
inv_vals = {
"invoice_date": (datetime.now() - timedelta(
days=random.randint(0, 5))).date(),
"invoice_date_due": (datetime.now() + timedelta(
days=random.randint(15, 30))).date(),
}
if not inv.invoice_payment_term_id:
inv_vals["invoice_payment_term_id"] = ctx["payment_term"].id
inv.write(inv_vals)
if post:
inv.action_post()
return inv
def _register_payment(env, inv, ctx, validate=True):
bank = ctx["bank_journal"]
pml = bank.inbound_payment_method_line_ids[:1]
wizard = env["account.payment.register"].with_context(
active_model="account.move",
active_ids=inv.ids,
).create({
"amount": inv.amount_total,
"journal_id": bank.id,
"payment_method_line_id": pml.id if pml else False,
"payment_date": (datetime.now() - timedelta(
days=random.randint(0, 7))).date(),
})
wizard.action_create_payments()
pmt = env["account.payment"].search(
[("partner_id", "=", inv.partner_id.id),
("amount", "=", inv.amount_total)],
order="id desc", limit=1)
if pmt and validate:
try:
pmt.action_validate()
except Exception as e:
_logger.warning("Payment validate failed: %s", e)
try:
pmt.write({"state": "paid"})
except Exception as e2:
_logger.warning("Payment direct write paid failed: %s", e2)
return pmt
# ---------------------------------------------------------------------- #
# Per-state advancement #
# ---------------------------------------------------------------------- #
def _advance_to_confirmed(env, so, ctx):
"""Confirm SO; populate job + steps but leave it at draft job state."""
if so.state != "sale":
try:
# Wizard stayed in draft. Sub 1 design: SO is left in draft
# and reviewed before confirmation. We confirm here to
# exercise the downstream auto-create-job hook.
so.action_confirm()
except Exception as e:
_logger.warning("SO confirm failed for %s: %s", so.name, e)
return False
jobs = env["fp.job"].search([("sale_order_id", "=", so.id)])
for job in jobs:
if job.state == "draft":
try:
job.action_confirm()
except Exception as e:
_logger.warning("Job confirm failed: %s", e)
continue
_ensure_steps(env, job)
_populate_job(env, job, ctx)
_assign_step_users(env, job, ctx, n_done=0, current_idx=None)
return True
def _advance_to_in_progress_mid(env, so, ctx):
if not _advance_to_confirmed(env, so, ctx):
return False
jobs = env["fp.job"].search([("sale_order_id", "=", so.id)])
for job in jobs:
total = len(job.step_ids)
n_done = max(1, total // 2) if total else 0
_assign_step_users(env, job, ctx, n_done=n_done,
current_idx=n_done)
job.write({
"state": "in_progress",
"date_started": datetime.now() - timedelta(
days=random.randint(2, 7)),
})
return True
def _advance_to_delivered(env, so, ctx, deliver_state="scheduled"):
"""Drive job to done + delivery to scheduled or delivered."""
if not _advance_to_confirmed(env, so, ctx):
return False
jobs = env["fp.job"].search([("sale_order_id", "=", so.id)])
if not jobs:
return False
for job in jobs:
_assign_step_users(env, job, ctx,
n_done=len(job.step_ids), current_idx=None)
job.write({
"state": "in_progress",
"date_started": datetime.now() - timedelta(
days=random.randint(3, 10)),
})
try:
job.button_mark_done()
except Exception as e:
_logger.warning("Job mark_done failed: %s", e)
continue
if job.delivery_id:
offset = (-2 if deliver_state == "delivered"
else random.randint(1, 5))
_make_delivery_full(env, job.delivery_id, so.partner_id, ctx,
state=deliver_state,
scheduled_offset_days=offset)
if deliver_state == "delivered":
# Issue cert for first part on the SO
first_line = so.order_line[:1]
part = (first_line.x_fc_part_catalog_id
if first_line and "x_fc_part_catalog_id"
in first_line._fields else False)
if part:
_issue_certificate(env, job, so, part, ctx)
return True
def _advance_to_invoice_posted(env, so, ctx):
if not _advance_to_delivered(env, so, ctx, deliver_state="delivered"):
return False
inv = _create_invoice(env, so, ctx, post=True)
return bool(inv and inv.state == "posted")
def _advance_to_paid(env, so, ctx):
if not _advance_to_delivered(env, so, ctx, deliver_state="delivered"):
return False
inv = _create_invoice(env, so, ctx, post=True)
if not (inv and inv.state == "posted"):
return False
pmt = _register_payment(env, inv, ctx, validate=True)
return bool(pmt)
# ---------------------------------------------------------------------- #
# Per-order entry point #
# ---------------------------------------------------------------------- #
def _create_direct_order(env, partner, combos_for_partner, ctx,
n_lines, advance_to, idx):
"""Create one wizard-originated order, advance to target state.
Returns the SO record (or None on failure).
"""
so = _create_so_via_wizard(env, partner, combos_for_partner, n_lines, idx)
if not so:
return None
if advance_to == "confirmed":
if not _advance_to_confirmed(env, so, ctx):
return None
elif advance_to == "in_progress_mid":
if not _advance_to_in_progress_mid(env, so, ctx):
return None
elif advance_to == "delivered":
# Job done + delivery scheduled (not yet delivered)
if not _advance_to_delivered(env, so, ctx,
deliver_state="scheduled"):
return None
elif advance_to == "invoiced":
if not _advance_to_invoice_posted(env, so, ctx):
return None
elif advance_to == "paid":
if not _advance_to_paid(env, so, ctx):
return None
return so
# ---------------------------------------------------------------------- #
# Main runner #
# ---------------------------------------------------------------------- #
# Plan: 11 orders distributed across 5 states.
ORDER_PLAN = [
("confirmed", 3),
("in_progress_mid", 3),
("delivered", 2),
("invoiced", 2),
("paid", 1),
]
def run(env):
print("=" * 70)
print("seed_direct_orders.py - estimator wizard path seeding")
print("=" * 70)
combos = _build_combos(env)
if not combos:
print("ERROR: no parts with coating + recipe + partner. Cannot seed.")
return
print("Customer/part combos: %d" % len(combos))
# Group combos by partner so we can build multi-line orders for the
# same customer (more realistic than one part per partner).
by_partner = {}
for (partner, part, coating) in combos:
by_partner.setdefault(partner.id, []).append((partner, part, coating))
partner_ids = list(by_partner.keys())
random.shuffle(partner_ids)
operators = _operators(env)
managers = _managers(env)
employees = _employees(env)
facility = _resolve_facility(env)
payment_term = _resolve_payment_term(env)
sales_journal, bank_journal = _resolve_journals(env)
print("Operators: %d, Managers: %d, Employees: %d" % (
len(operators), len(managers), len(employees)))
print("Facility: %s, PaymentTerm: %s" % (
facility.name if facility else "NONE",
payment_term.name if payment_term else "NONE"))
if not (payment_term and sales_journal and bank_journal):
print("ERROR: missing required masters; cannot proceed.")
return
ctx = {
"payment_term": payment_term,
"sales_journal": sales_journal,
"bank_journal": bank_journal,
"operators": operators,
"managers": managers,
"employees": employees,
"facility": facility,
}
results = {state: 0 for (state, _n) in ORDER_PLAN}
failures = []
seq = 0
partner_cursor = 0
for (state, count) in ORDER_PLAN:
print()
print("-- target state: %s (count %d) --" % (state, count))
for _i in range(count):
seq += 1
# Round-robin partners to maximise variety
partner_id = partner_ids[partner_cursor % len(partner_ids)]
partner_cursor += 1
partner = env["res.partner"].browse(partner_id)
combos_for_partner = by_partner[partner_id]
n_lines = random.randint(1, 3)
sp = "direct_order_%d" % seq
env.cr.execute("SAVEPOINT %s" % sp)
try:
so = _create_direct_order(env, partner, combos_for_partner,
ctx, n_lines, state, seq)
if so:
env.cr.execute("RELEASE SAVEPOINT %s" % sp)
results[state] += 1
print(" [%d] %s -> %s (state=%s, partner=%s)"
% (seq, so.name, state, so.state, partner.name))
else:
env.cr.execute("ROLLBACK TO SAVEPOINT %s" % sp)
failures.append((seq, partner.name,
"wizard returned no SO"))
print(" [%d] FAILED for %s (no SO)"
% (seq, partner.name))
except Exception as e:
try:
env.cr.execute("ROLLBACK TO SAVEPOINT %s" % sp)
except Exception:
pass
failures.append((seq, partner.name, str(e)[:120]))
print(" [%d] EXCEPTION for %s: %s"
% (seq, partner.name, str(e)[:120]))
env.cr.commit()
print()
print("=" * 70)
print("DIRECT-ORDER SEED RESULTS")
print("=" * 70)
total = 0
for state, n in results.items():
print(" %-20s %d" % (state, n))
total += n
print(" %-20s %d" % ("TOTAL CREATED", total))
if failures:
print()
print("FAILURES (%d):" % len(failures))
for (seq, partner, reason) in failures:
print(" #%d %-30s %s" % (seq, partner, reason))
print()
try:
run(env)
except NameError:
print("Run inside odoo shell.")