magscope.datatypes
==================

.. py:module:: magscope.datatypes

.. autoapi-nested-parse::

   Shared-memory buffers used across MagScope.

   This module introduces two circular buffers that let different processes share
   camera frames and other numeric data without copying large arrays:

   ``VideoBuffer``
       Stores stacks of images in one shared-memory region together with capture
       timestamps. The class is designed for a producer process that records
       frames and one or more consumer processes that read them.

   ``MatrixBuffer``
       Stores general two-dimensional numeric data such as bead positions or
       motor telemetry. Like :class:`VideoBuffer`, it uses shared memory.

   Both buffers rely on external :class:`multiprocessing.synchronize.Lock`
   objects to coordinate access between processes. See the class docstrings
   below for usage details.

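The zero-copy sharing described above can be illustrated with the standard
library alone. The following is a minimal sketch, not MagScope code: the helper
name ``share_frame_demo`` and the frame shape are assumptions chosen for
illustration, and both handles live in one process so the example stays
runnable on its own. One handle allocates a segment (comparable to
``create=True``) and a second handle attaches to it by name (comparable to
``create=False``).

```python
import numpy as np
from multiprocessing import shared_memory


def share_frame_demo():
    """Write a frame through one handle and read it back through another."""
    shape, dtype = (4, 6), np.uint16
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize

    # "Producer" side: allocate the segment.
    owner = shared_memory.SharedMemory(create=True, size=nbytes)
    try:
        frame = np.ndarray(shape, dtype=dtype, buffer=owner.buf)
        frame[:] = np.arange(24, dtype=dtype).reshape(shape)

        # "Consumer" side: attach to the same segment by name.
        peer = shared_memory.SharedMemory(name=owner.name)
        try:
            view = np.ndarray(shape, dtype=dtype, buffer=peer.buf)
            result = view.copy()  # copy out before releasing the view
            del view              # NumPy views must be dropped before close()
        finally:
            peer.close()
        del frame
    finally:
        owner.close()
        owner.unlink()  # only the creating side removes the segment
    return result


result = share_frame_demo()
```

In a real multi-process setup the attaching side would run in another process
and guard its reads with the shared lock.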


Attributes
----------

.. autoapisummary::

   magscope.datatypes.logger
   magscope.datatypes.bit_to_dtype


Exceptions
----------

.. autoapisummary::

   magscope.datatypes.BufferUnderflow
   magscope.datatypes.BufferOverflow
   magscope.datatypes.DatasetNotReadyError


Classes
-------

.. autoapisummary::

   magscope.datatypes.VideoBuffer
   magscope.datatypes.MatrixBuffer
   magscope.datatypes.BeadRoiBuffer
   magscope.datatypes.LiveProfileBuffer
   magscope.datatypes.ZLUTSweepDataset


Functions
---------

.. autoapisummary::

   magscope.datatypes.int_to_uint_dtype


Module Contents
---------------

.. py:data:: logger

.. py:class:: VideoBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], n_stacks: int | None = None, width: int | None = None, height: int | None = None, n_images: int | None = None, bits: int | None = None)

   Shared-memory ring buffer for video data.

   :param create: ``True`` to allocate the shared-memory regions; ``False`` to attach to
                  an existing buffer.
   :type create: bool
   :param locks: Mapping of buffer names to :class:`multiprocessing.synchronize.Lock`
                 instances. The dictionary must contain an entry for ``VideoBuffer``.
   :type locks: dict[str, Lock]
   :param n_stacks: Number of temporal stacks stored in the buffer. Required when
                    ``create`` is ``True``.
   :type n_stacks: int, optional
   :param width: Frame width in pixels. Required when ``create`` is ``True``.
   :type width: int, optional
   :param height: Frame height in pixels. Required when ``create`` is ``True``.
   :type height: int, optional
   :param n_images: Number of frames per stack. Required when ``create`` is ``True``.
   :type n_images: int, optional
   :param bits: Bit depth of each pixel. Required when ``create`` is ``True``.
   :type bits: int, optional

   .. rubric:: Notes

   The buffer should first be created by a process with ``create=True``. When
   creating, ``n_stacks``, ``width``, ``height``, ``n_images`` and ``bits``
   must be provided. After the shared memory exists, other processes can
   access the buffer with ``create=False``.


   .. py:attribute:: name
      :type:  str
      :value: 'VideoBuffer'



   .. py:attribute:: lock
      :type:  multiprocessing.synchronize.Lock


   .. py:attribute:: _shm_info


   .. py:attribute:: stack_shape


   .. py:attribute:: image_shape


   .. py:attribute:: dtype


   .. py:attribute:: itemsize


   .. py:attribute:: n_images
      :value: None



   .. py:attribute:: n_total_images


   .. py:attribute:: image_size


   .. py:attribute:: stack_size


   .. py:attribute:: buffer_size


   .. py:attribute:: _shm


   .. py:attribute:: _ts_shm


   .. py:attribute:: _idx_shm


   .. py:attribute:: _buf


   .. py:attribute:: _ts_buf


   .. py:attribute:: _idx_buf


   .. py:method:: __del__()


   .. py:method:: _get_count_index()


   .. py:method:: _get_read_index()


   .. py:method:: _get_write_index()


   .. py:method:: _set_count_index(value)


   .. py:method:: _set_read_index(value)


   .. py:method:: _set_write_index(value)


   .. py:method:: _check_read(value)


   .. py:method:: _check_write(value)


   .. py:method:: _get_timestamps(read, length)


   .. py:method:: _set_timestamp(write, timestamp)


   .. py:method:: get_level()

      Return the fraction of the buffer that currently holds data.

      :returns: Ratio of unread frames to total buffer capacity.
      :rtype: float



   .. py:method:: get_unread_stack_count()

      Return the number of full unread stacks currently buffered.



   .. py:method:: check_read_stack()

      Return ``True`` when at least one full stack can be read.

      :returns: ``True`` if ``n_images`` frames are available to read; ``False``
                otherwise.
      :rtype: bool



   .. py:method:: peak_image()

      Return the index of the newest image and a view of its bytes without acquiring the lock.

      This helper supports lightweight live previews. Because the method does
      not acquire the lock, it may occasionally return a partially written
      frame or an older image.

      :returns: Tuple containing the newest image index and a memory view of the
                image bytes. Convert the memory view to a 2D array with
                ``dtype`` and ``image_shape``.
      :rtype: tuple of (int, memoryview)



   .. py:method:: peak_stack()

      Return the next unread stack without advancing the read index.

      :returns: ``(stack, timestamps)`` where ``stack`` has shape
                ``(width, height, n_images)`` and ``timestamps`` is a ``float64``
                array aligned with the returned frames.
      :rtype: tuple of numpy.ndarray



   .. py:method:: read_stack_no_return()

      Advance the read index by one stack without returning data.

      :returns: This method updates the internal indices but produces no data.
      :rtype: None



   .. py:method:: read_image()

      Return the next unread image and its timestamp.

      :returns: Tuple consisting of the next unread frame as a 2D array with shape
                ``(width, height)`` and the corresponding timestamp in seconds.
      :rtype: tuple of (numpy.ndarray, float)



   .. py:method:: write_timestamp(timestamp)

      Increment the write index and store a timestamp without frame data.

      :param timestamp: Timestamp in seconds that should be associated with the next frame
                        slot.
      :type timestamp: float



   .. py:method:: write_image_and_timestamp(image, timestamp)

      Increment the write index, storing one image and its timestamp.

      :param image: Frame data shaped ``(width, height)`` with the buffer's ``dtype``.
      :type image: numpy.ndarray
      :param timestamp: Timestamp in seconds associated with the frame.
      :type timestamp: float

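The index bookkeeping behind ``get_level``, ``check_read_stack``,
``peak_stack`` and ``read_stack_no_return`` can be modelled in a few lines of
plain Python. This is a toy, in-process sketch under stated assumptions:
``ToyFrameRing`` is a hypothetical class, it uses lists instead of shared
memory, and the built-in ``OverflowError`` and ``LookupError`` stand in for
``BufferOverflow`` and ``BufferUnderflow``. When the real class raises those
exceptions is not specified here.

```python
class ToyFrameRing:
    """In-process model of a frame ring buffer; illustrative only."""

    def __init__(self, n_stacks, n_images):
        self.n_images = n_images
        self.capacity = n_stacks * n_images  # total frame slots
        self.frames = [None] * self.capacity
        self.timestamps = [0.0] * self.capacity
        self.read = 0   # next slot to read
        self.write = 0  # next slot to write
        self.count = 0  # number of unread frames

    def write_image_and_timestamp(self, image, timestamp):
        if self.count == self.capacity:
            raise OverflowError("no free slots")  # stand-in for BufferOverflow
        self.frames[self.write] = image
        self.timestamps[self.write] = timestamp
        self.write = (self.write + 1) % self.capacity
        self.count += 1

    def get_level(self):
        return self.count / self.capacity

    def check_read_stack(self):
        return self.count >= self.n_images

    def peak_stack(self):
        if not self.check_read_stack():
            raise LookupError("no full stack")  # stand-in for BufferUnderflow
        idx = [(self.read + i) % self.capacity for i in range(self.n_images)]
        return [self.frames[i] for i in idx], [self.timestamps[i] for i in idx]

    def read_stack_no_return(self):
        if not self.check_read_stack():
            raise LookupError("no full stack")
        self.read = (self.read + self.n_images) % self.capacity
        self.count -= self.n_images


ring = ToyFrameRing(n_stacks=2, n_images=3)
for i in range(4):
    ring.write_image_and_timestamp(f"frame{i}", float(i))
stack, stamps = ring.peak_stack()  # inspect without consuming
ring.read_stack_no_return()        # then consume the same stack
```

Separating the peek from the index advance lets a consumer process a stack in
place and release its slots only once it has finished.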

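``peak_image`` is documented as returning raw image bytes that the caller
converts with ``dtype`` and ``image_shape``. A minimal sketch of that
conversion follows; the helper ``memoryview_to_image`` is hypothetical, and a
local ``bytearray`` stands in for the shared-memory bytes.

```python
import numpy as np


def memoryview_to_image(view, dtype, image_shape):
    """Interpret raw image bytes as a 2D array without copying them."""
    return np.frombuffer(view, dtype=dtype).reshape(image_shape)


# Stand-in for the bytes a shared buffer would expose (assumption).
raw = bytearray(np.arange(12, dtype=np.uint16).tobytes())
image = memoryview_to_image(memoryview(raw), np.uint16, (3, 4))

# The array is a view: changing the underlying bytes changes the image.
raw[0] = 1
raw[1] = 1
```

Because the result is a view rather than a copy, a caller that needs a stable
frame for later processing would call ``image.copy()``.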

.. py:class:: MatrixBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], name: str, shape: tuple[int, int] = None)

   Shared-memory ring buffer for 2D numeric data.

   :param create: ``True`` to allocate the shared-memory regions; ``False`` to attach to
                  an existing buffer.
   :type create: bool
   :param locks: Mapping of buffer names to :class:`multiprocessing.synchronize.Lock`
                 instances. The dictionary must contain an entry for ``name``.
   :type locks: dict[str, Lock]
   :param name: Identifier used for the shared-memory segments.
   :type name: str
   :param shape: Buffer shape expressed as ``(rows, columns)``. Required when
                 ``create`` is ``True``.
   :type shape: tuple[int, int], optional

   .. rubric:: Notes

   The buffer stores time-series data in which each row corresponds to one
   time point and each column to one measured quantity. Reads consume unread
   bytes, while the ``peak`` helpers return data without advancing indices.


   .. py:attribute:: name
      :type:  str


   .. py:attribute:: lock
      :type:  multiprocessing.synchronize.Lock


   .. py:attribute:: _shm_info


   .. py:attribute:: dtype
      :type:  numpy.dtype


   .. py:attribute:: itemsize
      :type:  int


   .. py:attribute:: strides
      :type:  tuple[int, int]


   .. py:attribute:: nbytes
      :type:  int


   .. py:attribute:: _shm


   .. py:attribute:: _idx_shm


   .. py:attribute:: _buf


   .. py:attribute:: _idx_buf


   .. py:method:: __del__()


   .. py:method:: _get_count_index()


   .. py:method:: _get_read_index()


   .. py:method:: _get_write_index()


   .. py:method:: _set_count_index(value)


   .. py:method:: _set_read_index(value)


   .. py:method:: _set_write_index(value)


   .. py:method:: get_count_index()

      Return the number of unread bytes currently stored in the buffer.

      :returns: Byte count representing unread data between the read and write
                indices.
      :rtype: int



   .. py:method:: get_read_index()

      Return the index of the next byte that will be read.

      :returns: Position within the shared buffer corresponding to the next read
                operation.
      :rtype: int



   .. py:method:: get_write_index()

      Return the index of the next byte that will be written.

      :returns: Position within the shared buffer corresponding to the next write
                operation.
      :rtype: int



   .. py:method:: write(np_array)

      Write ``np_array`` into the buffer, advancing the write index.

      :param np_array: Array with ``shape[1]`` columns. Rows may wrap around to the start
                       of the buffer if the write reaches the end of the allocated space.
      :type np_array: numpy.ndarray



   .. py:method:: read()

      Return unread rows as a NumPy array and reset the read counter.

      :returns: Copy of the unread rows ordered chronologically.
      :rtype: numpy.ndarray



   .. py:method:: peak_unsorted()

      Return a view of the buffer in storage order, without chronological reordering.

      :returns: View into the shared memory representing the buffer layout.
      :rtype: numpy.ndarray



   .. py:method:: peak_sorted()

      Return the buffer contents ordered chronologically.

      :returns: Array containing the buffer rows in FIFO order without updating
                indices.
      :rtype: numpy.ndarray

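The wrap-around write and the difference between storage order and
chronological order can be sketched with NumPy. The helpers ``ring_write`` and
``chronological`` are hypothetical and track a row index instead of the byte
indices the class exposes; how ``MatrixBuffer`` implements ``write`` and
``peak_sorted`` internally is not shown in this reference.

```python
import numpy as np


def ring_write(buf, write_row, rows):
    """Copy ``rows`` into ``buf`` starting at ``write_row``, wrapping at the end."""
    n_rows = buf.shape[0]
    if rows.shape[0] > n_rows or rows.shape[1] != buf.shape[1]:
        raise ValueError("rows do not fit the buffer")
    idx = (write_row + np.arange(rows.shape[0])) % n_rows
    buf[idx] = rows
    return (write_row + rows.shape[0]) % n_rows  # new write position


def chronological(buf, write_row):
    """Oldest-first copy; once the buffer has filled, the oldest row sits at ``write_row``."""
    return np.roll(buf, -write_row, axis=0)


buf = np.full((4, 2), np.nan)
w = ring_write(buf, 0, np.array([[0.0, 0.0], [1.0, 10.0], [2.0, 20.0]]))
w = ring_write(buf, w, np.array([[3.0, 30.0], [4.0, 40.0], [5.0, 50.0]]))
ordered = chronological(buf, w)
```

After the second write the storage order is rows 4, 5, 2, 3, while
``ordered`` lists rows 2, 3, 4, 5.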


.. py:class:: BeadRoiBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int | None = None, name: str = 'BeadRoiBuffer')

   Shared-memory store for bead ROI metadata.

   The buffer reserves one fixed row per bead id, so readers can attach once
   and take compact snapshots of active ROIs without exchanging Python
   dictionaries over IPC.


   .. py:attribute:: name
      :value: 'BeadRoiBuffer'



   .. py:attribute:: lock
      :type:  multiprocessing.synchronize.Lock


   .. py:attribute:: _info_fields
      :value: 4



   .. py:attribute:: _info_size
      :value: 32



   .. py:attribute:: _roi_dtype


   .. py:attribute:: _occupancy_dtype


   .. py:attribute:: _shm_info


   .. py:attribute:: capacity


   .. py:attribute:: _roi_shape


   .. py:attribute:: _roi_nbytes


   .. py:attribute:: _occupancy_nbytes


   .. py:attribute:: _shm_data


   .. py:attribute:: _shm_occupancy


   .. py:attribute:: _roi_matrix


   .. py:attribute:: _occupancy


   .. py:method:: __del__()


   .. py:property:: max_id_plus_one
      :type: int



   .. py:property:: active_count
      :type: int



   .. py:property:: version
      :type: int



   .. py:method:: replace_beads(value: dict[int, tuple[int, int, int, int]]) -> None


   .. py:method:: add_beads(value: dict[int, tuple[int, int, int, int]]) -> None


   .. py:method:: update_beads(value: dict[int, tuple[int, int, int, int]]) -> None


   .. py:method:: remove_beads(ids) -> None


   .. py:method:: clear_beads() -> None


   .. py:method:: reorder_beads() -> dict[int, int]


   .. py:method:: get_next_available_bead_id() -> int


   .. py:method:: get_beads() -> tuple[numpy.ndarray, numpy.ndarray]


   .. py:method:: _normalize_bead_mapping(value: dict[int, tuple[int, int, int, int]]) -> dict[int, tuple[int, int, int, int]]


   .. py:method:: _normalize_ids(ids) -> numpy.ndarray


   .. py:method:: _validate_bead_id(bead_id: int) -> int


   .. py:method:: _read_info(index: int) -> int


   .. py:method:: _write_info(index: int, value: int) -> None


   .. py:method:: _increment_version() -> None

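The "fixed row per bead id" layout can be pictured as an ROI matrix paired with
an occupancy mask. The sketch below is a simplified, in-process model:
``ToyRoiTable`` is hypothetical, the order of the arrays returned by its
``get_beads`` is an assumption, and the meaning given to ``version`` (a change
counter) is inferred from the name only.

```python
import numpy as np


class ToyRoiTable:
    """Fixed row per bead id plus an occupancy mask; illustrative only."""

    def __init__(self, capacity):
        self.rois = np.zeros((capacity, 4), dtype=np.int64)  # four ints per ROI
        self.occupied = np.zeros(capacity, dtype=bool)
        self.version = 0  # bumped on every change (assumed semantics)

    def add_beads(self, beads):
        for bead_id, roi in beads.items():
            self.rois[bead_id] = roi
            self.occupied[bead_id] = True
        self.version += 1

    def remove_beads(self, ids):
        self.occupied[np.asarray(list(ids), dtype=np.intp)] = False
        self.version += 1

    def get_beads(self):
        ids = np.flatnonzero(self.occupied)  # ids of active rows
        return ids, self.rois[ids].copy()    # compact snapshot


table = ToyRoiTable(capacity=8)
table.add_beads({0: (10, 20, 30, 40), 3: (5, 6, 7, 8), 5: (1, 2, 3, 4)})
table.remove_beads([3])
ids, rois = table.get_beads()
```

Readers that cache a snapshot could compare ``version`` against their last
seen value and refresh only when it has changed.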

.. py:class:: LiveProfileBuffer(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], profile_capacity: int | None = None, name: str = 'LiveProfileBuffer')

   Shared buffer that stores the latest radial profile for live display.

   The buffer keeps a single row containing ``timestamp``, ``bead_id``,
   ``profile_length`` and the profile samples. It wraps a
   :class:`MatrixBuffer` for shared-memory transport but hides the padding
   logic from callers so profiles can be written at their native length.


   .. py:attribute:: _buffer


   .. py:attribute:: profile_capacity


   .. py:property:: shape
      :type: tuple[int, int]



   .. py:method:: clear() -> None

      Reset the buffer contents to ``NaN``.



   .. py:method:: write_profile(timestamp: float, bead_id: int, profile: numpy.ndarray) -> None

      Store the latest profile for a bead.

      :param timestamp: Timestamp associated with the profile.
      :type timestamp: float
      :param bead_id: Bead identifier for the profile.
      :type bead_id: int
      :param profile: One-dimensional array of profile samples. Length must not exceed
                      ``profile_capacity``.
      :type profile: numpy.ndarray



   .. py:method:: peak_unsorted() -> numpy.ndarray

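The single-row layout (``timestamp``, ``bead_id``, ``profile_length``, then the
samples) can be sketched as a pack/unpack pair. The helper names are
hypothetical, and padding the unused tail with ``NaN`` is an assumption
suggested by ``clear`` resetting the buffer to ``NaN``.

```python
import numpy as np


def pack_profile(timestamp, bead_id, profile, profile_capacity):
    """Build one fixed-width row from a variable-length profile."""
    profile = np.asarray(profile, dtype=np.float64).ravel()
    if profile.size > profile_capacity:
        raise ValueError("profile longer than profile_capacity")
    row = np.full(3 + profile_capacity, np.nan)  # unused tail stays NaN
    row[:3] = (timestamp, bead_id, profile.size)
    row[3:3 + profile.size] = profile
    return row


def unpack_profile(row):
    """Recover the header fields and the profile at its native length."""
    length = int(row[2])
    return float(row[0]), int(row[1]), row[3:3 + length].copy()


row = pack_profile(1.5, 7, [0.1, 0.2, 0.3], profile_capacity=5)
timestamp, bead_id, profile = unpack_profile(row)
```

Storing the length in the row lets a reader trim the padding without having to
search for the first ``NaN``.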

.. py:class:: ZLUTSweepDataset(*, create: bool, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int | None = None, profile_length: int | None = None, n_steps: int | None = None, n_beads: int | None = None, profiles_per_bead: int | None = None, name: str = NAME)

   Temporary shared-memory dataset used for Z-LUT sweep capture.

   The dataset stores one row per captured profile with aligned metadata arrays
   for bead id, step index, timestamp, motor Z, validity, and the full radial
   profile. Unlike :class:`VideoBuffer` and :class:`MatrixBuffer`, this object
   never wraps and never overwrites old entries. It is intended to be created
   and destroyed at runtime by the workflow owner, while peer processes attach
   to the fixed shared-memory names on demand.


   .. py:attribute:: NAME
      :value: 'ZLUTSweepDataset'



   .. py:attribute:: STATE_ABSENT
      :value: 0



   .. py:attribute:: STATE_CREATING
      :value: 1



   .. py:attribute:: STATE_READY
      :value: 2



   .. py:attribute:: STATE_CAPTURING
      :value: 3



   .. py:attribute:: STATE_COMPLETE
      :value: 4



   .. py:attribute:: STATE_DETACHING
      :value: 5



   .. py:attribute:: STATE_FAILED
      :value: 6



   .. py:attribute:: STATE_DESTROYED
      :value: 7



   .. py:attribute:: _INFO_FIELDS


   .. py:attribute:: _INFO_SIZE
      :value: 64



   .. py:attribute:: _SCHEMA_VERSION
      :value: 1



   .. py:attribute:: _UINT64_DTYPE


   .. py:attribute:: _BEAD_ID_DTYPE


   .. py:attribute:: _STEP_INDEX_DTYPE


   .. py:attribute:: _TIMESTAMP_DTYPE


   .. py:attribute:: _MOTOR_Z_DTYPE


   .. py:attribute:: _VALID_DTYPE


   .. py:attribute:: _PROFILE_DTYPE


   .. py:attribute:: _SEGMENT_SUFFIXES


   .. py:attribute:: _SHM_ATTRS
      :value: ('_shm_profiles', '_shm_valid', '_shm_motor_z', '_shm_timestamps', '_shm_step_indices',...



   .. py:attribute:: name
      :value: 'ZLUTSweepDataset'



   .. py:attribute:: lock
      :type:  multiprocessing.synchronize.Lock


   .. py:attribute:: _owns_shared_memory


   .. py:attribute:: _closed
      :value: False



   .. py:method:: create(*, locks: dict[str, multiprocessing.synchronize.Lock], capacity: int, profile_length: int, n_steps: int, n_beads: int, profiles_per_bead: int, name: str = NAME) -> ZLUTSweepDataset
      :classmethod:



   .. py:method:: attach(*, locks: dict[str, multiprocessing.synchronize.Lock], name: str = NAME) -> ZLUTSweepDataset
      :classmethod:



   .. py:method:: __del__()


   .. py:property:: state
      :type: int



   .. py:method:: set_state(value: int) -> None


   .. py:method:: write(*, bead_ids: numpy.ndarray, step_indices: numpy.ndarray, timestamps: numpy.ndarray, motor_z_values: numpy.ndarray, valid_flags: numpy.ndarray, profiles: numpy.ndarray) -> None


   .. py:method:: peak() -> dict[str, numpy.ndarray]


   .. py:method:: read_preview(selected_bead_id: int | None = None) -> dict[str, object]


   .. py:method:: get_count() -> int


   .. py:method:: get_capacity() -> int


   .. py:method:: close() -> None


   .. py:method:: destroy() -> None


   .. py:method:: _validate_schema_version() -> None


   .. py:method:: _validate_attach_ready_state() -> None


   .. py:method:: _read_info(field: str) -> int


   .. py:method:: _write_info(field: str, value: int) -> None


   .. py:method:: _validate_create_parameters(*, capacity: int | None, profile_length: int | None, n_steps: int | None, n_beads: int | None, profiles_per_bead: int | None) -> dict[str, int]
      :classmethod:



   .. py:method:: _cleanup_shared_memory_segments(*, unlink: bool) -> None

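The contrast with the ring buffers, a store that never wraps and never
overwrites, can be sketched as an append-only set of aligned arrays.
``ToyAppendOnlyDataset`` is hypothetical: it keeps only three of the documented
columns, its dtypes are assumptions, and the built-in ``OverflowError`` stands
in for whatever the real class raises when full.

```python
import numpy as np


class ToyAppendOnlyDataset:
    """Aligned column arrays with a fill count; rows are never overwritten."""

    def __init__(self, capacity, profile_length):
        self.capacity = capacity
        self.count = 0
        self.bead_ids = np.zeros(capacity, dtype=np.uint32)
        self.step_indices = np.zeros(capacity, dtype=np.uint32)
        self.profiles = np.full((capacity, profile_length), np.nan)

    def write(self, *, bead_ids, step_indices, profiles):
        n = len(bead_ids)
        if self.count + n > self.capacity:
            raise OverflowError("dataset is full")  # no wrap-around
        rows = slice(self.count, self.count + n)
        self.bead_ids[rows] = bead_ids
        self.step_indices[rows] = step_indices
        self.profiles[rows] = profiles
        self.count += n

    def peak(self):
        rows = slice(0, self.count)  # only the rows written so far
        return {
            "bead_ids": self.bead_ids[rows].copy(),
            "step_indices": self.step_indices[rows].copy(),
            "profiles": self.profiles[rows].copy(),
        }


data = ToyAppendOnlyDataset(capacity=4, profile_length=3)
data.write(bead_ids=[0, 1], step_indices=[0, 0], profiles=np.ones((2, 3)))
data.write(bead_ids=[0], step_indices=[1], profiles=np.zeros((1, 3)))
snapshot = data.peak()
```

A write that would exceed ``capacity`` fails as a whole, so existing rows and
the fill count stay untouched.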

.. py:exception:: BufferUnderflow

   Bases: :py:obj:`Exception`


   Raised when attempting to read from a buffer that contains no data.


.. py:exception:: BufferOverflow

   Bases: :py:obj:`Exception`


   Raised when attempting to write to a buffer that has no free slots.


.. py:exception:: DatasetNotReadyError

   Bases: :py:obj:`Exception`


   Raised when a shared-memory dataset exists but is not attachable yet.


.. py:data:: bit_to_dtype

.. py:function:: int_to_uint_dtype(bits: int)

   Return the unsigned integer NumPy dtype matching ``bits``.

   :param bits: Width of the target integer in bits. Supported values are ``8``, ``16``,
                ``32`` and ``64``.
   :type bits: int

   :returns: Unsigned integer dtype corresponding to ``bits``.
   :rtype: numpy.dtype

   :raises ValueError: If ``bits`` is not one of the supported widths.

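The documented behaviour of ``int_to_uint_dtype`` (four supported widths and a
``ValueError`` otherwise) matches a simple table lookup. The following is a
sketch of that behaviour, not the module's source; ``int_to_uint_dtype_sketch``
and ``_UINT_DTYPES`` are hypothetical names.

```python
import numpy as np

# Supported widths as listed in the parameter description.
_UINT_DTYPES = {8: np.uint8, 16: np.uint16, 32: np.uint32, 64: np.uint64}


def int_to_uint_dtype_sketch(bits):
    """Return the unsigned integer dtype for ``bits`` or raise ValueError."""
    try:
        return np.dtype(_UINT_DTYPES[bits])
    except KeyError:
        raise ValueError(f"unsupported bit width: {bits}") from None


pixel_dtype = int_to_uint_dtype_sketch(16)
```

A 12-bit camera, for example, would need its samples stored in the next
supported width, since ``12`` is not in the table.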

