Source code for permute.npc

import numpy as np
import copy
from scipy.stats import norm, rankdata, ttest_ind, ttest_1samp
from cryptorandom.sample import random_sample
from .utils import get_prng, permute


# Combining functions

def fisher(pvalues):
    r"""
    Apply Fisher's combining function

    .. math:: -2 \sum_i \log(p_i)

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine

    Returns
    -------
    float
        Fisher's combined test statistic
    """
    return -2 * np.log(np.prod(pvalues))

def liptak(pvalues):
    r"""
    Apply Liptak's combining function

    .. math:: \sum_i \Phi^{-1}(1-p_i)

    where $\Phi^{-1}$ is the inverse CDF of the standard normal distribution.

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine

    Returns
    -------
    float
        Liptak's combined test statistic
    """
    return np.sum(norm.ppf(1 - pvalues))

def tippett(pvalues):
    r"""
    Apply Tippett's combining function

    .. math:: \max_i \{1-p_i\}

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine

    Returns
    -------
    float
        Tippett's combined test statistic
    """
    return np.max(1 - pvalues)

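# Illustrative usage (a sketch, not part of the original module): each
# combining function maps a vector of partial p-values to a single omnibus
# statistic, and smaller p-values yield a larger statistic under each one.
#
#   >>> pv = np.array([0.01, 0.20, 0.45])
#   >>> fisher(pv)    # -2 * log(0.01 * 0.20 * 0.45), about 14.03
#   >>> liptak(pv)    # normal quantiles at 0.99, 0.80, 0.55 summed, about 3.29
#   >>> tippett(pv)   # max(1 - pv) = 0.99
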
def inverse_n_weight(pvalues, size):
    r"""
    Compute the test statistic

    .. math:: -\sum_{s=1}^S \frac{p_s}{\sqrt{N_s}}

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine
    size : array_like
        The $s$th entry is the sample size used for the $s$th test

    Returns
    -------
    float
        combined test statistic
    """
    weights = size ** (-1 / 2)
    return np.sum(-1 * pvalues * weights)

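# Illustrative usage (a sketch, not part of the original module): each
# p-value is weighted by the reciprocal square root of its sample size, so
# tests based on larger samples are down-weighted.
#
#   >>> pv = np.array([0.05, 0.10])
#   >>> sizes = np.array([25, 100])
#   >>> inverse_n_weight(pv, sizes)   # -(0.05/5 + 0.10/10) = -0.02
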
# Nonparametric combination of tests
def check_combfunc_monotonic(pvalues, combfunc):
    r"""
    Utility function to check that the combining function is monotonically
    decreasing in each argument.

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine
    combfunc : function
        The combining function to use.

    Returns
    -------
    bool
        ``True`` if the combining function passed the check, ``False``
        otherwise.
    """
    obs_ts = combfunc(pvalues)
    for i in range(len(pvalues)):
        # increase the ith p-value; a valid combining function must not
        # yield a larger statistic in response
        test_pvalues = pvalues.copy()
        test_pvalues[i] = test_pvalues[i] + 0.1
        if obs_ts < combfunc(test_pvalues):
            return False
    return True

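# Illustrative check (a sketch, not part of the original module): Fisher's
# combining function decreases as any p-value grows, so it passes; a plain
# sum of p-values increases, so it fails.
#
#   >>> pv = np.array([0.05, 0.10, 0.20])
#   >>> check_combfunc_monotonic(pv, fisher)   # True
#   >>> check_combfunc_monotonic(pv, np.sum)   # False: increasing in each p-value
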
def npc(pvalues, distr, combine="fisher", plus1=True):
    r"""
    Combines p-values from individual partial test hypotheses $H_{0i}$
    against $H_{1i}$, $i=1,\dots,n$, to test the global null hypothesis

    .. math:: \cap_{i=1}^n H_{0i}

    against the alternative

    .. math:: \cup_{i=1}^n H_{1i}

    using an omnibus test statistic.

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine
    distr : array_like
        Array of dimension [B, n] where B is the number of permutations
        and n is the number of partial hypothesis tests. The $i$th column
        of distr contains the simulated null distribution of the $i$th
        test statistic under $H_{0i}$.
    combine : {'fisher', 'liptak', 'tippett'} or function
        The combining function to use. Default is "fisher".
        Valid combining functions must take in p-values as their argument
        and be monotonically decreasing in each p-value.
    plus1 : bool
        flag for whether to add 1 to the numerator and denominator of the
        p-value based on the empirical permutation distribution.
        Default is True.

    Returns
    -------
    float
        A single p-value for the global test
    """
    n = len(pvalues)
    B = distr.shape[0]
    if n < 2:
        raise ValueError("One p-value: nothing to combine!")
    if n != distr.shape[1]:
        raise ValueError("Mismatch in number of p-values and size of distr")
    combine_library = {
        "fisher": fisher,
        "liptak": liptak,
        "tippett": tippett
    }
    if callable(combine):
        if not check_combfunc_monotonic(pvalues, combine):
            raise ValueError("Bad combining function: must be monotonically "
                             "decreasing in each p-value")
        combine_func = combine
    else:
        combine_func = combine_library[combine]

    # Convert each simulated test statistic distribution to p-values
    pvalues_from_distr = np.zeros((B, n))
    for j in range(n):
        pvalues_from_distr[:, j] = (
            1 - rankdata(distr[:, j], method="min") / (plus1 + B)
            + (1 + plus1) / (plus1 + B))
    if combine == "liptak":
        # Liptak's statistic is -inf at p = 1; pull such p-values
        # just below 1
        toobig = np.where(pvalues_from_distr >= 1)
        pvalues_from_distr[toobig] = 1 - np.finfo(float).eps
    combined_stat_distr = np.apply_along_axis(combine_func, 1,
                                              pvalues_from_distr)

    observed_combined_stat = combine_func(pvalues)
    return (plus1 + np.sum(combined_stat_distr >= observed_combined_stat)) / \
        (plus1 + B)

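# A minimal sketch of calling npc on assumed inputs (not from the library):
# two partial tests with B = 1000 simulated null statistics each; the null
# distributions here are standard normal draws purely for illustration.
#
#   >>> prng = np.random.RandomState(42)
#   >>> distr = prng.normal(size=(1000, 2))    # column i: null distribution of test i
#   >>> pvalues = np.array([0.02, 0.30])       # observed partial p-values
#   >>> npc(pvalues, distr, combine="fisher")  # global p-value for the joint null
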
def sim_npc(data, test, combine="fisher", in_place=False,
            reps=int(10**4), seed=None):
    r"""
    Combines p-values from individual partial test hypotheses $H_{0i}$
    against $H_{1i}$, $i=1,\dots,n$, to test the global null hypothesis

    .. math:: \cap_{i=1}^n H_{0i}

    against the alternative

    .. math:: \cup_{i=1}^n H_{1i}

    using an omnibus test statistic.

    Parameters
    ----------
    data : Experiment object
    test : list of functions
        Each function takes an Experiment object and returns a test
        statistic, typically one function per response column
        (see Experiment.make_test_array).
    combine : {'fisher', 'liptak', 'tippett'} or function
        The combining function to use. Default is "fisher".
        Valid combining functions must take in p-values as their argument
        and be monotonically decreasing in each p-value.
    in_place : bool
        whether to randomize the group assignments in place.
        Default is False.
    reps : int
        number of repetitions
    seed : {None, int, RandomState instance}
        If None, the pseudorandom number generator is the RandomState
        instance used by `np.random`;
        If int, seed is the seed used by the random number generator;
        If RandomState instance, seed is the pseudorandom number generator

    Returns
    -------
    tuple
        A single p-value for the global test, the test statistic values
        on the original data, and the partial p-values
    """
    # check that data is of type Experiment
    if not isinstance(data, Experiment):
        raise ValueError("data not of class Experiment")

    # if seed is not None, reset the seed
    if seed is not None:
        data.randomizer.reset_seed(seed)

    ts = {}
    tv = {}
    ps = {}

    # get the test statistic for each column on the original data
    for c in range(len(test)):
        # apply the test statistic function to the column
        ts[c] = test[c](data)
        tv[c] = []

    # check whether to randomize in place
    if in_place:
        data_copy = data
    else:
        data_copy = copy.deepcopy(data)

    # get test statistics for random samples
    for i in range(reps):
        # randomly permute the group assignments
        data_copy.randomize()
        # calculate the test statistics on the permuted data
        for c in range(len(test)):
            # get the test statistic for this permutation
            tv[c].append(test[c](data_copy))

    # get p-values for the original data
    for c in range(len(test)):
        ps[c] = (np.sum(np.array(tv[c]) >= ts[c]) + 1) / (reps + 1)

    # change the format of the null distributions to an array
    dist = np.array([tv[c] for c in range(len(test))]).T
    # append the test statistics from the original data to dist
    dist = np.append(dist,
                     np.array([ts[c] for c in range(len(test))], ndmin=2),
                     axis=0)
    # run npc
    p = npc(np.array([ps[c] for c in range(len(test))]), dist,
            combine=combine, plus1=False)
    return p, ts, ps

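# A minimal sketch of sim_npc on assumed data (not from the library): a
# two-group Experiment with two response variables, using the mean-difference
# statistic on each column and Fisher's combining function.
#
#   >>> group = [0, 0, 0, 1, 1, 1]
#   >>> response = [[2.0, 4.1], [2.2, 3.8], [1.9, 4.4],
#   ...             [1.2, 3.1], [0.8, 2.9], [1.1, 3.5]]
#   >>> expt = Experiment(group=group, response=response)
#   >>> tests = Experiment.make_test_array(Experiment.TestFunc.mean_diff, [0, 1])
#   >>> p_global, ts, ps = sim_npc(expt, tests, combine="fisher", reps=1000)
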
def fwer_minp(pvalues, distr, combine='fisher', plus1=True):
    """
    Adjust p-values using the permutation "minP" variant of Holm's
    step-down method.

    When considering a closed testing procedure, the adjusted p-value
    $p_i$ for a given hypothesis $H_i$ is the maximum of all p-values for
    tests that include $H_i$ as a special case (including the p-value for
    the $H_i$ test itself).

    Parameters
    ----------
    pvalues : array_like
        Array of p-values to combine
    distr : array_like
        Array of dimension [B, n] where B is the number of permutations
        and n is the number of partial hypothesis tests. The $i$th column
        of distr contains the simulated null distribution of the $i$th
        test statistic under $H_{0i}$.
    combine : {'fisher', 'liptak', 'tippett'} or function
        The combining function to use. Default is 'fisher'.
        Valid combining functions must take in p-values as their argument
        and be monotonically decreasing in each p-value.
    plus1 : bool
        flag for whether to add 1 to the numerator and denominator of the
        p-value based on the empirical permutation distribution.
        Default is True.

    Returns
    -------
    array of adjusted p-values
    """
    j = len(pvalues)
    if j < 2:
        raise ValueError("One p-value: nothing to adjust!")
    if j != distr.shape[1]:
        raise ValueError("Mismatch in number of p-values and size of distr")

    # Order the p-values
    order = np.argsort(pvalues)
    pvalues_ord = pvalues[order]
    distr_ord = distr[:, order]

    # Step down the tree of combined hypotheses, from the global test to
    # the test of the individual hypothesis with the largest p-value
    pvalues_adjusted = np.zeros(j)
    pvalues_adjusted[0] = npc(pvalues_ord, distr_ord,
                              combine=combine, plus1=plus1)
    for jj in range(1, j - 1):
        next_pvalue = npc(pvalues_ord[jj:], distr_ord[:, jj:],
                          combine=combine, plus1=plus1)
        pvalues_adjusted[jj] = np.max([next_pvalue, pvalues_adjusted[jj - 1]])
    pvalues_adjusted[j - 1] = np.max([pvalues_ord[j - 1],
                                      pvalues_adjusted[j - 2]])
    # Return the adjusted p-values in the original order of `pvalues`;
    # argsort of `order` inverts the sorting permutation
    pvalues_adjusted = pvalues_adjusted[np.argsort(order)]
    return pvalues_adjusted

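# A minimal sketch of fwer_minp on assumed inputs (not from the library):
# adjust three partial p-values using the same simulated null distributions
# that produced them; the adjusted values are monotone and at least as large
# as the raw p-values.
#
#   >>> prng = np.random.RandomState(7)
#   >>> distr = prng.normal(size=(1000, 3))
#   >>> pvalues = np.array([0.01, 0.04, 0.60])
#   >>> fwer_minp(pvalues, distr, combine="fisher")
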
# Randomization functions
def randomize_group(data):
    r"""
    Unstratified randomization

    Parameters
    ----------
    data : Experiment object

    Returns
    -------
    Experiment object
        Experiment object with randomized group assignments
    """
    data.group = random_sample(data.group, len(data.group),
                               prng=data.randomizer.prng)
    return data

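# Illustrative usage (a sketch, not part of the original module): shuffle
# the group labels of an assumed Experiment; the multiset of labels is
# preserved.
#
#   >>> expt = Experiment(group=[0, 0, 1, 1],
#   ...                   response=[[1.0], [2.0], [3.0], [4.0]])
#   >>> randomize_group(expt).group   # some permutation of [0, 0, 1, 1]
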
def randomize_in_strata(data):
    r"""
    Stratified randomization where the first covariate is the stratum

    Parameters
    ----------
    data : Experiment object

    Returns
    -------
    Experiment object
        Experiment object with randomized group assignments
    """
    # the first covariate is the stratum
    strata = data.covariate[:, 0]
    unique_strata = np.unique(strata)
    for value in unique_strata:
        data.group[strata == value] = random_sample(
            data.group[strata == value],
            len(data.group[strata == value]),
            prng=data.randomizer.prng)
    return data

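# Illustrative usage (a sketch, not part of the original module): with the
# first covariate as the stratum, labels are shuffled only within each
# stratum, so each stratum keeps its own label counts.
#
#   >>> expt = Experiment(group=[0, 1, 0, 1],
#   ...                   response=[[1.0], [2.0], [3.0], [4.0]],
#   ...                   covariate=[["a"], ["a"], ["b"], ["b"]])
#   >>> randomize_in_strata(expt).group   # permuted within strata "a" and "b"
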
# Experiment class
class Experiment():
    r"""
    A class to represent an experiment.

    Attributes
    ----------
    group : vector
        group assignment for each observation
    response : array_like
        array of response values for each observation
    covariate : array_like
        array of covariate values for each observation
    randomizer : Randomizer object
        randomizer to use when randomizing group assignments.
        default is unstratified randomization, randomize_group
    """

    def __init__(self, group=None, response=None, covariate=None,
                 randomizer=None):
        self.group = None if group is None else np.array(group, dtype=object)
        self.response = None if response is None else \
            np.array(response, dtype=object)
        self.covariate = None if covariate is None else \
            np.array(covariate, dtype=object)
        if randomizer is None:
            self.randomizer = Experiment.Randomizer(randomize=randomize_group)
        elif isinstance(randomizer, Experiment.Randomizer):
            self.randomizer = randomizer
        else:
            raise ValueError("Not of class Randomizer")

    def __str__(self):
        return "This experiment has " + str(len(self.group)) + " subjects, " \
            + str(len(self.response[0])) + " response variables, and " \
            + (str(len(self.covariate[0])) if self.covariate is not None
               else str(0)) \
            + " covariates."

    def randomize(self, in_place=True, seed=None):
        # reset the seed, if seed is not None
        if seed is not None:
            self.randomizer.reset_seed(seed)
        if in_place:
            randomized_self = self.randomizer.randomize(self)
        else:
            # make a deep copy so the original assignments are unchanged
            randomized_self = copy.deepcopy(self)
            randomized_self.randomizer.randomize(randomized_self)
        return randomized_self

    @classmethod
    def make_test_array(cls, func, indices):
        # bind each index to its own closure so that each returned function
        # computes the statistic for one response column
        def create_func(index):
            def new_func(data):
                return func(data, index)
            return new_func
        test = [create_func(index) for index in indices]
        return test

    class TestFunc:

        def mean_diff(self, index):
            # get the unique groups
            groups = np.unique(self.group)
            if len(groups) != 2:
                raise ValueError("Number of groups must be two")
            # get the mean for each group
            mx = np.mean(self.response[:, index][self.group == groups[0]])
            my = np.mean(self.response[:, index][self.group == groups[1]])
            return mx - my

        def ttest(self, index):
            # get the unique groups
            groups = np.unique(self.group)
            if len(groups) != 2:
                raise ValueError("Number of groups must be two")
            t = ttest_ind(self.response[:, index][self.group == groups[0]],
                          self.response[:, index][self.group == groups[1]],
                          equal_var=True)[0]
            return t

        def one_way_anova(self, index):
            tst = 0
            overall_mean = np.mean(self.response[:, index])
            for k in np.unique(self.group):
                group_k = self.response[:, index][self.group == k]
                group_mean = np.mean(group_k)
                nk = len(group_k)
                tst += (group_mean - overall_mean)**2 * nk
            return tst

    class Randomizer():

        def __init__(self, randomize=randomize_group, seed=None):
            self.randomize = randomize
            self.prng = get_prng(seed)

        def reset_seed(self, seed=None):
            # reset the seed
            self.prng = get_prng(seed)
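
# A minimal end-to-end sketch on assumed data (not from the library): build
# an Experiment with a stratified Randomizer, then run the nonparametric
# combination of a one-way ANOVA statistic on each of two response columns.
#
#   >>> expt = Experiment(group=[0, 0, 1, 1],
#   ...                   response=[[1.0, 5.0], [1.2, 5.5],
#   ...                             [2.1, 6.9], [2.3, 7.2]],
#   ...                   covariate=[["a"], ["a"], ["a"], ["a"]],
#   ...                   randomizer=Experiment.Randomizer(
#   ...                       randomize=randomize_in_strata, seed=42))
#   >>> tests = Experiment.make_test_array(Experiment.TestFunc.one_way_anova,
#   ...                                    [0, 1])
#   >>> p_global, ts, ps = sim_npc(expt, tests, reps=500, seed=123)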