from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from time import time
import numpy as np
from magscope._logging import get_logger
from magscope.datatypes import MatrixBuffer, ZLUTSweepDataset
from magscope.hardware import FocusMotorBase
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
ArmZLUTSweepCaptureCommand,
CancelGeneratedZLUTEvaluationCommand,
CancelZLUTGenerationCommand,
ClearPendingZLUTProfileLengthCommand,
DisarmZLUTSweepCaptureCommand,
LoadZLUTCommand,
MoveFocusMotorAbsoluteCommand,
ReportFocusMotorLimitsCommand,
ReportZLUTProfileLengthCommand,
RequestFocusMotorLimitsCommand,
RequestZLUTProfileLengthCommand,
SaveGeneratedZLUTCommand,
SelectGeneratedZLUTBeadCommand,
SetAcquisitionOnCommand,
ShowErrorCommand,
ShowMessageCommand,
StartZLUTGenerationCommand,
UpdateZLUTGenerationEvaluationCommand,
UpdateZLUTGenerationProgressCommand,
UpdateZLUTGenerationStateCommand,
ZLUTSweepCaptureCompleteCommand,
)
from magscope.processes import ManagerProcessBase
from magscope.utils import AcquisitionMode
[docs]
logger = get_logger('zlut_generation')
@dataclass(frozen=True)
[docs]
class GeneratedZLUTResult:
[docs]
class ZLUTGenerationManager(ManagerProcessBase):
[docs]
_TRACKING_ACQUISITION_MODES = {
AcquisitionMode.TRACK,
AcquisitionMode.TRACK_AND_VIDEO_ROIS,
AcquisitionMode.TRACK_AND_VIDEO_FULL,
}
def __init__(self):
super().__init__()
[docs]
self._cancel_requested = False
[docs]
self._current_step_index = 0
[docs]
self._dataset: ZLUTSweepDataset | None = None
[docs]
self._focus_buffer: MatrixBuffer | None = None
[docs]
self._focus_motor_name: str | None = None
[docs]
self._generated_zluts: dict[int, GeneratedZLUTResult] = {}
[docs]
self._last_progress_emit = 0.0
[docs]
self._previous_acquisition_on = False
[docs]
self._profile_length: int | None = None
[docs]
self._profiles_per_bead = 0
[docs]
self._pending_start_request: tuple[float, float, float, int] | None = None
[docs]
self._current_step_capture_earliest_timestamp = 0.0
[docs]
self._current_step_profiles_written = 0
[docs]
self._requested_range: tuple[float, float, float] | None = None
[docs]
self._selected_bead_id: int | None = None
[docs]
self._session_bead_roi_ids = np.zeros((0,), dtype=np.uint32)
[docs]
self._session_bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
[docs]
self._step_capture_complete = False
[docs]
self._steps = np.zeros((0,), dtype=np.float64)
[docs]
def setup(self):
self._focus_motor_name = self._discover_focus_motor_name()
if self._focus_motor_name is not None:
self._focus_buffer = MatrixBuffer(
create=False,
locks=self.locks,
name=self._focus_motor_name,
)
[docs]
def do_main_loop(self):
if not self._active:
return
if self._cancel_requested:
self._cancel_session()
return
if self._phase == 'moving':
self._advance_when_in_position()
elif self._phase == 'capturing':
self._advance_after_capture()
self._maybe_send_progress()
[docs]
def quit(self):
self._cleanup_runtime_state(destroy_dataset=True)
super().quit()
@register_ipc_command(StartZLUTGenerationCommand)
[docs]
def start_generation(self, start_nm: float, step_nm: float, stop_nm: float, profiles_per_bead: int):
if self._active or self._phase in {'evaluating', 'waiting_focus_limits'}:
startup_running = self._active or self._phase == 'waiting_focus_limits'
self._send_state(
'Generation already running.',
detail='Cancel the current sweep before starting another one.',
running=startup_running,
can_cancel=startup_running,
phase=self._phase,
)
return
self._refresh_bead_roi_cache()
try:
if self._acquisition_mode not in self._TRACKING_ACQUISITION_MODES:
raise RuntimeError(
'Z-LUT generation requires a tracking acquisition mode. '
'Switch to Track, Track and Video (ROIs), or Track and Video (Full).'
)
self._build_steps(start_nm, step_nm, stop_nm)
if int(profiles_per_bead) <= 0:
raise ValueError('Measurements per step must be a positive integer.')
self._focus_motor_name = self._focus_motor_name or self._discover_focus_motor_name()
if self._focus_motor_name is None:
raise RuntimeError('No FocusMotorBase hardware is registered.')
self._pending_start_request = (
float(start_nm),
float(step_nm),
float(stop_nm),
int(profiles_per_bead),
)
self._phase = 'waiting_focus_limits'
self._send_state(
'Waiting for focus motor limits.',
detail='Checking that the requested Z-LUT sweep stays within the focus motor range.',
running=True,
can_cancel=True,
phase='waiting_focus_limits',
)
self.send_ipc(RequestFocusMotorLimitsCommand())
except Exception as exc:
self._fail_startup(exc)
return
@register_ipc_command(CancelZLUTGenerationCommand)
[docs]
def cancel_generation(self):
if self._phase == 'evaluating':
self.cancel_evaluation()
return
if self._phase == 'waiting_focus_limits':
self._send_state('Z-LUT generation canceled.', running=False, can_cancel=False, phase='idle')
self._cleanup_runtime_state(destroy_dataset=True)
return
if not self._active:
return
self._cancel_requested = True
self._send_state(
'Canceling Z-LUT generation...',
running=True,
can_cancel=False,
phase=self._phase,
)
@register_ipc_command(ReportZLUTProfileLengthCommand)
[docs]
def report_profile_length(self, profile_length: int | None = None):
if not self._active or self._phase != 'waiting_profile_length':
return
if profile_length is None or int(profile_length) <= 0:
self._fail_session('Could not determine profile length from the current tracking output.')
return
self._profile_length = int(profile_length)
self._create_dataset()
self._current_step_index = 0
self._step_capture_complete = False
self._issue_move_for_current_step()
@register_ipc_command(ReportFocusMotorLimitsCommand)
[docs]
def report_focus_motor_limits(self, z_min: float, z_max: float) -> None:
if self._phase != 'waiting_focus_limits' or self._pending_start_request is None:
return
try:
start_nm, step_nm, stop_nm, profiles_per_bead = self._pending_start_request
self._validate_sweep_limits(start_nm, stop_nm, z_min, z_max)
self._prepare_session(start_nm, step_nm, stop_nm, profiles_per_bead)
except Exception as exc:
self._fail_startup(exc)
return
self._pending_start_request = None
self._send_state(
'Waiting for a processed frame to measure profile length.',
detail='Z-LUT generation is preparing shared memory and capture settings.',
running=True,
can_cancel=True,
phase='waiting_profile_length',
)
self._send_progress(force=True)
self.send_ipc(SetAcquisitionOnCommand(True))
self.send_ipc(
RequestZLUTProfileLengthCommand(
bead_ids=self._bead_id_payload(),
bead_rois=self._bead_roi_payload(),
)
)
@register_ipc_command(ZLUTSweepCaptureCompleteCommand)
[docs]
def handle_capture_complete(
self,
step_index: int,
written_count: int,
written_profiles_per_bead: int,
error: str | None = None,
):
if not self._active or self._phase != 'capturing':
return
if step_index != self._current_step_index:
return
if error:
self._fail_session(error)
return
if written_count <= 0:
logger.debug(
'Retrying Z-LUT capture for step %d after no fresh processed profiles were available',
self._current_step_index,
)
self.send_ipc(
ArmZLUTSweepCaptureCommand(
step_index=self._current_step_index,
motor_z_value=float(self._steps[self._current_step_index]),
remaining_profiles_per_bead=self._profiles_per_bead - self._current_step_profiles_written,
earliest_timestamp=self._current_step_capture_earliest_timestamp,
bead_ids=self._bead_id_payload(),
bead_rois=self._bead_roi_payload(),
)
)
self._send_state(
f'Waiting for a fresh settled frame at step {self._current_step_index + 1} of {self._steps.size}.',
detail='Skipping frames captured before the focus motor fully settled.',
running=True,
can_cancel=True,
phase='capturing',
)
return
self._current_step_profiles_written += int(written_profiles_per_bead)
if self._current_step_profiles_written >= self._profiles_per_bead:
self._step_capture_complete = True
return
self.send_ipc(
ArmZLUTSweepCaptureCommand(
step_index=self._current_step_index,
motor_z_value=float(self._steps[self._current_step_index]),
remaining_profiles_per_bead=self._profiles_per_bead - self._current_step_profiles_written,
earliest_timestamp=self._current_step_capture_earliest_timestamp,
bead_ids=self._bead_id_payload(),
bead_rois=self._bead_roi_payload(),
)
)
self._send_state(
f'Capturing step {self._current_step_index + 1} of {self._steps.size}.',
detail=(
f'Collected {self._current_step_profiles_written} / {self._profiles_per_bead} '
'profiles per bead.'
),
running=True,
can_cancel=True,
phase='capturing',
)
@register_ipc_command(SelectGeneratedZLUTBeadCommand)
[docs]
def select_generated_bead(self, bead_id: int):
if self._phase != 'evaluating':
return
bead_id = int(bead_id)
if bead_id not in self._generated_zluts:
self._fail_evaluation(f'Generated Z-LUT bead {bead_id} is not available.')
return
self._selected_bead_id = bead_id
self._send_evaluation_state(active=True)
@register_ipc_command(SaveGeneratedZLUTCommand)
[docs]
def save_generated_zlut(self, filepath: str, bead_id: int, load_after_save: bool = True):
if self._phase != 'evaluating':
return
bead_id = int(bead_id)
result = self._generated_zluts.get(bead_id)
if result is None:
self._fail_evaluation(f'Generated Z-LUT bead {bead_id} is not available.')
return
path = Path(filepath).expanduser()
if not path.parent.exists():
self._fail_evaluation(f'Directory does not exist: {path.parent}')
return
try:
np.savetxt(path, result.zlut_array)
except Exception as exc:
reason = str(exc).strip() or repr(exc)
self._fail_evaluation(f'Failed to save generated Z-LUT: {reason}')
return
if load_after_save:
self.send_ipc(LoadZLUTCommand(filepath=str(path)))
self.send_ipc(
ShowMessageCommand(
text='Generated Z-LUT loaded.',
details=f'Loaded bead {bead_id} from {path}',
)
)
self._send_state(
'Generated Z-LUT saved and loaded.',
detail=f'Saved and loaded bead {bead_id} from {path}',
running=False,
can_cancel=False,
phase='evaluating',
)
else:
self._send_state(
'Generated Z-LUT saved.',
detail=f'Saved bead {bead_id} to {path}',
running=False,
can_cancel=False,
phase='evaluating',
)
self._send_evaluation_state(active=True)
@register_ipc_command(CancelGeneratedZLUTEvaluationCommand)
[docs]
def cancel_evaluation(self):
if self._phase != 'evaluating':
return
self._send_state(
'Generated Z-LUT discarded.',
detail='The temporary sweep dataset has been cleared without loading a new Z-LUT.',
running=False,
can_cancel=False,
phase='idle',
)
self._cleanup_runtime_state(destroy_dataset=True)
[docs]
def _prepare_session(
self,
start_nm: float,
step_nm: float,
stop_nm: float,
profiles_per_bead: int,
) -> None:
self._cleanup_runtime_state(destroy_dataset=True)
if self._acquisition_mode not in self._TRACKING_ACQUISITION_MODES:
raise RuntimeError(
'Z-LUT generation requires a tracking acquisition mode. '
'Switch to Track, Track and Video (ROIs), or Track and Video (Full).'
)
self._focus_motor_name = self._focus_motor_name or self._discover_focus_motor_name()
if self._focus_motor_name is None:
raise RuntimeError('No FocusMotorBase hardware is registered.')
if self._focus_buffer is None:
self._focus_buffer = MatrixBuffer(create=False, locks=self.locks, name=self._focus_motor_name)
if self.video_buffer is None:
raise RuntimeError('Video buffer is not available.')
if self._bead_roi_ids.size == 0 or self._bead_roi_values.shape[0] == 0:
raise RuntimeError('At least one bead ROI must be selected before generating a Z-LUT.')
self._session_bead_roi_ids = np.asarray(self._bead_roi_ids, dtype=np.uint32).copy()
self._session_bead_roi_values = np.asarray(self._bead_roi_values, dtype=np.uint32).copy()
steps = self._build_steps(start_nm, step_nm, stop_nm)
if int(profiles_per_bead) <= 0:
raise ValueError('Measurements per step must be a positive integer.')
self._active = True
self._cancel_requested = False
self._current_step_index = 0
self._dataset = None
self._generated_zluts = {}
self._last_progress_emit = 0.0
self._phase = 'waiting_profile_length'
self._previous_acquisition_on = bool(self._acquisition_on)
self._profile_length = None
self._profiles_per_bead = int(profiles_per_bead)
self._current_step_profiles_written = 0
self._requested_range = (float(start_nm), float(step_nm), float(stop_nm))
self._selected_bead_id = None
self._step_capture_complete = False
self._steps = steps
self._send_evaluation_state(active=False)
[docs]
def _create_dataset(self) -> None:
if self._profile_length is None:
raise RuntimeError('Profile length must be known before creating the dataset.')
n_steps = int(self._steps.size)
n_beads = int(self._session_bead_roi_ids.size)
capacity = n_steps * n_beads * self._profiles_per_bead
self._reset_dataset(destroy=True)
self._dataset = ZLUTSweepDataset.create(
locks=self.locks,
capacity=capacity,
profile_length=self._profile_length,
n_steps=n_steps,
n_beads=n_beads,
profiles_per_bead=self._profiles_per_bead,
)
self._dataset.set_state(ZLUTSweepDataset.STATE_CAPTURING)
[docs]
def _issue_move_for_current_step(self) -> None:
if self._current_step_index >= self._steps.size:
self._complete_session()
return
target_z = float(self._steps[self._current_step_index])
self._phase = 'moving'
self._step_capture_complete = False
self._current_step_profiles_written = 0
self._current_step_capture_earliest_timestamp = 0.0
self.send_ipc(DisarmZLUTSweepCaptureCommand())
self.send_ipc(SetAcquisitionOnCommand(False))
self.send_ipc(MoveFocusMotorAbsoluteCommand(z=target_z))
self._send_state(
f'Moving focus motor to step {self._current_step_index + 1} of {self._steps.size}.',
detail=f'Target Z: {target_z:.3f} nm',
running=True,
can_cancel=True,
phase='moving',
)
[docs]
def _advance_when_in_position(self) -> None:
focus_state = self._latest_focus_state()
if focus_state is None:
return
current_z, target_z, is_at_target = focus_state
requested_z = float(self._steps[self._current_step_index])
if not is_at_target:
return
if not np.isclose(target_z, requested_z):
return
self._phase = 'capturing'
self._step_capture_complete = False
self._current_step_capture_earliest_timestamp = time()
self.send_ipc(SetAcquisitionOnCommand(True))
self.send_ipc(
ArmZLUTSweepCaptureCommand(
step_index=self._current_step_index,
motor_z_value=float(current_z),
remaining_profiles_per_bead=self._profiles_per_bead,
earliest_timestamp=self._current_step_capture_earliest_timestamp,
bead_ids=self._bead_id_payload(),
bead_rois=self._bead_roi_payload(),
)
)
self._send_state(
f'Capturing step {self._current_step_index + 1} of {self._steps.size}.',
detail=f'Motor position: {current_z:.3f} nm',
running=True,
can_cancel=True,
phase='capturing',
)
[docs]
def _advance_after_capture(self) -> None:
if not self._step_capture_complete:
return
self.send_ipc(SetAcquisitionOnCommand(False))
self._step_capture_complete = False
self._current_step_index += 1
if self._current_step_index >= self._steps.size:
self._complete_session()
return
self._issue_move_for_current_step()
[docs]
def _complete_session(self) -> None:
if self._dataset is None:
self._fail_session('Z-LUT sweep dataset is unavailable at completion time.')
return
self._dataset.set_state(ZLUTSweepDataset.STATE_COMPLETE)
self.send_ipc(DisarmZLUTSweepCaptureCommand())
self.send_ipc(SetAcquisitionOnCommand(self._previous_acquisition_on))
self._active = False
self._cancel_requested = False
self._phase = 'processing'
self._send_progress(force=True)
self._send_state(
'Sweep capture complete. Processing generated Z-LUTs...',
detail='Averaging captured profiles for evaluation.',
running=True,
can_cancel=False,
phase='processing',
)
try:
self._build_generated_zluts()
except Exception as exc:
reason = str(exc).strip() or repr(exc)
self._fail_session(f'Failed to process captured sweep data: {reason}')
return
self._phase = 'evaluating'
self._send_evaluation_state(active=True)
self._send_state(
'Review the generated Z-LUT.',
detail='Select a bead, then save and load the generated Z-LUT or cancel to discard it.',
running=False,
can_cancel=False,
phase='evaluating',
)
[docs]
def _cancel_session(self) -> None:
self._clear_pending_profile_length_request()
self.send_ipc(DisarmZLUTSweepCaptureCommand())
self.send_ipc(SetAcquisitionOnCommand(self._previous_acquisition_on))
self._send_state('Z-LUT generation canceled.', running=False, can_cancel=False, phase='idle')
self._cleanup_runtime_state(destroy_dataset=True)
[docs]
def _fail_session(self, reason: str) -> None:
logger.warning('Z-LUT generation failed: %s', reason)
self.send_ipc(ShowErrorCommand(text='Z-LUT generation failed', details=reason))
self._clear_pending_profile_length_request()
self.send_ipc(DisarmZLUTSweepCaptureCommand())
self.send_ipc(SetAcquisitionOnCommand(self._previous_acquisition_on))
self._send_state('Z-LUT generation failed.', detail=reason, running=False, can_cancel=False, phase='idle')
self._cleanup_runtime_state(destroy_dataset=True)
[docs]
def _fail_evaluation(self, reason: str) -> None:
logger.warning('Z-LUT evaluation failed: %s', reason)
self.send_ipc(ShowErrorCommand(text='Generated Z-LUT evaluation failed', details=reason))
self._send_state(
'Generated Z-LUT evaluation failed.',
detail=reason,
running=False,
can_cancel=False,
phase='evaluating',
)
[docs]
def _fail_startup(self, exc: Exception) -> None:
reason = str(exc).strip() or repr(exc)
logger.warning('Could not start Z-LUT generation: %s', reason)
self.send_ipc(ShowErrorCommand(text='Could not start Z-LUT generation', details=reason))
self._clear_pending_profile_length_request()
self._send_state('Generation failed to start.', detail=reason, phase='idle')
self._cleanup_runtime_state(destroy_dataset=True)
[docs]
def _cleanup_runtime_state(self, *, destroy_dataset: bool) -> None:
self._active = False
self._cancel_requested = False
self._current_step_index = 0
self._generated_zluts = {}
self._last_progress_emit = 0.0
self._phase = 'idle'
self._profile_length = None
self._profiles_per_bead = 0
self._pending_start_request = None
self._current_step_capture_earliest_timestamp = 0.0
self._current_step_profiles_written = 0
self._requested_range = None
self._session_bead_roi_ids = np.zeros((0,), dtype=np.uint32)
self._session_bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
self._selected_bead_id = None
self._step_capture_complete = False
self._steps = np.zeros((0,), dtype=np.float64)
self._reset_dataset(destroy=destroy_dataset)
self._send_evaluation_state(active=False)
[docs]
def _reset_dataset(self, *, destroy: bool) -> None:
if self._dataset is None:
return
dataset = self._dataset
self._dataset = None
if destroy:
try:
dataset.destroy()
except Exception:
logger.exception('Failed to destroy Z-LUT sweep dataset')
else:
dataset.close()
[docs]
def _latest_focus_state(self) -> tuple[float, float, bool] | None:
if self._focus_buffer is None:
return None
data = self._focus_buffer.peak_sorted()
if data.size == 0:
return None
finite_rows = np.isfinite(data[:, 0])
if not np.any(finite_rows):
return None
timestamp, current_z, target_z, is_at_target = data[finite_rows][-1, :]
_ = timestamp
return float(current_z), float(target_z), bool(round(is_at_target))
[docs]
def _clear_pending_profile_length_request(self) -> None:
self.send_ipc(ClearPendingZLUTProfileLengthCommand())
[docs]
def _bead_id_payload(self) -> tuple[int, ...]:
return tuple(int(bead_id) for bead_id in self._session_bead_roi_ids)
[docs]
def _bead_roi_payload(self) -> tuple[tuple[int, int, int, int], ...]:
return tuple(tuple(int(value) for value in roi) for roi in self._session_bead_roi_values)
[docs]
def _maybe_send_progress(self) -> None:
self._send_progress(force=False)
[docs]
def _send_progress(self, *, force: bool) -> None:
now = time()
if not force and (now - self._last_progress_emit) < 0.1:
return
self._last_progress_emit = now
capture_count = 0
capture_capacity = 0
if self._dataset is not None:
capture_count = self._dataset.get_count()
capture_capacity = self._dataset.get_capacity()
motor_z_value = None
focus_state = self._latest_focus_state()
if focus_state is not None:
motor_z_value = focus_state[0]
display_step = self._current_step_index
if self._active and self._steps.size > 0:
display_step = min(self._current_step_index + 1, int(self._steps.size))
self.send_ipc(
UpdateZLUTGenerationProgressCommand(
current_step=display_step,
total_steps=int(self._steps.size),
capture_count=capture_count,
capture_capacity=capture_capacity,
motor_z_value=motor_z_value,
)
)
[docs]
def _send_state(
self,
status: str,
*,
detail: str | None = None,
running: bool = False,
can_cancel: bool = False,
phase: str = 'idle',
) -> None:
z_axis_min_nm = None
z_axis_max_nm = None
z_axis_descending = False
if self._requested_range is not None:
z_axis_min_nm = float(min(self._requested_range[0], self._requested_range[2]))
z_axis_max_nm = float(max(self._requested_range[0], self._requested_range[2]))
z_axis_descending = bool(self._requested_range[2] < self._requested_range[0])
self.send_ipc(
UpdateZLUTGenerationStateCommand(
status=status,
detail=detail,
running=running,
can_cancel=can_cancel,
phase=phase,
z_axis_min_nm=z_axis_min_nm,
z_axis_max_nm=z_axis_max_nm,
z_axis_descending=z_axis_descending,
)
)
[docs]
def _send_evaluation_state(self, *, active: bool) -> None:
self.send_ipc(
UpdateZLUTGenerationEvaluationCommand(
active=active,
bead_ids=sorted(self._generated_zluts),
selected_bead_id=self._selected_bead_id,
)
)
[docs]
def _build_generated_zluts(self) -> None:
if self._dataset is None:
raise RuntimeError('Z-LUT sweep dataset is not available.')
snapshot = self._dataset.peak()
valid_rows = snapshot['valid_flags'] != 0
if not np.any(valid_rows):
raise RuntimeError('No valid captured sweep profiles are available.')
bead_ids = snapshot['bead_ids'][valid_rows]
step_indices = snapshot['step_indices'][valid_rows]
motor_z_values = snapshot['motor_z_values'][valid_rows]
profiles = snapshot['profiles'][valid_rows]
expected_steps = np.arange(int(self._dataset.n_steps), dtype=np.uint32)
unique_steps = np.unique(step_indices)
if unique_steps.shape != expected_steps.shape or not np.array_equal(unique_steps, expected_steps):
raise RuntimeError('Sweep capture is missing one or more step indices.')
generated: dict[int, GeneratedZLUTResult] = {}
for bead_id in np.unique(bead_ids):
bead_mask = bead_ids == bead_id
bead_step_indices = step_indices[bead_mask]
bead_profiles = profiles[bead_mask]
bead_motor_z = motor_z_values[bead_mask]
averaged_profiles: list[np.ndarray] = []
z_references: list[float] = []
for step_index in expected_steps:
step_mask = bead_step_indices == step_index
if not np.any(step_mask):
raise RuntimeError(
f'Data is corrupt. Please try again.'
)
step_profiles = bead_profiles[step_mask]
step_motor_z = bead_motor_z[step_mask]
averaged_profiles.append(np.nanmean(step_profiles, axis=0))
z_references.append(float(np.nanmean(step_motor_z)))
averaged_matrix = np.asarray(averaged_profiles, dtype=np.float64)
zlut_array = np.vstack((np.asarray(z_references, dtype=np.float64), averaged_matrix.T))
zlut_array = np.where(np.isfinite(zlut_array), zlut_array, np.nan)
generated[int(bead_id)] = GeneratedZLUTResult(bead_id=int(bead_id), zlut_array=zlut_array)
if not generated:
raise RuntimeError('No generated Z-LUT candidates were produced from the sweep dataset.')
self._generated_zluts = generated
self._selected_bead_id = min(generated)
[docs]
def _discover_focus_motor_name(self) -> str | None:
focus_motor_names: list[str] = []
for name, hardware_type in self.hardware_types.items():
try:
if issubclass(hardware_type, FocusMotorBase):
focus_motor_names.append(name)
except TypeError:
continue
if len(focus_motor_names) == 1:
return focus_motor_names[0]
return None
@staticmethod
[docs]
def _validate_sweep_limits(start_nm: float, stop_nm: float, z_min: float, z_max: float) -> None:
lower_limit = float(min(z_min, z_max))
upper_limit = float(max(z_min, z_max))
requested_min = float(min(start_nm, stop_nm))
requested_max = float(max(start_nm, stop_nm))
if requested_min < lower_limit or requested_max > upper_limit:
raise ValueError(
'Requested sweep range '
f'[{requested_min:.3f}, {requested_max:.3f}] nm exceeds focus motor limits '
f'[{lower_limit:.3f}, {upper_limit:.3f}] nm.'
)
@staticmethod
[docs]
def _build_steps(start_nm: float, step_nm: float, stop_nm: float) -> np.ndarray:
start = float(start_nm)
step = float(step_nm)
stop = float(stop_nm)
if np.isclose(step, 0.0):
raise ValueError('Step size must be non-zero.')
if np.isclose(start, stop):
raise ValueError('Z-LUT generation requires at least two z positions.')
delta = stop - start
if np.sign(delta) != np.sign(step):
raise ValueError('Step size direction must point from start toward stop.')
intervals = delta / step
rounded_intervals = int(round(intervals))
if rounded_intervals < 0 or not np.isclose(start + rounded_intervals * step, stop):
raise ValueError('Stop position must land exactly on the requested step grid.')
return start + step * np.arange(rounded_intervals + 1, dtype=np.float64)