Getting Started
================

Introduction
-------------

PyMoa is a framework for describing and running experiments using local or
remote devices. It is composed of stages that determine the flow of an
experiment, devices that interface with arbitrary hardware or software,
loggers that automatically log all stage or device state changes, and
executors that can run devices on different threads, processes, or even
remote servers such as a Raspberry Pi, with minimal code changes.

PyMoa re-uses existing libraries where possible. In particular:

* The **amazing** `trio `_ library for all concurrency and event loops
  (running stages, devices, interfacing with threads) and its socket
  interface (both client and server side).
* `asks `_ for making client-side requests over the REST API.
* `trio-websocket `_ for making client-side requests over the websocket API.
* `Quart `_ and `quart-trio `_ for the server-side REST API and websockets.
* `Kivy `_ for event and property dispatching in response to data and
  stage updates.

Framework
---------

The following introduction walks through the basic PyMoa components.
`The complete example can be found here `_.

Stage
*****

PyMoa describes an experiment using hierarchies of
:class:`~pymoa.stage.MoaStage` instances. For example, the following experiment

#. Waits for 20 seconds, then repeats the following 2 times:

   #. Waits for a photo-beam to break (by going high).
   #. Releases a sugar pellet (by driving the motor port high for 200 ms).
   #. Waits for a 20 to 40 second inter-trial interval.

#. Waits for 15 seconds.

can be described in pseudo-code as follows::

    MoaStage:
        Delay:
            delay = 20
        MoaStage:
            repeat = 2
            DigitalGateStage:
                device = photo_bream_device
                exit_state = True
            Delay:
                motor_device.set_channel(True)
                delay = 0.2
            UniformRandomDelay:
                min = 20
                max = 40
        Delay:
            delay = 15

In real code, this experiment structure would be implemented as:

.. code-block:: python3

    class SugarPalletStage(MoaStage):

        async def do_trial(self):
            # drive the motor port high for 200 ms to release one pellet
            await self.motor_device.write_state(True)
            await trio.sleep(0.2)
            await self.motor_device.write_state(False)

    # top-level stages run one after another
    root = MoaStage(name='Root stage')
    trial = MoaStage(name='Trial', repeat=2)
    root.add_stage(Delay(delay=20, name='Habituation'))
    root.add_stage(trial)
    root.add_stage(Delay(delay=15, name='post delay'))

    # the sub-stages that make up each trial
    trial.add_stage(DigitalGateStage(
        device=photo_bream_device, exit_state=True, name='photobeam_stage'))
    trial.add_stage(SugarPalletStage(
        motor_device=motor_device, name='sugar pallet stage', delay=0.2))
    trial.add_stage(UniformRandomDelay(min=20, max=40, name='ITI'))

The experiment would then be run using ``await root.run_stage()``, which
executes all the stages.

A :class:`~pymoa.stage.MoaStage` performs an action each time one of its
trials is run. The stage will :attr:`~pymoa.stage.MoaStage.repeat` this
action for each trial. Simultaneously, for each trial, the stage executes its
sub-:attr:`~pymoa.stage.MoaStage.stages` serially or in parallel as specified
by :attr:`~pymoa.stage.MoaStage.order`.
:attr:`~pymoa.stage.MoaStage.complete_on` and
:attr:`~pymoa.stage.MoaStage.complete_on_whom` help determine when a trial is
complete, so that the trial can be repeated or, once the trial
:attr:`~pymoa.stage.MoaStage.count` is reached, the stage can be completed.

The action performed by the stage is customized by overriding (and calling
``super`` within) these methods: :meth:`~pymoa.stage.MoaStage.init_stage`,
:meth:`~pymoa.stage.MoaStage.init_trial`,
:meth:`~pymoa.stage.MoaStage.do_trial`,
:meth:`~pymoa.stage.MoaStage.trial_done`,
:meth:`~pymoa.stage.MoaStage.stage_done`. Other methods and events are also
available for further customization.

Some example stages are the :class:`~pymoa.stage.delay.Delay` stage, which
waits for a specified delay, and the :class:`~pymoa.stage.gate.GateStage`,
which waits until the specified device condition is met.

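
To make the nesting and repeat behaviour described above concrete, the
following is a toy model in plain Python. It does not use PyMoa:
``ToyStage`` and its ``run`` method are hypothetical names invented for this
sketch, it is synchronous rather than trio-based, delays are not actually
waited for, and it only models serial sub-stage order (in PyMoa a stage's own
action and its sub-stages run simultaneously). It shows only the order in
which the stages of the example experiment would be visited:

.. code-block:: python3

    from dataclasses import dataclass, field
    from typing import List


    @dataclass
    class ToyStage:
        """Hypothetical stand-in for a stage; not part of PyMoa."""

        name: str
        repeat: int = 1
        stages: List["ToyStage"] = field(default_factory=list)

        def add_stage(self, stage: "ToyStage") -> None:
            self.stages.append(stage)

        def run(self, log: List[str]) -> List[str]:
            # Each trial records the stage itself, then visits the
            # sub-stages one after another (serial order only).
            for trial in range(self.repeat):
                log.append(f"{self.name}[{trial}]")
                for stage in self.stages:
                    stage.run(log)
            return log


    # Same shape as the example experiment above.
    root = ToyStage("root")
    trial = ToyStage("trial", repeat=2)
    root.add_stage(ToyStage("habituation"))
    root.add_stage(trial)
    root.add_stage(ToyStage("post delay"))
    for name in ("photobeam", "sugar pellet", "ITI"):
        trial.add_stage(ToyStage(name))

    visited = root.run([])
    print(visited)

The ``trial`` stage appears twice in ``visited``, each time followed by its
three sub-stages, and ``post delay`` is only reached after both trials are
done.
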
Device
******

The :class:`~pymoa.device.Device` base class is the interface for interacting
with input/output devices. Devices generally store the current device state
as a property of the device (e.g. the state of a digital channel). Whenever
the device is updated, it dispatches the ``on_data_update`` event to signal
to listeners that the device's state was updated.

:mod:`~pymoa.device.port` defines some additional interfaces for channels and
channel ports. :mod:`~pymoa.device.digital` and :mod:`~pymoa.device.analog`
provide further interfaces for interacting with digital and analog devices,
respectively. :mod:`~pymoa.device.adc` provides an interface for an ADC
device. These interfaces are example implementations to be customized.

For the above experiment, we create a virtual photobeam sensor and a virtual
sugar pellet delivery device by implementing
:class:`~pymoa.device.digital.DigitalChannel` as follows:

.. code-block:: python3

    class VirtualDevice(DigitalChannel):

        async def write_state(self, state: bool, **kwargs):
            # save the new state and notify listeners of the update
            self.state = state
            self.timestamp = perf_counter()
            self.dispatch('on_data_update', self)

        async def pump_state(self):
            # simulate reading the channel: set a random state every 1-11 s
            while True:
                await trio.sleep(random.random() * 10 + 1)
                await self.write_state(random.random() >= 0.5)

    photo_bream_device = VirtualDevice(name='photobeam_device')
    motor_device = VirtualDevice(name='motor_device')

``write_state`` simply saves the state and triggers the update. This
simulates setting the channel high or low (e.g. turning ON the motor).
``pump_state`` continuously "reads" the channel by randomly generating a high
or low state for the channel.

Logging
*******

Stages and devices support automatic logging of important properties through
the :class:`~pymoa.data_logger.Loggable` interface. The interface provides a
way to indicate which properties/events to log and what triggers the logging
action. :class:`~pymoa.stage.MoaStage` and :class:`~pymoa.device.Device`
include this interface by default.

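
The update-then-log flow can be pictured with a small observer sketch in
plain Python. This is not how PyMoa or Kivy implement it: ``ToyDevice``,
``bind``, ``ToyLogger`` and ``watch`` are hypothetical names invented for
this illustration, and a real device would dispatch ``on_data_update``
through its event system instead of looping over a list of callbacks:

.. code-block:: python3

    from time import perf_counter
    from typing import Callable, List, Optional, Tuple


    class ToyDevice:
        """Hypothetical device that notifies listeners on every update."""

        def __init__(self, name: str) -> None:
            self.name = name
            self.state: Optional[bool] = None
            self.timestamp: Optional[float] = None
            self._listeners: List[Callable[["ToyDevice"], None]] = []

        def bind(self, callback: Callable[["ToyDevice"], None]) -> None:
            self._listeners.append(callback)

        def write_state(self, state: bool) -> None:
            # Store the new state first, then tell every listener about it.
            self.state = state
            self.timestamp = perf_counter()
            for callback in self._listeners:
                callback(self)


    class ToyLogger:
        """Hypothetical logger that keeps one row per device update."""

        def __init__(self) -> None:
            self.rows: List[Tuple[str, float, bool]] = []

        def watch(self, device: ToyDevice) -> None:
            device.bind(self.record)

        def record(self, device: ToyDevice) -> None:
            self.rows.append((device.name, device.timestamp, device.state))


    motor = ToyDevice("motor_device")
    toy_logger = ToyLogger()
    toy_logger.watch(motor)
    motor.write_state(True)
    motor.write_state(False)
    print(toy_logger.rows)

Each call to ``write_state`` adds one timestamped row to the logger, without
the device needing to know that a logger exists.
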
Example loggers are :class:`~pymoa.data_logger.SimpleCSVLogger` and
:class:`~pymoa.data_logger.SimpleTerminalLogger`, which log the data to a CSV
file and to the terminal, respectively.

For example, to log the experiment stages above, the following associates the
logger with the stages:

.. code-block:: python3

    logger = SimpleTerminalLogger()
    # register every stage in the hierarchy with the logger
    for stage in root.iterate_stages():
        logger.add_logged_instance(stage)

When the experiment is run, this will print the following to the terminal:

.. csv-table:: Printed output
    :file: media/intro_log_output.csv
    :header-rows: 1

Remote Execution
****************

One of the best components of this framework is the ability to execute and
interface with devices that are remote to the system running the experiments.
PyMoa implements some executor classes that make this simple.

For example, imagine that instead of being able to set the state of the
device above on our system, we needed to call the device over the network or
in a different process or thread. E.g. the device is attached to a
Raspberry Pi, so ``write_state`` needs to run on the Pi.

For the following example we'll use a thread executor, but a socket or
REST API executor will work similarly. The device is adjusted as follows:

.. code-block:: python3

    class VirtualDevice(DigitalChannel):

        def executor_callback(self, return_value):
            # runs on the caller's side with the value returned by write_state
            self.state, self.timestamp = return_value
            self.dispatch('on_data_update', self)

        @apply_executor(callback=executor_callback)
        def write_state(self, state: bool, **kwargs):
            # runs in the executor (here: a second thread)
            return state, perf_counter()

    executor = ThreadExecutor()
    photo_bream_device = VirtualDevice(name='photobeam_device', executor=executor)
    motor_device = VirtualDevice(name='motor_device', executor=executor)

With this simple change, when we call
``await motor_device.write_state(True)``, ``write_state`` will be executed in
a second thread and the result will be returned to us.
In addition, the result will be passed to the callback provided in the
decorator, so that the result can be applied generically as needed.

Executing on e.g. a Raspberry Pi is just as simple: instantiate the
:mod:`~pymoa.executor.remote.app.quart` server on the Raspberry Pi. This
launches a server that can be communicated with using a REST API as well as
more efficient websockets. On the client side, you'd simply do
``executor = WebSocketExecutor(server='x.y.z.a', port=5000)`` and everything
else will mostly work the same as in the threading example above. The only
difference is that we'd also need to call
``await executor.ensure_remote_instance(motor_device)`` to create the device
on the Pi. Afterwards, calling ``await motor_device.write_state(True)`` will
execute ``write_state`` on the Pi and return the result to the client.

Remote clock
^^^^^^^^^^^^

Executors also support pinging the executor to return the current server
time. By running this periodically we can estimate the lag of executing on
the server. More importantly, we can also align the server-timestamped events
to the local clock with a simple linear regression.

Performance
^^^^^^^^^^^^

The following is some basic performance data for the various implemented
executors. The last two rows were measured against a server running on a
Raspberry Pi over WiFi:

.. csv-table:: Performance data
    :header-rows: 1

    Executor,Round-trip lag,Execution rate,Continuous execution rate
    RestExecutor,8.68ms,95.36Hz,1495.93Hz
    WebSocketExecutor,2.20ms,533.46Hz,1104.19Hz
    MultiprocessSocketExecutor,0.48ms,1132.06Hz,3144.60Hz
    ThreadExecutor,0.45ms,2789.56Hz,5395.46Hz
    AsyncThreadExecutor,1.16ms,-,-
    DummyRemoteExecutor,0.07ms,10327.27Hz,5739.54Hz
    none,-,246669.96Hz,10018.94Hz
    RPi-RestExecutor,16.56ms,59.52Hz,847.97Hz
    RPi-WebSocketExecutor,6.14ms,153.43Hz,377.19Hz

Running the experiment
**********************

Once the above has been defined, to run the experiment simply schedule the
async functions to run in trio's event loop.
This event loop does two things simultaneously: it continuously "reads" the
photobeam device to see when the photobeam is "broken", and it executes the
experiment's stages:

.. code-block:: python3

    async def run_experiment():
        # start the threading executor
        await executor.start_executor()

        async with trio.open_nursery() as nursery:
            async def run_root_stage():
                # run the experiment stages, but when done also cancel
                # reading the devices
                await root.run_stage()
                nursery.cancel_scope.cancel()

            # reads the photobeam device continuously
            nursery.start_soon(photo_bream_device.pump_state)
            # runs the experiment stages
            nursery.start_soon(run_root_stage)

        # we're done so we can stop the threading executor
        await executor.stop_executor()

    trio.run(run_experiment)
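
As a closing illustration of the clock alignment mentioned under
*Remote clock*, the following is a minimal least-squares sketch in plain
Python. It does not use PyMoa's executors: the ping samples are made up,
``fit_clock`` and ``to_local`` are hypothetical helpers, and taking the
midpoint between sending a ping and receiving its reply as the local time of
each server reading is an assumption of this sketch, not something the
section above specifies:

.. code-block:: python3

    from typing import List, Tuple

    # (local time sent, server time reported, local time received), seconds
    Sample = Tuple[float, float, float]


    def fit_clock(samples: List[Sample]) -> Tuple[float, float]:
        """Fit local = slope * server + offset by ordinary least squares.

        Needs at least two samples with distinct server times.
        """
        xs = [server for _, server, _ in samples]
        # assumption: the server read its clock halfway through the ping
        ys = [(sent + received) / 2 for sent, _, received in samples]
        n = len(samples)
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        var_x = sum((x - mean_x) ** 2 for x in xs)
        cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
        slope = cov_xy / var_x
        offset = mean_y - slope * mean_x
        return slope, offset


    def to_local(server_time: float, slope: float, offset: float) -> float:
        """Map a server timestamp onto the local clock."""
        return slope * server_time + offset


    # made-up pings: server clock is 100 s ahead, round trip takes 4 ms
    samples = [(t, t + 0.002 + 100.0, t + 0.004) for t in (0.0, 1.0, 2.0, 3.0)]
    slope, offset = fit_clock(samples)
    mean_lag = sum(received - sent for sent, _, received in samples) / len(samples)
    print(slope, offset, mean_lag)

With the fitted ``slope`` and ``offset``, any timestamp recorded on the
server can be converted with ``to_local`` before it is compared with locally
timestamped events.
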