Source code for paprica.atlaser

"""
Submodule containing classes and functions relative to **atlasing**.

This submodule is essentially a wrapper to Brainreg (https://github.com/brainglobe/brainreg) for atlasing and
Allen Brain Atlas for ontology analysis. It contains many convenience method for manipulating data
(per region, per superpixel, etc.).

By using this code you agree to the terms of the software license agreement.

© Copyright 2020 Wyss Center for Bio and Neuro Engineering – All rights reserved
"""

import os
from pathlib import Path

import numpy as np
import pandas as pd
from allensdk.core.reference_space_cache import ReferenceSpaceCache
from skimage.filters import gaussian
from skimage.io import imread, imsave
from tqdm import tqdm

import paprica


[docs]class tileAtlaser(): """ Class used for registering a dataset to the Atlas and do some post processing using the Atlas (e.g count cells per region). It can be instantiated using a tileMerger object (for registration using Brainreg) or directly with a previously registered Atlas. """
[docs] def __init__(self, original_pixel_size: (np.array, list), downsample: int, atlas=None, merged_data=None): """ Parameters ---------- original_pixel_size: array_like pixel size in µm on the original data downsample: int downsampling used by APRSlicer to reconstruct the lower resolution pixel data used for registration to the Atlas. atlas: ndarray, string atlas data or path for loading the atlas data merger: tileMerger tileMerger object Returns ------- None """ self.downsample = downsample self.pixel_size_registered_atlas = np.array([25, 25, 25]) self.pixel_size_data = np.array(original_pixel_size) # Z Y X self.merged_data = merged_data self.z_downsample = self.pixel_size_registered_atlas[0] / self.pixel_size_data[0] self.y_downsample = self.pixel_size_registered_atlas[1] / self.pixel_size_data[1] self.x_downsample = self.pixel_size_registered_atlas[2] / self.pixel_size_data[2] if atlas is not None: if isinstance(atlas, str): self.load_atlas(atlas) elif isinstance(atlas, np.array): self.atlas = atlas else: raise TypeError('Error: atlas must be a path or a numpy array.')
[docs] @classmethod def from_merger(cls, merger: paprica.stitcher.tileMerger, original_pixel_size: (np.array, list)): """ Constructor from a tileMerger object. Typically to perform the registration to the Atlas on autofluorescence data. Parameters ---------- merger: tileMerger tileMerger object original_pixel_size: array_like pixel size in µm on the original data Returns ------- tileAtlaser object """ return cls(original_pixel_size=original_pixel_size, downsample=merger.downsample, atlas=None, merged_data=merger.merged_data)
[docs] @classmethod def from_atlas(cls, atlas: (np.array, str), downsample, original_pixel_size: (np.array, list)): """ Constructor from a previously computed Atlas. Typically to perform postprocessing using an Atlas (e.g. count cells per brain region). Parameters ---------- atlas: ndarray, string atlas data or path for loading the atlas data downsample: int downsampling used by APRSlicer to reconstruct the lower resolution pixel data used for registration to the Atlas. original_pixel_size: array_like pixel size in µm on the original data Returns ------- tileAtlaser object """ return cls(original_pixel_size=original_pixel_size, downsample=downsample, atlas=atlas, merged_data=None)
[docs] def load_atlas(self, path): """ Function to load a previously computed atlas. Parameters ---------- path: string path to the registered atlas file. Returns ------- None """ self.atlas = imread(path)
[docs] def register_to_atlas(self, output_dir='./', orientation='spr', merged_data_filename='merged_data.tif', debug=False, params=None): """ Function to compute the registration to the Atlas. It is just a wrapper to call brainreg. Parameters ---------- output_dir: string output directory to save atlas orientation: string orientation of the input data with respect to the origin in Z,Y,X order. E.g. 'spr' means superior (so going from origin to z = zlim we go from superior to inferior), posterior (so going from origin to y = ylim we go from posterior to anterior part) and right (so going from origin to x = xlim we go from right to left part) merged_data_filename: string named of the merged array (Brainreg reads data from files so we need to save the merged volume beforehand. debug: bool add debug option for brainreg which will save intermediate steps. params: dict dictionary with keys as brainreg options and values as parameters (see here: https://brainglobe.info/documentation/brainreg/user-guide/parameters) Returns ------- None """ # Create directories if they do not exist atlas_dir = os.path.join(output_dir, 'atlas') Path(atlas_dir).mkdir(parents=True, exist_ok=True) # If merged_data is a path we ask brainreg to work on this file if isinstance(self.merged_data, str): path_merged_data = self.merged_data # Else it means it's an array so we have to save it first else: path_merged_data = os.path.join(output_dir, merged_data_filename) imsave(path_merged_data, self.merged_data) command = 'brainreg {} {} -v {} {} {} --orientation {} --save-original-orientation'.format('"' + path_merged_data + '"', '"' + atlas_dir + '"', self.pixel_size_data[0]*self.downsample, self.pixel_size_data[1]*self.downsample, self.pixel_size_data[2]*self.downsample, orientation) if params is not None: for key, value in params.items(): command += ' --{} {}'.format(key, value) if debug: command += ' --debug' # Execute brainreg os.system(command) self.load_atlas(os.path.join(atlas_dir, 'registered_atlas.tiff'))
[docs] def get_cells_id(self, cells): """ Returns the Allen Brain Atlas region ID for each cell. Parameters ---------- cells: ndarray cell positions Returns ------- labels: ndarray containing the cell region ID. """ cells_id = self.atlas[np.floor(cells[:, 0]/self.z_downsample).astype('uint64'), np.floor(cells[:, 1]/self.y_downsample).astype('uint64'), np.floor(cells[:, 2]/self.x_downsample).astype('uint64')] return cells_id
[docs] def get_loc_id(self, x, y, z): """ Return the ID (brain region) of a given position (typically to retrieve cell position in the brain). Parameters ---------- x: int x position y: int y position z: int z position Returns ------- _: list ID at the queried position. """ return self.atlas[int(z / self.z_downsample), int(y / self.y_downsample), int(x / self.x_downsample)]
[docs] def get_ontology_mapping(self, labels, n=0): """ Get the mapping between area ID and name with Allen SDK. Parameters ---------- labels: ndarray array of labels to group by ID and fetch area name. n: int number of parent area to group for. Returns ------- area_count: dict area names with the counts. """ rspc = ReferenceSpaceCache(25, 'annotation/ccf_2017', manifest='manifest.json') tree = rspc.get_structure_tree(structure_graph_id=1) name_map = tree.get_name_map() ancestor_map = tree.get_ancestor_id_map() area_count = {} n_not_found = 0 area_unknown = {} for l in labels: # Fetch the ancestor map of the label. try: ids = ancestor_map[int(l)] except KeyError: n_not_found += 1 if 'unknown' not in area_count: area_count['unknown'] = 1 else: area_count['unknown'] += 1 if int(l) not in area_unknown: area_unknown[int(l)] = 1 else: area_unknown[int(l)] += 1 continue # At the bottom of the tree each regions has 10 ancestors, with 'root' being the higher n_ancestors = len(ids) n_start = 10 - n_ancestors if n <= n_start: id = ids[0] elif n > n_start: if n_ancestors > n - n_start + 1: id = ids[n - n_start] else: id = ids[-2] # Get the name and store it name = name_map[id] if name not in area_count: area_count[name] = 1 else: area_count[name] += 1 # Display summary if n == 0: if n_not_found > 0: print('\nUnknown ontology ID found for {} objects ({:0.2f}%).'.format(n_not_found, 100 * n_not_found / len(labels))) print('Unknown ontology IDs and occurrences:\n') print(area_unknown) else: print('\nAll objects were assigned to an atlas ontology category.\n') return pd.DataFrame.from_dict(area_count, orient='index')
[docs] def get_cells_number_per_region(self, cells_id): """ Retuns the number of cell per region. Parameters ---------- cells_id: ndarray cells ID (typically computed by self.get_cells_id()) Returns ------- heatmap: ndarray 3D array where each brain region value is the number of cells contained in this region. """ # Remove 0s cells_id = np.delete(cells_id, cells_id==0) id_count = {} for id in cells_id: if id not in id_count: id_count[id] = 1 else: id_count[id] += 1 heatmap = np.zeros_like(self.atlas) for id, counts in id_count.items(): heatmap[self.atlas==id] = counts return heatmap
[docs] def get_cells_density_per_region(self, cells_id): """ Retuns the cell density (number of cell per voxel) per region. Parameters ---------- cells_id: ndarray cells ID (typically computed by self.get_cells_id()) Returns ------- heatmap: ndarray 3D array where each brain region value is the cell density in this region. """ # Remove 0s cells_id = np.delete(cells_id, cells_id == 0) id_count = {} for id in cells_id: if id not in id_count: id_count[id] = 1 else: id_count[id] += 1 heatmap = np.zeros_like(self.atlas, dtype='float64') for id, counts in id_count.items(): tmp = (self.atlas == id) heatmap[tmp] = counts/np.sum(tmp) return heatmap
[docs] def get_cells_density(self, cells, kernel_size, progress_bar=True): """ Retuns the cell density (local average number of cell per voxel). The local average is computed using a gaussian kernel. Parameters ---------- cells: ndarray cell positions kernel_size: int radius of the gaussian for local cell density estimation Returns ------- _: ndarray estimated cell density """ heatmap = np.zeros((self.atlas.shape)).astype(int) for i in tqdm(range(cells.shape[0]), desc='Building density map..', disable=not progress_bar): z = int(cells[i, 0]/self.z_downsample) y = int(cells[i, 1]/self.y_downsample) x = int(cells[i, 2]/self.x_downsample) heatmap[z, y, x] = 1 return gaussian(heatmap, sigma=kernel_size)
[docs] def get_cell_number_by_acronym(self, acronym_list, cells_ids): """ Get the total number of segmented cell in different regions referenced by their acronyms. Parameters ---------- acronym_list: list list of acronyms (ABA) cells_ids: ndarray cells ID (typically computed by self.get_cells_id()) Returns ------- cell_number: arraylike list containing the total number of cells for each asked region. """ if isinstance(acronym_list, str): acronym_list = [acronym_list] rspc = ReferenceSpaceCache(25, 'annotation/ccf_2017', manifest='manifest.json') tree = rspc.get_structure_tree(structure_graph_id=1) structures = tree.get_structures_by_acronym(acronym_list) cell_number = [] for structure in structures: ids = tree.descendant_ids([structure['id']]) cell_number.append(np.sum(np.isin(element=cells_ids, test_elements=ids))) return cell_number
[docs] def get_area_mask_by_acronym(self, acronym_list): """ Return the mask corresponding to brain regions given in `acronym_list`, brain regions referred by their Allen brain acronym. Parameters ---------- acronym_list: list list of Allen Brain region acronyms to count the mapped cells. Returns ------- mask: ndarray mask containing `1` for the region in `acronym_list` ans `0` elsewhere. """ rspc = ReferenceSpaceCache(25, 'annotation/ccf_2017', manifest='manifest.json') tree = rspc.get_structure_tree(structure_graph_id=1) structures = tree.get_structures_by_acronym(acronym_list) ids = [] for structure in structures: ids.extend(tree.descendant_ids([structure['id']])) ids = np.array(ids) mask = np.isin(element=self.atlas, test_elements=ids) return mask