greenburstAux/signalgen.py
2025-07-21 16:24:03 -04:00

173 lines
5.7 KiB
Python
Executable file

#! /minish/keh00032/.conda/envs/keh00032/bin/python
import logging
from os import path, chdir, getcwd, makedirs
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Union
import numpy as np
import matplotlib.pyplot as plt
from jess.dispersion import dedisperse
from jess.fitters import median_fitter
from scipy.stats import median_abs_deviation
from your import Your, Writer
from your.formats.filwriter import make_sigproc_object
from will import create, inject
logger = logging.getLogger()
logFmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=logFmt)
#This function from https://josephwkania.github.io/will/examples/inject_pulse.html#Show-how-we-can-inject-a-pulse-into-a-GREENBURST-filterbank.
def show_dynamic(
dynamic_spectra: np.ndarray,
title: Union[str, None] = None,
save: Union[bool, None] = False,
) -> None:
"""
Show a dynamic spectra by first flattening it
in frequency. Do this by getting the medians of
each channel and then run a median filter along the
bandpass.
Then set the limits of the imshow so we get good detail
for the majority of the data.
Args:
dynmaic_spectra - the dynamic spectra to plot
title - Title of plot
save - Save the plot as `title` + `.png`
"""
spectra_mads = median_fitter(np.median(dynamic_spectra, axis=0))
flat = dynamic_spectra - spectra_mads
std = median_abs_deviation(flat, axis=None)
med = np.median(flat)
plt.figure(figsize=(20, 10))
plt.imshow(flat.T, vmin=med - 3 * std, vmax=med + 6 * std, aspect="auto")
plt.xlabel("Time Sample #", size=20)
plt.ylabel("Channel #", size=20)
plt.colorbar()
plt.tight_layout()
if title is not None:
plt.title(title, size=28)
if save:
plt.savefig(title.replace(" ", "_") + ".png", dpi=75, bbox_inches="tight")
def processList(values):
"""
Entry point if values.file is None and values.listfile is set.
Processes the file for directories and iterates through by setting values.file and calling addBurst.
"""
with open(values.listfile, "r") as listfile:
dirs = listfile.readlines()
for dir in dirs:
dataName = path.dirname(dir[:-1]) #strip the trailing slash for dirname
values.file = path.join(dir,dataname+".fil")
addBurst(values)
def addBurst(values):
"""
Entry point if values.file is set.
--listfile will enter into this function multiple times for each line.
"""
filterbankObj = Your(values.file)
values.file = values.file.strip(".fil") #removing extension for text manipulation later
basename = path.basename(values.file)
samples = 8096
#the full filterbanks use 64GB in RAM when injecting burst, so we write out a truncated version and load that instead.
filWriter = Writer(filterbankObj, outdir="./", outname = f"{basename}_trunc", nstart = 0, nsamp = samples)
filWriter.to_fil()
#replace filterbankObj object and reload spectra (spectra should be the same but just in case)
filterbankObj = Your(path.join("./", f"{basename}_trunc.fil"))
spectra = filterbankObj.get_data(0, samples)
#save pre-injection spectra plot
show_dynamic(spectra, f"{basename} Pre-injection Dynamic Spectra", save=True)
#get bandpass and store in bpWeights
bpWeights = create.filter_weights(spectra)
logger.info(f"{basename} loaded. Sampling pulse {values.nsamp} times.")
#create pulse
#first version is very simple, plan on adding more complex injections in future
pulseObj = create.SimpleGaussPulse(
sigma_time=0.001,
sigma_freq=350,
center_freq = filterbankObj.your_header.center_freq,
dm = values.dm,
tau = 20,
phi=np.pi / 3, #does nothing if nscint = 0
spectral_index_alpha=0,
chan_freqs = filterbankObj.chan_freqs,
tsamp = filterbankObj.your_header.tsamp,
nscint=0,
bandpass = bpWeights
)
pulse = pulseObj.sample_pulse(nsamp=values.nsamp) #30000 by default
logger.info("Injecting pulse")
#inject pulse
injectedSpectra = inject.inject_constant_into_file(
yr_input = filterbankObj,
pulse = pulse,
start = samples // 2,
gulp = samples
)
logger.info("Saving plot and filterbank.")
#and save the new plot
show_dynamic(injectedSpectra, f"{basename} Dynamic Spectra and {values.dm} DM Pulse", save=True)
#now generate new filterbank file
sigprocObj = make_sigproc_object(**(filterbankObj.your_header.__dict__))
newName = f"{basename}_injected.fil"
sigprocObj.write_header(newName)
sigprocObj.append_spectra(injectedSpectra, newName)
logger.info(f"{newName} successfully written.")
if __name__ == "__main__":
parser = ArgumentParser(
description="Insert simulated bursts into GREENBURST filterbank files.",
formatter_class=ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"-l", "--listfile", dest="listfile", type=str, help="File containing list of greenburst filterbank directories."
)
parser.add_argument(
"-f", "--file", dest="file", type=str, help="Single filterbank file."
)
parser.add_argument(
"-d", "--dm", dest="dm", type=float, help="DM of injected pulse."
)
parser.add_argument(
"-n", "--nsamp", type=int, help="Number of samples to take of the generated pulse."
)
parser.set_defaults(dm=250.0)
parser.set_defaults(nsamp=int(3e4))
parser.set_defaults(listfile=None)
parser.set_defaults(file=None)
values = parser.parse_args()
#set working directory to ignored directory
outdir = path.join(getcwd(),"out","")
if not path.isdir(outdir):
makedirs(outdir)
chdir(outdir)
if values.file is not None: #single file takes priority
logging.info(f"Running with file {values.file}")
addBurst(values)
elif values.listfile is not None: #list of files
logging.info(f"Looking for directories in file {values.listfile}")
processList(values)