Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stateless workflow interface #150

Merged
merged 6 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions docs/getting-started/live_data_reduction.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,23 @@
"outputs": [],
"source": [
"from beamlime.constructors import multiple_constant_providers, SingletonProvider\n",
"from beamlime.applications.handlers import ImagePath, DataReductionHandler, RawDataSent, UpdateHistogram, PlotStreamer\n",
"from beamlime.applications.handlers import (\n",
" ImagePath,\n",
" DataReductionHandler,\n",
" RawDataSent,\n",
" WorkflowResultUpdate,\n",
" PlotStreamer,\n",
")\n",
"from beamlime.applications.daemons import DataStreamSimulator\n",
"from beamlime.applications.base import Application\n",
"from beamlime.executables.prototypes import default_prototype_factory\n",
"from beamlime.logging import BeamlimeLogger\n",
"from beamlime.applications._parameters import EventRate, NumPixels, NumFrames, DataFeedingSpeed\n",
"from beamlime.applications._parameters import (\n",
" EventRate,\n",
" NumPixels,\n",
" NumFrames,\n",
" DataFeedingSpeed,\n",
")\n",
"from matplotlib import pyplot as plt\n",
"\n",
"import logging, pathlib\n",
Expand All @@ -76,24 +87,22 @@
" NumPixels: NumPixels(10_000),\n",
" NumFrames: NumFrames(140),\n",
" ImagePath: ImagePath(pathlib.Path('./beamlime_plot.png')),\n",
" }\n",
" },\n",
"):\n",
" prototype_factory[BeamlimeLogger].setLevel(logging.INFO)\n",
"\n",
" # Build the application\n",
" app = prototype_factory[Application]\n",
" # Register Handlers\n",
" plot_saver = prototype_factory[PlotStreamer]\n",
" app.register_handling_method(UpdateHistogram, plot_saver.update_histogram)\n",
" app.register_handling_method(WorkflowResultUpdate, plot_saver.update_histogram)\n",
" data_reduction_handler = prototype_factory[DataReductionHandler]\n",
" app.register_handling_method(\n",
" RawDataSent, data_reduction_handler.process_message\n",
" )\n",
" app.register_handling_method(RawDataSent, data_reduction_handler.process_message)\n",
" # Register Daemons\n",
" app.register_daemon(prototype_factory[DataStreamSimulator])\n",
"\n",
" app.run()\n",
" plt.close() # Close the plot from this cell.\n"
" plt.close() # Close the plot from this cell."
]
},
{
Expand All @@ -102,7 +111,7 @@
"metadata": {},
"outputs": [],
"source": [
"plot_saver.figure"
"next(iter(plot_saver.figures.values()))"
]
}
],
Expand All @@ -122,7 +131,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
"version": "3.10.12"
}
},
"nbformat": 4,
Expand Down
166 changes: 0 additions & 166 deletions src/beamlime/applications/_workflow.py

This file was deleted.

3 changes: 1 addition & 2 deletions src/beamlime/applications/daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

from ._parameters import ChunkSize, DataFeedingSpeed
from ._random_data_providers import RandomEvents
from ._workflow import Events
from .base import DaemonInterface, MessageRouter
from .handlers import RawDataSent
from .handlers import Events, RawDataSent


@dataclass
Expand Down
94 changes: 34 additions & 60 deletions src/beamlime/applications/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
import pathlib
from dataclasses import dataclass
from typing import Any, Dict, NewType
from typing import Any, NewType

import plopp as pp
import scipp as sc

from beamlime.logging import BeamlimeLogger

from ._workflow import Events, FirstPulseTime, Histogrammed, WorkflowPipeline
from ..stateless_workflow import StatelessWorkflow, WorkflowResult
from .base import HandlerInterface, MessageProtocol

Events = NewType("Events", list[sc.DataArray])


@dataclass
class UpdateHistogram(MessageProtocol):
content: Histogrammed
class WorkflowResultUpdate(MessageProtocol):
content: WorkflowResult
sender: type
receiver: type

Expand All @@ -28,40 +29,21 @@ class RawDataSent(MessageProtocol):


class DataReductionHandler(HandlerInterface):
"""Data reduction handler to process the raw data and update the histogram.

It receives a list of events, and reduces them into a histogram.
It also triggers the update of a plot stream node.
"""
"""Data reduction handler to process the raw data."""

def __init__(self, pipeline: WorkflowPipeline) -> None:
self.pipeline = pipeline
self.first_pulse_time: sc.Variable
def __init__(self, workflow: StatelessWorkflow) -> None:
self.workflow = workflow
super().__init__()

def format_received(self, data: Any) -> str:
# TODO remove ties to specific type of data
return f"{len(data)} pieces of {Events.__name__}"

def process_first_input(self, da: Events) -> None:
self.first_pulse_time = da[0].coords['event_time_zero'][0]
self.pipeline[FirstPulseTime] = self.first_pulse_time

def process_data(self, data: Events) -> Histogrammed:
self.info("Received, %s", self.format_received(data))
self.pipeline[Events] = data
return self.pipeline.compute(Histogrammed)

def process_message(self, message: MessageProtocol) -> MessageProtocol:
if not isinstance(message, RawDataSent):
raise TypeError(f"Message type should be {RawDataSent.__name__}.")

if not hasattr(self, "first_pulse_time"):
self.process_first_input(message.content)

return UpdateHistogram(
sender=DataReductionHandler,
receiver=Any,
content=self.process_data(message.content),
content = message.content
self.info("Received, %s", self.format_received(content))
return WorkflowResultUpdate(
sender=DataReductionHandler, receiver=Any, content=self.workflow(content)
)


Expand All @@ -71,51 +53,43 @@ def process_message(self, message: MessageProtocol) -> MessageProtocol:
def random_image_path() -> ImagePath:
import uuid

return ImagePath(pathlib.Path(f"beamlime_plot_{uuid.uuid4().hex}.png"))
return ImagePath(pathlib.Path(f"beamlime_plot_{uuid.uuid4().hex}"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the name ImagePath to ImageNamePrefix?
And together with --image-path argument parsing in executables/prototypes.py, line #97.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be removed soon anyway, if not, fix it when touching this code next.



class PlotStreamer(HandlerInterface):
def __init__(self, logger: BeamlimeLogger) -> None:
self.logger = logger
self.figure = pp.figure1d(
# If you know the layout, you can just use ``pp.plot`` directly.
title="Wavelength Histogram",
grid=True,
)
self.binning_coords: Dict[str, sc.Variable]
self.output_da: Histogrammed
self.figures = {}
self.artists = {}
super().__init__()

def process_first_histogram(self, data: Histogrammed) -> None:
self.output_da = Histogrammed(sc.zeros_like(data))
self.binning_coords = {"wavelength": data.coords["wavelength"]}
self.info("First data as a seed of histogram: %s", self.output_da)
self.figure.update(self.output_da, key='a')
def plot_item(self, name: str, data: sc.DataArray) -> None:
figure = self.figures.get(name)
if figure is None:
plot = data.plot()
# TODO Either improve Plopp's update method, or handle multiple artists
if len(plot.artists) > 1:
raise NotImplementedError("Data with multiple items not supported.")
self.artists[name] = next(iter(plot.artists))
self.figures[name] = plot
else:
figure.update(data, key=self.artists[name])

def update_histogram(self, message: MessageProtocol) -> None:
if not hasattr(self, "binning_coords"):
self.process_first_histogram(message.content)

self.output_da += sc.rebin(message.content, self.binning_coords)
self.figure.update(self.output_da, key='a')
content = message.content
for name, data in content.items():
self.plot_item(name, data)


class PlotSaver(PlotStreamer):
"""Plot handler to save the updated histogram into an image file."""

def __init__(self, logger: BeamlimeLogger, image_path: ImagePath) -> None:
super().__init__(logger)
self.image_path = image_path.absolute()
self.create_dummy_image()

def create_dummy_image(self) -> None:
import matplotlib.pyplot as plt

plt.plot([])
plt.savefig(self.image_path)
self.info(f"PlotHandler will save updated image into: {self.image_path}")
self.image_path = image_path

def save_histogram(self, message: MessageProtocol) -> None:
super().update_histogram(message)
self.info("Received histogram, saving into %s...", self.image_path)
self.figure.save(self.image_path)
for name, figure in self.figures.items():
figure.save(f'{self.image_path}-{name}.png')
Loading
Loading