#! /minish/keh00032/.conda/envs/keh00032/bin/python import logging from os import path from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser import numpy as np import matplotlib.pyplot as plt from jess.dispersion import dedisperse from jess.fitters import median_fitter from scipy.stats import median_abs_deviation from your import Your, Writer from will import create, inject logger = logging.getLogger() logFmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=logFmt) #This function from https://josephwkania.github.io/will/examples/inject_pulse.html#Show-how-we-can-inject-a-pulse-into-a-GREENBURST-filterbank. def show_dynamic( dynamic_spectra: np.ndarray, title: Union[str, None] = None, save: Union[bool, None] = False, ) -> None: """ Show a dynamic spectra by first flattening it in frequency. Do this by getting the medians of each channel and then run a median filter along the bandpass. Then set the limits of the imshow so we get good detail for the majority of the data. Args: dynmaic_spectra - the dynamic spectra to plot title - Title of plot save - Save the plot as `title` + `.png` """ spectra_mads = median_fitter(np.median(dynamic_spectra, axis=0)) flat = dynamic_spectra - spectra_mads std = median_abs_deviation(flat, axis=None) med = np.median(flat) plt.figure(figsize=(20, 10)) plt.imshow(flat.T, vmin=med - 3 * std, vmax=med + 6 * std, aspect="auto") plt.xlabel("Time Sample #", size=20) plt.ylabel("Channel #", size=20) plt.colorbar() plt.tight_layout() if title is not None: plt.title(title, size=28) if save: plt.savefig(title.replace(" ", "_") + ".png", dpi=75, bbox_inches="tight") def processList(values): """ Entry point if values.file is None and values.listfile is set. Processes the file for directories and iterates through by setting values.file and calling addBurst. """ with open(values.listfile, "r") as listfile: dirs = listfile.readlines() for dir in dirs: dataName = path.dirname(dir[:-1]) #strip the trailing slash for dirname values.file = path.join(dir,dataname+".fil") addBurst(values) def addBurst(values): """ Entry point if values.file is set. --listfile will enter into this function multiple times for each line. """ filterbankObj = Your(values.file) samples = filterbankObj.your_header.nspectra spectra = filterbankObj.get_data(0, samples) #save pre-injection spectra plot show_dynamic(spectra, f"{path.basename(values.file)} Pre-injection Dynamic Spectra", save=True) #get bandpass and store in bpWeights bpWeights = create.filter_weights(spectra) #create pulse #first version is very simple, plan on adding more complex injections in future pulseObj = create.SimpleGaussPulse( sigma_time=0.001, sigma_freq=350, center_freq = filterbankObj.your_header.center_freq, dm = values.dm, tau = 20, phi=np.pi / 3, #does nothing if nscint = 0 spectral_index_alpha=0, chan_freqs = filterbankObj.chan_freqs, tsamp = filterbankObj.your_header.tsamp, nscint=0, bandpass = bpWeights ) pulse = pulseObj.sample_pulse(nsamp=int(3e5)) #sample it 300000 times #inject pulse injectedSpectra = inject.inject_constant_into_file( yr_input = filterbankObj, pulse = pulse, start = samples // 4, gulp = samples ) #and save the new plot and filterbank file show_dynamic(spectra, f"{path.basename(values.file)} Dynamic Spectra and {values.dm} DM Pulse", save=True) filWriter = Writer(filterbankObj, outdir="./", outname = f"{path.basename(values.file)}_injected") filWriter.to_fil(data=injectedSpectra) logger.info(f"{path.basename(values.file)}_injected.fil successfully written.") if __name__ == "__main__": parser = ArgumentParser( description="Insert simulated bursts into GREENBURST filterbank files.", formatter_class=ArgumentDefaultsHelpFormatter ) parser.add_argument( "-l", "--listfile", dest="listfile", type=str, help="File containing list of greenburst filterbank directories." ) parser.add_argument( "-f", "--file", dest="file", type=str, help="Single filterbank file." ) parser.add_argument( "-d", "--dm", dest="dm", type=float, help="DM of injected pulse." ) parser.set_defaults(dm=250.0) parser.set_defaults(listfile=None) parser.set_defaults(file=None) values = parser.parse_args() if values.file is not None: #single file takes priority logging.info(f"Running with file {values.file}") addBurst(values) elif values.listfile is not None: #list of files logging.info(f"Looking for directories in file {values.listfile}") processList(values)