magscope.datatypes

Contents

magscope.datatypes#

Shared-memory buffers used across MagScope.

This module introduces two circular buffers that let different processes share camera frames and other numeric data without copying large arrays:

VideoBuffer

Stores stacks of images in one shared-memory region together with capture timestamps. The class is designed for a producer process that records frames and one or more consumer processes that read them.

MatrixBuffer

Stores general two-dimensional numeric data such as bead positions or motor telemetry. Like VideoBuffer, it uses shared memory.

Both buffers rely on external multiprocessing.synchronize.Lock objects to coordinate access between processes. See the class docstrings below for usage details.

Attributes#

Exceptions#

BufferUnderflow

Raised when attempting to read from a buffer that contains no data.

BufferOverflow

Raised when attempting to write to a buffer that has no free slots.

DatasetNotReadyError

Raised when a shared-memory dataset exists but is not attachable yet.

Classes#

VideoBuffer

Shared memory ring buffer for video data

MatrixBuffer

Shared-memory ring buffer for 2D numeric data.

BeadRoiBuffer

Shared-memory store for bead ROI metadata.

LiveProfileBuffer

Shared buffer that stores the latest radial profile for live display.

ZLUTSweepDataset

Temporary shared-memory dataset used for Z-LUT sweep capture.

Functions#

int_to_uint_dtype(bits)

Return the unsigned integer NumPy dtype matching bits.

Module Contents#

magscope.datatypes.logger[source]#
class magscope.datatypes.VideoBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], n_stacks: int | None = None, width: int | None = None, height: int | None = None, n_images: int | None = None, bits: int | None = None)[source]#

Shared memory ring buffer for video data

Parameters:
  • create (bool) – True to allocate the shared-memory regions; False to attach to an existing buffer.

  • locks (dict[str, Lock]) – Mapping of buffer names to multiprocessing.Lock instances. The dictionary must contain an entry for VideoBuffer.

  • n_stacks (int, optional) – Number of temporal stacks stored in the buffer. Required when create is True.

  • width (int, optional) – Frame width in pixels. Required when create is True.

  • height (int, optional) – Frame height in pixels. Required when create is True.

  • n_images (int, optional) – Number of frames per stack. Required when create is True.

  • bits (int, optional) – Bit depth of each pixel. Required when create is True.

Notes

The buffer should first be created by a process with create=True. When creating, n_stacks, width, height, n_images and bits must be provided. After the shared memory exists, other processes can access the buffer with create=False.

name: str = 'VideoBuffer'[source]#
lock: multiprocessing.synchronize.Lock[source]#
_shm_info[source]#
stack_shape[source]#
image_shape[source]#
dtype[source]#
itemsize[source]#
n_images = None[source]#
n_total_images[source]#
image_size[source]#
stack_size[source]#
buffer_size[source]#
_shm[source]#
_ts_shm[source]#
_idx_shm[source]#
_buf[source]#
_ts_buf[source]#
_idx_buf[source]#
__del__()[source]#
_get_count_index()[source]#
_get_read_index()[source]#
_get_write_index()[source]#
_set_count_index(value)[source]#
_set_read_index(value)[source]#
_set_write_index(value)[source]#
_check_read(value)[source]#
_check_write(value)[source]#
_get_timestamps(read, length)[source]#
_set_timestamp(write, timestamp)[source]#
get_level()[source]#

Return the fraction of the buffer that currently holds data.

Returns:

Ratio between unread frames and total buffer capacity.

Return type:

float

get_unread_stack_count()[source]#

Return the number of full unread stacks currently buffered.

check_read_stack()[source]#

Return True when at least one full stack can be read.

Returns:

True if n_images frames are available to read; False otherwise.

Return type:

bool

peak_image()[source]#

Return the newest image and its index without acquiring the lock.

This helper supports lightweight live previews. Because the method does not acquire the lock, it may occasionally return a partially written frame or an older image.

Returns:

Tuple containing the newest image index and a memory view of the image bytes. Convert the memory view to a 2D array with dtype and image_shape.

Return type:

tuple of (int, memoryview)

peak_stack()[source]#

Return the next unread stack without advancing the read index.

Returns:

(stack, timestamps) where stack has shape (width, height, n_images) and timestamps is a float64 array aligned with the returned frames.

Return type:

tuple of numpy.ndarray

read_stack_no_return()[source]#

Advance the read index by one stack without returning data.

Returns:

This method updates the internal indices but produces no data.

Return type:

None

read_image()[source]#

Return the next unread image and its timestamp.

Returns:

Tuple consisting of the next unread frame as a 2D array with shape (width, height) and the corresponding timestamp in seconds.

Return type:

tuple of (numpy.ndarray, float)

write_timestamp(timestamp)[source]#

Increment the write index and store a timestamp without frame data.

Parameters:

timestamp (float) – Timestamp in seconds that should be associated with the next frame slot.

write_image_and_timestamp(image, timestamp)[source]#

Increment the write index, storing one image and its timestamp.

Parameters:
  • image (numpy.ndarray) – Frame data shaped (width, height) with the buffer’s dtype.

  • timestamp (float) – Timestamp in seconds associated with the frame.

class magscope.datatypes.MatrixBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], name: str, shape: tuple[int, int] = None)[source]#

Shared-memory ring buffer for 2D numeric data.

Parameters:
  • create (bool) – True to allocate the shared-memory regions; False to attach to an existing buffer.

  • locks (dict[str, Lock]) – Mapping of buffer names to multiprocessing.Lock instances. The dictionary must contain an entry for name.

  • name (str) – Identifier used for the shared-memory segments.

  • shape (tuple[int, int], optional) – Buffer shape expressed as (rows, columns). Required when create is True.

Notes

The buffer stores time-series style data where each row is a timestamp and each column is a measurement. Reads consume unread bytes, while peak helpers provide views without advancing indices.

name: str[source]#
lock: multiprocessing.synchronize.Lock[source]#
_shm_info[source]#
dtype: numpy.dtype[source]#
itemsize: int[source]#
strides: tuple[int, int][source]#
nbytes: int[source]#
_shm[source]#
_idx_shm[source]#
_buf[source]#
_idx_buf[source]#
__del__()[source]#
_get_count_index()[source]#
_get_read_index()[source]#
_get_write_index()[source]#
_set_count_index(value)[source]#
_set_read_index(value)[source]#
_set_write_index(value)[source]#
get_count_index()[source]#

Return the number of unread bytes currently stored in the buffer.

Returns:

Byte count representing unread data between the read and write indices.

Return type:

int

get_read_index()[source]#

Return the index of the next byte that will be read.

Returns:

Position within the shared buffer corresponding to the next read operation.

Return type:

int

get_write_index()[source]#

Return the index of the next byte that will be written.

Returns:

Position within the shared buffer corresponding to the next write operation.

Return type:

int

write(np_array)[source]#

Write np_array into the buffer, advancing the write index.

Parameters:

np_array (numpy.ndarray) – Array with shape[1] columns. Rows may wrap around to the start of the buffer if the write reaches the end of the allocated space.

read()[source]#

Return unread rows as a NumPy array and reset the read counter.

Returns:

Copy of the unread rows ordered chronologically.

Return type:

numpy.ndarray

peak_unsorted()[source]#

Return a view of the buffer without reordering indices.

Returns:

View into the shared memory representing the buffer layout.

Return type:

numpy.ndarray

peak_sorted()[source]#

Return the buffer contents ordered chronologically.

Returns:

Array containing the buffer rows in FIFO order without updating indices.

Return type:

numpy.ndarray

class magscope.datatypes.BeadRoiBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int | None = None, name: str = 'BeadRoiBuffer')[source]#

Shared-memory store for bead ROI metadata.

The buffer uses a fixed row per bead id so readers can attach once and take compact snapshots of active ROIs without exchanging Python dictionaries over IPC.

name = 'BeadRoiBuffer'[source]#
lock: multiprocessing.synchronize.Lock[source]#
_info_fields = 4[source]#
_info_size = 32[source]#
_roi_dtype[source]#
_occupancy_dtype[source]#
_shm_info[source]#
capacity[source]#
_roi_shape[source]#
_roi_nbytes[source]#
_occupancy_nbytes[source]#
_shm_data[source]#
_shm_occupancy[source]#
_roi_matrix[source]#
_occupancy[source]#
__del__()[source]#
property max_id_plus_one: int[source]#
property active_count: int[source]#
property version: int[source]#
replace_beads(value: dict[int, tuple[int, int, int, int]]) None[source]#
add_beads(value: dict[int, tuple[int, int, int, int]]) None[source]#
update_beads(value: dict[int, tuple[int, int, int, int]]) None[source]#
remove_beads(ids) None[source]#
clear_beads() None[source]#
reorder_beads() dict[int, int][source]#
get_next_available_bead_id() int[source]#
get_beads() tuple[numpy.ndarray, numpy.ndarray][source]#
_normalize_bead_mapping(value: dict[int, tuple[int, int, int, int]]) dict[int, tuple[int, int, int, int]][source]#
_normalize_ids(ids) numpy.ndarray[source]#
_validate_bead_id(bead_id: int) int[source]#
_read_info(index: int) int[source]#
_write_info(index: int, value: int) None[source]#
_increment_version() None[source]#
class magscope.datatypes.LiveProfileBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], profile_capacity: int | None = None, name: str = 'LiveProfileBuffer')[source]#

Shared buffer that stores the latest radial profile for live display.

The buffer keeps a single row containing timestamp, bead_id, profile_length and the profile samples. It wraps a MatrixBuffer for shared-memory transport but hides the padding logic from callers so profiles can be written at their native length.

_buffer[source]#
profile_capacity[source]#
property shape: tuple[int, int][source]#
clear() None[source]#

Reset the buffer contents to NaN.

write_profile(timestamp: float, bead_id: int, profile: numpy.ndarray) None[source]#

Store the latest profile for a bead.

Parameters:
  • timestamp (float) – Timestamp associated with the profile.

  • bead_id (int) – Bead identifier for the profile.

  • profile (numpy.ndarray) – One-dimensional array of profile samples. Length must not exceed profile_capacity.

peak_unsorted() numpy.ndarray[source]#
class magscope.datatypes.ZLUTSweepDataset(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int | None = None, profile_length: int | None = None, n_steps: int | None = None, n_beads: int | None = None, profiles_per_bead: int | None = None, name: str = NAME)[source]#

Temporary shared-memory dataset used for Z-LUT sweep capture.

The dataset stores one row per captured profile with aligned metadata arrays for bead id, step index, timestamp, motor Z, validity, and the full radial profile. Unlike VideoBuffer and MatrixBuffer, this object never wraps and never overwrites old entries. It is intended to be created and destroyed at runtime by the workflow owner, while peer processes attach to the fixed shared-memory names on demand.

NAME = 'ZLUTSweepDataset'[source]#
STATE_ABSENT = 0[source]#
STATE_CREATING = 1[source]#
STATE_READY = 2[source]#
STATE_CAPTURING = 3[source]#
STATE_COMPLETE = 4[source]#
STATE_DETACHING = 5[source]#
STATE_FAILED = 6[source]#
STATE_DESTROYED = 7[source]#
_INFO_FIELDS[source]#
_INFO_SIZE = 64[source]#
_SCHEMA_VERSION = 1[source]#
_UINT64_DTYPE[source]#
_BEAD_ID_DTYPE[source]#
_STEP_INDEX_DTYPE[source]#
_TIMESTAMP_DTYPE[source]#
_MOTOR_Z_DTYPE[source]#
_VALID_DTYPE[source]#
_PROFILE_DTYPE[source]#
_SEGMENT_SUFFIXES[source]#
_SHM_ATTRS = ('_shm_profiles', '_shm_valid', '_shm_motor_z', '_shm_timestamps', '_shm_step_indices',...[source]#
name = 'ZLUTSweepDataset'[source]#
lock: multiprocessing.synchronize.Lock[source]#
_owns_shared_memory[source]#
_closed = False[source]#
classmethod create(*, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int, profile_length: int, n_steps: int, n_beads: int, profiles_per_bead: int, name: str = NAME) ZLUTSweepDataset[source]#
classmethod attach(*, locks: dict[str, multiprocessing.synchronize.Lock], name: str = NAME) ZLUTSweepDataset[source]#
__del__()[source]#
property state: int[source]#
set_state(value: int) None[source]#
write(*, bead_ids: numpy.ndarray, step_indices: numpy.ndarray, timestamps: numpy.ndarray, motor_z_values: numpy.ndarray, valid_flags: numpy.ndarray, profiles: numpy.ndarray) None[source]#
peak() dict[str, numpy.ndarray][source]#
read_preview(selected_bead_id: int | None = None) dict[str, object][source]#
get_count() int[source]#
get_capacity() int[source]#
close() None[source]#
destroy() None[source]#
_validate_schema_version() None[source]#
_validate_attach_ready_state() None[source]#
_read_info(field: str) int[source]#
_write_info(field: str, value: int) None[source]#
classmethod _validate_create_parameters(*, capacity: int | None, profile_length: int | None, n_steps: int | None, n_beads: int | None, profiles_per_bead: int | None) dict[str, int][source]#
_cleanup_shared_memory_segments(*, unlink: bool) None[source]#
exception magscope.datatypes.BufferUnderflow[source]#

Bases: Exception

Raised when attempting to read from a buffer that contains no data.

exception magscope.datatypes.BufferOverflow[source]#

Bases: Exception

Raised when attempting to write to a buffer that has no free slots.

exception magscope.datatypes.DatasetNotReadyError[source]#

Bases: Exception

Raised when a shared-memory dataset exists but is not attachable yet.

magscope.datatypes.bit_to_dtype[source]#
magscope.datatypes.int_to_uint_dtype(bits: int)[source]#

Return the unsigned integer NumPy dtype matching bits.

Parameters:

bits (int) – Width of the target integer in bits. Supported values are 8, 16, 32 and 64.

Returns:

Unsigned integer dtype corresponding to bits.

Return type:

numpy.dtype

Raises:

ValueError – If bits is not one of the supported widths.