Source code for paprica.segmenter

"""
Module containing classes and functions relative to Segmentation.


By using this code you agree to the terms of the software license agreement.

© Copyright 2020 Wyss Center for Bio and Neuro Engineering – All rights reserved
"""

import os
from time import time

import cv2 as cv
import napari
import numpy as np
import pandas as pd
import pyapr
import sparse
from joblib import load
from tqdm import tqdm

import paprica


[docs]def _predict_on_APR_block(x, clf, n_parts=1e7, output='class', verbose=False): """ Predict particle class with the trained classifier clf on the precomputed features f using a blocked strategy to avoid memory segfault. Parameters ---------- x: ndarray features (n_particle, n_features) for particle prediction n_parts: int number of particles in the batch to predict output: string output type, can be 'class' where each particle get assigned a class or 'proba' where each particle get assigned a probability of belonging to each class. verbose: bool control function verbosity Returns ------- parts_pred: array_like Class prediction for each particle. """ # Predict on numpy array by block to avoid memory issues if verbose: t = time() n_block = int(np.ceil(x.shape[0] / n_parts)) if int(n_parts) != n_parts: raise ValueError('Error: n_parts must be an int.') n_parts = int(n_parts) clf[1].set_params(n_jobs=-1) if output == 'class': y_pred = np.empty((x.shape[0])) for i in tqdm(range(n_block), desc='Predicting particle type'): y_pred[i * n_parts:min((i + 1) * n_parts, x.shape[0])] = clf.predict( x[i * n_parts:min((i + 1) * n_parts, x.shape[0])]) # Transform numpy array to ParticleData parts_pred = pyapr.ShortParticles(y_pred.astype('uint16')) elif output == 'proba': y_pred = np.empty((x.shape[0], len(clf.classes_))) for i in tqdm(range(n_block), desc='Predicting particle type'): y_pred[i * n_parts:min((i + 1) * n_parts, x.shape[0]), :] = clf.predict_proba( x[i * n_parts:min((i + 1) * n_parts, x.shape[0])]) # Transform numpy array to ParticleData parts_pred = [] for i in range(len(clf.classes_)): parts_pred.append(pyapr.ShortParticles( (y_pred[:, i]*(2**16-1)) .astype('uint16'))) else: raise ValueError('Unknown output \'{}\' for APR block prediction.'.format(output)) if verbose: print('Blocked prediction took {:0.3f} s.\n'.format(time() - t)) return parts_pred
[docs]def map_feature(apr, parts_cc, features): """ Map feature values to segmented particle data. Parameters ---------- apr: pyapr.APR apr object to map features to parts_cc: pyapr.ParticleData connected component particle array corresponding to apr features: array_like array containing the values to map Returns ------- Array of mapped values (each particle in the connected component now has the value present in features) """ objects_volume = pyapr.measure.find_label_volume(apr, parts_cc) hash_idx = np.arange(0, len(objects_volume)) # Object with volume 0 are not in CC so we need to get rid of them hash_idx = hash_idx[objects_volume > 0] # We also need to get rid of the background hash_idx = hash_idx[1:] if len(hash_idx) != len(features): raise ValueError('Error: features length ({}) should be the same as the number of connected components ({}).' .format(len(features), len(hash_idx))) # Create hash dict hash_dict = {x: y for x, y in zip(hash_idx, features)} # Replace 0 by 0 hash_dict[0] = 0 mp = np.arange(0, parts_cc.max() + 1) mp[list(hash_dict.keys())] = list(hash_dict.values()) return mp[np.array(parts_cc, copy=False)]
[docs]def compute_gradients(apr, parts): """ Compute gradient for each spatial direction directly on APR. Parameters ---------- apr: (APR) APR object parts: (ParticleData) particle data sampled on APR Returns ------- (dx, dy, dz): (arrays) gradient for each direction """ par = apr.get_parameters() dz = pyapr.filter.gradient(apr, parts, dim=2, delta=par.dz) dx = pyapr.filter.gradient(apr, parts, dim=1, delta=par.dx) dy = pyapr.filter.gradient(apr, parts, dim=0, delta=par.dy) return dz, dx, dy
[docs]def compute_laplacian(apr, parts, grad=None): """ Compute Laplacian for each spatial direction directly on APR. Parameters ---------- apr: (APR) APR object parts: (ParticleData) particle data sampled on APR grad: (dz, dy, dx) gradient for each direction if precomputed (faster for Laplacian computation) Returns ------- Laplacian of APR. """ par = apr.get_parameters() if grad is None: dz, dx, dy = compute_gradients(apr, parts) else: dz, dx, dy = grad dz2 = pyapr.filter.gradient(apr, dz, dim=2, delta=par.dz) dx2 = pyapr.filter.gradient(apr, dx, dim=1, delta=par.dx) dy2 = pyapr.filter.gradient(apr, dy, dim=0, delta=par.dy) return dz2 + dx2 + dy2
[docs]def compute_gradmag(apr, parts): """ Compute gradient magnitude directly on APR. Parameters ---------- apr: (APR) APR object parts: (ParticleData) particle data sampled on APR Returns ------- Gradient magnitude of APR. """ par = apr.get_parameters() gradmag = pyapr.filter.gradient_magnitude(apr, parts, deltas=(par.dz, par.dx, par.dy)) return gradmag
[docs]def gaussian_blur(apr, parts, sigma=1.5, size=11): """ Compute Gaussian blur directly on APR. Parameters ---------- apr: (APR) APR object parts: (ParticleData) particle data sampled on APR sigma: (float) Gaussian blur standard deviation (kernel radius) size: (int) kernel size (increase with caution, complexity is not linear) Returns ------- Blurred APR. """ stencil = pyapr.filter.get_gaussian_stencil(size, sigma, ndims=3, normalize=True) return pyapr.filter.convolve(apr, parts, stencil)
[docs]def particle_levels(apr): """ Returns apr level: for each particle the lvl is defined as the size of the particle in pixel. Parameters ---------- apr: (APR) APR object Returns ------- Particle level. """ lvls = pyapr.ShortParticles(apr.total_number_particles()) lvls.fill_with_levels(apr) lvls = np.array(lvls) return 2 ** (lvls.max() - lvls)
[docs]class tileSegmenter(): """ Class used to segment tiles. It is instantiated with a tileLoader object, a previously trained classifier, a function to compute features (the same features used to train the classifier and a function to get the post processed connected component for the classifier output. """
[docs] def __init__(self, clf, func_to_compute_features, func_to_get_cc, verbose): """ Parameters ---------- clf: sklearn.classifier pre-trained classifier func_to_compute_features: func function to compute the features on ParticleData. Must be the same set of as the one used to train the classifier. func_to_get_cc: func function to post process the segmentation map into a connected component (each cell has a unique id) """ # Store classifier self.clf = clf # Store function to compute features self.func_to_compute_features = func_to_compute_features # Store post processing steps self.func_to_get_cc = func_to_get_cc # Verbose self.verbose = verbose
[docs] @classmethod def from_trainer(cls, trainer, verbose=True): """ Instantiate tileSegmenter object with a tileTrainer object. Parameters ---------- trainer: tileTrainer trainer object previously trained for segmentation verbose: bool control function output Returns ------- tileSegmenter object """ return cls(clf=trainer.clf, func_to_compute_features=trainer.func_to_compute_features, func_to_get_cc=trainer.func_to_get_cc, verbose=verbose)
[docs] @classmethod def from_classifier(cls, classifier, func_to_compute_features, func_to_get_cc=None, verbose=True): """ Instantiate tileSegmenter object with a classifier, function to compute the features and to get the connected components. Parameters ---------- classifier func_to_compute_features: func function to compute features used by the classifier to perform the segmentation. func_to_get_cc: func function to compute the connected component from the classifier prediction. verbose: bool control function output. Returns ------- tileSegmenter object """ if isinstance(classifier, str): clf = load(classifier) else: clf = classifier return cls(clf=clf, func_to_compute_features=func_to_compute_features, func_to_get_cc=func_to_get_cc, verbose=verbose)
[docs] def compute_segmentation(self, tile: paprica.loader.tileLoader, save_cc=True, save_mask=False, lazy_loading=True): """ Compute the segmentation and stores the result as an independent APR. Parameters ---------- verbose: bool control the verbosity of the function to print some info Returns ------- None """ if tile.apr is None: tile.load_tile() # Compute features on APR if self.verbose: t = time() print('Computing features on APR') f = self.func_to_compute_features(tile.apr, tile.parts) # self.filtered_APR = f if self.verbose: print('Features computation took {:0.2f} s.'.format(time()-t)) # Predict particle class parts_pred = _predict_on_APR_block(f, self.clf, verbose=self.verbose) if self.verbose: # Display inference info print('\n****** INFERENCE RESULTS ******') for l in self.clf.classes_: print('Class {}: {} particles ({:0.2f}%)'.format(l, np.sum(parts_pred == l), np.sum(parts_pred == l) / len(parts_pred) * 100)) print('*******************************') # Compute connected component from classification if self.func_to_get_cc is not None: cc = self.func_to_get_cc(tile.apr, parts_pred) tile.parts_cc = cc # Save results if save_mask: # Write particles pyapr.io.write_particles(tile.path, parts_pred, parts_name='segmentation mask', tree=False, append=True) if lazy_loading: # Compute tree parts) tree_parts = pyapr.tree.fill_tree_max(tile.apr, parts_pred) # Save tree parts pyapr.io.write_particles(tile.path, tree_parts, parts_name='segmentation mask', tree=True, append=True) if save_cc: # Write particles pyapr.io.write_particles(tile.path, cc, parts_name='segmentation cc', tree=False, append=True) if lazy_loading: # Compute tree parts tree_parts = pyapr.tree.fill_tree_max(tile.apr, cc) # Save tree parts pyapr.io.write_particles(tile.path, tree_parts, parts_name='segmentation cc', tree=True, append=True) tile.parts_mask = parts_pred
# def _save_segmentation(self, path, name, parts): # """ # Save segmentation particles by appending the original APR file. # # Parameters # ---------- # parts: pyapr.ParticleData # particles to save. Note that the APR tree should be the same otherwise the data # will be inconsistent and not readable. # # Returns # ------- # None # """ # pyapr.io.write_particles(path, parts, parts_name=name, append=True) # pyapr.io.write_particles(path, tree_parts, parts_name=name, append=True, tree=True) # # aprfile = pyapr.io.APRFile() # # aprfile.set_read_write_tree(True) # # aprfile.open(path, 'READWRITE') # # aprfile.write_particles(name, parts, t=0) # # aprfile.close()
[docs]class multitileSegmenter(): """ Class used to segment multitiles acquisition. """
[docs] def __init__(self, tiles: paprica.parser.tileParser, database: (str, pd.DataFrame), clf, func_to_compute_features, func_to_get_cc, verbose=True): """ Parameters ---------- tiles: tileLoader tile object for loading the tile (or containing the preloaded tile). database: pd.DataFrame, string dataframe (or path to the csv file) containing the registration parameters to correctly place each tile. clf: sklearn.classifier pre-trained classifier func_to_compute_features: func function to compute the features on ParticleData. Must be the same set of as the one used to train the classifier. func_to_get_cc: func function to post process the segmentation map into a connected component (each cell has a unique id) """ # Store classifier if isinstance(clf, str): self.clf = load(clf) else: self.clf = clf # Store function to compute features self.func_to_compute_features = func_to_compute_features # Store post processing steps self.func_to_get_cc = func_to_get_cc # If database is a path then load database, if it's a DataFrame keep it as it is. if isinstance(database, str): self.database = pd.read_csv(database) elif isinstance(database, pd.DataFrame): self.database = database else: raise TypeError('Error: database of wrong type.') self.tiles = tiles self.path = tiles.path self.type = tiles.type self.tiles_list = tiles.tiles_list self.n_tiles = tiles.n_tiles self.ncol = tiles.ncol self.nrow = tiles.nrow self.neighbors = tiles.neighbors self.n_edges = tiles.n_edges self.path_list = tiles.path_list self.frame_size = tiles.frame_size self.verbose = verbose self.cells = None self.atlas = None
[docs] @classmethod def from_trainer(cls, tiles: paprica.parser.tileParser, database: (str, pd.DataFrame), trainer, verbose=True): """ Instantiate tileSegmenter object with a tileTrainer object. Parameters ---------- trainer: tileTrainer trainer object previously trained for segmentation verbose: bool control function output Returns ------- tileSegmenter object """ return cls(tiles=tiles, database= database, clf=trainer.clf, func_to_compute_features=trainer.func_to_compute_features, func_to_get_cc=trainer.func_to_get_cc, verbose=verbose)
[docs] @classmethod def from_classifier(cls, tiles: paprica.parser.tileParser, database: (str, pd.DataFrame), classifier, func_to_compute_features, func_to_get_cc=None, verbose=True): """ Instantiate tileSegmenter object with a classifier, function to compute the features and to get the connected components. Parameters ---------- classifier func_to_compute_features: func function to compute features used by the classifier to perform the segmentation. func_to_get_cc: func function to compute the connected component from the classifier prediction. verbose: bool control function output. Returns ------- tileSegmenter object """ if isinstance(classifier, str): clf = load(classifier) else: clf = classifier return cls(tiles=tiles, database= database, clf=clf, func_to_compute_features=func_to_compute_features, func_to_get_cc=func_to_get_cc, verbose=verbose)
[docs] def compute_multitile_segmentation(self, save_cc=True, save_mask=False, lowe_ratio=0.7, distance_max=5, lazy_loading=True): """ Compute the segmentation and stores the result as an independent APR. Parameters ---------- verbose: bool control the verbosity of the function to print some info save_cc: bool option to save the connected component particle to file save_mask: bool option to save the prediction mask to file lowe_ratio: float in ]0, 1[ ratio between the second nearest neighbor and the first nearest neighbor to be considered a good match distance_max: float maximum distance in pixel for two objects to be matched lazy_loading: bool option to save the tree particles to allow for lazy loading later on Returns ------- None """ for tile in tqdm(self.tiles, desc='Extracting and merging cells..'): # Perform tile segmentation tile = self._segment_tile(tile, save_cc=save_cc, save_mask=save_mask, lazy_loading=lazy_loading) # Remove objects on the edge pyapr.morphology.remove_edge_objects(tile.apr, tile.parts_cc, z_edges=False) # Initialized merged cells for the first tile if self.cells is None: self.cells = pyapr.measure.find_label_centers(tile.apr, tile.parts_cc, tile.parts) self.cells += self._get_tile_position(tile.row, tile.col) # Then merge the rest on the first tile else: self._merge_cells(tile, lowe_ratio=lowe_ratio, distance_max=distance_max) try: self.save_cells(output_path=os.path.join(self.tiles.path, 'cells_backup.csv')) except: print('Could not back up cells.')
[docs] def extract_and_merge_cells(self, lowe_ratio=0.7, distance_max=5): """ Function to extract cell positions in each tile and merging across all tiles. Identical cells on overlapping area are automatically detected using Flann method. Parameters ---------- lowe_ratio: float ratio of the second nearest neighbor distance / nearest neighbor distance above lowe_ratio, the cell is supposed to be unique. Below lowe_ratio, it might have a second detection on the neighboring tile. distance_max: float maximum distance in pixel for two cells to be considered the same. Returns ------- None """ for tile in tqdm(self.tiles, desc='Extracting and merging cells..'): tile.load_tile() tile.load_segmentation() # Remove objects on the edge pyapr.morphology.remove_edge_objects(tile.apr, tile.parts_cc) # Initialized merged cells for the first tile if self.cells is None: self.cells = pyapr.measure.find_label_centers(tile.apr, tile.parts_cc, tile.parts) self.cells += self._get_tile_position(tile.row, tile.col) # Then merge the rest on the first tile else: self._merge_cells(tile, lowe_ratio=lowe_ratio, distance_max=distance_max) try: self.save_cells(output_path=os.path.join(self.tiles.path, 'cells_backup.csv')) except: print('Could not back up cells.')
[docs] def save_cells(self, output_path): """ Save cells as a CSV file. Parameters ---------- output_path: string path for saving the CSV file. Returns ------- None """ pd.DataFrame(self.cells).to_csv(output_path, header=['z', 'y', 'x'])
[docs] def _segment_tile(self, tile: paprica.loader.tileLoader, save_cc=True, save_mask=False, lazy_loading=True): """ Compute the segmentation and stores the result as an independent APR. Parameters ---------- verbose: bool control the verbosity of the function to print some info Returns ------- None """ if tile.apr is None: tile.load_tile() # Compute features on APR if self.verbose: t = time() print('Computing features on APR') f = self.func_to_compute_features(tile.apr, tile.parts) self.filtered_APR = f if self.verbose: print('Features computation took {:0.2f} s.'.format(time()-t)) # Predict particle class parts_pred = _predict_on_APR_block(f, self.clf, verbose=self.verbose) if self.verbose: # Display inference info print('\n****** INFERENCE RESULTS ******') for l in self.clf.classes_: print('Class {}: {} particles ({:0.2f}%)'.format(l, np.sum(parts_pred == l), np.sum(parts_pred == l) / len(parts_pred) * 100)) print('*******************************') # Compute connected component from classification if self.func_to_get_cc is not None: cc = self.func_to_get_cc(tile.apr, parts_pred) tile.parts_cc = cc # Save results if save_mask: # Write particles pyapr.io.write_particles(tile.path, parts_pred, parts_name='segmentation mask', tree=False, append=True) if lazy_loading: # Compute tree parts) tree_parts = pyapr.tree.fill_tree_max(tile.apr, parts_pred) # Save tree parts pyapr.io.write_particles(tile.path, tree_parts, parts_name='segmentation mask', tree=True, append=True) if save_cc: # Write particles pyapr.io.write_particles(tile.path, cc, parts_name='segmentation cc', tree=False, append=True) if lazy_loading: # Compute tree parts tree_parts = pyapr.tree.fill_tree_max(tile.apr, cc) # Save tree parts pyapr.io.write_particles(tile.path, tree_parts, parts_name='segmentation cc', tree=True, append=True) tile.parts_mask = parts_pred return tile
[docs] def _merge_cells(self, tile, lowe_ratio, distance_max): """ Function to merge cells on a tile to the final cells list and remove duplicate. Parameters ---------- tile: tileLoader tile from which to merge cells lowe_ratio: float ratio of the second nearest neighbor distance / nearest neighbor distance above lowe_ratio, the cell is supposed to be unique. Below lowe_ratio, it might have a second detection on the neighboring tile. distance_max: float maximum distance in pixel for two cells to be considered the same. Returns ------- None """ if self.cells.shape[0]>0: r1 = np.max(self.cells, axis=0) else: r1 = np.array([0, 0, 0]) r2 = self._get_tile_position(tile.row, tile.col) v_size = np.array(tile.apr.shape()) # Define the overlapping area overlap_i = r2 overlap_f = np.min((r1 + v_size, r2 + v_size), axis=0) # Retrieve cell centers cells2 = pyapr.measure.find_label_centers(tile.apr, tile.parts_cc, tile.parts) cells2 += r2 # Filter cells to keep only those on the overlapping area for i in range(3): if i == 0: ind = np.where(self.cells[:, i] < overlap_i[i])[0] else: ind = np.concatenate((ind, np.where(self.cells[:, i] < overlap_i[i])[0])) ind = np.concatenate((ind, np.where(self.cells[:, i] > overlap_f[i])[0])) ind = np.unique(ind) cells1_out = self.cells[ind, :] cells1_overlap = np.delete(self.cells, ind, axis=0) for i in range(3): if i == 0: ind = np.where(cells2[:, i] < overlap_i[i])[0] else: ind = np.concatenate((ind, np.where(cells2[:, i] < overlap_i[i])[0])) ind = np.concatenate((ind, np.where(cells2[:, i] > overlap_f[i])[0])) ind = np.unique(ind) cells2_out = cells2[ind, :] cells2_overlap = np.delete(cells2, ind, axis=0) # Number of cells should be higher than the number of features for the KNN implementation to work. if cells2_overlap.shape[0] > 4: cells_filtered_overlap = self._filter_cells_flann(cells1_overlap, cells2_overlap, lowe_ratio=lowe_ratio, distance_max=distance_max) self.cells = np.vstack((cells1_out, cells2_out, cells_filtered_overlap)) else: self.cells = np.vstack((cells1_out, cells2_out))
[docs] def _get_tile_position(self, row, col): """ Function to get the absolute tile position defined by it's coordinate in the multitile set. Parameters ---------- row: int row number col: int column number Returns ------- _: ndarray tile absolute position """ df = self.database tile_df = df[(df['row'] == row) & (df['col'] == col)] px = tile_df['ABS_H'].values[0] py = tile_df['ABS_V'].values[0] pz = tile_df['ABS_D'].values[0] return np.array([pz, py, px])
[docs] def _filter_cells_flann(self, c1, c2, lowe_ratio=0.7, distance_max=5): """ Remove cells duplicate using Flann criteria and distance threshold. Parameters ---------- c1: ndarray array containing the first set cells coordinates c2: ndarray array containing the second set cells coordinates lowe_ratio: float ratio of the second nearest neighbor distance / nearest neighbor distance above lowe_ratio, the cell is supposed to be unique. Below lowe_ratio, it might have a second detection on the neighboring tile. distance_max: float maximum distance in pixel for two cells to be considered the same. verbose: bool control function verbosity Returns ------- _: ndarray array containing the merged sets without the duplicates. """ if lowe_ratio < 0 or lowe_ratio > 1: raise ValueError('Lowe ratio is {}, expected between 0 and 1.'.format(lowe_ratio)) # Match cells descriptors by using Flann method FLANN_INDEX_KDTREE = 1 index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=4) search_params = dict(checks=100) flann = cv.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(np.float32(c1), np.float32(c2), k=2) # store all the good matches as per Lowe's ratio test. good = [] for m, n in matches: if m.distance < lowe_ratio*n.distance and m.distance < distance_max: good.append(m) # Remove cells that are present in both volumes ind_c1 = [m.queryIdx for m in good] ind_c2 = [m.trainIdx for m in good] # For now I just remove the cells in c2 but merging strategies can be better c2 = np.delete(c2, ind_c2, axis=0) # Display info if self.verbose: try: print('{:0.2f}% of cells were removed.'.format(len(ind_c2)/(c1.shape[0]+c2.shape[0]-len(ind_c2))*100)) except ZeroDivisionError: print('No cell removed.') return np.vstack((c1, c2))
# def _save_segmentation(self, path, name, parts, tree_parts): # """ # Save segmentation particles by appending the original APR file. # # Parameters # ---------- # parts: pyapr.ParticleData # particles to save. Note that the APR tree should be the same otherwise the data # will be inconsistent and not readable. # tree_parts: pyapr.ParticleData # tree particles necessary for lazy loading. # # Returns # ------- # None # """ # pyapr.io.write_particles(path, parts, parts_name=name, append=True) # pyapr.io.write_particles(path, tree_parts, parts_name=name, append=True, tree=True) # # aprfile = pyapr.io.APRFile() # # aprfile.set_read_write_tree(True) # # aprfile.open(path, 'READWRITE') # # aprfile.write_particles(name, parts, t=0, tree_parts=tree_parts) # # aprfile.close()
[docs]class tileTrainer(): """ Class used to train a classifier that works directly on APR data. It uses Napari to manually add labels. """ def __init__(self, tile: paprica.loader.tileLoader, func_to_compute_features, func_to_get_cc=None): tile.load_tile() self.tile = tile self.apr = tile.apr self.parts = tile.parts self.apr_it = self.apr.iterator() self.shape = tile.apr.shape() self.func_to_compute_features = func_to_compute_features self.func_to_get_cc = func_to_get_cc self.labels_manual = None self.pixel_list = None self.labels = None self.use_sparse_labels = None self.parts_train_idx = None self.clf = None self.parts_mask = None self.parts_cc = None self.f = None
[docs] def manually_annotate(self, use_sparse_labels=True, **kwargs): """ Manually annotate dataset using Napari. Parameters ---------- use_sparse_labels: bool use sparse array to store the labels (memory efficient but slower graphics) Returns ------- None """ self.sparse = use_sparse_labels if self.sparse: # We create a sparse array that supports inserting data (COO does not) self.labels_manual = sparse.DOK(shape=self.shape, dtype='uint8') else: self.labels_manual = np.empty(self.shape, dtype='uint8') # We call napari with the APRSlicer and the sparse array for storing the manual annotations viewer = napari.Viewer() image_layer = napari.layers.Image(data=pyapr.reconstruction.APRSlicer(self.apr, self.parts), **kwargs) viewer.add_layer(image_layer) viewer.add_labels(self.labels_manual) viewer.show(block=True) # We extract labels and pixel coordinate from the sparse array if self.sparse: self.labels_manual = self.labels_manual.to_coo() else: self.labels_manual = sparse.COO.from_numpy(self.labels_manual) self.pixel_list = self.labels_manual.coords.T self.labels = self.labels_manual.data
[docs] def add_annotations(self, use_sparse_labels=True, **kwargs): """ Add annotations on previously annotated dataset. Parameters ---------- use_sparse_labels: bool use sparse array to store the labels (memory efficient but slower graphics) Returns ------- None """ self.sparse = use_sparse_labels if self.sparse: # We create a sparse array that supports inserting data (COO does not) self.labels_manual = sparse.DOK(self.labels_manual) else: self.labels_manual = self.labels_manual.todense() # We call napari with the APRSlicer and the sparse array for storing the manual annotations viewer = napari.Viewer() image_layer = napari.layers.Image(data=pyapr.reconstruction.APRSlicer(self.apr, self.parts), **kwargs) viewer.add_layer(image_layer) viewer.add_labels(self.labels_manual) viewer.show(block=True) # We extract labels and pixel coordinate from the sparse array if self.sparse: self.labels_manual = self.labels_manual.to_coo() else: self.labels_manual = sparse.COO.from_numpy(self.labels_manual) self.pixel_list = self.labels_manual.coords.T self.labels = self.labels_manual.data
[docs] def save_labels(self, path=None): """ Save labels as numpy array with columns corresponding to [z, y, x, label]. Parameters ---------- path: string path to save labels. By default it saves them in the data root folder. Returns ------- None """ if path is None: path = os.path.join(self.tile.folder_root, 'manual_labels.npy') to_be_saved = np.hstack((self.pixel_list, self.labels[:, np.newaxis])) np.save(path, to_be_saved)
[docs] def load_labels(self, path=None): """ Load previously saved labels as numpy array with columns corresponding to [z, y, x, label]. Parameters ---------- path: string path to load the saved labels. By default it loads them in the data root folder. Returns ------- None """ if path is None: path = os.path.join(self.tile.folder_root, 'manual_labels.npy') data = np.load(path) self.pixel_list = data[:, :-1] self.labels = data[:, -1] self.labels_manual = sparse.COO(coords=self.pixel_list.T, data=self.labels, shape=self.shape)
[docs] def train_classifier(self, verbose=True, n_estimators=10, class_weight='balanced', mean_norm=True, std_norm=True): """ Train the classifier for segmentation. Parameters ---------- verbose: bool option to print out information. n_estimators : int The number of trees in the random forest. class_weight : {"balanced", "balanced_subsample"}, dict or list of dicts, Weights associated with classes in the form ``{class_label: weight}``. If not given, all classes are supposed to have weight one. For multi-output problems, a list of dicts can be provided in the same order as the columns of y. Note that for multioutput (including multilabel) weights should be defined for each class of every column in its own dict. For example, for four-class multilabel classification weights should be [{0: 1, 1: 1}, {0: 1, 1: 5}, {0: 1, 1: 1}, {0: 1, 1: 1}] instead of [{1:1}, {2:5}, {3:1}, {4:1}]. The "balanced" mode uses the values of y to automatically adjust weights inversely proportional to class frequencies in the input data as ``n_samples / (n_classes * np.bincount(y))`` The "balanced_subsample" mode is the same as "balanced" except that weights are computed based on the bootstrap sample for every tree grown. For multi-output, the weights of each column of y will be multiplied. Note that these weights will be multiplied with sample_weight (passed through the fit method) if sample_weight is specified. mean_norm : bool If True, center the data before scaling. std_norm : bool If True, scale the data to unit variance (or equivalently, unit standard deviation). Returns ------- None """ if self.pixel_list is None: raise ValueError('Error: annotate dataset or load annotations before training classifier.') from sklearn import preprocessing from sklearn.pipeline import make_pipeline from sklearn.ensemble import RandomForestClassifier # We sample pixel_list on APR grid self._sample_pixel_list_on_APR() # We remove ambiguous case where a particle was labeled differently self._remove_ambiguities(verbose=verbose) # We compute features and train the classifier if self.f is None: self.f = self.func_to_compute_features(self.apr, self.parts) # Fetch data that was manually labelled x = self.f[self.parts_train_idx] y = self.parts_labels # Train random forest clf = make_pipeline(preprocessing.StandardScaler(with_mean=mean_norm, with_std=std_norm), RandomForestClassifier(n_estimators=n_estimators, class_weight=class_weight)) t = time() clf.fit(x, y.ravel()) print('Training took {} s.\n'.format(time() - t)) x_pred = clf.predict(x) # Display training info if verbose: print('\n****** TRAINING RESULTS ******') print('Total accuracy: {:0.2f}%'.format(np.sum(x_pred == y) / y.size * 100)) for l in self.unique_labels: print('Class {} accuracy: {:0.2f}% ({} cell particles)'.format(l, np.sum((x_pred == y) * (y == l)) / np.sum(y == l) * 100, np.sum(y == l))) print('******************************\n') self.clf = clf
[docs] def segment_training_tile(self, bg_label=None, display_result=True, verbose=True): """ Apply classifier to the whole tile and display segmentation results using Napari. Parameters ---------- display_result: bool option to display segmentation results using Napari verbose: bool option to print out information. Returns ------- None """ # Apply on whole dataset if self.parts_mask is None: parts_pred = _predict_on_APR_block(self.f, self.clf, verbose=verbose) self.parts_mask = parts_pred if (self.func_to_get_cc is not None) and self.parts_cc is None: self.parts_cc = self.func_to_get_cc(self.apr, self.parts_mask.copy()) # Display inference info if verbose: print('\n****** INFERENCE RESULTS ******') for l in self.unique_labels: print('Class {}: {} cell particles ({:0.2f}%)'.format(l, np.sum(self.parts_mask == l), np.sum(self.parts_mask == l) / len(self.parts_mask) * 100)) print('******************************\n') # Display segmentation using Napari if display_result: if self.parts_cc is not None: paprica.viewer.display_segmentation(self.apr, self.parts, self.parts_cc) elif bg_label is not None: parts_pred = np.array(self.parts_mask.copy()) parts_pred[parts_pred == bg_label] = 0 parts_pred = pyapr.ShortParticles(parts_pred) paprica.viewer.display_segmentation(self.apr, self.parts, parts_pred)
[docs] def display_training_annotations(self, **kwargs): """ Display manual annotations and their sampling on APR grid (if available). Returns ------- None """ image_nap = napari.layers.Image(data=pyapr.reconstruction.APRSlicer(self.apr, self.parts), opacity=0.7, **kwargs) viewer = napari.Viewer() viewer.add_layer(image_nap) viewer.add_labels(self.labels_manual, name='Manual labels', opacity=0.5) if self.parts_labels is not None: mask = np.zeros_like(self.parts, dtype='uint16') mask[self.parts_train_idx] = self.parts_labels label_map = napari.layers.Labels(data=pyapr.reconstruction.APRSlicer(self.apr, pyapr.ShortParticles(mask), tree_mode='max'), name='APR labels', opacity=0.5) viewer.add_layer(label_map) napari.run()
[docs] def apply_on_tile(self, tile, bg_label=None, func_to_get_cc=None, display_result=True, verbose=True): """ Apply classifier to the whole tile and display segmentation results using Napari. Parameters ---------- display_result: bool option to display segmentation results using Napari verbose: bool option to print out information. Returns ------- None """ # Apply on whole dataset if tile.apr is None: tile.load_tile() f = self.func_to_compute_features(tile.apr, tile.parts) parts_pred = _predict_on_APR_block(f, self.clf, verbose=verbose) tile.parts_mask = parts_pred.copy() # Display inference info if verbose: print('\n****** INFERENCE RESULTS ******') for l in self.unique_labels: print('Class {}: {} cell particles ({:0.2f}%)'.format(l, np.sum(parts_pred == l), np.sum(parts_pred == l) / len(parts_pred) * 100)) print('******************************\n') # Display segmentation using Napari if display_result: if func_to_get_cc is not None: tile.parts_cc = func_to_get_cc(tile.apr, parts_pred) paprica.viewer.display_segmentation(tile.apr, tile.parts, tile.parts_cc) elif bg_label is not None: parts_pred = np.array(parts_pred) parts_pred[parts_pred==bg_label] = 0 parts_pred = pyapr.ShortParticles(parts_pred) paprica.viewer.display_segmentation(tile.apr, tile.parts, parts_pred)
[docs] def save_classifier(self, path=None): """ Save the trained classifier. Parameters ---------- path: string path for saving the classifier. By default, it is saved in the data root folder. Returns ------- None """ from joblib import dump if path is None: path = os.path.join(self.tile.folder_root, 'random_forest_n100.joblib') dump(self.clf, path)
[docs] def load_classifier(self, path=None): """ Load a trained classifier. Parameters ---------- path: string path for loading the classifier. By default, it is loaded from root folder. Returns ------- None """ from joblib import load if path is None: path = os.path.join(self.tile.folder_root, 'random_forest_n100.joblib') self.clf = load(path)
[docs] def display_features(self): """ Display the computed features. """ if self.f is None: raise TypeError('Error: filters can''t be displayed because they were not computed') viewer = napari.Viewer() for i in range(self.f.shape[1]): viewer.add_layer(paprica.viewer.apr_to_napari_Image(self.apr, pyapr.FloatParticles(self.f[:, i]))) napari.run()
[docs] def _remove_ambiguities(self, verbose): """ Remove particles that have been labelled with different labels. Parameters ---------- verbose: bool option to print out information. Returns ------- """ if self.parts_train_idx is None: raise ValueError('Error: train classifier before removing ambiguities.') idx_unique = np.unique(self.parts_train_idx) parts_train = [] parts_labels = [] cnt = 0 for idx in idx_unique: local_labels = self.labels[self.parts_train_idx==idx] is_same, l = self._are_labels_the_same(local_labels) if is_same: # If labels are the same we assign it parts_labels.append(l) parts_train.append(idx) else: cnt += 1 self.parts_train_idx = np.array(parts_train) self.parts_labels = np.array(parts_labels) self.unique_labels = np.unique(self.parts_labels) if verbose: print('\n********* ANNOTATIONS ***********') print('{} ambiguous particles were removed.'.format(cnt)) print('{} particles were labeled.'.format(self.parts_labels.shape[0])) for l in self.unique_labels: print('{} particles ({:0.2f}%) were labeled as {}.'.format(np.sum(self.parts_labels==l), 100*np.sum(self.parts_labels==l)/self.parts_labels.shape[0], l)) print('***********************************\n')
[docs] def _sample_pixel_list_on_APR(self): """ Convert manual annotations coordinates from pixel to APR. Returns ------- None """ self.parts_train_idx = np.empty(self.pixel_list.shape[0], dtype='uint64') for i in tqdm(range(self.pixel_list.shape[0]), desc='Sampling labels on APR.'): idx = self._find_particle(self.pixel_list[i, :]) self.parts_train_idx[i] = idx
[docs] def _find_particle(self, coords): """ Find particle index corresponding to pixel location coords. Parameters ---------- coords: array_like pixel coordinate [z, y, x] Returns ------- idx: (int) particle index """ # TODO: @JOEL PUT THIS IN C++ PLEASE for level in range(self.apr_it.level_min(), self.apr_it.level_max()+1): particle_size = 2 ** (self.apr.level_max() - level) z_l, x_l, y_l = coords // particle_size for idx in range(self.apr_it.begin(level, z_l, x_l), self.apr_it.end()): if self.apr_it.y(idx) == y_l: # if np.sqrt(np.sum((coords-np.array([z_l, x_l, y_l])*particle_size)**2))/particle_size > np.sqrt(3): # print('ich') return idx
[docs] def _order_labels(self): """ Order pixel_list in z increasing order, then y increasing order and finally x increasing order. Returns ------- None """ for d in range(3, 0): ind = np.argsort(self.pixel_list[:, d]) self.pixel_list = self.pixel_list[ind] self.labels = self.labels[ind]
[docs] @staticmethod def _are_labels_the_same(local_labels): """ Determine if manual labels in particle are the same and return the labels Parameters ---------- local_labels: ndarray particle labels Returns ------- ((bool): True if labels are the same, (int) corresponding label) """ return ((local_labels == local_labels[0]).all(), local_labels[0])