Source code for geovar.binning

"""Module to bin allele frequencies to generate GeoVar codes."""

import numpy as np
import pandas as pd
from tqdm import tqdm
from .utils import sep_freq_mat_pops


[docs]class GeoVar(object): """GeoVar object that performs binning of allele frequencies."""
[docs] def __init__(self, bins=[(0, 0), (0, 0.05), (0.05, 1.0)]): """Object to perform binning of allele frequencies. Args: bins (:obj:`list`): list of tuples containing allele frequency categories. """ assert np.all(np.array(bins) <= 1.0) assert np.all(np.array(bins) >= 0.0) assert len(bins) > 0 self.bins = bins self.freq_mat = None self.n_variants = None self.n_populations = None self.pops = None self.geovar_codes = None
def __str__(self): """Representation of fields in a GeoVar object.""" test_str = "GeoVar\n" test_str += "number of variants: %d\n" % self.n_variants test_str += "number of pops: %d\n" % self.n_populations test_str += "pops: " + ",".join(self.pops) + "\n" test_str += "allele freq bins: " + ",".join([str(i) for i in self.bins]) return test_str def add_freq_mat(self, freq_mat_file): """Adding an allele frequency table (see example notebook for format). Args: freq_mat_file (:obj:`string`): filepath to frequency table file (see example notebook for formatting). """ af_df = pd.read_table(freq_mat_file, sep=r"\s") pops, freq_mat = sep_freq_mat_pops(af_df) self.pops = pops self.freq_mat = freq_mat self.n_variants = freq_mat.shape[0] self.n_populations = freq_mat.shape[1] def generate_bins(self, bins=[(0, 0), (0, 0.05), (0.05, 1.0)]): """Define new bins for each allele frequency categorization. Args: bins (:obj:`list`): list of tuples specifying bins of allele frequency. """ assert np.all(np.array(bins) < 1.0) b = 0.0 new_bins = [] for x in bins: new_bins.append((b, x)) b = x new_bins.append((b, 1.0)) self.bins = new_bins def geovar_binning(self): """Compute the GeoVar codes for each variant across each population.""" geovar_codes = np.zeros(shape=self.freq_mat.shape, dtype=np.uint16) i = 1 for b in self.bins[1:]: idx = np.where((self.freq_mat > b[0]) & (self.freq_mat <= b[1])) geovar_codes[idx] = i i += 1 geovar_codes = np.apply_along_axis( lambda x: "".join([str(i) for i in x]), 1, geovar_codes ) self.geovar_codes = geovar_codes def count_geovar_codes(self): """Count the number of unique GeoVar codes in a dataset.""" assert self.geovar_codes is not None uniq_geovar, n_geovar = np.unique(self.geovar_codes, return_counts=True) ncat = np.max(np.vstack([list(x) for x in uniq_geovar]).astype(np.uint32)) return (uniq_geovar, n_geovar, ncat) def geovar_codes_streaming(self, freq_mat_file): """Version of GeoVar code generation algorithm that streams through file to avoid memory overflow. Args: freq_mat_file (:obj:`string`): filepath to frequency table file (see example notebook for formatting). """ assert self.bins is not None geovar_codes = [] # Setting up the testing bins test_bins = np.array([x[1] for x in self.bins]) with open(freq_mat_file, "r") as f: header = f.readline() # Take the population labels currently pops = np.array(header.split()[6:]) self.pops = pops for line in tqdm(f): # Split after the 6th column ... maf_vector = np.array(line.split()[6:]).astype(np.float64) cur_geovar = np.digitize(maf_vector, test_bins, right=True) cur_geovar_code = "".join([str(i) for i in cur_geovar]) geovar_codes.append(cur_geovar_code) # Setting the variables here self.geovar_codes = np.array(geovar_codes) self.n_variants = self.geovar_codes.size self.n_populations = self.pops.size