from __future__ import annotations
from abc import ABC, ABCMeta, abstractmethod
from ctypes import c_double, c_int, c_uint32, c_uint64, c_uint8
from multiprocessing import Event, Process, Value
import sys
import traceback
from typing import TYPE_CHECKING
from warnings import warn
import numpy as np
from magscope._logging import get_logger
from magscope.datatypes import BeadRoiBuffer, LiveProfileBuffer, MatrixBuffer, VideoBuffer
from magscope.ipc import (CommandRegistry, Delivery, UnknownCommandError, command_kwargs,
drain_pipe_until_quit, register_ipc_command)
from magscope.ipc_commands import (Command, LogExceptionCommand, QuitCommand,
SetAcquisitionDirCommand, SetAcquisitionDirOnCommand,
SetAcquisitionModeCommand, SetAcquisitionOnCommand,
SetSettingsCommand, UpdateBeadRoisCommand)
from magscope.settings import MagScopeSettings
from magscope.utils import AcquisitionMode, register_script_command
[docs]
logger = get_logger("processes")
if TYPE_CHECKING:
from multiprocessing.connection import Connection
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event as EventType
from multiprocessing.synchronize import Lock as LockType
[docs]
ValueTypeUI8 = Synchronized[int]
from magscope.camera import CameraBase
from magscope.hardware import HardwareManagerBase
[docs]
class InterprocessValues:
def __init__(self):
[docs]
self.video_process_busy_count: ValueTypeUI8 = Value(c_uint8, 0)
[docs]
self.video_process_reserved_stacks: Value[c_uint32] = Value(c_uint32, 0)
[docs]
self.video_process_completed_stacks: Value[c_uint64] = Value(c_uint64, 0)
[docs]
self.live_profile_enabled: ValueTypeUI8 = Value(c_uint8, 0)
[docs]
self.live_profile_bead: Value[c_int] = Value(c_int, 0)
[docs]
self.camera_total_frames: Value[c_uint64] = Value(c_uint64, 0)
[docs]
self.camera_consecutive_timeouts: Value[c_uint32] = Value(c_uint32, 0)
[docs]
self.camera_queue_full_events: Value[c_uint64] = Value(c_uint64, 0)
[docs]
self.camera_last_frame_timestamp: Value[c_double] = Value(c_double, 0.0)
[docs]
class ManagerProcessBase(Process, ABC, metaclass=SingletonABCMeta):
""" Abstract base class for processes in the MagScope
Subclass requirements:
* Each subclass should have a unique name.
* There should only be one instance of each subclass (singleton).
* The class name is used for consistent inter-process identification.
"""
def __init__(self):
# Note: Some setup/initialization will be at the beginning of the 'run()' method
super().__init__()
[docs]
self._acquisition_on: bool = True
[docs]
self._acquisition_dir: str | None = None
[docs]
self._acquisition_dir_on: bool = False
[docs]
self._acquisition_mode: AcquisitionMode = AcquisitionMode.TRACK
[docs]
self.bead_roi_buffer: BeadRoiBuffer | None = None
[docs]
self._bead_roi_ids: np.ndarray = np.zeros((0,), dtype=np.uint32)
[docs]
self._bead_roi_values: np.ndarray = np.zeros((0, 4), dtype=np.uint32)
[docs]
self.camera_type: type[CameraBase] | None = None
[docs]
self.hardware_types: dict[str, type[HardwareManagerBase]] = {}
[docs]
self.locks: dict[str, LockType] | None = None
[docs]
self._magscope_quitting: EventType | None = None
[docs]
self.name: str = type(self).__name__ # Read-only
[docs]
self._pipe: Connection | None = None # Pipe back to the 'MagScope' for inter-process communication
[docs]
self.live_profile_buffer: LiveProfileBuffer | None = None
[docs]
self._quitting: EventType = Event()
[docs]
self._quit_requested: bool = False # A flag to prevent repeated calls to 'quit()' after one process asks the others to quit
[docs]
self._running: bool = False
[docs]
self.tracks_buffer: MatrixBuffer | None = None
[docs]
self.video_buffer: VideoBuffer | None = None
[docs]
self.shared_values: InterprocessValues | None = None
[docs]
self._command_registry: CommandRegistry | None = None
[docs]
self._command_handlers: dict[type[Command], str] = {}
@property
[docs]
def quitting_event(self) -> EventType:
"""Event set when this process has begun quitting."""
return self._quitting
@property
[docs]
def bead_rois(self) -> dict[int, tuple[int, int, int, int]]:
return {
int(bead_id): (int(roi[0]), int(roi[1]), int(roi[2]), int(roi[3]))
for bead_id, roi in zip(self._bead_roi_ids, self._bead_roi_values, strict=False)
}
[docs]
def get_cached_bead_rois(self) -> tuple[np.ndarray, np.ndarray]:
return self._bead_roi_ids, self._bead_roi_values
[docs]
def _refresh_bead_roi_cache(self) -> None:
if self.bead_roi_buffer is None:
self._bead_roi_ids = np.zeros((0,), dtype=np.uint32)
self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
return
bead_ids, bead_rois = self.bead_roi_buffer.get_beads()
self._bead_roi_ids = bead_ids
self._bead_roi_values = bead_rois
[docs]
def run(self):
"""Start the process when ``start()`` is called.
Subclasses should create a main loop that calls ``receive_ipc()`` last::
while self._running:
# do other stuff
self.receive_ipc()
"""
if self._running:
warn(f'{self.name} is already running')
return
logger.info('%s is starting', self.name)
self._running = True
try:
if self._pipe is None:
raise RuntimeError(f'{self.name} has no pipe')
if self.locks is None:
raise RuntimeError(f'{self.name} has no locks')
if self._magscope_quitting is None:
raise RuntimeError(f'{self.name} has no magscope_quitting event')
if self._command_registry is None:
raise RuntimeError(f'{self.name} has no command registry')
self.live_profile_buffer = LiveProfileBuffer(
create=False,
locks=self.locks,
)
self.bead_roi_buffer = BeadRoiBuffer(
create=False,
locks=self.locks,
)
self.tracks_buffer = MatrixBuffer(
create=False,
locks=self.locks,
name='TracksBuffer',
)
self.video_buffer = VideoBuffer(
create=False,
locks=self.locks,
)
self._refresh_bead_roi_cache()
self.setup()
while self._running:
self.do_main_loop()
self.receive_ipc()
except Exception as exc:
self._running = False
self._report_exception(exc)
raise
@abstractmethod
@abstractmethod
[docs]
def do_main_loop(self):
pass
@register_ipc_command(QuitCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def quit(self):
"""Shutdown the process (and ask the other processes to quit too)."""
self._quitting.set()
self._running = False
if not self._quit_requested:
self.send_ipc(QuitCommand())
if self._pipe:
if self._magscope_quitting is None:
raise RuntimeError(f"{self.name} has no magscope_quitting event")
drain_pipe_until_quit(self._pipe, self._magscope_quitting)
self._pipe.close()
self._pipe = None
logger.info('%s quit', self.name)
[docs]
def send_ipc(self, command: Command):
if self._command_registry is None:
raise RuntimeError(f"{self.name} cannot send IPC without a command registry")
if self._magscope_quitting is None:
raise RuntimeError(f"{self.name} has no magscope_quitting event")
self._command_registry.route_for(command) # Validate registration early
if self._pipe and self._magscope_quitting is not None and not self._magscope_quitting.is_set():
self._pipe.send(command)
[docs]
def receive_ipc(self):
# Check pipe for new messages
if self._pipe is None or not self._pipe.poll():
return
# Get the command
command = self._pipe.recv()
if not isinstance(command, Command):
warn(f"Received unknown IPC payload {command!r}")
return
if isinstance(command, QuitCommand):
self._quit_requested = True
if self._command_registry is None:
raise RuntimeError(f"{self.name} cannot handle IPC without a command registry")
handler_name = self._command_handlers.get(type(command))
if handler_name is None:
spec = self._command_registry.route_for(command)
if spec.delivery != Delivery.BROADCAST:
raise UnknownCommandError(
f"{self.name} has no handler for command {type(command).__name__}"
)
handler_name = spec.handler
handler = getattr(self, handler_name, None)
if handler is None:
raise UnknownCommandError(
f"{self.name} is missing handler {handler_name} "
f"for command {type(command).__name__}"
)
handler(**command_kwargs(command))
@register_ipc_command(SetAcquisitionDirCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
@register_script_command(SetAcquisitionDirCommand)
[docs]
def set_acquisition_dir(self, value: str | None):
self._acquisition_dir = value
@register_ipc_command(SetAcquisitionDirOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
@register_script_command(SetAcquisitionDirOnCommand)
[docs]
def set_acquisition_dir_on(self, value: bool):
self._acquisition_dir_on = value
@register_ipc_command(SetAcquisitionModeCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
@register_script_command(SetAcquisitionModeCommand)
[docs]
def set_acquisition_mode(self, mode: AcquisitionMode):
self._acquisition_mode = mode
@register_ipc_command(SetAcquisitionOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
@register_script_command(SetAcquisitionOnCommand)
[docs]
def set_acquisition_on(self, value: bool):
self._acquisition_on = value
@register_ipc_command(UpdateBeadRoisCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def refresh_bead_rois(self):
self._refresh_bead_roi_cache()
@register_ipc_command(SetSettingsCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def set_settings(self, settings: MagScopeSettings):
self.settings = settings.clone()
[docs]
def _report_exception(self, exc: BaseException) -> None:
error_details = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
error_message = f"{self.name} encountered an unhandled exception:\n{error_details}"
print(error_message, file=sys.stderr, flush=True)
try:
self.send_ipc(LogExceptionCommand(process_name=self.name, details=error_details))
except Exception:
# The IPC pipe may already be unavailable; ensure we still surface the error locally.
pass