#! /minish/keh00032/.conda/envs/keh00032/bin/python import logging from os import path, chdir, getcwd, makedirs from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from typing import Union import numpy as np import matplotlib.pyplot as plt from jess.dispersion import dedisperse from jess.fitters import median_fitter from scipy.stats import median_abs_deviation from your import Your, Writer from will import create, inject logger = logging.getLogger() logFmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=logFmt) #This function from https://josephwkania.github.io/will/examples/inject_pulse.html#Show-how-we-can-inject-a-pulse-into-a-GREENBURST-filterbank. def show_dynamic( dynamic_spectra: np.ndarray, title: Union[str, None] = None, save: Union[bool, None] = False, ) -> None: """ Show a dynamic spectra by first flattening it in frequency. Do this by getting the medians of each channel and then run a median filter along the bandpass. Then set the limits of the imshow so we get good detail for the majority of the data. Args: dynmaic_spectra - the dynamic spectra to plot title - Title of plot save - Save the plot as `title` + `.png` """ spectra_mads = median_fitter(np.median(dynamic_spectra, axis=0)) flat = dynamic_spectra - spectra_mads std = median_abs_deviation(flat, axis=None) med = np.median(flat) plt.figure(figsize=(20, 10)) plt.imshow(flat.T, vmin=med - 3 * std, vmax=med + 6 * std, aspect="auto") plt.xlabel("Time Sample #", size=20) plt.ylabel("Channel #", size=20) plt.colorbar() plt.tight_layout() if title is not None: plt.title(title, size=28) if save: plt.savefig(title.replace(" ", "_") + ".png", dpi=75, bbox_inches="tight") def processList(values): """ Entry point if values.file is None and values.listfile is set. Processes the file for directories and iterates through by setting values.file and calling addBurst. """ with open(values.listfile, "r") as listfile: dirs = listfile.readlines() for dir in dirs: dataName = path.dirname(dir[:-1]) #strip the trailing slash for dirname values.file = path.join(dir,dataname+".fil") addBurst(values) def addBurst(values): """ Entry point if values.file is set. --listfile will enter into this function multiple times for each line. """ filterbankObj = Your(values.file) values.file = values.file.strip(".fil") #removing extension for text manipulation later basename = path.basename(values.file) samples = 8096 spectra = filterbankObj.get_data(0, samples) #save pre-injection spectra plot show_dynamic(spectra, f"{basename} Pre-injection Dynamic Spectra", save=True) #get bandpass and store in bpWeights bpWeights = create.filter_weights(spectra) logger.info(f"{basename} loaded. Sampling pulse {values.nsamp} times.") #create pulse #first version is very simple, plan on adding more complex injections in future pulseObj = create.SimpleGaussPulse( sigma_time=0.001, sigma_freq=350, center_freq = filterbankObj.your_header.center_freq, dm = values.dm, tau = 20, phi=np.pi / 3, #does nothing if nscint = 0 spectral_index_alpha=0, chan_freqs = filterbankObj.chan_freqs, tsamp = filterbankObj.your_header.tsamp, nscint=0, bandpass = bpWeights ) pulse = pulseObj.sample_pulse(nsamp=values.nsamp) #30000 by default logger.info("Injecting pulse") #inject pulse injectedSpectra = inject.inject_constant_into_file( yr_input = filterbankObj, pulse = pulse, start = samples // 4, gulp = samples ) logger.info("Saving plot and filterbank.") #and save the new plot and filterbank file show_dynamic(spectra, f"{basename} Dynamic Spectra and {values.dm} DM Pulse", save=True) filWriter = Writer(filterbankObj, outdir="./", outname = f"{basename}_injected", nstart = 0, nsamp = samples) filWriter.to_fil(data=injectedSpectra) logger.info(f"{basename}_injected.fil successfully written.") if __name__ == "__main__": parser = ArgumentParser( description="Insert simulated bursts into GREENBURST filterbank files.", formatter_class=ArgumentDefaultsHelpFormatter ) parser.add_argument( "-l", "--listfile", dest="listfile", type=str, help="File containing list of greenburst filterbank directories." ) parser.add_argument( "-f", "--file", dest="file", type=str, help="Single filterbank file." ) parser.add_argument( "-d", "--dm", dest="dm", type=float, help="DM of injected pulse." ) parser.add_argument( "-n", "--nsamp", type=int, help="Number of samples to take of the generated pulse." ) parser.set_defaults(dm=250.0) parser.set_defaults(nsamp=int(3e4)) parser.set_defaults(listfile=None) parser.set_defaults(file=None) values = parser.parse_args() #set working directory to ignored directory outdir = path.join(getcwd(),"out","") if not path.isdir(outdir): makedirs(outdir) chdir(outdir) if values.file is not None: #single file takes priority logging.info(f"Running with file {values.file}") addBurst(values) elif values.listfile is not None: #list of files logging.info(f"Looking for directories in file {values.listfile}") processList(values)