Server Module
Overview
Neuro Websocket API Server Implementation.
The server module provides a complete WebSocket server infrastructure for hosting Neuro AI game integration services. It manages client connections, handles the WebSocket protocol, and provides both abstract interfaces and concrete implementations for AI-powered game interaction capabilities.
Key Features
Multi-Client Management: Handle multiple simultaneous game client connections
Structured Concurrency: Built on Trio’s async framework for reliable task management
WebSocket Server: Complete WebSocket server implementation with SSL support
Action Registry: Dynamic action management with registration and unregistration
Development Tools: Interactive console server for testing and debugging
Architecture Overview
The server module follows a layered architecture:
- Base Layer (AbstractNeuroServerClient)
Abstract interface defining the core client communication protocol
- Handler Layer (AbstractHandlerNeuroServerClient)
Adds ID generation, action tracking, and basic command handling
- Recording Layer (AbstractRecordingNeuroServerClient)
Implements action registry management and storage
- Implementation Layer (BaseTrioNeuroServerClient, TrioNeuroServerClient)
Concrete WebSocket communication using trio-websocket
- Server Layer (AbstractTrioNeuroServer)
Server coordination and multi-client management
Classes
Data Types
- class neuro_api.server.ContextData
Context command data schema.
- silent
If True, the message will be added to Neuro’s context without prompting her to respond to it. If False, Neuro _might_ respond to the message directly, unless she is busy talking to someone else or to chat.
- Type:
bool
- class neuro_api.server.ActionSchema
Action schema definition.
- schema: NotRequired[SchemaObject]
- class neuro_api.server.RegisterActionsData
Register actions command data schema.
- actions
List of actions to register.
- Type:
list[ActionSchema]
- actions: list[ActionSchema]
- class neuro_api.server.UnregisterActionsData
Unregister actions command data schema.
- class neuro_api.server.ForceActionsData
Force actions command data schema.
- priority
Determines how urgently Neuro should respond to the action force when she is speaking.
- Type:
ForcePriority
- state: NotRequired[str]
- ephemeral_context: NotRequired[bool]
- priority: NotRequired[Literal['low', 'medium', 'high', 'critical']]
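The four literal priority strings can be modelled as an enumeration when parsing incoming data. The sketch below is a stand-in, not the library's ForcePriority class: the names ForcePrioritySketch and parse_priority and the member values are assumptions based on the literals listed above, and the fallback to low for a missing key follows the default described for the priority parameter elsewhere in this module.

```python
from enum import Enum


class ForcePrioritySketch(str, Enum):
    """Hypothetical stand-in for the library's ForcePriority."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


def parse_priority(raw: object = None) -> ForcePrioritySketch:
    """Map an optional wire value to a priority, defaulting to LOW."""
    if raw is None:
        return ForcePrioritySketch.LOW
    # Enum lookup by value raises ValueError for anything unrecognized.
    return ForcePrioritySketch(raw)


print(parse_priority().name)            # priority key omitted
print(parse_priority("critical").name)  # explicit value
```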
- class neuro_api.server.ActionResultData
Action Result command data schema.
- message: NotRequired[str]
Utility Functions
- neuro_api.server.deserialize_actions(data: dict[str, list[object]]) list[Action]
Deserialize a dictionary of actions into a list of Action objects.
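As a rough illustration of what such a deserialization step might involve, the sketch below converts raw dictionaries into simple records. ActionSketch, its fields (name, description, schema), the "actions" key, and deserialize_actions_sketch are all assumptions made for this example; the library's Action type and validation rules may differ.

```python
from __future__ import annotations

from typing import Any, NamedTuple


class ActionSketch(NamedTuple):
    """Hypothetical stand-in for the library's Action type."""

    name: str
    description: str
    schema: dict[str, Any] | None = None


def deserialize_actions_sketch(data: dict[str, list[object]]) -> list[ActionSketch]:
    """Build ActionSketch records from the assumed 'actions' list."""
    actions: list[ActionSketch] = []
    for raw in data["actions"]:
        if not isinstance(raw, dict):
            raise TypeError(f"expected an object, got {type(raw).__name__}")
        # The schema key is optional, mirroring NotRequired in ActionSchema.
        actions.append(
            ActionSketch(raw["name"], raw["description"], raw.get("schema"))
        )
    return actions


parsed = deserialize_actions_sketch(
    {"actions": [{"name": "jump", "description": "Jump up"}]}
)
print(parsed[0].name)
```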
- neuro_api.server.check_action_names_type(action_names: list[str]) None
Validate that all items in the action names list are strings.
- Parameters:
action_names (list[str]) – A list of action names to be validated.
- Raises:
ValueError – If any item in the list is not a string.
Examples
>>> check_action_names_type(['action1', 'action2'])  # Passes
>>> check_action_names_type(['action1', 123])  # Raises ValueError
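A minimal sketch of the kind of check described here, assuming nothing beyond the documented behavior (ValueError when any item is not a string). The function name check_action_names_type_sketch and the error message wording are hypothetical.

```python
def check_action_names_type_sketch(action_names: list[object]) -> None:
    """Raise ValueError if any entry is not a string."""
    for index, name in enumerate(action_names):
        if not isinstance(name, str):
            raise ValueError(
                f"action_names[{index}] is {type(name).__name__}, expected str"
            )


check_action_names_type_sketch(["action1", "action2"])  # passes silently

try:
    check_action_names_type_sketch(["action1", 123])
except ValueError as exc:
    print(f"rejected: {exc}")
```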
Abstract Base Classes
- class neuro_api.server.AbstractNeuroServerClient
Bases: AbstractNeuroAPIClient
Abstract base class for Neuro Server Client communication.
This class defines the core interface for bidirectional communication between a server and client in the Neuro system, providing abstract methods and utilities for command exchange.
- Key Responsibilities:
Define methods for sending commands from server to client
Define methods for receiving and processing commands from client to server
Provide a standardized communication protocol for game automation
- Core Communication Patterns:
Action registration and unregistration
Context management
Command execution and result handling
Graceful shutdown mechanisms
Note
This is an abstract base class that MUST be subclassed
Subclasses must implement all abstract methods
Designed to support flexible, game-agnostic automation workflows
- abstractmethod get_next_id() str
Generate and return the next unique command identifier.
- Returns:
A unique identifier for a command.
- Return type:
str
- async send_action_command(name: str, data: str | None = None) str
Submit an action request and return the associated command result ID.
This method attempts to execute a registered action. The server LLM should not proceed until it receives the associated action result.
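- Parameters:
name (str) – The name of the action to execute. Must be a previously registered action name.
data (str | None, optional) – JSON-stringified data for the action. Should conform to the JSON schema provided when the action was registered. Defaults to None.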
- Returns:
Command ID that the associated action result should have.
- Return type:
str
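To make the ID correlation concrete, here is a sketch of how an action request could be assembled and paired with its command ID. The wire layout (a "command" of "action" with "id", "name", and optional "data" fields) is an assumption based on the public Neuro API message format, and build_action_message and the counter are hypothetical helpers, not part of this module.

```python
from __future__ import annotations

import json
from itertools import count

_ids = count()  # hypothetical ID source standing in for get_next_id()


def build_action_message(name: str, data: str | None = None) -> tuple[str, str]:
    """Return (command_id, json_text) for an action request."""
    command_id = str(next(_ids))
    payload: dict[str, object] = {"id": command_id, "name": name}
    if data is not None:
        payload["data"] = data  # already JSON-stringified by the caller
    return command_id, json.dumps({"command": "action", "data": payload})


command_id, text = build_action_message("jump", json.dumps({"height": 2}))
print(command_id, text)
```

The caller keeps command_id so that a later action result carrying the same ID can be matched to this request.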
- async send_reregister_all_command() None
Send a command to the client to unregister and reregister all actions.
This method signals the client to completely reset and re-establish its action registration.
Warning
This command is part of a proposed API and is not yet officially supported. Some clients may not implement this functionality.
- async send_graceful_shutdown_command(wants_shutdown: bool) None
Send a graceful shutdown command to the client.
This method signals the game to save its state and return to the main menu, preparing for a potential shutdown.
- Parameters:
wants_shutdown (bool) – Control flag for the shutdown request:
- True: Request shutdown at the next graceful point
- False: Cancel a previous shutdown request
Warning
This is part of the game automation API and will only be used for games that can be launched automatically by Neuro. Most games will not support this command.
Example
>>> await client.send_graceful_shutdown_command(True)  # Request shutdown
>>> await client.send_graceful_shutdown_command(False)  # Cancel shutdown
- async send_immediate_shutdown_command() None
Send immediate shutdown command to client.
This method signals the game to shut down immediately, with the expectation that it will save its state and send back a shutdown-ready message as quickly as possible.
Warning
This is part of the game automation API and will only be used for games that can be launched automatically by Neuro. Most games will not support this command.
- async handle_startup(game_title: str) None
Process the startup command for the client.
This method is responsible for initializing the client’s state when a game starts. It MUST clear all previously registered actions for this specific client.
- Parameters:
game_title (str) – The title of the game being started. This value should remain consistent for the entire websocket client session.
Note
This method is critical for resetting the client’s action state before beginning a new game session.
Implementations should ensure a clean slate for new game interactions.
- async handle_context(game_title: str, message: str, silent: bool) None
Process the context command received from the game client.
This method handles incoming contextual information about the game state, which is directly passed to Neuro.
- Parameters:
game_title (str) – The title of the game. This should remain consistent throughout the websocket client session.
message (str) – A plaintext description of the current game state or event. This information is directly received by Neuro to understand the game context.
silent (bool) – Controls Neuro’s response behavior:
- True: Message is added to context silently without prompting a response
- False: Neuro may respond to the message, subject to her current availability and conversation state
Note
The silent flag provides fine-grained control over Neuro’s interaction with the context.
Even with silent=False, Neuro might not respond if she is engaged in another conversation or task.
- abstractmethod async handle_actions_register(game_title: str, actions: list[Action]) None
Register a list of actions for the game client.
This abstract method must be implemented by subclasses to handle action registration for a specific game client.
- Parameters:
Note
Implementations should handle duplicate action registrations by ignoring already registered actions.
This method is critical for preparing available actions for the game client.
- abstractmethod async handle_actions_unregister(game_title: str, action_names: list[str]) None
Unregister specified actions from the game client.
This abstract method must be implemented by subclasses to handle action unregistration for a specific game client.
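- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
action_names (list[str]) – Names of the actions to unregister.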
Note
If an action name to be unregistered is not currently registered, it should be silently ignored.
Implementations should gracefully handle attempts to unregister non-existent actions.
- abstractmethod async handle_actions_force(game_title: str, state: str | None, query: str, ephemeral_context: bool, action_names: list[str], priority: ForcePriority) None
Force Neuro to choose and execute actions from a specified list.
This abstract method provides a mechanism to constrain and direct Neuro’s action selection for a specific game scenario.
- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
state (str | None) – A detailed description of the current game state. Can be in various formats (plaintext, JSON, Markdown) and is directly passed to Neuro.
query (str) – A plaintext instruction specifying what Neuro should do in the current context. Directly received by Neuro.
ephemeral_context (bool) – Controls context persistence:
- False: Context (state and query) is remembered after actions force completion
- True: Context is only remembered during the actions force operation
action_names (list[str]) – Names of actions Neuro MUST choose from when executing the force command.
priority (ForcePriority) – Determines how urgently Neuro should respond to the action force when she is speaking. If Neuro is not speaking, this setting has no effect. The default is ForcePriority.LOW, which will cause Neuro to wait until she finishes speaking before responding. ForcePriority.MEDIUM causes her to finish her current utterance sooner. ForcePriority.HIGH prompts her to process the action force immediately, shortening her utterance and then responding. ForcePriority.CRITICAL will interrupt her speech and make her respond at once. Use ForcePriority.CRITICAL with caution, as it may lead to abrupt and potentially jarring interruptions.
- abstractmethod async handle_action_result(game_title: str, id_: str, success: bool, message: str | None) None
Process the result of an executed action.
This abstract method handles the outcome of a previously requested action, providing feedback and potential retry mechanisms.
- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
id_ (str) – Unique identifier of the action that was executed.
success (bool) – Indicates the action’s execution status:
- True: Action completed successfully
- False: Action failed. If part of an actions force, the entire actions force MUST be immediately retried
message (str | None) – Detailed information about the action’s execution:
- For successful actions: Optional small context about the action’s outcome
- For failed actions: An error message explaining why the action could not be completed
Note
The message provides direct feedback to Neuro about the action’s result
Failed actions in a force actions context trigger an automatic retry mechanism
Implementations should carefully handle both successful and unsuccessful action scenarios
- async handle_shutdown_ready(game_title: str) None
Process the shutdown ready command from the game client.
This method is called when the client indicates it is prepared to be safely closed or disconnected.
- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
Note
This method signals that the game client has completed any necessary cleanup or save operations
Implementations should perform any final cleanup or resource management required before closing the connection
The method is typically part of a graceful shutdown sequence for the game client
- static deserialize_actions(data: dict[str, list[object]]) list[Action]
Deserialize a dictionary of actions into a list of Action objects.
- static check_action_names_type(action_names: list[str]) None
Validate that all items in the action names list are strings.
- Parameters:
action_names (list[str]) – A list of action names to be validated.
- Raises:
ValueError – If any item in the list is not a string.
Examples
>>> check_action_names_type(['action1', 'action2'])  # Passes
>>> check_action_names_type(['action1', 123])  # Raises ValueError
- async read_message() None
Read and process a message from the client websocket.
This method is designed to be called in a loop while the websocket remains connected. It reads a raw client message and dispatches it to the appropriate handler based on the command type.
Supported Command Types and Their Handlers:
- “startup”: Calls handle_startup
- “context”: Calls handle_context
- “actions/register”: Calls handle_actions_register
- “actions/unregister”: Calls handle_actions_unregister
- “actions/force”: Calls handle_actions_force
- “action/result”: Calls handle_action_result
- “shutdown/ready”: Calls handle_shutdown_ready
- Unknown commands: Calls handle_unknown_command
- Raises:
ValueError – If command data is missing or invalid:
- Missing data attribute for commands requiring it
- Extra or missing keys in action command data
TypeError – If action command key types do not match expected types
Note
Does not catch exceptions raised by read_raw_client_message
Validates and processes each command type differently
Ensures type safety and data integrity before calling handlers
Example
>>> while websocket.is_connected():
...     await client.read_message()
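The command-to-handler routing described for read_message can be pictured with a reduced dispatcher. This is a sketch only: DispatchSketch and its dispatch method are hypothetical, only two of the documented commands are covered, and the packet layout ("command", "game", "data" keys) is an assumption based on the public Neuro API message format rather than something this page specifies.

```python
from __future__ import annotations

import asyncio
import json


class DispatchSketch:
    """Hypothetical stand-in that records which handler each command reaches."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def handle_startup(self, game_title: str) -> None:
        self.calls.append(f"startup:{game_title}")

    async def handle_context(self, game_title: str, message: str, silent: bool) -> None:
        self.calls.append(f"context:{message}:{silent}")

    async def handle_unknown_command(self, command: str, data: dict | None) -> None:
        self.calls.append(f"unknown:{command}")

    async def dispatch(self, raw: str) -> None:
        packet = json.loads(raw)
        command, game, data = packet["command"], packet["game"], packet.get("data")
        if command == "startup":
            await self.handle_startup(game)
        elif command == "context":
            if data is None:
                raise ValueError("context command requires a data attribute")
            await self.handle_context(game, data["message"], data["silent"])
        else:
            await self.handle_unknown_command(command, data)


async def main() -> list[str]:
    client = DispatchSketch()
    await client.dispatch('{"command": "startup", "game": "Demo"}')
    await client.dispatch(
        '{"command": "context", "game": "Demo", '
        '"data": {"message": "hi", "silent": true}}'
    )
    await client.dispatch('{"command": "mystery", "game": "Demo"}')
    return client.calls


print(asyncio.run(main()))
```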
- class neuro_api.server.AbstractHandlerNeuroServerClient
Bases: AbstractNeuroServerClient
Abstract base class for Neuro Server Client with enhanced command handling.
Extends AbstractNeuroServerClient with additional functionality for:
- Unique ID generation
- Pending action tracking
- Game title management
- Basic logging and error handling
Note
Uses __slots__ for memory-efficient attribute management
Provides default implementations for common server client methods
Requires subclasses to implement specific action handling logic
- __init__() None
Initialize the Neuro Server Client with default state.
Sets up:
- Unset game title
- Zero-initialized ID counter
- Empty pending actions dictionary
- async handle_unknown_command(command: str, data: dict[str, object] | None) None
Handle commands that are not recognized by the client.
Provides a default error handling mechanism for unexpected commands.
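- Parameters:
command (str) – The unrecognized command name.
data (dict[str, object] | None) – The data sent with the command, if any.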
Note
Logs a warning with the command details
Subclasses can override for more sophisticated handling
Base implementation just uses log_warning to log a warning.
- get_next_id() str
Generate and return the next unique command identifier.
- Returns:
A unique identifier for a command.
- Return type:
str
- abstractmethod clear_registered_actions() None
Abstract method to clear all registered actions.
Subclasses must implement a method to reset the action state.
- check_game_title(game_title: str) None
Validate the game title against the current client state.
Performs two key checks:
- Warns if no game title has been set before an action
- Prevents changing the game title once it’s been set
- Parameters:
game_title (str) – The game title to validate
Note
Logs a warning if game title is not set
- async handle_startup(game_title: str) None
Process the startup command for the client.
This method is responsible for initializing the client’s state when a game starts. It MUST clear all previously registered actions for this specific client.
- Parameters:
game_title (str) – The title of the game being started. This value should remain consistent for the entire websocket client session.
Note
This method is critical for resetting the client’s action state before beginning a new game session.
Implementations should ensure a clean slate for new game interactions.
- abstractmethod add_context(message: str, reply_if_not_busy: bool) None
Add message to context.
- Parameters:
message (str) – A plaintext message that describes what is happening in the game. This information will be directly received by Neuro.
reply_if_not_busy (bool) – If False, the message will be added to Neuro’s context without prompting her to respond to it. If True, Neuro _might_ respond to the message directly, unless she is busy talking to someone else or to chat.
- async handle_context(game_title: str, message: str, silent: bool) None
Process the context command received from the game client.
This method handles incoming contextual information about the game state, which is directly passed to Neuro.
- Parameters:
game_title (str) – The title of the game. This should remain consistent throughout the websocket client session.
message (str) – A plaintext description of the current game state or event. This information is directly received by Neuro to understand the game context.
silent (bool) – Controls Neuro’s response behavior:
- True: Message is added to context silently without prompting a response
- False: Neuro may respond to the message, subject to her current availability and conversation state
Note
The silent flag provides fine-grained control over Neuro’s interaction with the context.
Even with silent=False, Neuro might not respond if she is engaged in another conversation or task.
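The silent flag and the reply_if_not_busy argument of add_context are described with opposite polarity, so one plausible bridge between them is a simple negation. The sketch below assumes that relationship; ContextSketch is a hypothetical class, and the real handle_context may also validate the game title or do additional work.

```python
import asyncio


class ContextSketch:
    """Hypothetical client that stores context entries in a list."""

    def __init__(self) -> None:
        self.context: list[tuple[str, bool]] = []

    def add_context(self, message: str, reply_if_not_busy: bool) -> None:
        self.context.append((message, reply_if_not_busy))

    async def handle_context(self, game_title: str, message: str, silent: bool) -> None:
        # Assumed mapping: a silent message must not prompt a reply.
        self.add_context(message, reply_if_not_busy=not silent)


async def main() -> list[tuple[str, bool]]:
    client = ContextSketch()
    await client.handle_context("Demo", "Player entered the cave", silent=True)
    await client.handle_context("Demo", "Boss appeared", silent=False)
    return client.context


print(asyncio.run(main()))
```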
- abstractmethod register_action(action: Action) None
Register a single action for the current game session.
This abstract method must be implemented by subclasses to define how individual actions are added to the client’s available action set.
- Parameters:
action (Action) – The action to be registered.
- Behavior:
If the action is already registered, it should be ignored
Implementations should handle action deduplication
Should update the client’s internal action tracking
Note
This is a core method for dynamically managing available actions during a game session
Subclasses must provide a concrete implementation that fits their specific action management strategy
- async handle_actions_register(game_title: str, actions: list[Action]) None
Register a list of actions for the game client.
This abstract method must be implemented by subclasses to handle action registration for a specific game client.
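- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
actions (list[Action]) – List of actions to register.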
Note
Implementations should handle duplicate action registrations by ignoring already registered actions.
This method is critical for preparing available actions for the game client.
- abstractmethod unregister_action(action_name: str) None
Remove a specific action from the current game session’s action set.
This abstract method must be implemented by subclasses to define how individual actions are removed from the client’s available actions.
- Parameters:
action_name (str) – The unique identifier of the action to be unregistered.
- Behavior:
If the action is not currently registered, it should be ignored
Implementations should handle action removal gracefully
Should update the client’s internal action tracking
Note
Critical for dynamically managing available actions during a game session
Allows for fine-grained control of action availability
Subclasses must provide a concrete implementation that fits their specific action management strategy
- async handle_actions_unregister(game_title: str, action_names: list[str]) None
Unregister specified actions from the game client.
This abstract method must be implemented by subclasses to handle action unregistration for a specific game client.
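- Parameters:
game_title (str) – The title of the game. This value should remain consistent throughout the websocket client session.
action_names (list[str]) – Names of the actions to unregister.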
Note
If an action name to be unregistered is not currently registered, it should be silently ignored.
Implementations should gracefully handle attempts to unregister non-existent actions.
- async submit_action(name: str, data: str | None = None) tuple[bool, str | None]
Submit an action request and wait for its completion result.
This method sends an action command to the client and blocks until the corresponding action result is received. It handles the complete action lifecycle from submission to result processing.
- Parameters:
name (str) – The name of the action to execute. Must be a previously registered action name.
data (str | None, optional) – JSON-stringified data for the action. Should conform to the JSON schema provided when the action was registered. If no schema was provided during registration, this should be None. Defaults to None.
- Returns:
- A tuple containing:
- bool: Success status of the action execution
True: Action completed successfully
False: Action failed to execute
- str | None: Result message from the action execution
For successful actions: Optional context about the outcome
For failed actions: Error message explaining the failure
None: No additional message provided
- Return type:
tuple[bool, str | None]
- Raises:
trio.BrokenResourceError – If the underlying communication channel is closed while waiting for the result.
trio.EndOfChannel – If the result channel is unexpectedly closed.
Note
This method blocks until the action completes
Each action gets a unique ID for result correlation
Failed actions in force action contexts will trigger automatic retries
The result message, if present, is automatically added to Neuro’s context
- async handle_action_result(game_title: str, id_: str, success: bool, message: str | None) None
Process the result of a previously submitted action.
This method handles action result messages from the client, correlating them with pending action submissions and delivering results through internal communication channels.
- Parameters:
game_title (str) – The title of the game. Must match the current client’s game title to prevent cross-client contamination.
id_ (str) – Unique identifier of the completed action. This ID was generated when the action was originally submitted.
success (bool) – Execution status of the action:
- True: Action completed successfully
- False: Action failed. If this was part of a force actions sequence, the entire sequence will be automatically retried.
message (str | None) – Detailed result information:
- For successful actions: Optional context about the outcome
- For failed actions: Error message explaining the failure
- None: No additional information provided
- Behavior:
Validates the game title matches the current session
Looks up the pending action by ID
If found, delivers the result to the waiting submit_action call
If message is provided, adds it to Neuro’s context (silently)
Logs a warning if the action ID is not found in pending actions
Note
This method is called automatically when action results arrive
Unknown action IDs are logged as warnings but don’t raise errors
Result messages are automatically added to context for Neuro’s awareness
The method ensures proper cleanup of internal tracking state
Warning
If an action ID is not found in pending actions, this indicates either a client implementation error or a race condition in the communication protocol.
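The interplay between submit_action and handle_action_result amounts to a table of pending requests keyed by ID. The sketch below illustrates that correlation pattern with asyncio futures, which is a substitution made so the example runs with the standard library only; the documented exceptions (trio.BrokenResourceError, trio.EndOfChannel) suggest the library itself uses Trio channels. PendingActionsSketch and its attributes are hypothetical.

```python
from __future__ import annotations

import asyncio


class PendingActionsSketch:
    """Hypothetical ID-to-waiter table; not the library's implementation."""

    def __init__(self) -> None:
        self._next_id = 0
        self._pending: dict[str, asyncio.Future] = {}
        self.sent: list[tuple[str, str]] = []  # (id, action name) "sent" to the game

    async def submit_action(self, name: str) -> tuple[bool, str | None]:
        action_id = str(self._next_id)
        self._next_id += 1
        future = asyncio.get_running_loop().create_future()
        self._pending[action_id] = future
        self.sent.append((action_id, name))
        try:
            return await future  # suspends until the result arrives
        finally:
            self._pending.pop(action_id, None)  # clean up tracking state

    def handle_action_result(self, id_: str, success: bool, message: str | None) -> bool:
        future = self._pending.get(id_)
        if future is None:
            return False  # unknown ID; a real client would log a warning
        future.set_result((success, message))
        return True


async def main() -> tuple[bool, str | None]:
    client = PendingActionsSketch()
    waiter = asyncio.create_task(client.submit_action("jump"))
    await asyncio.sleep(0)  # let submit_action register its future
    action_id, _name = client.sent[0]
    client.handle_action_result(action_id, True, "jumped")
    return await waiter


print(asyncio.run(main()))
```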
- abstractmethod async choose_force_action(state: str | None, query: str, ephemeral_context: bool, action_names: frozenset[str], priority: ForcePriority = ForcePriority.LOW) tuple[str, str | None]
Select an action and generate its data for a forced action scenario.
This abstract method must be implemented by subclasses to provide the core decision-making logic for forced actions. It presents Neuro with the current game state and constraints, then returns her chosen action and associated data.
- Parameters:
state (str | None) – Detailed description of the current game state. Can be in any format (plaintext, JSON, Markdown, etc.) and is passed directly to Neuro for context understanding. None indicates no specific state information is available.
query (str) – Plaintext instruction telling Neuro what she should accomplish in this scenario. This directive is passed directly to Neuro and guides her action selection process.
ephemeral_context (bool) – Controls context persistence after completion:
- False: State and query information is retained in Neuro’s context memory for future reference
- True: State and query are only used during this force action and are not retained afterward
action_names (frozenset[str]) – Constrained set of action names that Neuro MUST choose from. Her selection is limited to only these registered actions.
priority (ForcePriority) – Determines how urgently Neuro should respond to the action force when she is speaking. Defaults to ForcePriority.LOW.
- Returns:
- A tuple containing:
- str: Selected action name, must be one of the names from the action_names parameter
- str | None: JSON-stringified action data that should conform to the action’s registered JSON schema. Returns None if the action has no schema or requires no data.
- Return type:
tuple[str, str | None]
Note
This method encapsulates the AI decision-making process
Implementations should handle Neuro’s reasoning and choice logic
The returned action name must exactly match one from action_names
Action data should be properly formatted JSON or None
- Example Implementation Pattern:
>>> async def choose_force_action(self, state, query, ephemeral_context, action_names, priority=ForcePriority.LOW):
...     # Present options to Neuro with state and query context
...     chosen_action = await self.present_choices_to_neuro(
...         state, query, list(action_names), ephemeral_context
...     )
...     action_data = await self.generate_action_data(chosen_action)
...     return chosen_action, action_data
- async perform_actions_force(state: str | None, query: str, ephemeral_context: bool, action_names: list[str], priority: ForcePriority) None
Execute a forced action sequence with automatic retry on failure.
This method orchestrates the complete forced action workflow by repeatedly attempting action execution until success is achieved. It handles the retry logic for failed actions as required by the Neuro API specification.
- Parameters:
state (str | None) – Current game state description passed to Neuro. Format-agnostic (plaintext, JSON, Markdown, etc.).
query (str) – Plaintext instruction for what Neuro should accomplish.
ephemeral_context (bool) – Context retention flag:
- False: Context is permanently added to Neuro’s memory
- True: Context is only used for this forced action sequence
action_names (list[str]) – Available actions that Neuro can choose from. Converted to frozenset for the choice method.
priority (ForcePriority) – Determines how urgently Neuro should respond to the action force when she is speaking.
- Behavior:
1. Calls choose_force_action to get Neuro’s selected action and data
2. Submits the chosen action via submit_action
3. If the action fails, immediately retries from step 1
4. Continues until an action succeeds
5. Completes when any action in the sequence succeeds
Note
This method implements the mandatory retry logic for force actions
Failed actions trigger immediate re-selection and retry
The loop only terminates on successful action completion
Each retry allows Neuro to potentially choose a different action
Warning
This method can potentially run indefinitely if all available actions consistently fail.
- Example Flow:
>>> # Neuro chooses "attack_enemy" but it fails
>>> # Method automatically retries
>>> # Neuro chooses "flee_combat" and it succeeds
>>> # Method completes successfully
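The retry behavior can be sketched as a plain loop around a chooser and a submitter. In this example perform_actions_force_sketch, choose, and submit are hypothetical stand-ins for the real methods, and the function returns the winning action and attempt count purely for demonstration (the documented method returns None).

```python
import asyncio


async def perform_actions_force_sketch(choose, submit, action_names):
    """Repeat choose-then-submit until one action succeeds."""
    names = frozenset(action_names)  # the choice step receives a frozenset
    attempts = 0
    while True:
        attempts += 1
        name, data = await choose(names)
        success, _message = await submit(name, data)
        if success:
            return name, attempts


async def main():
    scripted = iter(["attack_enemy", "flee_combat"])  # pretend choices

    async def choose(names):
        choice = next(scripted)
        assert choice in names
        return choice, None

    async def submit(name, data):
        # Pretend only fleeing works in this scenario.
        return name == "flee_combat", None

    return await perform_actions_force_sketch(
        choose, submit, ["attack_enemy", "flee_combat"]
    )


print(asyncio.run(main()))
```

Because the loop has no exit other than success, a production variant might want a cancellation scope or attempt limit; whether the library provides one is not stated here.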
- abstractmethod async submit_call_async_soon(function: Callable[[], Awaitable[Any]]) None
Schedule a coroutine function to be executed asynchronously.
This abstract method provides a mechanism for deferring the execution of async operations, typically used to prevent blocking during command processing or to manage execution ordering.
- Parameters:
function (Callable[[], Awaitable[Any]]) – A callable that returns a coroutine or awaitable object. The function should take no arguments and will be called and awaited when scheduled for execution.
Note
The exact scheduling behavior depends on the concrete implementation
Common patterns include using task queues, event loops, or nurseries
Used primarily for managing async execution flow in command handlers
The function will be called without arguments when executed
- Implementation Guidelines:
Should handle proper exception propagation from the scheduled function
May use frameworks like Trio nurseries, asyncio tasks, or custom queues
Should consider execution ordering and concurrency requirements
- Example Implementation Patterns:
>>> # Using Trio nursery
>>> async def submit_call_async_soon(self, function):
...     self.nursery.start_soon(function)
>>> # Using asyncio
>>> async def submit_call_async_soon(self, function):
...     asyncio.create_task(function())
- async handle_actions_force(game_title: str, state: str | None, query: str, ephemeral_context: bool, action_names: list[str], priority: ForcePriority) None
Process a force actions command from the game client.
This method handles incoming requests to constrain Neuro’s action choices to a specific set and execute actions until one succeeds. It delegates the actual execution to avoid blocking the message processing loop.
- Parameters:
game_title (str) – The title of the game sending the force actions command. Must match the current client’s game title.
state (str | None) – Current game state description that will be passed directly to Neuro for context understanding.
query (str) – Plaintext instruction telling Neuro what she should accomplish in the current scenario.
ephemeral_context (bool) – Context persistence control:
- False: State and query are retained in Neuro’s memory
- True: State and query are only used during this operation
action_names (list[str]) – List of action names that Neuro is restricted to choose from during this force sequence.
priority (ForcePriority) – Determines how urgently Neuro should respond to the action force when she is speaking. If Neuro is not speaking, this setting has no effect. The default is ForcePriority.LOW, which will cause Neuro to wait until she finishes speaking before responding. ForcePriority.MEDIUM causes her to finish her current utterance sooner. ForcePriority.HIGH prompts her to process the action force immediately, shortening her utterance and then responding. ForcePriority.CRITICAL will interrupt her speech and make her respond at once. Use ForcePriority.CRITICAL with caution, as it may lead to abrupt and potentially jarring interruptions.
- Behavior:
Validates the game title matches the current session
Creates a partial function with the force action parameters
Schedules the force action execution asynchronously
Returns immediately without waiting for completion
Note
This method returns immediately and does not block
The actual force action execution happens asynchronously
Uses submit_call_async_soon to delegate execution
Ensures the message processing loop remains responsive
Warning
The asynchronous execution means this method completes before the force actions are finished. Callers should not assume the actions have completed when this method returns.
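The "bind arguments, schedule, return" pattern described for this handler can be sketched with functools.partial. The example uses asyncio tasks so that it runs with the standard library alone; ForceSchedulerSketch, its reduced method signatures, and the wait_idle helper are hypothetical, and a Trio-based client would hand the partial to a nursery instead.

```python
import asyncio
from functools import partial


class ForceSchedulerSketch:
    """Hypothetical client showing the handler returning before the work runs."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self._tasks: set[asyncio.Task] = set()

    async def perform_actions_force(self, query: str, action_names: list[str]) -> None:
        await asyncio.sleep(0)  # stands in for the choose/submit loop
        self.log.append(f"performed:{query}")

    async def submit_call_async_soon(self, function) -> None:
        task = asyncio.create_task(function())
        self._tasks.add(task)  # keep a reference until the task finishes
        task.add_done_callback(self._tasks.discard)

    async def handle_actions_force(self, query: str, action_names: list[str]) -> None:
        # Bind the arguments now; the zero-argument callable runs later.
        do_force = partial(self.perform_actions_force, query, action_names)
        await self.submit_call_async_soon(do_force)
        self.log.append("handler returned")

    async def wait_idle(self) -> None:
        await asyncio.gather(*self._tasks)


async def main() -> list[str]:
    client = ForceSchedulerSketch()
    await client.handle_actions_force("Pick a move", ["attack", "flee"])
    await client.wait_idle()
    return client.log


print(asyncio.run(main()))
```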
- class neuro_api.server.AbstractRecordingNeuroServerClient
Bases: AbstractHandlerNeuroServerClient
Abstract Neuro Server Client with action recording and management capabilities.
This class extends AbstractHandlerNeuroServerClient by adding concrete implementations for action registration, unregistration, and tracking. It maintains an in-memory registry of all registered actions for the current game session.
- Key Features:
Action Registry: Maintains a dictionary mapping action names to Action objects
Dynamic Management: Supports runtime addition and removal of actions
Session Isolation: Actions are cleared between game sessions
Duplicate Handling: Automatically handles action name conflicts by replacement
- actions
Registry mapping action names to their corresponding Action objects. Updated dynamically as actions are registered and unregistered during the game session.
- Storage Behavior:
Actions are stored by name as the dictionary key
Registering an action with an existing name replaces the previous action
Unregistering non-existent actions is silently ignored
All actions are cleared when a new game session starts
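These storage rules map directly onto ordinary dictionary operations. The sketch below mirrors them with hypothetical ActionSketch and RegistrySketch types; the real Action class has its own fields, and only the name-keyed behavior is taken from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActionSketch:
    """Hypothetical stand-in for the library's Action type."""

    name: str
    description: str


class RegistrySketch:
    """Name-keyed registry following the documented storage behavior."""

    def __init__(self) -> None:
        self.actions: dict[str, ActionSketch] = {}

    def register_action(self, action: ActionSketch) -> None:
        self.actions[action.name] = action  # same name replaces the old entry

    def unregister_action(self, action_name: str) -> None:
        self.actions.pop(action_name, None)  # missing names are ignored

    def clear_registered_actions(self) -> None:
        self.actions.clear()  # run when a new game session starts


registry = RegistrySketch()
registry.register_action(ActionSketch("jump", "Jump once"))
registry.register_action(ActionSketch("jump", "Jump twice"))
registry.unregister_action("not_registered")
print(registry.actions["jump"].description, len(registry.actions))
```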
Note
This class still requires subclasses to implement abstract methods from parent classes (e.g., add_context, choose_force_action)
The action registry persists throughout a single game session
Memory usage scales with the number of registered actions
- Example Usage:
>>> class MyNeuroClient(AbstractRecordingNeuroServerClient):
...     async def add_context(self, message, reply_if_not_busy):
...         # Implementation specific to your use case
...         pass
...
...     async def choose_force_action(self, state, query, ephemeral_context, action_names):
...         # Implementation specific to your use case
...         pass
...
>>> client = MyNeuroClient()
>>> print(len(client.actions))  # 0 - no actions initially
>>> # Actions get registered through the websocket protocol
>>> # client.actions will contain registered actions
Adds concrete action registry management with dynamic registration, unregistration, and storage capabilities.
- __init__() None
Initialize the recording client with an empty action registry.
Sets up the parent class state and creates an empty dictionary to track registered actions for the current session.
- clear_registered_actions() None
Remove all registered actions from the current session.
This method is called automatically during game startup to ensure a clean slate for each new game session. It removes all previously registered actions from the internal registry.
Note
Called automatically by the handle_startup method
Ensures no action leakage between different game sessions
Does not affect the game client’s action registrations
- register_action(action: Action) None
Add a single action to the current session’s action registry.
Stores the provided action in the internal registry, making it available for future action selection and execution. If an action with the same name already exists, it will be replaced.
- Parameters:
action (Action) – The action object to register. The action’s name will be used as the registry key.
- Behavior:
Action is stored using action.name as the dictionary key
Existing actions with the same name are silently replaced
The action becomes immediately available for selection
Registry size increases unless replacing an existing action
Note
No validation is performed on the action object
Duplicate names result in replacement, not addition
The action persists until explicitly unregistered or session ends
Example
>>> action = Action("move_player", "Move the player character", {...})
>>> client.register_action(action)
>>> assert "move_player" in client.actions
>>> assert client.actions["move_player"] == action
- unregister_action(action_name: str) None
Remove a specific action from the current session’s action registry.
Removes the action with the given name from the internal registry. If the action name is not found, the operation is silently ignored.
- Parameters:
action_name (str) – The name of the action to remove from the registry. Must exactly match the name used when the action was registered.
- Behavior:
Action is removed from the internal dictionary if it exists
Non-existent action names are silently ignored (no error)
Registry size decreases if the action was found
The action becomes unavailable for future selection
Note
Case-sensitive string matching is used for action names
No validation is performed on whether the action is currently in use
Graceful handling of attempts to unregister non-existent actions
Example
>>> client.unregister_action("move_player")
>>> assert "move_player" not in client.actions
>>> # Unregistering non-existent actions is safe
>>> client.unregister_action("non_existent_action")  # No error
- get_action(action_name: str) Action | None
Retrieve a registered action by its name.
Performs a lookup in the action registry to find the Action object associated with the given name.
- Parameters:
action_name (str) – The exact name of the action to retrieve. Must match the name used during registration (case-sensitive).
- Returns:
- The Action object if found in the registry,
None if no action with that name is registered.
- Return type:
Action | None
Note
This is a read-only operation that doesn’t modify the registry
Useful for validation before action execution
Returns None for non-existent actions (no exception raised)
The returned Action object is the same instance stored during registration
Example
>>> action = Action("jump", "Make player jump", None)
>>> client.register_action(action)
>>> retrieved = client.get_action("jump")
>>> assert retrieved == action
>>> assert client.get_action("unknown") is None
- has_action(action_name: str) bool
Check if an action is currently registered.
Performs a fast membership test to determine if an action with the given name exists in the current registry.
- Parameters:
action_name (str) – The name of the action to check for existence. Must be an exact match (case-sensitive).
- Returns:
- True if an action with the given name is registered,
False otherwise.
- Return type:
bool
Note
This is a lightweight operation using dictionary key lookup
More efficient than get_action when only existence matters
Does not provide access to the Action object itself
Useful for validation and conditional logic
Example
>>> client.register_action(Action("attack", "Attack enemy", None))
>>> assert client.has_action("attack") is True
>>> assert client.has_action("defend") is False
- get_action_names() frozenset[str]
Get all currently registered action names.
Returns an immutable set containing the names of all actions currently registered in the session registry.
- Returns:
- Immutable set of all registered action names.
Empty frozenset if no actions are registered.
- Return type:
frozenset[str]
- Behavior:
Creates a snapshot of current action names at call time
Returned frozenset is immutable and independent of registry changes
Order is not guaranteed (set semantics)
Efficient for membership testing and iteration
Note
The returned frozenset reflects the state at the time of the call
Subsequent registry changes don’t affect previously returned frozensets
Useful for iteration, validation, and constraint checking
Compatible with methods expecting immutable action name collections
Example
>>> client.register_action(Action("move", "Move player", None))
>>> client.register_action(Action("jump", "Jump action", None))
>>> names = client.get_action_names()
>>> assert names == frozenset({"move", "jump"})
>>> # Returned frozenset is independent of later changes
>>> client.unregister_action("move")
>>> assert names == frozenset({"move", "jump"})  # Still contains "move"
Concrete Implementations
- class neuro_api.server.BaseTrioNeuroServerClient(websocket: WebSocketConnection)
Bases: AbstractRecordingNeuroServerClient
Trio-based WebSocket implementation of Neuro Server Client.
This class provides a concrete WebSocket communication layer using the Trio async library and trio-websocket for real-time bidirectional communication with Neuro game clients.
- Key Features:
Trio Integration: Built on Trio’s structured concurrency model
WebSocket Communication: Real-time message exchange with game clients
Action Registry: Inherits action management from parent class
Connection Management: Handles WebSocket lifecycle and error conditions
- Architecture:
Extends AbstractRecordingNeuroServerClient with concrete WebSocket I/O
Provides low-level send/receive operations for the communication protocol
Serves as base class for more specialized Neuro client implementations
Maintains WebSocket connection state throughout the session
- websocket
The trio-websocket connection instance used for all client communication. Manages the underlying TCP connection and WebSocket protocol framing.
- Type:
WebSocketConnection
- Connection Lifecycle:
WebSocket connection established externally and passed to constructor
Client uses connection for bidirectional message exchange
Connection errors propagate to calling code for handling
No automatic reconnection - connection management is external
Note
This class still requires subclasses to implement abstract methods like add_context, choose_force_action, and submit_call_async_soon
WebSocket connection must be established before creating instances
Thread-safe within Trio’s structured concurrency model
Connection errors should be handled by the calling application
- Example Usage:
>>> import trio
>>> from trio_websocket import serve_websocket
>>>
>>> class MyNeuroClient(BaseTrioNeuroServerClient):
...     async def add_context(self, message, reply_if_not_busy):
...         # Custom implementation
...         pass
...     # ... other required methods
...
>>> async def handle_client(request):
...     websocket = await request.accept()
...     client = MyNeuroClient(websocket)
...     # Use client for communication
...     await client.read_message()
Trio-based WebSocket implementation providing concrete communication layer.
- __init__(websocket: WebSocketConnection) None
Initialize the Trio-based Neuro Server Client.
Sets up the WebSocket communication layer and inherits action management capabilities from the parent class.
- Parameters:
websocket (WebSocketConnection) – An established trio-websocket connection instance. Must be in a connected state and ready for message exchange. The connection lifecycle is managed externally to this class.
Note
The WebSocket connection must already be established and accepted
This class does not handle connection establishment or cleanup
Parent class initialization creates an empty action registry
The websocket instance is stored for the lifetime of the client
Example
>>> # Within a websocket handler
>>> websocket = await request.accept()
>>> client = BaseTrioNeuroServerClient(websocket)
>>> # Client is ready for message processing
- websocket
- async write_to_websocket(data: str) None
Send a message to the connected game client via WebSocket.
Transmits a string message over the WebSocket connection to the game client. This method handles the low-level WebSocket framing and transmission details.
- Parameters:
data (str) – The message content to send to the client. Should be properly formatted according to the Neuro API protocol (typically JSON-formatted command data).
- Raises:
trio_websocket.ConnectionClosed – If the WebSocket connection has been closed by either the client or server, or if the connection is in the process of closing.
trio.BrokenResourceError – If the underlying network connection experiences an unexpected failure.
OSError – For low-level network errors (connection reset, timeout, etc.).
- Behavior:
Message is sent immediately (no buffering)
Blocks until message transmission is complete or fails
WebSocket protocol handles message framing automatically
String data is encoded as UTF-8 text frames
Note
This is a low-level communication primitive
Higher-level methods should use send_command_data instead
Connection errors indicate the client has disconnected
No automatic retry or reconnection is attempted
- async read_from_websocket() bytes | bytearray | memoryview | str
Receive a message from the connected game client via WebSocket.
Reads the next available message from the WebSocket connection, blocking until a message arrives or the connection fails.
- Returns:
- The received message data.
str: Text messages (recommended and expected for JSON protocol data)
bytes/bytearray/memoryview: Binary messages (supported by this code but not properly handled by the original Neuro implementation)
The specific type depends on the message frame type sent by the client.
- Return type:
bytes | bytearray | memoryview | str
- Raises:
trio_websocket.ConnectionClosed – If the WebSocket connection has been closed by the client or server, or encounters a protocol error.
trio.BrokenResourceError – If the internal message channel is broken, typically due to resource cleanup or cancellation. This is rare in normal operation.
AssertionError – If the received message type is unexpected or invalid according to the WebSocket protocol specification.
- Behavior:
Blocks until a message is received or connection fails
Returns immediately when a message is available
Handles WebSocket protocol framing automatically
Preserves message boundaries as sent by the client
Warning
While this method can receive binary messages (bytes, bytearray, or memoryview), the original Neuro implementation does not properly handle binary message processing. Clients should send only text messages containing JSON-formatted command data to ensure compatibility with the real Neuro.
Note
This is a low-level communication primitive
Higher-level methods should use read_raw_client_message instead
Text messages are typically JSON-formatted command data
Binary messages should be avoided in practice despite code support
Connection errors indicate the client has disconnected
Example
>>> import json
>>> message = await client.read_from_websocket()
>>> if isinstance(message, str):
...     # Process JSON command data (recommended path)
...     command_data = json.loads(message)
... else:
...     # Handle binary data (avoid in practice)
...     print("Warning: Received binary message, may not be processed correctly")
...     binary_data = bytes(message)
- class neuro_api.server.TrioNeuroServerClient(websocket: WebSocketConnection, server: AbstractTrioNeuroServer)
Bases: BaseTrioNeuroServerClient
Concrete Trio-based Neuro Server Client with server integration.
This class provides a complete implementation of the Neuro Server Client by integrating with an AbstractTrioNeuroServer instance. It delegates high-level operations like context management and action selection to the server while handling client-specific WebSocket communication.
- Key Features:
Server Integration: Delegates AI operations to a parent server instance
Weak Reference Management: Uses weak references to prevent circular references
Enhanced Logging: Includes client identification in log messages
Action Coordination: Bridges client actions with server-side AI processing
Structured Concurrency: Leverages Trio nurseries for async task management
- Architecture:
Acts as a bridge between WebSocket clients and the Neuro server
Maintains a weak reference to prevent memory leaks
Forwards context and action requests to the server for AI processing
Handles client-specific state while server manages global AI state
- _server_ref
Weak reference to the parent server instance. Prevents circular references that could cause memory leaks while allowing access to server functionality.
- Lifecycle Management:
Server reference is established during initialization
Weak reference allows server to be garbage collected independently
Dead reference detection prevents operations on destroyed servers
Client can outlive server in certain shutdown scenarios
Note
This is a complete, ready-to-use implementation
No abstract methods remain to be implemented by subclasses
Server must remain alive for the client to function properly
Designed for use within Trio-based server applications
- Example Usage:
>>> async def handle_websocket_client(request):
...     websocket = await request.accept()
...     client = TrioNeuroServerClient(websocket, neuro_server)
...     try:
...         while True:
...             await client.read_message()
...     except trio_websocket.ConnectionClosed:
...         # Client disconnected
...         pass
Complete client implementation with server integration for production use.
- __init__(websocket: WebSocketConnection, server: AbstractTrioNeuroServer) None
Initialize the Trio Neuro Server Client with server integration.
Sets up the WebSocket communication layer and establishes a weak reference to the parent server for AI operation delegation.
- Parameters:
websocket (WebSocketConnection) – An established trio-websocket connection instance ready for message exchange with the game client.
server (AbstractTrioNeuroServer) – The parent Neuro server instance that will handle AI operations, context management, and action selection for this client.
Note
Creates a weak reference to the server to prevent circular references
Server must remain alive for the client to function properly
WebSocket connection must already be established and accepted
Parent class initialization sets up action registry and base state
- property server: AbstractTrioNeuroServer
Access the parent server instance.
Dereferences the weak reference to obtain the server instance, with validation to ensure the server is still alive.
- Returns:
- The parent server instance that handles
AI operations and coordinates multiple clients.
- Return type:
AbstractTrioNeuroServer
- Raises:
ValueError – If the server instance has been garbage collected and the weak reference is dead. This indicates the server has been destroyed while the client is still active.
Note
Uses weak reference to prevent circular reference memory leaks
Server lifetime is managed independently of client lifetime
Dead reference indicates abnormal shutdown or cleanup order
Should be checked before any server operations
Example
>>> try:
...     server = client.server
...     server.add_context(game_title, message, True)
... except ValueError:
...     print("Server has been destroyed")
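The weak-reference pattern behind the server property can be sketched with the standard weakref module. DemoServer, DemoClient, and server_is_alive are hypothetical stand-ins, and the error message is an assumption; only the ValueError on a dead reference comes from the description above.

```python
import gc
import weakref


class DemoServer:
    """Hypothetical stand-in for an AbstractTrioNeuroServer subclass."""


class DemoClient:
    """Hypothetical client holding only a weak reference to its server."""

    def __init__(self, server: DemoServer) -> None:
        self._server_ref = weakref.ref(server)

    @property
    def server(self) -> DemoServer:
        # Calling the weak reference returns None once the server is gone.
        server = self._server_ref()
        if server is None:
            raise ValueError("Reference to server is dead")
        return server


def server_is_alive(client: DemoClient) -> bool:
    try:
        client.server
    except ValueError:
        return False
    return True


demo_server = DemoServer()
demo_client = DemoClient(demo_server)
alive_before = server_is_alive(demo_client)

del demo_server  # drop the only strong reference
gc.collect()     # make collection explicit on non-refcounting interpreters
alive_after = server_is_alive(demo_client)
```

Because the client never holds a strong reference, dropping the server elsewhere is enough for it to be collected, after which the property raises instead of returning a stale object.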
- log_warning(message: str) None
Log a warning message with client identification context.
Enhances the base logging functionality by including the game title and client connection information for better debugging and monitoring.
- Parameters:
message (str) – The warning message to log. Will be prefixed with client identification information including game title and remote connection details.
- Behavior:
Extracts remote connection information (IP:port or string identifier)
Formats message with game title and connection details
Delegates actual logging to the parent server’s logging system
Provides context for debugging multi-client scenarios
Note
Remote connection format depends on the underlying transport
Game title may be None if startup hasn’t completed
Logging behavior is determined by the server implementation
Useful for identifying which client generated warnings
Example
>>> client.log_warning("Action validation failed")
>>> # Logs: "[MyGame (192.168.1.100:12345)] Action validation failed"
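One way to build a prefix like the one in the example log line is sketched below. format_remote and format_client_warning are hypothetical helpers, and the remote endpoint is assumed to be either a plain string or a (host, port) pair; the real connection object may expose this information differently.

```python
def format_remote(remote) -> str:
    """Hypothetical helper: render a remote endpoint as text."""
    if isinstance(remote, str):
        return remote
    # Assumption: anything else is a (host, port) pair.
    host, port = remote
    return f"{host}:{port}"


def format_client_warning(game_title, remote, message: str) -> str:
    """Prefix a warning with the game title and the remote endpoint."""
    # game_title may still be None if startup has not completed.
    return f"[{game_title} ({format_remote(remote)})] {message}"
```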
- add_context(message: str, reply_if_not_busy: bool) None
Add contextual information to Neuro’s understanding.
Forwards context messages from the game client to the parent server for integration into Neuro’s contextual awareness.
- Parameters:
message (str) – Plaintext description of the current game state or event. This information is passed directly to Neuro for contextual understanding.
reply_if_not_busy (bool) – Controls Neuro’s response behavior:
True: Neuro may respond to the message if she’s not busy
False: Message is added silently without prompting a response
Note
Context is associated with this client’s game title
Server coordinates context across multiple clients if needed
Message content is passed through unchanged to Neuro
Response behavior depends on Neuro’s current conversation state
Example
>>> client.add_context("Player entered the forest", True)
>>> # Neuro receives context and may respond about the forest
- async choose_force_action(state: str | None, query: str, ephemeral_context: bool, action_names: frozenset[str], priority: ForcePriority = ForcePriority.LOW) tuple[str, str | None]
Delegate action selection to the parent server.
Forwards force action requests to the server’s AI system for processing, converting action names to full Action objects.
- Parameters:
state (str | None) – Current game state description for Neuro’s context.
query (str) – Instruction telling Neuro what she should accomplish.
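ephemeral_context (bool) – If True, the state and query are used only for this selection and are not retained afterward. If False, they remain in Neuro’s context after the selection completes.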
action_names (frozenset[str]) – Set of action names Neuro must choose from.
priority (ForcePriority) – Action force priority level.
- Returns:
- Tuple containing:
str: Selected action name from the provided set
str | None: JSON-stringified action data or None if no data needed
- Return type:
tuple[str, str | None]
- Behavior:
Looks up full Action objects from the client’s action registry
Passes complete Action objects to the server for AI processing
Server handles the actual AI decision-making and data generation
Returns the server’s choice formatted for client consumption
Note
All action names must exist in the client’s action registry
Server receives full Action objects with schemas for validation
AI processing happens on the server side, not in the client
Game title is automatically included for server context
Example
>>> action_name, data = await client.choose_force_action(
...     "Player at crossroads", "Choose direction", False,
...     frozenset(["go_north", "go_south"])
... )
>>> # Returns something like ("go_north", '{"speed": "fast"}')
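The name-to-Action lookup described above might look like the following sketch. DemoAction and resolve_actions are hypothetical, raising KeyError for unknown names is an assumption about how a missing action could be reported, and the sorted order is chosen here only to make the result deterministic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoAction:
    """Hypothetical stand-in for the library's Action type."""
    name: str
    description: str


def resolve_actions(registry, action_names):
    """Turn a set of action names into a tuple of registered actions."""
    missing = sorted(name for name in action_names if name not in registry)
    if missing:
        # Assumption: unknown names are reported rather than skipped.
        raise KeyError(f"Unregistered action names: {missing}")
    return tuple(registry[name] for name in sorted(action_names))


demo_registry = {
    "go_north": DemoAction("go_north", "Walk north"),
    "go_south": DemoAction("go_south", "Walk south"),
}
resolved = resolve_actions(demo_registry, frozenset({"go_south", "go_north"}))
```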
- async submit_call_async_soon(function: Callable[[], Any]) None
Schedule a coroutine function to be executed asynchronously.
This abstract method provides a mechanism for deferring the execution of async operations, typically used to prevent blocking during command processing or to manage execution ordering.
- Parameters:
function (Callable[[], Awaitable[Any]]) – A callable that returns a coroutine or awaitable object. The function should take no arguments and will be called and awaited when scheduled for execution.
Note
The exact scheduling behavior depends on the concrete implementation
Common patterns include using task queues, event loops, or nurseries
Used primarily for managing async execution flow in command handlers
The function will be called without arguments when executed
- Implementation Guidelines:
Should handle proper exception propagation from the scheduled function
May use frameworks like Trio nurseries, asyncio tasks, or custom queues
Should consider execution ordering and concurrency requirements
- Example Implementation Patterns:
>>> # Using Trio nursery
>>> async def submit_call_async_soon(self, function):
...     self.nursery.start_soon(function)
>>> # Using asyncio
>>> async def submit_call_async_soon(self, function):
...     asyncio.create_task(function())
Server Classes
- class neuro_api.server.AbstractTrioNeuroServer
Bases: object
Abstract base class for Trio-based Neuro AI servers.
This class provides the core WebSocket server infrastructure for hosting Neuro AI game integration services. It manages client connections, handles the WebSocket protocol, and defines the interface for AI-powered game interaction capabilities.
- Key Features:
Multi-Client Management: Handles multiple simultaneous game client connections
Structured Concurrency: Built on Trio’s async framework for reliable task management
WebSocket Server: Complete WebSocket server implementation with SSL support
AI Integration Interface: Abstract methods for connecting to Neuro AI systems
Connection Lifecycle: Automatic client registration, cleanup, and error handling
- Architecture:
Serves as the central coordination point for multiple game clients
Delegates AI operations to concrete implementations via abstract methods
Manages WebSocket connections and protocol handling automatically
Provides logging infrastructure for monitoring and debugging
- clients
Registry of active client connections, keyed by remote address (IP:port format). Updated automatically as clients connect and disconnect.
- Type:
dict[str, TrioNeuroServerClient]
- handler_nursery
Trio nursery for managing background tasks and client operations. Set during server startup and used for structured concurrency management.
- Type:
- Connection Management:
Clients are registered by remote address when they connect
Connection lifecycle is managed automatically with proper cleanup
Failed connections are logged and handled gracefully
Multiple clients can be active simultaneously with independent state
- Abstract Methods:
Subclasses must implement:
add_context: Handle contextual information from game clients
choose_force_action: Implement AI-driven action selection logic
Note
This is an abstract base class requiring concrete implementation
SSL/TLS support is available but optional
Server runs until manually stopped or encounters critical errors
Designed for integration with external Neuro AI systems
- Example Usage:
>>> class MyNeuroServer(AbstractTrioNeuroServer):
...     def add_context(self, game_title, message, reply_if_not_busy):
...         # Forward to Neuro AI system
...         pass
...
...     async def choose_force_action(self, game_title, state, query, ephemeral_context, actions):
...         # Implement AI action selection
...         return selected_action_name, action_data
...
>>> server = MyNeuroServer()
>>> await server.run("localhost", 8080)
Abstract base class for Trio-based Neuro AI servers providing WebSocket server infrastructure and multi-client management.
- __init__() None
Initialize the abstract Neuro server with default state.
Sets up the basic server infrastructure including client registry and prepares for nursery assignment during server startup.
Note
Creates empty client registry for connection tracking
Handler nursery is assigned during run() method execution
Server is not ready to accept connections until run() is called
- clients: dict[str, TrioNeuroServerClient]
- log_info(message: str) None
Log an informational message to the console.
- Parameters:
message (str) – The informational message to log.
Note
Uses simple console output with INFO prefix
Subclasses can override for more sophisticated logging
Useful for tracking server operations and client activity
- log_warning(message: str) None
Log a warning message to the console.
- Parameters:
message (str) – The warning message to log.
Note
Uses simple console output with WARNING prefix
Indicates non-critical issues that should be monitored
Subclasses can override for more sophisticated logging
- log_critical(message: str) None
Log a critical error message to the console.
- Parameters:
message (str) – The critical error message to log.
Note
Uses simple console output with CRITICAL prefix
Indicates serious issues that may affect server operation
Subclasses can override for more sophisticated logging
- abstractmethod add_context(game_title: str | None, message: str, reply_if_not_busy: bool) None
Add contextual information to the Neuro AI system.
This abstract method must be implemented by subclasses to handle contextual information from game clients and forward it to the Neuro AI system for processing.
- Parameters:
game_title (str | None) – The title of the game providing context. None may indicate context from an unidentified or system source.
message (str) – Plaintext description of the current game state or event. This information is intended to be passed directly to Neuro for contextual understanding.
reply_if_not_busy (bool) – Controls Neuro’s response behavior:
False: Message is added to context silently without prompting a response from Neuro
True: Neuro may respond to the message directly, subject to her current availability and conversation state
- Implementation Requirements:
Must forward message content to the Neuro AI system
Should handle game title association for multi-client scenarios
Must respect the reply_if_not_busy flag for response control
Should handle None game titles gracefully
Note
Message content is passed directly to Neuro without modification
Response behavior depends on Neuro’s current conversation state
Multiple clients may provide context simultaneously
Implementation should be thread-safe for concurrent access
- Example Implementation:
>>> def add_context(self, game_title, message, reply_if_not_busy):
...     context_data = {
...         "game": game_title,
...         "message": message,
...         "can_reply": reply_if_not_busy
...     }
...     self.neuro_api.send_context(context_data)
- abstractmethod async choose_force_action(game_title: str | None, state: str | None, query: str, ephemeral_context: bool, actions: tuple[Action, ...], priority: ForcePriority = ForcePriority.LOW) tuple[str, str | None]
Select an action for Neuro to perform from a constrained set.
This abstract method must be implemented by subclasses to provide AI-driven action selection logic when games request forced actions.
- Parameters:
game_title (str | None) – The title of the requesting game. May be None if the request comes from an unidentified source.
state (str | None) – Detailed description of the current game state. Can be in any format (plaintext, JSON, Markdown, etc.) and is intended for direct consumption by the Neuro AI system.
query (str) – Plaintext instruction describing what Neuro should accomplish in the current scenario. Passed directly to Neuro for decision-making guidance.
ephemeral_context (bool) – Context persistence control:
False: State and query information is retained in Neuro’s memory after the action selection completes
True: State and query are only used during this selection process and are not retained afterward
actions (tuple[Action, ...]) – Immutable sequence of Action objects that Neuro must choose from. Each Action contains name, description, and optional JSON schema for data validation.
priority (ForcePriority) – Force action priority.
- Returns:
- A tuple containing:
str: Selected action name, must exactly match the name of one of the provided Action objects
str | None: JSON-stringified action data conforming to the action’s registered schema, or None if no data is required
- Return type:
tuple[str, str | None]
- Implementation Requirements:
Must return an action name that exists in the provided actions tuple
Action data must conform to the selected action’s JSON schema
Should handle ephemeral context appropriately in the AI system
Must process state and query information for AI decision-making
Note
This method encapsulates the core AI decision-making process
Selection should be based on the provided state and query context
Action data generation must respect the action’s schema constraints
Multiple concurrent force action requests may be processed
- Example Implementation:
>>> async def choose_force_action(self, game_title, state, query, ephemeral_context, actions):
...     ai_request = {
...         "game": game_title,
...         "state": state,
...         "instruction": query,
...         "ephemeral": ephemeral_context,
...         "available_actions": [action.to_dict() for action in actions]
...     }
...     response = await self.neuro_ai.request_action_selection(ai_request)
...     return response["action_name"], response["action_data"]
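An implementation may want to check its result against the return contract before handing it back. The sketch below is one possible guard, using the hypothetical names DemoAction and check_choice. It only confirms that the name was offered and that the data parses as JSON; validating the data against the action's schema would need a JSON Schema library and is not shown.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DemoAction:
    """Hypothetical stand-in for the library's Action type."""
    name: str
    description: str
    schema: Optional[dict] = None


def check_choice(actions, action_name, action_data):
    """Reject choices that break the documented return contract."""
    if action_name not in {action.name for action in actions}:
        raise ValueError(f"{action_name!r} is not one of the offered actions")
    if action_data is not None:
        # json.JSONDecodeError is a ValueError subclass, so malformed
        # data surfaces as ValueError as well.
        json.loads(action_data)
    return action_name, action_data


offered = (
    DemoAction("go_north", "Walk north", {"type": "object"}),
    DemoAction("go_south", "Walk south"),
)
choice = check_choice(offered, "go_north", '{"speed": "fast"}')
```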
- async run(address: str, port: int, ssl_context: SSLContext | None = None) None
Start the WebSocket server and run until termination.
Initializes the server infrastructure, starts accepting client connections, and runs indefinitely until manually stopped or a critical error occurs.
- Parameters:
address (str) – The network address to bind the server to. Use “localhost” for local-only access or “0.0.0.0” for all interfaces.
port (int) – The TCP port number to listen on. Must be available and not in use by other services.
ssl_context (SSLContext | None, optional) – SSL context for HTTPS/WSS encryption. If None, server runs in plain HTTP/WS mode. Defaults to None.
- Behavior:
Creates a Trio nursery for managing all server tasks
Starts the WebSocket server with the provided configuration
Accepts and handles client connections indefinitely
Performs structured cleanup on shutdown or error
- Raises:
Note
This method blocks until the server is shut down
SSL context enables secure WebSocket (WSS) connections
Server automatically handles client connection lifecycle
Uses structured concurrency for reliable resource management
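For the optional ssl_context argument, a server-side context can be built with the standard ssl module. This is a minimal sketch: build_ssl_context is a hypothetical helper and the certificate and key paths are supplied by the caller.

```python
import ssl
from typing import Optional


def build_ssl_context(
    certfile: Optional[str], keyfile: Optional[str]
) -> Optional[ssl.SSLContext]:
    """Return a server-side SSL context, or None for plain WS mode."""
    if certfile is None or keyfile is None:
        return None
    # Purpose.CLIENT_AUTH configures the context for the server side.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context
```

The result can be passed as the third argument to run(); passing None keeps the server in unencrypted mode.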
- async handle_websocket_request(request: WebSocketRequest) None
Process an incoming WebSocket connection request.
Handles the initial WebSocket handshake and delegates connection management to the client connection handler.
- Parameters:
request (WebSocketRequest) – The incoming WebSocket connection request containing client information and connection details.
- Behavior:
Extracts client remote address for logging
Logs the connection request for monitoring
Accepts the WebSocket connection
Delegates to handle_client_connection for lifecycle management
Note
This method is called automatically by the WebSocket server
Connection acceptance happens unconditionally
Client filtering should be implemented here if needed
Remote address formatting handles both string and socket address types
- async handle_client_connection(websocket: WebSocketConnection) None
Manage the complete lifecycle of a client WebSocket connection.
Handles client registration, message processing, and cleanup for an established WebSocket connection throughout its entire lifetime.
- Parameters:
websocket (WebSocketConnection) – An accepted WebSocket connection ready for bidirectional message exchange with the game client.
- Behavior:
Registers the client in the active clients dictionary
Creates a TrioNeuroServerClient instance for the connection
Processes incoming messages in a continuous loop
Automatically handles connection cleanup on disconnection
Logs connection events and errors for monitoring
- Connection Lifecycle:
Client registration with remote address as key
Continuous message processing until disconnection
Automatic cleanup of client registry on exit
Exception logging for debugging connection issues
- Exception Handling:
All exceptions are logged with full traceback
Connection is automatically cleaned up via context manager
Client is removed from registry when connection ends
Exceptions are re-raised for proper error propagation
Note
Method runs until client disconnects or error occurs
Uses WebSocket context manager for automatic cleanup
Client registry is updated automatically
Message processing happens in TrioNeuroServerClient.read_message
- Example Flow:
>>> # Client connects -> logged and registered
>>> # Client sends messages -> processed continuously
>>> # Client disconnects -> logged and cleaned up
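The register, process, clean up, re-raise pattern described above can be sketched with a plain context manager. This is not the library's code: the registered helper and the dictionary key format are assumptions, and the example is synchronous so it stays self-contained.

```python
from contextlib import contextmanager


@contextmanager
def registered(clients: dict, key: str, client: object):
    """Keep `client` in `clients` for the duration of the with-block."""
    clients[key] = client
    try:
        yield client
    finally:
        # Runs on normal exit and on error; the exception itself still propagates.
        clients.pop(key, None)


clients = {}
try:
    with registered(clients, "127.0.0.1:50000", object()):
        raise ConnectionError("client dropped")  # simulated failure mid-session
except ConnectionError as exc:
    print(f"logged: {exc}")  # logged: client dropped
print(clients)  # {}
```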
- class neuro_api.server.ConsoleInteractiveNeuroServer
Bases: AbstractTrioNeuroServer
Console-based interactive implementation of Neuro Server for development and testing.
This concrete implementation provides a human-operated interface for testing Neuro API integration without requiring a full AI system. It uses console input/output to simulate Neuro’s decision-making process, making it ideal for development, debugging, and demonstration.
- Key Features:
Interactive Console Interface: Human operator makes decisions via console
Development Testing: Test game clients without full AI infrastructure
Action Visualization: Clear display of available actions and context
Schema Support: Handles JSON schema validation and data input
Real-time Feedback: Immediate console output for context messages
- Use Cases:
Development and testing of game client integrations
Debugging WebSocket communication protocols
Demonstrating Neuro API capabilities to stakeholders
Educational purposes for understanding the Neuro system
- Limitations:
Requires human operator interaction for all decisions
Not suitable for production or automated scenarios
Console-based interface may not scale for complex interactions
No persistent state or learning capabilities
Note
This implementation is synchronous from the operator’s perspective
Trio checkpoints are used to maintain cooperative multitasking
All AI decision-making is replaced with human console interaction
Suitable only for development and demonstration environments
- Example Usage:
>>> server = ConsoleInteractiveNeuroServer()
>>> await server.run("localhost", 8000)
>>> # Server will prompt operator for all AI decisions via console
Console-based interactive server implementation for development and testing.
- __init__() None
Initialize the abstract Neuro server with default state.
Sets up the basic server infrastructure including client registry and prepares for nursery assignment during server startup.
Note
Creates empty client registry for connection tracking
Handler nursery is assigned during run() method execution
Server is not ready to accept connections until run() is called
- console_command_lock
- add_context(game_title: str | None, message: str, reply_if_not_busy: bool) None
Display context information to the console operator.
Prints contextual information from game clients to the console, allowing the human operator to understand the current game state and events.
- Parameters:
game_title (str | None) – The title of the game providing context. Displayed to help the operator track multiple clients.
message (str) – The contextual message describing game state or events. Displayed directly to the console operator.
reply_if_not_busy (bool) – Flag indicating whether Neuro should respond to this context. Displayed for operator awareness but does not affect behavior in this implementation.
- Behavior:
Prints context message with clear formatting
Shows the reply_if_not_busy flag for operator information
Uses distinct [CONTEXT] prefix for easy identification
No interactive response required from operator
Note
This method provides information only, no decisions required
Multiple context messages may appear rapidly during gameplay
Operator can observe game state changes through these messages
The reply_if_not_busy flag is informational in this implementation
- Example Output:
>>> [CONTEXT] Player entered the dark forest
>>> reply_if_not_busy = True
- list_client_actions(client: TrioNeuroServerClient) None
Display available actions for client.
- ask_action_json(action: Action) str | None
Prompt console operator to provide optional data for an action.
- async console_input_command(client: TrioNeuroServerClient) None
Have user input command from console.
- Parameters:
client (TrioNeuroServerClient) – Client to send command to.
- start_console_input_command(client: TrioNeuroServerClient) None
Start console input command if not already active.
- async handle_client_connection(websocket: WebSocketConnection) None
Manage the complete lifecycle of a client WebSocket connection.
Handles client registration, message processing, and cleanup for an established WebSocket connection throughout its entire lifetime.
- Parameters:
websocket (WebSocketConnection) – An accepted WebSocket connection ready for bidirectional message exchange with the game client.
- Behavior:
Registers the client in the active clients dictionary
Creates a TrioNeuroServerClient instance for the connection
Processes incoming messages in a continuous loop
Automatically handles connection cleanup on disconnection
Logs connection events and errors for monitoring
- Connection Lifecycle:
Client registration with remote address as key
Continuous message processing until disconnection
Automatic cleanup of client registry on exit
Exception logging for debugging connection issues
- Exception Handling:
All exceptions are logged with full traceback
Connection is automatically cleaned up via context manager
Client is removed from registry when connection ends
Exceptions are re-raised for proper error propagation
Note
Method runs until client disconnects or error occurs
Uses WebSocket context manager for automatic cleanup
Client registry is updated automatically
Message processing happens in TrioNeuroServerClient.read_message
- Example Flow:
>>> # Client connects -> logged and registered
>>> # Client sends messages -> processed continuously
>>> # Client disconnects -> logged and cleaned up
- async choose_force_action(game_title: str | None, state: str | None, query: str, ephemeral_context: bool, actions: tuple[Action, ...], priority: ForcePriority = ForcePriority.LOW) tuple[str, str | None]
Prompt console operator to select an action and provide data.
Presents the force action scenario to the console operator and prompts for action selection and optional JSON data input.
- Parameters:
game_title (str | None) – The requesting game’s title, displayed for operator context and multi-client tracking.
state (str | None) – Current game state description, displayed to help the operator understand the scenario.
query (str) – Instruction describing what should be accomplished, displayed to guide the operator’s decision.
ephemeral_context (bool) – Context persistence flag, displayed for operator information.
actions (tuple[Action, ...]) – Available actions the operator must choose from, displayed with descriptions.
- Returns:
- Tuple containing:
str: Name of the action selected by the operator
str | None: JSON data provided by the operator, or None if no data was provided or the action has no schema
- Return type:
- Interactive Process:
Displays game context (title, state, query)
Shows numbered list of available actions with descriptions
Prompts operator to select action by number
If selected action has a schema, optionally prompts for JSON data
Returns the selected action name and data
- Behavior:
Uses Trio checkpoints to maintain cooperative multitasking
Validates operator input (action number must be valid)
Shows action schema when prompting for JSON data
Allows operator to skip JSON data input even for schema actions
Note
Blocking operation that waits for operator input
Input validation is minimal - operator must provide valid numbers
JSON data validation is not performed by this method
Multiple concurrent force actions will be processed sequentially
- Example Interaction:
>>> [Force Action] game_title = 'MyGame'
>>> state = 'Player at crossroads'
>>> query = 'Choose direction to explore'
>>> Options:
>>> 1: go_north
>>> Move towards the mountain path
>>> 2: go_south
>>> Head toward the village
>>> Action > 1
>>> action.schema = {...}
>>> Do json blob? (y/N) > y
>>> Json blob > {"speed": "fast"}
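The notes above describe input validation as minimal. As an illustration only, the sketch below shows one way the numbered selection step could be checked more strictly. The function parse_action_choice is hypothetical and is not the method's actual logic. It maps the operator's 1-based input to an action name and returns None for anything out of range or non-numeric.

```python
from __future__ import annotations


def parse_action_choice(raw: str, action_names: list[str]) -> str | None:
    """Map 1-based operator input to an action name, or None if invalid."""
    try:
        index = int(raw.strip())
    except ValueError:
        return None  # not a number at all
    if 1 <= index <= len(action_names):
        return action_names[index - 1]
    return None  # number outside the displayed options


print(parse_action_choice("1", ["go_north", "go_south"]))  # go_north
```

A caller could loop on the prompt until this returns a name instead of None.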
Usage Examples
Basic Server Implementation
import trio

from neuro_api.server import AbstractTrioNeuroServer
from neuro_api.command import Action


class MyNeuroServer(AbstractTrioNeuroServer):
    def add_context(self, game_title, message, reply_if_not_busy):
        print(f"[{game_title}] Context: {message}")
        # Forward to your AI system

    async def choose_force_action(self, game_title, state, query,
                                  ephemeral_context, actions,
                                  priority=None):  # documented default: ForcePriority.LOW
        # Implement AI decision logic
        selected_action = actions[0]  # Simple selection
        action_data = '{"param": "value"}'  # Generate data
        return selected_action.name, action_data


async def main():
    # Start server
    server = MyNeuroServer()
    await server.run("localhost", 8000)


trio.run(main)
Development Server
from neuro_api.server import ConsoleInteractiveNeuroServer
import trio


async def main():
    server = ConsoleInteractiveNeuroServer()
    await server.run("localhost", 8000)


trio.run(main)
Custom Client Handler
from neuro_api.server import AbstractRecordingNeuroServerClient


class CustomClient(AbstractRecordingNeuroServerClient):
    def add_context(self, message, reply_if_not_busy):
        # Custom context handling
        self.log_info(f"Game context: {message}")

    async def choose_force_action(self, state, query, ephemeral_context, action_names):
        # Custom AI integration
        return await self.ai_system.choose_action(
            state, query, list(action_names)
        )

    async def submit_call_async_soon(self, function):
        # Schedule with custom task manager
        await self.task_scheduler.schedule(function)
Protocol Flow
Connection Lifecycle
Client Connection
WebSocket handshake
Client registration in server registry
TrioNeuroServerClient instance creation
Game Initialization
Client sends startup command
Server clears previous actions
Game title registration
Action Management
Client registers actions via actions/register
Actions stored in client registry
Dynamic registration/unregistration supported
Game Interaction
Context updates via context command
Forced actions via actions/force
Action execution and results
Disconnection
Automatic cleanup of client registry
Resource cleanup via context managers
Message Processing
The server processes these command types:
startup: Initialize game session
context: Add contextual information
actions/register: Register new actions
actions/unregister: Remove actions
actions/force: Force action selection
action/result: Report action outcomes
shutdown/ready: Confirm shutdown readiness
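Routing by command type can be sketched as a dictionary dispatch. This is a simplified illustration, not the module's implementation: it assumes each message is a JSON object with a "command" field, a "game" field, and an optional "data" field, and the dispatch function and handler signatures are hypothetical.

```python
import json


def dispatch(raw_message: str, handlers: dict) -> object:
    """Decode one JSON message and call the handler registered for its command."""
    message = json.loads(raw_message)
    command = message.get("command")
    handler = handlers.get(command)
    if handler is None:
        raise ValueError(f"unknown command: {command!r}")
    # Handlers receive the game title and the optional data payload.
    return handler(message.get("game"), message.get("data"))


handlers = {
    "startup": lambda game, data: f"{game}: session started",
    "context": lambda game, data: f"{game}: {data['message']}",
}
print(dispatch('{"command": "startup", "game": "MyGame"}', handlers))
# MyGame: session started
```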
Action Force Flow
Client              Server                      AI System
------              ------                      ---------
actions/force  -->  choose_force_action   -->   [AI Decision]
               <--  action command        <--
action/result  -->  handle_action_result
               <--  [retry if failed]
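The retry step in the diagram can be sketched as a bounded loop. This is an assumption-laden illustration. force_until_success, its two callables, and the max_attempts limit are hypothetical: the diagram only states that a failed result leads to a retry, not how many attempts are made.

```python
from __future__ import annotations

from typing import Callable


def force_until_success(
    choose: Callable[[], tuple[str, str | None]],
    execute: Callable[[str, str | None], bool],
    max_attempts: int = 3,
) -> tuple[str, str | None] | None:
    """Choose and execute an action, retrying while the result reports failure."""
    for _ in range(max_attempts):
        name, data = choose()    # stands in for choose_force_action
        if execute(name, data):  # stands in for sending the action and reading action/result
            return name, data
    return None  # every attempt failed


outcomes = iter([False, True])  # first attempt fails, second succeeds
print(force_until_success(lambda: ("go_north", None), lambda n, d: next(outcomes)))
# ('go_north', None)
```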
Error Handling
Connection Errors
WebSocket Disconnection: Automatic client cleanup
Protocol Errors: Logged and connection terminated
Invalid Commands: Error responses sent to client
Action Errors
Invalid Action Names: Validation and error reporting
Schema Violations: JSON schema validation failures
Missing Actions: Graceful handling of unregistered actions
Server Errors
Startup Failures: Critical error logging and shutdown
Resource Exhaustion: Proper error propagation
AI System Failures: Graceful degradation where possible
Configuration
Server Configuration
# Basic configuration
await server.run("localhost", 8000)
# SSL/TLS configuration
import ssl
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain("server.crt", "server.key")
await server.run("0.0.0.0", 8443, ssl_context=ssl_context)
Development and Testing
Interactive Development Server
The ConsoleInteractiveNeuroServer provides a human-operated interface:
$ python -m neuro_api.server
# Interactive commands available:
# help - Show available commands
# list - Show connected clients and actions
# send <client> <action> - Manually trigger actions
# <enter> - Continue without action
Testing Client Connections
# Test with multiple clients
import trio
from neuro_api.trio_ws import TrioNeuroAPIComponent


async def test_client():
    component = TrioNeuroAPIComponent("test_client", "Test Game")
    # Connect and test...
Performance Considerations
Memory Usage: Scales with number of active clients and registered actions
Concurrency: Built on Trio’s structured concurrency for efficient scaling
Network I/O: Async WebSocket handling prevents blocking on slow clients
Action Registry: In-memory storage with O(1) action lookup
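To make the O(1) lookup claim concrete, here is a minimal sketch of an in-memory registry keyed by action name. The ActionRegistry class is hypothetical and much simpler than the recording layer described earlier. Actions are represented as plain dictionaries with "name" and "description" keys, which is an assumption made for this example.

```python
class ActionRegistry:
    """Dictionary-backed registry: average O(1) register, unregister, and lookup."""

    def __init__(self):
        self._actions = {}

    def register(self, actions):
        for action in actions:
            # A later registration with the same name replaces the earlier one.
            self._actions[action["name"]] = action

    def unregister(self, names):
        for name in names:
            self._actions.pop(name, None)  # ignore names that were never registered

    def get(self, name):
        return self._actions.get(name)

    def clear(self):
        self._actions.clear()


registry = ActionRegistry()
registry.register([{"name": "go_north", "description": "Move towards the mountain path"}])
print(registry.get("go_north") is not None)  # True
registry.unregister(["go_north"])
print(registry.get("go_north"))  # None
```

Memory use grows linearly with the number of registered actions per client, which matches the memory usage note above.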
Integration Notes
This module integrates with:
neuro_api.api - Base API functionality
neuro_api.command - Command definitions and validation
neuro_api.client - Client interface definitions
trio - Async runtime and structured concurrency
trio-websocket - WebSocket protocol implementation
For production deployment, implement the abstract methods in AbstractTrioNeuroServer
to integrate with your specific AI system and game infrastructure.