Source code for fastpt.core.FASTPT

'''
	FASTPT is a numerical algorithm to calculate
	1-loop contributions to the matter power spectrum
	and other integrals of a similar type.
	The method is presented in papers arXiv:1603.04826 and arXiv:1609.05978
	Please cite these papers if you are using FASTPT in your research.

	Joseph E. McEwen (c) 2016
	mcewen.24@osu.edu

	Xiao Fang
	fang.307@osu.edu

	Jonathan A. Blazek
	blazek.35@osu.edu


	FFFFFFFF    A           SSSSSSSSS   TTTTTTTTTTTTTT             PPPPPPPPP    TTTTTTTTTTTT
	FF     	   A A         SS                 TT                   PP      PP        TT
	FF        A   A        SS                 TT                   PP      PP        TT
	FFFFF    AAAAAAA        SSSSSSSS          TT       ==========  PPPPPPPPP         TT
	FF      AA     AA              SS         TT                   PP                TT
	FF     AA       AA             SS         TT                   PP                TT
	FF    AA         AA    SSSSSSSSS          TT                   PP                TT


	The FASTPT class is the workhorse of the FASTPT algorithm.
	This class calculates integrals of the form:

	\int \frac{d^3q}{(2 \pi)^3} K(q,k-q) P(q) P(|k-q|)

	\int \frac{d^3q_1}{(2 \pi)^3} K(\hat{q_1} \dot \hat{q_2},\hat{q_1} \dot \hat{k}, \hat{q_2} \dot \hat{k}, q_1, q_2) P(q_1) P(|k-q_1|)

'''
from __future__ import division
from __future__ import print_function

from ..info import __version__

import numpy as np
from numpy import exp, log, cos, sin, pi
from ..utils.fastpt_extr import p_window, c_window
from ..utils.matter_power_spt import P_13_reg, Y1_reg_NL, Y2_reg_NL
from ..utils.initialize_params import scalar_stuff, tensor_stuff
from ..IA.IA_tt import IA_tt
from ..IA.IA_ABD import IA_A, IA_DEE, IA_DBB, P_IA_B
from ..IA.IA_ta import IA_deltaE1, P_IA_deltaE2, IA_0E0E, IA_0B0B
from ..IA.IA_gb2 import IA_gb2_F2, IA_gb2_fe, IA_gb2_he, P_IA_13S2F2
from ..IA.IA_gb2 import IA_gb2_S2F2, IA_gb2_S2fe, IA_gb2_S2he
from ..IA.IA_ct import IA_tij_feG2, IA_tij_heG2, IA_tij_F2F2, IA_tij_G2G2, IA_tij_F2G2, P_IA_13G, P_IA_13F, IA_tij_F2G2reg
from ..IA.IA_ctbias import IA_gb2_F2, IA_gb2_G2, IA_gb2_S2F2, IA_gb2_S2G2
from ..utils.OV import OV
from ..utils.kPol import kPol
from ..rsd.RSD import RSDA, RSDB
from ..rsd.RSD_ItypeII import P_Ap1 as RSD_P_Ap1, P_Ap3 as RSD_P_Ap3, P_Ap5 as RSD_P_Ap5
from ..utils.P_extend import k_extend
from . import FASTPT_simple as fastpt_simple
from .CacheManager import CacheManager
from scipy.signal import fftconvolve
from numpy.fft import ifft, irfft, rfft


def cached_property(method):
    """Decorator to cache property values"""
    cache_name = f'_{method.__name__}'
    
    def getter(instance):
        if not hasattr(instance, cache_name):
            setattr(instance, cache_name, method(instance))
        return getattr(instance, cache_name)
    
    return property(getter)

[docs]class FASTPT: """ FASTPT is a numerical algorithm to calculate 1-loop contributions to the matter power spectrum and other integrals of a similar type. The method is presented in papers arXiv:1603.04826 and arXiv:1609.05978. Please cite these papers if you are using FASTPT in your research. Parameters ---------- k : array_like The input k-grid (wavenumbers) in 1/Mpc. Must be logarithmically spaced with equal spacing in log(k) and contain an even number of elements. nu : float, optional Deprecated. Previously used to specify power-law biasing for FFT calculations. Now determined automatically. to_do : list of str, optional List of requirements for precalculation of matrices. Starting in v4, terms are calculated as needed without specifying them here, but pre-computing matrices remains an option and can save time on the initial run of functions. 'All' or 'everything' will initialize all terms. param_mat : array_like, optional Custom parameter matrix for extensions (advanced usage). low_extrap : float, optional If provided, extrapolate the power spectrum to lower k values down to 10^(low_extrap). Helps with edge effects. Typical value: -5. high_extrap : float, optional If provided, extrapolate the power spectrum to higher k values up to 10^(high_extrap). Helps with edge effects. Typical value: 3. Must be greater than low_extrap if both are provided. n_pad : int, optional Number of zeros to pad the array with on both ends. Helps reduce edge effects in FFT calculations. If None, defaults to half the length of the input k array. verbose : bool, optional If True, prints additional information during calculations. simple : bool, optional If True, uses the older, simplified FASTPT interface. Will be deprecated. max_cache_size_mb : int, optional Maximum size of the internal cache in megabytes. Default is 500 MB. dump_cache : bool, optional If True, the cache is cleared when a new power spectrum is provided to a Fast-PT function to avoid building an unnecessarily large cache. Notes ----- The input k array must be strictly increasing, equally logarithmically spaced, and contain an even number of elements. Using extrapolation (low_extrap/high_extrap) and padding (n_pad) is recommended to reduce numerical artifacts from the FFT-based algorithm. """
[docs] def __init__(self, k, nu=None, to_do=None, param_mat=None, low_extrap=None, high_extrap=None, n_pad=None, verbose=False, simple=False, max_cache_size_mb=500, dump_cache=True): if (k is None or len(k) == 0): raise ValueError('You must provide an input k array.') if nu: print("Warning: nu is no longer needed for FAST-PT initialization. It will default to -2, if another value is needed it will be calculated internally.") # if no to_do list is given, default to fastpt_simple SPT case if (simple): if (verbose): print( 'Note: You are using an earlier call structure for FASTPT. Your code will still run correctly, calling FASTPT_simple. See user manual.') if (nu is None): # give a warning if nu=None that a default value is being used. print('WARNING: No value for nu is given. FASTPT_simple is being called with a default of nu=-2') nu = -2 # this is the default value for P22+P13 and bias calculation print("WARNING: No to_do list is given therefore calling FASTPT_simple. FASTPT_simple will soon be DEPRECATED.") self.pt_simple = fastpt_simple.FASTPT(k, nu, param_mat=param_mat, low_extrap=low_extrap, high_extrap=high_extrap, n_pad=n_pad, verbose=verbose) return None # Exit initialization here, since fastpt_simple performs the various checks on the k grid and does extrapolation. self.cache = CacheManager(max_size_mb=max_cache_size_mb, dump_cache=dump_cache) self.max_cache_size_mb = max_cache_size_mb #Stored for save instance method in handler self.dump_cache = dump_cache #Stored for save instance method in handler self.X_registry = {} #Stores the names of X terms to be used as an efficient unique identifier in hash keys self.__k_original = k self.extrap = False if (low_extrap is not None or high_extrap is not None): if (high_extrap < low_extrap): raise ValueError('high_extrap must be greater than low_extrap') self.EK = k_extend(k, low_extrap, high_extrap) k = self.EK.extrap_k() self.extrap = True self.low_extrap = low_extrap self.high_extrap = high_extrap self.__k_extrap = k #K extrapolation not padded # check for log spacing # print('Initializing k-grid quantities...') dk = np.diff(np.log(k)) # dk_test=np.ones_like(dk)*dk[0] delta_L = (log(k[-1]) - log(k[0])) / (k.size - 1) dk_test = np.ones_like(dk) * delta_L log_sample_test = 'ERROR! FASTPT will not work if your in put (k,Pk) values are not sampled evenly in log space!' np.testing.assert_array_almost_equal(dk, dk_test, decimal=4, err_msg=log_sample_test, verbose=False) if (verbose): print(f'the minumum and maximum inputed log10(k) are: {np.min(np.log10(k))} and {np.max(np.log10(k))}') print(f'the grid spacing Delta log (k) is, {(log(np.max(k)) - log(np.min(k))) / (k.size - 1)}') print(f'number of input k points are, {k.size}') print(f'the power spectrum is extraplated to log10(k_min)={low_extrap}') print(f'the power spectrum is extraplated to log10(k_max)={high_extrap}') print(f'the power spectrum has {n_pad} zeros added to both ends of the power spectrum') # print(self.k_extrap.size, 'k size') # size of input array must be an even number if (k.size % 2 != 0): raise ValueError('Input array must contain an even number of elements.') # can we just force the extrapolation to add an element if we need one more? how do we prevent the extrapolation from giving us an odd number of elements? is that hard coded into extrap? or just trim the lowest k value if there is an odd numebr and no extrapolation is requested. if n_pad is None: n_pad = int(0.5 * len(k)) if verbose: print(f"WARNING: N_pad is recommended but none has been provided, defaulting to 0.5*len(k) = {n_pad}.") self.n_pad = n_pad if (n_pad > 0): # Make sure n_pad is an integer if not isinstance(n_pad, int): n_pad = int(n_pad) self.n_pad = n_pad self.id_pad = np.arange(k.size) + n_pad d_logk = delta_L k_pad = np.log(k[0]) - np.arange(1, n_pad + 1) * d_logk k_pad = np.exp(k_pad) k_left = k_pad[::-1] k_pad = np.log(k[-1]) + np.arange(1, n_pad + 1) * d_logk k_right = np.exp(k_pad) k = np.hstack((k_left, k, k_right)) n_pad_check = int(np.log(2) / delta_L) + 1 if (n_pad < n_pad_check): print('*** Warning ***') print(f'You should consider increasing your zero padding to at least {n_pad_check}') print('to ensure that the minimum k_output is > 2k_min in the FASTPT universe.') print(f'k_min in the FASTPT universe is {k[0]} while k_min_input is {self.k_extrap[0]}') self.__k_final = k #log spaced k, with padding and extrap self.k_size = k.size # self.scalar_nu=-2 self.N = k.size # define eta_m and eta_n=eta_m omega = 2 * pi / (float(self.N) * delta_L) self.m = np.arange(-self.N // 2, self.N // 2 + 1) self.eta_m = omega * self.m self.verbose = verbose # define l and tau_l self.n_l = self.m.size + self.m.size - 1 self.l = np.arange(-self.n_l // 2 + 1, self.n_l // 2 + 1) self.tau_l = omega * self.l self.todo_dict = { 'one_loop_dd': False, 'one_loop_cleft_dd': False, 'dd_bias': False, 'IA_all': False, 'IA_tt': False, 'IA_ta': False, 'IA_mix': False, 'OV': False, 'kPol': False, 'RSD': False, 'IRres': False, 'tij': False, 'gb2': False, 'all': False, 'everything': False } if to_do: print("Warning: to_do list is no longer needed for FAST-PT initialization. Terms will now be calculated as needed. It may still be used to pre-compute matrices for faster initial runs.") for entry in to_do: if entry in {'all', 'everything'}: for key in self.todo_dict: self.todo_dict[key] = True elif entry in {'IA_all', 'IA'}: for key in ['IA_tt', 'IA_ta', 'IA_mix', 'gb2', 'tij']: self.todo_dict[key] = True elif entry == 'dd_bias': self.todo_dict['one_loop_dd'] = True self.todo_dict['dd_bias'] = True elif entry == 'tij': for key in ['gb2', 'one_loop_dd', 'tij', 'IA_tt', 'IA_ta', 'IA_mix']: self.todo_dict[key] = True elif entry in self.todo_dict: self.todo_dict[entry] = True else: raise ValueError(f'FAST-PT does not recognize {entry} in the to_do list.\n{self.todo_dict.keys()} are the valid entries.') ### INITIALIZATION of k-grid quantities ### if self.todo_dict['one_loop_dd'] or self.todo_dict['dd_bias'] or self.todo_dict['IRres']: self.X_spt self.X_lpt self.X_sptG if self.todo_dict['IA_tt']: self.X_IA_E self.X_IA_B if self.todo_dict['IA_mix']: self.X_IA_A self.X_IA_DEE self.X_IA_DBB if self.todo_dict['IA_ta']: self.X_IA_deltaE1 self.X_IA_0E0E self.X_IA_0B0B if self.todo_dict['gb2']: self.X_IA_gb2_fe self.X_IA_gb2_he if self.todo_dict['tij']: self.X_IA_tij_feG2 self.X_IA_tij_heG2 self.X_IA_tij_F2F2 self.X_IA_tij_G2G2 self.X_IA_tij_F2G2 self.X_IA_tij_F2G2reg self.X_IA_gb2_F2 self.X_IA_gb2_G2 self.X_IA_gb2_S2F2 self.X_IA_gb2_S2fe self.X_IA_gb2_S2he self.X_IA_gb2_S2G2 if self.todo_dict['OV']: self.X_OV if self.todo_dict['kPol']: self.X_kP1 self.X_kP2 self.X_kP3 if self.todo_dict['RSD']: self.X_RSDA self.X_RSDB
@property def k_original(self): return self.__k_original @property def k_extrap(self): return self.__k_extrap @property def k_final(self): return self.__k_final @cached_property def X_spt(self): nu = -2 p_mat = np.array([[0, 0, 0, 0], [0, 0, 2, 0], [0, 0, 4, 0], [2, -2, 2, 0], [1, -1, 1, 0], [1, -1, 3, 0], [2, -2, 0, 1]]) result = scalar_stuff(p_mat, nu, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_spt' return result @cached_property def X_lpt(self): nu = -2 p_mat = np.array([[0, 0, 0, 0], [0, 0, 2, 0], [2, -2, 2, 0], [1, -1, 1, 0], [1, -1, 3, 0], [0, 0, 4, 0], [2, -2, 0, 1]]) result = scalar_stuff(p_mat, nu, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_lpt' return result @cached_property def X_sptG(self): nu = -2 p_mat = np.array([[0, 0, 0, 0], [0, 0, 2, 0], [0, 0, 4, 0], [2, -2, 2, 0], [1, -1, 1, 0], [1, -1, 3, 0], [2, -2, 0, 1]]) result = scalar_stuff(p_mat, nu, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_sptG' return result @cached_property def X_IA_E(self): hE_tab, _ = IA_tt() p_mat_E = hE_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_E, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_E' return result @cached_property def X_IA_B(self): _, hB_tab = IA_tt() p_mat_B = hB_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_B, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_B' return result @cached_property def X_IA_A(self): IA_A_tab = IA_A() p_mat_A = IA_A_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_A, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_A' return result @cached_property def X_IA_DEE(self): IA_DEE_tab = IA_DEE() p_mat_DEE = IA_DEE_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_DEE, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_DEE' return result @cached_property def X_IA_DBB(self): IA_DBB_tab = IA_DBB() p_mat_DBB = IA_DBB_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_DBB, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_DBB' return result @cached_property def X_IA_deltaE1(self): IA_deltaE1_tab = IA_deltaE1() result = tensor_stuff(IA_deltaE1_tab[:, [0, 1, 5, 6, 7, 8, 9]], self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_deltaE1' return result @cached_property def X_IA_0E0E(self): IA_0E0E_tab = IA_0E0E() result = tensor_stuff(IA_0E0E_tab[:, [0, 1, 5, 6, 7, 8, 9]], self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_0E0E' return result @cached_property def X_IA_0B0B(self): IA_0B0B_tab = IA_0B0B() result = tensor_stuff(IA_0B0B_tab[:, [0, 1, 5, 6, 7, 8, 9]], self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_0B0B' return result @cached_property def X_IA_gb2_fe(self): IA_gb2_fe_tab = IA_gb2_fe() p_mat_gb2_fe = IA_gb2_fe_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_fe, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_fe' return result @cached_property def X_IA_gb2_he(self): IA_gb2_he_tab = IA_gb2_he() p_mat_gb2_he = IA_gb2_he_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_he, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_he' return result @cached_property def X_IA_tij_feG2(self): IA_tij_feG2_tab = IA_tij_feG2() p_mat_tij_feG2 = IA_tij_feG2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_feG2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_feG2' return result @cached_property def X_IA_tij_heG2(self): IA_tij_heG2_tab = IA_tij_heG2() p_mat_tij_heG2 = IA_tij_heG2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_heG2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_heG2' return result @cached_property def X_IA_tij_F2F2(self): IA_tij_F2F2_tab = IA_tij_F2F2() p_mat_tij_F2F2 = IA_tij_F2F2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_F2F2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_F2F2' return result @cached_property def X_IA_tij_G2G2(self): IA_tij_G2G2_tab = IA_tij_G2G2() p_mat_tij_G2G2 = IA_tij_G2G2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_G2G2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_G2G2' return result @cached_property def X_IA_tij_F2G2(self): IA_tij_F2G2_tab = IA_tij_F2G2() p_mat_tij_F2G2 = IA_tij_F2G2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_F2G2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_F2G2' return result @cached_property def X_IA_tij_F2G2reg(self): IA_tij_F2G2reg_tab =IA_tij_F2G2reg() p_mat_tij_F2G2reg_tab = IA_tij_F2G2reg_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_tij_F2G2reg_tab, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_tij_F2G2reg' return result @cached_property def X_IA_gb2_F2(self): IA_gb2_F2_tab = IA_gb2_F2() p_mat_gb2_F2 = IA_gb2_F2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_F2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_F2' return result @cached_property def X_IA_gb2_G2(self): IA_gb2_G2_tab = IA_gb2_G2() p_mat_gb2_G2 = IA_gb2_G2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_G2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_G2' return result @cached_property def X_IA_gb2_S2F2(self): IA_gb2_S2F2_tab = IA_gb2_S2F2() p_mat_gb2_S2F2 = IA_gb2_S2F2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_S2F2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_S2F2' return result @cached_property def X_IA_gb2_S2fe(self): IA_gb2_S2fe_tab = IA_gb2_S2fe() p_mat_gb2_S2fe = IA_gb2_S2fe_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_S2fe, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_S2fe' return result @cached_property def X_IA_gb2_S2he(self): IA_gb2_S2he_tab = IA_gb2_S2he() p_mat_gb2_S2he = IA_gb2_S2he_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_S2he, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_S2he' return result @cached_property def X_IA_gb2_S2G2(self): IA_gb2_S2G2_tab = IA_gb2_S2G2() p_mat_gb2_S2G2 = IA_gb2_S2G2_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat_gb2_S2G2, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_IA_gb2_S2G2' return result @cached_property def X_OV(self): OV_tab = OV() p_mat = OV_tab[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_OV' return result @cached_property def X_kP1(self): tab1, _, _ = kPol() p_mat = tab1[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_kP1' return result @cached_property def X_kP2(self): _, tab2, _ = kPol() p_mat = tab2[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_kP2' return result @cached_property def X_kP3(self): _, _, tab3 = kPol() p_mat = tab3[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_kP3' return result @cached_property def X_RSDA(self): tabA, self.A_coeff = RSDA() p_mat = tabA[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_RSDA' return result @cached_property def X_RSDB(self): tabB, self.B_coeff = RSDB() p_mat = tabB[:, [0, 1, 5, 6, 7, 8, 9]] result = tensor_stuff(p_mat, self.N, self.m, self.eta_m, self.l, self.tau_l) self.X_registry[id(result)] = 'X_RSDB' return result def _validate_params(self, **params): """" Same function as before """ valid_params = ('P', 'P_window', 'C_window', 'f', 'X', 'nu', 'mu_n', 'L', 'h', 'rsdrag') for key in params.keys(): if key not in valid_params: raise ValueError(f'Invalid parameter: {key}. Valid parameters are: {valid_params}') P = params.get('P', None) if (P is None or len(P) == 0): raise ValueError('You must provide an input power spectrum array.') if (len(P) != len(self.k_original)): raise ValueError(f'Input k and P arrays must have the same size. P:{len(P)}, K:{len(self.k_original)}') if (np.all(P == 0.0)): raise ValueError('Your input power spectrum array is all zeros.') P_window = params.get('P_window', np.array([])) C_window = params.get('C_window', None) if P_window is not None and len(P_window) > 0: maxP = (log(self.k_final[-1]) - log(self.k_final[0])) / 2 if len(P_window) != 2: raise ValueError(f'P_window must be a tuple of two values.') if P_window[0] > maxP or P_window[1] > maxP: raise ValueError(f'P_window value is too large. Decrease to less than {maxP} to avoid over tapering.') if C_window is not None: if C_window < 0 or C_window > 1: raise ValueError('C_window must be between 0 and 1.') return params ############## ABSTRACTED BEHAVIOR METHODS ############## def _apply_extrapolation(self, *args): """ Applies extrapolation to multiple variables at once """ if not self.extrap: return args if len(args) > 1 else args[0] return [self.EK.PK_original(var)[1] for var in args] if len(args) > 1 else self.EK.PK_original(args[0])[1] def _hash_arrays(self, arrays): """Helper function to create a hash from multiple numpy arrays or scalars""" if arrays is None: return hash(None) if isinstance(arrays, (tuple, list)): hash_key_hash = 0 for i, item in enumerate(arrays): if isinstance(item, np.ndarray): # Use a prime multiplier to avoid collisions item_hash = hash(item.tobytes()) elif isinstance(item, (tuple, list)): # Recursively compute hash of nested structure item_hash = self._hash_arrays(item) else: item_hash = hash(item) # Combine hashes using a prime-based approach to reduce collisions hash_key_hash = hash_key_hash ^ (item_hash + 0x9e3779b9 + (hash_key_hash << 6) + (hash_key_hash >> 2)) return hash_key_hash # Single item case if isinstance(arrays, np.ndarray): return hash(arrays.tobytes()) return hash(arrays) def _create_hash_key(self, term, X, P, P_window, C_window): """Create a hash key from the term and input parameters""" P_hash = self._hash_arrays(P) P_win_hash = self._hash_arrays(P_window) if X is None: X_id = hash(None) else: X_id = hash(self.X_registry.get(id(X), f"unknown_{id(X)}")) term_hash = hash(term) #Included for differentiating between similar param sets hash_list = [term_hash, X_id, P_hash, P_win_hash, hash(C_window)] hash_key = 0 for h in hash_list: if h is not None: hash_key = hash_key ^ (h + 0x9e3779b9 + (hash_key << 6) + (hash_key >> 2)) return hash_key, P_hash
[docs] def compute_term(self, term, X, operation=None, P=None, P_window=None, C_window=None): """ Computes a Fast-PT term with caching support. Parameters ---------- term : str Name of the term to compute, used for cache identification X : tuple Fast-PT coefficient matrices for the calculation operation : callable, optional Function to apply to the result(s) after computation P : array_like Input power spectrum P_window : tuple, optional Window parameters for tapering the power spectrum at the endpoints C_window : float, optional Window parameter for tapering the Fourier coefficients Returns ------- array_like The computed Fast-PT term """ if P is None: raise ValueError('Compute term requires an input power spectrum array.') hash_key, P_hash = self._create_hash_key(term, X, P, P_window, C_window) result = self.cache.get(term, hash_key) if result is not None: return result result, _ = self.J_k_tensor(P, X, P_window=P_window, C_window=C_window) result = self._apply_extrapolation(result) if operation: final_result = operation(result) self.cache.set(final_result, term, hash_key, P_hash) return final_result self.cache.set(result, term, hash_key, P_hash) return result
### Top-level functions to output final quantities ###
[docs] def one_loop_dd(self, P, P_window=None, C_window=None): """ Computes the standard 1-loop density-density corrections to the power spectrum. Returns ------- tuple (P_1loop, Ps) where: P_1loop : 1-loop correction (P_22 + P_13) Ps : Smoothed input power spectrum """ self._validate_params(P=P, P_window=P_window, C_window=C_window) Ps = self._get_Ps(P, P_window=P_window, C_window=C_window) P_1loop = self._get_P_1loop(P, P_window=P_window, C_window=C_window) return P_1loop, Ps #This return is going to be different than the original bc the original return is
# different depending on the todo list which is going to be deprecated. def _get_Ps(self, P, P_window=None, C_window=None): Ps, _ = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Ps = self._apply_extrapolation(Ps) return Ps def _get_P_1loop(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_1loop", self.X_spt, P, P_window, C_window) result = self.cache.get("P_1loop", hash_key) if result is not None: return result # get the roundtrip Fourier power spectrum, i.e. P=IFFT[FFT[P]] # get the matrix for each J_k component Ps, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) P22_coef = np.array([2*1219/1470., 2*671/1029., 2*32/1715., 2*1/3., 2*62/35., 2*8/35., 1/3.]) P22_mat = np.multiply(P22_coef, np.transpose(mat)) P22 = np.sum(P22_mat, axis=1) P13 = P_13_reg(self.k_extrap, Ps) P_1loop = P22 + P13 P_1loop = self._apply_extrapolation(P_1loop) self.cache.set(P_1loop, "P_1loop", hash_key, P_hash) return P_1loop #TODO add comments back explaining math behind one loop
[docs] def one_loop_dd_bias(self, P, P_window=None, C_window=None): """ Computes 1-loop corrections with standard bias terms. Returns ------- tuple (P_1loop, Ps, Pd1d2, Pd2d2, Pd1s2, Pd2s2, Ps2s2, sig4) where: P_1loop : 1-loop correction (P_22 + P_13) Ps : Smoothed input power spectrum Pd1d2 : First order density-second order density correlation Pd2d2 : Second order density auto-correlation Pd1s2 : First order density-second order tidal correlation Pd2s2 : Second order density-second order tidal correlation Ps2s2 : Second order tidal auto-correlation sig4 : σ^4 integral for nonlinear bias """ P_1loop, Ps = self.one_loop_dd(P, P_window=P_window, C_window=C_window) Pd1d2 = self._get_Pd1d2(P, P_window=P_window, C_window=C_window) Pd2d2 = self._get_Pd2d2(P, P_window=P_window, C_window=C_window) Pd1s2 = self._get_Pd1s2(P, P_window=P_window, C_window=C_window) Pd2s2 = self._get_Pd2s2(P, P_window=P_window, C_window=C_window) Ps2s2 = self._get_Ps2s2(P, P_window=P_window, C_window=C_window) sig4 = self._get_sig4(P, P_window=P_window, C_window=C_window) return P_1loop, Ps, Pd1d2, Pd2d2, Pd1s2, Pd2s2, Ps2s2, sig4
def _get_sig4(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("sig4", self.X_spt, P, P_window, C_window) result = self.cache.get("sig4", hash_key) if result is not None: return result Ps, _ = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) # Quadraric bias Legendre components # See eg section B of Baldauf+ 2012 (arxiv: 1201.4827) # Note pre-factor convention is not standardized # Returns relevant correlations (including contraction factors), # but WITHOUT bias values and other pre-factors. # Uses standard "full initialization" of J terms sig4 = np.trapz(self.k_extrap ** 3 * Ps ** 2, x=np.log(self.k_extrap)) / (2. * pi ** 2) self.cache.set(sig4, "sig4", hash_key, P_hash) return sig4 def _get_Pd1d2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pd1d2", self.X_spt, P, P_window, C_window) result = self.cache.get("Pd1d2", hash_key) if result is not None: return result _, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Pd1d2 = 2. * (17. / 21 * mat[0, :] + mat[4, :] + 4. / 21 * mat[1, :]) Pd1d2 = self._apply_extrapolation(Pd1d2) self.cache.set(Pd1d2, "Pd1d2", hash_key, P_hash) return Pd1d2 def _get_Pd2d2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pd2d2", self.X_spt, P, P_window, C_window) result = self.cache.get("Pd2d2", hash_key) if result is not None: return result _, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Pd2d2 = 2. * (mat[0, :]) Pd2d2 = self._apply_extrapolation(Pd2d2) self.cache.set(Pd2d2, "Pd2d2", hash_key, P_hash) return Pd2d2 def _get_Pd1s2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pd1s2", self.X_spt, P, P_window, C_window) result = self.cache.get("Pd1s2", hash_key) if result is not None: return result _, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Pd1s2 = 2. * (8. / 315 * mat[0, :] + 4. / 15 * mat[4, :] + 254. / 441 * mat[1, :] + 2. / 5 * mat[5, :] + 16. / 245 * mat[ 2, :]) Pd1s2 = self._apply_extrapolation(Pd1s2) self.cache.set(Pd1s2, "Pd1s2", hash_key, P_hash) return Pd1s2 def _get_Pd2s2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pd2s2", self.X_spt, P, P_window, C_window) result = self.cache.get("Pd2s2", hash_key) if result is not None: return result _, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Pd2s2 = 2. * (2. / 3 * mat[1, :]) Pd2s2 = self._apply_extrapolation(Pd2s2) self.cache.set(Pd2s2, "Pd2s2", hash_key, P_hash) return Pd2s2 def _get_Ps2s2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Ps2s2", self.X_spt, P, P_window, C_window) result = self.cache.get("Ps2s2", hash_key) if result is not None: return result _, mat = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) Pd2s2 = 2. * (4. / 45 * mat[0, :] + 8. / 63 * mat[1, :] + 8. / 35 * mat[2, :]) Pd2s2 = self._apply_extrapolation(Pd2s2) self.cache.set(Pd2s2, "Ps2s2", hash_key, P_hash) return Pd2s2
[docs] def one_loop_dd_bias_b3nl(self, P, P_window=None, C_window=None): """ Computes 1-loop corrections with bias terms including third-order non-local bias. Returns ------- tuple (P_1loop, Ps, Pd1d2, Pd2d2, Pd1s2, Pd2s2, Ps2s2, sig4, sig3nl) where: The first 8 terms are identical to those returned by one_loop_dd_bias sig3nl : Third order non-local bias term """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_1loop, Ps = self.one_loop_dd(P, P_window=P_window, C_window=C_window) Pd1d2 = self._get_Pd1d2(P, P_window=P_window, C_window=C_window) Pd2d2 = self._get_Pd2d2(P, P_window=P_window, C_window=C_window) Pd1s2 = self._get_Pd1s2(P, P_window=P_window, C_window=C_window) Pd2s2 = self._get_Pd2s2(P, P_window=P_window, C_window=C_window) Ps2s2 = self._get_Ps2s2(P, P_window=P_window, C_window=C_window) sig4 = self._get_sig4(P, P_window=P_window, C_window=C_window) sig3nl = self._get_sig3nl(P, P_window=P_window, C_window=C_window) return P_1loop, Ps, Pd1d2, Pd2d2, Pd1s2, Pd2s2, Ps2s2, sig4, sig3nl
def _get_sig3nl(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("sig3nl", self.X_spt, P, P_window, C_window) result = self.cache.get("sig3nl", hash_key) if result is not None: return result Ps, _ = self.J_k_scalar(P, self.X_spt, -2, P_window=P_window, C_window=C_window) sig3nl = Y1_reg_NL(self.k_extrap, Ps) sig3nl = self._apply_extrapolation(sig3nl) self.cache.set(sig3nl, "sig3nl", hash_key, P_hash) return sig3nl
[docs] def one_loop_dd_bias_lpt_NL(self, P, P_window=None, C_window=None): """ Computes bias corrections in Lagrangian Perturbation Theory (LPT). Returns ------- tuple (Ps, Pb1L, Pb1L_2, Pb1L_b2L, Pb2L, Pb2L_2, sig4) where: Ps : Smoothed input power spectrum Pb1L : First-order Lagrangian bias correlation term Pb1L_2 : First-order Lagrangian bias squared correlation Pb1L_b2L : First-order and second-order Lagrangian bias cross-correlation Pb2L : Second-order Lagrangian bias correlation Pb2L_2 : Second-order Lagrangian bias squared correlation sig4 : σ^4 integral for nonlinear bias """ self._validate_params(P=P, P_window=P_window, C_window=C_window) _, Ps = self.one_loop_dd(P, P_window=P_window, C_window=C_window) Pb1L = self._get_Pb1L(P, P_window=P_window, C_window=C_window) Pb1L_2 = self._get_Pb1L_2(P, P_window=P_window, C_window=C_window) Pb1L_b2L = self._get_Pb1L_b2L(P, P_window=P_window, C_window=C_window) Pb2L = self._get_Pb2L(P, P_window=P_window, C_window=C_window) Pb2L_2 = self._get_Pb2L_2(P, P_window=P_window, C_window=C_window) sig4 = self._get_sig4(P, P_window=P_window, C_window=C_window) return Ps, Pb1L, Pb1L_2, Pb1L_b2L, Pb2L, Pb2L_2, sig4
def _get_Pb1L(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pb1L", self.X_lpt, P, P_window, C_window) result = self.cache.get("Pb1L", hash_key) if result is not None: return result Ps, mat = self.J_k_scalar(P, self.X_lpt, -2, P_window=P_window, C_window=C_window) [j000, j002, j2n22, j1n11, j1n13, j004, j2n20] = [mat[0, :], mat[1, :], mat[2, :], mat[3, :], mat[4, :], mat[5, :], mat[6, :]] X1 = ((144. / 245.) * j000 - (176. / 343.) * j002 - (128. / 1715.) * j004 + (16. / 35.) * j1n11 - ( 16. / 35.) * j1n13) Y1 = Y1_reg_NL(self.k_extrap, Ps) Pb1L = X1 + Y1 Pb1L = self._apply_extrapolation(Pb1L) self.cache.set(Pb1L, "Pb1L", hash_key, P_hash) return Pb1L def _get_Pb1L_2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pb1L_2", self.X_lpt, P, P_window, C_window) result = self.cache.get("Pb1L_2", hash_key) if result is not None: return result Ps, mat = self.J_k_scalar(P, self.X_lpt, -2, P_window=P_window, C_window=C_window) [j000, j002, j2n22, j1n11, j1n13, j004, j2n20] = [mat[0, :], mat[1, :], mat[2, :], mat[3, :], mat[4, :], mat[5, :], mat[6, :]] X2 = ((16. / 21.) * j000 - (16. / 21.) * j002 + (16. / 35.) * j1n11 - (16. / 35.) * j1n13) Y2 = Y2_reg_NL(self.k_extrap, Ps) Pb1L_2 = X2 + Y2 Pb1L_2 = self._apply_extrapolation(Pb1L_2) self.cache.set(Pb1L_2, "Pb1L_2", hash_key, P_hash) return Pb1L_2 def _get_Pb1L_b2L(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pb1L_b2L", self.X_lpt, P, P_window, C_window) result = self.cache.get("Pb1L_b2L", hash_key) if result is not None: return result Ps, mat = self.J_k_scalar(P, self.X_lpt, -2, P_window=P_window, C_window=C_window) [j000, j002, j2n22, j1n11, j1n13, j004, j2n20] = [mat[0, :], mat[1, :], mat[2, :], mat[3, :], mat[4, :], mat[5, :], mat[6, :]] X3 = (50. / 21.) * j000 + 2. * j1n11 - (8. / 21.) * j002 Pb1L_b2L = X3 Pb1L_b2L = self._apply_extrapolation(Pb1L_b2L) self.cache.set(Pb1L_b2L, "Pb1L_b2L", hash_key, P_hash) return Pb1L_b2L def _get_Pb2L(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pb2L", self.X_lpt, P, P_window, C_window) result = self.cache.get("Pb2L", hash_key) if result is not None: return result Ps, mat = self.J_k_scalar(P, self.X_lpt, -2, P_window=P_window, C_window=C_window) [j000, j002, j2n22, j1n11, j1n13, j004, j2n20] = [mat[0, :], mat[1, :], mat[2, :], mat[3, :], mat[4, :], mat[5, :], mat[6, :]] X4 = (34. / 21.) * j000 + 2. * j1n11 + (8. / 21.) * j002 Pb2L = X4 Pb2L = self._apply_extrapolation(Pb2L) self.cache.set(Pb2L, "Pb2L", hash_key, P_hash) return Pb2L def _get_Pb2L_2(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pb2L_2", self.X_lpt, P, P_window, C_window) result = self.cache.get("Pb2L_2", hash_key) if result is not None: return result Ps, mat = self.J_k_scalar(P, self.X_lpt, -2, P_window=P_window, C_window=C_window) [j000, j002, j2n22, j1n11, j1n13, j004, j2n20] = [mat[0, :], mat[1, :], mat[2, :], mat[3, :], mat[4, :], mat[5, :], mat[6, :]] X5 = j000 Pb2L_2 = X5 Pb2L_2 = self._apply_extrapolation(Pb2L_2) self.cache.set(Pb2L_2, "Pb2L_2", hash_key, P_hash) return Pb2L_2
[docs] def IA_tt(self, P, P_window=None, C_window=None): """ Computes intrinsic alignment tidal torque contributions. Returns ------- tuple (P_E, P_B) where: P_E : E-mode (curl-free) tidal torque power spectrum P_B : B-mode (divergence-free) tidal torque power spectrum """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_E = self.compute_term("P_E", self.X_IA_E, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_B = self.compute_term("P_B", self.X_IA_B, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) return P_E, P_B
## eq 21 EE; eq 21 BB
[docs] def IA_mix(self, P, P_window=None, C_window=None): """ Computes mixed intrinsic alignment contributions combining tidal alignment and tidal torque. Returns ------- tuple (P_A, P_Btype2, P_DEE, P_DBB) where: P_A : Mixed tidal alignment/tidal torque term P_Btype2 : Second-type B-mode term P_DEE : Contribution to the E-mode power spectrum P_DBB : Contribution to the B-mode power spectrum """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_A = self.compute_term("P_A", self.X_IA_A, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_Btype2 = self._get_P_Btype2(P) #Calculated differently then other terms, can't use compute_term P_DEE = self.compute_term("P_DEE", self.X_IA_DEE, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_DBB = self.compute_term("P_DBB", self.X_IA_DBB, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) return P_A, P_Btype2, P_DEE, P_DBB
def _get_P_Btype2(self, P): hash_key, P_hash = self._create_hash_key("P_Btype2", None, P, None, None) result = self.cache.get("P_Btype2", hash_key) if result is not None: return result P_Btype2 = P_IA_B(self.k_original, P) P_Btype2 = 4 * P_Btype2 self.cache.set(P_Btype2, "P_Btype2", hash_key, P_hash) return P_Btype2 ## eq 18; eq 19; eq 27 EE; eq 27 BB
[docs] def IA_ta(self, P, P_window=None, C_window=None): """ Computes intrinsic alignment tidal alignment contributions. Returns ------- tuple (P_deltaE1, P_deltaE2, P_0E0E, P_0B0B) where: P_deltaE1 : First density-E mode correlation P_deltaE2 : Second density-E mode correlation P_0E0E : E-mode auto-correlation P_0B0B : B-mode auto-correlation """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_deltaE1 = self.compute_term("P_deltaE1", self.X_IA_deltaE1, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_deltaE2 = self._get_P_deltaE2(P) #Calculated differently then other terms, can't use compute_term P_0E0E = self.compute_term("P_0E0E", self.X_IA_0E0E, P=P, P_window=P_window, C_window=C_window) P_0B0B = self.compute_term("P_0B0B", self.X_IA_0B0B, P=P, P_window=P_window, C_window=C_window) return P_deltaE1, P_deltaE2, P_0E0E, P_0B0B
def _get_P_deltaE2(self, P): hash_key, P_hash = self._create_hash_key("P_deltaE2", None, P, None, None) result = self.cache.get("P_deltaE2", hash_key) if result is not None: return result P_deltaE2 = P_IA_deltaE2(self.k_original, P) #Add extrap? P_deltaE2 = 2 * P_deltaE2 self.cache.set(P_deltaE2, "P_deltaE2", hash_key, P_hash) return P_deltaE2 ## eq 12 (line 2); eq 12 (line 3); eq 15 EE; eq 15 BB
[docs] def IA_der(self, P, P_window=None, C_window=None): """ Computes k^2 * P(k) derivative term for intrinsic alignment models. Returns ------- array_like P_der : Derivative term of the power spectrum """ hash_key, P_hash = self._create_hash_key("IA_der", None, P, P_window, C_window) result = self.cache.get("IA_der", hash_key) if result is not None: return result self._validate_params(P=P, P_window=P_window, C_window=C_window) P_der = (self.k_original**2)*P self.cache.set(P_der, "IA_der", hash_key, P_hash) return P_der
[docs] def IA_ct(self,P,P_window=None, C_window=None): """ Computes intrinsic alignment velocity-shear contributions. Returns ------- tuple (P_0tE, P_0EtE, P_E2tE, P_tEtE) where: P_0tE : Density-velocity E-mode correlation P_0EtE : E-mode-velocity E-mode correlation P_E2tE : Second torquing-velocity E-mode correlation P_tEtE : Velocity E-mode auto-correlation """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_0tE = self._get_P_0tE(P, P_window=P_window, C_window=C_window) P_0EtE = self._get_P_0EtE(P, P_window=P_window, C_window=C_window) P_E2tE = self._get_P_E2tE(P, P_window=P_window, C_window=C_window) P_tEtE = self._get_P_tEtE(P, P_window=P_window, C_window=C_window) return P_0tE,P_0EtE,P_E2tE,P_tEtE
def _get_P_0tE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_0tE", self.X_spt, P, P_window, C_window) result = self.cache.get("P_0tE", hash_key) if result is not None: return result nu=-2 Ps, mat = self.J_k_scalar(P, self.X_spt, nu, P_window=P_window, C_window=C_window) one_loop_coef = np.array( [2 * 1219 / 1470., 2 * 671 / 1029., 2 * 32 / 1715., 2 * 1 / 3., 2 * 62 / 35., 2 * 8 / 35., 1 / 3.]) P22_mat = np.multiply(one_loop_coef, np.transpose(mat)) P_22F = np.sum(P22_mat, 1) one_loop_coefG= np.array( [2*1003/1470, 2*803/1029, 2*64/1715, 2*1/3, 2*58/35, 2*12/35, 1/3]) PsG, matG = self.J_k_scalar(P, self.X_sptG, nu, P_window=P_window, C_window=C_window) P22G_mat = np.multiply(one_loop_coefG, np.transpose(matG)) P_22G = np.sum(P22G_mat, 1) P_22F, P_22G = self._apply_extrapolation(P_22F, P_22G) P_13G = P_IA_13G(self.k_original,P,) P_13F = P_IA_13F(self.k_original, P) P_0tE = P_22G-P_22F+P_13G-P_13F P_0tE = 2*P_0tE self.cache.set(P_0tE, "P_0tE", hash_key, P_hash) return P_0tE def _get_P_0EtE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_0EtE", self.X_spt, P, P_window, C_window) result = self.cache.get("P_0EtE", hash_key) if result is not None: return result P_feG2, A = self.J_k_tensor(P,self.X_IA_tij_feG2, P_window=P_window, C_window=C_window) P_feG2 = self._apply_extrapolation(P_feG2) P_A00E = self.compute_term("P_deltaE1", self.X_IA_deltaE1, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) #OG: P_A00E, _, _, _ = self.IA_ta() P_0EtE = np.subtract(P_feG2,(1/2)*P_A00E) P_0EtE = 2*P_0EtE self.cache.set(P_0EtE, "P_0EtE", hash_key, P_hash) return P_0EtE def _get_P_E2tE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_E2tE", self.X_spt, P, P_window, C_window) result = self.cache.get("P_E2tE", hash_key) if result is not None: return result P_heG2, A = self.J_k_tensor(P,self.X_IA_tij_heG2, P_window=P_window, C_window=C_window) P_heG2 = self._apply_extrapolation(P_heG2) P_A0E2 = self.compute_term("P_A", self.X_IA_A, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) #OG: P_A0E2, _, _, _ = self.IA_mix() P_E2tE = np.subtract(P_heG2,(1/2)*P_A0E2) P_E2tE = 2*P_E2tE self.cache.set(P_E2tE, "P_E2tE", hash_key, P_hash) return P_E2tE def _get_P_tEtE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_tEtE", self.X_spt, P, P_window, C_window) result = self.cache.get("P_tEtE", hash_key) if result is not None: return result P_F2F2, A = self.J_k_tensor(P,self.X_IA_tij_F2F2, P_window=P_window, C_window=C_window) P_G2G2, A = self.J_k_tensor(P,self.X_IA_tij_G2G2, P_window=P_window, C_window=C_window) P_F2G2, A = self.J_k_tensor(P,self.X_IA_tij_F2G2, P_window=P_window, C_window=C_window) P_F2F2, P_G2G2, P_F2G2 = self._apply_extrapolation(P_F2F2, P_G2G2, P_F2G2) P_tEtE = P_F2F2+P_G2G2-2*P_F2G2 P_tEtE = 2*P_tEtE self.cache.set(P_tEtE, "P_tEtE", hash_key, P_hash) return P_tEtE
[docs] def gI_ct(self,P,P_window=None, C_window=None): """ Computes galaxy bias cross intrinsic alignment velocity-shear contributions. Returns ------- tuple (P_d2tE, P_s2tE) where: P_d2tE : Second-order density-velocity E-mode correlation P_s2tE : Second-order tidal-velocity E-mode correlation """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_d2tE = self._get_P_d2tE(P, P_window=P_window, C_window=C_window) P_s2tE = self._get_P_s2tE(P, P_window=P_window, C_window=C_window) return P_d2tE, P_s2tE
def _get_P_d2tE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("Pd2tE", self.X_IA_gb2_F2, P, P_window, C_window) result = self.cache.get("Pd2tE", hash_key) if result is not None: return result P_F2, _ = self.J_k_tensor(P, self.X_IA_gb2_F2, P_window=P_window, C_window=C_window) P_G2, _ = self.J_k_tensor(P, self.X_IA_gb2_G2, P_window=P_window, C_window=C_window) P_F2, P_G2 = self._apply_extrapolation(P_F2, P_G2) P_d2tE = 2 * (P_G2 - P_F2) self.cache.set(P_d2tE, "Pd2tE", hash_key, P_hash) return P_d2tE def _get_P_s2tE(self, P, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("P_s2tE", self.X_IA_gb2_S2F2, P, P_window, C_window) result = self.cache.get("P_s2tE", hash_key) if result is not None: return result P_S2F2, _ = self.J_k_tensor(P, self.X_IA_gb2_S2F2, P_window=P_window, C_window=C_window) P_S2G2, _ = self.J_k_tensor(P, self.X_IA_gb2_S2G2, P_window=P_window, C_window=C_window) P_S2F2, P_S2G2 = self._apply_extrapolation(P_S2F2, P_S2G2) P_s2tE = 2 * (P_S2G2 - P_S2F2) self.cache.set(P_s2tE, "P_s2tE", hash_key, P_hash) return P_s2tE
[docs] def gI_ta(self,P,P_window=None, C_window=None): """ Computes intrinsic alignment 2nd-order density correlations. Returns ------- tuple (P_d2E, P_d20E, P_s2E, P_s20E) where: P_d2E : 2nd-order density-E-mode correlation P_d20E : 2nd-order density-density-E-mode correlation P_s2E : 2nd-order tidal-E-mode correlation P_s20E : 2nd-order tidal-density-E-mode correlation """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_d2E = self.compute_term("P_d2E", self.X_IA_gb2_F2, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_d20E = self.compute_term("P_d20E", self.X_IA_gb2_fe, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_s2E = self.compute_term("P_s2E", self.X_IA_gb2_S2F2, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_s20E = self.compute_term("P_s20E", self.X_IA_gb2_S2fe, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) return P_d2E, P_d20E, P_s2E, P_s20E
[docs] def gI_tt(self, P, P_window=None, C_window=None): """ Computes intrinsic alignment 2nd-order tidal correlations. Returns ------- tuple (P_s2E2, P_d2E2) where: P_s2E2 : 2nd-order tidal-E-mode squared correlation P_d2E2 : 2nd-order density-E-mode squared correlation """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P_s2E2 = self.compute_term("P_s2E2", self.X_IA_gb2_S2he, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) P_d2E2 = self.compute_term("P_d2E2", self.X_IA_gb2_he, operation=lambda x: 2 * x, P=P, P_window=P_window, C_window=C_window) return P_s2E2, P_d2E2
[docs] def OV(self, P, P_window=None, C_window=None): """ Computes the Ostriker-Vishniac effect power spectrum. Returns ------- array_like P_OV : Ostriker-Vishniac effect power spectrum """ self._validate_params(P=P, P_window=P_window, C_window=C_window) hash_key, P_hash = self._create_hash_key("OV", None, P, P_window, C_window) result = self.cache.get("P_OV", hash_key) if result is not None: return result P, A = self.J_k_tensor(P, self.X_OV, P_window=P_window, C_window=C_window) P = self._apply_extrapolation(P) P_OV = P * (2 * pi) ** 2 self.cache.set(P_OV, "P_OV", hash_key, P_hash) return P_OV
[docs] def kPol(self, P, P_window=None, C_window=None): """ Computes k-dependent polarization power spectra. Returns ------- tuple (P1, P2, P3) where: P1 : First k-dependent polarization power spectrum P2 : Second k-dependent polarization power spectrum P3 : Third k-dependent polarization power spectrum """ self._validate_params(P=P, P_window=P_window, C_window=C_window) P1 = self.compute_term("P_kP1", self.X_kP1, operation=lambda x: x / (80 * pi ** 2), P=P, P_window=P_window, C_window=C_window) P2 = self.compute_term("P_kP2", self.X_kP2, operation=lambda x: x / (160 * pi ** 2), P=P, P_window=P_window, C_window=C_window) P3 = self.compute_term("P_kP3", self.X_kP3, operation=lambda x: x / (80 * pi ** 2), P=P, P_window=P_window, C_window=C_window) return P1, P2, P3
[docs] def RSD_components(self, P, f, P_window=None, C_window=None): """ Computes redshift-space distortion component terms. Parameters ---------- f : float Logarithmic growth rate Returns ------- tuple (A1, A3, A5, B0, B2, B4, B6, P_Ap1, P_Ap3, P_Ap5) where: A1, A3, A5 : A-type RSD components with different powers of μ B0, B2, B4, B6 : B-type RSD components with different powers of μ P_Ap1, P_Ap3, P_Ap5 : Additional RSD A-prime components """ self._validate_params(P=P, P_window=P_window, C_window=C_window) _, A = self.J_k_tensor(P, self.X_RSDA, P_window=P_window, C_window=C_window) A1 = np.dot(self.A_coeff[:, 0], A) + f * np.dot(self.A_coeff[:, 1], A) + f ** 2 * np.dot(self.A_coeff[:, 2], A) A3 = np.dot(self.A_coeff[:, 3], A) + f * np.dot(self.A_coeff[:, 4], A) + f ** 2 * np.dot(self.A_coeff[:, 5], A) A5 = np.dot(self.A_coeff[:, 6], A) + f * np.dot(self.A_coeff[:, 7], A) + f ** 2 * np.dot(self.A_coeff[:, 8], A) _, B = self.J_k_tensor(P, self.X_RSDB, P_window=P_window, C_window=C_window) B0 = np.dot(self.B_coeff[:, 0], B) + f * np.dot(self.B_coeff[:, 1], B) + f ** 2 * np.dot(self.B_coeff[:, 2], B) B2 = np.dot(self.B_coeff[:, 3], B) + f * np.dot(self.B_coeff[:, 4], B) + f ** 2 * np.dot(self.B_coeff[:, 5], B) B4 = np.dot(self.B_coeff[:, 6], B) + f * np.dot(self.B_coeff[:, 7], B) + f ** 2 * np.dot(self.B_coeff[:, 8], B) B6 = np.dot(self.B_coeff[:, 9], B) + f * np.dot(self.B_coeff[:, 10], B) + f ** 2 * np.dot(self.B_coeff[:, 11], B) A1, A3, A5, B0, B2, B4, B6 = self._apply_extrapolation(A1, A3, A5, B0, B2, B4, B6) P_Ap1 = RSD_P_Ap1(self.k_original, P, f) P_Ap3 = RSD_P_Ap3(self.k_original, P, f) P_Ap5 = RSD_P_Ap5(self.k_original, P, f) return A1, A3, A5, B0, B2, B4, B6, P_Ap1, P_Ap3, P_Ap5
[docs] def RSD_ABsum_components(self, P, f, P_window=None, C_window=None): """ Computes combined redshift-space distortion terms by powers of μ. Parameters ---------- f : float Logarithmic growth rate Returns ------- tuple (ABsum_mu2, ABsum_mu4, ABsum_mu6, ABsum_mu8) where: ABsum_mu2 : Combined term with μ^2 dependence ABsum_mu4 : Combined term with μ^4 dependence ABsum_mu6 : Combined term with μ^6 dependence ABsum_mu8 : Combined term with μ^8 dependence """ self._validate_params(P=P, P_window=P_window, C_window=C_window) A1, A3, A5, B0, B2, B4, B6, P_Ap1, P_Ap3, P_Ap5 = self.RSD_components(P, f, P_window, C_window) ABsum_mu2 = self.k_original * f * (A1 + P_Ap1) + (f * self.k_original) ** 2 * B0 ABsum_mu4 = self.k_original * f * (A3 + P_Ap3) + (f * self.k_original) ** 2 * B2 ABsum_mu6 = self.k_original * f * (A5 + P_Ap5) + (f * self.k_original) ** 2 * B4 ABsum_mu8 = (f * self.k_original) ** 2 * B6 return ABsum_mu2, ABsum_mu4, ABsum_mu6, ABsum_mu8
[docs] def RSD_ABsum_mu(self, P, f, mu_n, P_window=None, C_window=None): """ Computes the total redshift-space distortion correction at a given μ. Parameters ---------- f : float Logarithmic growth rate mu_n : float Cosine of the angle between the wavevector and the line-of-sight Returns ------- array_like ABsum : The total RSD contribution at the specified μ angle """ self._validate_params(P=P, P_window=P_window, C_window=C_window) ABsum_mu2, ABsum_mu4, ABsum_mu6, ABsum_mu8 = self.RSD_ABsum_components(P, f, P_window, C_window) ABsum = ABsum_mu2 * mu_n ** 2 + ABsum_mu4 * mu_n ** 4 + ABsum_mu6 * mu_n ** 6 + ABsum_mu8 * mu_n ** 8 return ABsum
[docs] def IRres(self, P, L=0.2, h=0.67, rsdrag=135, P_window=None, C_window=None): """ Computes the IR-resummed power spectrum, which includes BAO damping. Parameters ---------- L : float, optional IR resummation scale, default is 0.2 h : float, optional Dimensionless Hubble parameter, default is 0.67 rsdrag : float, optional Sound horizon at drag epoch in Mpc, default is 135 Returns ------- array_like P_IRres : IR-resummed power spectrum with damped BAO features """ self._validate_params(P=P, P_window=P_window, C_window=C_window) # based on script by M. Ivanov. See arxiv:1605.02149, eq 7.4 # put this function in the typical fast-pt format, with minimal additional function calls. # We can also include a script to do additional things: e.g read in r_BAO from class/camb output # or calculate r_BAO from cosmological params. from scipy import interpolate k = self.k_original rbao = h * rsdrag * 1.027 # linear BAO scale # set up splining to create P_nowiggle kmin = k[0] kmax = k[-1] knode1 = pi / rbao knode2 = 2 * pi / rbao klogleft = np.arange(log(kmin), log(3.e-3), 0.2) klogright = np.arange(log(0.6), log(kmax), 0.085) klogright = np.hstack((log(knode1), log(knode2), klogright)) kloglist = np.concatenate((klogleft, klogright)) klist = np.exp(kloglist) # how to deal with extended k and P? which values should be used here? probably the extended versions? plin = interpolate.InterpolatedUnivariateSpline(k, P) logPs = np.log(plin(klist)) logpsmooth = interpolate.InterpolatedUnivariateSpline(kloglist, logPs) def psmooth(x): return exp(logpsmooth(log(x))) def pw(x): return plin(x) - psmooth(x) # compute Sigma^2 and the tree-level IR-resummed PS import scipy.integrate as integrate Sigma = integrate.quad(lambda x: (4 * pi) * psmooth(x) * ( 1 - 3 * (2 * rbao * x * cos(x * rbao) + (-2 + rbao ** 2 * x ** 2) * sin(rbao * x)) / ( x * rbao) ** 3) / (3 * (2 * pi) ** 3), kmin, L)[0] # speed up by using trap rule integration? # change to integration over log-k(?): # Sigma = integrate.quad(lambda x: x*(4*pi)*psmooth(x)*(1-3*(2*rbao*x*cos(x*rbao)+(-2+rbao**2*x**2)*sin(rbao*x))/(x*rbao)**3)/(3*(2*pi)**3), np.log(kmin), np.log(L))[0] def presum(x): return psmooth(x) + pw(x) * exp(-x ** 2 * Sigma) P = presum(k) out_1loop = self._get_P_1loop(P, P_window=P_window, C_window=C_window) # p1loop = interpolate.InterpolatedUnivariateSpline(k,out_1loop) # is this necessary? out_1loop should already be at the correct k spacing return psmooth(k) + out_1loop + pw(k) * exp(-k ** 2 * Sigma) * (1 + Sigma * k ** 2)
###################################################################################### ### functions that use the older version structures. ###
[docs] def one_loop(self, P, P_window=None, C_window=None): return self.pt_simple.one_loop(P, P_window=P_window, C_window=C_window)
[docs] def P_bias(self, P, P_window=None, C_window=None): return self.pt_simple.P_bias(P, P_window=P_window, C_window=C_window)
###################################################################################### ### Core functions used by top-level functions ### def J_k_scalar(self, P, X, nu, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("J_k_scalar", X, P, P_window, C_window) result = self.cache.get("J_k_scalar", hash_key) if result is not None: return result pf, p, g_m, g_n, two_part_l, h_l = X if (self.low_extrap is not None): P = self.EK.extrap_P_low(P) if (self.high_extrap is not None): P = self.EK.extrap_P_high(P) P_b = P * self.k_extrap ** (-nu) if (self.n_pad > 0): P_b = np.pad(P_b, pad_width=(self.n_pad, self.n_pad), mode='constant', constant_values=0) c_m = self._cache_fourier_coefficients(P_b, C_window, scalar=True) A_out = np.zeros((pf.shape[0], self.k_size)) for i in range(pf.shape[0]): # convolve f_c and g_c # C_l=np.convolve(c_m*self.g_m[i,:],c_m*self.g_n[i,:]) C_l = self._cache_convolution(c_m, c_m, g_m[i,:], g_n[i,:], h_l[i,:], two_part_l[i]) # set up to feed ifft an array ordered with l=0,1,...,-1,...,N/2-1 c_plus = C_l[self.l >= 0] c_minus = C_l[self.l < 0] C_l = np.hstack((c_plus[:-1], c_minus)) A_k = ifft(C_l) * C_l.size # multiply by size to get rid of the normalization in ifft A_out[i, :] = np.real(A_k[::2]) * pf[i] * self.k_final ** (-p[i] - 2) # note that you have to take every other element # in A_k, due to the extended array created from the # discrete convolution P_out = irfft(c_m[self.m >= 0]) * self.k_final ** nu * float(self.N) if (self.n_pad > 0): # get rid of the elements created from padding P_out = P_out[self.id_pad] A_out = A_out[:, self.id_pad] self.cache.set((P_out, A_out), "J_k_scalar", hash_key, P_hash) return P_out, A_out def J_k_tensor(self, P, X, P_window=None, C_window=None): hash_key, P_hash = self._create_hash_key("J_k_tensor", X, P, P_window, C_window) result = self.cache.get("J_k_tensor", hash_key) if result is not None: return result pf, p, nu1, nu2, g_m, g_n, h_l = X if (self.low_extrap is not None): P = self.EK.extrap_P_low(P) if (self.high_extrap is not None): P = self.EK.extrap_P_high(P) A_out = np.zeros((pf.size, self.k_size)) P_fin = np.zeros(self.k_size) for i in range(pf.size): P_b1 = P * self.k_extrap ** (-nu1[i]) P_b2 = P * self.k_extrap ** (-nu2[i]) if (P_window is not None): # window the input power spectrum, so that at high and low k # the signal smoothly tappers to zero. This make the input # more "like" a periodic signal if (self.verbose): print('windowing biased power spectrum') W = p_window(self.k_extrap, P_window[0], P_window[1]) P_b1 = P_b1 * W P_b2 = P_b2 * W if (self.n_pad > 0): P_b1 = np.pad(P_b1, pad_width=(self.n_pad, self.n_pad), mode='constant', constant_values=0) P_b2 = np.pad(P_b2, pad_width=(self.n_pad, self.n_pad), mode='constant', constant_values=0) c_m = self._cache_fourier_coefficients(P_b1, C_window, scalar=False) c_n = self._cache_fourier_coefficients(P_b2, C_window, scalar=False) # convolve f_c and g_c C_l = self._cache_convolution(c_m, c_n, g_m[i,:], g_n[i,:], h_l[i,:]) # set up to feed ifft an array ordered with l=0,1,...,-1,...,N/2-1 c_plus = C_l[self.l >= 0] c_minus = C_l[self.l < 0] C_l = np.hstack((c_plus[:-1], c_minus)) A_k = ifft(C_l) * C_l.size # multiply by size to get rid of the normalization in ifft A_out[i, :] = np.real(A_k[::2]) * pf[i] * self.k_final ** (p[i]) # note that you have to take every other element # in A_k, due to the extended array created from the # discrete convolution P_fin += A_out[i, :] # P_out=irfft(c_m[self.m>=0])*self.k**self.nu*float(self.N) if (self.n_pad > 0): # get rid of the elements created from padding # P_out=P_out[self.id_pad] A_out = A_out[:, self.id_pad] P_fin = P_fin[self.id_pad] self.cache.set((P_fin, A_out), "J_k_tensor", hash_key, P_hash) return P_fin, A_out def _cache_fourier_coefficients(self, P_b, C_window=None, scalar=False): """Cache and return Fourier coefficients for a given biased power spectrum""" hash_key, P_hash = self._create_hash_key("fourier_coefficients", None, P_b, None, C_window) hash_key = hash_key ^ (hash(scalar) + 0x9e3779b9 + (hash_key << 6) + (hash_key >> 2)) result = self.cache.get("fourier_coefficients", hash_key) if result is not None: return result c_m_positive = rfft(P_b) # We always filter the Fourier coefficients, so the last element is zero. # But in case someone does not filter and this is a scalar calculation, divide the end point by two if scalar: c_m_positive[-1] = c_m_positive[-1] / 2. c_m_negative = np.conjugate(c_m_positive[1:]) c_m = np.hstack((c_m_negative[::-1], c_m_positive)) / float(self.N) if C_window is not None: # Window the Fourier coefficients. # This will damp the highest frequencies if self.verbose: print('windowing the Fourier coefficients') c_m = c_m * c_window(self.m, int(C_window * self.N // 2.)) self.cache.set(c_m, "fourier_coefficients", hash_key, None) return c_m def _cache_convolution(self, c1, c2, g_m, g_n, h_l, two_part_l=None): """Cache and return convolution results""" # Use MurmurHash for faster hashing with low collision rate, original hashing method is much slower for this case def fast_hash(arr): if arr is None: return 0 if isinstance(arr, np.ndarray): if arr.size > 1000: sample = arr.ravel()[:1000] return hash(sample.tobytes()) ^ hash(arr.shape) ^ hash(arr.size) return hash(arr.tobytes()) ^ hash(arr.shape) return hash(arr) # Combine hashes with different prime multipliers to reduce collisions hash_key = 0 primes = [17, 31, 61, 127, 257, 509] arrays = [c1, c2, g_m, g_n, h_l, two_part_l] for i, arr in enumerate(arrays): h = fast_hash(arr) hash_key = hash_key ^ (h * primes[i % len(primes)]) result = self.cache.get("convolution", hash_key) if result is not None: return result # Calculate convolution C_l = fftconvolve(c1 * g_m, c2 * g_n) # C_l=convolve(c_m*self.g_m[i,:],c_m*self.g_n[i,:]) # multiply all l terms together # C_l=C_l*self.h_l[i,:]*self.two_part_l[i] # Apply additional terms if two_part_l is not None: C_l = C_l * h_l * two_part_l else: C_l = C_l * h_l # Cache and return self.cache.set(C_l, "convolution", hash_key, None) return C_l