Available Executors

Threading Executor

class pymoa_remote.threading.AsyncThreadExecutor

Bases: object

Executor that executes async functions in a trio event loop in a secondary thread.

cancel_nursery: Optional[trio.Nursery] = None
await execute(obj, async_fn, args=(), kwargs=None, callback=None)
await execute_generator(obj, sync_gen, args=(), kwargs=None, callback=None) AsyncGenerator
await get_echo_clock() Tuple[int, int, int]
await start(name='AsyncThreadExecutor')
await stop()
to_thread_portal: Optional[pymoa_remote.threading.TrioPortal] = None
class pymoa_remote.threading.SyncThreadExecutor

Bases: object

Executor that executes functions in a secondary thread.

eof = <object object>
await execute(obj, sync_fn, args=(), kwargs=None, callback=None)

Execution is guaranteed to be sequential. TODO: if called after stop, or after the thread has exited, it may block forever.

async for ... in execute_generator(obj, sync_gen, args=(), kwargs=None, callback=None) AsyncGenerator

The last items may be dropped after getting them. TODO: if called after stop is called, it may block forever.

await get_echo_clock() Tuple[int, int, int]
max_queue_size = 10
await sleep(duration=None, deadline=None) int
await start(name='ThreadExecutor')
await stop()
class pymoa_remote.threading.ThreadExecutor(**kwargs)

Bases: pymoa_remote.client.Executor

await apply_config_from_remote(obj)
await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
await apply_property_data_from_remote(obj: Any, properties: List[str])
await delete_remote_instance(obj)
await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
await execute(obj, fn: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None)
async for ... in execute_generator(obj, gen: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
await get_echo_clock() Tuple[int, int, int]
await get_remote_object_config(obj: Optional[Any])
await get_remote_object_property_data(obj: Any, properties: List[str]) dict
await get_remote_objects()
is_remote = False
await register_remote_class(cls)
await remote_import(module)
await sleep(duration=None, deadline=None) int
await start_executor()
await stop_executor()
class pymoa_remote.threading.TrioPortal(trio_token=None)

Bases: object

Portal for communicating with trio from a different thread.

await run(afn, *args)
await run_sync(fn, *args)
trio_token: trio.lowlevel.TrioToken = None

Socket Client

class pymoa_remote.socket.client.SocketExecutor(server: str = '', port: int = 0, **kwargs)

Bases: pymoa_remote.client.Executor

Executor that sends all requests to a remote server to be executed there, using a socket.

Each request is sent as a dict with metadata. Typically, it’ll have a data key that contains the request data dict. This data is generated by the corresponding private methods in Executor.

The socket is opened with create_socket_context() or open_socket(). Data is written with write_socket() and encoded with encode(). It is read and decoded with read_decode_json_buffers(). All client requests use this basic API.

Normal methods cannot be canceled because there’s no mechanism for canceling them. Data streams and the generator can be canceled since they have their own socket.

await apply_config_from_remote(obj)
await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
await apply_property_data_from_remote(obj: Any, properties: List[str])
create_socket_context() AsyncContextManager
await decode(data)

Decodes the data encoded with encode().

await delete_remote_instance(obj)
encode(data) bytes

Encodes the data as required by the specific executor.

await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
await execute(obj, fn: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None)
async for ... in execute_generator(obj, gen: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
await get_echo_clock() Tuple[int, int, int]
await get_remote_object_config(obj: Optional[Any])
await get_remote_object_property_data(obj: Any, properties: List[str]) dict
await get_remote_objects()
await open_socket() trio.SocketStream

Opens socket, sends channel=None, reads the response and returns the socket stream.

Returns

port: int = None
raise_return_value(data: dict, packet: Optional[int] = None)
await read_decode_json_buffers(stream: trio.SocketStream)
await register_remote_class(cls)
await remote_import(module)
server: str = ''
await sleep(duration=None, deadline=None) int
socket: Optional[trio.SocketStream] = None
await start_executor()
await stop_executor(block=True)
await write_socket(data: bytes, sock: trio.SocketStream)

Multiprocessing Socket Executor

class pymoa_remote.socket.multiprocessing_client.MultiprocessSocketExecutor(server: str = 'localhost', port: int = 0, stream_changes=True, allow_remote_class_registration=True, allow_import_from_main=False, **kwargs)

Bases: pymoa_remote.socket.client.SocketExecutor

Executor that sends all requests to a remote server to be executed there, using a socket.

allow_import_from_main = False
allow_remote_class_registration = True
await decode(data)

Decodes the data encoded with encode().

port: int = None
server: str = ''
await start_executor()
await stop_executor(block=True)
stream_changes = True

WebSocket Executor

class pymoa_remote.socket.websocket_client.WebSocketExecutor(nursery: trio.Nursery, **kwargs)

Bases: pymoa_remote.socket.client.SocketExecutor

Executor that sends all requests to a remote server to be executed there, using a websocket.

create_socket_context()
await decode(data)

Decodes the data encoded with encode().

nursery: trio.Nursery = None
await open_socket() trio_websocket._impl.WebSocketConnection

Opens socket, sends channel=None, reads the response and returns the socket stream.

Returns

await read_decode_json_buffers(stream: trio_websocket._impl.WebSocketConnection)
socket: trio_websocket._impl.WebSocketConnection = None
await write_socket(data: bytes, stream: trio_websocket._impl.WebSocketConnection)

REST Executor

class pymoa_remote.rest.client.RestExecutor(uri: str, **kwargs)

Bases: pymoa_remote.client.Executor

Executor that sends all requests to a remote server to be executed there, using a REST API.

await apply_config_from_remote(obj)
await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
await apply_property_data_from_remote(obj: Any, properties: List[str])
await delete_remote_instance(obj)
await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
await execute(obj, fn: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None)
async for ... in execute_generator(obj, gen: Union[Callable, str], args=(), kwargs=None, callback: Optional[Union[Callable, str]] = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator]
await get_echo_clock() Tuple[int, int, int]
await get_remote_object_config(obj: Optional[Any])
await get_remote_object_property_data(obj: Any, properties: List[str]) dict
await get_remote_objects()
await register_remote_class(cls)
await remote_import(module)
await sleep(duration=None, deadline=None) int
await start_executor()
await stop_executor()
uri: str = ''