Source code for magscope.hardware

from abc import ABC, abstractmethod
from pathlib import Path
from time import time
from warnings import warn

import numpy as np

from magscope.datatypes import MatrixBuffer
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
    MoveFocusMotorAbsoluteCommand,
    ReportFocusMotorLimitsCommand,
    RequestFocusMotorLimitsCommand,
    SetSimulatedFocusCommand,
)
from magscope.processes import ManagerProcessBase, SingletonABCMeta


[docs] class HardwareManagerBase(ManagerProcessBase, ABC, metaclass=SingletonABCMeta): def __init__(self): super().__init__()
[docs] self.buffer_shape = (1000, 2)
[docs] self._buffer: MatrixBuffer | None = None
[docs] self._is_connected: bool = False
[docs] def setup(self): self._buffer = MatrixBuffer( create=False, locks=self.locks, name=self.name, )
[docs] def do_main_loop(self): self.fetch() self._save_pending_data_if_enabled()
[docs] def quit(self): super().quit() self.disconnect()
@abstractmethod
[docs] def connect(self): pass
@abstractmethod
[docs] def disconnect(self): pass
@abstractmethod
[docs] def fetch(self): """ Checks if the hardware has new data. If the hardware has new data, then it stores the data and timestamp in the matrix buffer (self._buffer). The timestamp should be the seconds since the unix epoch: (January 1, 1970, 00:00:00 UTC) """
[docs] def _save_pending_data_if_enabled(self) -> None: if not self._acquisition_dir_on or not self._acquisition_dir or self._buffer is None: return rows = self._buffer.read() if rows.size == 0: return # Fresh buffers are initialized with NaNs; skip them until real telemetry arrives. rows = rows[np.any(np.isfinite(rows), axis=1)] if rows.size == 0: return filepath = self._hardware_save_filepath() filepath.parent.mkdir(parents=True, exist_ok=True) should_write_header = not filepath.exists() or filepath.stat().st_size == 0 with filepath.open('a', encoding='utf-8', newline='') as file: if should_write_header: header = self._hardware_save_header() if header: file.write(f'# {header}\n') np.savetxt(file, rows)
[docs] def _hardware_save_filepath(self) -> Path: acquisition_dir = self._acquisition_dir if not acquisition_dir: raise RuntimeError(f'{self.name} has no acquisition directory configured') return Path(acquisition_dir) / f'{self.name}.txt'
[docs] def _hardware_save_header(self) -> str: if self.buffer_shape[1] <= 1: return 'timestamp' extra_columns = [f'value_{index}' for index in range(1, self.buffer_shape[1])] return ' '.join(['timestamp', *extra_columns])
[docs] class FocusMotorBase(HardwareManagerBase, ABC, metaclass=SingletonABCMeta): """Base class for absolute-Z focus motors used by MagScope. Subclasses provide the device-specific motion primitives while this base class standardizes polling, telemetry buffering, and the optional bridge to the simulated camera. The hardware matrix buffer stores rows as ``[timestamp, current_z, target_z, is_at_target]`` where ``is_at_target`` is encoded as ``0.0`` or ``1.0``. """ # 10 nm is a typical settling tolerance for this class of focus hardware.
[docs] at_target_tolerance = 10
def __init__(self): super().__init__()
[docs] self.buffer_shape = (100000, 4)
[docs] self.fetch_interval = 0.05
[docs] self._target_z = 0.0
[docs] self._last_written = 0.0
[docs] self._last_sent_focus: float | None = None
[docs] self._last_state: tuple[float, float, bool] | None = None
[docs] def setup(self): super().setup() self.connect() current_z = float(self.get_current_z()) self._target_z = current_z self._write_state(time(), current_z, force=True)
[docs] def fetch(self): if not self._is_connected: return now = time() self._poll_hardware(now) current_z = float(self.get_current_z()) is_at_target = bool(self.is_at_target()) state = (current_z, self._target_z, is_at_target) moved = self._last_state is None or not np.allclose(state[:2], self._last_state[:2]) target_state_changed = self._last_state is None or is_at_target != self._last_state[2] if moved: self._update_simulated_camera_focus(current_z) if target_state_changed or moved or (now - self._last_written) >= self.fetch_interval: self._write_state(now, current_z)
@register_ipc_command(MoveFocusMotorAbsoluteCommand)
[docs] def handle_move_absolute(self, z: float): z_min, z_max = self.get_position_limits() clipped_z = float(np.clip(z, z_min, z_max)) if not np.isclose(clipped_z, z): warn( f'{self.name} clipped requested z {z} to {clipped_z} within ' f'limits {(z_min, z_max)}' ) self._target_z = clipped_z self.move_absolute(clipped_z) self._write_state(time(), float(self.get_current_z()), force=True)
@register_ipc_command(RequestFocusMotorLimitsCommand)
[docs] def report_focus_motor_limits(self) -> None: z_min, z_max = self.get_position_limits() self.send_ipc(ReportFocusMotorLimitsCommand(z_min=float(z_min), z_max=float(z_max)))
[docs] def get_target_z(self) -> float: return self._target_z
[docs] def is_at_target(self, tolerance: float | None = None) -> bool: if tolerance is None: tolerance = self.at_target_tolerance return ( not self.get_is_moving() and abs(float(self.get_current_z()) - self._target_z) <= float(tolerance) )
[docs] def _write_state(self, timestamp: float, current_z: float, *, force: bool = False) -> None: if self._buffer is None: raise RuntimeError(f'{self.name} has no hardware buffer') is_at_target = bool(self.is_at_target()) row = np.array( [[timestamp, current_z, self._target_z, float(is_at_target)]], dtype=float, ) if force or not np.isclose(current_z, self._target_z): self._update_simulated_camera_focus(current_z, force=force) self._buffer.write(row) self._last_written = timestamp self._last_state = (current_z, self._target_z, is_at_target)
[docs] def _poll_hardware(self, now: float) -> None: """Allow subclasses to advance device state before telemetry is sampled."""
[docs] def _update_simulated_camera_focus(self, current_z: float, *, force: bool = False) -> None: from magscope.camera import DummyCameraBeads if self.camera_type is None or not issubclass(self.camera_type, DummyCameraBeads): return if force or self._last_sent_focus is None or not np.isclose(current_z, self._last_sent_focus): self._last_sent_focus = current_z self.send_ipc(SetSimulatedFocusCommand(offset=current_z))
@abstractmethod
[docs] def move_absolute(self, z: float) -> None: """Command the motor to move to an absolute Z position."""
@abstractmethod
[docs] def get_current_z(self) -> float: """Return the motor's reported current Z position."""
@abstractmethod
[docs] def get_is_moving(self) -> bool: """Return the motor's reported moving state."""
@abstractmethod
[docs] def get_position_limits(self) -> tuple[float, float]: """Return the allowed absolute Z limits for this motor."""