from collections import OrderedDict
from math import floor, ceil
import os
import sys
from time import time
import traceback
from typing import Callable, Iterable
from warnings import warn
import numpy as np
from PyQt6.QtCore import QEvent, QPoint, QRectF, QSettings, Qt, QThread, QTimer
from PyQt6.QtGui import QGuiApplication, QImage, QPixmap
from PyQt6.QtWidgets import (
QApplication,
QFileDialog,
QFrame,
QHBoxLayout,
QLabel,
QLayout,
QMainWindow,
QMessageBox,
QScrollArea,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from magscope._logging import get_logger
from magscope.auto_bead_selection import copy_latest_image, roi_overlaps
from magscope.datatypes import DatasetNotReadyError, VideoBuffer, ZLUTSweepDataset
from magscope.ipc import Delivery, register_ipc_command
from magscope.ipc_commands import *
from magscope.ui.auto_bead_selection_dialog import AutoBeadSelectionDialog
from magscope.ui.controls import (
AcquisitionPanel,
AllanDeviationPanel,
BeadSelectionPanel,
CameraPanel,
ControlPanelBase,
HistogramPanel,
HelpPanel,
MagScopeSettingsPanel,
PlotSettingsPanel,
ProfilePanel,
ResetPanel,
ScriptPanel,
StatusPanel,
TrackingOptionsPanel,
XYLockPanel,
ZLUTGenerationDialog,
ZLUTGenerationPanel,
ZLUTPanel,
ZLockPanel,
has_tweezepy_support,
)
from magscope.ui.panel_layout import (
PANEL_MIME_TYPE,
PanelLayoutManager,
PanelWrapper,
ReorderableColumn,
)
from magscope.ui.plots import PlotWorker, TimeSeriesPlotBase
from magscope.ui.video_viewer import VideoViewer
from magscope.ui.widgets import BeadGraphic, CollapsibleGroupBox, GripSplitter, ResizableLabel
from magscope.processes import ManagerProcessBase
from magscope.scripting import ScriptStatus, register_script_command
from magscope.settings import MagScopeSettings
from magscope.utils import AcquisitionMode, numpy_type_to_qt_image_type
[docs]
logger = get_logger("ui.ui")
[docs]
class _StartupReadyWindow(QMainWindow):
def __init__(self, on_ready: Callable[[], None]):
super().__init__()
[docs]
self._on_ready = on_ready
[docs]
self._startup_ready_scheduled = False
[docs]
self._startup_shown = False
[docs]
def event(self, event):
event_type = event.type()
if event_type == QEvent.Type.Show:
self._startup_shown = True
self._maybe_schedule_startup_ready(after_paint=False)
elif event_type == QEvent.Type.Paint:
self._maybe_schedule_startup_ready(after_paint=True)
return super().event(event)
[docs]
def _maybe_schedule_startup_ready(self, *, after_paint: bool) -> None:
if self._startup_ready_scheduled or not self._startup_shown or not self.isVisible():
return
platform_name = QGuiApplication.platformName()
window_handle = self.windowHandle()
is_exposed = window_handle is not None and window_handle.isExposed()
if not (is_exposed or after_paint or platform_name == "offscreen"):
return
self._startup_ready_scheduled = True
QTimer.singleShot(0, self._on_ready)
[docs]
class UIManager(ManagerProcessBase):
def __init__(self):
super().__init__()
[docs]
self._active_bead_graphic: BeadGraphic | None = None
[docs]
self._active_bead_id: int | None = None
[docs]
self._bead_rois: dict[int, tuple[int, int, int, int]] = {}
[docs]
self._pending_bead_add_id: int | None = None
[docs]
self._pending_bead_add_roi: tuple[int, int, int, int] | None = None
[docs]
self._bead_next_id: int = 0
[docs]
self.beads_in_view_on = False
[docs]
self.beads_in_view_count = 1
[docs]
self.beads_in_view_marker_size = 20
[docs]
self.central_layouts: list[QLayout] = []
[docs]
self.controls: Controls | None = None
[docs]
self.controls_to_add = []
[docs]
self._display_rate_counter: int = 0
[docs]
self._display_rate_last_time: float = time()
[docs]
self._display_rate_last_rate: float = 0
[docs]
self._n_windows: int | None = None
[docs]
self.plot_worker: PlotWorker
[docs]
self.plot_thread: QThread
[docs]
self.plots_to_add: list[TimeSeriesPlotBase] = []
[docs]
self.qt_app: QApplication | None = None
[docs]
self.reference_bead: int | None = None
[docs]
self._timer: QTimer | None = None
[docs]
self._timer_video_view: QTimer | None = None
[docs]
self._video_buffer_last_index: int = 0
[docs]
self._video_viewer_need_reset: bool = True
[docs]
self.video_viewer: VideoViewer | None = None
[docs]
self.windows: list[QMainWindow] = []
[docs]
self._suppress_bead_roi_updates: bool = False
[docs]
self._last_applied_roi: int | None = None
[docs]
self._settings_persistence_warning_shown = False
[docs]
self._bead_roi_capacity = 10000
[docs]
self._auto_bead_selection_dialog: AutoBeadSelectionDialog | None = None
[docs]
self._startup_ready_sent = False
[docs]
self._zlut_generation_dialog: ZLUTGenerationDialog | None = None
[docs]
self._zlut_generation_phase = 'idle'
[docs]
self._zlut_generation_z_axis_min_nm: float | None = None
[docs]
self._zlut_generation_z_axis_max_nm: float | None = None
[docs]
self._zlut_generation_z_axis_descending = False
[docs]
self._zlut_sweep_dataset: ZLUTSweepDataset | None = None
[docs]
self._zlut_evaluation_bead_ids: list[int] = []
[docs]
self._zlut_evaluation_selected_bead_id: int | None = None
[docs]
self._zlut_preview_last_poll: float = 0.0
[docs]
self._shutdown_complete = False
@staticmethod
[docs]
def _zlut_requested_sweep_edges(
z_axis_min_nm: float | None,
z_axis_max_nm: float | None,
n_steps: int,
) -> tuple[float | None, float | None]:
if z_axis_min_nm is None or z_axis_max_nm is None or int(n_steps) <= 0:
return None, None
if int(n_steps) == 1:
return float(z_axis_min_nm), float(z_axis_max_nm)
step_spacing = float(z_axis_max_nm - z_axis_min_nm) / float(int(n_steps) - 1)
half_step = 0.5 * step_spacing
return float(z_axis_min_nm - half_step), float(z_axis_max_nm + half_step)
@staticmethod
[docs]
def _build_zlut_preview_payload(
preview_snapshot: dict[str, object],
*,
z_axis_min_nm: float | None,
z_axis_max_nm: float | None,
z_axis_descending: bool,
) -> dict[str, object]:
state = int(preview_snapshot['state'])
n_steps = int(preview_snapshot['n_steps'])
profiles_per_bead = int(preview_snapshot['profiles_per_bead'])
x_axis_min, x_axis_max = UIManager._zlut_requested_sweep_edges(
z_axis_min_nm,
z_axis_max_nm,
n_steps,
)
preview_image = None
image_x_min = None
image_x_max = None
mode = 'Raw sweep'
profiles = np.asarray(preview_snapshot['profiles'], dtype=np.float64)
if profiles.size > 0:
step_indices = np.asarray(preview_snapshot['step_indices'], dtype=np.uint32)
selected_motor_z_values = np.asarray(
preview_snapshot['motor_z_values'],
dtype=np.float64,
)
if state == ZLUTSweepDataset.STATE_COMPLETE:
mode = 'Averaged sweep'
unique_steps = np.unique(step_indices)
averaged_profiles = []
averaged_z_positions = []
for step_index in unique_steps:
step_profiles = profiles[step_indices == step_index]
step_motor_z_values = selected_motor_z_values[step_indices == step_index]
if step_profiles.size == 0:
continue
averaged_profiles.append(np.nanmean(step_profiles, axis=0))
averaged_z_positions.append(float(np.nanmean(step_motor_z_values)))
if averaged_profiles:
averaged_profiles_array = np.asarray(averaged_profiles, dtype=np.float64)
averaged_z_positions_array = np.asarray(averaged_z_positions, dtype=np.float64)
order = np.argsort(averaged_z_positions_array)
preview_image = averaged_profiles_array[order].T
image_x_min = x_axis_min
image_x_max = x_axis_max
else:
slot_indices = np.zeros((profiles.shape[0],), dtype=np.int64)
per_step_capture_counts: dict[int, int] = {}
for row_index, step_index in enumerate(step_indices):
step_index_int = int(step_index)
within_step_index = per_step_capture_counts.get(step_index_int, 0)
per_step_capture_counts[step_index_int] = within_step_index + 1
if z_axis_descending:
step_rank = n_steps - 1 - step_index_int
else:
step_rank = step_index_int
slot_indices[row_index] = step_rank * profiles_per_bead + within_step_index
sorted_order = np.argsort(slot_indices, kind='stable')
sorted_slot_indices = np.asarray(slot_indices[sorted_order], dtype=np.int64)
sorted_profiles = np.asarray(profiles[sorted_order], dtype=np.float64)
if x_axis_min is not None and x_axis_max is not None and sorted_profiles.shape[0] > 0:
total_slots = n_steps * profiles_per_bead
if total_slots > 0:
slot_width = float(x_axis_max - x_axis_min) / float(total_slots)
min_slot = int(np.min(sorted_slot_indices))
max_slot = int(np.max(sorted_slot_indices))
sparse_width = max_slot - min_slot + 1
preview_image = np.full(
(sorted_profiles.shape[1], sparse_width),
np.nan,
dtype=np.float64,
)
for profile_row, slot_index in zip(
sorted_profiles,
sorted_slot_indices,
strict=False,
):
preview_image[:, int(slot_index - min_slot)] = profile_row
image_x_min = float(x_axis_min + min_slot * slot_width)
image_x_max = float(x_axis_min + (max_slot + 1) * slot_width)
elif sorted_profiles.shape[0] > 0:
preview_image = sorted_profiles.T
return {
'state': state,
'count': int(preview_snapshot['count']),
'capacity': int(preview_snapshot['capacity']),
'n_steps': n_steps,
'n_beads': int(preview_snapshot['n_beads']),
'profiles_per_bead': profiles_per_bead,
'profile_length': int(preview_snapshot['profile_length']),
'preview_image': preview_image,
'selected_bead_id': preview_snapshot['selected_bead_id'],
'mode': mode,
'motor_z_min': preview_snapshot['motor_z_min'],
'motor_z_max': preview_snapshot['motor_z_max'],
'expected_capture_count': n_steps * profiles_per_bead,
'x_axis_label': 'Z Position (nm)',
'x_axis_min': x_axis_min,
'x_axis_max': x_axis_max,
'image_x_min': image_x_min,
'image_x_max': image_x_max,
}
[docs]
def setup(self):
self.qt_app = QApplication.instance()
if not self.qt_app:
self.qt_app = QApplication(sys.argv)
QGuiApplication.styleHints().setColorScheme(Qt.ColorScheme.Dark)
if self.settings is not None:
self._last_applied_roi = self.settings["ROI"]
# If the number of windows is not specified, then use the number of screens
if self._n_windows is None:
self._n_windows = len(QApplication.screens())
# Create the live plots in a separate thread (but dont start it)
self.plots_widget = ResizableLabel()
self.plots_widget.setScaledContents(True)
self.plots_thread = QThread()
self.plot_worker = PlotWorker()
for plot in self.plots_to_add:
self.plot_worker.add_plot(plot)
self.plot_worker.set_locks(self.locks)
self.plot_worker.setup()
# Create controls panel
self.controls = Controls(self)
# Create the video viewer
self.video_viewer = VideoViewer()
self._refresh_bead_overlay()
# Finally start the live plots
self.plot_worker.moveToThread(self.plots_thread)
self.plots_thread.started.connect(self.plot_worker.run) # noqa
self.plot_worker.image_signal.connect(self._set_plot_image)
self.plots_widget.resized.connect(self.update_plot_figure_size)
self.plots_thread.start(QThread.Priority.LowPriority)
# Create the layouts for each window
self.create_central_widgets()
# Create the windows
for i in range(self._n_windows):
if i == 0:
window = _StartupReadyWindow(self._notify_startup_ready)
else:
window = QMainWindow()
window.setWindowTitle("MagScope")
screen = QApplication.screens()[i % len(QApplication.screens())]
geometry = screen.geometry()
window.setGeometry(
geometry.x(), geometry.y(), geometry.width(), geometry.height()
)
window.setMinimumWidth(300)
window.setMinimumHeight(300)
window.closeEvent = lambda _, w=window: self.quit()
window.showMaximized()
window.setCentralWidget(self.central_widgets[i])
self.windows.append(window)
self._show_settings_persistence_warning_if_needed()
# Connect the video viewer
self.video_viewer.coordinatesChanged.connect(self.update_view_coords)
self.video_viewer.sceneClicked.connect(self.callback_view_clicked)
# Timer
self._timer = QTimer(self.qt_app)
self._timer.timeout.connect(self._main_loop_tick) # noqa
self._timer.setInterval(0)
self._timer.start()
# Timer - Video Display
self._timer_video_view = QTimer(self.qt_app)
self._timer_video_view.timeout.connect(self._update_view_and_hist_tick)
self._timer_video_view.setInterval(25)
self._timer_video_view.start()
# Start app
self._running = True
self.qt_app.exec()
[docs]
def _notify_startup_ready(self) -> None:
if self._startup_ready_sent:
return
if self._command_registry is None or self._pipe is None or self._magscope_quitting is None:
return
self._startup_ready_sent = True
self.send_ipc(StartupReadyCommand(process_name=self.name))
@register_ipc_command(
SetSettingsCommand, delivery=Delivery.BROADCAST, target="ManagerProcessBase"
)
[docs]
def set_settings(self, settings: MagScopeSettings):
"""Apply new settings and clear beads if the ROI size changed."""
previous_roi = self._last_applied_roi
super().set_settings(settings)
self._show_settings_persistence_warning_if_needed()
new_roi = self.settings["ROI"]
if previous_roi is not None and new_roi != previous_roi:
self.clear_beads()
self._last_applied_roi = new_roi
self._update_roi_labels(new_roi)
[docs]
def _show_settings_persistence_warning_if_needed(self) -> None:
if self._settings_persistence_warning_shown:
return
if self.settings is None or self.settings.persistence_available:
return
if not self.windows:
return
self._settings_persistence_warning_shown = True
self._show_settings_persistence_warning()
[docs]
def _show_settings_persistence_warning(self) -> None:
msg = QMessageBox(self.windows[0])
msg.setIcon(QMessageBox.Icon.Warning)
msg.setWindowTitle("Settings Persistence Unavailable")
msg.setText(
"Some settings may not automatically load or save for this session."
)
msg.setInformativeText(
"MagScope will continue running with in-memory settings."
)
msg.setStandardButtons(QMessageBox.StandardButton.Ok)
msg.show()
[docs]
def _set_plot_image(self, img: QImage) -> None:
if self.plots_widget is None:
return
self.plots_widget.setPixmap(QPixmap.fromImage(img))
@staticmethod
[docs]
def _disconnect_signal(signal, callback) -> None:
try:
signal.disconnect(callback)
except (RuntimeError, TypeError):
pass
@staticmethod
[docs]
def _stop_timer(timer: QTimer | None) -> None:
if timer is None:
return
try:
timer.stop()
except RuntimeError:
pass
try:
timer.deleteLater()
except RuntimeError:
pass
@staticmethod
[docs]
def _shutdown_plot_worker(self) -> None:
plot_worker = getattr(self, 'plot_worker', None)
plots_thread = getattr(self, 'plots_thread', None)
plots_widget = getattr(self, 'plots_widget', None)
if plot_worker is None:
return
if plots_widget is not None:
self._disconnect_signal(plot_worker.image_signal, self._set_plot_image)
self._disconnect_signal(plots_widget.resized, self.update_plot_figure_size)
stop = getattr(plot_worker, '_stop', None)
if callable(stop):
stop()
if plots_thread is not None:
try:
plots_thread.quit()
except RuntimeError:
pass
try:
plots_thread.wait()
except RuntimeError:
pass
try:
plots_thread.deleteLater()
except RuntimeError:
pass
dispose = getattr(plot_worker, 'dispose', None)
if callable(dispose):
dispose()
self.plot_worker = None
self.plots_thread = None
[docs]
def quit(self):
if self._shutdown_complete or self._quitting.is_set():
return
can_use_process_quit = (
self._command_registry is not None
and hasattr(self._command_registry, 'route_for')
and (self._pipe is None or hasattr(self._pipe, 'close'))
and (self._magscope_quitting is None or hasattr(self._magscope_quitting, 'is_set'))
)
if not can_use_process_quit:
self._quitting.set()
self._running = False
else:
super().quit()
self._running = False
self._stop_timer(self._timer)
self._timer = None
self._stop_timer(self._timer_video_view)
self._timer_video_view = None
if self.video_viewer is not None:
coordinates_changed = getattr(self.video_viewer, 'coordinatesChanged', None)
if coordinates_changed is not None:
self._disconnect_signal(coordinates_changed, self.update_view_coords)
scene_clicked = getattr(self.video_viewer, 'sceneClicked', None)
if scene_clicked is not None:
self._disconnect_signal(scene_clicked, self.callback_view_clicked)
self._shutdown_plot_worker()
if self._auto_bead_selection_dialog is not None:
force_close = getattr(self._auto_bead_selection_dialog, 'force_close', None)
if callable(force_close):
force_close()
self._auto_bead_selection_dialog = None
self._detach_zlut_sweep_dataset()
self._zlut_generation_phase = 'idle'
self._zlut_generation_z_axis_min_nm = None
self._zlut_generation_z_axis_max_nm = None
self._zlut_generation_z_axis_descending = False
self._zlut_evaluation_bead_ids = []
self._zlut_evaluation_selected_bead_id = None
if self._zlut_generation_dialog is not None:
force_close = getattr(self._zlut_generation_dialog, 'force_close', None)
if callable(force_close):
force_close()
self._zlut_generation_dialog = None
for window in self.windows:
self._close_widget(window)
self.windows = []
for central_widget in self.central_widgets:
self._close_widget(central_widget)
self.central_widgets = []
self.central_layouts = []
self._close_widget(self.controls)
self.controls = None
self._close_widget(self.video_viewer)
self.video_viewer = None
self._close_widget(getattr(self, 'plots_widget', None))
self.plots_widget = None
if self.qt_app is not None:
self.qt_app.quit()
self._shutdown_complete = True
[docs]
def do_main_loop(self):
# Because the UIManager is a special case with a GUI
# the main loop is actually called by a timer, not the
# run method of it's super()
if self._running:
self._update_display_rate()
self.update_video_buffer_status()
self.update_video_processors_status()
self.controls.profile_panel.update_plot()
self._update_zlut_generation_dialog()
self.receive_ipc()
[docs]
def _handle_timer_exception(self, exc: BaseException) -> None:
"""Surface exceptions that occur inside Qt timer callbacks."""
self._running = False
self._report_exception(exc)
if self.qt_app is not None:
self.qt_app.quit()
[docs]
def _run_safe(self, callback: Callable[[], None]) -> None:
try:
callback()
except Exception as exc:
self._handle_timer_exception(exc)
[docs]
def _main_loop_tick(self) -> None:
self._run_safe(self.do_main_loop)
[docs]
def _update_view_and_hist_tick(self) -> None:
self._run_safe(self._update_view_and_hist)
[docs]
def set_selected_bead(self, bead: int):
old_selected = self._normalize_bead_id(self.selected_bead)
old_reference = self._normalize_bead_id(self.reference_bead)
self.selected_bead = bead
normalized_bead = self._normalize_bead_id(bead)
if hasattr(self, 'plot_worker') and self.plot_worker is not None:
self.plot_worker.selected_bead_signal.emit(bead)
self._sync_plot_settings_selected_bead(bead)
if self.shared_values is not None:
self.shared_values.live_profile_bead.value = bead
self._clear_live_profile_buffer()
self._set_active_bead(normalized_bead)
self._update_bead_highlights(
old_selected=old_selected,
old_reference=old_reference,
)
[docs]
def set_live_profile_monitor_enabled(self, enabled: bool) -> None:
if self.shared_values is not None:
self.shared_values.live_profile_enabled.value = 1 if enabled else 0
if not enabled:
self._clear_live_profile_buffer()
[docs]
def set_reference_bead(self, bead: int | None):
old_selected = self._normalize_bead_id(self.selected_bead)
old_reference = self._normalize_bead_id(self.reference_bead)
self.reference_bead = bead
emitted_bead = -1 if bead is None else bead
if hasattr(self, 'plot_worker') and self.plot_worker is not None:
self.plot_worker.reference_bead_signal.emit(emitted_bead)
self._sync_plot_settings_reference_bead(bead)
self._update_bead_highlights(
old_selected=old_selected,
old_reference=old_reference,
)
[docs]
def _sync_plot_settings_selected_bead(self, bead: int) -> None:
if self.controls is None or not hasattr(self.controls, 'plot_settings_panel'):
return
lineedit = self.controls.plot_settings_panel.selected_bead.lineedit
lineedit.blockSignals(True)
lineedit.setText(str(bead))
lineedit.blockSignals(False)
[docs]
def _sync_plot_settings_reference_bead(self, bead: int | None) -> None:
if self.controls is None or not hasattr(self.controls, 'plot_settings_panel'):
return
lineedit = self.controls.plot_settings_panel.reference_bead.lineedit
lineedit.blockSignals(True)
lineedit.setText('' if bead is None or bead < 0 else str(bead))
lineedit.blockSignals(False)
[docs]
def _normalize_bead_id(self, bead: int | None) -> int | None:
if bead is None or bead < 0:
return None
return bead
[docs]
def _get_bead_highlight_state(self, bead_id: int) -> str:
selected_id = self._normalize_bead_id(self.selected_bead)
reference_id = self._normalize_bead_id(self.reference_bead)
if bead_id == selected_id:
return 'selected'
if bead_id == reference_id:
return 'reference'
return 'default'
[docs]
def _refresh_bead_overlay(self) -> None:
if self.video_viewer is not None:
self.video_viewer.set_bead_overlay(
self._bead_rois,
self._active_bead_id,
self._normalize_bead_id(self.selected_bead),
self._normalize_bead_id(self.reference_bead),
)
self.video_viewer.viewport().update()
self._update_auto_bead_selection_button_state()
[docs]
def _can_start_auto_bead_selection(self) -> bool:
return (
self.controls is not None
and self.video_viewer is not None
and self.video_buffer is not None
and self._auto_bead_selection_dialog is None
and self._pending_bead_add_id is None
and not self._current_scene_rect().isNull()
)
[docs]
def _snapshot_recent_image(self) -> np.ndarray | None:
if self.video_buffer is None:
return None
# Intentionally use peak_image() as a lightweight snapshot. It does not
# verify that a frame has been written yet, but by the time a user
# starts auto bead selection the buffer is expected to already contain
# a recent frame.
_index, image_bytes = self.video_buffer.peak_image()
return copy_latest_image(
image_bytes,
self.video_buffer.image_shape,
self.video_buffer.dtype,
)
[docs]
def _current_image_display_scale(self) -> int:
if self.video_buffer is None:
return 1
cam_bits = self.camera_type.bits
dtype_bits = np.iinfo(self.video_buffer.dtype).bits
return 2 ** (dtype_bits - cam_bits)
[docs]
def _current_scene_rect(self) -> QRectF:
if self.video_viewer is None:
return QRectF()
image_scene_rect = getattr(self.video_viewer, 'image_scene_rect', None)
if callable(image_scene_rect):
rect = image_scene_rect()
if not rect.isNull():
return rect
scene = getattr(self.video_viewer, 'scene', None)
if scene is None or not hasattr(scene, 'sceneRect'):
return QRectF()
return scene.sceneRect()
[docs]
def _current_visible_scene_rect(self) -> QRectF:
scene_rect = self._current_scene_rect()
if self.video_viewer is None or scene_rect.isNull():
return scene_rect
viewport = self.video_viewer.viewport()
if viewport is None or not hasattr(viewport, 'rect'):
return scene_rect
viewport_rect = viewport.rect()
if viewport_rect.isNull():
return scene_rect
visible_rect = self.video_viewer.mapToScene(viewport_rect).boundingRect()
visible_rect = visible_rect.intersected(scene_rect)
return scene_rect if visible_rect.isEmpty() else visible_rect
[docs]
def _next_random_bead_roi(
self,
rng: np.random.Generator,
visible_rect: QRectF,
) -> tuple[int, int, int, int] | None:
if self.settings is None:
return None
roi_width = int(self.settings['ROI'])
half_width = roi_width / 2
min_x = ceil(visible_rect.left() + half_width)
max_x = floor(visible_rect.right() - half_width)
min_y = ceil(visible_rect.top() + half_width)
max_y = floor(visible_rect.bottom() - half_width)
if min_x > max_x or min_y > max_y:
return None
center_x = int(rng.integers(min_x, max_x + 1))
center_y = int(rng.integers(min_y, max_y + 1))
return BeadGraphic.clamp_roi_to_scene(
BeadGraphic.roi_from_center(center_x, center_y, roi_width),
self._current_scene_rect(),
)
[docs]
def _set_active_bead(self, bead_id: int | None) -> None:
normalized_id = self._normalize_bead_id(bead_id)
if normalized_id is not None and normalized_id not in self._bead_rois:
normalized_id = None
if self._active_bead_graphic is not None:
self._active_bead_graphic.remove()
self._active_bead_graphic = None
self._active_bead_id = normalized_id
if normalized_id is None or self.video_viewer is None:
self._refresh_bead_overlay()
return
roi = self._bead_rois[normalized_id]
self._active_bead_graphic = BeadGraphic(self, normalized_id, roi, self.video_viewer.scene)
self._active_bead_graphic.set_selection_state(
self._get_bead_highlight_state(normalized_id)
)
self._refresh_bead_overlay()
[docs]
def on_active_bead_move_completed(
self,
bead_id: int,
roi: tuple[int, int, int, int],
) -> None:
if bead_id not in self._bead_rois:
return
self._bead_rois[bead_id] = roi
self._update_bead_roi(bead_id, roi)
self._refresh_bead_overlay()
@register_ipc_command(AddRandomBeadsCommand)
@register_script_command(AddRandomBeadsCommand)
[docs]
def add_random_beads(self, count: int, seed: int | None = None) -> None:
if count <= 0:
return
visible_rect = self._current_visible_scene_rect()
if visible_rect.isNull() or visible_rect.isEmpty():
self.show_error('No visible field of view', 'Cannot add random beads without a visible image area.')
return
remaining_capacity = self._bead_roi_capacity - self._bead_next_id
if remaining_capacity <= 0:
self.show_error(
'Maximum bead count reached',
'Remove beads or use Reassign IDs before adding more than 10000 beads.',
)
return
rng = np.random.default_rng(seed)
bead_rois: dict[int, tuple[int, int, int, int]] = {}
count_to_add = min(count, remaining_capacity)
for _ in range(count_to_add):
roi = self._next_random_bead_roi(rng, visible_rect)
if roi is None:
break
bead_rois[len(bead_rois)] = roi
if not bead_rois:
return
self._add_new_bead_batch(list(bead_rois.values()))
[docs]
def _add_new_bead_batch(
self,
rois: Iterable[tuple[int, int, int, int]],
) -> dict[int, tuple[int, int, int, int]]:
bead_rois: dict[int, tuple[int, int, int, int]] = {}
next_bead_id = self._bead_next_id
for roi in rois:
if next_bead_id >= self._bead_roi_capacity:
break
bead_rois[next_bead_id] = tuple(int(value) for value in roi)
next_bead_id += 1
if not bead_rois:
return {}
try:
if self.bead_roi_buffer is None:
updated_bead_rois = {**self._bead_rois, **bead_rois}
self._write_bead_rois_to_buffer(updated_bead_rois)
self._broadcast_bead_roi_update()
else:
self.bead_roi_buffer.add_beads(bead_rois)
self._broadcast_bead_roi_update()
except Exception:
self._update_next_bead_id_label()
raise
self._bead_rois.update(bead_rois)
self._bead_next_id = next_bead_id
self._update_next_bead_id_label()
self._set_active_bead(self._normalize_bead_id(self.selected_bead))
self._refresh_bead_overlay()
return bead_rois
[docs]
def _hit_test_bead(self, pos: QPoint) -> int | None:
if not self._bead_rois:
return None
selected_id = self._normalize_bead_id(self.selected_bead)
reference_id = self._normalize_bead_id(self.reference_bead)
best_match: tuple[int, float, int] | None = None
best_bead_id: int | None = None
for bead_id, (x0, x1, y0, y1) in self._bead_rois.items():
if not (x0 <= pos.x() <= x1 and y0 <= pos.y() <= y1):
continue
if bead_id == self._active_bead_id:
priority = 0
elif bead_id == selected_id:
priority = 1
elif bead_id == reference_id:
priority = 2
else:
priority = 3
center_x = (x0 + x1) / 2
center_y = (y0 + y1) / 2
distance_sq = (center_x - pos.x()) ** 2 + (center_y - pos.y()) ** 2
candidate = (priority, distance_sq, -bead_id)
if best_match is None or candidate < best_match:
best_match = candidate
best_bead_id = bead_id
return best_bead_id
[docs]
def _update_bead_highlight(self, bead_id: int) -> None:
if bead_id == self._active_bead_id and self._active_bead_graphic is not None:
self._active_bead_graphic.set_selection_state(
self._get_bead_highlight_state(bead_id)
)
[docs]
def _update_bead_highlights(
self,
*,
old_selected: int | None = None,
old_reference: int | None = None,
):
selected_id = self._normalize_bead_id(self.selected_bead)
reference_id = self._normalize_bead_id(self.reference_bead)
affected_ids = {
bead_id
for bead_id in (old_selected, old_reference, selected_id, reference_id)
if bead_id is not None
}
for bead_id in affected_ids:
self._update_bead_highlight(bead_id)
self._refresh_bead_overlay()
[docs]
def _clear_live_profile_buffer(self) -> None:
if self.live_profile_buffer is not None:
self.live_profile_buffer.clear()
@property
[docs]
def n_windows(self):
return self._n_windows
@n_windows.setter
def n_windows(self, value):
if self._running:
warn("Application already running", RuntimeWarning)
return
if not 1 <= value <= 3:
warn("Number of windows must be between 1 and 3")
return
self._n_windows = value
@property
[docs]
def bead_roi_updates_suppressed(self) -> bool:
return self._suppress_bead_roi_updates
@property
[docs]
def bead_next_id(self) -> int:
return self._bead_next_id
[docs]
def _broadcast_bead_roi_update(self) -> None:
if self._command_registry is None or self._pipe is None or self._magscope_quitting is None:
return
self.send_ipc(UpdateBeadRoisCommand())
[docs]
def _write_bead_rois_to_buffer(self, bead_rois: dict[int, tuple[int, int, int, int]]) -> None:
if self.bead_roi_buffer is None:
bead_ids = np.asarray(sorted(bead_rois), dtype=np.uint32)
bead_roi_values = np.asarray([bead_rois[int(bead_id)] for bead_id in bead_ids], dtype=np.uint32)
self._bead_roi_ids = bead_ids
if bead_roi_values.size == 0:
self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
else:
self._bead_roi_values = bead_roi_values.reshape((-1, 4))
return
self.bead_roi_buffer.replace_beads(bead_rois)
self._refresh_bead_roi_cache()
[docs]
def update_view_coords(self):
pass
[docs]
def _update_view_and_hist(self):
# Get image and _write position
index, image_bytes = self.video_buffer.peak_image()
# Check if _write has changed (a new image is ready)
if self._video_buffer_last_index != index:
# Update the stored index
self._video_buffer_last_index = index
cam_bits = self.camera_type.bits
dtype_bits = np.iinfo(self.video_buffer.dtype).bits
scale = (2 ** (dtype_bits - cam_bits))
# Update the view
qt_img = QImage(
np.frombuffer(image_bytes, self.video_buffer.dtype).copy() *
scale, *self.video_buffer.image_shape,
numpy_type_to_qt_image_type(self.video_buffer.dtype))
self.video_viewer.set_pixmap(QPixmap.fromImage(qt_img))
if self._video_viewer_need_reset:
self.video_viewer.reset_view()
self._video_viewer_need_reset = False
# Update the bead position overlay
self._update_beads_in_view()
# Update the histogram
self.controls.histogram_panel.update_plot(image_bytes)
# Increment the display rate counter
self._display_rate_counter += 1
[docs]
def callback_view_clicked(self, pos: QPoint, button=Qt.MouseButton.LeftButton):
if self.controls is None:
return
if self._pending_bead_add_id is not None:
return
bead_id = self._hit_test_bead(pos)
if button == Qt.MouseButton.RightButton:
if bead_id is not None:
self.remove_bead(bead_id)
return
if bead_id is not None:
self._set_active_bead(bead_id)
self.set_selected_bead(bead_id)
return
self.add_bead(pos)
[docs]
def refresh_bead_rois(self):
super().refresh_bead_rois()
if self._pending_bead_add_id is None or self._pending_bead_add_roi is None:
return
bead_ids, bead_rois = self.get_cached_bead_rois()
pending_id = self._pending_bead_add_id
pending_roi = self._pending_bead_add_roi
matches = bead_ids == pending_id
if not np.any(matches):
return
roi = bead_rois[np.flatnonzero(matches)[0]]
if tuple(int(value) for value in roi) != pending_roi:
return
self._clear_pending_bead_add()
[docs]
def update_bead_rois(self):
self._write_bead_rois_to_buffer(self._bead_rois)
self._broadcast_bead_roi_update()
[docs]
def _add_bead_roi(self, bead_id: int, roi: tuple[int, int, int, int]) -> None:
if self.bead_roi_buffer is None:
self.update_bead_rois()
return
self.bead_roi_buffer.add_beads({bead_id: roi})
self._broadcast_bead_roi_update()
[docs]
def _update_bead_roi(self, bead_id: int, roi: tuple[int, int, int, int]) -> None:
if self.bead_roi_buffer is None:
self.update_bead_rois()
return
self.bead_roi_buffer.update_beads({bead_id: roi})
self._broadcast_bead_roi_update()
[docs]
def _update_multiple_bead_rois(
self,
bead_rois: dict[int, tuple[int, int, int, int]],
) -> None:
if not bead_rois:
return
if self.bead_roi_buffer is None:
self.update_bead_rois()
return
self.bead_roi_buffer.update_beads(bead_rois)
self._broadcast_bead_roi_update()
[docs]
def _remove_bead_roi(self, bead_id: int) -> None:
if self.bead_roi_buffer is None:
self.update_bead_rois()
return
self.bead_roi_buffer.remove_beads([bead_id])
self._broadcast_bead_roi_update()
@register_ipc_command(MoveBeadsCommand)
[docs]
def move_beads(self, moves: list[tuple[int, int, int]]):
moved_ids: list[int] = []
moved_rois: dict[int, tuple[int, int, int, int]] = {}
scene_rect = self._current_scene_rect()
self._suppress_bead_roi_updates = True
try:
for id, dx, dy in moves:
if id not in self._bead_rois:
continue
roi = BeadGraphic.move_roi(self._bead_rois[id], dx, dy, scene_rect)
self._bead_rois[id] = roi
if id == self._active_bead_id and self._active_bead_graphic is not None:
self._active_bead_graphic.set_roi_bounds(roi)
moved_ids.append(id)
moved_rois[id] = roi
finally:
self._suppress_bead_roi_updates = False
if not moved_ids:
return
self._update_multiple_bead_rois(moved_rois)
self._refresh_bead_overlay()
command = RemoveBeadsFromPendingMovesCommand(ids=moved_ids)
self.send_ipc(command)
[docs]
def add_bead(self, pos: QPoint):
if self._bead_next_id >= self._bead_roi_capacity:
self.show_error(
'Maximum bead count reached',
'Remove beads or use Reassign IDs before adding more than 10000 beads.',
)
return
id = self._bead_next_id
x = pos.x()
y = pos.y()
w = self.settings['ROI']
scene_rect = self._current_scene_rect()
roi = BeadGraphic.clamp_roi_to_scene(
BeadGraphic.roi_from_center(x, y, w),
scene_rect,
)
self._bead_rois[id] = roi
previous_next_bead_id = self._bead_next_id
self._bead_next_id += 1
self._update_next_bead_id_label()
# Update the bead ROI
self._pending_bead_add_id = id
self._pending_bead_add_roi = roi
try:
self._add_bead_roi(id, roi)
except Exception:
self._bead_rois.pop(id, None)
self._bead_next_id = previous_next_bead_id
self._update_next_bead_id_label()
self._clear_pending_bead_add()
raise
if id == self._normalize_bead_id(self.selected_bead):
self._set_active_bead(id)
self._refresh_bead_overlay()
[docs]
def start_auto_bead_selection(self) -> None:
if not self._can_start_auto_bead_selection():
return
image = self._snapshot_recent_image()
if image is None:
self.show_error('No live image available', 'Cannot start auto bead selection without a recent frame.')
return
dialog_parent = self.windows[0] if self.windows else None
dialog = AutoBeadSelectionDialog(
parent=dialog_parent,
image=image,
roi_size=self.settings['ROI'],
existing_rois=self._bead_rois,
display_scale=self._current_image_display_scale(),
)
dialog.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True)
dialog.finished.connect(self._on_auto_bead_selection_dialog_finished)
dialog.selectionAccepted.connect(self._apply_auto_bead_selection)
self._auto_bead_selection_dialog = dialog
self._update_auto_bead_selection_button_state()
dialog.open()
[docs]
def _apply_auto_bead_selection(self, rois: list[tuple[int, int, int, int]]) -> None:
remaining_capacity = self._bead_roi_capacity - self._bead_next_id
existing_rois = list(self._bead_rois.values())
accepted_rois: list[tuple[int, int, int, int]] = []
for roi in rois:
normalized_roi = tuple(int(value) for value in roi)
if any(roi_overlaps(normalized_roi, existing_roi) for existing_roi in existing_rois):
continue
if any(roi_overlaps(normalized_roi, kept_roi) for kept_roi in accepted_rois):
continue
accepted_rois.append(normalized_roi)
if not accepted_rois:
return
rois_to_add = accepted_rois[:max(0, remaining_capacity)]
skipped_due_to_capacity = len(accepted_rois) - len(rois_to_add)
if not rois_to_add:
if skipped_due_to_capacity > 0:
self._show_auto_bead_selection_capacity_warning(skipped_due_to_capacity)
return
self._add_new_bead_batch(rois_to_add)
if skipped_due_to_capacity > 0:
self._show_auto_bead_selection_capacity_warning(skipped_due_to_capacity)
[docs]
def _show_auto_bead_selection_capacity_warning(self, skipped_count: int) -> None:
bead_label = 'bead' if skipped_count == 1 else 'beads'
self.show_warning(
'Maximum bead count reached',
f'{skipped_count} {bead_label} could not be added because they would exceed '
f'the maximum allowed bead count of {self._bead_roi_capacity} beads.',
)
[docs]
def _on_auto_bead_selection_dialog_finished(self, _result: int) -> None:
self._auto_bead_selection_dialog = None
self._update_auto_bead_selection_button_state()
[docs]
def remove_bead(self, id: int):
old_selected = self._normalize_bead_id(self.selected_bead)
old_reference = self._normalize_bead_id(self.reference_bead)
if id not in self._bead_rois:
return
self._bead_rois.pop(id)
if id == self._active_bead_id:
self._set_active_bead(None)
# Update highlight colors to reflect selection/reference
self._update_bead_highlights(
old_selected=old_selected,
old_reference=old_reference,
)
# Update bead ROI
self._remove_bead_roi(id)
[docs]
def clear_beads(self):
self._clear_pending_bead_add()
self._set_active_bead(None)
self._bead_rois.clear()
self._bead_next_id = 0
self._update_next_bead_id_label()
# Update bead ROIs
if self.bead_roi_buffer is not None:
self.bead_roi_buffer.clear_beads()
self._refresh_bead_roi_cache()
else:
self._bead_roi_ids = np.zeros((0,), dtype=np.uint32)
self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32)
self._broadcast_bead_roi_update()
self.set_reference_bead(None)
self.set_selected_bead(0)
self._refresh_bead_overlay()
[docs]
def reset_bead_ids(self):
self._clear_pending_bead_add()
if not self._bead_rois:
self._bead_next_id = 0
self._update_next_bead_id_label()
return
old_active_bead = self._active_bead_id
new_rois: dict[int, tuple[int, int, int, int]] = {}
id_mapping: dict[int, int] = {}
for new_id, (old_id, roi) in enumerate(sorted(self._bead_rois.items())):
id_mapping[old_id] = new_id
new_rois[new_id] = roi
self._bead_rois = new_rois
self._bead_next_id = len(self._bead_rois)
if self.selected_bead is not None:
new_selected = id_mapping.get(self.selected_bead, -1)
self.set_selected_bead(new_selected)
if self.reference_bead is not None:
new_reference = id_mapping.get(self.reference_bead)
self.set_reference_bead(new_reference)
self._update_bead_highlights()
self.update_bead_rois()
self._update_next_bead_id_label()
self._set_active_bead(id_mapping.get(old_active_bead))
self._refresh_bead_overlay()
[docs]
def _update_roi_labels(self, roi: int) -> None:
if self.controls is None:
return
self.controls.bead_selection_panel.roi_size_label.setText(
f"{roi} x {roi} pixels"
)
self.controls.z_lut_generation_panel.roi_size_label.setText(
f"{roi} x {roi} pixels"
)
[docs]
def _update_next_bead_id_label(self) -> None:
if self.controls is None:
return
self.controls.bead_selection_panel.update_next_bead_id_label(
self._bead_next_id
)
[docs]
def _clear_pending_bead_add(self) -> None:
self._pending_bead_add_id = None
self._pending_bead_add_roi = None
self._update_auto_bead_selection_button_state()
[docs]
def _calculate_next_bead_id(self) -> int:
if not self._bead_rois:
return 0
return max(self._bead_rois.keys()) + 1
[docs]
def update_video_processors_status(self):
busy = self.shared_values.video_process_busy_count.value
total = self.settings['video processors n']
text = f'{busy}/{total} busy'
self.controls.status_panel.update_video_processors_status(text)
[docs]
def update_video_buffer_status(self):
level = self.video_buffer.get_level()
size = self.video_buffer.n_total_images
text = f'{level:.0%} full, {size} max images'
self.controls.status_panel.update_video_buffer_status(text)
[docs]
def _update_display_rate(self):
# If it has been more than a second, re-calculate the display rate
if (now := time()) - self._display_rate_last_time > 1:
dt = now - self._display_rate_last_time
rate = self._display_rate_counter / dt
self._display_rate_last_time = now
self._display_rate_counter = 0
self._display_rate_last_rate = rate
self.controls.status_panel.update_display_rate(f'{rate:.0f} updates/sec')
else:
# This is used to force the "..." to update
self.controls.status_panel.update_display_rate(f'{self._display_rate_last_rate:.0f} updates/sec')
[docs]
def _update_beads_in_view(self):
# Enabled?
if not self.beads_in_view_on or self.beads_in_view_count is None:
self.video_viewer.clear_crosshairs()
return
n = self.beads_in_view_count
# Get latest n timepoints
tracks = self.tracks_buffer.peak_unsorted()
tracks = tracks[np.argsort(tracks[:, 0], kind='stable')]
t = tracks[:, 0]
unique_t = np.unique(t)
top_n_t = unique_t[np.isfinite(unique_t)][-n:]
# Get corresponding values
try:
mask = np.isin(t, top_n_t, assume_unique=False, kind='sort')
x = tracks[mask, 1]
y = tracks[mask, 2]
# Calculate relative x & y
nm_per_px = self.camera_type.nm_per_px / self.settings['magnification']
x /= nm_per_px
y /= nm_per_px
# Plot points
self.video_viewer.plot(x, y, self.beads_in_view_marker_size)
except Exception as e:
print(traceback.format_exc())
@register_ipc_command(UpdateCameraSettingCommand)
[docs]
def update_camera_setting(self, name: str, value: str):
self.controls.camera_panel.update_camera_setting(name, value)
@register_ipc_command(UpdateVideoBufferPurgeCommand)
[docs]
def update_video_buffer_purge(self, t: float):
self.controls.status_panel.update_video_buffer_purge(t)
@register_ipc_command(UpdateScriptStatusCommand)
[docs]
def update_script_status(self, status: ScriptStatus):
self.controls.script_panel.update_status(status)
@register_ipc_command(UpdateScriptStepCommand)
[docs]
def update_script_step(self, current_step: int | None, total_steps: int, description: str | None):
self.controls.script_panel.update_step(current_step, total_steps, description)
@register_ipc_command(ShowMessageCommand)
@register_script_command(ShowMessageCommand)
[docs]
def print(self, text: str, details: str | None = None):
msg = QMessageBox(self.windows[0])
msg.setIcon(QMessageBox.Icon.Information)
msg.setWindowTitle("Information")
msg.setText(text)
if details:
logger.info('%s: %s', text, details)
msg.setInformativeText(details)
else:
logger.info('%s', text)
msg.setStandardButtons(QMessageBox.StandardButton.Ok)
msg.show()
@register_ipc_command(ShowErrorCommand)
[docs]
def show_error(self, text: str, details: str | None = None):
msg = QMessageBox(self.windows[0])
msg.setIcon(QMessageBox.Icon.Critical)
msg.setWindowTitle("Error")
msg.setText(text)
if details:
logger.error('%s: %s', text, details)
msg.setInformativeText(details)
else:
logger.error('%s', text)
msg.setStandardButtons(QMessageBox.StandardButton.Ok)
msg.show()
[docs]
def show_warning(self, text: str, details: str | None = None):
msg = QMessageBox(self.windows[0])
msg.setIcon(QMessageBox.Icon.Warning)
msg.setWindowTitle("Warning")
msg.setText(text)
if details:
logger.warning('%s: %s', text, details)
msg.setInformativeText(details)
else:
logger.warning('%s', text)
msg.setStandardButtons(QMessageBox.StandardButton.Ok)
msg.show()
@register_ipc_command(SetAcquisitionOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def set_acquisition_on(self, value: bool):
super().set_acquisition_on(value)
checkbox = self.controls.acquisition_panel.acquisition_on_checkbox.checkbox
checkbox.blockSignals(True) # to prevent a loop
checkbox.setChecked(value)
checkbox.blockSignals(False)
@register_ipc_command(SetAcquisitionDirCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def set_acquisition_dir(self, value: str | None):
super().set_acquisition_dir(value)
textedit = self.controls.acquisition_panel.acquisition_dir_textedit
textedit.blockSignals(True) # to prevent a loop
textedit.setText(value or '')
textedit.blockSignals(False)
@register_ipc_command(SetAcquisitionDirOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def set_acquisition_dir_on(self, value: bool):
super().set_acquisition_dir_on(value)
checkbox = self.controls.acquisition_panel.acquisition_dir_on_checkbox.checkbox
checkbox.blockSignals(True) # to prevent a loop
checkbox.setChecked(value)
checkbox.blockSignals(False)
self.controls.acquisition_panel.update_save_highlight(value)
@register_ipc_command(SetAcquisitionModeCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs]
def set_acquisition_mode(self, mode: AcquisitionMode):
super().set_acquisition_mode(mode)
combobox = self.controls.acquisition_panel.acquisition_mode_combobox
combobox.blockSignals(True) # to prevent a loop
combobox.setCurrentText(mode)
combobox.blockSignals(False)
@register_ipc_command(UpdateXYLockEnabledCommand)
[docs]
def update_xy_lock_enabled(self, value: bool):
self.controls.xy_lock_panel.update_enabled(value)
@register_ipc_command(UpdateXYLockIntervalCommand)
[docs]
def update_xy_lock_interval(self, value: float):
self.controls.xy_lock_panel.update_interval(value)
@register_ipc_command(UpdateXYLockMaxCommand)
[docs]
def update_xy_lock_max(self, value: float):
self.controls.xy_lock_panel.update_max(value)
@register_ipc_command(UpdateXYLockWindowCommand)
[docs]
def update_xy_lock_window(self, value: int):
self.controls.xy_lock_panel.update_window(value)
@register_ipc_command(UpdateZLockEnabledCommand)
[docs]
def update_z_lock_enabled(self, value: bool):
self.controls.z_lock_panel.update_enabled(value)
@register_ipc_command(UpdateZLockBeadCommand)
[docs]
def update_z_lock_bead(self, value: int):
self.controls.z_lock_panel.update_bead(value)
@register_ipc_command(UpdateZLockTargetCommand)
[docs]
def update_z_lock_target(self, value: float):
self.controls.z_lock_panel.update_target(value)
@register_ipc_command(UpdateZLockIntervalCommand)
[docs]
def update_z_lock_interval(self, value: float):
self.controls.z_lock_panel.update_interval(value)
@register_ipc_command(UpdateZLockMaxCommand)
[docs]
def update_z_lock_max(self, value: float):
self.controls.z_lock_panel.update_max(value)
@register_ipc_command(UpdateZLockWindowCommand)
[docs]
def update_z_lock_window(self, value: int):
self.controls.z_lock_panel.update_window(value)
[docs]
def request_zlut_file(self, filepath: str) -> None:
if not filepath:
return
command = LoadZLUTCommand(filepath=filepath)
self.send_ipc(command)
[docs]
def clear_zlut(self) -> None:
command = UnloadZLUTCommand()
self.send_ipc(command)
[docs]
def _clear_zlut_generation_preview(
self,
message: str = 'Waiting for Z-LUT sweep data...',
) -> None:
if self._zlut_generation_dialog is not None:
self._zlut_generation_dialog.preview_widget.clear(message)
[docs]
def _read_zlut_preview_snapshot(self, dataset: ZLUTSweepDataset) -> dict[str, object]:
if hasattr(dataset, 'read_preview'):
return dataset.read_preview(selected_bead_id=self._zlut_evaluation_selected_bead_id)
snapshot = dataset.peak()
count = snapshot['bead_ids'].shape[0]
available_bead_ids: list[int] = []
selected_bead_id: int | None = None
motor_z_min: float | None = None
motor_z_max: float | None = None
step_indices = np.zeros((0,), dtype=np.uint32)
motor_z_values = np.zeros((0,), dtype=np.float64)
profiles = np.zeros((0, int(dataset.profile_length)), dtype=np.float64)
if count > 0:
bead_ids = snapshot['bead_ids']
available_bead_ids = [int(bead_id) for bead_id in np.unique(bead_ids)]
if (
self._zlut_evaluation_selected_bead_id is not None
and self._zlut_evaluation_selected_bead_id in bead_ids
):
selected_bead_id = int(self._zlut_evaluation_selected_bead_id)
else:
selected_bead_id = int(np.min(bead_ids))
all_motor_z_values = snapshot['motor_z_values']
finite_motor_z = all_motor_z_values[np.isfinite(all_motor_z_values)]
if finite_motor_z.size > 0:
motor_z_min = float(np.min(finite_motor_z))
motor_z_max = float(np.max(finite_motor_z))
selected_rows = bead_ids == selected_bead_id
step_indices = snapshot['step_indices'][selected_rows]
motor_z_values = all_motor_z_values[selected_rows]
profiles = snapshot['profiles'][selected_rows]
return {
'state': dataset.state,
'count': count,
'capacity': dataset.get_capacity(),
'n_steps': dataset.n_steps,
'n_beads': dataset.n_beads,
'profiles_per_bead': dataset.profiles_per_bead,
'profile_length': dataset.profile_length,
'available_bead_ids': available_bead_ids,
'selected_bead_id': selected_bead_id,
'motor_z_min': motor_z_min,
'motor_z_max': motor_z_max,
'step_indices': step_indices,
'motor_z_values': motor_z_values,
'profiles': profiles,
}
[docs]
def show_zlut_generation_dialog(self) -> None:
if not self.windows:
return
self._detach_zlut_sweep_dataset()
if self._zlut_generation_dialog is None:
dialog = ZLUTGenerationDialog(self.windows[0])
dialog.set_cancel_callback(self.cancel_zlut_generation)
dialog.set_close_callback(self.discard_generated_zlut_evaluation)
dialog.set_save_callback(
lambda bead_id: self.save_generated_zlut(bead_id, load_after_save=False)
)
dialog.set_save_and_load_callback(
lambda bead_id: self.save_generated_zlut(bead_id, load_after_save=True)
)
dialog.set_select_bead_callback(self.select_generated_zlut_bead)
dialog.destroyed.connect(lambda *_: self._handle_zlut_dialog_destroyed())
self._zlut_generation_dialog = dialog
self._zlut_generation_dialog.mark_starting()
self._zlut_generation_dialog.show()
self._zlut_generation_dialog.raise_()
self._zlut_generation_dialog.activateWindow()
self._clear_zlut_generation_preview()
self._zlut_generation_phase = 'waiting_profile_length'
self._zlut_preview_last_poll = 0.0
[docs]
def start_zlut_generation(
self,
*,
start_nm: float,
step_nm: float,
stop_nm: float,
profiles_per_bead: int,
) -> None:
self.show_zlut_generation_dialog()
self.send_ipc(
StartZLUTGenerationCommand(
start_nm=float(start_nm),
step_nm=float(step_nm),
stop_nm=float(stop_nm),
profiles_per_bead=int(profiles_per_bead),
)
)
[docs]
def cancel_zlut_generation(self) -> None:
self.send_ipc(CancelZLUTGenerationCommand())
[docs]
def discard_generated_zlut_evaluation(self) -> None:
self.send_ipc(CancelGeneratedZLUTEvaluationCommand())
[docs]
def select_generated_zlut_bead(self, bead_id: int) -> None:
self._zlut_evaluation_selected_bead_id = int(bead_id)
self.send_ipc(SelectGeneratedZLUTBeadCommand(bead_id=int(bead_id)))
self._zlut_preview_last_poll = 0.0
[docs]
def save_generated_zlut(self, bead_id: int, load_after_save: bool = True) -> None:
settings = QSettings('MagScope', 'MagScope')
last_value = settings.value('last zlut directory', os.path.expanduser('~'), type=str)
default_path = os.path.join(last_value, f'generated_zlut_bead_{int(bead_id)}.txt')
filepath, _ = QFileDialog.getSaveFileName(
self.windows[0] if self.windows else None,
'Save Generated Z-LUT',
default_path,
'Text Files (*.txt)',
)
if not filepath:
return
directory = os.path.dirname(filepath) or last_value
settings.setValue('last zlut directory', directory)
self.send_ipc(
SaveGeneratedZLUTCommand(
filepath=filepath,
bead_id=int(bead_id),
load_after_save=bool(load_after_save),
)
)
[docs]
def request_profile_length(self) -> None:
self.send_ipc(RequestProfileLengthCommand())
@register_ipc_command(UpdateZLUTMetadataCommand)
@register_ipc_command(ReportProfileLengthCommand)
[docs]
def report_profile_length(self, profile_length: int | None = None) -> None:
print(f'Temporary development behavior: profile length = {profile_length}')
@register_ipc_command(UpdateZLUTGenerationStateCommand)
[docs]
def update_zlut_generation_state(
self,
status: str,
detail: str | None = None,
running: bool = False,
can_cancel: bool = False,
phase: str = 'idle',
z_axis_min_nm: float | None = None,
z_axis_max_nm: float | None = None,
z_axis_descending: bool = False,
) -> None:
if self.controls is None:
return
self._zlut_generation_phase = phase
self._zlut_generation_z_axis_min_nm = z_axis_min_nm
self._zlut_generation_z_axis_max_nm = z_axis_max_nm
self._zlut_generation_z_axis_descending = bool(z_axis_descending)
self.controls.z_lut_generation_panel.update_state(
status,
detail,
running=running,
can_cancel=can_cancel,
phase=phase,
)
if self._zlut_generation_dialog is not None:
self._zlut_generation_dialog.update_state(
status,
detail,
running=running,
can_cancel=can_cancel,
phase=phase,
)
@register_ipc_command(UpdateZLUTGenerationEvaluationCommand)
[docs]
def update_zlut_generation_evaluation(
self,
active: bool,
bead_ids: list[int],
selected_bead_id: int | None = None,
) -> None:
self._zlut_evaluation_bead_ids = [int(bead_id) for bead_id in bead_ids]
self._zlut_evaluation_selected_bead_id = None if selected_bead_id is None else int(selected_bead_id)
if not active:
self._detach_zlut_sweep_dataset()
self._clear_zlut_generation_preview()
if self._zlut_generation_dialog is not None:
self._zlut_generation_dialog.update_evaluation(
active=active,
bead_ids=self._zlut_evaluation_bead_ids,
selected_bead_id=self._zlut_evaluation_selected_bead_id,
)
self._zlut_preview_last_poll = 0.0
@register_ipc_command(UpdateZLUTGenerationProgressCommand)
[docs]
def update_zlut_generation_progress(
self,
current_step: int,
total_steps: int,
capture_count: int,
capture_capacity: int,
motor_z_value: float | None = None,
) -> None:
if self.controls is None:
return
if self._zlut_generation_dialog is not None:
self._zlut_generation_dialog.update_progress(
current_step,
total_steps,
capture_count,
capture_capacity,
motor_z_value,
)
[docs]
def _handle_zlut_dialog_destroyed(self) -> None:
self._zlut_generation_dialog = None
[docs]
def _detach_zlut_sweep_dataset(self) -> None:
if self._zlut_sweep_dataset is not None:
self._zlut_sweep_dataset.close()
self._zlut_sweep_dataset = None
[docs]
def _update_zlut_generation_dialog(self) -> None:
if self._zlut_generation_dialog is None or not self._zlut_generation_dialog.isVisible():
return
if self._zlut_generation_phase in {'idle', 'complete'} and not self._zlut_evaluation_bead_ids:
self._clear_zlut_generation_preview()
return
now = time()
if now - self._zlut_preview_last_poll < 1.0:
return
self._zlut_preview_last_poll = now
if self._zlut_sweep_dataset is None:
try:
self._zlut_sweep_dataset = ZLUTSweepDataset.attach(locks=self.locks)
except (DatasetNotReadyError, FileNotFoundError):
self._clear_zlut_generation_preview()
return
try:
self._refresh_zlut_preview_from_dataset()
except FileNotFoundError:
self._detach_zlut_sweep_dataset()
self._clear_zlut_generation_preview()
[docs]
def _refresh_zlut_preview_from_dataset(self) -> None:
if self._zlut_generation_dialog is None or self._zlut_sweep_dataset is None:
return
dataset = self._zlut_sweep_dataset
preview_snapshot = self._read_zlut_preview_snapshot(dataset)
count = int(preview_snapshot['count'])
available_bead_ids = list(preview_snapshot['available_bead_ids'])
if available_bead_ids != self._zlut_evaluation_bead_ids:
self._zlut_evaluation_bead_ids = available_bead_ids
if self._zlut_evaluation_selected_bead_id not in self._zlut_evaluation_bead_ids:
self._zlut_evaluation_selected_bead_id = (
None if not self._zlut_evaluation_bead_ids else self._zlut_evaluation_bead_ids[0]
)
self._zlut_generation_dialog.update_evaluation(
active=self._zlut_generation_phase == 'evaluating',
bead_ids=self._zlut_evaluation_bead_ids,
selected_bead_id=self._zlut_evaluation_selected_bead_id,
)
preview_payload = self._build_zlut_preview_payload(
preview_snapshot,
z_axis_min_nm=self._zlut_generation_z_axis_min_nm,
z_axis_max_nm=self._zlut_generation_z_axis_max_nm,
z_axis_descending=self._zlut_generation_z_axis_descending,
)
preview_payload['count'] = count
self._zlut_generation_dialog.preview_widget.update_preview(**preview_payload)
[docs]
class LoadingWindow(QMainWindow):
def __init__(self):
super().__init__()
# Set up the window
self.setWindowTitle('Loading...')
self.setFixedSize(700, 300)
self.setStyleSheet('background-color: white;')
self.setWindowFlags(Qt.WindowType.FramelessWindowHint
| Qt.WindowType.WindowStaysOnTopHint)
# Create central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Layout
layout = QVBoxLayout(central_widget)
layout.setContentsMargins(20, 20, 20, 20)
layout.setSpacing(15)
# Loading label
[docs]
self.label = QLabel('MagScope' + '\n\n' + 'loading ...')
self.label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.label.setStyleSheet('color: black; font-_count: 20px;')
layout.addWidget(self.label)
# Center the window on the screen
frame_geometry = self.frameGeometry()
center_point = self.screen().availableGeometry().center()
frame_geometry.moveCenter(center_point)
self.move(frame_geometry.topLeft())
[docs]
class AddColumnDropTarget(QFrame):
"""Drop target that creates a new column when a panel is dropped."""
def __init__(self, controls: "Controls") -> None:
super().__init__()
[docs]
self._controls = controls
[docs]
self._drag_active = False
self.setObjectName("add_column_drop_target")
self.setAcceptDrops(True)
self.setMinimumWidth(300)
self.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding)
layout = QVBoxLayout(self)
layout.setContentsMargins(12, 12, 12, 12)
layout.setSpacing(6)
layout.addStretch(1)
label = QLabel("Drop here to create a new column")
label.setWordWrap(True)
label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(label)
layout.addStretch(1)
self._set_active(False)
self.setVisible(False)
[docs]
def set_drag_active(self, active: bool) -> None:
"""Toggle visibility based on whether a panel is being dragged."""
self._drag_active = active
self._update_visibility()
[docs]
def refresh_visibility(self) -> None:
self._update_visibility()
[docs]
def _update_visibility(self) -> None:
should_show = self._drag_active and self._controls.has_room_for_new_column()
self.setVisible(should_show)
if not should_show:
self._set_active(False)
[docs]
def _set_active(self, active: bool) -> None:
color = "palette(highlight)" if active else "palette(midlight)"
self.setStyleSheet(
"#add_column_drop_target { border: 2px dashed %s; border-radius: 6px; }" % color
)
[docs]
def _wrapper_from_event(self, event) -> PanelWrapper | None:
manager = self._controls.layout_manager
if manager is None:
return None
if not self._controls.has_room_for_new_column():
return None
mime_data = event.mimeData()
if not mime_data.hasFormat(PANEL_MIME_TYPE):
return None
panel_id_bytes = mime_data.data(PANEL_MIME_TYPE)
if panel_id_bytes.isEmpty():
return None
panel_id = bytes(panel_id_bytes).decode("utf-8")
return manager.wrapper_for_id(panel_id)
[docs]
def dragEnterEvent(self, event) -> None: # type: ignore[override]
if self._wrapper_from_event(event) is not None:
self._set_active(True)
event.acceptProposedAction()
else:
event.ignore()
[docs]
def dragMoveEvent(self, event) -> None: # type: ignore[override]
if self._wrapper_from_event(event) is not None:
self._set_active(True)
event.acceptProposedAction()
else:
event.ignore()
[docs]
def dragLeaveEvent(self, event) -> None: # type: ignore[override]
self._set_active(False)
super().dragLeaveEvent(event)
[docs]
def dropEvent(self, event) -> None: # type: ignore[override]
wrapper = self._wrapper_from_event(event)
self._set_active(False)
if wrapper is None:
event.ignore()
return
if not self._controls.has_room_for_new_column():
event.ignore()
return
self._controls.create_new_column_with_panel(wrapper)
event.acceptProposedAction()
[docs]
class Controls(QWidget):
"""Container widget hosting draggable, persistent control panels."""
[docs]
LAYOUT_SETTINGS_GROUP = "controls/layout"
def __init__(self, manager: UIManager):
super().__init__()
[docs]
self.panels: dict[str, ControlPanelBase | QWidget] = {}
[docs]
self._settings = QSettings("MagScope", "MagScope")
layout = QHBoxLayout(self)
layout.setSpacing(6)
layout.setContentsMargins(0, 0, 0, 0)
[docs]
self._columns_layout = layout
[docs]
self._column_prefix = "column"
[docs]
self._column_counter = 1
[docs]
self._base_columns = {"left"}
[docs]
self._suppress_layout_callback = False
[docs]
self.layout_manager = PanelLayoutManager(
self._settings,
self.LAYOUT_SETTINGS_GROUP,
[],
on_layout_changed=self._on_layout_changed,
on_drag_active_changed=self._on_drag_active_changed,
)
[docs]
self._add_column_target = AddColumnDropTarget(self)
layout.addWidget(self._add_column_target)
layout.addStretch(1)
stored_layout = self.layout_manager.stored_layout()
self._update_column_counter(stored_layout.keys())
self._add_column("left", pinned_ids={"HelpPanel", "ResetPanel"}, index=0)
for name in stored_layout.keys():
if name in self.layout_manager.columns:
continue
self._add_column(name)
if "right" not in self.layout_manager.columns and len(self.layout_manager.columns) < 2:
self._add_column("right")
# Instantiate standard panels
[docs]
self.help_panel = HelpPanel(self.manager)
[docs]
self.reset_panel = ResetPanel(self.manager)
[docs]
self.settings_panel = MagScopeSettingsPanel(self.manager)
[docs]
self.acquisition_panel = AcquisitionPanel(self.manager)
[docs]
self.bead_selection_panel = BeadSelectionPanel(self.manager)
[docs]
self.camera_panel = CameraPanel(self.manager)
[docs]
self.histogram_panel = HistogramPanel(self.manager)
[docs]
self.tracking_options_panel = TrackingOptionsPanel(self.manager)
[docs]
self.plot_settings_panel = PlotSettingsPanel(self.manager)
[docs]
self.allan_deviation_panel = (
AllanDeviationPanel(self.manager) if has_tweezepy_support() else None
)
[docs]
self.profile_panel = ProfilePanel(self.manager)
[docs]
self.script_panel = ScriptPanel(self.manager)
[docs]
self.status_panel = StatusPanel(self.manager)
[docs]
self.xy_lock_panel = XYLockPanel(self.manager)
[docs]
self.z_lock_panel = ZLockPanel(self.manager)
[docs]
self.zlut_panel = ZLUTPanel(self.manager)
[docs]
self.z_lut_generation_panel = ZLUTGenerationPanel(self.manager)
self.zlut_panel.zlut_file_selected.connect(self.manager.request_zlut_file)
self.zlut_panel.zlut_clear_requested.connect(self.manager.clear_zlut)
definitions: list[tuple[str, QWidget, str, bool]] = [
("HelpPanel", self.help_panel, "left", False),
("ResetPanel", self.reset_panel, "left", False),
("MagScopeSettingsPanel", self.settings_panel, "left", True),
("StatusPanel", self.status_panel, "left", True),
("BeadSelectionPanel", self.bead_selection_panel, "left", True),
("CameraPanel", self.camera_panel, "left", True),
("AcquisitionPanel", self.acquisition_panel, "left", True),
("TrackingOptionsPanel", self.tracking_options_panel, "left", True),
("HistogramPanel", self.histogram_panel, "left", True),
("ProfilePanel", self.profile_panel, "left", True),
("PlotSettingsPanel", self.plot_settings_panel, "right", True),
("ZLUTPanel", self.zlut_panel, "right", True),
("ZLUTGenerationPanel", self.z_lut_generation_panel, "right", True),
("ScriptPanel", self.script_panel, "right", True),
("XYLockPanel", self.xy_lock_panel, "right", True),
("ZLockPanel", self.z_lock_panel, "right", True),
]
if self.allan_deviation_panel is not None:
definitions.insert(
definitions.index(("ZLUTPanel", self.zlut_panel, "right", True)),
("AllanDeviationPanel", self.allan_deviation_panel, "right", True),
)
column_names = list(self.layout_manager.columns.keys())
fallback_column = column_names[0]
for panel_id, widget, column_name, draggable in definitions:
self.panels[panel_id] = widget
target_column = column_name if column_name in self.layout_manager.columns else fallback_column
self.layout_manager.register_panel(
panel_id,
widget,
target_column,
draggable=draggable,
)
column_names = list(self.layout_manager.columns.keys())
for control_factory, column in self.manager.controls_to_add:
widget = control_factory(self.manager)
panel_id = widget.__class__.__name__
if isinstance(column, int):
index = min(max(column, 0), len(column_names) - 1)
column_name = column_names[index]
else:
column_name = str(column)
if column_name not in self.layout_manager.columns:
column_name = column_names[0]
self.panels[panel_id] = widget
self.layout_manager.register_panel(panel_id, widget, column_name)
self.layout_manager.restore_layout()
self._prune_empty_columns()
@property
[docs]
def settings(self):
return self.manager.settings
@settings.setter
def settings(self, value):
raise AttributeError("Read-only attribute.")
[docs]
def _update_column_counter(self, column_names: Iterable[str]) -> None:
prefix = f"{self._column_prefix}_"
for name in column_names:
if not name.startswith(prefix):
continue
suffix = name[len(prefix) :]
try:
value = int(suffix)
except ValueError:
continue
if value >= self._column_counter:
self._column_counter = value + 1
[docs]
def _layout_insert_index(self, name: str) -> int:
drop_index = self._columns_layout.indexOf(self._add_column_target)
if drop_index == -1:
drop_index = self._columns_layout.count()
column_names = list(self.layout_manager.columns.keys())
target_index = column_names.index(name)
count_before = sum(
1 for existing in column_names[:target_index] if existing in self._column_scrolls
)
return min(drop_index, count_before)
[docs]
def _add_column(
self,
name: str,
*,
pinned_ids: Iterable[str] | None = None,
index: int | None = None,
) -> ReorderableColumn:
if name in self.layout_manager.columns:
column = self.layout_manager.columns[name]
else:
column = ReorderableColumn(name, pinned_ids=pinned_ids)
column.setFixedWidth(300)
self.layout_manager.add_column(name, column, index=index)
if name not in self._column_scrolls:
scroll = QScrollArea(self)
scroll.setWidgetResizable(True)
scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scroll.setWidget(column)
scroll.setFixedWidth(320)
insert_index = self._layout_insert_index(name)
self._columns_layout.insertWidget(insert_index, scroll)
self._column_scrolls[name] = scroll
self._add_column_target.refresh_visibility()
return column
[docs]
def create_new_column_with_panel(self, wrapper: PanelWrapper) -> None:
name = self._generate_column_name()
column = self._add_column(name)
column.add_panel(wrapper)
wrapper.mark_drop_accepted()
self.layout_manager.layout_changed()
[docs]
def _generate_column_name(self) -> str:
while True:
name = f"{self._column_prefix}_{self._column_counter}"
self._column_counter += 1
if name not in self.layout_manager.columns:
return name
[docs]
def _on_layout_changed(self, _layout: dict[str, list[str]]) -> None:
if self._suppress_layout_callback:
return
self._prune_empty_columns()
[docs]
def _on_drag_active_changed(self, active: bool) -> None:
self._add_column_target.set_drag_active(active)
[docs]
def _prune_empty_columns(self) -> None:
removable = [
name
for name, column in list(self.layout_manager.columns.items())
if name not in self._base_columns and not column.panels()
]
for name in removable:
self._remove_column(name)
[docs]
def _remove_column(self, name: str) -> None:
scroll = self._column_scrolls.pop(name, None)
if scroll is not None:
self._columns_layout.removeWidget(scroll)
scroll.hide()
scroll.deleteLater()
column = self.layout_manager.columns.get(name)
if column is None:
return
column.clear_placeholder()
column.hide()
column.setParent(None)
column.deleteLater()
self._suppress_layout_callback = True
try:
self.layout_manager.remove_column(name)
finally:
self._suppress_layout_callback = False
self.layout_manager.layout_changed()
self._add_column_target.refresh_visibility()
[docs]
def reset_to_defaults(self) -> None:
"""Restore panel visibility, order, and columns to defaults."""
settings = QSettings("MagScope", "MagScope")
settings.beginGroup(self.LAYOUT_SETTINGS_GROUP)
settings.remove("")
settings.endGroup()
for panel in self.panels.values():
groupbox = getattr(panel, "groupbox", None)
if isinstance(groupbox, CollapsibleGroupBox):
settings.remove(groupbox.settings_key)
groupbox.reset_to_default()
for column in list(self.layout_manager.columns.values()):
column.clear_placeholder()
column.clear_panels()
for scroll in list(self._column_scrolls.values()):
self._columns_layout.removeWidget(scroll)
scroll.hide()
scroll.deleteLater()
self._column_scrolls.clear()
self.layout_manager.columns = OrderedDict()
self._column_counter = 1
self._add_column("left", pinned_ids={"HelpPanel", "ResetPanel"}, index=0)
self._add_column("right")
for panel_id in self.layout_manager._default_order:
wrapper = self.layout_manager.wrapper_for_id(panel_id)
if wrapper is None:
continue
column_name = self.layout_manager._default_columns.get(panel_id, "left")
if column_name not in self.layout_manager.columns:
self._add_column(column_name)
column = self.layout_manager.columns[column_name]
column.add_panel(wrapper)
self.layout_manager.layout_changed()
[docs]
def has_room_for_new_column(self) -> bool:
"""Return True if a new column can fit beside the existing ones."""
layout_width = self._columns_layout.contentsRect().width()
if layout_width <= 0:
layout_width = self.width()
spacing = max(0, self._columns_layout.spacing())
visible_scrolls = [scroll for scroll in self._column_scrolls.values() if scroll.isVisible()]
if not visible_scrolls:
return layout_width >= self._add_column_target.minimumWidth()
column_width = visible_scrolls[0].width() or visible_scrolls[0].sizeHint().width()
required_width = (len(visible_scrolls) + 1) * (column_width + spacing)
return layout_width >= required_width
[docs]
def resizeEvent(self, event): # type: ignore[override]
super().resizeEvent(event)
self._add_column_target.refresh_visibility()