"""Camera manager and dummy camera implementations for MagScope.
This module defines the `CameraManager` process responsible for coordinating
camera acquisition with the shared `VideoBuffer`, along with several
simulation-oriented `CameraBase` implementations used when hardware is not
available. The manager exchanges IPC messages with the GUI to keep camera
settings synchronized and ensures buffers are properly released as acquisition
states change.
"""
from abc import ABCMeta, abstractmethod
from contextlib import nullcontext
from functools import lru_cache
import queue
from time import time
from warnings import warn
import numpy as np
from magtrack.simulation import simulate_beads
from magscope.datatypes import BufferUnderflow, VideoBuffer
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (GetCameraSettingCommand, SetCameraSettingCommand,
SetSimulatedFocusCommand, UpdateCameraSettingCommand,
UpdateVideoBufferPurgeCommand)
from magscope.processes import ManagerProcessBase
[docs]
class CameraManager(ManagerProcessBase):
"""Manager process that feeds frames from a `CameraBase` into shared buffers.
The manager owns a camera instance (dummy by default), connects it to the
shared `VideoBuffer`, relays camera settings to the GUI, and orchestrates
buffer lifecycles based on acquisition and processing state. Its main loop
reacts to pool flags, drains buffers to avoid overflow, and triggers camera
fetches when connected.
"""
def __init__(self):
super().__init__()
[docs]
self.camera: CameraBase = DummyCameraBeads()
[docs]
self._released_completed_stacks = 0
[docs]
def setup(self):
"""Connect to the camera and publish its current settings.
Connection failures are logged as warnings so the rest of the system can
continue running in simulation mode. When a connection succeeds,
broadcast the initial camera settings to keep the GUI in sync with the
camera process.
"""
# Attempt to connect to the camera
try:
if self.shared_values is None:
raise RuntimeError('CameraManager has no shared_values')
self.camera.shared_values = self.shared_values
self.camera.reset_health_counters()
self.camera.connect(self.video_buffer)
except Exception as e:
warn(f"Could not connect to camera: {e}")
# Send the current camera settings to the GUI
if self.camera.is_connected:
for setting in self.camera.settings:
self.get_camera_setting(setting)
[docs]
def do_main_loop(self):
"""Main process loop handling buffer lifecycle and fetching frames.
The video processor reports consumed stacks through shared completion
counters. This manager drains those completions into camera buffer
releases, returns unreserved stacks while acquisition is paused, and
purges excess buffered data when capacity gets too low.
"""
self._release_completed_pool_buffers()
if not self._acquisition_on:
self._release_unattached_buffers()
fraction_available = (1 - self.video_buffer.get_level())
frames_available = fraction_available * self.video_buffer.n_total_images
if frames_available <= 1:
if self._purge_buffers() > 0:
command = UpdateVideoBufferPurgeCommand(t=time())
self.send_ipc(command)
# Check for new images from the camera
if self.camera.is_connected:
self.camera.fetch()
[docs]
def _release_unattached_buffers(self):
"""Return buffers that are no longer tracked by the processing pool."""
if self.video_buffer is None:
return
while self._take_unreserved_stack():
for _ in range(self.video_buffer.n_images):
self.camera.release()
[docs]
def _purge_buffers(self):
"""Drain video buffer contents until at least 30% capacity is free."""
if self.video_buffer is None:
return 0
purged_stacks = 0
while True:
if not self._take_unreserved_stack():
break
for _ in range(self.video_buffer.n_images):
self.camera.release()
purged_stacks += 1
if self.video_buffer.get_level() <= 0.3:
break
return purged_stacks
[docs]
def _release_completed_pool_buffers(self):
"""Release camera buffers for stacks already consumed by workers."""
if self.video_buffer is None:
return
completed_stacks = self.shared_values.video_process_completed_stacks.value
delta = completed_stacks - self._released_completed_stacks
if delta <= 0:
return
for _ in range(delta * self.video_buffer.stack_shape[2]):
self.camera.release()
self._released_completed_stacks = completed_stacks
[docs]
def _take_unreserved_stack(self) -> bool:
if self.video_buffer is None:
return False
with self._stack_coordination_lock():
reserved_stacks = self.shared_values.video_process_reserved_stacks.value
unread_stacks = self.video_buffer.get_unread_stack_count()
if unread_stacks <= reserved_stacks:
return False
try:
self.video_buffer.read_stack_no_return()
except BufferUnderflow:
return False
return True
[docs]
def _stack_coordination_lock(self):
if self.locks is None:
return nullcontext()
return self.locks.get('VideoProcessingReservation', nullcontext())
@register_ipc_command(GetCameraSettingCommand)
[docs]
def get_camera_setting(self, name: str):
"""Send a camera setting value to the GUI via IPC."""
value = self.camera[name]
command = UpdateCameraSettingCommand(name=name, value=value)
self.send_ipc(command)
@register_ipc_command(SetCameraSettingCommand)
[docs]
def set_camera_setting(self, name: str, value: str):
"""Apply a setting to the camera and broadcast the full settings set."""
try:
self.camera[name] = value
except Exception as e:
reason = str(e).strip()
if not reason:
reason = repr(e)
warn(f'Could not set camera setting {name} to {value}: {reason}')
for setting in self.camera.settings:
self.get_camera_setting(setting)
@register_ipc_command(SetSimulatedFocusCommand)
[docs]
def set_simulated_focus(self, offset: float):
"""Adjust the simulated camera focus when using :class:`DummyCameraBeads`."""
if not isinstance(self.camera, DummyCameraBeads):
return
try:
self.camera.set_focus_offset(offset)
except Exception as exc:
reason = str(exc).strip() or repr(exc)
warn(f'Could not update simulated focus to {offset}: {reason}')
[docs]
class CameraBase(metaclass=ABCMeta):
"""Abstract base class describing the camera interface used by managers.
Concrete cameras must expose immutable dimensions and dtype metadata, a
minimal settings API (`__getitem__`/`__setitem__`), and methods for
connecting, fetching frames into a `VideoBuffer`, and releasing buffers back
to the device or simulation pool.
"""
[docs]
settings: list[str] = ['framerate']
def __init__(self):
[docs]
self.is_connected = False
[docs]
self.shared_values = None
[docs]
self.video_buffer: VideoBuffer | None = None
[docs]
self.camera_buffers: queue.Queue | None = None
if None in (self.width, self.height, self.dtype, self.nm_per_px):
raise NotImplementedError
# Check dtype is valid
if self.dtype not in (np.uint8, np.uint16, np.uint32, np.uint64):
raise ValueError(f"Invalid dtype {self.dtype}")
# Check bits is valid
if not isinstance(self.bits, int):
raise ValueError(f"Invalid bits {self.bits}")
if self.bits > np.iinfo(self.dtype).bits:
raise ValueError(f"Invalid bits {self.bits} for dtype {self.dtype}")
# Check settings
if 'framerate' not in self.settings:
raise ValueError("All cameras must declare a 'framerate' setting")
[docs]
def __del__(self):
try:
if self.is_connected:
self.release_all()
except Exception:
pass
self.video_buffer = None
@abstractmethod
[docs]
def connect(self, video_buffer):
"""
Attempts to connect to the camera.
But does not start an acquisition. This method should set the value of self.is_connected to True if successful
or False if not.
"""
self.video_buffer = video_buffer
@abstractmethod
[docs]
def fetch(self):
"""
Checks if the camera has new images.
If the camera has a new image, then it holds the camera's
buffered image in a queue (self.camera_buffers). And stores the
image and timestamp in the video buffer (self._video_buffer).
The timestamp should be the seconds since the unix epoch:
(January 1, 1970, 00:00:00 UTC)
"""
pass
@abstractmethod
[docs]
def release(self):
"""
Gives the buffer back to the camera.
"""
pass
[docs]
def release_all(self):
while self.camera_buffers is not None and self.camera_buffers.qsize() > 0:
self.release()
@abstractmethod
[docs]
def get_setting(self, name: str) -> str: # noqa
""" Should return the current value of the setting from the camera """
if name not in self.settings:
raise KeyError(f"Unknown setting {name}")
@abstractmethod
[docs]
def set_setting(self, name: str, value: str):
""" Should set the value of the setting on the camera """
if name not in self.settings:
raise KeyError(f"Unknown setting {name}")
[docs]
def __getitem__(self, name: str) -> str:
""" Used to get settings. Example: my_cam['framerate'] """
return self.get_setting(name)
[docs]
def __setitem__(self, name: str, value: str) -> None:
""" Used to set settings. Example: my_cam['framerate'] = 100.0 """
self.set_setting(name, value)
[docs]
def reset_health_counters(self) -> None:
if self.shared_values is None:
return
self.shared_values.camera_total_frames.value = 0
self.shared_values.camera_consecutive_timeouts.value = 0
self.shared_values.camera_queue_full_events.value = 0
self.shared_values.camera_last_frame_timestamp.value = 0.0
[docs]
def report_frame_received(self, timestamp: float) -> None:
if self.shared_values is None:
return
self.shared_values.camera_total_frames.value += 1
self.shared_values.camera_consecutive_timeouts.value = 0
self.shared_values.camera_last_frame_timestamp.value = float(timestamp)
[docs]
def report_timeout(self) -> None:
if self.shared_values is None:
return
self.shared_values.camera_consecutive_timeouts.value += 1
[docs]
def report_queue_full(self) -> None:
if self.shared_values is None:
return
self.shared_values.camera_queue_full_events.value += 1
[docs]
class DummyCameraNoise(CameraBase):
"""Noise camera that generates random images at a configurable frame rate."""
[docs]
settings = ['framerate', 'exposure', 'gain']
def __init__(self):
super().__init__()
[docs]
self.fake_settings = {'framerate': 1000.0, 'exposure': 250.0, 'gain': 0.0}
[docs]
self.est_fps = self.fake_settings['framerate']
[docs]
self.est_fps_time = time()
[docs]
def connect(self, video_buffer):
super().connect(video_buffer)
self.is_connected = True
[docs]
def fetch(self):
if (timestamp := time()) - self.last_time < 1. / self.fake_settings['framerate']:
return
self.est_fps_count += 1
if timestamp - self.est_fps_time > 1:
self.est_fps = self.est_fps_count / (timestamp - self.est_fps_time)
self.est_fps_count = 0
self.est_fps_time = timestamp
image = self._fake_image()
self.last_time = timestamp
self.video_buffer.write_image_and_timestamp(image, timestamp)
self.report_frame_received(timestamp)
[docs]
def _fake_image(self):
max_int = np.iinfo(self.dtype).max
images = np.random.rand(self.height, self.width)
images += self.fake_settings['gain']
images *= self.fake_settings['exposure']
images **= (1 + self.fake_settings['gain'])
np.maximum(images, 0, out=images)
np.minimum(images, max_int, out=images)
return images.astype(self.dtype).tobytes()
[docs]
def release(self):
pass
[docs]
def get_setting(self, name: str) -> str:
super().get_setting(name)
if name != 'framerate':
value = self.fake_settings[name]
else:
value = self.est_fps
value = str(round(value))
return value
[docs]
def set_setting(self, name: str, value: str):
super().set_setting(name, value)
match name:
case 'framerate':
value = float(value)
if value < 1 or value > 10000:
raise ValueError
case 'exposure':
value = float(value)
if value < 0 or value > 10000000:
raise ValueError
case 'gain':
value = int(value)
if value < 0 or value > 10:
raise ValueError
self.fake_settings[name] = value
[docs]
class DummyCameraFastNoise(CameraBase):
"""Noise camera that reuses cached random frames for higher throughput."""
[docs]
settings = ['framerate', 'exposure', 'gain']
def __init__(self):
super().__init__()
[docs]
self.fake_settings = {'framerate': 1000.0, 'exposure': 25000.0, 'gain': 0.0}
[docs]
self.est_fps = self.fake_settings['framerate']
[docs]
self.est_fps_time = time()
[docs]
self.fake_images = None
[docs]
self.fake_images_n = 10
[docs]
self.fake_image_index = 0
[docs]
def connect(self, video_buffer):
super().connect(video_buffer)
self.get_fake_image()
self.is_connected = True
[docs]
def fetch(self):
if (timestamp := time()) - self.last_time < 1. / self.fake_settings['framerate']:
return
self.est_fps_count += 1
if timestamp - self.est_fps_time > 1:
self.est_fps = self.est_fps_count / (timestamp - self.est_fps_time)
self.est_fps_count = 0
self.est_fps_time = timestamp
image = self.get_fake_image()
self.last_time = timestamp
self.video_buffer.write_image_and_timestamp(image, timestamp)
self.report_frame_received(timestamp)
[docs]
def get_fake_image(self):
if self.fake_images is None:
max_int = np.iinfo(self.dtype).max
images = np.random.rand(self.height, self.width, self.fake_images_n)
images += self.fake_settings['gain']
images *= self.fake_settings['exposure']
images **= (1 + self.fake_settings['gain'])
np.maximum(images, 0, out=images)
np.minimum(images, max_int, out=images)
self.fake_images = images.astype(self.dtype).tobytes()
stride = self.height * self.width * np.dtype(self.dtype).itemsize
start = self.fake_image_index * stride
end = start + stride
image = self.fake_images[start:end]
self.fake_image_index = (self.fake_image_index + 1) % self.fake_images_n
return image
[docs]
def release(self):
pass
[docs]
def get_setting(self, name: str) -> str:
super().get_setting(name)
if name != 'framerate':
value = self.fake_settings[name]
else:
value = self.est_fps
value = str(round(value))
return value
[docs]
def set_setting(self, name: str, value: str):
super().set_setting(name, value)
match name:
case 'framerate':
value = float(value)
if value < 1 or value > 10000:
raise ValueError
case 'exposure':
value = float(value)
if value < 0 or value > 10000000:
raise ValueError
case 'gain':
value = int(value)
if value < 0 or value > 10:
raise ValueError
self.fake_settings[name] = value
[docs]
class DummyCameraBeads(CameraBase):
"""Bead simulator producing synthetic frames for testing without hardware."""
# Exposed settings
[docs]
settings = [
'framerate',
'fixed_n',
'fixed_z',
'tethered_n',
'tethered_z',
'tethered_z_sigma',
'tethered_xy_sigma',
'gain',
'seed'
]
def __init__(self):
super().__init__()
[docs]
self._settings = {
'framerate' : 30.0,
'fixed_n' : 5,
'fixed_z' : 0.0,
'tethered_n' : 3,
'tethered_z' : 0.0,
'tethered_z_sigma' : 0.3,
'tethered_xy_sigma' : 3.0,
'gain' : 25000.0,
'seed' : 1,
}
[docs]
self._focus_offset = 0.0
[docs]
self._bead_size_px = 50
[docs]
self._min_sep_px = 50.0
[docs]
self._edge_margin_px = 10.0
[docs]
self._radius_nm = 1500.0
[docs]
self._rng = np.random.default_rng(self._settings['seed'])
# placement and bead state
[docs]
self._centers_fixed = np.empty((0,2), np.float32)
[docs]
self._centers_teth = np.empty((0,2), np.float32)
[docs]
self._delta_fixed = None # tapered crop for fixed beads
[docs]
self._xy = np.empty((0,2), np.float32)
[docs]
self._z = np.empty((0,), np.float32)
# time keeping
[docs]
self.est_fps = self._settings['framerate']
[docs]
self.est_fps_time = time()
[docs]
def connect(self, video_buffer):
super().connect(video_buffer)
self._rng = np.random.default_rng(int(self._settings['seed']))
self._reinit_centers_and_fixed()
self._init_tether_state()
self.is_connected = True
self.last_time = 0.0
self.est_fps = float(self._settings['framerate'])
self.est_fps_count = 0
self.est_fps_time = time()
[docs]
def fetch(self):
now = time()
fr = max(float(self._settings['framerate']), 1e-6)
if (now - self.last_time) < (1.0 / fr):
return
# fps estimator
self.est_fps_count += 1
if now - self.est_fps_time >= 1.0:
self.est_fps = self.est_fps_count / (now - self.est_fps_time)
self.est_fps_count = 0
self.est_fps_time = now
# dt for OU
dt = (now - self.last_time) if self.last_time > 0 else (1.0 / fr)
# compose frame
frame = np.full((self.height, self.width), float(self._background), np.float32)
# fixed beads
if self._delta_fixed is not None and self._centers_fixed.size:
for cx, cy in self._centers_fixed:
self._accumulate_bilinear(frame, self._delta_fixed, cx, cy)
# tethered: update OU and render per bead
n_t = self._centers_teth.shape[0]
if n_t:
th_xy = float(self._theta_xy)
sig_xy = float(self._settings['tethered_xy_sigma'])
th_z = float(self._theta_z)
sig_z = float(self._settings['tethered_z_sigma'])
z_anchor = float(self._settings['tethered_z']) + float(self._focus_offset)
size_px = int(self._bead_size_px)
nmpp = float(self.nm_per_px)
radius = float(self._radius_nm)
xyz = np.zeros((1, 3), dtype=np.float32)
for j in range(n_t):
# OU updates
self._xy[j,0] = self._ou_step(self._xy[j,0], dt, th_xy, sig_xy, 0.0, self._rng)
self._xy[j,1] = self._ou_step(self._xy[j,1], dt, th_xy, sig_xy, 0.0, self._rng)
self._z[j] = self._ou_step(self._z[j], dt, th_z, sig_z, z_anchor, self._rng)
# render crop at current z (T=1)
xyz[0, 2] = float(self._z[j])
crop_WHT = simulate_beads(xyz, nm_per_px=nmpp, size_px=size_px, radius_nm=radius) # (w,h,1)
crop_HW = crop_WHT[:, :, 0].T
delta = self._delta_for_crop(crop_HW, pad=4)
cx, cy = self._centers_teth[j]
self._accumulate_bilinear(frame, delta, cx + self._xy[j,0], cy + self._xy[j,1])
# noise and scaling
np.clip(frame, 0.0, 1.0, out=frame)
# Poisson noise always enabled
egain = float(self._settings['gain'])
lam = frame * egain
frame = self._rng.poisson(lam).astype(np.float32) / egain
# quantize
np.clip(frame, 0.0, 1.0, out=frame)
max_int = float(np.iinfo(self.dtype).max)
img_q = (frame * max_int + 0.5).astype(self.dtype)
self.video_buffer.write_image_and_timestamp(img_q.tobytes(), now)
self.report_frame_received(now)
self.last_time = now
[docs]
def release(self):
# no real hardware buffers to free
pass
[docs]
def get_setting(self, name: str) -> str:
super().get_setting(name)
if name == 'framerate':
return str(round(self.est_fps))
return str(self._settings[name])
[docs]
def set_setting(self, name: str, value: str):
super().set_setting(name, value)
def f(v): return float(v)
def i(v): return int(float(v))
if name == 'framerate':
v = f(value)
if not (1 <= v <= 10000):
raise ValueError("framerate must be between 1 and 10000 Hz")
self._settings[name] = v
return
if name in ('fixed_n', 'tethered_n'):
v = i(value)
if not (0 <= v <= 5000):
raise ValueError("fixed_n and tethered_n must be between 0 and 5000")
self._settings[name] = v
self._reinit_centers_and_fixed()
self._init_tether_state()
return
if name in ('fixed_z', 'tethered_z',
'tethered_xy_sigma', 'tethered_z_sigma',
'gain'):
v = f(value)
self._settings[name] = v
if name in ('fixed_z',):
# refresh fixed crop
self._recompute_fixed_delta()
return
if name == 'seed':
v = i(value)
self._settings[name] = v
self._rng = np.random.default_rng(v)
# reinit states deterministically
self._reinit_centers_and_fixed()
self._init_tether_state()
return
raise KeyError(f"Unknown setting {name}")
[docs]
def set_focus_offset(self, offset: float) -> None:
offset = float(offset)
delta = offset - float(self._focus_offset)
self._focus_offset = offset
self._z = self._z + delta
self._recompute_fixed_delta()
# ------------------------- internals ----------------------------
[docs]
def _reinit_centers_and_fixed(self):
w = self.width; h = self.height
size_px = int(self._bead_size_px)
base_margin = size_px // 2 + 2
margin = int(max(base_margin, int(self._edge_margin_px)))
min_sep = float(self._min_sep_px) if self._min_sep_px else float(size_px)
fixed_n = int(self._settings['fixed_n'])
tethered_n = int(self._settings['tethered_n'])
n_total = fixed_n + tethered_n
pts = self._sample_points_uniform_minsep(w, h, n_total, margin, min_sep, self._rng).astype(np.float32)
self._centers_fixed = pts[:fixed_n] if fixed_n else np.empty((0,2), np.float32)
self._centers_teth = pts[fixed_n:] if tethered_n else np.empty((0,2), np.float32)
self._recompute_fixed_delta()
[docs]
def _recompute_fixed_delta(self):
fixed_n = int(self._settings['fixed_n'])
if fixed_n <= 0:
self._delta_fixed = None
return
size_px = int(self._bead_size_px)
nmpp = float(self.nm_per_px)
radius = float(self._radius_nm)
z_s = float(self._settings['fixed_z']) + float(self._focus_offset)
xyz = np.array([[0.0, 0.0, z_s]], np.float32)
crop_WHT = simulate_beads(xyz, nm_per_px=nmpp, size_px=size_px, radius_nm=radius) # (w,h,1)
crop_HW = crop_WHT[:, :, 0].T
self._delta_fixed = self._delta_for_crop(crop_HW, pad=4)
[docs]
def _init_tether_state(self):
n_t = int(self._settings['tethered_n'])
self._xy = np.zeros((n_t, 2), np.float32)
base_z = float(self._settings['tethered_z']) + float(self._focus_offset)
self._z = np.full((n_t,), base_z, np.float32)
@staticmethod
[docs]
def _blit_add(dst, src, x, y, w=1.0):
Hs, Ws = src.shape
Hd, Wd = dst.shape
x0 = max(int(x), 0); y0 = max(int(y), 0)
x1 = min(int(x) + Ws, Wd); y1 = min(int(y) + Hs, Hd)
if x0 >= x1 or y0 >= y1:
return
sx0 = x0 - int(x); sy0 = y0 - int(y)
sx1 = sx0 + (x1 - x0); sy1 = sy0 + (y1 - y0)
dst[y0:y1, x0:x1] += w * src[sy0:sy1, sx0:sx1]
@classmethod
[docs]
def _accumulate_bilinear(cls, dst, srcHW, cx, cy):
H, W = srcHW.shape
x_int = int(np.floor(cx - W / 2.0))
y_int = int(np.floor(cy - H / 2.0))
fx = (cx - W / 2.0) - x_int
fy = (cy - H / 2.0) - y_int
cls._blit_add(dst, srcHW, x_int, y_int, (1.0 - fx) * (1.0 - fy))
cls._blit_add(dst, srcHW, x_int + 1, y_int, fx * (1.0 - fy))
cls._blit_add(dst, srcHW, x_int, y_int + 1, (1.0 - fx) * fy)
cls._blit_add(dst, srcHW, x_int + 1, y_int + 1, fx * fy)
@staticmethod
@staticmethod
@lru_cache(maxsize=8)
[docs]
def _tukey_taper(H, W, pad=4):
y = np.minimum(np.arange(H), np.arange(H)[::-1])
x = np.minimum(np.arange(W), np.arange(W)[::-1])
d = np.minimum.outer(y, x).astype(np.float32) / max(1, pad)
u = np.clip(d, 0.0, 1.0)
win = 0.5 - 0.5*np.cos(np.pi*u) # 0 at edge → 1 inside
win.setflags(write=False)
return win
@classmethod
[docs]
def _delta_for_crop(cls, cropHW, pad=4):
base = cls._border_median(cropHW)
win = cls._tukey_taper(*cropHW.shape, pad=pad)
return (cropHW - base) * win
@staticmethod
[docs]
def _ou_step(x, dt, theta, sigma, mu, rng):
# x_{t+1} = x + theta*(mu - x)*dt + sigma*sqrt(dt)*N(0,1)
return x + theta*(mu - x)*dt + sigma*np.sqrt(dt)*rng.normal()
@staticmethod