from math import copysign
from time import time
from warnings import warn
import numpy as np
from magscope.datatypes import MatrixBuffer
from magscope.hardware import FocusMotorBase
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
ExecuteXYLockCommand,
MoveFocusMotorAbsoluteCommand,
MoveBeadsCommand,
RemoveBeadFromPendingMovesCommand,
RemoveBeadsFromPendingMovesCommand,
SetXYLockIntervalCommand,
SetXYLockMaxCommand,
SetXYLockOnCommand,
SetXYLockWindowCommand,
SetZLockBeadCommand,
SetZLockIntervalCommand,
SetZLockMaxCommand,
SetZLockOnCommand,
SetZLockTargetCommand,
SetZLockWindowCommand,
UpdateXYLockEnabledCommand,
UpdateXYLockIntervalCommand,
UpdateXYLockMaxCommand,
UpdateXYLockWindowCommand,
UpdateZLockBeadCommand,
UpdateZLockEnabledCommand,
UpdateZLockIntervalCommand,
UpdateZLockMaxCommand,
UpdateZLockTargetCommand,
UpdateZLockWindowCommand,
)
from magscope.processes import ManagerProcessBase
from magscope.utils import register_script_command
[docs]
class BeadLockManager(ManagerProcessBase):
def __init__(self):
super().__init__()
# XY-Lock Properties
[docs]
self.xy_lock_on: bool = False
[docs]
self.xy_lock_interval: float
[docs]
self.xy_lock_max: float
[docs]
self.xy_lock_window: int
[docs]
self._xy_lock_last_time: float = 0.0
[docs]
self._xy_lock_global_cutoff: float = 0.0
[docs]
self._xy_lock_bead_cutoff: dict[int, float] = {}
[docs]
self._xy_lock_pending_moves: list[int] = []
# Z-Lock Properties
[docs]
self.z_lock_on: bool = False
[docs]
self.z_lock_bead: int = 0
[docs]
self.z_lock_target: float | None = None
[docs]
self.z_lock_interval: float
[docs]
self.z_lock_window: int
[docs]
self._z_lock_last_time: float = 0.0
[docs]
self._z_lock_global_cutoff: float = 0.0
[docs]
self._z_lock_expected_focus_target: float | None = None
[docs]
self._z_lock_last_focus_target: float | None = None
[docs]
self._focus_motor_name: str | None = None
[docs]
self._focus_buffer: MatrixBuffer | None = None
[docs]
def setup(self):
self.xy_lock_interval = self.settings['xy-lock default interval']
self.xy_lock_max = self.settings['xy-lock default max']
window_default = self.settings.get('xy-lock default window', 1)
self.xy_lock_window = max(1, int(window_default))
self.z_lock_interval = self.settings['z-lock default interval']
self.z_lock_max = self.settings['z-lock default max']
z_window_default = self.settings.get('z-lock default window', 1)
self.z_lock_window = max(1, int(z_window_default))
[docs]
def do_main_loop(self):
# XY-Lock Enabled
if self.xy_lock_on:
# Timer
if (now := time()) - self._xy_lock_last_time > self.xy_lock_interval:
self.do_xy_lock(now=now)
# Z-Lock Enabled
if self.z_lock_on:
# Timer
if (now := time()) - self._z_lock_last_time > self.z_lock_interval:
self.do_z_lock(now=now)
@register_ipc_command(ExecuteXYLockCommand)
@register_script_command(ExecuteXYLockCommand)
[docs]
def do_xy_lock(self, now=None):
""" Centers the bead-rois based on their tracked position """
# Gather information
width = self.settings['ROI']
half_width = width // 2
tracks = self.tracks_buffer.peak_unsorted().copy()
if now is None:
now = time()
self._xy_lock_last_time = now
bead_ids, bead_rois = self.get_cached_bead_rois()
# For each bead calculate if/how much to move
moves_to_send: list[tuple[int, int, int]] = []
for bead_id, roi in zip(bead_ids.tolist(), bead_rois, strict=False):
# Get the track for this bead
track = tracks[tracks[:, 4] == bead_id, :]
# Check there is track data
if track.shape[0] == 0:
continue
# Filter to valid positions for this ROI
position_mask = ~np.isnan(track[:, [0, 1, 2]]).any(axis=1)
valid_track = track[position_mask]
cutoff = max(
self._xy_lock_global_cutoff,
self._xy_lock_bead_cutoff.get(bead_id, 0.),
)
time_mask = valid_track[:, 0] >= cutoff
valid_track = valid_track[time_mask]
if valid_track.shape[0] == 0:
continue
# Use the most recent valid positions
order = np.argsort(valid_track[:, 0])[::-1]
recent_track = valid_track[order[: self.xy_lock_window]]
_, xs, ys, *_ = recent_track.T
x = float(np.mean(xs))
y = float(np.mean(ys))
# Check the bead started the last move
if bead_id in self._xy_lock_pending_moves:
continue
# Calculate the move
nm_per_px = self.camera_type.nm_per_px / self.settings['magnification']
dx = (x / nm_per_px) - half_width - roi[0]
dy = (y / nm_per_px) - half_width - roi[2]
if abs(dx) <= 1:
dx = 0.
if abs(dy) <= 1:
dy = 0.
dx = round(dx)
dy = round(dy)
# Limit movement to the maximum threshold
dx = copysign(min(abs(dx), self.xy_lock_max), dx)
dy = copysign(min(abs(dy), self.xy_lock_max), dy)
# Move the bead as needed
if abs(dx) > 0 or abs(dy) > 0:
moves_to_send.append((bead_id, int(dx), int(dy)))
if moves_to_send:
self._xy_lock_pending_moves.extend([id for id, _, _ in moves_to_send])
command = MoveBeadsCommand(moves=moves_to_send)
self.send_ipc(command)
[docs]
def do_z_lock(self, now=None):
if now is None:
now = time()
self._z_lock_last_time = now
focus_state = self._latest_focus_state()
if focus_state is not None:
self._update_z_lock_cutoff_for_external_focus_change(focus_state, now)
tracked_z = self._averaged_bead_z(self.z_lock_bead, self.z_lock_window)
if tracked_z is None:
return
if self.z_lock_target is None:
self.set_z_lock_target(tracked_z)
return
if focus_state is None:
return
current_focus_z, _target_focus_z, is_at_target = focus_state
if not is_at_target:
return
correction = float(self.z_lock_target - tracked_z)
max_step = abs(float(self.z_lock_max))
if max_step <= 0:
return
correction = float(np.clip(correction, -max_step, max_step))
if np.isclose(correction, 0.0):
return
new_target = float(current_focus_z + correction)
self._z_lock_expected_focus_target = new_target
self._advance_z_lock_cutoff(now)
self.send_ipc(MoveFocusMotorAbsoluteCommand(z=new_target))
[docs]
def _averaged_bead_z(self, bead_id: int, window: int) -> float | None:
if self.tracks_buffer is None:
return None
tracks = self.tracks_buffer.peak_unsorted()
if tracks.size == 0:
return None
finite_rows = np.isfinite(tracks[:, [0, 3, 4]]).all(axis=1)
written_rows = tracks[:, 0] > 0.0
bead_rows = tracks[:, 4] == bead_id
cutoff_rows = tracks[:, 0] >= self._z_lock_global_cutoff
valid_rows = finite_rows & written_rows & bead_rows & cutoff_rows
if not np.any(valid_rows):
return None
bead_tracks = tracks[valid_rows]
order = np.argsort(bead_tracks[:, 0])[::-1]
recent_track = bead_tracks[order[: max(1, int(window))]]
return float(np.mean(recent_track[:, 3]))
[docs]
def _latest_focus_state(self) -> tuple[float, float, bool] | None:
focus_buffer = self._focus_matrix_buffer()
if focus_buffer is None:
return None
data = focus_buffer.peak_sorted()
if data.size == 0:
return None
finite_rows = np.isfinite(data[:, 0])
if not np.any(finite_rows):
return None
_timestamp, current_z, target_z, is_at_target = data[finite_rows][-1, :]
return float(current_z), float(target_z), bool(round(is_at_target))
[docs]
def _advance_z_lock_cutoff(self, now: float | None = None) -> None:
self._z_lock_global_cutoff = time() if now is None else float(now)
[docs]
def _update_z_lock_cutoff_for_external_focus_change(
self,
focus_state: tuple[float, float, bool],
now: float,
) -> None:
_current_focus_z, focus_target_z, _is_at_target = focus_state
if self._z_lock_expected_focus_target is not None and np.isclose(
focus_target_z, self._z_lock_expected_focus_target
):
self._z_lock_last_focus_target = focus_target_z
return
if self._z_lock_last_focus_target is None:
self._z_lock_last_focus_target = focus_target_z
return
if np.isclose(focus_target_z, self._z_lock_last_focus_target):
return
self._z_lock_expected_focus_target = None
self._z_lock_last_focus_target = focus_target_z
self._advance_z_lock_cutoff(now)
[docs]
def _focus_matrix_buffer(self) -> MatrixBuffer | None:
self._focus_motor_name = self._focus_motor_name or self._discover_focus_motor_name()
if self._focus_motor_name is None:
return None
if self._focus_buffer is None:
self._focus_buffer = MatrixBuffer(
create=False,
locks=self.locks,
name=self._focus_motor_name,
)
return self._focus_buffer
[docs]
def _discover_focus_motor_name(self) -> str | None:
focus_motor_names: list[str] = []
for name, hardware_type in self.hardware_types.items():
try:
if issubclass(hardware_type, FocusMotorBase):
focus_motor_names.append(name)
except TypeError:
continue
if len(focus_motor_names) == 1:
return focus_motor_names[0]
if len(focus_motor_names) > 1:
warn('Z-lock requires exactly one registered FocusMotorBase hardware manager.')
return None
[docs]
def refresh_bead_rois(self):
previous_bead_rois = self.bead_rois
super().refresh_bead_rois()
current_bead_rois = self.bead_rois
active_ids = set(current_bead_rois)
# Check if any of the beads have been deleted
self._xy_lock_pending_moves = [
bead_id for bead_id in self._xy_lock_pending_moves if bead_id in active_ids
]
# Remove any bead-specific cutoffs for deleted beads
bead_cutoff_ids = list(self._xy_lock_bead_cutoff)
for bead_id in bead_cutoff_ids:
if bead_id not in current_bead_rois:
self._xy_lock_bead_cutoff.pop(bead_id, None)
now = time()
for bead_id, roi in current_bead_rois.items():
previous_roi = previous_bead_rois.get(bead_id)
if previous_roi == roi:
continue
if bead_id in self._xy_lock_pending_moves:
continue
self._xy_lock_bead_cutoff[bead_id] = now
if bead_id == self.z_lock_bead:
self._advance_z_lock_cutoff(now)
@register_ipc_command(RemoveBeadFromPendingMovesCommand)
[docs]
def remove_bead_from_xy_lock_pending_moves(self, id: int):
if id in self._xy_lock_pending_moves:
self._xy_lock_pending_moves.remove(id)
@register_ipc_command(RemoveBeadsFromPendingMovesCommand)
[docs]
def remove_beads_from_xy_lock_pending_moves(self, ids: list[int]):
if not ids:
return
pending_set = set(ids)
self._xy_lock_pending_moves = [
bead_id for bead_id in self._xy_lock_pending_moves if bead_id not in pending_set
]
@register_ipc_command(SetXYLockOnCommand)
@register_script_command(SetXYLockOnCommand)
[docs]
def set_xy_lock_on(self, value: bool):
self.xy_lock_on = value
self._xy_lock_global_cutoff = time()
command = UpdateXYLockEnabledCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetXYLockIntervalCommand)
@register_script_command(SetXYLockIntervalCommand)
[docs]
def set_xy_lock_interval(self, value: float):
if value <= 0:
return
self.xy_lock_interval = value
command = UpdateXYLockIntervalCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetXYLockMaxCommand)
@register_script_command(SetXYLockMaxCommand)
[docs]
def set_xy_lock_max(self, value: float):
value = max(1, round(value))
self.xy_lock_max = value
command = UpdateXYLockMaxCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetXYLockWindowCommand)
@register_script_command(SetXYLockWindowCommand)
[docs]
def set_xy_lock_window(self, value: int):
self.xy_lock_window = max(1, int(value))
command = UpdateXYLockWindowCommand(value=self.xy_lock_window)
self.send_ipc(command)
@register_ipc_command(SetZLockOnCommand)
@register_script_command(SetZLockOnCommand)
[docs]
def set_z_lock_on(self, value: bool):
self.z_lock_on = value
self._advance_z_lock_cutoff()
self._z_lock_expected_focus_target = None
command = UpdateZLockEnabledCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetZLockBeadCommand)
@register_script_command(SetZLockBeadCommand)
[docs]
def set_z_lock_bead(self, value: int):
value = int(value)
self.z_lock_bead = value
self._advance_z_lock_cutoff()
command = UpdateZLockBeadCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetZLockTargetCommand)
@register_script_command(SetZLockTargetCommand)
[docs]
def set_z_lock_target(self, value: float):
self.z_lock_target = value
self._advance_z_lock_cutoff()
command = UpdateZLockTargetCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetZLockIntervalCommand)
@register_script_command(SetZLockIntervalCommand)
[docs]
def set_z_lock_interval(self, value: float):
if value <= 0:
return
self.z_lock_interval = value
self._advance_z_lock_cutoff()
command = UpdateZLockIntervalCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetZLockMaxCommand)
@register_script_command(SetZLockMaxCommand)
[docs]
def set_z_lock_max(self, value: float):
self.z_lock_max = value
self._advance_z_lock_cutoff()
command = UpdateZLockMaxCommand(value=value)
self.send_ipc(command)
@register_ipc_command(SetZLockWindowCommand)
@register_script_command(SetZLockWindowCommand)
[docs]
def set_z_lock_window(self, value: int):
self.z_lock_window = max(1, int(value))
self._advance_z_lock_cutoff()
command = UpdateZLockWindowCommand(value=self.z_lock_window)
self.send_ipc(command)