Available Executors
Threading Executor
- class pymoa_remote.threading.AsyncThreadExecutor
Bases:
object
Executor that executes async functions in a trio event loop in a secondary thread.
- await execute(obj, async_fn, args=(), kwargs=None, callback=None)
- await execute_generator(obj, sync_gen, args=(), kwargs=None, callback=None) AsyncGenerator
- await get_echo_clock() Tuple[int, int, int]
- await start(name='AsyncThreadExecutor')
- await stop()
- to_thread_portal: TrioPortal | None = None
- class pymoa_remote.threading.SyncThreadExecutor
Bases:
object
Executor that executes functions in a secondary thread.
- eof = <object object>
- await execute(obj, sync_fn, args=(), kwargs=None, callback=None)
It’s guaranteed sequential. TODO: if called after stop or thread exited it may block forever.
- async for ... in execute_generator(obj, sync_gen, args=(), kwargs=None, callback=None) AsyncGenerator
Last items may be dropped after getting them. TODO: if called after stop is called it may block forever.
- await get_echo_clock() Tuple[int, int, int]
- max_queue_size = 10
- await sleep(duration=None, deadline=None) int
- await start(name='ThreadExecutor')
- await stop()
- class pymoa_remote.threading.ThreadExecutor(**kwargs)
Bases:
Executor
- await apply_config_from_remote(obj)
- await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
- await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
- await apply_property_data_from_remote(obj: Any, properties: List[str])
- await delete_remote_instance(obj)
- await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
- await execute(obj, fn: Callable | str, args=(), kwargs=None, callback: Callable | str = None)
- async for ... in execute_generator(obj, gen: Callable | str, args=(), kwargs=None, callback: Callable | str = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
- async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- await get_echo_clock() Tuple[int, int, int]
- await get_remote_object_config(obj: Any | None)
- await get_remote_object_property_data(obj: Any, properties: List[str]) dict
- await get_remote_objects()
- is_remote = False
- await register_remote_class(cls)
- await remote_import(module)
- await sleep(duration=None, deadline=None) int
- await start_executor()
- await stop_executor()
- class pymoa_remote.threading.TrioPortal(trio_token=None)
Bases:
object
Portal for communicating with trio from a different thread.
- await run(afn, *args)
- await run_sync(fn, *args)
Socket Client
- class pymoa_remote.socket.client.SocketExecutor(server: str = '', port: int = 0, **kwargs)
Bases:
Executor
Executor that sends all requests to a remote server to be executed there, using a socket.
Each request is sent as a dict with metadata. Typically, it’ll have a
data
key that contains the request data dict. This data is generated by the corresponding private methods inExecutor
..The socket is opened with
create_socket_context()
oropen_socket()
. Data is written withwrite_socket()
and encoded withencode()
. It is read and decoded withread_decode_json_buffers()
. All client requests use this basic API.Normal methods cannot be canceled because there’s no mechanism. Data streams and the generator can be canceled since they have their own socket
- await apply_config_from_remote(obj)
- await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
- await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
- await apply_property_data_from_remote(obj: Any, properties: List[str])
- create_socket_context() AsyncContextManager
- await delete_remote_instance(obj)
- encode(data) bytes
Encodes the data as required by the specific executor.
- await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
- await execute(obj, fn: Callable | str, args=(), kwargs=None, callback: Callable | str = None)
- async for ... in execute_generator(obj, gen: Callable | str, args=(), kwargs=None, callback: Callable | str = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
- async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- await get_echo_clock() Tuple[int, int, int]
- await get_remote_object_config(obj: Any | None)
- await get_remote_object_property_data(obj: Any, properties: List[str]) dict
- await get_remote_objects()
- await open_socket() SocketStream
Opens socket, sends channel=None, reads the response and returns the socket stream.
- Returns:
- port: int = None
- raise_return_value(data: dict, packet: int = None)
- await read_decode_json_buffers(stream: SocketStream)
- await register_remote_class(cls)
- await remote_import(module)
- server: str = ''
- await sleep(duration=None, deadline=None) int
- socket: SocketStream | None = None
- await start_executor()
- await stop_executor(block=True)
- await write_socket(data: bytes, sock: SocketStream)
Multiprocessing Socket Executor
- class pymoa_remote.socket.multiprocessing_client.MultiprocessSocketExecutor(server: str = 'localhost', port: int = 0, stream_changes=True, allow_remote_class_registration=True, allow_import_from_main=False, **kwargs)
Bases:
SocketExecutor
Executor that sends all requests to a remote server to be executed there, using a websocket.
- allow_import_from_main = False
- allow_remote_class_registration = True
- await decode(data)
Decodes the data encoded with
encode()
.
- port: int = None
- server: str = ''
- await start_executor()
- await stop_executor(block=True)
- stream_changes = True
WebSocket Executor
- class pymoa_remote.socket.websocket_client.WebSocketExecutor(nursery: Nursery, **kwargs)
Bases:
SocketExecutor
Executor that sends all requests to a remote server to be executed there, using a websocket.
- create_socket_context()
- await decode(data)
Decodes the data encoded with
encode()
.
- await open_socket() WebSocketConnection
Opens socket, sends channel=None, reads the response and returns the socket stream.
- Returns:
- await read_decode_json_buffers(stream: WebSocketConnection)
- socket: WebSocketConnection = None
- await write_socket(data: bytes, stream: WebSocketConnection)
REST Executor
- class pymoa_remote.rest.client.RestExecutor(uri: str, **kwargs)
Bases:
Executor
Executor that sends all requests to a remote server to be executed there, using a rest API.
- await apply_config_from_remote(obj)
- await apply_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED)
- await apply_execute_from_remote(obj, exclude_self=True, task_status=TASK_STATUS_IGNORED)
- await apply_property_data_from_remote(obj: Any, properties: List[str])
- await delete_remote_instance(obj)
- await ensure_remote_instance(obj, hash_name, *args, auto_register_class=True, **kwargs) bool
- await execute(obj, fn: Callable | str, args=(), kwargs=None, callback: Callable | str = None)
- async for ... in execute_generator(obj, gen: Callable | str, args=(), kwargs=None, callback: Callable | str = None, task_status=TASK_STATUS_IGNORED) AsyncGenerator
- async with get_channel_from_remote(hash_name: str, channel: str, task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- async with get_data_from_remote(obj, trigger_names: Iterable[str] = (), triggered_logged_names: Iterable[str] = (), logged_names: Iterable[str] = (), initial_properties: Iterable[str] = (), task_status=TASK_STATUS_IGNORED) AsyncContextManager[AsyncGenerator, bool | None]
- await get_echo_clock() Tuple[int, int, int]
- await get_remote_object_config(obj: Any | None)
- await get_remote_object_property_data(obj: Any, properties: List[str]) dict
- await get_remote_objects()
- await register_remote_class(cls)
- await remote_import(module)
- await sleep(duration=None, deadline=None) int
- await start_executor()
- await stop_executor()
- uri: str = ''