#!/usr/bin/env python3 """Fusion Plating IoT — DS18B20 poller. Polls every DS18B20 probe the kernel exposes under /sys/bus/w1/devices/28-* and POSTs each reading to the configured Odoo instance. Runs forever under systemd; reads a config file at /etc/fp-iot/poller.conf. Config file format: ODOO_URL=http://10.200.1.26:8069 INGEST_TOKEN=fp-iot-XXXXXXXXXXXXX INTERVAL_SECONDS=30 """ import glob import json import logging import os import sys import time import urllib.error import urllib.request from datetime import datetime, timezone CONFIG_PATH = '/etc/fp-iot/poller.conf' LOG = logging.getLogger('fp-iot-poller') logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) def load_config(path: str) -> dict: cfg = {} try: with open(path) as f: for line in f: line = line.strip() if not line or line.startswith('#') or '=' not in line: continue key, _, val = line.partition('=') cfg[key.strip()] = val.strip() except FileNotFoundError: LOG.error('Config file not found: %s', path) sys.exit(1) for required in ('ODOO_URL', 'INGEST_TOKEN'): if not cfg.get(required): LOG.error('Missing required config key: %s', required) sys.exit(1) cfg.setdefault('INTERVAL_SECONDS', '30') return cfg def read_probe(path: str) -> float | None: """Read one DS18B20 sysfs file. Returns Celsius or None on failure.""" try: with open(os.path.join(path, 'w1_slave')) as f: data = f.read() # First line ends in "YES" for CRC-valid reads lines = data.strip().split('\n') if len(lines) < 2 or not lines[0].rstrip().endswith('YES'): LOG.warning('Probe %s bad CRC: %r', path, data) return None # Second line has "t=" _, _, raw = lines[1].partition('t=') return float(raw) / 1000.0 except Exception as e: LOG.warning('Probe read failed (%s): %s', path, e) return None def post_readings(odoo_url: str, token: str, readings: list[dict]) -> bool: """POST batch to /fp/iot/ingest. Returns True on 2xx.""" payload = json.dumps({'readings': readings}).encode() req = urllib.request.Request( odoo_url.rstrip('/') + '/fp/iot/ingest', data=payload, method='POST', headers={ 'Content-Type': 'application/json', 'X-FP-IOT-Token': token, }, ) try: with urllib.request.urlopen(req, timeout=10) as resp: body = resp.read().decode() if 200 <= resp.status < 300: LOG.info('Ingest OK (%d): %s', resp.status, body.strip()) return True LOG.warning('Ingest %d: %s', resp.status, body.strip()) return False except urllib.error.HTTPError as e: LOG.warning('Ingest HTTP %d: %s', e.code, e.read().decode()[:200]) return False except Exception as e: LOG.warning('Ingest failed: %s', e) return False def main(): cfg = load_config(CONFIG_PATH) odoo_url = cfg['ODOO_URL'] token = cfg['INGEST_TOKEN'] interval = int(cfg['INTERVAL_SECONDS']) LOG.info('Starting — polling every %ds, posting to %s', interval, odoo_url) while True: probes = glob.glob('/sys/bus/w1/devices/28-*') if not probes: LOG.warning('No DS18B20 probes detected under /sys/bus/w1/devices/28-*') readings = [] now = datetime.now(timezone.utc).isoformat(timespec='seconds') for probe_path in probes: serial = os.path.basename(probe_path) temp_c = read_probe(probe_path) if temp_c is None: continue readings.append({ 'device_serial': serial, 'value': round(temp_c, 3), 'read_at': now, }) LOG.info('Probe %s = %.3f°C', serial, temp_c) if readings: post_readings(odoo_url, token, readings) time.sleep(interval) if __name__ == '__main__': main()