Stage plugin with animated graphics

  1"""This plugin shows how to create custom graphics to display during a
  2stage and how log RGBA and arbitrary data to the HDF5 file.
  3"""
  4from ceed.stage import CeedStage, StageDoneException
  5import numpy as np
  6from typing import Optional, Dict
  7
  8
  9# this shader is used to generate a time varying plasma image - it's very
 10# low-level GL
 11plasma_shader = '''
 12$HEADER$
 13
 14uniform vec2 resolution;
 15uniform float time;
 16
 17void main(void)
 18{
 19   vec4 frag_coord = frag_modelview_mat * gl_FragCoord;
 20   float x = frag_coord.x;
 21   float y = frag_coord.y;
 22   float mov0 = x+y+cos(sin(time)*2.)*100.+sin(x/100.)*1000.;
 23   float mov1 = y / resolution.y / 0.2 + time;
 24   float mov2 = x / resolution.x / 0.2;
 25   float c1 = abs(sin(mov1+time)/2.+mov2/2.-mov1-mov2+time);
 26   float c2 = abs(sin(c1+sin(mov0/1000.+time)
 27              +sin(y/40.+time)+sin((x+y)/100.)*3.));
 28   float c3 = abs(sin(c2+cos(mov1+mov2+c2)+cos(mov2)+sin(x/1000.)));
 29   gl_FragColor = vec4( c1,c2,c3,1.0);
 30}
 31'''
 32
 33
 34class PlasmaStage(CeedStage):
 35    """Displays a time varying plasma as an experiment stage.
 36    """
 37
 38    gl_ctx: dict = {}
 39    """The gl context used to run the shader, one for each quad if in quad mode,
 40    otherwise just 1.
 41    """
 42
 43    last_time = 0.
 44    """The last time value passed by ceed to the stage in tick. Used to update
 45    the plasma in ``set_gl_colors``.
 46    """
 47
 48    def __init__(self, **kwargs):
 49        # set a custom name for the stage class
 50        self.name = 'Plasma'
 51        super().__init__(**kwargs)
 52        # we can't pre-compute this stage because of how we get the time used
 53        # during drawing
 54        self.disable_pre_compute = True
 55        self.gl_ctx = {}
 56
 57    def add_gl_to_canvas(
 58            self, screen_width: int, screen_height: int, canvas,
 59            name: str, quad_mode: str, quad: Optional[int] = None, **kwargs
 60    ) -> bool:
 61        # todo: this stage does not support quad12x, only quad4x
 62        # we overwrite this method to manually add gl graphics elements to the
 63        # kivy canvas
 64        from kivy.graphics import Color, Rectangle, RenderContext
 65
 66        # create a custom canvas context, so that we can use the gl shader
 67        self.gl_ctx[quad] = ctx = RenderContext(
 68            use_parent_projection=True, use_parent_modelview=True,
 69            use_parent_frag_modelview=True, fs=plasma_shader, group=name)
 70        # pass the variables required by the shader
 71        ctx['time'] = 0.
 72        ctx['resolution'] = [screen_width / 2, screen_height / 2]
 73
 74        # make sure the background behind the canvas is not transparent
 75        with ctx:
 76            Color(1, 1, 1, 1)
 77            Rectangle(size=(screen_width // 2, screen_height // 2))
 78
 79        # add it to the overall canvas
 80        canvas.add(ctx)
 81
 82        # must return True so that set_gl_colors is called
 83        return True
 84
 85    def set_gl_colors(
 86            self, quad: Optional[int] = None, grayscale: str = None,
 87            clear: bool = False, **kwargs) -> None:
 88        # this is called for every time step, after ticking the stage when the
 89        # time is saved. Now pass on to the shader the current time
 90        self.gl_ctx[quad]['time'] = self.last_time
 91        if clear:
 92            # todo: set to black if cleared - it is not supported by plasma
 93            pass
 94
 95    def remove_gl_from_canvas(self, *args, **kwargs) -> None:
 96        # everything is removed by name, so just clear the context
 97        self.gl_ctx = {}
 98        return super().remove_gl_from_canvas(*args, **kwargs)
 99
100    def evaluate_stage(self, shapes, last_end_t):
101        # always get the first time
102        self.t_start = t = yield
103        for _ in range(self.loop):
104            t_start = t
105
106            # only go for 10 second
107            while t - t_start < 10:
108                # save the current time for use by the shader when
109                # set_gl_colors is called after each tick
110                self.last_time = float(t - t_start)
111
112                # save some value to the shapes log - could be any 4 numbers
113                shapes['plasma'].append((.5, 0, 0, 0))
114                # this yields so GUI can draw shapes and resume for next frame
115                t = yield
116
117        # this time value was not used so it ends on the last sample so
118        # that last time will be used as start of next stage and MUST be saved
119        # as t_end
120        self.t_end = t
121        # this is how we indicate we're done
122        raise StageDoneException
123
124    def get_stage_shape_names(self):
125        # add a "shape" named plasma so we can log data for it at every
126        # timestep. The values will also display in the stage preview graph
127        names = super().get_stage_shape_names()
128        names.add('plasma')
129        return names
130
131
132class SoftEllipseStage(CeedStage):
133    """Displays a time varying ellipse as an experiment stage. The ellipse is
134    also zero in the center, with increased intensity as we go out from center
135    """
136
137    texture_pat: np.ndarray = None
138    """A numpy pattern that will be blitted into the kivy gl texture. This
139    contains the ellipse pattern. This pattern is multiplied by an attenuation
140    factor at each step and copied to ``buffer`` to set the actual intensity.
141    """
142
143    buffer: Dict[int, np.ndarray] = []
144    """The RGB buffer that is blitted to the texture.
145    """
146
147    buffer_count = 0
148    """The current time iteration used to set the ellipse intensity.
149    """
150
151    kivy_tex: dict = {}
152    """The Kivy texture, one for each quad if in quad mode, otherwise just 1.
153    """
154
155    def __init__(self, **kwargs):
156        # set a custom name for the stage class
157        self.name = 'SoftEllipse'
158        super().__init__(**kwargs)
159        # we can't pre-compute this stage because of how we get the time used
160        # during drawing
161        self.disable_pre_compute = True
162
163        # generate the ellipse pattern and the RGB buffer
164        x = np.linspace(-1, 1, 150)
165        y = np.linspace(-1, 1, 100)
166        xx, yy = np.meshgrid(x, y)
167        zz = np.sqrt(np.power(xx, 2) + np.power(yy, 2))
168        zz[zz > 1] = 0
169
170        self.texture_pat = np.asarray(zz * 255, dtype=np.uint8)
171        self.buffer = {}
172        self.kivy_tex = {}
173
174    def init_stage(self, *args, **kwargs) -> None:
175        # reset counter to zero for ellipse intensity at the start of the stage
176        self.buffer_count = 0
177        return super().init_stage(*args, **kwargs)
178
179    def init_loop_iteration(self, *args, **kwargs) -> None:
180        # reset counter to zero for ellipse intensity at the start of each loop
181        self.buffer_count = 0
182        return super().init_loop_iteration(*args, **kwargs)
183
184    def draw_ellipse_tex(self, quad, grayscale=None, clear=False):
185        # computes the ellipse intensity using the buffer_count value computed
186        # in the last tick (evaluate_stage). It then blits it
187        buffer = self.buffer[quad]
188        pat = np.asarray(
189            self.texture_pat * (self.buffer_count / 100), dtype=np.uint8)
190        if clear:
191            pat = 0
192
193        if grayscale is None:
194            # we're either in normal or quad4x mode, so set the selected rgb
195            # values, depending on which colors user selected in GUI
196            if self.color_r:
197                buffer[:, :, 0] = pat
198            if self.color_g:
199                buffer[:, :, 1] = pat
200            if self.color_b:
201                buffer[:, :, 2] = pat
202            self.kivy_tex[quad].blit_buffer(buffer.reshape(-1))
203        else:
204            # if in quad12x, set only the specific color channel of this tick,
205            # this method is called 12 times, one for each quad and channel
206            channel = {'r': 0, 'g': 1, 'b': 2}[grayscale]
207            buffer[:, :, channel] = pat
208            if grayscale == 'b':
209                # only blit once all 3 channels were updated
210                self.kivy_tex[quad].blit_buffer(buffer.reshape(-1))
211
212    def add_gl_to_canvas(
213            self, screen_width: int, screen_height: int, canvas,
214            name: str, quad_mode: str, quad: Optional[int] = None, **kwargs
215    ) -> bool:
216        # we overwrite this method to manually add gl graphics elements to the
217        # kivy canvas
218        from kivy.graphics import Color, Rectangle
219        from kivy.graphics.texture import Texture
220        with canvas:
221            Color(1, 1, 1, 1, group=name)
222            rect1 = Rectangle(size=(150, 100), pos=(500, 500), group=name)
223            rect2 = Rectangle(size=(150, 100), pos=(1500, 500), group=name)
224
225        tex = Texture.create(size=(150, 100), colorfmt='rgb')
226        rect1.texture = tex
227        rect2.texture = tex
228        self.kivy_tex[quad] = tex
229        self.buffer[quad] = np.zeros((100, 150, 3), dtype=np.uint8)
230
231        # draw initial ellipse
232        self.draw_ellipse_tex(quad)
233
234        # must return True so that set_gl_colors is called
235        return True
236
237    def set_gl_colors(
238            self, quad: Optional[int] = None, grayscale: str = None,
239            clear: bool = False, **kwargs) -> None:
240        # this is called for every time step, after ticking the stage when the
241        # time is saved (i.e. buffer_count is updated). Update the ellipse
242        self.draw_ellipse_tex(quad, grayscale, clear)
243
244    def remove_gl_from_canvas(self, *args, **kwargs) -> None:
245        # everything is removed by name, so just clear the texture
246        self.kivy_tex = {}
247        self.buffer = {}
248        return super().remove_gl_from_canvas(*args, **kwargs)
249
250    def evaluate_stage(self, shapes, last_end_t):
251        r = self.color_r
252        g = self.color_g
253        b = self.color_b
254
255        # always get the first time
256        self.t_start = t = yield
257        for _ in range(self.loop):
258            t_start = t
259
260            # only go for 10 second
261            while t - t_start < 10:
262                # update the counter at each tick. The ellipse is only updated
263                # as well if the frame is rendered and not dropped. But it is
264                # logged for every frame
265                self.buffer_count = (self.buffer_count + 1) % 100
266
267                intensity = self.buffer_count / 100
268                shapes['soft_ellipse'].append((
269                    intensity if r else 0.,
270                    intensity if g else 0.,
271                    intensity if b else 0.,
272                    1
273                ))
274                # this yields so GUI can draw shapes and resume for next frame
275                t = yield
276
277        # this time value was not used so it ends on the last sample so
278        # that last time will be used as start of next stage and MUST be saved
279        # as t_end
280        self.t_end = t
281        # this is how we indicate we're done
282        raise StageDoneException
283
284    def get_stage_shape_names(self):
285        # add a "shape" named soft_ellipse so we can log data for it at every
286        # timestep. The values will also display in the stage preview graph
287        names = super().get_stage_shape_names()
288        names.add('soft_ellipse')
289        return names
290
291
292def get_ceed_stages(stage_factory):
293    # return all the stage classes
294    return [PlasmaStage, SoftEllipseStage]