"""Shared-memory buffers used across MagScope.

This module introduces two circular buffers that let different processes share
camera frames and other numeric data without copying large arrays:

``VideoBuffer``
    Stores stacks of images in one shared-memory region together with capture
    timestamps. The class is designed for a producer process that records
    frames and one or more consumer processes that read them.

``MatrixBuffer``
    Stores general two-dimensional numeric data such as bead positions or
    motor telemetry. Like :class:`VideoBuffer`, it uses shared memory.

Both buffers rely on external :class:`multiprocessing.synchronize.Lock`
objects to coordinate access between processes. See the class docstrings
below for usage details.
"""
import struct
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.synchronize import Lock
import numpy as np
from ._logging import get_logger
[docs]
# Module-level logger, obtained from the package logging helper under the
# name "datatypes".
logger = get_logger("datatypes")
[docs]
class VideoBuffer:
"""Shared memory ring buffer for video data
Parameters
----------
create : bool
``True`` to allocate the shared-memory regions; ``False`` to attach to
an existing buffer.
locks : dict[str, Lock]
Mapping of buffer names to :class:`multiprocessing.Lock` instances. The
dictionary must contain an entry for ``VideoBuffer``.
n_stacks : int, optional
Number of temporal stacks stored in the buffer. Required when
``create`` is ``True``.
width : int, optional
Frame width in pixels. Required when ``create`` is ``True``.
height : int, optional
Frame height in pixels. Required when ``create`` is ``True``.
n_images : int, optional
Number of frames per stack. Required when ``create`` is ``True``.
bits : int, optional
Bit depth of each pixel. Required when ``create`` is ``True``.
Notes
-----
The buffer should first be created by a process with ``create=True``. When
creating, ``n_stacks``, ``width``, ``height``, ``n_images`` and ``bits``
must be provided. After the shared memory exists, other processes can
access the buffer with ``create=False``.
"""
def __init__(self, *,
create: bool,
locks: dict[str, Lock],
n_stacks: int|None=None,
width: int|None=None,
height: int|None=None,
n_images: int|None=None,
bits: int|None=None):
[docs]
self.name: str = type(self).__name__
[docs]
self.lock: Lock = locks[self.name]
# Some meta-data to describe the buffer is stored in the shared memory
# along with the buffer itself. The first creator writes that metadata,
# and subsequent processes read the stored values so they can interpret
# the underlying byte buffers.
[docs]
self._shm_info = SharedMemory(
create=create, name=self.name + ' Info', size=8 * 5)
if create:
if any(param is None for param in [n_stacks, width, height, n_images, bits]):
raise ValueError("VideoBuffer misconfigured")
self.n_stacks = n_stacks
self._shm_info.buf[0:8] = int(n_stacks).to_bytes(8, byteorder='big')
self._shm_info.buf[8:16] = int(width).to_bytes(8, byteorder='big')
self._shm_info.buf[16:24] = int(height).to_bytes(8, byteorder='big')
self._shm_info.buf[24:32] = int(n_images).to_bytes(8, byteorder='big')
self._shm_info.buf[32:40] = int(bits).to_bytes(8, byteorder='big')
else:
self.n_stacks = int.from_bytes(self._shm_info.buf[0:8], byteorder='big')
width = int.from_bytes(self._shm_info.buf[8:16], byteorder='big')
height = int.from_bytes(self._shm_info.buf[16:24], byteorder='big')
n_images = int.from_bytes(self._shm_info.buf[24:32], byteorder='big')
bits = int.from_bytes(self._shm_info.buf[32:40], byteorder='big')
# Setup more meta-data
[docs]
self.stack_shape = (width, height, n_images)
[docs]
self.image_shape = (width, height)
[docs]
self.dtype = int_to_uint_dtype(bits)
[docs]
self.itemsize = np.dtype(self.dtype).itemsize
[docs]
self.n_images = n_images
[docs]
self.n_total_images = self.n_images * self.n_stacks
[docs]
self.image_size = width * height * self.itemsize
[docs]
self.stack_size = self.image_size * self.n_images
[docs]
self.buffer_size = self.stack_size * self.n_stacks
if create:
logger.info('Creating VideoBuffer with size %s MB', self.buffer_size / 1e6)
# Setup the buffer and buffer indexes
[docs]
self._shm = SharedMemory(
create=create, name=self.name, size=self.buffer_size)
[docs]
self._ts_shm = SharedMemory(
create=create,
name=self.name + ' Timestamps',
size=8 * self.n_total_images)
[docs]
self._idx_shm = SharedMemory(
create=create, name=self.name + ' Index', size=24)
[docs]
self._buf = self._shm.buf
[docs]
self._ts_buf = self._ts_shm.buf
[docs]
self._idx_buf = self._idx_shm.buf
# Initialise the buffer and indexes when creating for the first time
if create:
self._set_read_index(0)
self._set_write_index(0)
self._set_count_index(0)
[docs]
def __del__(self):
if hasattr(self, '_shm'):
self._shm.close()
if hasattr(self, '_idx_shm'):
self._idx_shm.close()
if hasattr(self, '_shm_info'):
self._shm_info.close()
[docs]
def _get_count_index(self):
return int.from_bytes(self._idx_buf[16:24], byteorder='big')
[docs]
def _get_read_index(self):
return int.from_bytes(self._idx_buf[0:8], byteorder='big')
[docs]
def _get_write_index(self):
return int.from_bytes(self._idx_buf[8:16], byteorder='big')
[docs]
def _set_count_index(self, value):
self._idx_buf[16:24] = int(value).to_bytes(8, byteorder='big')
[docs]
def _set_read_index(self, value):
value = value % self.n_total_images
self._idx_buf[0:8] = int(value).to_bytes(8, byteorder='big')
[docs]
def _set_write_index(self, value):
value = value % self.n_total_images
self._idx_buf[8:16] = int(value).to_bytes(8, byteorder='big')
[docs]
def _check_read(self, value):
if value > self._get_count_index():
raise BufferUnderflow('BufferUnderflow')
[docs]
def _check_write(self, value):
if value > (self.n_total_images - self._get_count_index()):
raise BufferOverflow('BufferOverflow')
[docs]
def _get_timestamps(self, read, length):
buf = self._ts_buf[(read * 8):((read + length) * 8)]
return np.ndarray((length, ), dtype='float64', buffer=buf)
[docs]
def _set_timestamp(self, write, timestamp):
self._ts_buf[(write * 8):((write + 1) * 8)] = struct.pack(
'd', timestamp)
[docs]
def get_level(self):
"""Return the fraction of the buffer that currently holds data.
Returns
-------
float
Ratio between unread frames and total buffer capacity.
"""
with self.lock:
return self._get_count_index() / self.n_total_images
[docs]
def get_unread_stack_count(self):
"""Return the number of full unread stacks currently buffered."""
with self.lock:
return self._get_count_index() // self.n_images
[docs]
def check_read_stack(self):
"""Return ``True`` when at least one full stack can be read.
Returns
-------
bool
``True`` if ``n_images`` frames are available to read; ``False``
otherwise.
"""
with self.lock:
try:
self._check_read(self.n_images)
except BufferUnderflow:
return False
else:
return True
[docs]
def peak_image(self):
"""Return the newest image and its index without acquiring the lock.
This helper supports lightweight live previews. Because the method does
not acquire the lock, it may occasionally return a partially written
frame or an older image.
Returns
-------
tuple of (int, memoryview)
Tuple containing the newest image index and a memory view of the
image bytes. Convert the memory view to a 2D array with
``dtype`` and ``image_shape``.
"""
read = (self._get_write_index() - 1) % self.n_total_images
return read, self._buf[(read * self.image_size):((read + 1) *
self.image_size)]
[docs]
def peak_stack(self):
"""Return the next unread stack without advancing the read index.
Returns
-------
tuple of numpy.ndarray
``(stack, timestamps)`` where ``stack`` has shape
``(width, height, n_images)`` and ``timestamps`` is a ``float64``
array aligned with the returned frames.
"""
with self.lock:
self._check_read(self.n_images)
read = self._get_read_index()
stack_bytes = self._buf[(read *
self.image_size):((read + self.n_images) *
self.image_size)]
# Transposed stack, axes=(T,Y,X)
trans_stack = np.ndarray(self.stack_shape[::-1],
dtype=self.dtype,
buffer=stack_bytes)
# Stack, axes=(X,Y,T)
stack = trans_stack.transpose(2, 1, 0)
timestamps = self._get_timestamps(read, self.n_images)
return stack, timestamps
[docs]
def read_stack_no_return(self):
"""Advance the read index by one stack without returning data.
Returns
-------
None
This method updates the internal indices but produces no data.
"""
with self.lock:
self._check_read(self.n_images)
read = self._get_read_index()
count = self._get_count_index()
self._set_read_index(read + self.n_images)
self._set_count_index(count - self.n_images)
[docs]
def read_image(self):
"""Return the next unread image and its timestamp.
Returns
-------
tuple of (numpy.ndarray, float)
Tuple consisting of the next unread frame as a 2D array with shape
``(width, height)`` and the corresponding timestamp in seconds.
"""
with self.lock:
self._check_read(1)
read = self._get_read_index()
count = self._get_count_index()
self._set_read_index(read + 1)
self._set_count_index(count - 1)
image_bytes = self._buf[(read * self.image_size):((read + 1) *
self.image_size)]
trans_image = np.ndarray(self.image_shape[::-1],
dtype=self.dtype,
buffer=image_bytes)
image = trans_image.transpose(1, 0)
timestamp = self._get_timestamps(read, 1)[0]
return image, timestamp
[docs]
def write_timestamp(self, timestamp):
"""Increment the write index and store a timestamp without frame data.
Parameters
----------
timestamp : float
Timestamp in seconds that should be associated with the next frame
slot.
"""
with self.lock:
self._check_write(1)
write = self._get_write_index()
count = self._get_count_index()
self._set_timestamp(write, timestamp)
self._set_write_index(write + 1)
self._set_count_index(count + 1)
[docs]
def write_image_and_timestamp(self, image, timestamp):
"""Increment the write index, storing one image and its timestamp.
Parameters
----------
image : numpy.ndarray
Frame data shaped ``(width, height)`` with the buffer's ``dtype``.
timestamp : float
Timestamp in seconds associated with the frame.
"""
with self.lock:
self._check_write(1)
write = self._get_write_index()
count = self._get_count_index()
self._buf[(write * self.image_size):((write + 1) *
self.image_size)] = image
self._set_timestamp(write, timestamp)
self._set_write_index(write + 1)
self._set_count_index(count + 1)
[docs]
class MatrixBuffer:
"""Shared-memory ring buffer for 2D numeric data.
Parameters
----------
create : bool
``True`` to allocate the shared-memory regions; ``False`` to attach to
an existing buffer.
locks : dict[str, Lock]
Mapping of buffer names to :class:`multiprocessing.Lock` instances. The
dictionary must contain an entry for ``name``.
name : str
Identifier used for the shared-memory segments.
shape : tuple[int, int], optional
Buffer shape expressed as ``(rows, columns)``. Required when
``create`` is ``True``.
Notes
-----
The buffer stores time-series style data where each row is a timestamp and
each column is a measurement. Reads consume unread bytes, while ``peak``
helpers provide views without advancing indices.
"""
def __init__(self, *,
create: bool,
locks: dict[str, Lock],
name: str,
shape: tuple[int, int]=None):
[docs]
self.lock: Lock = locks[self.name]
# Some meta-data to describe the buffer is stored in the shared memory
# along with the buffer itself. The first creator writes that metadata,
# and subsequent processes read the stored values so they can interpret
# the underlying byte buffers.
[docs]
self._shm_info = SharedMemory(
create=create, name=self.name + ' Info', size=8 * 2)
if create:
if shape is None:
raise ValueError('shape must be specified when creating a MatrixBuffer')
self.shape = shape
r: int = self.shape[0]
c: int = self.shape[1]
self._shm_info.buf[0:8] = int(r).to_bytes(8, byteorder='big')
self._shm_info.buf[8:16] = int(c).to_bytes(8, byteorder='big')
else:
r: int = int.from_bytes(self._shm_info.buf[0:8], byteorder='big')
c: int = int.from_bytes(self._shm_info.buf[8:16], byteorder='big')
self.shape: tuple[int, int] = (r, c)
# Setup more meta-data
[docs]
self.dtype: np.dtype = np.dtype(np.float64)
[docs]
self.itemsize: int = self.dtype.itemsize
[docs]
self.strides: tuple[int, int] = (self.shape[1] * self.itemsize, self.itemsize)
[docs]
self.nbytes: int = self.shape[0] * self.shape[1] * self.itemsize
# Setup the buffer and buffer indexes
[docs]
self._shm = SharedMemory(
create=create, name=self.name, size=self.nbytes)
[docs]
self._idx_shm = SharedMemory(
create=create, name=self.name + ' Index', size=24)
[docs]
self._buf = self._shm.buf
[docs]
self._idx_buf = self._idx_shm.buf
# Initialise the buffer and indexes when creating for the first time
if create:
self._set_read_index(0)
self._set_write_index(0)
self._set_count_index(0)
self.write(np.ones(shape, dtype=self.dtype) + np.nan)
self._set_count_index(0)
[docs]
def __del__(self):
self._shm.close()
self._idx_shm.close()
[docs]
def _get_count_index(self):
return int.from_bytes(self._idx_buf[16:24], byteorder='big')
[docs]
def _get_read_index(self):
return int.from_bytes(self._idx_buf[0:8], byteorder='big')
[docs]
def _get_write_index(self):
return int.from_bytes(self._idx_buf[8:16], byteorder='big')
[docs]
def _set_count_index(self, value):
self._idx_buf[16:24] = int(value).to_bytes(8, byteorder='big')
[docs]
def _set_read_index(self, value):
value = value % self.nbytes
self._idx_buf[0:8] = int(value).to_bytes(8, byteorder='big')
[docs]
def _set_write_index(self, value):
value = value % self.nbytes
self._idx_buf[8:16] = int(value).to_bytes(8, byteorder='big')
[docs]
def get_count_index(self):
"""Return the number of unread bytes currently stored in the buffer.
Returns
-------
int
Byte count representing unread data between the read and write
indices.
"""
with self.lock:
return self._get_count_index()
[docs]
def get_read_index(self):
"""Return the index of the next byte that will be read.
Returns
-------
int
Position within the shared buffer corresponding to the next read
operation.
"""
with self.lock:
return self._get_read_index()
[docs]
def get_write_index(self):
"""Return the index of the next byte that will be written.
Returns
-------
int
Position within the shared buffer corresponding to the next write
operation.
"""
with self.lock:
return self._get_write_index()
[docs]
def write(self, np_array):
"""Write ``np_array`` into the buffer, advancing the write index.
Parameters
----------
np_array : numpy.ndarray
Array with ``shape[1]`` columns. Rows may wrap around to the start
of the buffer if the write reaches the end of the allocated space.
"""
assert np_array.shape[0] <= self.shape[0]
assert np_array.shape[1] == self.shape[1]
with self.lock:
write = self._get_write_index()
count = self._get_count_index()
r = min(np_array.nbytes, self.nbytes - write)
l = np_array.nbytes - r
self._buf[write:(write + r)] = np.ravel(np_array).view('uint8')[0:r].tobytes() # right
self._buf[0:l] = np.ravel(np_array).view('uint8')[r:].tobytes() # left
self._set_write_index(write + np_array.nbytes)
self._set_count_index(count + np_array.nbytes)
[docs]
def read(self):
"""Return unread rows as a NumPy array and reset the read counter.
Returns
-------
numpy.ndarray
Copy of the unread rows ordered chronologically.
"""
with self.lock:
count = self._get_count_index()
read = self._get_read_index()
write = self._get_write_index()
assert count >= 0
self._set_read_index(read + count)
self._set_count_index(0)
# Does the unread portion wrap around the end of the _buf
if read <= write: # no wrap
n = count // self.shape[1] // self.itemsize
return np.ndarray(shape=(n, self.shape[1]),
dtype=self.dtype,
buffer=self._buf[read:(read +
count)]).copy()
else: # wrap
right = self._buf[read:self.nbytes]
left = self._buf[0:write]
r = len(right) // self.shape[1] // self.itemsize
l = len(left) // self.shape[1] // self.itemsize
np_array_right = np.ndarray(shape=(r, self.shape[1]),
dtype=self.dtype,
buffer=right)
np_array_left = np.ndarray(shape=(l, self.shape[1]),
dtype=self.dtype,
buffer=left)
return np.vstack((np_array_right, np_array_left)).copy()
[docs]
def peak_unsorted(self):
"""Return a view of the buffer without reordering indices.
Returns
-------
numpy.ndarray
View into the shared memory representing the buffer layout.
"""
with self.lock:
return np.ndarray(self.shape, dtype=self.dtype, buffer=self._buf)
[docs]
def peak_sorted(self):
"""Return the buffer contents ordered chronologically.
Returns
-------
numpy.ndarray
Array containing the buffer rows in FIFO order without updating
indices.
"""
with self.lock:
write = self._get_write_index()
right = self._buf[write:self.nbytes]
left = self._buf[0:write]
r = int(len(right) / self.shape[1] / self.itemsize)
l = self.shape[0] - r
np_array_right = np.ndarray((r, self.shape[1]),
dtype=self.dtype,
buffer=right)
np_array_left = np.ndarray((l, self.shape[1]),
dtype=self.dtype,
buffer=left)
return np.vstack((np_array_right, np_array_left))
[docs]
class BeadRoiBuffer:
"""Shared-memory store for bead ROI metadata.
The buffer uses a fixed row per bead id so readers can attach once and take
compact snapshots of active ROIs without exchanging Python dictionaries over
IPC.
"""
def __init__(
self,
*,
create: bool,
locks: dict[str, Lock],
capacity: int | None = None,
name: str = 'BeadRoiBuffer',
):
[docs]
self.lock: Lock = locks[self.name]
[docs]
self._info_size = 8 * self._info_fields
[docs]
self._roi_dtype = np.dtype(np.uint32)
[docs]
self._occupancy_dtype = np.dtype(np.uint8)
[docs]
self._shm_info = SharedMemory(
create=create,
name=self.name + ' Info',
size=self._info_size,
)
if create:
if capacity is None:
raise ValueError('capacity must be provided when creating BeadRoiBuffer')
self._write_info(0, int(capacity))
self._write_info(1, 0)
self._write_info(2, 0)
self._write_info(3, 0)
elif capacity is not None and capacity != self._read_info(0):
raise ValueError('capacity does not match existing BeadRoiBuffer')
[docs]
self.capacity = self._read_info(0)
[docs]
self._roi_shape = (self.capacity, 4)
[docs]
self._roi_nbytes = int(np.prod(self._roi_shape)) * self._roi_dtype.itemsize
[docs]
self._occupancy_nbytes = self.capacity * self._occupancy_dtype.itemsize
[docs]
self._shm_data = SharedMemory(
create=create,
name=self.name + ' Data',
size=self._roi_nbytes,
)
[docs]
self._shm_occupancy = SharedMemory(
create=create,
name=self.name + ' Occupancy',
size=self._occupancy_nbytes,
)
[docs]
self._roi_matrix = np.ndarray(self._roi_shape, dtype=self._roi_dtype, buffer=self._shm_data.buf)
[docs]
self._occupancy = np.ndarray((self.capacity,), dtype=self._occupancy_dtype, buffer=self._shm_occupancy.buf)
if create:
self._roi_matrix.fill(0)
self._occupancy.fill(0)
[docs]
def __del__(self):
if hasattr(self, '_shm_data'):
self._shm_data.close()
if hasattr(self, '_shm_occupancy'):
self._shm_occupancy.close()
if hasattr(self, '_shm_info'):
self._shm_info.close()
@property
[docs]
def max_id_plus_one(self) -> int:
with self.lock:
return self._read_info(1)
@property
[docs]
def active_count(self) -> int:
with self.lock:
return self._read_info(2)
@property
[docs]
def version(self) -> int:
with self.lock:
return self._read_info(3)
[docs]
def replace_beads(self, value: dict[int, tuple[int, int, int, int]]) -> None:
validated = self._normalize_bead_mapping(value)
with self.lock:
self._roi_matrix.fill(0)
self._occupancy.fill(0)
if validated:
bead_ids = np.fromiter(validated.keys(), dtype=np.uint32, count=len(validated))
rois = np.asarray(list(validated.values()), dtype=self._roi_dtype)
self._roi_matrix[bead_ids] = rois
self._occupancy[bead_ids] = 1
max_id_plus_one = int(bead_ids.max()) + 1
else:
max_id_plus_one = 0
self._write_info(1, max_id_plus_one)
self._write_info(2, len(validated))
self._increment_version()
[docs]
def add_beads(self, value: dict[int, tuple[int, int, int, int]]) -> None:
validated = self._normalize_bead_mapping(value)
if not validated:
return
with self.lock:
bead_ids = np.fromiter(validated.keys(), dtype=np.uint32, count=len(validated))
occupied = self._occupancy[bead_ids] != 0
if np.any(occupied):
existing_ids = bead_ids[occupied].tolist()
raise ValueError(f'bead ids already exist: {existing_ids}')
rois = np.asarray(list(validated.values()), dtype=self._roi_dtype)
self._roi_matrix[bead_ids] = rois
self._occupancy[bead_ids] = 1
self._write_info(1, max(self._read_info(1), int(bead_ids.max()) + 1))
self._write_info(2, self._read_info(2) + len(validated))
self._increment_version()
[docs]
def update_beads(self, value: dict[int, tuple[int, int, int, int]]) -> None:
validated = self._normalize_bead_mapping(value)
if not validated:
return
with self.lock:
bead_ids = np.fromiter(validated.keys(), dtype=np.uint32, count=len(validated))
occupied = self._occupancy[bead_ids] != 0
if not np.all(occupied):
missing_ids = bead_ids[~occupied].tolist()
raise ValueError(f'bead ids do not exist: {missing_ids}')
self._roi_matrix[bead_ids] = np.asarray(list(validated.values()), dtype=self._roi_dtype)
self._increment_version()
[docs]
def remove_beads(self, ids) -> None:
normalized_ids = self._normalize_ids(ids)
if normalized_ids.size == 0:
return
with self.lock:
occupied_mask = self._occupancy[normalized_ids] != 0
if not np.any(occupied_mask):
return
bead_ids = normalized_ids[occupied_mask]
self._occupancy[bead_ids] = 0
self._roi_matrix[bead_ids] = 0
self._write_info(2, max(0, self._read_info(2) - bead_ids.size))
self._increment_version()
[docs]
def clear_beads(self) -> None:
with self.lock:
self._roi_matrix.fill(0)
self._occupancy.fill(0)
self._write_info(1, 0)
self._write_info(2, 0)
self._increment_version()
[docs]
def reorder_beads(self) -> dict[int, int]:
with self.lock:
bead_ids = np.flatnonzero(self._occupancy[:self._read_info(1)])
if bead_ids.size == 0:
self._write_info(1, 0)
self._write_info(2, 0)
self._increment_version()
return {}
original_rois = self._roi_matrix[bead_ids].copy()
mapping = {int(old_id): int(new_id) for new_id, old_id in enumerate(bead_ids.tolist())}
self._roi_matrix.fill(0)
self._occupancy.fill(0)
new_ids = np.arange(bead_ids.size, dtype=np.uint32)
self._roi_matrix[new_ids] = original_rois
self._occupancy[new_ids] = 1
self._write_info(1, bead_ids.size)
self._write_info(2, bead_ids.size)
self._increment_version()
return mapping
[docs]
def get_next_available_bead_id(self) -> int:
with self.lock:
return self._read_info(1)
[docs]
def get_beads(self) -> tuple[np.ndarray, np.ndarray]:
with self.lock:
occupied = self._occupancy[:self._read_info(1)] != 0
bead_ids = np.flatnonzero(occupied).astype(np.uint32, copy=False)
rois = self._roi_matrix[bead_ids].copy()
return bead_ids, rois
[docs]
def _normalize_bead_mapping(
self,
value: dict[int, tuple[int, int, int, int]],
) -> dict[int, tuple[int, int, int, int]]:
normalized: dict[int, tuple[int, int, int, int]] = {}
for bead_id, roi in value.items():
bead_id_int = self._validate_bead_id(bead_id)
if len(roi) != 4:
raise ValueError(f'ROI for bead {bead_id_int} must contain four values')
roi_values = tuple(int(coord) for coord in roi)
if min(roi_values) < 0:
raise ValueError(f'ROI for bead {bead_id_int} cannot contain negative values')
normalized[bead_id_int] = roi_values
return normalized
[docs]
def _normalize_ids(self, ids) -> np.ndarray:
normalized = [self._validate_bead_id(bead_id) for bead_id in ids]
if not normalized:
return np.zeros((0,), dtype=np.uint32)
return np.asarray(normalized, dtype=np.uint32)
[docs]
def _validate_bead_id(self, bead_id: int) -> int:
bead_id_int = int(bead_id)
if bead_id_int < 0 or bead_id_int >= self.capacity:
raise ValueError(f'bead id {bead_id_int} is out of range 0..{self.capacity - 1}')
return bead_id_int
[docs]
def _read_info(self, index: int) -> int:
start = index * 8
end = start + 8
return int.from_bytes(self._shm_info.buf[start:end], byteorder='big')
[docs]
def _write_info(self, index: int, value: int) -> None:
start = index * 8
end = start + 8
self._shm_info.buf[start:end] = int(value).to_bytes(8, byteorder='big')
[docs]
def _increment_version(self) -> None:
self._write_info(3, self._read_info(3) + 1)
[docs]
class LiveProfileBuffer:
"""Shared buffer that stores the latest radial profile for live display.
The buffer keeps a single row containing ``timestamp``, ``bead_id``,
``profile_length`` and the profile samples. It wraps a
:class:`MatrixBuffer` for shared-memory transport but hides the padding
logic from callers so profiles can be written at their native length.
"""
def __init__(
self,
*,
create: bool,
locks: dict[str, Lock],
profile_capacity: int | None = None,
name: str = 'LiveProfileBuffer',
):
if create and profile_capacity is None:
raise ValueError('profile_capacity must be provided when creating LiveProfileBuffer')
shape = None if not create else (1, 3 + profile_capacity)
[docs]
self._buffer = MatrixBuffer(create=create, locks=locks, name=name, shape=shape)
[docs]
self.profile_capacity = self._buffer.shape[1] - 3
@property
[docs]
def shape(self) -> tuple[int, int]:
return self._buffer.shape
[docs]
def clear(self) -> None:
"""Reset the buffer contents to ``NaN``."""
empty_row = np.full((1, 3 + self.profile_capacity), np.nan, dtype=np.float64)
self._buffer.write(empty_row)
[docs]
def write_profile(self, timestamp: float, bead_id: int, profile: np.ndarray) -> None:
"""Store the latest profile for a bead.
Parameters
----------
timestamp : float
Timestamp associated with the profile.
bead_id : int
Bead identifier for the profile.
profile : numpy.ndarray
One-dimensional array of profile samples. Length must not exceed
``profile_capacity``.
"""
if profile.shape[0] > self.profile_capacity:
raise ValueError(
f'Profile length {profile.shape[0]} exceeds live buffer capacity {self.profile_capacity}'
)
row = np.full((1, 3 + self.profile_capacity), np.nan, dtype=np.float64)
row[0, 0] = timestamp
row[0, 1] = bead_id
row[0, 2] = profile.shape[0]
row[0, 3:3 + profile.shape[0]] = profile
self._buffer.write(row)
[docs]
def peak_unsorted(self) -> np.ndarray:
return self._buffer.peak_unsorted()
[docs]
class ZLUTSweepDataset:
"""Temporary shared-memory dataset used for Z-LUT sweep capture.
The dataset stores one row per captured profile with aligned metadata arrays
for bead id, step index, timestamp, motor Z, validity, and the full radial
profile. Unlike :class:`VideoBuffer` and :class:`MatrixBuffer`, this object
never wraps and never overwrites old entries. It is intended to be created
and destroyed at runtime by the workflow owner, while peer processes attach
to the fixed shared-memory names on demand.
"""
[docs]
NAME = 'ZLUTSweepDataset'
[docs]
_INFO_FIELDS = {
'schema_version': 0,
'state': 1,
'capacity': 2,
'profile_length': 3,
'n_steps': 4,
'n_beads': 5,
'profiles_per_bead': 6,
'count': 7,
}
[docs]
_INFO_SIZE = 8 * len(_INFO_FIELDS)
[docs]
_UINT64_DTYPE = np.dtype(np.uint64)
[docs]
_BEAD_ID_DTYPE = np.dtype(np.uint32)
[docs]
_STEP_INDEX_DTYPE = np.dtype(np.uint32)
[docs]
_TIMESTAMP_DTYPE = np.dtype(np.float64)
[docs]
_MOTOR_Z_DTYPE = np.dtype(np.float64)
[docs]
_VALID_DTYPE = np.dtype(np.uint8)
[docs]
_PROFILE_DTYPE = np.dtype(np.float64)
[docs]
_SEGMENT_SUFFIXES = {
'info': ' Info',
'bead_ids': ' BeadIds',
'step_indices': ' StepIndices',
'timestamps': ' Timestamps',
'motor_z': ' MotorZ',
'valid': ' Valid',
'profiles': ' Profiles',
}
[docs]
_SHM_ATTRS = (
'_shm_profiles',
'_shm_valid',
'_shm_motor_z',
'_shm_timestamps',
'_shm_step_indices',
'_shm_bead_ids',
'_shm_info',
)
def __init__(
    self,
    *,
    create: bool,
    locks: dict[str, Lock],
    capacity: int | None = None,
    profile_length: int | None = None,
    n_steps: int | None = None,
    n_beads: int | None = None,
    profiles_per_bead: int | None = None,
    name: str = NAME,
):
    """Create the dataset's shared-memory segments or attach to existing ones.

    Parameters
    ----------
    create : bool
        ``True`` to allocate and initialise the segments; ``False`` to attach.
    locks : dict[str, Lock]
        Mapping of names to locks; must contain an entry for ``name``.
    capacity, profile_length, n_steps, n_beads, profiles_per_bead : int, optional
        Dataset dimensions. When creating they are passed through
        ``_validate_create_parameters``. When attaching they are read from the
        info segment; ``capacity`` and ``profile_length`` are, if given,
        compared against the stored values.
    name : str
        Base name for the lock lookup and the shared-memory segments.

    Raises
    ------
    ValueError
        When attaching and a supplied ``capacity`` or ``profile_length`` does
        not match the existing dataset.

    Notes
    -----
    Any exception raised while opening or initialising segments triggers
    ``_cleanup_shared_memory_segments(unlink=create)`` and is re-raised.
    """
    # ``name`` selects both the lock and the shared-memory segment names.
    self.name: str = name
    # Read by ``write``/``peak`` to reject use after closing.
    self._closed = False
    self._owns_shared_memory = create
    # Pre-set every segment attribute so cleanup can run after a partial init.
    for attr in self._SHM_ATTRS:
        setattr(self, attr, None)
    self.lock: Lock = locks[self.name]

    if create:
        validated_parameters = self._validate_create_parameters(
            capacity=capacity,
            profile_length=profile_length,
            n_steps=n_steps,
            n_beads=n_beads,
            profiles_per_bead=profiles_per_bead,
        )
        self.capacity = validated_parameters['capacity']
        self.profile_length = validated_parameters['profile_length']
        self.n_steps = validated_parameters['n_steps']
        self.n_beads = validated_parameters['n_beads']
        self.profiles_per_bead = validated_parameters['profiles_per_bead']

    def open_segment(key: str, nbytes: int) -> SharedMemory:
        # One segment per suffix in _SEGMENT_SUFFIXES, all sharing ``name``.
        return SharedMemory(
            create=create,
            name=self.name + self._SEGMENT_SUFFIXES[key],
            size=nbytes,
        )

    try:
        self._shm_info = open_segment('info', self._INFO_SIZE)
        if create:
            self._write_info('schema_version', self._SCHEMA_VERSION)
            self._write_info('state', self.STATE_CREATING)
            self._write_info('capacity', self.capacity)
            self._write_info('profile_length', self.profile_length)
            self._write_info('n_steps', self.n_steps)
            self._write_info('n_beads', self.n_beads)
            self._write_info('profiles_per_bead', self.profiles_per_bead)
            self._write_info('count', 0)
        else:
            self._validate_attach_ready_state()
            self._validate_schema_version()
            self.capacity = self._read_info('capacity')
            self.profile_length = self._read_info('profile_length')
            self.n_steps = self._read_info('n_steps')
            self.n_beads = self._read_info('n_beads')
            self.profiles_per_bead = self._read_info('profiles_per_bead')
            if capacity is not None and int(capacity) != self.capacity:
                raise ValueError('capacity does not match existing ZLUTSweepDataset')
            if profile_length is not None and int(profile_length) != self.profile_length:
                raise ValueError('profile_length does not match existing ZLUTSweepDataset')

        rows = self.capacity
        self._shm_bead_ids = open_segment('bead_ids', rows * self._BEAD_ID_DTYPE.itemsize)
        self._shm_step_indices = open_segment(
            'step_indices', rows * self._STEP_INDEX_DTYPE.itemsize)
        self._shm_timestamps = open_segment('timestamps', rows * self._TIMESTAMP_DTYPE.itemsize)
        self._shm_motor_z = open_segment('motor_z', rows * self._MOTOR_Z_DTYPE.itemsize)
        self._shm_valid = open_segment('valid', rows * self._VALID_DTYPE.itemsize)
        self._shm_profiles = open_segment(
            'profiles', rows * self.profile_length * self._PROFILE_DTYPE.itemsize)

        # NumPy arrays backed directly by the shared segments.
        self._bead_ids = np.ndarray(
            (rows,), dtype=self._BEAD_ID_DTYPE, buffer=self._shm_bead_ids.buf)
        self._step_indices = np.ndarray(
            (rows,), dtype=self._STEP_INDEX_DTYPE, buffer=self._shm_step_indices.buf)
        self._timestamps = np.ndarray(
            (rows,), dtype=self._TIMESTAMP_DTYPE, buffer=self._shm_timestamps.buf)
        self._motor_z = np.ndarray(
            (rows,), dtype=self._MOTOR_Z_DTYPE, buffer=self._shm_motor_z.buf)
        self._valid = np.ndarray(
            (rows,), dtype=self._VALID_DTYPE, buffer=self._shm_valid.buf)
        self._profiles = np.ndarray(
            (rows, self.profile_length),
            dtype=self._PROFILE_DTYPE,
            buffer=self._shm_profiles.buf,
        )

        if create:
            # Integer columns start at 0, float columns at NaN; the state is
            # switched to READY only after every segment is initialised.
            self._bead_ids.fill(0)
            self._step_indices.fill(0)
            self._timestamps.fill(np.nan)
            self._motor_z.fill(np.nan)
            self._valid.fill(0)
            self._profiles.fill(np.nan)
            self.set_state(self.STATE_READY)
    except Exception:
        self._cleanup_shared_memory_segments(unlink=create)
        raise
@classmethod
[docs]
def create(
cls,
*,
locks: dict[str, Lock],
capacity: int,
profile_length: int,
n_steps: int,
n_beads: int,
profiles_per_bead: int,
name: str = NAME,
) -> 'ZLUTSweepDataset':
return cls(
create=True,
locks=locks,
capacity=capacity,
profile_length=profile_length,
n_steps=n_steps,
n_beads=n_beads,
profiles_per_bead=profiles_per_bead,
name=name,
)
@classmethod
[docs]
def attach(cls, *, locks: dict[str, Lock], name: str = NAME) -> 'ZLUTSweepDataset':
try:
return cls(create=False, locks=locks, name=name)
except FileNotFoundError as exc:
raise DatasetNotReadyError('ZLUTSweepDataset shared memory is not available yet.') from exc
[docs]
def __del__(self):
    """Delegate finalisation to ``close()``."""
    self.close()
@property
[docs]
def state(self) -> int:
    """Return the value stored in the ``'state'`` info field.

    NOTE(review): this read does not acquire ``self.lock``, whereas
    ``set_state`` writes under the lock - confirm that is intentional.
    """
    return self._read_info('state')
[docs]
def set_state(self, value: int) -> None:
    """Store ``int(value)`` in the ``'state'`` info field while holding the lock."""
    with self.lock:
        self._write_info('state', int(value))
[docs]
def write(
self,
*,
bead_ids: np.ndarray,
step_indices: np.ndarray,
timestamps: np.ndarray,
motor_z_values: np.ndarray,
valid_flags: np.ndarray,
profiles: np.ndarray,
) -> None:
if self._closed:
raise RuntimeError('Cannot write to a closed ZLUTSweepDataset')
bead_ids_array = np.asarray(bead_ids, dtype=self._BEAD_ID_DTYPE)
step_indices_array = np.asarray(step_indices, dtype=self._STEP_INDEX_DTYPE)
timestamps_array = np.asarray(timestamps, dtype=self._TIMESTAMP_DTYPE)
motor_z_array = np.asarray(motor_z_values, dtype=self._MOTOR_Z_DTYPE)
valid_array = np.asarray(valid_flags, dtype=self._VALID_DTYPE)
profiles_array = np.asarray(profiles, dtype=self._PROFILE_DTYPE)
batch_size = bead_ids_array.shape[0]
expected_shapes = {
'step_indices': step_indices_array.shape,
'timestamps': timestamps_array.shape,
'motor_z_values': motor_z_array.shape,
'valid_flags': valid_array.shape,
}
for field_name, shape in expected_shapes.items():
if shape != (batch_size,):
raise ValueError(f'{field_name} must have shape ({batch_size},)')
if profiles_array.shape != (batch_size, self.profile_length):
raise ValueError(
f'profiles must have shape ({batch_size}, {self.profile_length})'
)
with self.lock:
count = self._read_info('count')
end = count + batch_size
if end > self.capacity:
raise BufferOverflow('ZLUTSweepDataset capacity exceeded')
self._bead_ids[count:end] = bead_ids_array
self._step_indices[count:end] = step_indices_array
self._timestamps[count:end] = timestamps_array
self._motor_z[count:end] = motor_z_array
self._valid[count:end] = valid_array
self._profiles[count:end, :] = profiles_array
self._write_info('count', end)
[docs]
def peak(self) -> dict[str, np.ndarray]:
if self._closed:
raise RuntimeError('Cannot read from a closed ZLUTSweepDataset')
with self.lock:
count = self._read_info('count')
return {
'bead_ids': self._bead_ids[:count].copy(),
'step_indices': self._step_indices[:count].copy(),
'timestamps': self._timestamps[:count].copy(),
'motor_z_values': self._motor_z[:count].copy(),
'valid_flags': self._valid[:count].copy(),
'profiles': self._profiles[:count, :].copy(),
}
[docs]
def read_preview(self, selected_bead_id: int | None = None) -> dict[str, object]:
if self._closed:
raise RuntimeError('Cannot read from a closed ZLUTSweepDataset')
with self.lock:
count = self._read_info('count')
state = self._read_info('state')
available_bead_ids: list[int] = []
effective_selected_bead_id: int | None = None
motor_z_min: float | None = None
motor_z_max: float | None = None
step_indices = np.zeros((0,), dtype=self._STEP_INDEX_DTYPE)
motor_z_values = np.zeros((0,), dtype=self._MOTOR_Z_DTYPE)
profiles = np.zeros((0, self.profile_length), dtype=self._PROFILE_DTYPE)
if count > 0:
bead_ids = self._bead_ids[:count]
available_bead_ids = [int(bead_id) for bead_id in np.unique(bead_ids)]
if available_bead_ids:
if selected_bead_id is not None and np.any(bead_ids == int(selected_bead_id)):
effective_selected_bead_id = int(selected_bead_id)
else:
effective_selected_bead_id = available_bead_ids[0]
all_motor_z_values = self._motor_z[:count]
finite_motor_z = all_motor_z_values[np.isfinite(all_motor_z_values)]
if finite_motor_z.size > 0:
motor_z_min = float(np.min(finite_motor_z))
motor_z_max = float(np.max(finite_motor_z))
if effective_selected_bead_id is not None:
selected_rows = bead_ids == effective_selected_bead_id
step_indices = self._step_indices[:count][selected_rows].copy()
motor_z_values = all_motor_z_values[selected_rows].copy()
profiles = self._profiles[:count, :][selected_rows].copy()
return {
'state': state,
'count': count,
'capacity': self.capacity,
'n_steps': self.n_steps,
'n_beads': self.n_beads,
'profiles_per_bead': self.profiles_per_bead,
'profile_length': self.profile_length,
'available_bead_ids': available_bead_ids,
'selected_bead_id': effective_selected_bead_id,
'motor_z_min': motor_z_min,
'motor_z_max': motor_z_max,
'step_indices': step_indices,
'motor_z_values': motor_z_values,
'profiles': profiles,
}
[docs]
def get_count(self) -> int:
if self._closed:
raise RuntimeError('Cannot read from a closed ZLUTSweepDataset')
with self.lock:
return self._read_info('count')
[docs]
def get_capacity(self) -> int:
return self.capacity
[docs]
def close(self) -> None:
if self._closed:
return
self._cleanup_shared_memory_segments(unlink=False)
self._closed = True
[docs]
def destroy(self) -> None:
if not self._owns_shared_memory:
raise RuntimeError('Only the creating process may destroy a ZLUTSweepDataset')
self.set_state(self.STATE_DESTROYED)
self._cleanup_shared_memory_segments(unlink=True)
self._closed = True
[docs]
def _validate_schema_version(self) -> None:
schema_version = self._read_info('schema_version')
if schema_version == 0:
raise DatasetNotReadyError('ZLUTSweepDataset schema metadata is not initialized yet.')
if schema_version != self._SCHEMA_VERSION:
raise ValueError(
f'Unsupported ZLUTSweepDataset schema version: {schema_version}'
)
[docs]
def _validate_attach_ready_state(self) -> None:
state = self._read_info('state')
if state in {
self.STATE_ABSENT,
self.STATE_CREATING,
self.STATE_DETACHING,
self.STATE_DESTROYED,
}:
raise DatasetNotReadyError(
f'ZLUTSweepDataset is not attachable while in state {state}.'
)
[docs]
def _read_info(self, field: str) -> int:
field_index = self._INFO_FIELDS[field]
start = field_index * 8
end = start + 8
return int.from_bytes(self._shm_info.buf[start:end], byteorder='big')
[docs]
def _write_info(self, field: str, value: int) -> None:
field_index = self._INFO_FIELDS[field]
start = field_index * 8
end = start + 8
self._shm_info.buf[start:end] = int(value).to_bytes(8, byteorder='big')
@classmethod
[docs]
def _validate_create_parameters(
cls,
*,
capacity: int | None,
profile_length: int | None,
n_steps: int | None,
n_beads: int | None,
profiles_per_bead: int | None,
) -> dict[str, int]:
required = {
'capacity': capacity,
'profile_length': profile_length,
'n_steps': n_steps,
'n_beads': n_beads,
'profiles_per_bead': profiles_per_bead,
}
missing = [field for field, value in required.items() if value is None]
if missing:
raise ValueError(
f"Missing required ZLUTSweepDataset creation parameters: {', '.join(missing)}"
)
validated: dict[str, int] = {}
for field, value in required.items():
if value <= 0:
raise ValueError(f'{field} must be positive')
validated[field] = int(value)
return validated
[docs]
def _cleanup_shared_memory_segments(self, *, unlink: bool) -> None:
for attr in self._SHM_ATTRS:
shm = getattr(self, attr, None)
if shm is None:
continue
if unlink:
try:
shm.unlink()
except FileNotFoundError:
pass
shm.close()
setattr(self, attr, None)
[docs]
class BufferUnderflow(Exception):
    """Signals a read attempt on a buffer that currently holds no data."""
[docs]
class BufferOverflow(Exception):
    """Signals a write attempt on a buffer that has no free slots left."""
[docs]
class DatasetNotReadyError(Exception):
    """Signals that a shared-memory dataset exists but cannot be attached yet."""
[docs]
# Lookup table from a bit width to NumPy's unsigned integer type of that width
# (np.uint8, np.uint16, np.uint32, np.uint64).
bit_to_dtype = {width: getattr(np, f'uint{width}') for width in (8, 16, 32, 64)}
[docs]
def int_to_uint_dtype(bits: int):
    """Look up the unsigned integer NumPy type for a bit width.

    Parameters
    ----------
    bits : int
        Bit width to look up in ``bit_to_dtype``.

    Returns
    -------
    type
        The NumPy scalar type stored in ``bit_to_dtype`` for ``bits``.

    Raises
    ------
    ValueError
        If ``bits`` is not a key of ``bit_to_dtype``.
    """
    # No entry in the table is None, so a None result means "missing key".
    uint_type = bit_to_dtype.get(bits)
    if uint_type is None:
        raise ValueError(f"Unsupported bit width: {bits}")
    return uint_type