Source code for stompy.io.rdb

"""
Tools for reading RDB files, the text-based format often used in USGS 
data.  See stompy/test/data for examples of this type of data.
"""

from __future__ import print_function

from functools import reduce

# the 2to3 stuff added some other cruft which I think actually makes it all worse...

# python import routines for rdb (USGS tab delimited) format

import re,string,time
import six
import io
import pytz

import datetime
import numpy as np
import xarray as xr
from matplotlib.dates import date2num,num2date

try:
    import cPickle as pickle
except ImportError:
    import pickle
    
from .rdb_datadescriptors import dd_to_synonyms

from .. import utils
from . import rdb_codes


# best way to deal with dates:
# 64-bit float is plenty to cover milliseconds to a century.
# what is the best unit, though?
# days are nice, could pretty much ignore leap seconds
# leap years are still a problem, and one would want to be able
# to convert days to years reliably
# presumably the python datetime library can handle dates beyond
# the epoch.
# I think mxdatetime is probably the way to go.  If it's good enough
# for postgresql, it's good enough for me.  Looks like they use a
# pair of values - an int for days, and a float for seconds in the
# day.  Nicely compacted in an "absdays" attr


[docs]class Rdb(object): record_count = 0 def __init__(self, text=None, source_file=None, fp=None): self.source_filename = source_file if text is not None: try: text=text.decode() except AttributeError: pass self.text = text self.fp = fp if self.fp is None: if self.text is not None: self.fp = io.StringIO(self.text) elif self.source_filename: self.fp = open(self.source_filename,'rt') else: # coerce fp to be text, not binary try: self.fp.encoding except AttributeError: self.fp=io.StringIO(fp.read().decode()) self.parse_source_file()
[docs] def float_or_nan(self,s): if s in (None,'','Eqp','***'): return np.nan else: return float(s)
[docs] def data(self): """ assuming that only one data type was requested, try to figure out which column it is, and return that data for single-valued columns, this will expand the data out to be the right length """ for k in list(self.keys()): if re.match('^[0-9_]+$',k): d = self[k] try: len(d) return d except TypeError: return d*ones(self.record_count) print(list(self.keys())) raise Exception("None of the keys looked good")
[docs] def parse_date(self,s): """ parse a date like '2008-01-13 00:31' into a float representing absolute days since 0ad """ d=None for fmt in ["%Y-%m-%d %H:%M","%Y-%m-%d"]: try: d = datetime.datetime.strptime(s,fmt) except ValueError: pass if d: return date2num(d) else: return None
[docs] def parse_source_file(self): # eat the comments line = None preamble_lines=[] for line in self.fp: if line[0]!='#': break preamble_lines.append(line) self.preamble="".join(preamble_lines) if line is None: print("No data in source!") print("text:",self.text) print("source_filename:",self.source_filename) raise Exception("Empty RDB data?!") headers = line.strip().split("\t") try: specs = next(self.fp).strip().split("\t") except StopIteration: raise Exception("Possible bad request: %s"%line.strip()) # import the data into an array columns = [[] for each in headers] # split rows into columns self.record_count = 0 for line in self.fp: data = line.strip("\r\n").split("\t") for column,datum in zip(columns,data): column.append(datum) self.record_count += 1 self.fp.close() # fix up columns for i in range(len(headers)): if specs[i][-1] == 'n': columns[i] = np.array(list(map(self.float_or_nan,columns[i]))) elif specs[i][-1] == 'd': # dates get mapped to days since AD 0 columns[i] = np.array(list(map(self.parse_date,columns[i]))) elif specs[i][-1] == 's': columns[i] = np.array(columns[i],dtype=object) # check for single-valued lists - # The logic here is a bit odd - since we often get columns # that have the same value (e.g. station_id) for every record, # it's convenient to detect that and replace them with a constant. # however, if there is just one record, it screws up code that # isn't expecting a single value... if self.record_count > 1: try: for elt in columns[i]: if elt != columns[i][0]: raise StopIteration columns[i] = columns[i][0] except StopIteration: pass # merge the columns into a hash self.compiled={} for header,column in zip(headers,columns): # main entry for given header title self.compiled[header] = column # other entries for syn in dd_to_synonyms(header): self.compiled[syn] = column
# Dict interface: def __getitem__(self,key): if type(key) == tuple: # split into label and index if type(key[0]) == str: label,idx = key else: idx,label = key val = self.compiled[label] if type(val) in (list,ndarray): val = val[idx] return val else: return self.compiled[key]
[docs] def keys(self): return list(self.compiled.keys())
# Automatic removal of missing data:
[docs] def series(self,*keys): """ return a tuple of vectors for the given keys, but only when all values have valid data """ columns = [self.compiled[key] for key in keys] # create a mask for each: masks = [1-isnan(column) for column in columns] valid = reduce(lambda x,y: x&y, masks) return [compress(valid,column) for column in columns]
[docs]def rdb_to_dataset(filename=None,text=None,to_utc=True): """ Read an rdb file and return an xarray dataset. if to_utc is set, look for a tz_cd attribute, and adjust times to UTC if tz_cd is present. If no data was found, return None """ if filename is not None: usgs_data=Rdb(source_file=filename) else: usgs_data=Rdb(text=text) if len(usgs_data['datetime'])==0: return None # Convert that xarray for consistency ds=xr.Dataset() ds['time']=( ('time',), utils.to_dt64(usgs_data['datetime']) ) ds['datenum']=( ('time',), usgs_data['datetime']) ds.attrs['preamble']=usgs_data.preamble for key in usgs_data.keys(): if key=='datetime': continue data=usgs_data[key] # attempt to find better name for the columns varname=key # default to original name # first number is timeseries code, second is parameter, # and third, optional, is statistic code m=re.match(r'(\d+)_(\d+)(_(\d+))?(_cd)?$',key) meta={} parameter=None if m: meta['ts_code']=m.group(1) meta['parm_code']=m.group(2) if m.group(4): meta['stat_code']=m.group(4) # print("ts: %s parm: %s stat: %s"%(meta['ts_code'], # meta['parm_code'], # meta['stat_code'])) parameter=rdb_codes.parm_code_lookup(meta['parm_code']) if parameter is not None: # parm_nm is often really long! # srsname, when present, is shorter # not great -- srsname is sometimes misleading # like 'stream_flow_mean_daily' when really it's instantaneous # tidal flow. srsname=parameter['srsname'] varname=None if srsname: # sometimes this is nan, though! try: varname=srsname.lower() # force string-like check except AttributeError: pass if varname is None: varname=parameter['parameter_nm'] varname=varname.lower().replace(' ','_').replace(',','').replace('.','') meta['units']=parameter['parameter_units'] # But possible that one station has multiple instances of the same # parameter. In this case, count=0 base_varname=varname while varname in ds: count+=1 varname=base_varname+"_%02d"%count if m.group(4): statistic=rdb_codes.stat_code_lookup(meta['stat_code']) if statistic is not None: meta['statistic']=statistic['name'] if m.group(5) is not None: # TODO: save QA codes continue if (not isinstance(data, np.ndarray)) and (parameter is not None): # In the past, if it had no dimension, I assumed it was # an attribute. But maybe it's better to see whether # it was found as a parameter. In that case, it's # probably a real data point but was collapsed by # the rdb code because it had no variation. # Depending, this may need to be smarter about datatype data=data*np.ones(ds.dims['time']) if isinstance(data, np.ndarray): if len(data)==len(ds.time): ds[varname] = ('time',), data else: print("What to do with %s"%key) for k in meta: ds[varname].attrs[k]=meta[k] else: # probably should be an attribute ds.attrs[varname]=data # if there is a tz_cd attribute, use that to get timestamps # back to UTC. if 'tz_cd' in usgs_data.keys() and to_utc: # tz_cd, in a sample size of 1, is something like PST. tz_target=pytz.utc #if timezone changes, tz_src is an array of strings def tz_to_offset(tz_src): if tz_src == 'PST': return -8 elif tz_src=='PDT': return -7 elif tz_src=='EST': return -5 elif tz_src=='EDT': return -4 else: raise Exception("Not sure how to interpret time zone %s"%tz_src) if isinstance(usgs_data['tz_cd'],np.ndarray): offset_hours=np.array([tz_to_offset(tz_src) for tz_src in usgs_data['tz_cd']]) ds.attrs['tz_cd_original']=",".join( np.unique(ds.tz_cd) ) else: offset_hours=tz_to_offset(usgs_data['tz_cd']) ds.attrs['tz_cd_original']=ds.tz_cd tz_src=usgs_data['tz_cd'] ds['time'].values -= offset_hours * np.timedelta64(1,'h') ds['datenum'].values -= offset_hours/24. ds.attrs['tz_cd']='UTC' return ds