feat(fusion_accounting_ai): add DataAdapter base + registry
Made-with: Cursor
This commit is contained in:
25
fusion_accounting_ai/services/data_adapters/_registry.py
Normal file
25
fusion_accounting_ai/services/data_adapters/_registry.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Registry: lazy-loads data adapter instances per env."""
|
||||
|
||||
from .base import DataAdapter
|
||||
|
||||
|
||||
def get_adapter(env, name: str) -> DataAdapter:
|
||||
"""Return a data adapter by short name. Cached per request via env.context."""
|
||||
cache = env.context.get('_fusion_data_adapter_cache')
|
||||
if cache is None:
|
||||
cache = {}
|
||||
if name not in cache:
|
||||
cls = _ADAPTERS.get(name)
|
||||
if cls is None:
|
||||
raise KeyError(f"Unknown data adapter: {name!r}. Known: {list(_ADAPTERS)}")
|
||||
cache[name] = cls(env)
|
||||
return cache[name]
|
||||
|
||||
|
||||
# Populated as adapter classes are added (Tasks 9, 10, 11).
|
||||
_ADAPTERS: dict[str, type[DataAdapter]] = {}
|
||||
|
||||
|
||||
def register_adapter(name: str, cls: type[DataAdapter]) -> None:
|
||||
"""Register an adapter class. Call from each adapter module at import time."""
|
||||
_ADAPTERS[name] = cls
|
||||
Reference in New Issue
Block a user