Getting Started

Introduction

PyMoa is a framework for describing and running experiments using local or remote devices.

It is composed of stages that determine the flow of an experiment; devices that interface with arbitrary hardware or software; loggers that automatically log all stage or device state changes; and executors that can run devices on different threads, processes, or even remote servers (such as a Raspberry Pi) with minimal code changes.

PyMoa re-uses existing libraries where possible. In particular:

  • The amazing trio library for all concurrency and event loops (running stages, devices, interfacing with threads) and its socket interface (both client and server side).

  • asks for making client-side requests over the REST API.

  • trio-websocket for making client-side requests over the websocket API.

  • Quart and quart-trio for the server-side REST API and websockets.

  • Kivy for event and property dispatching in response to data and stage updates.

Framework

The following introduction walks through the basic PyMoa components. The complete example can be found here.

Stage

PyMoa describes an experiment using hierarchies of MoaStage instances. For example, the following experiment:

  1. Waits for 20 seconds, then repeats the following 2 times:

    1. Waits for a photo-beam to break (by going high),

    2. Releases a sugar pallet (by driving the motor port high for 200 ms),

    3. Waits for a 20 to 40 second inter-trial interval.

  2. Waits for 15 seconds.

can be described in pseudo-code as follows:

MoaStage:
    Delay:
        delay = 20
    MoaStage:
        repeat = 2
        DigitalGateStage:
            device = photo_bream_device
            exit_state = True
        Delay:
            motor_device.set_channel(True)
            delay = 0.2
        UniformRandomDelay:
            min = 20
            max = 40
    Delay:
        delay = 15

In real code, this experiment structure would be implemented as:

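import trio  # MoaStage and the other stage classes come from PyMoa


class SugarPalletStage(MoaStage):

    async def do_trial(self):
        # pulse the motor port high for 200 ms to release one sugar pallet
        await self.motor_device.write_state(True)
        await trio.sleep(0.2)
        await self.motor_device.write_state(False)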

root = MoaStage(name='Root stage')
trial = MoaStage(name='Trial', repeat=2)

root.add_stage(Delay(delay=20, name='Habituation'))
root.add_stage(trial)
root.add_stage(Delay(delay=15, name='post delay'))

trial.add_stage(DigitalGateStage(
    device=photo_bream_device, exit_state=True, name='photobeam_stage'))
trial.add_stage(SugarPalletStage(
    motor_device=motor_device, name='sugar pallet stage', delay=0.2))
trial.add_stage(UniformRandomDelay(min=20, max=40, name='ITI'))

The experiment would then be run using await root.run_stage(), which executes all the stages.

A MoaStage performs an action each time one of its trials is run, repeating that action for every trial. Simultaneously, for each trial, the stage executes its sub-stages serially or in parallel, as specified by order. complete_on and complete_on_whom determine when a trial is complete, so that the trial can be repeated or, once the trial count is reached, the stage can finish.

The action performed by the stage is customized by overriding (and calling super within) these methods: init_stage(), init_trial(), do_trial(), trial_done(), stage_done(). Other methods and events are also available for further customization.

Some example stages are the Delay stage, which waits for the specified delay, and the GateStage, which waits until the specified device condition is met.
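To make the trial and sub-stage flow concrete, the following is a minimal sketch using only the standard library. ToyStage and ToyDelay are hypothetical stand-ins written for this illustration, not PyMoa classes, and asyncio stands in for trio so that the snippet runs without extra dependencies. It assumes serial sub-stage order and ignores complete_on and complete_on_whom.

```python
import asyncio


class ToyStage:
    """Hypothetical stand-in for a stage; not the PyMoa MoaStage API."""

    def __init__(self, name, repeat=1):
        self.name = name
        self.repeat = repeat
        self.stages = []

    def add_stage(self, stage):
        self.stages.append(stage)

    async def do_trial(self):
        """The stage's own per-trial action; subclasses override this."""

    async def run_sub_stages(self, log):
        # Serial order: each sub-stage finishes before the next one starts.
        for stage in self.stages:
            await stage.run_stage(log)

    async def run_stage(self, log):
        log.append(f'{self.name}:stage_start')
        for _ in range(self.repeat):
            log.append(f'{self.name}:trial_start')
            # The action and the sub-stages of one trial run concurrently.
            await asyncio.gather(self.do_trial(), self.run_sub_stages(log))
            log.append(f'{self.name}:trial_end')
        log.append(f'{self.name}:stage_end')


class ToyDelay(ToyStage):
    """Stage whose action is simply to sleep for `delay` seconds."""

    def __init__(self, name, delay, **kwargs):
        super().__init__(name, **kwargs)
        self.delay = delay

    async def do_trial(self):
        await asyncio.sleep(self.delay)


root = ToyStage('root')
trial = ToyStage('trial', repeat=2)
root.add_stage(trial)
trial.add_stage(ToyDelay('reward', delay=0.01))

log = []
asyncio.run(root.run_stage(log))
print('\n'.join(log))
```

Because the 'trial' stage repeats twice, its 'reward' sub-stage is started and finished once per trial, which mirrors how the nested stages in the experiment above are re-run for every repetition of their parent.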

Device

The Device base class is the interface for interacting with input/output devices. Devices generally store their current state as a property of the device (e.g. the state of a digital channel). Whenever the device is updated, it dispatches the on_data_update event to signal to listeners that the device’s state was updated.

port defines some additional interfaces for channels and channel ports. digital and analog provide further interfaces for interacting with digital and analog devices, respectively. adc provides an interface for an ADC device. These interfaces are example implementations to be customized.

For the above experiment, we create a virtual photobeam sensor and sugar pallet delivering device by implementing DigitalChannel as follows:

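import random
from time import perf_counter

import trio


class VirtualDevice(DigitalChannel):

    async def write_state(self, state: bool, **kwargs):
        # store the new state with its timestamp, then notify listeners
        self.state = state
        self.timestamp = perf_counter()
        self.dispatch('on_data_update', self)

    async def pump_state(self):
        # simulate reading the channel: every 1-11 s pick a random state
        while True:
            await trio.sleep(random.random() * 10 + 1)
            await self.write_state(random.random() >= 0.5)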

photo_bream_device = VirtualDevice(name='photobeam_device')
motor_device = VirtualDevice(name='motor_device')

write_state simply saves the state and triggers the update. This is simulating setting the channel to high or low (e.g. turning ON the motor). pump_state will continuously “read” the channel by randomly generating a high or low state for the channel.
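The interplay between a device that dispatches an update event and a gate that waits on it can be sketched without PyMoa. In the snippet below, ToyChannel, bind and wait_for_state are hypothetical names invented for this illustration (the real Device classes dispatch on_data_update through their own event system), and asyncio is used in place of trio to keep the example dependency-free.

```python
import asyncio


class ToyChannel:
    """Hypothetical digital channel; not the PyMoa Device API."""

    def __init__(self, name):
        self.name = name
        self.state = None
        self._listeners = []

    def bind(self, callback):
        # Register a callable to be invoked after every state update.
        self._listeners.append(callback)

    def write_state(self, state):
        self.state = state
        for callback in self._listeners:
            callback(self)


async def wait_for_state(channel, exit_state):
    """Return once the channel reports `exit_state` (a toy gate)."""
    reached = asyncio.Event()

    def on_data_update(chan):
        if chan.state == exit_state:
            reached.set()

    channel.bind(on_data_update)
    if channel.state != exit_state:
        await reached.wait()


async def main():
    beam = ToyChannel('photobeam')
    beam.write_state(False)

    seen = []
    beam.bind(lambda chan: seen.append(chan.state))

    async def break_beam():
        # Simulate the beam being broken shortly after the gate starts.
        await asyncio.sleep(0.01)
        beam.write_state(True)

    await asyncio.gather(wait_for_state(beam, True), break_beam())
    return seen


seen = asyncio.run(main())
print(seen)
```

The gate never polls the channel; it only reacts to updates, which is the same division of labour as between pump_state (producing updates) and the gate stage (consuming them) in the experiment.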

Logging

Stages and devices support automatic logging of important properties through the Loggable interface. The interface provides a way to indicate which properties/events to log and what triggers the logging action. MoaStage and Device include this interface by default.

Example loggers are SimpleCSVLogger and SimpleTerminalLogger, which log the data to a CSV file and to the terminal, respectively.
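As a rough illustration of what such a logger records, the sketch below writes one CSV row per logged event using the six columns that appear in this guide's printed output (index, timestamp, name, trigger, item, value). ToyCSVLogger and its log method are hypothetical and written only for this example; they are not the PyMoa logger classes, which hook into stages and devices automatically.

```python
import csv
import io
from time import perf_counter

COLUMNS = ('index', 'timestamp', 'name', 'trigger', 'item', 'value')


class ToyCSVLogger:
    """Hypothetical logger; mirrors the column layout, not the PyMoa API."""

    def __init__(self, stream):
        self._writer = csv.writer(stream)
        self._writer.writerow(COLUMNS)
        self._index = 0

    def log(self, name, trigger, item, value):
        # One row per event: which object, what triggered it, what changed.
        self._writer.writerow(
            (self._index, perf_counter(), name, trigger, item, value))
        self._index += 1


buffer = io.StringIO()
logger = ToyCSVLogger(buffer)
logger.log('Root stage', 'on_stage_start', 'count', -1)
logger.log('Root stage', 'on_trial_start', 'count', 0)

rows = list(csv.reader(io.StringIO(buffer.getvalue())))
print(rows[0])
```

Passing a file object opened with newline='' instead of the StringIO buffer would write the same rows to disk.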

For example, to log the experiment stages above, the following will associate the logger with the stages:

logger = SimpleTerminalLogger()
for stage in root.iterate_stages():
    logger.add_logged_instance(stage)

When the experiment is run, this will print the following to the terminal:

Printed output

index  timestamp   name                trigger         item   value
-----  ----------  ------------------  --------------  -----  -----
0      0.6059512   Root stage          on_stage_start  count  -1
1      0.6065675   Root stage          on_trial_start  count  0
2      0.6068585   Habituation         on_stage_start  count  -1
3      0.6071759   Habituation         on_trial_start  count  0
4      20.6083242  Habituation         on_trial_end    count  0
5      20.6085545  Habituation         on_stage_end    count  0
6      20.6087105  Trial               on_stage_start  count  -1
7      20.6091834  Trial               on_trial_start  count  0
8      20.60976    photobeam_stage     on_stage_start  count  -1
9      20.6101662  photobeam_stage     on_trial_start  count  0
10     20.6122033  photobeam_stage     state           count  0
11     28.9837687  photobeam_stage     state           count  0
12     28.9841575  photobeam_stage     on_trial_end    count  0
13     28.9843255  photobeam_stage     on_stage_end    count  0
14     28.9844328  sugar pallet stage  on_stage_start  count  -1
15     28.9847545  sugar pallet stage  on_trial_start  count  0
16     29.1881972  sugar pallet stage  on_trial_end    count  0
17     29.1883574  sugar pallet stage  on_stage_end    count  0
18     29.1884679  ITI                 on_stage_start  count  -1
19     29.1888977  ITI                 on_trial_start  count  0
20     50.6432094  ITI                 on_trial_end    count  0
21     50.6432811  ITI                 on_stage_end    count  0
22     50.6434171  Trial               on_trial_end    count  0
23     50.6435501  Trial               on_trial_start  count  0
24     50.6437013  photobeam_stage     on_stage_start  count  0
25     50.6438246  photobeam_stage     on_trial_start  count  0
26     50.6439204  photobeam_stage     state           count  0
27     56.7902294  photobeam_stage     state           count  0
28     56.790501   photobeam_stage     on_trial_end    count  0
29     56.79059    photobeam_stage     on_stage_end    count  0
30     56.7906541  sugar pallet stage  on_stage_start  count  0
31     56.79086    sugar pallet stage  on_trial_start  count  0
32     56.9954589  sugar pallet stage  on_trial_end    count  0
33     56.9956666  sugar pallet stage  on_stage_end    count  0
34     56.995809   ITI                 on_stage_start  count  0
35     56.9962146  ITI                 on_trial_start  count  0
36     80.2745445  ITI                 on_trial_end    count  0
37     80.2747385  ITI                 on_stage_end    count  0
38     80.2750996  Trial               on_trial_end    count  1
39     80.2752385  Trial               on_stage_end    count  1
40     80.2753473  post delay          on_stage_start  count  -1
41     80.2756924  post delay          on_trial_start  count  0
42     95.2781586  post delay          on_trial_end    count  0
43     95.278257   post delay          on_stage_end    count  0
44     95.2784493  Root stage          on_trial_end    count  0
45     95.2785226  Root stage          on_stage_end    count  0

Remote Execution

One of the best components of this framework is the ability to execute and interface with devices that are remote to the system running the experiment. PyMoa implements executor classes that make this simple.

For example, imagine that instead of being able to set the state of the device above on our own system, we needed to call the device over the network or in a different process or thread, e.g. because the device is attached to a Raspberry Pi, so write_state needs to run on the Pi. The following example uses a thread executor, but a socket or REST API executor works similarly.

The device will now be adjusted as follows:

class VirtualDevice(DigitalChannel):

    def executor_callback(self, return_value):
        self.state, self.timestamp = return_value
        self.dispatch('on_data_update', self)

    @apply_executor(callback=executor_callback)
    def write_state(self, state: bool, **kwargs):
        return state, perf_counter()

executor = ThreadExecutor()
photo_bream_device = VirtualDevice(name='photobeam_device', executor=executor)
motor_device = VirtualDevice(name='motor_device', executor=executor)

With this simple change, when we call await motor_device.write_state(True), write_state is executed in a second thread and the result is returned to us. In addition, the result is passed to the callback provided in the decorator, so that it can be applied generically as needed.
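The general pattern (run the method body elsewhere, then hand its return value to a callback on the calling side) can be sketched with the standard library alone. The run_in_thread decorator and ToyDevice class below are hypothetical and written for this illustration; they are not how apply_executor or ThreadExecutor are implemented, and asyncio's default thread pool stands in for a PyMoa executor.

```python
import asyncio
import functools
import threading
from time import perf_counter


def run_in_thread(callback):
    """Hypothetical decorator: run the method in a worker thread,
    then apply `callback(self, result)` back on the calling side."""
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            loop = asyncio.get_running_loop()
            call = functools.partial(method, self, *args, **kwargs)
            # None selects the event loop's default thread pool.
            result = await loop.run_in_executor(None, call)
            callback(self, result)
            return result
        return wrapper
    return decorator


class ToyDevice:
    state = None
    timestamp = None
    worker_thread = None

    def apply_result(self, return_value):
        # Runs in the event loop thread, so updating state here is safe.
        self.state, self.timestamp, self.worker_thread = return_value

    @run_in_thread(callback=apply_result)
    def write_state(self, state):
        # Runs in the worker thread; only returns values, touches no state.
        return state, perf_counter(), threading.get_ident()


async def main():
    device = ToyDevice()
    await device.write_state(True)
    return device


device = asyncio.run(main())
print(device.state, device.worker_thread != threading.get_ident())
```

Keeping the method body free of side effects and applying the result in the callback is what lets the same method run in a thread, another process, or on another machine without changing the caller.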

Executing on e.g. a Raspberry Pi is just as simple: start the Quart server on the Pi. This launches a server that can be reached through a REST API as well as through more efficient websockets. On the client side, you would simply create executor = WebSocketExecutor(server='x.y.z.a', port=5000), and everything else works mostly the same as in the threading example above. The only difference is that we also need to call await executor.ensure_remote_instance(motor_device) to create the device on the Pi. Afterwards, calling await motor_device.write_state(True) executes write_state on the Pi and returns the result to the client.

Remote clock

Executors also support pinging the server to return its current time. By doing this periodically we can estimate the lag of executing on the server and, more importantly, align the server-timestamped events to the local clock with a simple linear regression.
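The regression step can be illustrated as follows. This is a sketch under stated assumptions rather than PyMoa's implementation: each ping is assumed to yield a (local send time, server time, local receive time) triple, the server is assumed to stamp its time roughly halfway through the round trip, and the sample values and the fit_clock and to_local helpers are made up for the example.

```python
from statistics import fmean


def fit_clock(server_times, local_times):
    """Ordinary least-squares fit of local = slope * server + offset."""
    mean_s = fmean(server_times)
    mean_l = fmean(local_times)
    var_s = sum((s - mean_s) ** 2 for s in server_times)
    cov = sum((s - mean_s) * (l - mean_l)
              for s, l in zip(server_times, local_times))
    slope = cov / var_s
    offset = mean_l - slope * mean_s
    return slope, offset


def to_local(server_time, slope, offset):
    """Map a server timestamp onto the local clock."""
    return slope * server_time + offset


# Hypothetical ping samples: (local send, server time, local receive).
pings = [
    (10.000, 503.001, 10.002),
    (11.000, 504.001, 11.002),
    (12.000, 505.001, 12.002),
    (13.000, 506.001, 13.002),
]

# Round-trip lag is the time between sending and receiving each ping.
lag = fmean(received - sent for sent, _, received in pings)

# Assume the server read its clock at the midpoint of the round trip.
local_mid = [(sent + received) / 2 for sent, _, received in pings]
server = [server_time for _, server_time, _ in pings]

slope, offset = fit_clock(server, local_mid)
print(lag, slope, offset, to_local(600.0, slope, offset))
```

A slope different from 1 would indicate that the two clocks drift relative to each other, which is why a linear fit is used instead of a single constant offset.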

Performance

The following is basic performance data for the implemented executors. The last two rows were measured with the server running on a Raspberry Pi over WiFi:

Performance data

Executor                    Round-trip lag  Execution rate  Continuous execution rate
--------------------------  --------------  --------------  -------------------------
RestExecutor                8.68ms          95.36Hz         1495.93Hz
WebSocketExecutor           2.20ms          533.46Hz        1104.19Hz
MultiprocessSocketExecutor  0.48ms          1132.06Hz       3144.60Hz
ThreadExecutor              0.45ms          2789.56Hz       5395.46Hz
AsyncThreadExecutor         1.16ms          -               -
DummyRemoteExecutor         0.07ms          10327.27Hz      5739.54Hz
none                        -               246669.96Hz     10018.94Hz
RPi-RestExecutor            16.56ms         59.52Hz         847.97Hz
RPi-WebSocketExecutor       6.14ms          153.43Hz        377.19Hz

Running the experiment

Once the above has been defined, run the experiment by scheduling the async functions in trio’s event loop. The event loop does two things simultaneously: it continuously “reads” the photobeam device to see when the photobeam is “broken”, and it executes the experiment’s stages:

async def run_experiment():
    # start the threading executor
    await executor.start_executor()
    async with trio.open_nursery() as nursery:
        async def run_root_stage():
            # run the experiment stages, but when done also cancel reading the devices
            await root.run_stage()
            nursery.cancel_scope.cancel()

        # reads the photobeam device continuously
        nursery.start_soon(photo_bream_device.pump_state)
        # runs the experiment stages
        nursery.start_soon(run_root_stage)

    # we're done so we can stop the threading executor
    await executor.stop_executor()

trio.run(run_experiment)