Source code for magscope.processes

from __future__ import annotations

from abc import ABC, ABCMeta, abstractmethod
from ctypes import c_double, c_int, c_uint32, c_uint64, c_uint8
from multiprocessing import Event, Process, Value
import sys
import traceback
from typing import TYPE_CHECKING
from warnings import warn

import numpy as np

from magscope._logging import get_logger
from magscope.datatypes import BeadRoiBuffer, LiveProfileBuffer, MatrixBuffer, VideoBuffer
from magscope.ipc import (CommandRegistry, Delivery, UnknownCommandError, command_kwargs,
                          drain_pipe_until_quit, register_ipc_command)
from magscope.ipc_commands import (Command, LogExceptionCommand, QuitCommand,
                                   SetAcquisitionDirCommand, SetAcquisitionDirOnCommand,
                                   SetAcquisitionModeCommand, SetAcquisitionOnCommand,
                                   SetSettingsCommand, UpdateBeadRoisCommand)
from magscope.settings import MagScopeSettings
from magscope.utils import AcquisitionMode, register_script_command

# Module-level logger shared by every definition in this file.
logger = get_logger("processes")

if TYPE_CHECKING:
    # These names are needed only for annotations. Annotations are lazy here
    # (``from __future__ import annotations``), so nothing below has to exist
    # at runtime.
    from multiprocessing.connection import Connection
    from multiprocessing.sharedctypes import Synchronized
    from multiprocessing.synchronize import Event as EventType, Lock as LockType

    from magscope.camera import CameraBase
    from magscope.hardware import HardwareManagerBase

    # Static-typing alias for a synchronized shared value holding an int.
    ValueTypeUI8 = Synchronized[int]
class InterprocessValues:
    """Bundle of ``multiprocessing.Value`` objects shared between processes.

    Every attribute is created with ``multiprocessing.Value`` and starts at
    zero. Read or write the payload through ``.value``.

    The annotations use ``Synchronized[...]`` (the wrapper type returned by
    ``Value``) rather than ``Value[...]``: ``Value`` is a factory function,
    not a type, so it cannot be used in an annotation. The annotations are
    never evaluated at runtime.
    """

    def __init__(self):
        # Video-processing values (c_uint8 / c_uint32 / c_uint64).
        self.video_process_busy_count: ValueTypeUI8 = Value(c_uint8, 0)
        self.video_process_reserved_stacks: Synchronized[int] = Value(c_uint32, 0)
        self.video_process_completed_stacks: Synchronized[int] = Value(c_uint64, 0)

        # Live-profile values (c_uint8 flag, signed c_int bead number).
        self.live_profile_enabled: ValueTypeUI8 = Value(c_uint8, 0)
        self.live_profile_bead: Synchronized[int] = Value(c_int, 0)

        # Camera values (integer counters plus a c_double timestamp).
        self.camera_total_frames: Synchronized[int] = Value(c_uint64, 0)
        self.camera_consecutive_timeouts: Synchronized[int] = Value(c_uint32, 0)
        self.camera_queue_full_events: Synchronized[int] = Value(c_uint64, 0)
        self.camera_last_frame_timestamp: Synchronized[float] = Value(c_double, 0.0)
class SingletonMeta(type):
    """Metaclass that allows each class to be instantiated at most once.

    The first call to a class builds the instance and records it in
    ``_instances``. Any later call raises ``TypeError`` instead of returning
    the existing instance.
    """

    # Maps each class to its single instance; shared by all classes that use
    # this metaclass.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            # Raise an exception if a second instance is attempted
            raise TypeError(f"Cannot create another instance of {cls.__name__}. This is a Singleton class.")
        instance = super().__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
class SingletonABCMeta(ABCMeta, SingletonMeta):
    """Metaclass combining ``ABCMeta`` with ``SingletonMeta``.

    A class using this metaclass can declare abstract methods and is also
    limited to a single instance. No behaviour is added beyond the two bases.
    """
class ManagerProcessBase(Process, ABC, metaclass=SingletonABCMeta):
    """Abstract base class for processes in the MagScope.

    Subclass requirements:

    * Each subclass should have a unique name.
    * There should only be one instance of each subclass (singleton).
    * The class name is used for consistent inter-process identification.

    Subclasses implement :meth:`setup` and :meth:`do_main_loop`; :meth:`run`
    drives them together with :meth:`receive_ipc`.
    """

    def __init__(self):
        # Note: Some setup/initialization will be at the beginning of the 'run()' method
        super().__init__()
        self._acquisition_on: bool = True
        self._acquisition_dir: str | None = None
        self._acquisition_dir_on: bool = False
        self._acquisition_mode: AcquisitionMode = AcquisitionMode.TRACK
        self.bead_roi_buffer: BeadRoiBuffer | None = None
        # Local copy of the bead ROI table, refreshed by '_refresh_bead_roi_cache()'
        self._bead_roi_ids: np.ndarray = np.zeros((0,), dtype=np.uint32)
        self._bead_roi_values: np.ndarray = np.zeros((0, 4), dtype=np.uint32)
        self.camera_type: type[CameraBase] | None = None
        self.hardware_types: dict[str, type[HardwareManagerBase]] = {}
        self.locks: dict[str, LockType] | None = None
        self._magscope_quitting: EventType | None = None
        self.name: str = type(self).__name__  # Read-only
        self._pipe: Connection | None = None  # Pipe back to the 'MagScope' for inter-process communication
        self.live_profile_buffer: LiveProfileBuffer | None = None
        self._quitting: EventType = Event()
        self._quit_requested: bool = False  # A flag to prevent repeated calls to 'quit()' after one process asks the others to quit
        self._running: bool = False
        self.settings: MagScopeSettings | None = None
        self.tracks_buffer: MatrixBuffer | None = None
        self.video_buffer: VideoBuffer | None = None
        self.shared_values: InterprocessValues | None = None
        self._command_registry: CommandRegistry | None = None
        # Maps a command type to the name of the method that handles it
        self._command_handlers: dict[type[Command], str] = {}

    @property
    def quitting_event(self) -> EventType:
        """Event set when this process has begun quitting."""
        return self._quitting

    @property
    def bead_rois(self) -> dict[int, tuple[int, int, int, int]]:
        """Return the cached bead ROIs as ``{bead_id: (v0, v1, v2, v3)}``.

        Ids and values are converted to plain ``int``. The dict is rebuilt
        from the cached arrays on every access.
        """
        return {
            int(bead_id): (int(roi[0]), int(roi[1]), int(roi[2]), int(roi[3]))
            for bead_id, roi in zip(self._bead_roi_ids, self._bead_roi_values, strict=False)
        }

    def get_cached_bead_rois(self) -> tuple[np.ndarray, np.ndarray]:
        """Return the cached ``(ids, values)`` arrays themselves (no copy)."""
        return self._bead_roi_ids, self._bead_roi_values

    def _refresh_bead_roi_cache(self) -> None:
        """Reload the cached ROI arrays from ``bead_roi_buffer``.

        When no buffer is attached the cache is reset to empty arrays.
        """
        if self.bead_roi_buffer is None:
            self._bead_roi_ids = np.zeros((0,), dtype=np.uint32)
            self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
            return
        bead_ids, bead_rois = self.bead_roi_buffer.get_beads()
        self._bead_roi_ids = bead_ids
        self._bead_roi_values = bead_rois

    def configure_shared_resources(
        self,
        *,
        camera_type: type[CameraBase] | None,
        hardware_types: dict[str, type[HardwareManagerBase]],
        quitting_event: EventType,
        settings: MagScopeSettings,
        shared_values: InterprocessValues,
        locks: dict[str, LockType],
        pipe_end: Connection,
        command_registry: CommandRegistry,
    ) -> None:
        """Attach shared references provided by :class:`~magscope.scope.MagScope`.

        This centralizes initialization so callers do not need to mutate
        underscored attributes directly when preparing processes before
        ``start()`` is invoked.

        ``settings`` is stored as a clone. The handler table is built from the
        registry's handlers for this process name.
        """
        self.camera_type = camera_type
        self.hardware_types = hardware_types
        self._magscope_quitting = quitting_event
        self.settings = settings.clone()
        self.shared_values = shared_values
        self.locks = locks
        self._pipe = pipe_end
        self._command_registry = command_registry
        self._command_handlers = {
            command_type: spec.handler
            for command_type, spec in command_registry.handlers_for_target(self.name).items()
        }

    def run(self):
        """Run the process body; invoked when ``start()`` is called.

        Steps:

        1. Check that the pipe, locks, quitting event and command registry
           have been attached (see :meth:`configure_shared_resources`).
        2. Open the live-profile, bead-ROI, tracks and video buffers with
           ``create=False`` and refresh the bead ROI cache.
        3. Call :meth:`setup` once.
        4. Loop while ``_running`` is true, calling :meth:`do_main_loop` and
           then :meth:`receive_ipc` on each pass.

        Subclasses supply their work through :meth:`setup` and
        :meth:`do_main_loop`; they do not need to write the loop themselves.

        Raises:
            RuntimeError: If a required shared resource is missing.
            Exception: Any exception raised in the steps above is passed to
                :meth:`_report_exception` and then re-raised.
        """
        if self._running:
            warn(f'{self.name} is already running')
            return

        logger.info('%s is starting', self.name)
        self._running = True

        try:
            if self._pipe is None:
                raise RuntimeError(f'{self.name} has no pipe')
            if self.locks is None:
                raise RuntimeError(f'{self.name} has no locks')
            if self._magscope_quitting is None:
                raise RuntimeError(f'{self.name} has no magscope_quitting event')
            if self._command_registry is None:
                raise RuntimeError(f'{self.name} has no command registry')

            self.live_profile_buffer = LiveProfileBuffer(
                create=False,
                locks=self.locks,
            )
            self.bead_roi_buffer = BeadRoiBuffer(
                create=False,
                locks=self.locks,
            )
            self.tracks_buffer = MatrixBuffer(
                create=False,
                locks=self.locks,
                name='TracksBuffer',
            )
            self.video_buffer = VideoBuffer(
                create=False,
                locks=self.locks,
            )
            self._refresh_bead_roi_cache()

            self.setup()

            while self._running:
                self.do_main_loop()
                self.receive_ipc()
        except Exception as exc:
            self._running = False
            self._report_exception(exc)
            raise

    @abstractmethod
    def setup(self):
        """One-time initialization hook, called by :meth:`run` before the loop."""
        pass

    @abstractmethod
    def do_main_loop(self):
        """Work for one loop pass; :meth:`run` calls it before :meth:`receive_ipc`."""
        pass

    @register_ipc_command(QuitCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    def quit(self):
        """Shutdown the process (and ask the other processes to quit too).

        Sets the quitting event and stops the main loop. A ``QuitCommand`` is
        sent unless one has already been received over IPC. If a pipe is
        attached it is drained with ``drain_pipe_until_quit`` and then closed.

        Raises:
            RuntimeError: If a pipe is attached but no quitting event is.
        """
        self._quitting.set()
        self._running = False
        if not self._quit_requested:
            self.send_ipc(QuitCommand())
        pipe = self._pipe
        if pipe is not None:
            if self._magscope_quitting is None:
                raise RuntimeError(f"{self.name} has no magscope_quitting event")
            try:
                drain_pipe_until_quit(pipe, self._magscope_quitting)
            finally:
                # Close our end even if draining fails, so the connection
                # is not left open.
                pipe.close()
                self._pipe = None
        logger.info('%s quit', self.name)

    def send_ipc(self, command: Command):
        """Send ``command`` over the pipe.

        The command's route is looked up first so an unregistered command
        fails early. Nothing is sent when there is no pipe or the quitting
        event is already set.

        Raises:
            RuntimeError: If the command registry or the quitting event has
                not been attached.
        """
        if self._command_registry is None:
            raise RuntimeError(f"{self.name} cannot send IPC without a command registry")
        if self._magscope_quitting is None:
            raise RuntimeError(f"{self.name} has no magscope_quitting event")

        self._command_registry.route_for(command)  # Validate registration early

        if self._pipe is not None and not self._magscope_quitting.is_set():
            self._pipe.send(command)

    def receive_ipc(self):
        """Handle at most one pending command from the pipe.

        Returns immediately when there is no pipe or nothing is waiting.
        Payloads that are not ``Command`` instances produce a warning and are
        dropped. The handler is taken from this process's handler table,
        falling back to the registry's route when that route is a broadcast.
        The handler is called with ``command_kwargs(command)`` as keyword
        arguments.

        Raises:
            RuntimeError: If no command registry is attached.
            UnknownCommandError: If no handler applies to the command, or the
                named handler method does not exist on this object.
        """
        # Check pipe for new messages
        if self._pipe is None or not self._pipe.poll():
            return

        # Get the command
        command = self._pipe.recv()

        if not isinstance(command, Command):
            warn(f"Received unknown IPC payload {command!r}")
            return

        if isinstance(command, QuitCommand):
            # Remember the quit came from elsewhere so 'quit()' does not send it again
            self._quit_requested = True

        if self._command_registry is None:
            raise RuntimeError(f"{self.name} cannot handle IPC without a command registry")

        handler_name = self._command_handlers.get(type(command))
        if handler_name is None:
            spec = self._command_registry.route_for(command)
            if spec.delivery != Delivery.BROADCAST:
                raise UnknownCommandError(
                    f"{self.name} has no handler for command {type(command).__name__}"
                )
            handler_name = spec.handler

        handler = getattr(self, handler_name, None)
        if handler is None:
            raise UnknownCommandError(
                f"{self.name} is missing handler {handler_name} "
                f"for command {type(command).__name__}"
            )

        handler(**command_kwargs(command))

    @register_ipc_command(SetAcquisitionDirCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    @register_script_command(SetAcquisitionDirCommand)
    def set_acquisition_dir(self, value: str | None):
        """Store the acquisition directory (``None`` is accepted)."""
        self._acquisition_dir = value

    @register_ipc_command(SetAcquisitionDirOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    @register_script_command(SetAcquisitionDirOnCommand)
    def set_acquisition_dir_on(self, value: bool):
        """Store the acquisition-directory on/off flag."""
        self._acquisition_dir_on = value

    @register_ipc_command(SetAcquisitionModeCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    @register_script_command(SetAcquisitionModeCommand)
    def set_acquisition_mode(self, mode: AcquisitionMode):
        """Store the acquisition mode."""
        self._acquisition_mode = mode

    @register_ipc_command(SetAcquisitionOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    @register_script_command(SetAcquisitionOnCommand)
    def set_acquisition_on(self, value: bool):
        """Store the acquisition on/off flag."""
        self._acquisition_on = value

    @register_ipc_command(UpdateBeadRoisCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    def refresh_bead_rois(self):
        """Reload the cached bead ROIs from the bead ROI buffer."""
        self._refresh_bead_roi_cache()

    @register_ipc_command(SetSettingsCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
    def set_settings(self, settings: MagScopeSettings):
        """Replace this process's settings with a clone of ``settings``."""
        self.settings = settings.clone()

    def _report_exception(self, exc: BaseException) -> None:
        """Print the traceback of ``exc`` to stderr and try to forward it over IPC.

        Forwarding is best-effort: a failure to send is logged at debug level
        and otherwise ignored, so the caller can still re-raise ``exc``.
        """
        error_details = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        error_message = f"{self.name} encountered an unhandled exception:\n{error_details}"
        print(error_message, file=sys.stderr, flush=True)

        try:
            self.send_ipc(LogExceptionCommand(process_name=self.name, details=error_details))
        except Exception:
            # The IPC pipe may already be unavailable; the error has already been
            # surfaced locally on stderr, so only note the failed forward.
            logger.debug('%s could not forward its exception report over IPC',
                         self.name, exc_info=True)