import os from collections import Counter import matplotlib.pyplot as plt import numpy as np import pandas as pd def linesplit(line): """ Splits line of file into useful components. Returns (dm, pulse width, originating filename). """ dm, line = line.split("pc/cc") dm = int(dm) pw, fname = line.split(" s ") pw = float(pw) fname = fname.strip().removesuffix("_injected") return (dm, pw, fname) def updateDic(lines, dic): """ Processes a list of lines and adds them to given dictionary in-place. """ for line in lines: dm, pw, fname = linesplit(line) dic["file"].append(fname) dic["dm"].append(dm) dic["pulseWidth"].append(pw) injectedDic = { "file" : [], "dm" : [], "pulseWidth" : [] } detectedDic = { "file" : [], "dm" : [], "pulseWidth" : [] } #load injected data into dataframe with open(os.path.join(".","out","2025-07-31T17-25-54.txt"), "r") as file: lines = file.readlines() updateDic(lines, injectedDic) injections = pd.DataFrame(data=injectedDic) #this is our main object for injection data #load detection data into dataframe with open(os.path.join(".","out","plotOut.txt"), "r") as file: lines = file.readlines() updateDic(lines, detectedDic) detections = pd.DataFrame(data=detectedDic) #this is our main object for detection data #define summary printing for multiple steps def summary(stage): print(f"Summary Stage {stage}") print(injections.head()) print(f"Number of files injected: {len(Counter(injections['file']))}") print(f"Number of files with detections: {len(Counter(detections['file']))}") print("=========================") print(f"Number of pulses injected: {len(injections['dm'])}") print(f"Number of pulses detected: {len(detections['dm'])}") print("=========================") print(f"File ratio: {round(len(Counter(detections['file']))/len(Counter(injections['file'])), 3)}") print(f"Detection ratio: {round(len(detections['dm'])/len(injections['dm']), 3)}") print("=========================") #print initial summary summary(1) #many files contained pulsar bursts, so we filter those out via DM minDM = (10**2.5) * 0.95 #as per signal generation, plus a bit of wiggle room print(f"Filtering out pulsars (DM below {int(minDM)}...)") detections = detections[detections['dm'] > minDM] detections = detections.reset_index(drop=True) summary(2) #Let's do detection matching! Yaaaay! #What detections line up to which injections? This will determine which ones got missed entirely. #Define some kind of epsilon for DM and pulse width; if detection is within epsilon in DM we can match it. dmEps = 5 #and define an auxiliary array of 0s for injections. List of detection counts! matchCount = np.zeros(len(injections['dm']), dtype=int) #also keep track of false positives: falsePositiveMask = [False] * len(detections['dm']) #Use queries to find matches for detection in detections.itertuples(): qstring = ( f"(file == '{detection.file}') & " f"((dm - @dmEps) < {detection.dm}) & " f"((dm + @dmEps) > {detection.dm})" ) matches = injections.query(qstring) if len(matches) > 0: print(f"Detection: DM {detection.dm} and PW {detection.pulseWidth}") print(matches) if len(matches) == 1: i = matches.index[0] matchCount[i] += 1 print("======") elif len(matches) > 1: raise ValueError("MULTIPLE MATCHES OHNO") else: #no matching injection... falsePositiveMask[detection.Index] = True print(f"NO MATCH FOR: DM {detection.dm} and PW {detection.pulseWidth}") print("Injections in file:") print(injections.query(f"(file == '{detection.file}')")) matchMaskInj = [matchCount > 0] matchMaskDet = np.logical_not(falsePositiveMask) missedMask = [matchCount == 0] #So where are we? #We have multiple datasets. #1. List of all injected pulses. [injections] #2. List of detections with pulsars filtered out. [detections] #3. Number of times each injection was detected [matchCount] #4. A mask for only detected injections [matchMaskInj] #5. A mask for only true positives [matchMaskDet] #6. A mask for only missed injections [missedMask] #7. A mask for false positives [falsePositiveMask]