from abc import ABC, abstractmethod
from pathlib import Path
from time import time
from warnings import warn
import numpy as np
from magscope.datatypes import MatrixBuffer
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
MoveFocusMotorAbsoluteCommand,
ReportFocusMotorLimitsCommand,
RequestFocusMotorLimitsCommand,
SetSimulatedFocusCommand,
)
from magscope.processes import ManagerProcessBase, SingletonABCMeta
[docs]
class HardwareManagerBase(ManagerProcessBase, ABC, metaclass=SingletonABCMeta):
def __init__(self):
super().__init__()
[docs]
self.buffer_shape = (1000, 2)
[docs]
self._buffer: MatrixBuffer | None = None
[docs]
self._is_connected: bool = False
[docs]
def setup(self):
self._buffer = MatrixBuffer(
create=False,
locks=self.locks,
name=self.name,
)
[docs]
def do_main_loop(self):
self.fetch()
self._save_pending_data_if_enabled()
[docs]
def quit(self):
super().quit()
self.disconnect()
@abstractmethod
[docs]
def connect(self):
pass
@abstractmethod
[docs]
def disconnect(self):
pass
@abstractmethod
[docs]
def fetch(self):
"""
Checks if the hardware has new data.
If the hardware has new data, then it stores the
data and timestamp in the matrix buffer (self._buffer).
The timestamp should be the seconds since the unix epoch:
(January 1, 1970, 00:00:00 UTC) """
[docs]
def _save_pending_data_if_enabled(self) -> None:
if not self._acquisition_dir_on or not self._acquisition_dir or self._buffer is None:
return
rows = self._buffer.read()
if rows.size == 0:
return
# Fresh buffers are initialized with NaNs; skip them until real telemetry arrives.
rows = rows[np.any(np.isfinite(rows), axis=1)]
if rows.size == 0:
return
filepath = self._hardware_save_filepath()
filepath.parent.mkdir(parents=True, exist_ok=True)
should_write_header = not filepath.exists() or filepath.stat().st_size == 0
with filepath.open('a', encoding='utf-8', newline='') as file:
if should_write_header:
header = self._hardware_save_header()
if header:
file.write(f'# {header}\n')
np.savetxt(file, rows)
[docs]
def _hardware_save_filepath(self) -> Path:
acquisition_dir = self._acquisition_dir
if not acquisition_dir:
raise RuntimeError(f'{self.name} has no acquisition directory configured')
return Path(acquisition_dir) / f'{self.name}.txt'
[docs]
class FocusMotorBase(HardwareManagerBase, ABC, metaclass=SingletonABCMeta):
"""Base class for absolute-Z focus motors used by MagScope.
Subclasses provide the device-specific motion primitives while this base
class standardizes polling, telemetry buffering, and the optional bridge to
the simulated camera. The hardware matrix buffer stores rows as
``[timestamp, current_z, target_z, is_at_target]`` where ``is_at_target``
is encoded as ``0.0`` or ``1.0``.
"""
# 10 nm is a typical settling tolerance for this class of focus hardware.
[docs]
at_target_tolerance = 10
def __init__(self):
super().__init__()
[docs]
self.buffer_shape = (100000, 4)
[docs]
self.fetch_interval = 0.05
[docs]
self._last_written = 0.0
[docs]
self._last_sent_focus: float | None = None
[docs]
self._last_state: tuple[float, float, bool] | None = None
[docs]
def setup(self):
super().setup()
self.connect()
current_z = float(self.get_current_z())
self._target_z = current_z
self._write_state(time(), current_z, force=True)
[docs]
def fetch(self):
if not self._is_connected:
return
now = time()
self._poll_hardware(now)
current_z = float(self.get_current_z())
is_at_target = bool(self.is_at_target())
state = (current_z, self._target_z, is_at_target)
moved = self._last_state is None or not np.allclose(state[:2], self._last_state[:2])
target_state_changed = self._last_state is None or is_at_target != self._last_state[2]
if moved:
self._update_simulated_camera_focus(current_z)
if target_state_changed or moved or (now - self._last_written) >= self.fetch_interval:
self._write_state(now, current_z)
@register_ipc_command(MoveFocusMotorAbsoluteCommand)
[docs]
def handle_move_absolute(self, z: float):
z_min, z_max = self.get_position_limits()
clipped_z = float(np.clip(z, z_min, z_max))
if not np.isclose(clipped_z, z):
warn(
f'{self.name} clipped requested z {z} to {clipped_z} within '
f'limits {(z_min, z_max)}'
)
self._target_z = clipped_z
self.move_absolute(clipped_z)
self._write_state(time(), float(self.get_current_z()), force=True)
@register_ipc_command(RequestFocusMotorLimitsCommand)
[docs]
def report_focus_motor_limits(self) -> None:
z_min, z_max = self.get_position_limits()
self.send_ipc(ReportFocusMotorLimitsCommand(z_min=float(z_min), z_max=float(z_max)))
[docs]
def get_target_z(self) -> float:
return self._target_z
[docs]
def is_at_target(self, tolerance: float | None = None) -> bool:
if tolerance is None:
tolerance = self.at_target_tolerance
return (
not self.get_is_moving()
and abs(float(self.get_current_z()) - self._target_z) <= float(tolerance)
)
[docs]
def _write_state(self, timestamp: float, current_z: float, *, force: bool = False) -> None:
if self._buffer is None:
raise RuntimeError(f'{self.name} has no hardware buffer')
is_at_target = bool(self.is_at_target())
row = np.array(
[[timestamp, current_z, self._target_z, float(is_at_target)]],
dtype=float,
)
if force or not np.isclose(current_z, self._target_z):
self._update_simulated_camera_focus(current_z, force=force)
self._buffer.write(row)
self._last_written = timestamp
self._last_state = (current_z, self._target_z, is_at_target)
[docs]
def _poll_hardware(self, now: float) -> None:
"""Allow subclasses to advance device state before telemetry is sampled."""
[docs]
def _update_simulated_camera_focus(self, current_z: float, *, force: bool = False) -> None:
from magscope.camera import DummyCameraBeads
if self.camera_type is None or not issubclass(self.camera_type, DummyCameraBeads):
return
if force or self._last_sent_focus is None or not np.isclose(current_z, self._last_sent_focus):
self._last_sent_focus = current_z
self.send_ipc(SetSimulatedFocusCommand(offset=current_z))
@abstractmethod
[docs]
def move_absolute(self, z: float) -> None:
"""Command the motor to move to an absolute Z position."""
@abstractmethod
[docs]
def get_current_z(self) -> float:
"""Return the motor's reported current Z position."""
@abstractmethod
[docs]
def get_is_moving(self) -> bool:
"""Return the motor's reported moving state."""
@abstractmethod
[docs]
def get_position_limits(self) -> tuple[float, float]:
"""Return the allowed absolute Z limits for this motor."""