Source code for turbo_seti.find_doppler.data_handler

"""
Filterbank data handler for the find_doppler.py functions.
"""

import os
import math
import logging

import numpy as np
from pkg_resources import resource_filename
import h5py

from blimpy import Waterfall
from blimpy.io import sigproc
from blimpy.io.hdf_writer import __write_to_hdf5_heavy as write_to_h5
from blimpy.utils import change_the_ext

from .kernels import Kernels

logger = logging.getLogger('data_handler')

#For debugging
#import cProfile
#import pdb;# pdb.set_trace()

class DATAHandle:
    r"""
    Class to set up the input file for further data processing.

    Handles conversion to .h5 (from .fil), extraction of coarse channel info,
    waterfall info, and file size checking.

    Parameters
    ----------
    filename : str
        Name of file (.h5 or .fil).
    out_dir : str
        Directory where output files should be saved.
    n_coarse_chan : int
        Number of coarse channels.
    coarse_chans : list or None
        List of coarse channels.
    kernels : Kernels, optional
        Pre-configured class of Kernels.
    gpu_backend : bool, optional
        Use GPU-accelerated Kernels?
    precision : int {2: float64, 1: float32}, optional
        Floating point precision. Default: 1.
    gpu_id : int
        If gpu_backend=True, then this is the device ID to use.

    """
    def __init__(self, filename=None, out_dir='./', n_coarse_chan=None, coarse_chans=None,
                 kernels=None, gpu_backend=False, precision=1, gpu_id=0):
        if not kernels:
            self.kernels = Kernels(gpu_backend, precision, gpu_id)
        else:
            self.kernels = kernels

        if filename and os.path.isfile(filename):
            self.filename = filename
            self.out_dir = out_dir
            self.n_coarse_chan = n_coarse_chan
            self.coarse_chans = coarse_chans

            if not h5py.is_hdf5(filename):
                if not sigproc.is_filterbank(filename):
                    self.status = False
                    errmsg = 'Not a filterbank file: {}'.format(filename)
                    raise IOError(errmsg)
                logger.info("Filterbank file detected. Attempting to create .h5 file in current directory...")
                self.__make_h5_file()

            self.filestat = os.stat(filename)
            self.filesize = self.filestat.st_size / (1024.0 ** 2)

            # Grab the header and drift rate resolution from a DATAH5 object.
            datah5_obj = DATAH5(filename, kernels=self.kernels, gpu_id=gpu_id)
            self.header = datah5_obj.header
            self.drift_rate_resolution = datah5_obj.drift_rate_resolution
            datah5_obj.close()

            # Split the file into a list of coarse channel objects.
            self.cchan_list = self.__split_h5()
            self.status = True

        else:
            self.status = False
            errmsg = "File {} doesn't exist, please check!".format(filename)
            raise IOError(errmsg)
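
    # A minimal usage sketch (not part of the original module); the file name
    # "observation.h5" is purely hypothetical:
    #
    #     dh = DATAHandle(filename='observation.h5', out_dir='/tmp')
    #     for cchan in dh.cchan_list:
    #         print(cchan['cchan_id'], cchan['f_start'], cchan['f_stop'])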

    def get_info(self):
        r"""
        Get the header of the file.

        Returns
        -------
        header : dict
            Header of the blimpy file.

        """
        wf = Waterfall(self.filename, load_data=False)
        return wf.header

    def __make_h5_file(self):
        r"""
        Convert the file to .h5 format, saved in the current directory.

        Sets the filename attribute of the calling DATAHandle
        to the (new) filename.

        """
        wf = Waterfall(self.filename, load_data=False)
        fil_path = os.path.basename(self.filename)
        h5_path = os.path.join(self.out_dir, change_the_ext(fil_path, 'fil', 'h5'))
        try:
            os.remove(h5_path)
        except OSError:
            pass
        write_to_h5(wf, h5_path)
        self.filename = h5_path

    def __split_h5(self):
        r"""
        Create a plan to select data from single coarse channels.

        Returns
        -------
        cchan_list : list
            Where each list member contains a coarse channel dict object
            for each coarse channel in the file. Dict fields:

            * filename : file path (common to all objects)
            * f_start : start frequency of coarse channel
            * f_stop : stop frequency of coarse channel
            * cchan_id : coarse channel number (identifier)
            * n_coarse_chan : total number of coarse channels (common to all objects)

        """
        cchan_list = []

        # Create a Waterfall object.
        try:
            wf = Waterfall(self.filename, load_data=False)
        except Exception:
            errmsg = "Error encountered when trying to open file: {}".format(self.filename)
            raise IOError(errmsg)

        # Find the lowest frequency in the file.
        f_delt = wf.header['foff']
        f0 = wf.header['fch1']

        # Determine the number of coarse channels: user, blimpy, or the header nfpc field.
        if self.n_coarse_chan is not None:
            # Specified by the user.
            logger.info("From user, n_coarse_chan={}".format(self.n_coarse_chan))
        else:
            # Not specified by the user.
            self.nfpc = wf.header.get('nfpc', None)  # Store the nfpc value in the DATAHandle object.
            if self.nfpc is not None:
                # nfpc is present in the file header.
                if self.nfpc < 1:
                    # nfpc is invalid; fall back to blimpy.
                    errmsg = "Filterbank header field NFPC must be > 0 but I saw {}. Defaulting to blimpy!" \
                             .format(self.nfpc)
                    logger.warning(errmsg)
                    self.n_coarse_chan = int(wf.calc_n_coarse_chan())
                    logger.info("From blimpy, n_coarse_chan={}".format(self.n_coarse_chan))
                else:
                    # nfpc is valid. Use it to calculate the number of coarse channels.
                    self.n_coarse_chan = int(0.01 + wf.header['nchans'] / self.nfpc)
                    logger.info("From nfpc={}, n_fine_nchans={} and n_coarse_chan={}"
                                .format(self.nfpc, wf.header['nchans'], self.n_coarse_chan))
            else:
                # nfpc is NOT in the file header and not specified by the user.
                self.n_coarse_chan = int(wf.calc_n_coarse_chan())
                logger.info("From blimpy, n_coarse_chan={}".format(self.n_coarse_chan))

        # Only load the coarse channels of interest -- or all of them if not specified.
        if self.coarse_chans in (None, ''):
            self.coarse_chans = range(self.n_coarse_chan)

        for cchan_id in self.coarse_chans:

            # Calculate the frequency range for the given coarse channel (cchan_id).
            f_start = f0 + cchan_id * f_delt * wf.n_channels_in_file / self.n_coarse_chan
            f_stop = f0 + (cchan_id + 1) * f_delt * wf.n_channels_in_file / self.n_coarse_chan
            if f_start > f_stop:
                f_start, f_stop = f_stop, f_start

            # Instantiate the coarse channel object.
            cchan_obj = {'filename': self.filename,
                         'f_start': f_start,
                         'f_stop': f_stop,
                         'cchan_id': cchan_id,
                         'n_coarse_chan': self.n_coarse_chan}

            # Append the coarse channel object to the list.
            cchan_list.append(cchan_obj)

        return cchan_list
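
# Sketch of the coarse-channel boundary arithmetic in __split_h5 above, using
# made-up header values (fch1 = 1000.0 MHz, foff = -1e-6 MHz, 1_048_576 fine
# channels, 16 coarse channels), so each coarse channel spans 65_536 fine
# channels, i.e. 0.065536 MHz:
#
#     f0, f_delt, nchans, n_cc = 1000.0, -1e-6, 1_048_576, 16
#     cchan_id = 0
#     f_start = f0 + cchan_id * f_delt * nchans / n_cc        # 1000.0
#     f_stop = f0 + (cchan_id + 1) * f_delt * nchans / n_cc   # 999.934464
#     # foff < 0, so f_start > f_stop here and the two are swapped.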


class DATAH5:
    r"""
    This class is where the waterfall data is loaded, as well as the
    DATAH5 header info. Don't be surprised at the use of FITS header names!
    It creates other attributes related to the dedoppler search
    (load_drift_indexes).

    Parameters
    ----------
    filename : string
        Name of file.
    f_start : float
        Start frequency in MHz.
    f_stop : float
        Stop frequency in MHz.
    t_start : int
        Start integration ID.
    t_stop : int
        Stop integration ID.
    cchan_id : int
        Coarse channel ID.
    n_coarse_chan : int
        Total number of coarse channels.
    kernels : Kernels
        Pre-configured class of kernels.
    gpu_backend : bool, optional
        Use GPU-accelerated Kernels?
    precision : int {2: float64, 1: float32}, optional
        Floating point precision. Default: 1.
    gpu_id : int
        If gpu_backend=True, then this is the device ID to use.

    """
    def __init__(self, filename, f_start=None, f_stop=None, t_start=None, t_stop=None,
                 cchan_id=0, n_coarse_chan=None, kernels=None, gpu_backend=False,
                 precision=1, gpu_id=0):
        self.filename = filename
        self.closed = False
        self.f_start = f_start
        self.f_stop = f_stop
        self.t_start = t_start
        self.t_stop = t_stop
        self.n_coarse_chan = n_coarse_chan

        if not kernels:
            self.kernels = Kernels(gpu_backend, precision, gpu_id)
        else:
            self.kernels = kernels

        # Create a Waterfall object instance.
        try:
            self.fil_file = Waterfall(filename, f_start=self.f_start, f_stop=self.f_stop,
                                      t_start=self.t_start, t_stop=self.t_stop,
                                      load_data=False)
        except Exception:
            errmsg = "Error encountered when trying to open file: {}".format(filename)
            raise IOError(errmsg)

        # Create a header used by the search.
        try:
            if self.n_coarse_chan:
                header = self.__make_data_header(self.fil_file.header, coarse=True)
            else:
                header = self.__make_data_header(self.fil_file.header)
        except Exception:
            errmsg = "Error accessing header from file: {}".format(filename)
            raise IOError(errmsg)

        self.header = header
        self.fftlen = header['NAXIS1']

        # EE: to check whether swapping tsteps_valid and tsteps is more appropriate.
        self.tsteps_valid = int(self.fil_file.n_ints_in_file)
        self.tsteps = int(math.pow(2, math.ceil(np.log2(math.floor(self.tsteps_valid)))))

        self.header['obs_length'] = self.tsteps_valid * header['DELTAT']
        self.drift_rate_resolution = (1e6 * np.abs(header['DELTAF'])) \
                                     / ((self.tsteps_valid - 1) * header['DELTAT'])
        self.header['cchan_id'] = cchan_id

        # EE: For now, no shoulder is used. This is OK as long as each coarse
        # channel is analyzed individually. In general this parameter is an
        # integer (even number); it gives two regions, each of n*tsteps,
        # around spectra[i].
        self.shoulder_size = 0
        self.tdwidth = self.fftlen + self.shoulder_size * self.tsteps
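
    # Worked example of the drift-rate resolution computed above, using
    # representative (made-up) values: DELTAF = -2.7939677e-06 MHz and
    # DELTAT = 18.253611 s over tsteps_valid = 16 integrations give
    #
    #     1e6 * 2.7939677e-06 / (15 * 18.253611)  ~=  1.02e-2 Hz/s
    #
    # i.e. the smallest distinguishable drift is one fine-channel width (in Hz)
    # spread across the full observation length.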

    def load_data(self):
        r"""
        Read the spectra and drift indices from the file.

        Returns
        -------
        spectra, drift_indexes : ndarray, ndarray

        """
        self.fil_file.read_data(f_start=self.f_start, f_stop=self.f_stop)

        dim_time = self.fil_file.data.shape[0]
        if dim_time < 2:
            msg = "data_handler.py:load_data: Cannot handle data with only 1 time step!"
            logger.error(msg)
            msg = "data shape = {}!".format(self.fil_file.data.shape)
            raise ValueError(msg)

        spectra = self.kernels.np.squeeze(self.fil_file.data)

        # This check will add rows of zeros if the obs is too short
        # (and thus does not have a power-of-two number of rows).
        if spectra.shape[0] < self.tsteps:
            padding = self.kernels.np.zeros((self.tsteps - spectra.shape[0], self.fftlen))
            spectra = self.kernels.np.concatenate((spectra, padding), axis=0)
            self.tsteps_valid = self.tsteps
            # Update obs_length and drift_rate_resolution after changing the spectra shape.
            self.header['obs_length'] = self.tsteps * self.header['DELTAT']
            self.drift_rate_resolution = (1e6 * np.abs(self.header['DELTAF'])) \
                                         / ((self.tsteps_valid - 1) * self.header['DELTAT'])

        if spectra.shape != (self.tsteps_valid, self.fftlen):
            msg = "data_handler.py:load_data: spectra.shape={}!".format(spectra.shape)
            logger.error(msg)
            msg = "data_handler.py:load_data: tsteps_valid={}, fftlen={}!" \
                  .format(self.tsteps_valid, self.fftlen)
            raise ValueError(msg)

        drift_indexes = self.load_drift_indexes()
        return spectra, drift_indexes
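
    # Sketch of the zero-padding step in load_data, with toy dimensions
    # (13 time steps padded up to tsteps = 16, fftlen = 8):
    #
    #     import numpy as np
    #     spectra = np.ones((13, 8))
    #     padding = np.zeros((16 - spectra.shape[0], 8))
    #     spectra = np.concatenate((spectra, padding), axis=0)
    #     spectra.shape   # (16, 8) -- now a power-of-two number of rows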

    def load_drift_indexes(self):
        r"""
        The drift indices are read from a stored file so that there is no
        need to recalculate them. This speeds things up.

        Returns
        -------
        drift_indexes : ndarray

        """
        dia_num = int(np.log2(self.tsteps))
        file_path = resource_filename('turbo_seti', f'drift_indexes/drift_indexes_array_{dia_num}.txt')
        if not os.path.isfile(file_path):
            dia_file = 'drift_indexes/drift_indexes_array_{}.txt'.format(dia_num)
            msg = "data_handler.py:load_drift_indexes: time integration steps = {}!".format(self.tsteps)
            logger.error(msg)
            msg = "data_handler.py:load_drift_indexes: file {} not found!".format(dia_file)
            logger.error(msg)
            if self.tsteps < 4:
                raise ValueError("Number of time integration steps must be at least 4!")
            raise ValueError("Don't attempt to use High Time Resolution (HTR) files with turboSETI. "
                             "TurboSETI is designed to search for narrowband signals -- the maximum "
                             "doppler drift we can expect due to the motion of celestial bodies is a few Hz/s. "
                             "The high time resolution products (ending 0001.fil) have ~0.5 MHz resolution and "
                             "~100 us integrations, so you'd be looking at insane drift rates. Issue #117.")

        di_array = np.array(np.genfromtxt(file_path, delimiter=' ', dtype=int))

        ts2 = int(self.tsteps / 2)
        drift_indexes = di_array[(self.tsteps_valid - 1 - ts2), 0:self.tsteps_valid]
        return drift_indexes
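
    # Sketch of the drift-index lookup above, with toy values: for tsteps = 16,
    # the table drift_indexes_array_4.txt is loaded (dia_num = log2(16) = 4),
    # and with tsteps_valid = 16 the selected row is
    #
    #     ts2 = 16 // 2           # 8
    #     row = (16 - 1) - ts2    # 7 -> drift_indexes = di_array[7, 0:16]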

    def __make_data_header(self, header, coarse=False):
        r"""
        Convert a blimpy waterfall header into a FITS-style header dict.

        Parameters
        ----------
        header : dict
            Blimpy waterfall header.
        coarse : bool
            Whether or not there are coarse channels to analyze.

        Returns
        -------
        base_header : dict

        """
        base_header = {}

        # Used by file_writers.py.
        base_header['SOURCE'] = header['source_name']
        base_header['MJD'] = header['tstart']
        base_header['DEC'] = str(header['src_dej'])
        base_header['RA'] = str(header['src_raj'])
        base_header['DELTAF'] = header['foff']
        base_header['DELTAT'] = float(header['tsamp'])

        # Used by helper_functions.py.
        if coarse:
            base_header['NAXIS1'] = int(header['nchans'] / self.n_coarse_chan)
            base_header['FCNTR'] = self.kernels.np.abs(self.f_stop - self.f_start) / 2. \
                                   + self.kernels.np.fmin(self.f_start, self.f_stop)
        else:
            base_header['NAXIS1'] = int(header['nchans'])
            base_header['FCNTR'] = float(header['fch1']) + header['foff'] * base_header['NAXIS1'] / 2

        # Return base_header to the caller.
        return base_header

    def close(self):
        r"""
        Close the file and set the data attribute ``closed`` to True.

        A closed object can no longer be used for I/O operations.
        `close()` may be called multiple times without error.

        """
        # Call the file object destructor, which should close the file.
        if hasattr(self, 'fil_file'):
            del self.fil_file

        if hasattr(self, 'kernels'):
            del self.kernels

        self.closed = True