Source code for sksurgerysurfacematch.pipelines.register_cloud_to_stereo_mosaic

# -*- coding: utf-8 -*-

""" Pipeline to register 3D point cloud to mosaic'ed surface reconstruction. """

import copy
import numpy as np
import cv2
import sksurgerypclpython as pclp
import sksurgeryopencvpython as cvcpp
import sksurgerycore.algorithms.procrustes as proc
import sksurgerysurfacematch.utils.registration_utils as ru
import sksurgerysurfacematch.interfaces.video_segmentor as vs
import sksurgerysurfacematch.algorithms.reconstructor_with_rectified_images \
    as sr
import sksurgerysurfacematch.interfaces.rigid_registration as rr


# pylint:disable=too-many-instance-attributes, invalid-name, too-many-locals
# pylint:disable=unsubscriptable-object, too-many-arguments,too-many-branches


class Register3DToMosaicedStereoVideo:
    """
    Class to register a point cloud to a series of surfaces
    derived from stereo video, and stitched together.
    """
    def __init__(self,
                 video_segmentor: vs.VideoSegmentor,
                 surface_reconstructor:
                 sr.StereoReconstructorWithRectifiedImages,
                 rigid_registration: rr.RigidRegistration,
                 left_camera_matrix: np.ndarray,
                 right_camera_matrix: np.ndarray,
                 left_to_right_rmat: np.ndarray,
                 left_to_right_tvec: np.ndarray,
                 min_number_of_keypoints: int = 25,
                 max_fre_threshold: float = 2,
                 left_mask: np.ndarray = None,
                 z_range: list = None,
                 radius_removal: list = None,
                 voxel_reduction: list = None):
        """
        Uses Dependency Injection for each pluggable component.

        :param video_segmentor: Optional class to pre-segment the video.
        :param surface_reconstructor: Mandatory class to do reconstruction.
        :param rigid_registration: Mandatory class to perform rigid alignment.
        :param left_camera_matrix: [3x3] camera matrix.
        :param right_camera_matrix: [3x3] camera matrix.
        :param left_to_right_rmat: [3x3] left-to-right rotation matrix.
        :param left_to_right_tvec: [1x3] left-to-right translation vector.
        :param min_number_of_keypoints: Minimum number of good keypoint
            matches required before frames are triangulated or stitched.
        :param max_fre_threshold: Maximum FRE allowed when stitching
            frames together.
        :param left_mask: A static mask to apply to stereo reconstruction.
        :param z_range: [min range, max range] to limit reconstructed points.
        :param radius_removal: [radius, number] to reject points with too
            few neighbours.
        :param voxel_reduction: [vx, vy, vz] parameters for PCL voxel
            grid reduction.
        """
        self.video_segmentor = video_segmentor
        self.surface_reconstructor = surface_reconstructor
        self.rigid_registration = rigid_registration
        self.left_camera_matrix = left_camera_matrix
        self.right_camera_matrix = right_camera_matrix
        self.left_to_right_rmat = left_to_right_rmat
        self.left_to_right_tvec = left_to_right_tvec
        self.min_number_of_keypoints = min_number_of_keypoints
        self.max_fre_threshold = max_fre_threshold
        self.left_static_mask = left_mask
        self.z_range = z_range
        self.radius_removal = radius_removal
        self.voxel_reduction = voxel_reduction
        self.previous_keypoints = None
        self.previous_descriptors = None
        self.previous_good_l2r_matches = None
        self.previous_good_l2r_matched_descriptors = None
        self.previous_triangulated_key_points = None
        self.previous_recon = None
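For context, a minimal construction sketch follows. ``MyReconstructor`` and ``MyICPRegistration`` are hypothetical stand-ins for concrete implementations of the injected interfaces, and the calibration file names are illustrative; real camera parameters would come from your own stereo calibration.

import numpy as np
# Hypothetical concrete implementations of the injected interfaces:
from my_project.components import MyReconstructor, MyICPRegistration

# Stereo calibration, e.g. loaded from files produced by your own
# calibration routine (file names are made up for this sketch).
left_intrinsics = np.loadtxt('calib.left.intrinsics.txt')
right_intrinsics = np.loadtxt('calib.right.intrinsics.txt')
l2r_rmat = np.loadtxt('calib.l2r.rotation.txt')
l2r_tvec = np.loadtxt('calib.l2r.translation.txt')

pipeline = Register3DToMosaicedStereoVideo(
    video_segmentor=None,                   # optional; None skips segmentation
    surface_reconstructor=MyReconstructor(),
    rigid_registration=MyICPRegistration(),
    left_camera_matrix=left_intrinsics,
    right_camera_matrix=right_intrinsics,
    left_to_right_rmat=l2r_rmat,
    left_to_right_tvec=l2r_tvec,
    z_range=[1, 200],                       # keep points 1-200 units from camera
    voxel_reduction=[5, 5, 5])              # voxel grid size before registration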
    def reset(self):
        """
        Resets internal data members, so that you can start
        accumulating data again.
        """
        self.previous_keypoints = None
        self.previous_descriptors = None
        self.previous_good_l2r_matches = None
        self.previous_good_l2r_matched_descriptors = None
        self.previous_triangulated_key_points = None
        self.previous_recon = None
    def grab(self,
             left_image: np.ndarray,
             right_image: np.ndarray):
        """
        Call this repeatedly to grab a surface and use ORB keypoints
        to match the previous reconstruction to the current frame.

        :param left_image: undistorted, BGR image
        :param right_image: undistorted, BGR image
        """
        left_mask = np.ones((left_image.shape[0],
                             left_image.shape[1]), dtype=np.uint8) * 255

        if self.video_segmentor is not None:
            left_mask = self.video_segmentor.segment(left_image)
            # Binarise, keeping uint8 so the bitwise_and below stays uint8.
            left_mask = (255 * (left_mask > 0)).astype(np.uint8)

        if self.left_static_mask is not None:
            left_mask = np.bitwise_and(left_mask, self.left_static_mask)

        orb = cv2.ORB_create()

        current_left_key_points, current_left_descriptors = \
            orb.detectAndCompute(left_image, None)
        current_right_key_points, current_right_descriptors = \
            orb.detectAndCompute(right_image, None)

        index_params = dict(algorithm=6,  # 6 = FLANN_INDEX_LSH
                            table_number=6,
                            key_size=12,
                            multi_probe_level=1)
        search_params = dict(checks=50)
        flann = cv2.FlannBasedMatcher(index_params, search_params)

        left_to_right_matches = flann.knnMatch(current_left_descriptors,
                                               current_right_descriptors,
                                               k=2)

        # Keep good matches, based on Lowe's ratio test. Guard against
        # knnMatch returning fewer than 2 neighbours for a descriptor.
        good_l2r_matches = []
        for id_pair in left_to_right_matches:
            if id_pair is not None and len(id_pair) == 2:
                m, n = id_pair
                if m.distance < 0.7 * n.distance:
                    good_l2r_matches.append(m)

        if len(good_l2r_matches) > self.min_number_of_keypoints:

            left_descriptors = np.zeros((len(good_l2r_matches),
                                         len(current_left_descriptors[0])),
                                        dtype=np.uint8)
            for i, m in enumerate(good_l2r_matches):
                left_descriptors[i, :] = current_left_descriptors[m.queryIdx]

            left_pts = np.float32([current_left_key_points[m.queryIdx].pt
                                   for m in good_l2r_matches])
            right_pts = np.float32([current_right_key_points[m.trainIdx].pt
                                    for m in good_l2r_matches])

            paired_pts = np.zeros((left_pts.shape[0], 4))
            paired_pts[:, 0:2] = left_pts
            paired_pts[:, 2:4] = right_pts

            triangulated_l2r_key_points = \
                cvcpp.triangulate_points_using_hartley(
                    paired_pts,
                    self.left_camera_matrix,
                    self.right_camera_matrix,
                    self.left_to_right_rmat,
                    self.left_to_right_tvec)

            full_reconstruction = \
                self.surface_reconstructor.reconstruct(
                    left_image,
                    self.left_camera_matrix,
                    right_image,
                    self.right_camera_matrix,
                    self.left_to_right_rmat,
                    self.left_to_right_tvec,
                    left_mask)

            # Keep x, y, z only; drop any colour columns.
            full_reconstruction = full_reconstruction[:, 0:3]

            if self.z_range is not None:
                full_reconstruction = \
                    pclp.pass_through_filter(full_reconstruction,
                                             'z',
                                             self.z_range[0],
                                             self.z_range[1],
                                             True)

            if self.radius_removal is not None:
                full_reconstruction = \
                    pclp.radius_removal_filter(full_reconstruction,
                                               self.radius_removal[0],
                                               self.radius_removal[1])

            # Match to the previous frame.
            if self.previous_keypoints is not None and \
                    self.previous_descriptors is not None:

                previous_to_current = flann.knnMatch(
                    self.previous_good_l2r_matched_descriptors,
                    left_descriptors,
                    k=2)

                # Keep good matches, based on Lowe's ratio test.
                good_prev_to_current_matches = []
                for id_pair in previous_to_current:
                    if id_pair is not None and len(id_pair) == 2:
                        m, n = id_pair
                        if m.distance < 0.7 * n.distance:
                            good_prev_to_current_matches.append(m)

                if len(good_prev_to_current_matches) > \
                        self.min_number_of_keypoints:

                    prev_3d_pts = np.float32(
                        [self.previous_triangulated_key_points[m.queryIdx]
                         for m in good_prev_to_current_matches])
                    current_3d_pts = np.float32(
                        [triangulated_l2r_key_points[m.trainIdx]
                         for m in good_prev_to_current_matches])

                    # Compute rigid body transform. Maybe use RANSAC?
                    rmat, tvec, fre = \
                        proc.orthogonal_procrustes(current_3d_pts,
                                                   prev_3d_pts)

                    if fre < self.max_fre_threshold:

                        # Transform the previous point cloud into the
                        # current camera frame; tvec is a column vector,
                        # so it broadcasts over the transposed points.
                        transformed_point_cloud = \
                            np.transpose(
                                np.matmul(rmat,
                                          np.transpose(self.previous_recon))
                                + tvec)

                        # Combine and simplify? Or just combine?
                        full_reconstruction = \
                            np.vstack((transformed_point_cloud,
                                       full_reconstruction))

            # Save this iteration's data, so that on the next call it
            # becomes the 'previous' iteration, for tracking purposes.
            self.previous_keypoints = current_left_key_points
            self.previous_descriptors = current_left_descriptors
            self.previous_good_l2r_matches = good_l2r_matches
            self.previous_good_l2r_matched_descriptors = left_descriptors
            self.previous_triangulated_key_points = \
                triangulated_l2r_key_points
            self.previous_recon = full_reconstruction
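The stitching step above hinges on ``proc.orthogonal_procrustes``, which finds the least-squares rigid transform between two matched 3D point sets. Below is a self-contained numpy sketch of the same idea (the Kabsch/SVD formulation), useful for checking the behaviour without the sksurgerycore dependency; the helper name and test data are illustrative only.

import numpy as np

def orthogonal_procrustes_sketch(fixed, moving):
    """Least-squares rigid transform mapping 'moving' onto 'fixed', via SVD."""
    fixed_mean = fixed.mean(axis=0)
    moving_mean = moving.mean(axis=0)
    h = (moving - moving_mean).T @ (fixed - fixed_mean)
    u, _, vt = np.linalg.svd(h)
    d = np.sign(np.linalg.det(vt.T @ u.T))          # guard against reflections
    rmat = vt.T @ np.diag([1.0, 1.0, d]) @ u.T
    tvec = fixed_mean - rmat @ moving_mean
    residuals = (rmat @ moving.T).T + tvec - fixed
    fre = np.sqrt((residuals ** 2).sum(axis=1).mean())
    return rmat, tvec.reshape(3, 1), fre

rng = np.random.default_rng(0)
prev_pts = rng.uniform(-50, 50, (30, 3))            # 'previous' 3D keypoints
angle = np.radians(10)
true_rmat = np.array([[np.cos(angle), -np.sin(angle), 0.0],
                      [np.sin(angle),  np.cos(angle), 0.0],
                      [0.0, 0.0, 1.0]])
true_tvec = np.array([5.0, -3.0, 2.0])
curr_pts = (true_rmat @ prev_pts.T).T + true_tvec   # same points, current frame

rmat, tvec, fre = orthogonal_procrustes_sketch(curr_pts, prev_pts)
assert fre < 1e-6                                   # noise-free, so FRE ~ 0

# As in grab(): map the previous reconstruction into the current frame.
stitched = np.transpose(rmat @ np.transpose(prev_pts) + tvec)

A RANSAC wrapper around such an estimator, as the in-code comment suggests, would make the stitching more robust to descriptor mismatches.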
    def register(self,
                 point_cloud: np.ndarray,
                 initial_transform: np.ndarray = None):
        """
        Registers a point cloud to the internal mosaiced reconstruction.

        :param point_cloud: [Nx3] points, each row x, y, z, e.g. from CT/MR.
        :param initial_transform: [4x4] initial rigid transform.
        :return: residual, [4x4] transform of point_cloud to left camera
            space, and the [Mx3] mosaiced point cloud, as [x, y, z] rows.
        """
        if self.previous_recon is None:
            raise ValueError("No reconstruction has been performed")

        recon_points = self.previous_recon

        if self.voxel_reduction is not None:
            recon_points = \
                pclp.down_sample_points(recon_points,
                                        self.voxel_reduction[0],
                                        self.voxel_reduction[1],
                                        self.voxel_reduction[2])

        residual, transform = ru.do_rigid_registration(recon_points,
                                                       point_cloud,
                                                       self.rigid_registration,
                                                       initial_transform)

        # Don't return a pointer to the internal self.previous_recon.
        return residual, transform, copy.deepcopy(recon_points)
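Putting it together, a typical calling pattern might look like the following; ``pipeline`` is assumed to have been built as in the construction sketch above, and ``video_source`` / ``load_ct_points`` are hypothetical helpers for your own data.

# Continuing the construction sketch above ('pipeline' already built).
for left_image, right_image in video_source:        # undistorted BGR pairs
    pipeline.grab(left_image, right_image)          # accumulate the mosaic

ct_points = load_ct_points()                        # [Nx3] x, y, z rows

residual, transform, recon = pipeline.register(ct_points)
print('Residual:', residual)
print('CT-to-left-camera transform:\n', transform)

pipeline.reset()                                    # start a fresh mosaic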