#!/usr/bin/env python3 """ fusion_clock ACR122U Wedge ========================== Turns an ACR122U (or other ACS PC/SC NFC reader) into a USB-HID-style keyboard wedge. Polls the reader for tag presence, reads the UID via the standard PC/SC GET_UID APDU, then types the UID + Enter into the currently focused window. The fusion_clock NFC kiosk page already has a keyboard-wedge listener (since v19.0.3.2.0), so once this daemon is running the kiosk will respond to ACR122U taps exactly like it responds to a USB-C HID reader. Why this exists: ACR122U speaks PC/SC (CCID), not HID. Browsers can't talk to PC/SC devices directly. This daemon is the bridge. Output format: Colon-separated uppercase hex + Enter, e.g. "04:10:5B:CA:FD:22:90\\n" Matches what FusionClockNfcKiosk._normalize_uid() expects. Setup (macOS): brew install swig pip3 install pyscard pyautogui python3 wedge.py --verbose Setup (Windows): pip install pyscard pyautogui python wedge.py --verbose Setup (Linux): sudo apt install pcscd pcsc-tools libpcsclite-dev swig sudo systemctl enable --now pcscd pip3 install pyscard pyautogui python3 wedge.py --verbose macOS note: The first time pyautogui tries to type, macOS will prompt for Accessibility permission. Grant it to Terminal.app (or whatever shell is running this script). Usage: python3 wedge.py # quiet, types on tap python3 wedge.py --verbose # prints each tap python3 wedge.py --no-type # detect-only, no keystrokes # (use for debugging) """ import argparse import sys import threading import time from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer try: from smartcard.System import readers from smartcard.Exceptions import ( NoCardException, CardConnectionException, CardRequestTimeoutException, ) except ImportError: print("ERROR: pyscard is not installed.", file=sys.stderr) print(" Install with: pip3 install pyscard", file=sys.stderr) print(" (macOS may also need: brew install swig)", file=sys.stderr) sys.exit(1) # ──────────────────────────────────────────────────────────────────── # Server-Sent Events (SSE) endpoint — primary IPC to the kiosk page. # # The kiosk JS connects to http://localhost:8765/events via EventSource # and gets pushed each UID as it's detected. Chrome treats http://localhost # as a secure origin, so this works from an HTTPS kiosk page without # mixed-content warnings. No keystroke injection, no Accessibility prompt, # no focus dependency. # ──────────────────────────────────────────────────────────────────── SSE_HOST = "127.0.0.1" SSE_PORT = 8765 _sse_lock = threading.Lock() _sse_latest_uid = None _sse_latest_ts = 0.0 def _sse_publish(uid): """Make ``uid`` available to all connected SSE clients.""" global _sse_latest_uid, _sse_latest_ts with _sse_lock: _sse_latest_uid = uid _sse_latest_ts = time.time() class _TapSSEHandler(BaseHTTPRequestHandler): # Quiet the default per-request access log. def log_message(self, format, *args): return def do_GET(self): if self.path != "/events": self.send_response(404) self.end_headers() return self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") # Allow ANY origin to connect — the kiosk page lives on # https:///fusion_clock/kiosk/nfc and the # daemon runs on the same machine as the kiosk browser. self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() last_seen_ts = 0.0 try: while True: with _sse_lock: uid = _sse_latest_uid ts = _sse_latest_ts if uid and ts > last_seen_ts: self.wfile.write(f"data: {uid}\n\n".encode("utf-8")) self.wfile.flush() last_seen_ts = ts else: # 30s keep-alive comment line so proxies/network stacks # don't kill an idle connection. self.wfile.write(b": keep-alive\n\n") self.wfile.flush() time.sleep(0.1) except (BrokenPipeError, ConnectionResetError, OSError): pass # client disconnected def start_sse_server(verbose=False): """Start the SSE server in a daemon thread. Returns the server.""" server = ThreadingHTTPServer((SSE_HOST, SSE_PORT), _TapSSEHandler) t = threading.Thread(target=server.serve_forever, daemon=True) t.start() if verbose: print(f"[wedge] SSE server listening on http://{SSE_HOST}:{SSE_PORT}/events") return server # Standard PC/SC APDU to get the card UID. Works on ACR122U, ACR1252U, # and any other PC/SC reader following the ACS extension spec. GET_UID_APDU = [0xFF, 0xCA, 0x00, 0x00, 0x00] # Reader name fragments we'll match against, in priority order. # ACR122U appears as "ACS ACR122U PICC Interface" on most systems. READER_NAME_HINTS = ("ACR122", "ACR1252", "ACR1281", "ACS ACR") # Don't re-emit the same UID more than once within this window. DEBOUNCE_SECONDS = 2.0 # Poll interval — fast enough to feel instant, slow enough to be quiet. POLL_INTERVAL_SECONDS = 0.15 # Time to wait between reconnect attempts when no reader is plugged in. RECONNECT_INTERVAL_SECONDS = 1.5 def find_reader(verbose=False): """Return the first ACR122U-class reader, or None.""" try: rs = readers() except Exception as e: if verbose: print(f"[wedge] enumerate readers failed: {e}") return None if verbose and not rs: print("[wedge] no PC/SC readers detected") for r in rs: name = str(r) if any(hint in name for hint in READER_NAME_HINTS): return r if verbose and rs: print(f"[wedge] readers present but none match: {[str(r) for r in rs]}") return None def read_uid(connection): """Send GET_UID. Return colon-separated uppercase hex, or None.""" try: data, sw1, sw2 = connection.transmit(GET_UID_APDU) except Exception: return None if (sw1, sw2) != (0x90, 0x00): return None if not data: return None return ":".join(f"{b:02X}" for b in data) def _emit_macos(uid): """Type via AppleScript / System Events. Permission attaches to the terminal/app that launched this Python process — typically Terminal.app, iTerm, or another known GUI app that the user can easily find in Accessibility settings. More reliable than pyautogui's Quartz path which often fails to surface Python in the Accessibility list on newer macOS. """ import subprocess script = ( 'tell application "System Events"\n' f' keystroke "{uid}"\n' ' key code 36\n' # Return 'end tell' ) result = subprocess.run( ["osascript", "-e", script], capture_output=True, text=True, check=False, timeout=3, ) if result.returncode != 0: raise RuntimeError(f"osascript failed: {result.stderr.strip()}") def _emit_other(uid): """Type via pyautogui (works on Linux, Windows, and as macOS fallback).""" import pyautogui pyautogui.typewrite(uid, interval=0.005) pyautogui.press("enter") def emit_uid(uid, type_keys=True, verbose=False): """Publish UID via SSE (primary) and optionally type it (fallback).""" if verbose: print(f"[wedge] TAP {uid}") # Primary path: push to all SSE clients. _sse_publish(uid) if not type_keys: return # Optional secondary path: keystroke injection. Useful for testing # in TextEdit / other apps, or for browsers/devices that don't # connect to the SSE endpoint (Safari, old Chrome, etc.). # # On macOS this often hits TCC permission walls (Accessibility, # Automation). The SSE path above is the reliable production # mechanism — keystroke injection is best-effort. if sys.platform == "darwin": try: _emit_macos(uid) return except Exception as e: if verbose: print(f"[wedge] osascript path failed ({e}); falling back to pyautogui") try: _emit_other(uid) except ImportError: if verbose: print( "[wedge] pyautogui is not installed; SSE-only mode. " "Run: pip3 install pyautogui (for keystroke fallback)", file=sys.stderr, ) except Exception as e: if verbose: print(f"[wedge] type error: {e}", file=sys.stderr) def run_loop(args): reader = None last_uid = None last_time = 0.0 while True: # Find / refresh reader handle when one isn't connected. if reader is None: reader = find_reader(verbose=args.verbose) if reader is None: time.sleep(RECONNECT_INTERVAL_SECONDS) continue print(f"[wedge] connected: {reader}") # Try to grab a card-present connection. try: connection = reader.createConnection() connection.connect() # raises NoCardException when no card on antenna except NoCardException: time.sleep(POLL_INTERVAL_SECONDS) continue except (CardConnectionException, CardRequestTimeoutException) as e: if args.verbose: print(f"[wedge] connection error: {e}; will rescan readers") reader = None time.sleep(RECONNECT_INTERVAL_SECONDS) continue except Exception as e: if args.verbose: print(f"[wedge] unexpected connection error: {e}") reader = None time.sleep(RECONNECT_INTERVAL_SECONDS) continue # Card present — read the UID and emit if not a duplicate. uid = read_uid(connection) try: connection.disconnect() except Exception: pass if uid: now = time.time() if uid != last_uid or (now - last_time) > DEBOUNCE_SECONDS: emit_uid(uid, type_keys=not args.no_type, verbose=args.verbose) last_uid = uid last_time = now elif args.verbose: print(f"[wedge] debounce: {uid}") time.sleep(POLL_INTERVAL_SECONDS) def main(): parser = argparse.ArgumentParser( description="ACR122U PC/SC -> keyboard wedge for fusion_clock kiosk." ) parser.add_argument( "-v", "--verbose", action="store_true", help="Print each detected tap.", ) parser.add_argument( "--no-type", action="store_true", help="Detect tags but don't type them (useful for testing the reader).", ) args = parser.parse_args() print("[wedge] fusion_clock ACR wedge starting. Ctrl+C to stop.") if args.no_type: print("[wedge] --no-type: keystroke injection disabled (SSE only).") # Always start the SSE server — it's the primary IPC channel. try: start_sse_server(verbose=True) except OSError as e: print(f"[wedge] WARNING: could not start SSE server on port {SSE_PORT}: {e}") print("[wedge] Daemon will continue in keystroke-only mode.") try: run_loop(args) except KeyboardInterrupt: print("\n[wedge] stopped.") sys.exit(0) if __name__ == "__main__": main()