Fallback Routing for Offline Sync
Pharmacy inventory systems operate in a zero-tolerance environment for unlogged dispensing events. Network degradation, ISP outages, and POS hardware failures must not interrupt perpetual inventory tracking or break the chain of custody for DEA-controlled substances. The fallback routing workflow detailed here establishes a deterministic, locally buffered synchronization path that preserves compliance with DEA 21 CFR 1304 recordkeeping requirements, FDA 21 CFR 211 batch traceability, and HIPAA Security Rule encryption safeguards during connectivity loss. This operational pattern integrates directly into the broader Core Architecture & DEA Compliance Frameworks and ensures that offline transaction routing never compromises audit integrity or diversion detection thresholds.
1. Network State Detection & Threshold Trigger
The pharmacy management system (PMS) must continuously monitor upstream connectivity without introducing latency to dispensing workflows. A lightweight, asynchronous health probe targets the primary EDI ingestion endpoint using TCP keep-alives and HTTPS HEAD requests. When consecutive probe failures reach a configurable threshold (default: 3 failures within 15 seconds), the routing state machine transitions to OFFLINE_ROUTING. From that point, all outbound inventory events are intercepted by a local middleware layer before they reach the network stack.
import asyncio
import time
from enum import Enum

import aiohttp


class SyncState(Enum):
    ONLINE = "ONLINE"
    OFFLINE_ROUTING = "OFFLINE_ROUTING"
    RECONCILING = "RECONCILING"


class NetworkMonitor:
    def __init__(self, endpoint: str, threshold: int = 3, window_sec: int = 15):
        self.endpoint = endpoint
        self.threshold = threshold
        self.window_sec = window_sec
        self.state = SyncState.ONLINE
        self._failure_count = 0
        self._last_failure_ts = 0.0

    async def probe(self) -> None:
        """Send one HEAD request; an error or a non-200 status counts as a failure."""
        timeout = aiohttp.ClientTimeout(total=3.0)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.head(self.endpoint) as resp:
                    if resp.status == 200:
                        self._reset_failures()
                        return
        except (aiohttp.ClientError, asyncio.TimeoutError):
            pass
        self._record_failure()

    def _record_failure(self) -> None:
        now = time.monotonic()
        # A gap longer than the window ends the run of consecutive failures.
        if now - self._last_failure_ts > self.window_sec:
            self._failure_count = 0
        self._failure_count += 1
        self._last_failure_ts = now
        if self._failure_count >= self.threshold:
            self.state = SyncState.OFFLINE_ROUTING

    def _reset_failures(self) -> None:
        self._failure_count = 0
        self.state = SyncState.ONLINE
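The threshold rule ("3 failures within 15 seconds") can also be read as a strict sliding window, where only failures from the last 15 seconds count. The following is a minimal sketch of that reading, not the PMS implementation: `FailureWindow` and its injectable `clock` parameter are hypothetical names introduced here so the behaviour can be exercised without real network probes.

```python
import time
from collections import deque
from typing import Callable, Deque


class FailureWindow:
    """Counts probe failures that fall inside a sliding time window."""

    def __init__(self, threshold: int = 3, window_sec: float = 15.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.window_sec = window_sec
        self._clock = clock  # injectable so tests can supply fake timestamps
        self._failures: Deque[float] = deque()

    def record_failure(self) -> bool:
        """Record one failure; return True once the threshold is reached."""
        now = self._clock()
        self._failures.append(now)
        # Drop failures that are older than the window.
        while self._failures and now - self._failures[0] > self.window_sec:
            self._failures.popleft()
        return len(self._failures) >= self.threshold

    def record_success(self) -> None:
        """A successful probe ends the run of consecutive failures."""
        self._failures.clear()


# Three failures at t=0, 5, and 10 seconds trip the threshold on the third.
ticks = iter([0.0, 5.0, 10.0])
window = FailureWindow(clock=lambda: next(ticks))
tripped = [window.record_failure() for _ in range(3)]
# tripped == [False, False, True]
```

With failures spaced 10 seconds apart, the oldest entry ages out before the third arrives, so the window never trips; whether that is the desired behaviour depends on the probe interval chosen for the deployment.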
2. Local Buffer Initialization & Cryptographic Enforcement
When the state transitions to OFFLINE_ROUTING, a secure, encrypted local buffer is instantiated on the pharmacy workstation or edge server. HIPAA §164.312(a)(2)(iv) lists encryption and decryption of ePHI as an addressable implementation specification, and this architecture treats it as mandatory for data at rest. To satisfy this requirement while maintaining high-throughput write performance, the architecture employs SQLite with Write-Ahead Logging (WAL) and application-level AES-256-GCM payload encryption. The buffer uses a STRICT schema for column typing and is written with append-only semantics, and the hash chain pointer stored with each row makes retroactive modification or unauthorized deletion detectable.
import os
import sqlite3
from contextlib import closing

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class SecureBuffer:
    def __init__(self, db_path: str, master_key: bytes):
        self.db_path = db_path
        self.aesgcm = AESGCM(master_key)  # a 32-byte key selects AES-256
        self._init_db()

    def _init_db(self) -> None:
        parent = os.path.dirname(self.db_path)
        if parent:  # os.makedirs("") raises, so skip it for bare filenames
            os.makedirs(parent, exist_ok=True)
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute("PRAGMA journal_mode=WAL;")
            # STRICT tables (SQLite 3.37+) accept only INT, INTEGER, REAL, TEXT,
            # BLOB, and ANY as column types, so the timestamp is stored as TEXT.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS offline_txns (
                    seq_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    encrypted_payload BLOB NOT NULL,
                    nonce BLOB NOT NULL,
                    hash_chain_prev TEXT NOT NULL,
                    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
                ) STRICT;
            """)

    def insert_encrypted(self, payload: bytes, prev_hash: str) -> int:
        nonce = os.urandom(12)  # fresh 96-bit nonce for every record
        ciphertext = self.aesgcm.encrypt(nonce, payload, None)
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            cursor = conn.execute(
                "INSERT INTO offline_txns (encrypted_payload, nonce, hash_chain_prev) VALUES (?, ?, ?)",
                (ciphertext, nonce, prev_hash)
            )
            return cursor.lastrowid
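Append-only write semantics can be backed by the database itself rather than left to application discipline. One possible approach, sketched here against an in-memory database with a reduced version of the table, is a pair of SQLite triggers that abort any UPDATE or DELETE. The trigger names are hypothetical, and this is an illustration of the idea rather than the schema the buffer ships with.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE offline_txns (
        seq_id INTEGER PRIMARY KEY AUTOINCREMENT,
        encrypted_payload BLOB NOT NULL
    );
    -- Hypothetical trigger names; each aborts the offending statement.
    CREATE TRIGGER offline_txns_no_update
    BEFORE UPDATE ON offline_txns
    BEGIN
        SELECT RAISE(ABORT, 'offline_txns is append-only');
    END;
    CREATE TRIGGER offline_txns_no_delete
    BEFORE DELETE ON offline_txns
    BEGIN
        SELECT RAISE(ABORT, 'offline_txns is append-only');
    END;
""")
conn.execute("INSERT INTO offline_txns (encrypted_payload) VALUES (?)", (b"\x00\x01",))
conn.commit()


def is_blocked(sql: str) -> bool:
    """Return True when SQLite rejects the statement."""
    try:
        conn.execute(sql)
    except sqlite3.DatabaseError:
        return True
    return False


update_blocked = is_blocked("UPDATE offline_txns SET encrypted_payload = x'ff'")
delete_blocked = is_blocked("DELETE FROM offline_txns")
row_count = conn.execute("SELECT COUNT(*) FROM offline_txns").fetchone()[0]
```

Triggers only guard writes that go through SQL; anyone with access to the database file can still drop them, so they complement the hash chain rather than replace it. Any purge performed after reconciliation would also need a deliberate path around them.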
3. Transaction Interception & Schema Validation
Every dispensing, receiving, or inventory adjustment event is serialized and validated against the pharmacy’s master drug file before entering the local buffer. NDC formatting must be normalized to the standard 11-digit 5-4-2 representation to prevent reconciliation mismatches during sync. A 10-digit NDC arrives in one of three layouts (4-4-2, 5-3-2, or 5-4-1), and the conversion zero-pads whichever segment is short, which is why the hyphens must be present to convert it unambiguously. Adherence to NDC-11 vs NDC-10 Parsing Standards ensures consistent hyphenation and zero-padding logic across legacy and modern POS systems. DEA schedule classification is resolved concurrently, and lot/expiration metadata is verified against the manufacturer’s certificate of analysis. Invalid payloads are quarantined to a REJECTED_QUEUE with explicit error codes; valid payloads proceed to the transaction log.
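import json
import re
from typing import Optional

from pydantic import BaseModel, field_validator


class InventoryEvent(BaseModel):
    ndc: str
    schedule: Optional[str] = None
    lot_number: str
    expiration_date: str
    quantity_dispensed: int
    timestamp: str

    @field_validator("ndc")
    @classmethod
    def normalize_ndc(cls, v: str) -> str:
        """Return the NDC as an 11-digit, 5-4-2 hyphenated string."""
        value = v.strip()
        parts = value.split("-")
        if len(parts) == 3 and all(p.isdigit() for p in parts):
            lengths = tuple(len(p) for p in parts)
            # 10-digit layouts (4-4-2, 5-3-2, 5-4-1) get the short segment padded.
            if lengths in {(4, 4, 2), (5, 3, 2), (5, 4, 1), (5, 4, 2)}:
                labeler, product, package = parts
                return f"{labeler.zfill(5)}-{product.zfill(4)}-{package.zfill(2)}"
        elif re.fullmatch(r"\d{11}", value):
            return f"{value[:5]}-{value[5:9]}-{value[9:]}"
        # An unhyphenated 10-digit NDC is ambiguous, so it is rejected.
        raise ValueError("NDC must be a hyphenated 10-digit code or an 11-digit code")

    def to_payload(self) -> bytes:
        return json.dumps(self.model_dump(mode="json")).encode("utf-8")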
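The quarantine step described above, where invalid payloads go to a REJECTED_QUEUE with an explicit error code, could look roughly like the sketch below. It uses plain dictionaries instead of the pydantic model so it runs on the standard library alone. The error codes, the `Triage` container, and the positive-quantity rule are assumptions made for illustration, not values defined by the PMS.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional

# Hypothetical error codes, chosen for this sketch only.
ERR_MISSING_FIELD = "E_MISSING_FIELD"
ERR_BAD_QUANTITY = "E_BAD_QUANTITY"

REQUIRED_FIELDS = ("ndc", "lot_number", "expiration_date",
                   "quantity_dispensed", "timestamp")


@dataclass
class Triage:
    accepted: List[Dict[str, Any]] = field(default_factory=list)
    rejected_queue: List[Dict[str, Any]] = field(default_factory=list)


def check_event(event: Dict[str, Any]) -> Optional[str]:
    """Return an error code, or None when the event passes these basic checks."""
    for name in REQUIRED_FIELDS:
        if event.get(name) in (None, ""):
            return ERR_MISSING_FIELD
    qty = event["quantity_dispensed"]
    # Assumption: a dispensing quantity is a positive integer (bool is excluded).
    if isinstance(qty, bool) or not isinstance(qty, int) or qty <= 0:
        return ERR_BAD_QUANTITY
    return None


def triage(events: Iterable[Dict[str, Any]]) -> Triage:
    """Split events into accepted payloads and quarantined ones."""
    result = Triage()
    for event in events:
        code = check_event(event)
        if code is None:
            result.accepted.append(event)
        else:
            # Keep the original event next to its code for later review.
            result.rejected_queue.append({"error_code": code, "event": event})
    return result


good = {"ndc": "12345-6789-01", "lot_number": "A1",
        "expiration_date": "2030-01-31", "quantity_dispensed": 30,
        "timestamp": "2025-01-01T12:00:00Z"}
bad = dict(good, quantity_dispensed=0)
outcome = triage([good, bad])
```

Keeping the rejected event alongside its code means nothing is silently dropped, which matters when every dispensing event has to be accounted for.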
4. Fallback Routing Execution & Priority Weighting
Validated transactions are written to the local buffer with a monotonically increasing sequence ID and a cryptographic hash chain pointer. The routing daemon applies deterministic priority weighting: Schedule II events are flagged for immediate reconciliation upon restoration, while Schedule III-V and OTC events follow in schedule order, FIFO within each schedule. This tiered approach aligns with Fallback sync architecture for disconnected POS systems and ensures high-risk substances never experience sync latency beyond operational thresholds. Schedule classification logic references the authoritative DEA Schedule II-V Classification Mapping to guarantee accurate diversion risk scoring.
import hashlib
from dataclasses import dataclass
from typing import List

# Defined elsewhere on this page (see the surrounding blocks):
# - InventoryEvent
# - SecureBuffer


@dataclass
class PriorityTransaction:
    seq_id: int
    payload_hash: str
    schedule: str
    raw_payload: bytes


class FallbackRouter:
    def __init__(self, buffer: SecureBuffer):
        self.buffer = buffer
        self.prev_hash = "0" * 64  # Genesis hash
        self.priority_queue: List[PriorityTransaction] = []

    def route_transaction(self, event: InventoryEvent) -> None:
        payload = event.to_payload()
        current_hash = hashlib.sha256(payload).hexdigest()
        chain_hash = hashlib.sha256(f"{self.prev_hash}{current_hash}".encode()).hexdigest()
        seq_id = self.buffer.insert_encrypted(payload, self.prev_hash)
        self.prev_hash = chain_hash
        self.priority_queue.append(PriorityTransaction(
            seq_id=seq_id,
            payload_hash=current_hash,
            schedule=event.schedule or "OTC",
            raw_payload=payload
        ))
        self._sort_by_priority()

    def _sort_by_priority(self) -> None:
        # Schedule II first, then III-V, then OTC
        schedule_order = {"II": 0, "III": 1, "IV": 2, "V": 3, "OTC": 4}
        self.priority_queue.sort(key=lambda x: schedule_order.get(x.schedule, 5))
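The hash chain pointer is only useful if something later walks the chain and checks it. A minimal verification sketch follows, assuming the same linking rule as the router above (SHA-256 over the previous chain hash concatenated with the payload hash). The helper names `link`, `build_chain`, and `verify_chain` are hypothetical, and the records here hold plaintext payloads; in the real buffer they would be decrypted first.

```python
import hashlib
from typing import Iterable, List, Tuple

GENESIS = "0" * 64  # same genesis value the router starts from


def link(prev_hash: str, payload: bytes) -> str:
    """Chain hash for one record: sha256(prev_hash + sha256(payload))."""
    payload_hash = hashlib.sha256(payload).hexdigest()
    return hashlib.sha256(f"{prev_hash}{payload_hash}".encode()).hexdigest()


def build_chain(payloads: Iterable[bytes]) -> Tuple[List[Tuple[bytes, str]], str]:
    """Return (payload, hash_chain_prev) records plus the final chain head."""
    records: List[Tuple[bytes, str]] = []
    prev = GENESIS
    for payload in payloads:
        records.append((payload, prev))
        prev = link(prev, payload)
    return records, prev


def verify_chain(records: List[Tuple[bytes, str]], head: str) -> bool:
    """Recompute every link and compare it with the stored pointers and head."""
    expected = GENESIS
    for payload, stored_prev in records:
        if stored_prev != expected:
            return False
        expected = link(expected, payload)
    return expected == head


records, head = build_chain([b"event-1", b"event-2", b"event-3"])
intact = verify_chain(records, head)

# Altering a payload in the middle breaks the pointer stored on the next record.
tampered = list(records)
tampered[1] = (b"event-2-altered", records[1][1])
detected = not verify_chain(tampered, head)
```

The stored pointers alone cannot reveal a change to the newest record, which is why the sketch also compares against a separately held chain head (the router's `prev_hash` plays that role).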
5. Connectivity Restoration & Idempotent Reconciliation
Upon successful health check restoration, the routing daemon initiates a batched, idempotent upload sequence. Each transaction is transmitted with its original timestamp, its sequence ID, and an idempotency key derived from the payload hash, so a retried upload is deduplicated server-side and each event is recorded once. The reconciliation engine validates server-side acknowledgments against the local sequence IDs. If a transmission fails mid-batch, the daemon resumes with the transactions that have not yet been acknowledged, without duplicating records. This resilience pattern is critical for environments with intermittent backhaul, as detailed in Designing offline sync fallbacks for rural pharmacies.
from typing import Any, Dict, Set

import httpx

# Defined elsewhere on this page (see the surrounding blocks):
# - FallbackRouter


class ReconciliationDaemon:
    def __init__(self, router: FallbackRouter, sync_endpoint: str):
        self.router = router
        self.sync_endpoint = sync_endpoint
        self.last_confirmed_seq = 0  # highest acknowledged seq_id, for reporting

    async def reconcile(self) -> Dict[str, Any]:
        if not self.router.priority_queue:
            return {"status": "idle"}
        # The queue is ordered by priority, not by seq_id, so acknowledged
        # transactions are tracked by ID instead of by list position.
        batch = list(self.router.priority_queue)
        confirmed: Set[int] = set()
        results = {"uploaded": 0, "failed": []}
        async with httpx.AsyncClient(timeout=10.0) as client:
            for txn in batch:
                headers = {
                    "X-Idempotency-Key": txn.payload_hash,
                    "X-Sequence-ID": str(txn.seq_id),
                    "Content-Type": "application/json"
                }
                try:
                    resp = await client.post(
                        self.sync_endpoint,
                        content=txn.raw_payload,
                        headers=headers
                    )
                    # 409 means the server already processed this key.
                    if resp.status_code in (200, 201, 202, 409):
                        confirmed.add(txn.seq_id)
                        results["uploaded"] += 1
                        self.last_confirmed_seq = max(self.last_confirmed_seq, txn.seq_id)
                    else:
                        results["failed"].append(txn.seq_id)
                except httpx.RequestError:
                    results["failed"].append(txn.seq_id)
                    break  # Halt on network failure to preserve order
        # Keep only unacknowledged transactions for the next attempt.
        self.router.priority_queue = [
            t for t in self.router.priority_queue if t.seq_id not in confirmed
        ]
        return results
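The client-side retry logic relies on the sync endpoint treating a repeated idempotency key as a replay rather than a new event. The sketch below shows that server-side half in its simplest form, as an in-memory stand-in. `IdempotentReceiver` is a hypothetical class, and the 201/409 status convention is assumed to match what the daemon above expects; a real endpoint would persist the keys durably.

```python
from typing import Dict, List


class IdempotentReceiver:
    """In-memory stand-in for the sync endpoint's deduplication step."""

    def __init__(self) -> None:
        self._seen: Dict[str, int] = {}  # idempotency key -> sequence ID
        self.ledger: List[bytes] = []    # payloads recorded exactly once

    def handle(self, idempotency_key: str, seq_id: int, body: bytes) -> int:
        """Return an HTTP-style status: 201 on first delivery, 409 on a replay."""
        if idempotency_key in self._seen:
            return 409
        self._seen[idempotency_key] = seq_id
        self.ledger.append(body)
        return 201


receiver = IdempotentReceiver()
first = receiver.handle("abc123", 1, b'{"ndc": "12345-6789-01"}')
replay = receiver.handle("abc123", 1, b'{"ndc": "12345-6789-01"}')
# The replay is acknowledged without a second ledger entry.
```

Because the key is derived from the payload alone, two distinct events that serialize to identical bytes would collide; if that can happen in practice, a key that also incorporates the sequence ID may be the safer choice.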
Compliance Mapping & Audit Readiness
This fallback routing architecture is engineered to satisfy explicit regulatory controls without requiring manual intervention during connectivity loss:
| Regulation | Requirement | Implementation Control |
|---|---|---|
| DEA 21 CFR 1304.04 | Complete, accurate, and contemporaneous records of controlled substances | Append-only SQLite buffer with cryptographic hash chaining prevents retroactive alteration. Sequence IDs guarantee chronological ordering. |
| HIPAA §164.312(a)(2)(iv) | Encryption and decryption of ePHI (addressable specification) | AES-256-GCM payload encryption with unique 12-byte nonces per transaction. Key material derived via HKDF and stored in OS-protected keystores. |
| FDA 21 CFR 211.188 | Batch production and control records | Lot/expiration validation at interception. Immutable payload serialization ensures traceability from manufacturer receipt to patient dispensing. |
| NIST SP 800-111 | Storage encryption for end user devices | Hardware-backed TPM/SE integration recommended for master key storage. Buffer auto-purges after successful reconciliation per retention policies. |
Audit readiness is maintained through automated log rotation, cryptographic integrity verification, and deterministic reconciliation reporting. The routing layer generates tamper-evident manifests that can be exported for Automated PDF & HTML Report Generation workflows. All offline events retain their original timestamps, ensuring that diversion detection algorithms operate on accurate temporal data regardless of sync latency.
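A tamper-evident manifest can be approximated by signing a canonical serialization of the exported entries. The sketch below uses HMAC-SHA256 from the standard library. The entry fields, the function names, and the placeholder key are assumptions for illustration; in practice the key would come from the protected keystore rather than a literal in code.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, List


def canonical(obj: Any) -> bytes:
    """Serialize with sorted keys so equal content always yields equal bytes."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def build_manifest(entries: List[Dict[str, Any]], key: bytes) -> Dict[str, Any]:
    """Attach an HMAC-SHA256 tag computed over the canonical entries."""
    tag = hmac.new(key, canonical(entries), hashlib.sha256).hexdigest()
    return {"entries": entries, "hmac_sha256": tag}


def manifest_is_intact(manifest: Dict[str, Any], key: bytes) -> bool:
    """Recompute the tag and compare it in constant time."""
    expected = hmac.new(key, canonical(manifest["entries"]), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, manifest["hmac_sha256"])


demo_key = b"placeholder-key-for-illustration"  # hypothetical, never hard-code a real key
manifest = build_manifest(
    [{"seq_id": 1, "payload_hash": "aa" * 32},
     {"seq_id": 2, "payload_hash": "bb" * 32}],
    demo_key,
)
ok_before = manifest_is_intact(manifest, demo_key)

manifest["entries"][0]["seq_id"] = 99  # simulate an edit after export
ok_after = manifest_is_intact(manifest, demo_key)
```

A keyed tag is used instead of a bare digest because anyone who can edit the manifest could otherwise recompute an unkeyed hash to match.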
For cryptographic implementation standards, reference the official Python Cryptography Documentation and NIST SP 800-111, Guide to Storage Encryption Technologies for End User Devices. Regulatory text for controlled substance recordkeeping is codified in 21 CFR Part 1304.