Skip to main content

OpenSynaptic API Reference

API and service contracts currently implemented in the repository.

Deep-dive references:


OpenSynapticsrc/opensynaptic/core/pycore/core.py (exported via opensynaptic.core)

The top-level orchestrator. Instantiating it wires all subsystems from Config.json.

Constructor

OpenSynaptic(config_path: str | None = None)
ParameterTypeDescription
config_pathstr | NoneOptional path to Config.json (absolute or relative). If None, OSContext auto-detects the project root and loads the default config.

Key attributes after init:

AttributeTypeDescription
configdictLoaded Config.json contents
config_pathstrActive config path used by the instance
base_dirstrProject root directory
assigned_idint | NoneCurrent device ID (4294967295 = unassigned)
device_idstrHuman-readable device identifier
standardizerOpenSynapticStandardizerNormalisation subsystem
engineOpenSynapticEngineBase62 compression subsystem
fusionOSVisualFusionEngineBinary packet encode/decode subsystem
protocolOSHandshakeManagerCMD dispatch and session manager
id_allocatorIDAllocatorServer-side ID pool manager
transporter_managerTransporterManagerPluggable transporter registry
active_transportersdict[str, driver]Currently enabled transporter drivers
service_managerServiceManagerPlugin mount / lifecycle manager
db_managerDatabaseManager | NoneSQL engine (if configured)

Methods

ensure_id(server_ip, server_port, device_meta=None, timeout=5.0) → bool

Requests a uint32 device ID from the server via UDP (CMD.ID_REQUEST).
Persists the assigned ID back into Config.json.
Returns True if an ID was successfully obtained or already present.

ParameterTypeDefaultDescription
server_ipstrrequiredServer hostname or IP
server_portintrequiredUDP server port
device_metadict | NoneNoneOptional metadata sent with the request
timeoutfloat5.0Socket timeout in seconds

ensure_time(server_ip=None, server_port=None, timeout=3.0) → int | None

Synchronises the local clock with the server's UNIX timestamp.
Returns the server timestamp (int) or None on failure.

transmit(sensors, device_id=None, device_status='ONLINE', **kwargs) → tuple[bytes, int, str]

Full pipeline: standardise → compress → fuse into binary packet.
Raises RuntimeError if assigned_id is unset.

ParameterTypeDescription
sensorslist[list][[sensor_id, status, value, unit], ...]
device_idstr | NoneOverride device identifier (defaults to self.device_id)
device_statusstrDevice status string, e.g. "ONLINE"

Returns (binary_packet, assigned_id, strategy_label) where strategy_label is "FULL_PACKET" or "DIFF_PACKET".

dispatch(packet, medium=None) → bool

Send a binary packet via the named transporter.

ParameterTypeDescription
packetbytes | strEncoded packet; strings are UTF-8 encoded
mediumstr | NoneTransporter key, e.g. "UDP", "UART". Falls back to Config.OpenSynaptic_Setting.default_medium.

Returns True on success.

receive(raw_bytes) → dict

Decode a raw binary packet back to a fact dict (calls fusion.decompress).

receive_via_protocol(raw_bytes, addr=None) → Any

Full protocol-layer dispatch: parse CMD byte, route to handshake manager.

dispatch_with_reply(packet, server_ip=None, server_port=None, timeout=3.0) -> bytes | None

Send a packet via UDP and wait for a single reply.

transmit_fast(sensors=None, device_id=None, device_status='ONLINE', **kwargs) -> tuple[bytes, int, str]

Fast-path alias of transmit() in pycore (same semantics; rscore may override with fused FFI behavior).

transmit_batch(batch_items, **kwargs) -> list[tuple[bytes, int, str]]

Batch wrapper over transmit_fast().

relay(packet) -> bytes

Pass-through relay helper that re-encodes via fusion logic.

Internal Helpers (not part of stable public API)

_save_config()

Persist the current in-memory self.config dict back to Config.json.

_is_id_missing() -> bool

Returns True if assigned_id is absent, zero, or equal to MAX_UINT32 (4294967295).


OpenSynapticStandardizercore/pycore/standardization.py

Normalises raw sensor readings into UCUM-tagged fact dicts.

Constructor

OpenSynapticStandardizer(config_path: str = 'Config.json')

standardize(device_id, device_status, sensors, **kwargs) → dict

ParameterTypeDescription
device_idstrDevice identifier
device_statusstrStatus string, e.g. "ONLINE"
sensorslist[list][[sensor_id, status, value, unit], ...]
tint (kwarg)UNIX timestamp (auto-populated if omitted)

Returns a fact dict ready for OpenSynapticEngine.compress().


OpenSynapticEnginecore/pycore/solidity.py

Base62 compress and decompress pipeline facts.

Constructor

OpenSynapticEngine(config_path: str | None = None)

compress(fact: dict) → str

Converts a fact dict into a compact Base62-encoded string.

decompress(b62_str: str) → dict

Reverses compress() back to a fact dict.


OSVisualFusionEnginecore/pycore/unified_parser.py

Binary packet encoder/decoder with template learning for DIFF compression.

Constructor

OSVisualFusionEngine(config_path: str | None = None)

run_engine(raw_input_str, strategy='DIFF') → bytes

Encode a "<aid>;<b62_payload>" string into a binary packet.

ParameterTypeDescription
raw_input_strstr"{assigned_id};{compressed}"
strategystr"FULL" or "DIFF"

Binary frame layout (single hop, N=1):

[CMD:1][route_count:1][AID:4][TID:1][TS:6][BODY:L][CRC8:1][CRC16:2]
total = 15+L bytes
  • CMD — data command byte (see CMD class in handshake.py)
  • AID — source device ID, big-endian uint32
  • TID — template ID uint8 (zero-padded 2-digit decimal string cast to int)
  • TS — 6 LSBs of millisecond timestamp (big-endian uint64 bytes [2..8])
  • BODY — CMD-dependent (full UTF-8 payload / diff bitmask+deltas / empty)
  • CRC8 — CRC-8 (poly=0x07, init=0x00) over plaintext body
  • CRC16 — CRC-16/CCITT (poly=0x1021, init=0xFFFF) over frame minus last 2 bytes

decompress(raw_bytes: bytes) → dict

Decode a binary packet back to a fact dict.

Returns the decoded SensorFact dict with an extra __packet_meta__ key:

{
"__packet_meta__": {
"cmd": int,
"base_cmd": int,
"secure": bool,
"source_aid": int,
"crc16_ok": bool,
"crc8_ok": bool,
"timestamp_raw": int, # milliseconds
"tid": str,
"template_learned": bool,
}
}

Other Methods

MethodDescription
_set_local_id(id)Internal helper to update local device ID (used by core sync flow)
relay(packet)Pass-through re-encode (relay node use case)

OSHandshakeManagercore/pycore/handshake.py

CMD byte dispatch, device session management, ID and time negotiation.

Constructor

OSHandshakeManager(target_sync_count=3, registry_dir=None, expire_seconds=86400, secure_store_path=None)
ParameterTypeDefaultDescription
target_sync_countint3Successful sends required before switching to DIFF strategy
registry_dirstr | NoneNoneDevice registry root directory
expire_secondsint86400Session and sync-status entry TTL in seconds
secure_store_pathstr | NoneNoneOptional secure session persistence path

CMD Byte Constants (CMD class)

ConstantValueDescription
DATA_FULL0x3FFull payload (template + data)
DATA_FULL_SEC0x40Full payload, XOR-encrypted
DATA_DIFF0xAADifferential update (changed slots only)
DATA_DIFF_SEC0xABDifferential, XOR-encrypted
DATA_HEART0x7FHeartbeat (no value change)
DATA_HEART_SEC0x80Heartbeat, XOR-encrypted
ID_REQUEST0x01Node requests a device ID
ID_ASSIGN0x02Server assigns a device ID
ID_POOL_REQ0x03Node requests a batch of IDs
ID_POOL_RES0x04Server batch ID assignment
HANDSHAKE_ACK0x05Positive acknowledgement
HANDSHAKE_NACK0x06Negative acknowledgement
PING0x09Liveness probe
PONG0x0ALiveness reply
TIME_REQUEST0x0BNode requests server timestamp
TIME_RESPONSE0x0CServer returns UNIX timestamp
SECURE_DICT_READY0x0DServer confirms key exchange
SECURE_CHANNEL_ACK0x0ENode acknowledges encrypted channel

Strategy Methods

MethodReturnsDescription
get_strategy(src_aid, has_template)strReturns "FULL_PACKET" or "DIFF_PACKET"
commit_success(src_aid)NoneIncrement successful send counter

Session Security Methods

MethodReturnsDescription
note_local_plaintext_sent(aid, timestamp_raw, addr=None)dictDerive pending session key from (aid, timestamp_raw)
establish_remote_plaintext(aid, timestamp_raw, addr=None)dictServer-side key derivation + set dict_ready=True
confirm_secure_dict(aid, timestamp_raw=None, addr=None)boolPromote pending key to active key
mark_secure_channel(aid, addr=None)dictSet decrypt_confirmed=True, state -> "SECURE"
has_secure_dict(aid)boolTrue if dict_ready flag is set
should_encrypt_outbound(aid)boolTrue if session has both dict_ready and key
get_session_key(aid)`bytesNone`
note_server_time(ts)NoneRecord server timestamp for clock correction

Control Packet Builders

MethodReturnsDescription
build_id_request(device_meta=None)bytes[0x01][seq:2][json_meta]
build_id_pool_request(count=10, meta=None)bytes[0x03][seq:2][count:2][json_meta]
build_ping()bytes[0x09][seq:2]
build_time_request()bytes[0x0B][seq:2]

Transport-Level Handshake Loops

MethodReturnsDescription
request_id_via_transport(transport_send_fn, transport_recv_fn, device_meta=None, timeout=5.0)`intNone`
request_id_pool_via_transport(transport_send_fn, transport_recv_fn, count=10, meta=None, timeout=5.0)list[int]Client-side bulk ID request
request_time_via_transport(transport_send_fn, transport_recv_fn, timeout=3.0)`intNone`
classify_and_dispatch(raw_bytes, addr=None)dictParse CMD byte and route to handler

IDAllocatorplugins/id_allocator.py

Thread-safe uint32 ID pool with JSON persistence.

Constructor

IDAllocator(base_dir=None, start_id=1, end_id=4294967295,
persist_file="data/id_allocation.json", max_released=10000,
lease_policy=None, metrics_sink=None)

Methods

MethodReturnsDescription
allocate_id(meta=None)intAllocate the next available ID
allocate_pool(count, meta=None)list[int]Allocate count IDs at once
release_id(id_val, immediate=False)boolMark ID offline and reclaim immediately or after lease
release_pool(ids)intBulk release; returns count released
is_allocated(id_val)boolCheck if ID is in use
get_meta(id_val)dictRetrieve metadata stored at allocation time
stats()dictIncludes allocator totals plus lease_policy and lease_metrics

TransporterManagercore/pycore/transporter_manager.py

Auto-discovers transporter driver modules from services/transporters/ and the physical/transport layers.

Constructor

TransporterManager(master: OpenSynaptic)

Key Methods

MethodReturnsDescription
auto_load()dictDiscover and activate all enabled transporters
get_driver(medium)driver | NoneReturn the driver for a medium key
refresh_protocol(medium)driver | NoneInvalidate cache and reload one driver
runtime_tick()boolPeriodic heartbeat; reloads stale drivers

CoreManagercore/coremanager.py

Discovers and lazy-loads core plugins (for example pycore and rscore) and resolves the public opensynaptic.core symbols.

Constructor

CoreManager(base_package: str = 'opensynaptic.core', default_core: str = 'pycore')

Key Methods

MethodReturnsDescription
discover_cores()list[str]Discover available core plugin packages under opensynaptic.core.*
available_cores()list[str]Return registered core plugin names
set_config(config)dictSet in-memory config for backend selection
set_config_path(config_path)dictLoad Config and cache it for selection
set_active_core(name)strForce the active core plugin
get_active_core_name()strReturn selected core plugin name
load_core(name=None)dictLoad a core plugin manifest lazily
list_symbols(name=None)list[str]Return exported symbols for the selected core
get_symbol(symbol_name, name=None)AnyResolve one exported symbol from the selected core
create_node(config_path=None, name=None)OpenSynapticInstantiate the selected core's OpenSynaptic class

Top-level helper:

from opensynaptic.core import get_core_manager

ServiceManagerservices/service_manager.py

Generic plugin mount / load / lifecycle hub.

Constructor

ServiceManager(config: dict | None = None, mode: str = 'runtime')

Methods

MethodReturnsDescription
mount(name, service, config=None, mode=None)serviceRegister a service under the given name
get(name, default=None)service | defaultRetrieve a mounted service
load(name)service | NoneCall service.auto_load() once, mark as loaded
start_all()dictLoad every mounted service
stop_all()NoneCall close() / shutdown() on all services
snapshot()dictReturn {mode, mount_index, runtime_index, config_index}
collect_cli_commands()dict[str, dict]Ask each service for its CLI sub-commands
dispatch_plugin_cli(plugin_name, argv)intRoute argv to the named plugin's handler; returns exit code

plugin_registryservices/plugin_registry.py

Utility module for decoupled plugin mounting and config default synchronization.

FunctionReturnsDescription
normalize_plugin_name(name)strNormalize aliases (web-user -> web_user, deps -> dependency_manager)
list_builtin_plugins()list[str]Built-in plugin keys
ensure_plugin_defaults(config, plugin_name)boolInject missing defaults into RESOURCES.service_plugins.<plugin>
sync_all_plugin_defaults(config)boolEnsure defaults for all built-ins
ensure_and_mount_plugin(node, plugin_name, load=False, mode='runtime')serviceEnsure config defaults, mount service, optionally load

TUIServiceservices/tui/main.py

Section-aware terminal UI snapshot renderer with interactive live mode.

Constructor

TUIService(node: OpenSynaptic)

Methods

MethodReturnsDescription
render_section(name)dictSingle section snapshot (identity/config/transport/pipeline/plugins/db)
render_sections(sections=None)dictMultiple sections; None = all
build_snapshot()dictFull snapshot dict (all sections + timestamp)
render_text(sections=None)strJSON-formatted string of render_sections()
run_once(sections=None)strAlias for render_text()
run_bios(interval=2.0, section=None)NoneInteractive BIOS-style view with hotkeys
run_interactive(interval=2.0, sections=None)NoneBackward-compatible wrapper to BIOS mode
get_cli_commands()dictReturns {"render": fn, "interactive": fn, "bios": fn}

TestPluginservices/test_plugin/main.py

ServiceManager-mountable test runner exposing component, stress, integration, and comparison suites.

Constructor

TestPlugin(node: OpenSynaptic | None = None)

Methods

MethodReturnsDescription
run_component(verbosity=1)(ok, fail, result)Run all unittest.TestCase component tests
run_stress(total=200, workers=8, sources=6, progress=True, core_backend=None, require_rust=False, header_probe_rate=0.0, batch_size=1, processes=1, threads_per_process=None, chain_mode='core')(summary_dict, fail_count)Run concurrent pipeline stress test
run_all(stress_total=200, stress_workers=8, stress_sources=6, verbosity=1, progress=True, core_backend=None, require_rust=False, header_probe_rate=0.0, batch_size=1, processes=1, threads_per_process=None, chain_mode='core')dictRun component + stress and return combined report
run_integration()(ok, fail, result)Run integration test suite
get_cli_commands()dictReturns handlers including component, stress, all, compare, full_load, integration, audit

WebUserServiceservices/web_user/main.py

HTTP management-entry plugin with a browser dashboard and control APIs.

MethodReturnsDescription
start(host=None, port=None)boolStart web service thread (defaults from RESOURCES.service_plugins.web_user)
stop()boolStop web service
status()dictRuntime state (running, users, data_file, uptime_s)
build_dashboard()dictAggregated node snapshot for Web dashboard (identity, plugins, transport, pipeline, users)
get_cli_commands()dictReturns start/stop/status/dashboard/list/add/update/delete handlers

HTTP routes:

  • GET / - browser UI
  • GET /health, GET /api/health
  • GET /api/dashboard
  • GET /api/dashboard?sections=identity,transport
  • GET /api/ui/config
  • PUT /api/ui/config
  • GET /api/options/schema?only_writable=1
  • PUT /api/options
  • GET /api/config?key=a.b.c
  • PUT /api/config
  • GET /api/plugins, POST /api/plugins
  • GET /api/transport, POST /api/transport
  • GET /users
  • POST /users
  • PUT /users/{username}
  • DELETE /users/{username}

Write endpoints are controlled by web_user.read_only and web_user.writable_config_prefixes. When web_user.auth_enabled=true, management calls require X-Admin-Token.


DependencyManagerPluginservices/dependency_manager/main.py

Dependency inspection and repair plugin.

CommandDescription
checkCompare pyproject.toml dependencies with installed packages
doctorRun pip check
syncInstall all declared dependencies
repairInstall only missing dependencies
installInstall one named dependency

Logging

Use the os_log singleton:

from opensynaptic.utils import os_log

os_log.info('MODULE', 'EVENT', 'message', {'context': 'dict'})
os_log.err('MODULE', 'EVENT', exception, {'context': 'dict'})
os_log.log_with_const('info', LogMsg.READY, root='/path/to/root')

All message templates live in utils/constants.py:MESSAGES.
Add a LogMsg enum member before using it with log_with_const.