Source code for sksurgeryimage.acquire.video_source

# coding=utf-8
#pylint:disable=no-name-in-module

"""
Module for video source acquisition.
Classes capture data from a video source into a numpy array.
"""

import logging
import datetime
import cv2
import numpy as np
import sksurgerycore.utilities.validate_file as vf
import sksurgeryimage.utilities.camera_utilities as cu

LOGGER = logging.getLogger(__name__)

[docs]class TimestampedVideoSource: """ Capture and store data from camera/file source. Augments the cv2.VideoCapture() to provide passing of camera dimensions in constructor, and storage of frame data. """ def __init__(self, source_num_or_file, dims=None): """ Constructs a TimestampedVideoSource. :param source_num_or_file: integer camera number or file path :param dims: optional (width, height) as a pair of integers """ self.source = cv2.VideoCapture(source_num_or_file) self.timestamp = None if not self.source.isOpened(): raise RuntimeError("Failed to open Video camera:" + str(source_num_or_file)) self.source_name = source_num_or_file LOGGER.info("Adding input from source: %s", self.source_name) if dims: width, height = dims if not isinstance(width, int): raise TypeError("Width must be an integer") if not isinstance(height, int): raise TypeError("Height must be an integer") if width < 1: raise ValueError("Width must be >= 1") if height < 1: raise ValueError("Height must be >= 1") self.set_resolution(width, height) else: width = int(self.source.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.source.get(cv2.CAP_PROP_FRAME_HEIGHT)) LOGGER.info("Source dimensions %s %s", width, height) self.frame = np.empty((height, width, 3), dtype=np.uint8) self.ret = None
[docs] def set_resolution(self, width: int, height: int): """Set the resolution of the input source. :param width: Width :type width: int :param height: Height :type height: int :raises ValueError: If resolution is not supported. """ self.source.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.source.set(cv2.CAP_PROP_FRAME_HEIGHT, height) set_w = self.source.get(cv2.CAP_PROP_FRAME_WIDTH) set_h = self.source.get(cv2.CAP_PROP_FRAME_HEIGHT) if set_w != width or set_h != height: raise ValueError(f"Tried to set width/height to {width} x {height} \ but failed. Width and height set to {set_w} x {set_h}")
[docs] def grab(self): """ Call the cv2.VideoCapture grab function and get a timestamp. """ self.ret = self.source.grab() self.timestamp = datetime.datetime.now() return self.ret
[docs] def retrieve(self): """ Call the cv2.VideoCapture retrieve function and store the returned frame. """ self.ret, self.frame = self.source.retrieve() return self.ret, self.frame
[docs] def read(self): """ Do a grab(), then retrieve() operation. """ self.grab() self.retrieve() return self.ret, self.frame
[docs] def isOpened(self): """ Call the cv2.VideoCapture isOpened function. """ # pylint: disable=invalid-name # using isOpened to be consistent with OpenCV function name return self.source.isOpened()
[docs] def release(self): """ Release the cv2.VideoCapture source. """ self.source.release()
[docs]class VideoSourceWrapper: """ Wrapper for multiple TimestampedVideoSource objects. """ def __init__(self): self.sources = [] self.frames = [] self.timestamps = [] self.save_timestamps = True self.num_sources = 0
[docs] def add_camera(self, camera_number, dims=None): """ Create VideoCapture object from camera and add it to the list of sources. :param camera_number: integer camera number :param dims: (width, height) as integer numbers of pixels """ cu.validate_camera_input(camera_number) self.add_source(camera_number, dims)
[docs] def add_file(self, filename, dims=None): """ Create videoCapture object from file and add it to the list of sources. :param filename: a string containing a valid file path :param dims: (width, height) as integer numbers of pixels """ vf.validate_is_file(filename) self.add_source(filename, dims)
[docs] def add_source(self, camera_num_or_file, dims=None): """ Add a video source (camera or file) to the list of sources. :param camera_num_or_file: either an integer camera number or filename :param dims: (width, height) as integer numbers of pixels """ video_source = TimestampedVideoSource(camera_num_or_file, dims) self.sources.append(video_source) self.num_sources = len(self.sources)
[docs] def are_all_sources_open(self): """ Check all input sources are active/open. """ for source in self.sources: if not source.isOpened(): return False return True
[docs] def release_all_sources(self): """ Close all camera/file sources. """ logging.info("Releasing video sources") for source in self.sources: source.release()
[docs] def get_next_frames(self): """ Do a grab() operation for each source, followed by a retrieve(). """ self.grab() self.retrieve()
[docs] def grab(self): """ Perform a grab() operation for each source """ if self.are_all_sources_open(): for source in self.sources: source.grab()
[docs] def retrieve(self): """ Perform a retrieve operation for each source. Should only be run after a grab() operation. :returns list of views on frames """ self.frames = [] for source in self.sources: source.retrieve() self.frames.append(source.frame) return self.frames