Source code for magscope.ui.ui

from collections import OrderedDict
from math import floor, ceil
import os
import sys
from time import time
import traceback
from typing import Callable, Iterable
from warnings import warn

import numpy as np
from PyQt6.QtCore import QEvent, QPoint, QRectF, QSettings, Qt, QThread, QTimer
from PyQt6.QtGui import QGuiApplication, QImage, QPixmap
from PyQt6.QtWidgets import (
    QApplication,
    QFileDialog,
    QFrame,
    QHBoxLayout,
    QLabel,
    QLayout,
    QMainWindow,
    QMessageBox,
    QScrollArea,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)

from magscope._logging import get_logger
from magscope.auto_bead_selection import copy_latest_image, roi_overlaps
from magscope.datatypes import DatasetNotReadyError, VideoBuffer, ZLUTSweepDataset
from magscope.ipc import Delivery, register_ipc_command
from magscope.ipc_commands import *
from magscope.ui.auto_bead_selection_dialog import AutoBeadSelectionDialog
from magscope.ui.controls import (
    AcquisitionPanel,
    AllanDeviationPanel,
    BeadSelectionPanel,
    CameraPanel,
    ControlPanelBase,
    HistogramPanel,
    HelpPanel,
    MagScopeSettingsPanel,
    PlotSettingsPanel,
    ProfilePanel,
    ResetPanel,
    ScriptPanel,
    StatusPanel,
    TrackingOptionsPanel,
    XYLockPanel,
    ZLUTGenerationDialog,
    ZLUTGenerationPanel,
    ZLUTPanel,
    ZLockPanel,
    has_tweezepy_support,
)
from magscope.ui.panel_layout import (
    PANEL_MIME_TYPE,
    PanelLayoutManager,
    PanelWrapper,
    ReorderableColumn,
)
from magscope.ui.plots import PlotWorker, TimeSeriesPlotBase
from magscope.ui.video_viewer import VideoViewer
from magscope.ui.widgets import BeadGraphic, CollapsibleGroupBox, GripSplitter, ResizableLabel
from magscope.processes import ManagerProcessBase
from magscope.scripting import ScriptStatus, register_script_command
from magscope.settings import MagScopeSettings
from magscope.utils import AcquisitionMode, numpy_type_to_qt_image_type

[docs] logger = get_logger("ui.ui")
[docs] class _StartupReadyWindow(QMainWindow): def __init__(self, on_ready: Callable[[], None]): super().__init__()
[docs] self._on_ready = on_ready
[docs] self._startup_ready_scheduled = False
[docs] self._startup_shown = False
[docs] def event(self, event): event_type = event.type() if event_type == QEvent.Type.Show: self._startup_shown = True self._maybe_schedule_startup_ready(after_paint=False) elif event_type == QEvent.Type.Paint: self._maybe_schedule_startup_ready(after_paint=True) return super().event(event)
[docs] def _maybe_schedule_startup_ready(self, *, after_paint: bool) -> None: if self._startup_ready_scheduled or not self._startup_shown or not self.isVisible(): return platform_name = QGuiApplication.platformName() window_handle = self.windowHandle() is_exposed = window_handle is not None and window_handle.isExposed() if not (is_exposed or after_paint or platform_name == "offscreen"): return self._startup_ready_scheduled = True QTimer.singleShot(0, self._on_ready)
[docs] class UIManager(ManagerProcessBase): def __init__(self): super().__init__()
[docs] self._active_bead_graphic: BeadGraphic | None = None
[docs] self._active_bead_id: int | None = None
[docs] self._bead_rois: dict[int, tuple[int, int, int, int]] = {}
[docs] self._pending_bead_add_id: int | None = None
[docs] self._pending_bead_add_roi: tuple[int, int, int, int] | None = None
[docs] self._bead_next_id: int = 0
[docs] self.beads_in_view_on = False
[docs] self.beads_in_view_count = 1
[docs] self.beads_in_view_marker_size = 20
[docs] self.central_widgets: list[QWidget] = []
[docs] self.central_layouts: list[QLayout] = []
[docs] self.controls: Controls | None = None
[docs] self.controls_to_add = []
[docs] self._display_rate_counter: int = 0
[docs] self._display_rate_last_time: float = time()
[docs] self._display_rate_last_rate: float = 0
[docs] self._n_windows: int | None = None
[docs] self.plot_worker: PlotWorker
[docs] self.plot_thread: QThread
[docs] self.plots_widget: QLabel
[docs] self.plots_to_add: list[TimeSeriesPlotBase] = []
[docs] self.qt_app: QApplication | None = None
[docs] self.selected_bead = 0
[docs] self.reference_bead: int | None = None
[docs] self._timer: QTimer | None = None
[docs] self._timer_video_view: QTimer | None = None
[docs] self._video_buffer_last_index: int = 0
[docs] self._video_viewer_need_reset: bool = True
[docs] self.video_viewer: VideoViewer | None = None
[docs] self.windows: list[QMainWindow] = []
[docs] self._suppress_bead_roi_updates: bool = False
[docs] self._last_applied_roi: int | None = None
[docs] self._settings_persistence_warning_shown = False
[docs] self._bead_roi_capacity = 10000
[docs] self._auto_bead_selection_dialog: AutoBeadSelectionDialog | None = None
[docs] self._startup_ready_sent = False
[docs] self._zlut_generation_dialog: ZLUTGenerationDialog | None = None
[docs] self._zlut_generation_phase = 'idle'
[docs] self._zlut_generation_z_axis_min_nm: float | None = None
[docs] self._zlut_generation_z_axis_max_nm: float | None = None
[docs] self._zlut_generation_z_axis_descending = False
[docs] self._zlut_sweep_dataset: ZLUTSweepDataset | None = None
[docs] self._zlut_evaluation_bead_ids: list[int] = []
[docs] self._zlut_evaluation_selected_bead_id: int | None = None
[docs] self._zlut_preview_last_poll: float = 0.0
[docs] self._shutdown_complete = False
@staticmethod
[docs] def _zlut_requested_sweep_edges( z_axis_min_nm: float | None, z_axis_max_nm: float | None, n_steps: int, ) -> tuple[float | None, float | None]: if z_axis_min_nm is None or z_axis_max_nm is None or int(n_steps) <= 0: return None, None if int(n_steps) == 1: return float(z_axis_min_nm), float(z_axis_max_nm) step_spacing = float(z_axis_max_nm - z_axis_min_nm) / float(int(n_steps) - 1) half_step = 0.5 * step_spacing return float(z_axis_min_nm - half_step), float(z_axis_max_nm + half_step)
@staticmethod
[docs] def _build_zlut_preview_payload( preview_snapshot: dict[str, object], *, z_axis_min_nm: float | None, z_axis_max_nm: float | None, z_axis_descending: bool, ) -> dict[str, object]: state = int(preview_snapshot['state']) n_steps = int(preview_snapshot['n_steps']) profiles_per_bead = int(preview_snapshot['profiles_per_bead']) x_axis_min, x_axis_max = UIManager._zlut_requested_sweep_edges( z_axis_min_nm, z_axis_max_nm, n_steps, ) preview_image = None image_x_min = None image_x_max = None mode = 'Raw sweep' profiles = np.asarray(preview_snapshot['profiles'], dtype=np.float64) if profiles.size > 0: step_indices = np.asarray(preview_snapshot['step_indices'], dtype=np.uint32) selected_motor_z_values = np.asarray( preview_snapshot['motor_z_values'], dtype=np.float64, ) if state == ZLUTSweepDataset.STATE_COMPLETE: mode = 'Averaged sweep' unique_steps = np.unique(step_indices) averaged_profiles = [] averaged_z_positions = [] for step_index in unique_steps: step_profiles = profiles[step_indices == step_index] step_motor_z_values = selected_motor_z_values[step_indices == step_index] if step_profiles.size == 0: continue averaged_profiles.append(np.nanmean(step_profiles, axis=0)) averaged_z_positions.append(float(np.nanmean(step_motor_z_values))) if averaged_profiles: averaged_profiles_array = np.asarray(averaged_profiles, dtype=np.float64) averaged_z_positions_array = np.asarray(averaged_z_positions, dtype=np.float64) order = np.argsort(averaged_z_positions_array) preview_image = averaged_profiles_array[order].T image_x_min = x_axis_min image_x_max = x_axis_max else: slot_indices = np.zeros((profiles.shape[0],), dtype=np.int64) per_step_capture_counts: dict[int, int] = {} for row_index, step_index in enumerate(step_indices): step_index_int = int(step_index) within_step_index = per_step_capture_counts.get(step_index_int, 0) per_step_capture_counts[step_index_int] = within_step_index + 1 if z_axis_descending: step_rank = n_steps - 1 - step_index_int else: step_rank = step_index_int slot_indices[row_index] = step_rank * profiles_per_bead + within_step_index sorted_order = np.argsort(slot_indices, kind='stable') sorted_slot_indices = np.asarray(slot_indices[sorted_order], dtype=np.int64) sorted_profiles = np.asarray(profiles[sorted_order], dtype=np.float64) if x_axis_min is not None and x_axis_max is not None and sorted_profiles.shape[0] > 0: total_slots = n_steps * profiles_per_bead if total_slots > 0: slot_width = float(x_axis_max - x_axis_min) / float(total_slots) min_slot = int(np.min(sorted_slot_indices)) max_slot = int(np.max(sorted_slot_indices)) sparse_width = max_slot - min_slot + 1 preview_image = np.full( (sorted_profiles.shape[1], sparse_width), np.nan, dtype=np.float64, ) for profile_row, slot_index in zip( sorted_profiles, sorted_slot_indices, strict=False, ): preview_image[:, int(slot_index - min_slot)] = profile_row image_x_min = float(x_axis_min + min_slot * slot_width) image_x_max = float(x_axis_min + (max_slot + 1) * slot_width) elif sorted_profiles.shape[0] > 0: preview_image = sorted_profiles.T return { 'state': state, 'count': int(preview_snapshot['count']), 'capacity': int(preview_snapshot['capacity']), 'n_steps': n_steps, 'n_beads': int(preview_snapshot['n_beads']), 'profiles_per_bead': profiles_per_bead, 'profile_length': int(preview_snapshot['profile_length']), 'preview_image': preview_image, 'selected_bead_id': preview_snapshot['selected_bead_id'], 'mode': mode, 'motor_z_min': preview_snapshot['motor_z_min'], 'motor_z_max': preview_snapshot['motor_z_max'], 'expected_capture_count': n_steps * profiles_per_bead, 'x_axis_label': 'Z Position (nm)', 'x_axis_min': x_axis_min, 'x_axis_max': x_axis_max, 'image_x_min': image_x_min, 'image_x_max': image_x_max, }
[docs] def setup(self): self.qt_app = QApplication.instance() if not self.qt_app: self.qt_app = QApplication(sys.argv) QGuiApplication.styleHints().setColorScheme(Qt.ColorScheme.Dark) if self.settings is not None: self._last_applied_roi = self.settings["ROI"] # If the number of windows is not specified, then use the number of screens if self._n_windows is None: self._n_windows = len(QApplication.screens()) # Create the live plots in a separate thread (but dont start it) self.plots_widget = ResizableLabel() self.plots_widget.setScaledContents(True) self.plots_thread = QThread() self.plot_worker = PlotWorker() for plot in self.plots_to_add: self.plot_worker.add_plot(plot) self.plot_worker.set_locks(self.locks) self.plot_worker.setup() # Create controls panel self.controls = Controls(self) # Create the video viewer self.video_viewer = VideoViewer() self._refresh_bead_overlay() # Finally start the live plots self.plot_worker.moveToThread(self.plots_thread) self.plots_thread.started.connect(self.plot_worker.run) # noqa self.plot_worker.image_signal.connect(self._set_plot_image) self.plots_widget.resized.connect(self.update_plot_figure_size) self.plots_thread.start(QThread.Priority.LowPriority) # Create the layouts for each window self.create_central_widgets() # Create the windows for i in range(self._n_windows): if i == 0: window = _StartupReadyWindow(self._notify_startup_ready) else: window = QMainWindow() window.setWindowTitle("MagScope") screen = QApplication.screens()[i % len(QApplication.screens())] geometry = screen.geometry() window.setGeometry( geometry.x(), geometry.y(), geometry.width(), geometry.height() ) window.setMinimumWidth(300) window.setMinimumHeight(300) window.closeEvent = lambda _, w=window: self.quit() window.showMaximized() window.setCentralWidget(self.central_widgets[i]) self.windows.append(window) self._show_settings_persistence_warning_if_needed() # Connect the video viewer self.video_viewer.coordinatesChanged.connect(self.update_view_coords) self.video_viewer.sceneClicked.connect(self.callback_view_clicked) # Timer self._timer = QTimer(self.qt_app) self._timer.timeout.connect(self._main_loop_tick) # noqa self._timer.setInterval(0) self._timer.start() # Timer - Video Display self._timer_video_view = QTimer(self.qt_app) self._timer_video_view.timeout.connect(self._update_view_and_hist_tick) self._timer_video_view.setInterval(25) self._timer_video_view.start() # Start app self._running = True self.qt_app.exec()
[docs] def _notify_startup_ready(self) -> None: if self._startup_ready_sent: return if self._command_registry is None or self._pipe is None or self._magscope_quitting is None: return self._startup_ready_sent = True self.send_ipc(StartupReadyCommand(process_name=self.name))
@register_ipc_command( SetSettingsCommand, delivery=Delivery.BROADCAST, target="ManagerProcessBase" )
[docs] def set_settings(self, settings: MagScopeSettings): """Apply new settings and clear beads if the ROI size changed.""" previous_roi = self._last_applied_roi super().set_settings(settings) self._show_settings_persistence_warning_if_needed() new_roi = self.settings["ROI"] if previous_roi is not None and new_roi != previous_roi: self.clear_beads() self._last_applied_roi = new_roi self._update_roi_labels(new_roi)
[docs] def _show_settings_persistence_warning_if_needed(self) -> None: if self._settings_persistence_warning_shown: return if self.settings is None or self.settings.persistence_available: return if not self.windows: return self._settings_persistence_warning_shown = True self._show_settings_persistence_warning()
[docs] def _show_settings_persistence_warning(self) -> None: msg = QMessageBox(self.windows[0]) msg.setIcon(QMessageBox.Icon.Warning) msg.setWindowTitle("Settings Persistence Unavailable") msg.setText( "Some settings may not automatically load or save for this session." ) msg.setInformativeText( "MagScope will continue running with in-memory settings." ) msg.setStandardButtons(QMessageBox.StandardButton.Ok) msg.show()
[docs] def update_plot_figure_size(self, w, h): if hasattr(self, 'plot_worker') and self.plot_worker is not None: self.plot_worker.figure_size_signal.emit(w, h)
[docs] def _set_plot_image(self, img: QImage) -> None: if self.plots_widget is None: return self.plots_widget.setPixmap(QPixmap.fromImage(img))
@staticmethod
[docs] def _disconnect_signal(signal, callback) -> None: try: signal.disconnect(callback) except (RuntimeError, TypeError): pass
@staticmethod
[docs] def _stop_timer(timer: QTimer | None) -> None: if timer is None: return try: timer.stop() except RuntimeError: pass try: timer.deleteLater() except RuntimeError: pass
@staticmethod
[docs] def _close_widget(widget: QWidget | None) -> None: if widget is None: return try: widget.close() except (AttributeError, RuntimeError): pass try: widget.deleteLater() except (AttributeError, RuntimeError): pass
[docs] def _shutdown_plot_worker(self) -> None: plot_worker = getattr(self, 'plot_worker', None) plots_thread = getattr(self, 'plots_thread', None) plots_widget = getattr(self, 'plots_widget', None) if plot_worker is None: return if plots_widget is not None: self._disconnect_signal(plot_worker.image_signal, self._set_plot_image) self._disconnect_signal(plots_widget.resized, self.update_plot_figure_size) stop = getattr(plot_worker, '_stop', None) if callable(stop): stop() if plots_thread is not None: try: plots_thread.quit() except RuntimeError: pass try: plots_thread.wait() except RuntimeError: pass try: plots_thread.deleteLater() except RuntimeError: pass dispose = getattr(plot_worker, 'dispose', None) if callable(dispose): dispose() self.plot_worker = None self.plots_thread = None
[docs] def quit(self): if self._shutdown_complete or self._quitting.is_set(): return can_use_process_quit = ( self._command_registry is not None and hasattr(self._command_registry, 'route_for') and (self._pipe is None or hasattr(self._pipe, 'close')) and (self._magscope_quitting is None or hasattr(self._magscope_quitting, 'is_set')) ) if not can_use_process_quit: self._quitting.set() self._running = False else: super().quit() self._running = False self._stop_timer(self._timer) self._timer = None self._stop_timer(self._timer_video_view) self._timer_video_view = None if self.video_viewer is not None: coordinates_changed = getattr(self.video_viewer, 'coordinatesChanged', None) if coordinates_changed is not None: self._disconnect_signal(coordinates_changed, self.update_view_coords) scene_clicked = getattr(self.video_viewer, 'sceneClicked', None) if scene_clicked is not None: self._disconnect_signal(scene_clicked, self.callback_view_clicked) self._shutdown_plot_worker() if self._auto_bead_selection_dialog is not None: force_close = getattr(self._auto_bead_selection_dialog, 'force_close', None) if callable(force_close): force_close() self._auto_bead_selection_dialog = None self._detach_zlut_sweep_dataset() self._zlut_generation_phase = 'idle' self._zlut_generation_z_axis_min_nm = None self._zlut_generation_z_axis_max_nm = None self._zlut_generation_z_axis_descending = False self._zlut_evaluation_bead_ids = [] self._zlut_evaluation_selected_bead_id = None if self._zlut_generation_dialog is not None: force_close = getattr(self._zlut_generation_dialog, 'force_close', None) if callable(force_close): force_close() self._zlut_generation_dialog = None for window in self.windows: self._close_widget(window) self.windows = [] for central_widget in self.central_widgets: self._close_widget(central_widget) self.central_widgets = [] self.central_layouts = [] self._close_widget(self.controls) self.controls = None self._close_widget(self.video_viewer) self.video_viewer = None self._close_widget(getattr(self, 'plots_widget', None)) self.plots_widget = None if self.qt_app is not None: self.qt_app.quit() self._shutdown_complete = True
[docs] def do_main_loop(self): # Because the UIManager is a special case with a GUI # the main loop is actually called by a timer, not the # run method of it's super() if self._running: self._update_display_rate() self.update_video_buffer_status() self.update_video_processors_status() self.controls.profile_panel.update_plot() self._update_zlut_generation_dialog() self.receive_ipc()
[docs] def _handle_timer_exception(self, exc: BaseException) -> None: """Surface exceptions that occur inside Qt timer callbacks.""" self._running = False self._report_exception(exc) if self.qt_app is not None: self.qt_app.quit()
[docs] def _run_safe(self, callback: Callable[[], None]) -> None: try: callback() except Exception as exc: self._handle_timer_exception(exc)
[docs] def _main_loop_tick(self) -> None: self._run_safe(self.do_main_loop)
[docs] def _update_view_and_hist_tick(self) -> None: self._run_safe(self._update_view_and_hist)
[docs] def set_selected_bead(self, bead: int): old_selected = self._normalize_bead_id(self.selected_bead) old_reference = self._normalize_bead_id(self.reference_bead) self.selected_bead = bead normalized_bead = self._normalize_bead_id(bead) if hasattr(self, 'plot_worker') and self.plot_worker is not None: self.plot_worker.selected_bead_signal.emit(bead) self._sync_plot_settings_selected_bead(bead) if self.shared_values is not None: self.shared_values.live_profile_bead.value = bead self._clear_live_profile_buffer() self._set_active_bead(normalized_bead) self._update_bead_highlights( old_selected=old_selected, old_reference=old_reference, )
[docs] def set_live_profile_monitor_enabled(self, enabled: bool) -> None: if self.shared_values is not None: self.shared_values.live_profile_enabled.value = 1 if enabled else 0 if not enabled: self._clear_live_profile_buffer()
[docs] def set_reference_bead(self, bead: int | None): old_selected = self._normalize_bead_id(self.selected_bead) old_reference = self._normalize_bead_id(self.reference_bead) self.reference_bead = bead emitted_bead = -1 if bead is None else bead if hasattr(self, 'plot_worker') and self.plot_worker is not None: self.plot_worker.reference_bead_signal.emit(emitted_bead) self._sync_plot_settings_reference_bead(bead) self._update_bead_highlights( old_selected=old_selected, old_reference=old_reference, )
[docs] def _sync_plot_settings_selected_bead(self, bead: int) -> None: if self.controls is None or not hasattr(self.controls, 'plot_settings_panel'): return lineedit = self.controls.plot_settings_panel.selected_bead.lineedit lineedit.blockSignals(True) lineedit.setText(str(bead)) lineedit.blockSignals(False)
[docs] def _sync_plot_settings_reference_bead(self, bead: int | None) -> None: if self.controls is None or not hasattr(self.controls, 'plot_settings_panel'): return lineedit = self.controls.plot_settings_panel.reference_bead.lineedit lineedit.blockSignals(True) lineedit.setText('' if bead is None or bead < 0 else str(bead)) lineedit.blockSignals(False)
[docs] def _normalize_bead_id(self, bead: int | None) -> int | None: if bead is None or bead < 0: return None return bead
[docs] def _get_bead_highlight_state(self, bead_id: int) -> str: selected_id = self._normalize_bead_id(self.selected_bead) reference_id = self._normalize_bead_id(self.reference_bead) if bead_id == selected_id: return 'selected' if bead_id == reference_id: return 'reference' return 'default'
[docs] def _refresh_bead_overlay(self) -> None: if self.video_viewer is not None: self.video_viewer.set_bead_overlay( self._bead_rois, self._active_bead_id, self._normalize_bead_id(self.selected_bead), self._normalize_bead_id(self.reference_bead), ) self.video_viewer.viewport().update() self._update_auto_bead_selection_button_state()
[docs] def _update_auto_bead_selection_button_state(self) -> None: if self.controls is None: return button = getattr(self.controls.bead_selection_panel, 'auto_select_button', None) if button is None: return button.setEnabled(self._can_start_auto_bead_selection())
[docs] def _can_start_auto_bead_selection(self) -> bool: return ( self.controls is not None and self.video_viewer is not None and self.video_buffer is not None and self._auto_bead_selection_dialog is None and self._pending_bead_add_id is None and not self._current_scene_rect().isNull() )
[docs] def _snapshot_recent_image(self) -> np.ndarray | None: if self.video_buffer is None: return None # Intentionally use peak_image() as a lightweight snapshot. It does not # verify that a frame has been written yet, but by the time a user # starts auto bead selection the buffer is expected to already contain # a recent frame. _index, image_bytes = self.video_buffer.peak_image() return copy_latest_image( image_bytes, self.video_buffer.image_shape, self.video_buffer.dtype, )
[docs] def _current_image_display_scale(self) -> int: if self.video_buffer is None: return 1 cam_bits = self.camera_type.bits dtype_bits = np.iinfo(self.video_buffer.dtype).bits return 2 ** (dtype_bits - cam_bits)
[docs] def _current_scene_rect(self) -> QRectF: if self.video_viewer is None: return QRectF() image_scene_rect = getattr(self.video_viewer, 'image_scene_rect', None) if callable(image_scene_rect): rect = image_scene_rect() if not rect.isNull(): return rect scene = getattr(self.video_viewer, 'scene', None) if scene is None or not hasattr(scene, 'sceneRect'): return QRectF() return scene.sceneRect()
[docs] def _current_visible_scene_rect(self) -> QRectF: scene_rect = self._current_scene_rect() if self.video_viewer is None or scene_rect.isNull(): return scene_rect viewport = self.video_viewer.viewport() if viewport is None or not hasattr(viewport, 'rect'): return scene_rect viewport_rect = viewport.rect() if viewport_rect.isNull(): return scene_rect visible_rect = self.video_viewer.mapToScene(viewport_rect).boundingRect() visible_rect = visible_rect.intersected(scene_rect) return scene_rect if visible_rect.isEmpty() else visible_rect
[docs] def _next_random_bead_roi( self, rng: np.random.Generator, visible_rect: QRectF, ) -> tuple[int, int, int, int] | None: if self.settings is None: return None roi_width = int(self.settings['ROI']) half_width = roi_width / 2 min_x = ceil(visible_rect.left() + half_width) max_x = floor(visible_rect.right() - half_width) min_y = ceil(visible_rect.top() + half_width) max_y = floor(visible_rect.bottom() - half_width) if min_x > max_x or min_y > max_y: return None center_x = int(rng.integers(min_x, max_x + 1)) center_y = int(rng.integers(min_y, max_y + 1)) return BeadGraphic.clamp_roi_to_scene( BeadGraphic.roi_from_center(center_x, center_y, roi_width), self._current_scene_rect(), )
[docs] def _set_active_bead(self, bead_id: int | None) -> None: normalized_id = self._normalize_bead_id(bead_id) if normalized_id is not None and normalized_id not in self._bead_rois: normalized_id = None if self._active_bead_graphic is not None: self._active_bead_graphic.remove() self._active_bead_graphic = None self._active_bead_id = normalized_id if normalized_id is None or self.video_viewer is None: self._refresh_bead_overlay() return roi = self._bead_rois[normalized_id] self._active_bead_graphic = BeadGraphic(self, normalized_id, roi, self.video_viewer.scene) self._active_bead_graphic.set_selection_state( self._get_bead_highlight_state(normalized_id) ) self._refresh_bead_overlay()
[docs] def on_active_bead_move_completed( self, bead_id: int, roi: tuple[int, int, int, int], ) -> None: if bead_id not in self._bead_rois: return self._bead_rois[bead_id] = roi self._update_bead_roi(bead_id, roi) self._refresh_bead_overlay()
@register_ipc_command(AddRandomBeadsCommand) @register_script_command(AddRandomBeadsCommand)
[docs] def add_random_beads(self, count: int, seed: int | None = None) -> None: if count <= 0: return visible_rect = self._current_visible_scene_rect() if visible_rect.isNull() or visible_rect.isEmpty(): self.show_error('No visible field of view', 'Cannot add random beads without a visible image area.') return remaining_capacity = self._bead_roi_capacity - self._bead_next_id if remaining_capacity <= 0: self.show_error( 'Maximum bead count reached', 'Remove beads or use Reassign IDs before adding more than 10000 beads.', ) return rng = np.random.default_rng(seed) bead_rois: dict[int, tuple[int, int, int, int]] = {} count_to_add = min(count, remaining_capacity) for _ in range(count_to_add): roi = self._next_random_bead_roi(rng, visible_rect) if roi is None: break bead_rois[len(bead_rois)] = roi if not bead_rois: return self._add_new_bead_batch(list(bead_rois.values()))
[docs] def _add_new_bead_batch( self, rois: Iterable[tuple[int, int, int, int]], ) -> dict[int, tuple[int, int, int, int]]: bead_rois: dict[int, tuple[int, int, int, int]] = {} next_bead_id = self._bead_next_id for roi in rois: if next_bead_id >= self._bead_roi_capacity: break bead_rois[next_bead_id] = tuple(int(value) for value in roi) next_bead_id += 1 if not bead_rois: return {} try: if self.bead_roi_buffer is None: updated_bead_rois = {**self._bead_rois, **bead_rois} self._write_bead_rois_to_buffer(updated_bead_rois) self._broadcast_bead_roi_update() else: self.bead_roi_buffer.add_beads(bead_rois) self._broadcast_bead_roi_update() except Exception: self._update_next_bead_id_label() raise self._bead_rois.update(bead_rois) self._bead_next_id = next_bead_id self._update_next_bead_id_label() self._set_active_bead(self._normalize_bead_id(self.selected_bead)) self._refresh_bead_overlay() return bead_rois
[docs] def _hit_test_bead(self, pos: QPoint) -> int | None: if not self._bead_rois: return None selected_id = self._normalize_bead_id(self.selected_bead) reference_id = self._normalize_bead_id(self.reference_bead) best_match: tuple[int, float, int] | None = None best_bead_id: int | None = None for bead_id, (x0, x1, y0, y1) in self._bead_rois.items(): if not (x0 <= pos.x() <= x1 and y0 <= pos.y() <= y1): continue if bead_id == self._active_bead_id: priority = 0 elif bead_id == selected_id: priority = 1 elif bead_id == reference_id: priority = 2 else: priority = 3 center_x = (x0 + x1) / 2 center_y = (y0 + y1) / 2 distance_sq = (center_x - pos.x()) ** 2 + (center_y - pos.y()) ** 2 candidate = (priority, distance_sq, -bead_id) if best_match is None or candidate < best_match: best_match = candidate best_bead_id = bead_id return best_bead_id
[docs] def _update_bead_highlight(self, bead_id: int) -> None: if bead_id == self._active_bead_id and self._active_bead_graphic is not None: self._active_bead_graphic.set_selection_state( self._get_bead_highlight_state(bead_id) )
[docs] def _update_bead_highlights( self, *, old_selected: int | None = None, old_reference: int | None = None, ): selected_id = self._normalize_bead_id(self.selected_bead) reference_id = self._normalize_bead_id(self.reference_bead) affected_ids = { bead_id for bead_id in (old_selected, old_reference, selected_id, reference_id) if bead_id is not None } for bead_id in affected_ids: self._update_bead_highlight(bead_id) self._refresh_bead_overlay()
[docs] def _clear_live_profile_buffer(self) -> None: if self.live_profile_buffer is not None: self.live_profile_buffer.clear()
@property
[docs] def n_windows(self): return self._n_windows
@n_windows.setter def n_windows(self, value): if self._running: warn("Application already running", RuntimeWarning) return if not 1 <= value <= 3: warn("Number of windows must be between 1 and 3") return self._n_windows = value @property
[docs] def bead_roi_updates_suppressed(self) -> bool: return self._suppress_bead_roi_updates
@property
[docs] def bead_next_id(self) -> int: return self._bead_next_id
[docs] def _broadcast_bead_roi_update(self) -> None: if self._command_registry is None or self._pipe is None or self._magscope_quitting is None: return self.send_ipc(UpdateBeadRoisCommand())
[docs] def _write_bead_rois_to_buffer(self, bead_rois: dict[int, tuple[int, int, int, int]]) -> None: if self.bead_roi_buffer is None: bead_ids = np.asarray(sorted(bead_rois), dtype=np.uint32) bead_roi_values = np.asarray([bead_rois[int(bead_id)] for bead_id in bead_ids], dtype=np.uint32) self._bead_roi_ids = bead_ids if bead_roi_values.size == 0: self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32) else: self._bead_roi_values = bead_roi_values.reshape((-1, 4)) return self.bead_roi_buffer.replace_beads(bead_rois) self._refresh_bead_roi_cache()
[docs] def create_central_widgets(self): match self.n_windows: case 1: self.create_one_window_widgets() case 2: self.create_two_window_widgets() case 3: self.create_three_window_widgets()
[docs] def create_one_window_widgets(self): for i in range(1): self.central_widgets.append(QWidget()) self.central_layouts.append(QVBoxLayout()) self.central_widgets[i].setLayout(self.central_layouts[i]) # Left-right split lr_splitter = GripSplitter(name='One Window Left-Right Splitter', orientation=Qt.Orientation.Horizontal) self.central_layouts[0].addWidget(lr_splitter) # Left left_widget = QWidget() left_widget.setMinimumWidth(150) lr_splitter.addWidget(left_widget) left_layout = QHBoxLayout() left_widget.setLayout(left_layout) # Add controls to left left_layout.addWidget(self.controls) # Right right_widget = QWidget() right_widget.setMinimumWidth(150) lr_splitter.addWidget(right_widget) right_layout = QHBoxLayout() right_widget.setLayout(right_layout) # Right: top-bottom split ud_splitter = GripSplitter(name='One Window Top-Bottom Splitter', orientation=Qt.Orientation.Vertical) right_layout.addWidget(ud_splitter) # Right-top right_top_widget = QWidget() right_top_widget.setMinimumHeight(150) ud_splitter.addWidget(right_top_widget) right_top_layout = QHBoxLayout() right_top_widget.setLayout(right_top_layout) # Add plots to right-top right_top_layout.addWidget(self.plots_widget) # Right-bottom right_bottom_widget = QWidget() right_bottom_widget.setMinimumHeight(150) ud_splitter.addWidget(right_bottom_widget) right_bottom_layout = QHBoxLayout() right_bottom_widget.setLayout(right_bottom_layout) # Add video viewer to right-bottom right_bottom_layout.addWidget(self.video_viewer)
[docs] def create_two_window_widgets(self): for i in range(2): self.central_widgets.append(QWidget()) self.central_layouts.append(QVBoxLayout()) self.central_widgets[i].setLayout(self.central_layouts[i]) ### Window 0 ### # Left-right split lr_splitter = GripSplitter(name='Two Window Left-Right Splitter', orientation=Qt.Orientation.Horizontal) self.central_layouts[0].addWidget(lr_splitter) # Left left_widget = QWidget() left_widget.setMinimumWidth(150) lr_splitter.addWidget(left_widget) left_layout = QHBoxLayout() left_widget.setLayout(left_layout) # Add controls to left left_layout.addWidget(self.controls) # Right right_widget = QWidget() right_widget.setMinimumWidth(150) lr_splitter.addWidget(right_widget) right_layout = QHBoxLayout() right_widget.setLayout(right_layout) # Add video viewer to right right_layout.addWidget(self.video_viewer) ### Window 1 ### # Add plots to window-1 self.central_layouts[1].addWidget(self.plots_widget)
[docs] def create_three_window_widgets(self): for i in range(3): self.central_widgets.append(QWidget()) self.central_layouts.append(QVBoxLayout()) self.central_widgets[i].setLayout(self.central_layouts[i]) ### Window 0 ### # Add controls to window-0 self.central_layouts[0].addWidget(self.controls) ### Window 1 ### # Add video viewer to window-1 self.central_layouts[1].addWidget(self.video_viewer) ### Window 2 ### # Add plots to window-2 self.central_layouts[2].addWidget(self.plots_widget)
[docs] def update_view_coords(self): pass
[docs] def _update_view_and_hist(self): # Get image and _write position index, image_bytes = self.video_buffer.peak_image() # Check if _write has changed (a new image is ready) if self._video_buffer_last_index != index: # Update the stored index self._video_buffer_last_index = index cam_bits = self.camera_type.bits dtype_bits = np.iinfo(self.video_buffer.dtype).bits scale = (2 ** (dtype_bits - cam_bits)) # Update the view qt_img = QImage( np.frombuffer(image_bytes, self.video_buffer.dtype).copy() * scale, *self.video_buffer.image_shape, numpy_type_to_qt_image_type(self.video_buffer.dtype)) self.video_viewer.set_pixmap(QPixmap.fromImage(qt_img)) if self._video_viewer_need_reset: self.video_viewer.reset_view() self._video_viewer_need_reset = False # Update the bead position overlay self._update_beads_in_view() # Update the histogram self.controls.histogram_panel.update_plot(image_bytes) # Increment the display rate counter self._display_rate_counter += 1
[docs] def callback_view_clicked(self, pos: QPoint, button=Qt.MouseButton.LeftButton): if self.controls is None: return if self._pending_bead_add_id is not None: return bead_id = self._hit_test_bead(pos) if button == Qt.MouseButton.RightButton: if bead_id is not None: self.remove_bead(bead_id) return if bead_id is not None: self._set_active_bead(bead_id) self.set_selected_bead(bead_id) return self.add_bead(pos)
[docs] def refresh_bead_rois(self): super().refresh_bead_rois() if self._pending_bead_add_id is None or self._pending_bead_add_roi is None: return bead_ids, bead_rois = self.get_cached_bead_rois() pending_id = self._pending_bead_add_id pending_roi = self._pending_bead_add_roi matches = bead_ids == pending_id if not np.any(matches): return roi = bead_rois[np.flatnonzero(matches)[0]] if tuple(int(value) for value in roi) != pending_roi: return self._clear_pending_bead_add()
[docs] def update_bead_rois(self): self._write_bead_rois_to_buffer(self._bead_rois) self._broadcast_bead_roi_update()
[docs] def _add_bead_roi(self, bead_id: int, roi: tuple[int, int, int, int]) -> None: if self.bead_roi_buffer is None: self.update_bead_rois() return self.bead_roi_buffer.add_beads({bead_id: roi}) self._broadcast_bead_roi_update()
[docs] def _update_bead_roi(self, bead_id: int, roi: tuple[int, int, int, int]) -> None: if self.bead_roi_buffer is None: self.update_bead_rois() return self.bead_roi_buffer.update_beads({bead_id: roi}) self._broadcast_bead_roi_update()
[docs] def _update_multiple_bead_rois( self, bead_rois: dict[int, tuple[int, int, int, int]], ) -> None: if not bead_rois: return if self.bead_roi_buffer is None: self.update_bead_rois() return self.bead_roi_buffer.update_beads(bead_rois) self._broadcast_bead_roi_update()
[docs] def _remove_bead_roi(self, bead_id: int) -> None: if self.bead_roi_buffer is None: self.update_bead_rois() return self.bead_roi_buffer.remove_beads([bead_id]) self._broadcast_bead_roi_update()
@register_ipc_command(MoveBeadsCommand)
[docs] def move_beads(self, moves: list[tuple[int, int, int]]): moved_ids: list[int] = [] moved_rois: dict[int, tuple[int, int, int, int]] = {} scene_rect = self._current_scene_rect() self._suppress_bead_roi_updates = True try: for id, dx, dy in moves: if id not in self._bead_rois: continue roi = BeadGraphic.move_roi(self._bead_rois[id], dx, dy, scene_rect) self._bead_rois[id] = roi if id == self._active_bead_id and self._active_bead_graphic is not None: self._active_bead_graphic.set_roi_bounds(roi) moved_ids.append(id) moved_rois[id] = roi finally: self._suppress_bead_roi_updates = False if not moved_ids: return self._update_multiple_bead_rois(moved_rois) self._refresh_bead_overlay() command = RemoveBeadsFromPendingMovesCommand(ids=moved_ids) self.send_ipc(command)
[docs] def add_bead(self, pos: QPoint): if self._bead_next_id >= self._bead_roi_capacity: self.show_error( 'Maximum bead count reached', 'Remove beads or use Reassign IDs before adding more than 10000 beads.', ) return id = self._bead_next_id x = pos.x() y = pos.y() w = self.settings['ROI'] scene_rect = self._current_scene_rect() roi = BeadGraphic.clamp_roi_to_scene( BeadGraphic.roi_from_center(x, y, w), scene_rect, ) self._bead_rois[id] = roi previous_next_bead_id = self._bead_next_id self._bead_next_id += 1 self._update_next_bead_id_label() # Update the bead ROI self._pending_bead_add_id = id self._pending_bead_add_roi = roi try: self._add_bead_roi(id, roi) except Exception: self._bead_rois.pop(id, None) self._bead_next_id = previous_next_bead_id self._update_next_bead_id_label() self._clear_pending_bead_add() raise if id == self._normalize_bead_id(self.selected_bead): self._set_active_bead(id) self._refresh_bead_overlay()
[docs] def start_auto_bead_selection(self) -> None: if not self._can_start_auto_bead_selection(): return image = self._snapshot_recent_image() if image is None: self.show_error('No live image available', 'Cannot start auto bead selection without a recent frame.') return dialog_parent = self.windows[0] if self.windows else None dialog = AutoBeadSelectionDialog( parent=dialog_parent, image=image, roi_size=self.settings['ROI'], existing_rois=self._bead_rois, display_scale=self._current_image_display_scale(), ) dialog.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True) dialog.finished.connect(self._on_auto_bead_selection_dialog_finished) dialog.selectionAccepted.connect(self._apply_auto_bead_selection) self._auto_bead_selection_dialog = dialog self._update_auto_bead_selection_button_state() dialog.open()
[docs] def _apply_auto_bead_selection(self, rois: list[tuple[int, int, int, int]]) -> None: remaining_capacity = self._bead_roi_capacity - self._bead_next_id existing_rois = list(self._bead_rois.values()) accepted_rois: list[tuple[int, int, int, int]] = [] for roi in rois: normalized_roi = tuple(int(value) for value in roi) if any(roi_overlaps(normalized_roi, existing_roi) for existing_roi in existing_rois): continue if any(roi_overlaps(normalized_roi, kept_roi) for kept_roi in accepted_rois): continue accepted_rois.append(normalized_roi) if not accepted_rois: return rois_to_add = accepted_rois[:max(0, remaining_capacity)] skipped_due_to_capacity = len(accepted_rois) - len(rois_to_add) if not rois_to_add: if skipped_due_to_capacity > 0: self._show_auto_bead_selection_capacity_warning(skipped_due_to_capacity) return self._add_new_bead_batch(rois_to_add) if skipped_due_to_capacity > 0: self._show_auto_bead_selection_capacity_warning(skipped_due_to_capacity)
[docs] def _show_auto_bead_selection_capacity_warning(self, skipped_count: int) -> None: bead_label = 'bead' if skipped_count == 1 else 'beads' self.show_warning( 'Maximum bead count reached', f'{skipped_count} {bead_label} could not be added because they would exceed ' f'the maximum allowed bead count of {self._bead_roi_capacity} beads.', )
[docs] def _on_auto_bead_selection_dialog_finished(self, _result: int) -> None: self._auto_bead_selection_dialog = None self._update_auto_bead_selection_button_state()
[docs] def remove_bead(self, id: int): old_selected = self._normalize_bead_id(self.selected_bead) old_reference = self._normalize_bead_id(self.reference_bead) if id not in self._bead_rois: return self._bead_rois.pop(id) if id == self._active_bead_id: self._set_active_bead(None) # Update highlight colors to reflect selection/reference self._update_bead_highlights( old_selected=old_selected, old_reference=old_reference, ) # Update bead ROI self._remove_bead_roi(id)
[docs] def clear_beads(self): self._clear_pending_bead_add() self._set_active_bead(None) self._bead_rois.clear() self._bead_next_id = 0 self._update_next_bead_id_label() # Update bead ROIs if self.bead_roi_buffer is not None: self.bead_roi_buffer.clear_beads() self._refresh_bead_roi_cache() else: self._bead_roi_ids = np.zeros((0,), dtype=np.uint32) self._bead_roi_values = np.zeros((0, 4), dtype=np.uint32) self._broadcast_bead_roi_update() self.set_reference_bead(None) self.set_selected_bead(0) self._refresh_bead_overlay()
[docs] def reset_bead_ids(self): self._clear_pending_bead_add() if not self._bead_rois: self._bead_next_id = 0 self._update_next_bead_id_label() return old_active_bead = self._active_bead_id new_rois: dict[int, tuple[int, int, int, int]] = {} id_mapping: dict[int, int] = {} for new_id, (old_id, roi) in enumerate(sorted(self._bead_rois.items())): id_mapping[old_id] = new_id new_rois[new_id] = roi self._bead_rois = new_rois self._bead_next_id = len(self._bead_rois) if self.selected_bead is not None: new_selected = id_mapping.get(self.selected_bead, -1) self.set_selected_bead(new_selected) if self.reference_bead is not None: new_reference = id_mapping.get(self.reference_bead) self.set_reference_bead(new_reference) self._update_bead_highlights() self.update_bead_rois() self._update_next_bead_id_label() self._set_active_bead(id_mapping.get(old_active_bead)) self._refresh_bead_overlay()
[docs] def _update_roi_labels(self, roi: int) -> None: if self.controls is None: return self.controls.bead_selection_panel.roi_size_label.setText( f"{roi} x {roi} pixels" ) self.controls.z_lut_generation_panel.roi_size_label.setText( f"{roi} x {roi} pixels" )
[docs] def _update_next_bead_id_label(self) -> None: if self.controls is None: return self.controls.bead_selection_panel.update_next_bead_id_label( self._bead_next_id )
[docs] def _clear_pending_bead_add(self) -> None: self._pending_bead_add_id = None self._pending_bead_add_roi = None self._update_auto_bead_selection_button_state()
[docs] def _calculate_next_bead_id(self) -> int: if not self._bead_rois: return 0 return max(self._bead_rois.keys()) + 1
[docs] def update_video_processors_status(self): busy = self.shared_values.video_process_busy_count.value total = self.settings['video processors n'] text = f'{busy}/{total} busy' self.controls.status_panel.update_video_processors_status(text)
[docs] def update_video_buffer_status(self): level = self.video_buffer.get_level() size = self.video_buffer.n_total_images text = f'{level:.0%} full, {size} max images' self.controls.status_panel.update_video_buffer_status(text)
[docs] def _update_display_rate(self): # If it has been more than a second, re-calculate the display rate if (now := time()) - self._display_rate_last_time > 1: dt = now - self._display_rate_last_time rate = self._display_rate_counter / dt self._display_rate_last_time = now self._display_rate_counter = 0 self._display_rate_last_rate = rate self.controls.status_panel.update_display_rate(f'{rate:.0f} updates/sec') else: # This is used to force the "..." to update self.controls.status_panel.update_display_rate(f'{self._display_rate_last_rate:.0f} updates/sec')
[docs] def _update_beads_in_view(self): # Enabled? if not self.beads_in_view_on or self.beads_in_view_count is None: self.video_viewer.clear_crosshairs() return n = self.beads_in_view_count # Get latest n timepoints tracks = self.tracks_buffer.peak_unsorted() tracks = tracks[np.argsort(tracks[:, 0], kind='stable')] t = tracks[:, 0] unique_t = np.unique(t) top_n_t = unique_t[np.isfinite(unique_t)][-n:] # Get corresponding values try: mask = np.isin(t, top_n_t, assume_unique=False, kind='sort') x = tracks[mask, 1] y = tracks[mask, 2] # Calculate relative x & y nm_per_px = self.camera_type.nm_per_px / self.settings['magnification'] x /= nm_per_px y /= nm_per_px # Plot points self.video_viewer.plot(x, y, self.beads_in_view_marker_size) except Exception as e: print(traceback.format_exc())
@register_ipc_command(UpdateCameraSettingCommand)
[docs] def update_camera_setting(self, name: str, value: str): self.controls.camera_panel.update_camera_setting(name, value)
@register_ipc_command(UpdateVideoBufferPurgeCommand)
[docs] def update_video_buffer_purge(self, t: float): self.controls.status_panel.update_video_buffer_purge(t)
@register_ipc_command(UpdateScriptStatusCommand)
[docs] def update_script_status(self, status: ScriptStatus): self.controls.script_panel.update_status(status)
@register_ipc_command(UpdateScriptStepCommand)
[docs] def update_script_step(self, current_step: int | None, total_steps: int, description: str | None): self.controls.script_panel.update_step(current_step, total_steps, description)
@register_ipc_command(ShowMessageCommand) @register_script_command(ShowMessageCommand)
[docs] def print(self, text: str, details: str | None = None): msg = QMessageBox(self.windows[0]) msg.setIcon(QMessageBox.Icon.Information) msg.setWindowTitle("Information") msg.setText(text) if details: logger.info('%s: %s', text, details) msg.setInformativeText(details) else: logger.info('%s', text) msg.setStandardButtons(QMessageBox.StandardButton.Ok) msg.show()
@register_ipc_command(ShowErrorCommand)
[docs] def show_error(self, text: str, details: str | None = None): msg = QMessageBox(self.windows[0]) msg.setIcon(QMessageBox.Icon.Critical) msg.setWindowTitle("Error") msg.setText(text) if details: logger.error('%s: %s', text, details) msg.setInformativeText(details) else: logger.error('%s', text) msg.setStandardButtons(QMessageBox.StandardButton.Ok) msg.show()
[docs] def show_warning(self, text: str, details: str | None = None): msg = QMessageBox(self.windows[0]) msg.setIcon(QMessageBox.Icon.Warning) msg.setWindowTitle("Warning") msg.setText(text) if details: logger.warning('%s: %s', text, details) msg.setInformativeText(details) else: logger.warning('%s', text) msg.setStandardButtons(QMessageBox.StandardButton.Ok) msg.show()
@register_ipc_command(SetAcquisitionOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs] def set_acquisition_on(self, value: bool): super().set_acquisition_on(value) checkbox = self.controls.acquisition_panel.acquisition_on_checkbox.checkbox checkbox.blockSignals(True) # to prevent a loop checkbox.setChecked(value) checkbox.blockSignals(False)
@register_ipc_command(SetAcquisitionDirCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs] def set_acquisition_dir(self, value: str | None): super().set_acquisition_dir(value) textedit = self.controls.acquisition_panel.acquisition_dir_textedit textedit.blockSignals(True) # to prevent a loop textedit.setText(value or '') textedit.blockSignals(False)
@register_ipc_command(SetAcquisitionDirOnCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs] def set_acquisition_dir_on(self, value: bool): super().set_acquisition_dir_on(value) checkbox = self.controls.acquisition_panel.acquisition_dir_on_checkbox.checkbox checkbox.blockSignals(True) # to prevent a loop checkbox.setChecked(value) checkbox.blockSignals(False) self.controls.acquisition_panel.update_save_highlight(value)
@register_ipc_command(SetAcquisitionModeCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase')
[docs] def set_acquisition_mode(self, mode: AcquisitionMode): super().set_acquisition_mode(mode) combobox = self.controls.acquisition_panel.acquisition_mode_combobox combobox.blockSignals(True) # to prevent a loop combobox.setCurrentText(mode) combobox.blockSignals(False)
@register_ipc_command(UpdateXYLockEnabledCommand)
[docs] def update_xy_lock_enabled(self, value: bool): self.controls.xy_lock_panel.update_enabled(value)
@register_ipc_command(UpdateXYLockIntervalCommand)
[docs] def update_xy_lock_interval(self, value: float): self.controls.xy_lock_panel.update_interval(value)
@register_ipc_command(UpdateXYLockMaxCommand)
[docs] def update_xy_lock_max(self, value: float): self.controls.xy_lock_panel.update_max(value)
@register_ipc_command(UpdateXYLockWindowCommand)
[docs] def update_xy_lock_window(self, value: int): self.controls.xy_lock_panel.update_window(value)
@register_ipc_command(UpdateZLockEnabledCommand)
[docs] def update_z_lock_enabled(self, value: bool): self.controls.z_lock_panel.update_enabled(value)
@register_ipc_command(UpdateZLockBeadCommand)
[docs] def update_z_lock_bead(self, value: int): self.controls.z_lock_panel.update_bead(value)
@register_ipc_command(UpdateZLockTargetCommand)
[docs] def update_z_lock_target(self, value: float): self.controls.z_lock_panel.update_target(value)
@register_ipc_command(UpdateZLockIntervalCommand)
[docs] def update_z_lock_interval(self, value: float): self.controls.z_lock_panel.update_interval(value)
@register_ipc_command(UpdateZLockMaxCommand)
[docs] def update_z_lock_max(self, value: float): self.controls.z_lock_panel.update_max(value)
@register_ipc_command(UpdateZLockWindowCommand)
[docs] def update_z_lock_window(self, value: int): self.controls.z_lock_panel.update_window(value)
[docs] def request_zlut_file(self, filepath: str) -> None: if not filepath: return command = LoadZLUTCommand(filepath=filepath) self.send_ipc(command)
[docs] def clear_zlut(self) -> None: command = UnloadZLUTCommand() self.send_ipc(command)
[docs] def _clear_zlut_generation_preview( self, message: str = 'Waiting for Z-LUT sweep data...', ) -> None: if self._zlut_generation_dialog is not None: self._zlut_generation_dialog.preview_widget.clear(message)
[docs] def _read_zlut_preview_snapshot(self, dataset: ZLUTSweepDataset) -> dict[str, object]: if hasattr(dataset, 'read_preview'): return dataset.read_preview(selected_bead_id=self._zlut_evaluation_selected_bead_id) snapshot = dataset.peak() count = snapshot['bead_ids'].shape[0] available_bead_ids: list[int] = [] selected_bead_id: int | None = None motor_z_min: float | None = None motor_z_max: float | None = None step_indices = np.zeros((0,), dtype=np.uint32) motor_z_values = np.zeros((0,), dtype=np.float64) profiles = np.zeros((0, int(dataset.profile_length)), dtype=np.float64) if count > 0: bead_ids = snapshot['bead_ids'] available_bead_ids = [int(bead_id) for bead_id in np.unique(bead_ids)] if ( self._zlut_evaluation_selected_bead_id is not None and self._zlut_evaluation_selected_bead_id in bead_ids ): selected_bead_id = int(self._zlut_evaluation_selected_bead_id) else: selected_bead_id = int(np.min(bead_ids)) all_motor_z_values = snapshot['motor_z_values'] finite_motor_z = all_motor_z_values[np.isfinite(all_motor_z_values)] if finite_motor_z.size > 0: motor_z_min = float(np.min(finite_motor_z)) motor_z_max = float(np.max(finite_motor_z)) selected_rows = bead_ids == selected_bead_id step_indices = snapshot['step_indices'][selected_rows] motor_z_values = all_motor_z_values[selected_rows] profiles = snapshot['profiles'][selected_rows] return { 'state': dataset.state, 'count': count, 'capacity': dataset.get_capacity(), 'n_steps': dataset.n_steps, 'n_beads': dataset.n_beads, 'profiles_per_bead': dataset.profiles_per_bead, 'profile_length': dataset.profile_length, 'available_bead_ids': available_bead_ids, 'selected_bead_id': selected_bead_id, 'motor_z_min': motor_z_min, 'motor_z_max': motor_z_max, 'step_indices': step_indices, 'motor_z_values': motor_z_values, 'profiles': profiles, }
[docs] def show_zlut_generation_dialog(self) -> None: if not self.windows: return self._detach_zlut_sweep_dataset() if self._zlut_generation_dialog is None: dialog = ZLUTGenerationDialog(self.windows[0]) dialog.set_cancel_callback(self.cancel_zlut_generation) dialog.set_close_callback(self.discard_generated_zlut_evaluation) dialog.set_save_callback( lambda bead_id: self.save_generated_zlut(bead_id, load_after_save=False) ) dialog.set_save_and_load_callback( lambda bead_id: self.save_generated_zlut(bead_id, load_after_save=True) ) dialog.set_select_bead_callback(self.select_generated_zlut_bead) dialog.destroyed.connect(lambda *_: self._handle_zlut_dialog_destroyed()) self._zlut_generation_dialog = dialog self._zlut_generation_dialog.mark_starting() self._zlut_generation_dialog.show() self._zlut_generation_dialog.raise_() self._zlut_generation_dialog.activateWindow() self._clear_zlut_generation_preview() self._zlut_generation_phase = 'waiting_profile_length' self._zlut_preview_last_poll = 0.0
[docs] def start_zlut_generation( self, *, start_nm: float, step_nm: float, stop_nm: float, profiles_per_bead: int, ) -> None: self.show_zlut_generation_dialog() self.send_ipc( StartZLUTGenerationCommand( start_nm=float(start_nm), step_nm=float(step_nm), stop_nm=float(stop_nm), profiles_per_bead=int(profiles_per_bead), ) )
[docs] def cancel_zlut_generation(self) -> None: self.send_ipc(CancelZLUTGenerationCommand())
[docs] def discard_generated_zlut_evaluation(self) -> None: self.send_ipc(CancelGeneratedZLUTEvaluationCommand())
[docs] def select_generated_zlut_bead(self, bead_id: int) -> None: self._zlut_evaluation_selected_bead_id = int(bead_id) self.send_ipc(SelectGeneratedZLUTBeadCommand(bead_id=int(bead_id))) self._zlut_preview_last_poll = 0.0
[docs] def save_generated_zlut(self, bead_id: int, load_after_save: bool = True) -> None: settings = QSettings('MagScope', 'MagScope') last_value = settings.value('last zlut directory', os.path.expanduser('~'), type=str) default_path = os.path.join(last_value, f'generated_zlut_bead_{int(bead_id)}.txt') filepath, _ = QFileDialog.getSaveFileName( self.windows[0] if self.windows else None, 'Save Generated Z-LUT', default_path, 'Text Files (*.txt)', ) if not filepath: return directory = os.path.dirname(filepath) or last_value settings.setValue('last zlut directory', directory) self.send_ipc( SaveGeneratedZLUTCommand( filepath=filepath, bead_id=int(bead_id), load_after_save=bool(load_after_save), ) )
[docs] def request_profile_length(self) -> None: self.send_ipc(RequestProfileLengthCommand())
@register_ipc_command(UpdateZLUTMetadataCommand)
[docs] def update_zlut_metadata(self, filepath: str | None = None, z_min: float | None = None, z_max: float | None = None, step_size: float | None = None, profile_length: int | None = None) -> None: if self.controls is None: return panel = self.controls.zlut_panel panel.set_filepath(filepath) panel.update_metadata(z_min, z_max, step_size, profile_length)
@register_ipc_command(ReportProfileLengthCommand)
[docs] def report_profile_length(self, profile_length: int | None = None) -> None: print(f'Temporary development behavior: profile length = {profile_length}')
@register_ipc_command(UpdateZLUTGenerationStateCommand)
[docs] def update_zlut_generation_state( self, status: str, detail: str | None = None, running: bool = False, can_cancel: bool = False, phase: str = 'idle', z_axis_min_nm: float | None = None, z_axis_max_nm: float | None = None, z_axis_descending: bool = False, ) -> None: if self.controls is None: return self._zlut_generation_phase = phase self._zlut_generation_z_axis_min_nm = z_axis_min_nm self._zlut_generation_z_axis_max_nm = z_axis_max_nm self._zlut_generation_z_axis_descending = bool(z_axis_descending) self.controls.z_lut_generation_panel.update_state( status, detail, running=running, can_cancel=can_cancel, phase=phase, ) if self._zlut_generation_dialog is not None: self._zlut_generation_dialog.update_state( status, detail, running=running, can_cancel=can_cancel, phase=phase, )
@register_ipc_command(UpdateZLUTGenerationEvaluationCommand)
[docs] def update_zlut_generation_evaluation( self, active: bool, bead_ids: list[int], selected_bead_id: int | None = None, ) -> None: self._zlut_evaluation_bead_ids = [int(bead_id) for bead_id in bead_ids] self._zlut_evaluation_selected_bead_id = None if selected_bead_id is None else int(selected_bead_id) if not active: self._detach_zlut_sweep_dataset() self._clear_zlut_generation_preview() if self._zlut_generation_dialog is not None: self._zlut_generation_dialog.update_evaluation( active=active, bead_ids=self._zlut_evaluation_bead_ids, selected_bead_id=self._zlut_evaluation_selected_bead_id, ) self._zlut_preview_last_poll = 0.0
@register_ipc_command(UpdateZLUTGenerationProgressCommand)
[docs] def update_zlut_generation_progress( self, current_step: int, total_steps: int, capture_count: int, capture_capacity: int, motor_z_value: float | None = None, ) -> None: if self.controls is None: return if self._zlut_generation_dialog is not None: self._zlut_generation_dialog.update_progress( current_step, total_steps, capture_count, capture_capacity, motor_z_value, )
[docs] def _handle_zlut_dialog_destroyed(self) -> None: self._zlut_generation_dialog = None
[docs] def _detach_zlut_sweep_dataset(self) -> None: if self._zlut_sweep_dataset is not None: self._zlut_sweep_dataset.close() self._zlut_sweep_dataset = None
[docs] def _update_zlut_generation_dialog(self) -> None: if self._zlut_generation_dialog is None or not self._zlut_generation_dialog.isVisible(): return if self._zlut_generation_phase in {'idle', 'complete'} and not self._zlut_evaluation_bead_ids: self._clear_zlut_generation_preview() return now = time() if now - self._zlut_preview_last_poll < 1.0: return self._zlut_preview_last_poll = now if self._zlut_sweep_dataset is None: try: self._zlut_sweep_dataset = ZLUTSweepDataset.attach(locks=self.locks) except (DatasetNotReadyError, FileNotFoundError): self._clear_zlut_generation_preview() return try: self._refresh_zlut_preview_from_dataset() except FileNotFoundError: self._detach_zlut_sweep_dataset() self._clear_zlut_generation_preview()
[docs] def _refresh_zlut_preview_from_dataset(self) -> None: if self._zlut_generation_dialog is None or self._zlut_sweep_dataset is None: return dataset = self._zlut_sweep_dataset preview_snapshot = self._read_zlut_preview_snapshot(dataset) count = int(preview_snapshot['count']) available_bead_ids = list(preview_snapshot['available_bead_ids']) if available_bead_ids != self._zlut_evaluation_bead_ids: self._zlut_evaluation_bead_ids = available_bead_ids if self._zlut_evaluation_selected_bead_id not in self._zlut_evaluation_bead_ids: self._zlut_evaluation_selected_bead_id = ( None if not self._zlut_evaluation_bead_ids else self._zlut_evaluation_bead_ids[0] ) self._zlut_generation_dialog.update_evaluation( active=self._zlut_generation_phase == 'evaluating', bead_ids=self._zlut_evaluation_bead_ids, selected_bead_id=self._zlut_evaluation_selected_bead_id, ) preview_payload = self._build_zlut_preview_payload( preview_snapshot, z_axis_min_nm=self._zlut_generation_z_axis_min_nm, z_axis_max_nm=self._zlut_generation_z_axis_max_nm, z_axis_descending=self._zlut_generation_z_axis_descending, ) preview_payload['count'] = count self._zlut_generation_dialog.preview_widget.update_preview(**preview_payload)
[docs] class LoadingWindow(QMainWindow): def __init__(self): super().__init__() # Set up the window self.setWindowTitle('Loading...') self.setFixedSize(700, 300) self.setStyleSheet('background-color: white;') self.setWindowFlags(Qt.WindowType.FramelessWindowHint | Qt.WindowType.WindowStaysOnTopHint) # Create central widget central_widget = QWidget() self.setCentralWidget(central_widget) # Layout layout = QVBoxLayout(central_widget) layout.setContentsMargins(20, 20, 20, 20) layout.setSpacing(15) # Loading label
[docs] self.label = QLabel('MagScope' + '\n\n' + 'loading ...')
self.label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.label.setStyleSheet('color: black; font-_count: 20px;') layout.addWidget(self.label) # Center the window on the screen frame_geometry = self.frameGeometry() center_point = self.screen().availableGeometry().center() frame_geometry.moveCenter(center_point) self.move(frame_geometry.topLeft())
[docs] class AddColumnDropTarget(QFrame): """Drop target that creates a new column when a panel is dropped.""" def __init__(self, controls: "Controls") -> None: super().__init__()
[docs] self._controls = controls
[docs] self._drag_active = False
self.setObjectName("add_column_drop_target") self.setAcceptDrops(True) self.setMinimumWidth(300) self.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding) layout = QVBoxLayout(self) layout.setContentsMargins(12, 12, 12, 12) layout.setSpacing(6) layout.addStretch(1) label = QLabel("Drop here to create a new column") label.setWordWrap(True) label.setAlignment(Qt.AlignmentFlag.AlignCenter) layout.addWidget(label) layout.addStretch(1) self._set_active(False) self.setVisible(False)
[docs] def set_drag_active(self, active: bool) -> None: """Toggle visibility based on whether a panel is being dragged.""" self._drag_active = active self._update_visibility()
[docs] def refresh_visibility(self) -> None: self._update_visibility()
[docs] def _update_visibility(self) -> None: should_show = self._drag_active and self._controls.has_room_for_new_column() self.setVisible(should_show) if not should_show: self._set_active(False)
[docs] def _set_active(self, active: bool) -> None: color = "palette(highlight)" if active else "palette(midlight)" self.setStyleSheet( "#add_column_drop_target { border: 2px dashed %s; border-radius: 6px; }" % color )
[docs] def _wrapper_from_event(self, event) -> PanelWrapper | None: manager = self._controls.layout_manager if manager is None: return None if not self._controls.has_room_for_new_column(): return None mime_data = event.mimeData() if not mime_data.hasFormat(PANEL_MIME_TYPE): return None panel_id_bytes = mime_data.data(PANEL_MIME_TYPE) if panel_id_bytes.isEmpty(): return None panel_id = bytes(panel_id_bytes).decode("utf-8") return manager.wrapper_for_id(panel_id)
[docs] def dragEnterEvent(self, event) -> None: # type: ignore[override] if self._wrapper_from_event(event) is not None: self._set_active(True) event.acceptProposedAction() else: event.ignore()
[docs] def dragMoveEvent(self, event) -> None: # type: ignore[override] if self._wrapper_from_event(event) is not None: self._set_active(True) event.acceptProposedAction() else: event.ignore()
[docs] def dragLeaveEvent(self, event) -> None: # type: ignore[override] self._set_active(False) super().dragLeaveEvent(event)
[docs] def dropEvent(self, event) -> None: # type: ignore[override] wrapper = self._wrapper_from_event(event) self._set_active(False) if wrapper is None: event.ignore() return if not self._controls.has_room_for_new_column(): event.ignore() return self._controls.create_new_column_with_panel(wrapper) event.acceptProposedAction()
[docs] class Controls(QWidget): """Container widget hosting draggable, persistent control panels."""
[docs] LAYOUT_SETTINGS_GROUP = "controls/layout"
def __init__(self, manager: UIManager): super().__init__()
[docs] self.manager = manager
[docs] self.panels: dict[str, ControlPanelBase | QWidget] = {}
[docs] self._settings = QSettings("MagScope", "MagScope")
layout = QHBoxLayout(self) layout.setSpacing(6) layout.setContentsMargins(0, 0, 0, 0)
[docs] self._columns_layout = layout
[docs] self._column_scrolls: dict[str, QScrollArea] = {}
[docs] self._column_prefix = "column"
[docs] self._column_counter = 1
[docs] self._base_columns = {"left"}
[docs] self._suppress_layout_callback = False
[docs] self.layout_manager = PanelLayoutManager( self._settings, self.LAYOUT_SETTINGS_GROUP, [], on_layout_changed=self._on_layout_changed, on_drag_active_changed=self._on_drag_active_changed, )
[docs] self._add_column_target = AddColumnDropTarget(self)
layout.addWidget(self._add_column_target) layout.addStretch(1) stored_layout = self.layout_manager.stored_layout() self._update_column_counter(stored_layout.keys()) self._add_column("left", pinned_ids={"HelpPanel", "ResetPanel"}, index=0) for name in stored_layout.keys(): if name in self.layout_manager.columns: continue self._add_column(name) if "right" not in self.layout_manager.columns and len(self.layout_manager.columns) < 2: self._add_column("right") # Instantiate standard panels
[docs] self.help_panel = HelpPanel(self.manager)
[docs] self.reset_panel = ResetPanel(self.manager)
[docs] self.settings_panel = MagScopeSettingsPanel(self.manager)
[docs] self.acquisition_panel = AcquisitionPanel(self.manager)
[docs] self.bead_selection_panel = BeadSelectionPanel(self.manager)
[docs] self.camera_panel = CameraPanel(self.manager)
[docs] self.histogram_panel = HistogramPanel(self.manager)
[docs] self.tracking_options_panel = TrackingOptionsPanel(self.manager)
[docs] self.plot_settings_panel = PlotSettingsPanel(self.manager)
[docs] self.allan_deviation_panel = ( AllanDeviationPanel(self.manager) if has_tweezepy_support() else None )
[docs] self.profile_panel = ProfilePanel(self.manager)
[docs] self.script_panel = ScriptPanel(self.manager)
[docs] self.status_panel = StatusPanel(self.manager)
[docs] self.xy_lock_panel = XYLockPanel(self.manager)
[docs] self.z_lock_panel = ZLockPanel(self.manager)
[docs] self.zlut_panel = ZLUTPanel(self.manager)
[docs] self.z_lut_generation_panel = ZLUTGenerationPanel(self.manager)
self.zlut_panel.zlut_file_selected.connect(self.manager.request_zlut_file) self.zlut_panel.zlut_clear_requested.connect(self.manager.clear_zlut) definitions: list[tuple[str, QWidget, str, bool]] = [ ("HelpPanel", self.help_panel, "left", False), ("ResetPanel", self.reset_panel, "left", False), ("MagScopeSettingsPanel", self.settings_panel, "left", True), ("StatusPanel", self.status_panel, "left", True), ("BeadSelectionPanel", self.bead_selection_panel, "left", True), ("CameraPanel", self.camera_panel, "left", True), ("AcquisitionPanel", self.acquisition_panel, "left", True), ("TrackingOptionsPanel", self.tracking_options_panel, "left", True), ("HistogramPanel", self.histogram_panel, "left", True), ("ProfilePanel", self.profile_panel, "left", True), ("PlotSettingsPanel", self.plot_settings_panel, "right", True), ("ZLUTPanel", self.zlut_panel, "right", True), ("ZLUTGenerationPanel", self.z_lut_generation_panel, "right", True), ("ScriptPanel", self.script_panel, "right", True), ("XYLockPanel", self.xy_lock_panel, "right", True), ("ZLockPanel", self.z_lock_panel, "right", True), ] if self.allan_deviation_panel is not None: definitions.insert( definitions.index(("ZLUTPanel", self.zlut_panel, "right", True)), ("AllanDeviationPanel", self.allan_deviation_panel, "right", True), ) column_names = list(self.layout_manager.columns.keys()) fallback_column = column_names[0] for panel_id, widget, column_name, draggable in definitions: self.panels[panel_id] = widget target_column = column_name if column_name in self.layout_manager.columns else fallback_column self.layout_manager.register_panel( panel_id, widget, target_column, draggable=draggable, ) column_names = list(self.layout_manager.columns.keys()) for control_factory, column in self.manager.controls_to_add: widget = control_factory(self.manager) panel_id = widget.__class__.__name__ if isinstance(column, int): index = min(max(column, 0), len(column_names) - 1) column_name = column_names[index] else: column_name = str(column) if column_name not in self.layout_manager.columns: column_name = column_names[0] self.panels[panel_id] = widget self.layout_manager.register_panel(panel_id, widget, column_name) self.layout_manager.restore_layout() self._prune_empty_columns() @property
[docs] def settings(self): return self.manager.settings
@settings.setter def settings(self, value): raise AttributeError("Read-only attribute.")
[docs] def _update_column_counter(self, column_names: Iterable[str]) -> None: prefix = f"{self._column_prefix}_" for name in column_names: if not name.startswith(prefix): continue suffix = name[len(prefix) :] try: value = int(suffix) except ValueError: continue if value >= self._column_counter: self._column_counter = value + 1
[docs] def _layout_insert_index(self, name: str) -> int: drop_index = self._columns_layout.indexOf(self._add_column_target) if drop_index == -1: drop_index = self._columns_layout.count() column_names = list(self.layout_manager.columns.keys()) target_index = column_names.index(name) count_before = sum( 1 for existing in column_names[:target_index] if existing in self._column_scrolls ) return min(drop_index, count_before)
[docs] def _add_column( self, name: str, *, pinned_ids: Iterable[str] | None = None, index: int | None = None, ) -> ReorderableColumn: if name in self.layout_manager.columns: column = self.layout_manager.columns[name] else: column = ReorderableColumn(name, pinned_ids=pinned_ids) column.setFixedWidth(300) self.layout_manager.add_column(name, column, index=index) if name not in self._column_scrolls: scroll = QScrollArea(self) scroll.setWidgetResizable(True) scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff) scroll.setFrameShape(QFrame.Shape.NoFrame) scroll.setWidget(column) scroll.setFixedWidth(320) insert_index = self._layout_insert_index(name) self._columns_layout.insertWidget(insert_index, scroll) self._column_scrolls[name] = scroll self._add_column_target.refresh_visibility() return column
[docs] def create_new_column_with_panel(self, wrapper: PanelWrapper) -> None: name = self._generate_column_name() column = self._add_column(name) column.add_panel(wrapper) wrapper.mark_drop_accepted() self.layout_manager.layout_changed()
[docs] def _generate_column_name(self) -> str: while True: name = f"{self._column_prefix}_{self._column_counter}" self._column_counter += 1 if name not in self.layout_manager.columns: return name
[docs] def _on_layout_changed(self, _layout: dict[str, list[str]]) -> None: if self._suppress_layout_callback: return self._prune_empty_columns()
[docs] def _on_drag_active_changed(self, active: bool) -> None: self._add_column_target.set_drag_active(active)
[docs] def _prune_empty_columns(self) -> None: removable = [ name for name, column in list(self.layout_manager.columns.items()) if name not in self._base_columns and not column.panels() ] for name in removable: self._remove_column(name)
[docs] def _remove_column(self, name: str) -> None: scroll = self._column_scrolls.pop(name, None) if scroll is not None: self._columns_layout.removeWidget(scroll) scroll.hide() scroll.deleteLater() column = self.layout_manager.columns.get(name) if column is None: return column.clear_placeholder() column.hide() column.setParent(None) column.deleteLater() self._suppress_layout_callback = True try: self.layout_manager.remove_column(name) finally: self._suppress_layout_callback = False self.layout_manager.layout_changed() self._add_column_target.refresh_visibility()
[docs] def reset_to_defaults(self) -> None: """Restore panel visibility, order, and columns to defaults.""" settings = QSettings("MagScope", "MagScope") settings.beginGroup(self.LAYOUT_SETTINGS_GROUP) settings.remove("") settings.endGroup() for panel in self.panels.values(): groupbox = getattr(panel, "groupbox", None) if isinstance(groupbox, CollapsibleGroupBox): settings.remove(groupbox.settings_key) groupbox.reset_to_default() for column in list(self.layout_manager.columns.values()): column.clear_placeholder() column.clear_panels() for scroll in list(self._column_scrolls.values()): self._columns_layout.removeWidget(scroll) scroll.hide() scroll.deleteLater() self._column_scrolls.clear() self.layout_manager.columns = OrderedDict() self._column_counter = 1 self._add_column("left", pinned_ids={"HelpPanel", "ResetPanel"}, index=0) self._add_column("right") for panel_id in self.layout_manager._default_order: wrapper = self.layout_manager.wrapper_for_id(panel_id) if wrapper is None: continue column_name = self.layout_manager._default_columns.get(panel_id, "left") if column_name not in self.layout_manager.columns: self._add_column(column_name) column = self.layout_manager.columns[column_name] column.add_panel(wrapper) self.layout_manager.layout_changed()
[docs] def has_room_for_new_column(self) -> bool: """Return True if a new column can fit beside the existing ones.""" layout_width = self._columns_layout.contentsRect().width() if layout_width <= 0: layout_width = self.width() spacing = max(0, self._columns_layout.spacing()) visible_scrolls = [scroll for scroll in self._column_scrolls.values() if scroll.isVisible()] if not visible_scrolls: return layout_width >= self._add_column_target.minimumWidth() column_width = visible_scrolls[0].width() or visible_scrolls[0].sizeHint().width() required_width = (len(visible_scrolls) + 1) * (column_width + spacing) return layout_width >= required_width
[docs] def resizeEvent(self, event): # type: ignore[override] super().resizeEvent(event) self._add_column_target.refresh_visibility()