#!/usr/bin/env python
# --------------------------------------------------------
# Distribution statement A. Approved for public release.
# Distribution is unlimited.
# This work was supported by the Office of Naval Research.
# --------------------------------------------------------
"""This library contains components for PyVALION software.
"""
import datetime
import netCDF4
import os
import pickle
import re
from siphon.catalog import TDSCatalog
import subprocess
from tqdm import tqdm
import numpy as np
import pandas as pd
import PyIRI
import PyVALION
from PyVALION import logger
from PyIRI.main_library import adjust_longitude as adjust_lon
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def download_GIRO_parameters(time_start,
time_finish,
ion_name,
data_save_dir,
save_dir,
name_run,
clean_directory=True,
filter_CS=90):
"""Retrieve GIRO ionospheric parameters from fromlgdc.uml.edu.
Parameters
----------
time_start : datetime.datetime
Start date and time for the validation period
time_finish : datetime.datetime
End date and time for the validation period
ion_name : array-like
String arrays of GIRO ionosondes names
data_save_dir : str
Directory where to save the downloaded files
save_dir : str
Directory where to save the downloaded files
name_run : str
String to add to the name of the files for saved results
clean_directory : bool
If set to True the downloaded .txt ionosonde file will be deleted
(default=True)
filter_CS : float
Minimum accepted Autoscaling Confidence Score. Expects values from 0 to
100, with flag values of 999 for manual scaling and -1 if unknown.
(default=90)
Returns
-------
data_all : dict
Dictionary with all the data combined.
"""
PyVALION.logger.info(''.join(['Downloading data from GIRO for: ',
time_start.strftime('%Y%m%dT%H%MZ'), '-',
time_finish.strftime('%Y%m%dT%H%MZ')]))
output_flag = np.empty((ion_name.size), dtype=bool)
# Open a file that has the names and locations of all GIRO stations
# This is important because the user might want to reduce the number
# of the ionosondes for the validation
file_ion_name = os.path.join(PyVALION.giro_names_dir, 'GIRO_Ionosondes.p')
with open(file_ion_name, 'rb') as fopen:
giro_name = pickle.load(fopen)
# Download parameter for each ionosonde for the duration of the val period
for iname in range(0, ion_name.size):
ionosonde = ion_name[iname]
# Index of the ionosonde in GIRO table
ind_giro = np.where(giro_name['name'] == ionosonde)[0]
# Save downloaded file as
file_name_str = (ionosonde
+ '_' + time_start.strftime('%Y%m%dT%H%MZ')
+ '_' + time_finish.strftime('%Y%m%dT%H%MZ'))
output_file_txt = os.path.join(data_save_dir,
file_name_str + '.txt')
output_file_pic = os.path.join(data_save_dir,
file_name_str + '.p')
# String for wget in GIRO-desired format
url = ''.join((
"https://lgdc.uml.edu/common/DIDBGetValues?ursiCode=", ionosonde,
"&charName=foF2,foF1,hmF2,hmF1,B0,B1&fromDate=",
time_start.strftime('%Y/%m/%d+%H:%M:%S'), "&toDate=",
time_finish.strftime('%Y/%m/%d+%H:%M:%S')))
# Run wget
subprocess.run(["wget", "-O", output_file_txt, url, '-q'])
# Empty arrays to concatenate
adtime = np.empty((0), dtype=datetime.datetime)
ascore = np.empty((0), dtype=float)
afof2 = np.empty((0), dtype=float)
ahmf2 = np.empty((0), dtype=float)
aB0 = np.empty((0), dtype=float)
aB1 = np.empty((0), dtype=float)
alon = np.empty((0), dtype=float)
alat = np.empty((0), dtype=float)
acode = np.empty((0), dtype=object)
# Open downloaded .txt GIRO file and read it
with open(output_file_txt, 'r') as file:
lines = file.readlines()
str_arr = np.array(lines[25:-1])
for line in str_arr:
line_arr = line.split()
if len(line_arr) == 14:
stamp, score, fof2, hmf2, B0, B1 = read_GIRO_line(line_arr)
# Concatenate variables into arrays
adtime = np.concatenate((adtime, stamp), axis=None)
ascore = np.concatenate((ascore, score), axis=None)
afof2 = np.concatenate((afof2, fof2), axis=None)
ahmf2 = np.concatenate((ahmf2, hmf2), axis=None)
aB0 = np.concatenate((aB0, B0), axis=None)
aB1 = np.concatenate((aB1, B1), axis=None)
alon = np.concatenate((alon, giro_name['lon'][ind_giro]),
axis=None)
alat = np.concatenate((alat, giro_name['lat'][ind_giro]),
axis=None)
acode = np.concatenate((acode, giro_name['name'][ind_giro]),
axis=None)
# Combine parameters to a dictionary
data = {'dtime': adtime, 'score': ascore, 'fof2': afof2, 'hmf2': ahmf2,
'B0': aB0, 'B1': aB1, 'lon': alon, 'lat': alat, 'name': acode}
# If there are elements in adtime array pickle the output
if adtime.size > 0:
# Pickle file for each ionosonde
with open(output_file_pic, 'wb') as fclose:
pickle.dump(data, fclose)
output_flag[iname] = True
else:
output_flag[iname] = False
# Delete the .txt file if clean_directory is True
if clean_directory:
# Remove the .txt file
os.remove(output_file_txt)
# output_flag is an array of bool format, has True if data was present,
# False if not.
# The size is the same as the given array ion_name.
# Create new dictionary to store filtered data
empty_flt = np.empty((0), dtype=float)
empty_str = np.empty((0), dtype=str)
giro_name_good = {'name': empty_str, 'city': empty_str, 'lat': empty_flt,
'lon': empty_flt}
# Filter out data that was not downloaded
for key in giro_name:
giro_name_good[key] = giro_name[key][output_flag]
# Combine and filter out data using CS score
data_filtered = filter_GIRO_parameters(time_start,
time_finish,
giro_name_good['name'],
data_save_dir,
filter_CS=filter_CS)
# Pickle file
output_file_str = ('Filtered_independent_ionosondes_' + name_run + '.p')
write_file = os.path.join(save_dir, output_file_str)
with open(write_file, 'wb') as fwrite:
pickle.dump(data_filtered, fwrite)
return data_filtered
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def read_GIRO_line(line_arr):
"""Read 14-element line from GIRO ionosonde file.
Parameters
----------
line_arr : str
String from the GIRO file.
Returns
-------
stamp : datetime.datetime
Datetime stamp from the line.
score : float
Confidence score CS.
fof2 : float
Critical frequency of F2 layer in MHz.
hmf2 : float
Height of the F2 layer in km.
B1 : float
Bottom-side thickness of F2 layer in km.
B0 : float
Bottom-side shape parameter unitless.
"""
try:
data_stamp = '%Y-%m-%dT%H:%M:%S.000Z'
stamp = datetime.datetime.strptime(line_arr[0], data_stamp)
except Exception as err:
stamp = np.nan
logger.error(''.join(['Invalid datetime format in line_arr[0], ',
'failed with Exception: ', str(err)]))
score = safe_float(line_arr, 1, 'score')
fof2 = safe_float(line_arr, 2, 'fof2')
hmf2 = safe_float(line_arr, 6, 'hmf2')
B0 = safe_float(line_arr, 10, 'B0')
B1 = safe_float(line_arr, 12, 'B1')
return stamp, score, fof2, hmf2, B0, B1
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def safe_float(line_arr, index, label):
"""Check if element is float.
Parameters
----------
line_arr : str
String from the GIRO file.
index : ind
Index of the array to check.
label : str
Label of the array to print out.
Returns
-------
res : flt
If given element is can be converted to float return it,
if not, returns nan.
"""
try:
res = float(line_arr[index])
except (ValueError, TypeError):
logger.error(f'{label} is not a valid float')
res = np.nan
return res
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def make_empty_dict_data():
"""Make empty dictionary to collect GIRO data.
Returns
-------
data : dict
Dictionary with empty elements.
"""
empty = np.empty((0))
data = {'dtime': empty, 'score': empty,
'fof2': empty, 'hmf2': empty,
'B0': empty, 'B1': empty,
'lon': empty, 'lat': empty, 'name': empty}
return data
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def filter_GIRO_parameters(time_start,
time_finish,
ion_name,
save_dir,
filter_CS=90):
"""Download data from all GIRO ionosondes for the whole validation period.
Parameters
----------
time_start : datetime.datetime
Start date and time for the validation period
time_finish : datetime.datetime
End date and time for the validation period
ion_name : array-like
String arrays of GIRO ionosondes names
save_dir : str
Directory where the downloaded files are
filter_CS : float
Minimum accepted Autoscaling Confidence Score. Expects values from 0 to
100, with flag values of 999 for manual scaling and -1 if unknown.
(default=90)
Returns
-------
data_all : dict
Dictionary with all the data combined.
"""
data_all = make_empty_dict_data()
for iion in range(0, ion_name.size):
ionosonde = ion_name[iion]
# Save downloaded file as
file_name_str = (ionosonde
+ '_'
+ time_start.strftime('%Y%m%dT%H%MZ')
+ '_'
+ time_finish.strftime('%Y%m%dT%H%MZ') + '.p')
input_file_pic = os.path.join(save_dir, file_name_str)
with open(input_file_pic, 'rb') as fin:
data = pickle.load(fin)
# Filter out nans in hmf2
data_new = make_empty_dict_data()
a = np.where(np.isfinite(data['hmf2']) == 1)[0]
for key in data_new:
data_new[key] = data[key][a]
# Filter out nans in fof2
data_new2 = make_empty_dict_data()
a = np.where(np.isfinite(data_new['fof2']) == 1)[0]
for key in data_new2:
data_new2[key] = data_new[key][a]
# Filter out small scores (below filter_CS)
data_new3 = make_empty_dict_data()
a = np.where(data_new2['score'] > filter_CS)[0]
for key in data_new3:
data_new3[key] = data_new2[key][a]
# Filter out small-size files (< 10 data points)
if data_new3['fof2'].size > 10:
for key in data_all:
data_all[key] = np.concatenate((data_all[key],
data_new3[key]),
axis=None)
# Convert datetime to datetime in array form using pandas
data_all['dtime'] = pd.to_datetime(data_all['dtime'])
return data_all
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def freq2den(freq):
"""Convert ionospheric frequency to plasma density.
Parameters
----------
freq : array-like
ionospheric freqeuncy in MHz.
Returns
-------
dens : array-like
plasma density in m-3.
Notes
-----
This function converts ionospheric frequency to plasma density.
"""
dens = 1.24e10 * (freq)**2
return dens
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def nearest_element(array, value):
"""Locate nearest element in array.
Parameters
----------
array : array-like
Given array.
valeu : flt
Given value.
Returns
-------
smallest_difference_index : int
Index of the element.
"""
absolute_val_array = np.abs(array - value)
smallest_difference_index = absolute_val_array.argmin()
return smallest_difference_index
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def find_G_and_y(adtime,
alon, alat,
data,
save_dir,
name_run,
interp_method='bilinear',
pickle_outputs=True):
"""Create geometry matrix G and observation vector.
Parameters
----------
adtime : datetime.datetime
Array of time
alon : array-like
Model array of longitude in degrees (from -180 to 180)
alat : array-like
Model array of latitudes in degrees (from -90 to 90)
data : dict
Dictionary of GIRO data produced by filter_GIRO_parameters.
save_dir : str
Directory where to save the downloaded files
name_run : str
String to add to the name of the files for saved results
interp_method : str, optional
Method for interpolating model data to observation points. Options are
'bilinear' (default) and 'nearest'. 'bilinear' performs bilinear
interpolation using the four nearest grid points, while 'nearest'
assigns the value of the nearest model grid point to each observation.
pickle_outputs : bool
It true the outputs are also pickled in the root directory, default
is True.
Returns
-------
y : dict
Dictionary that contains observation vectors:
ay_nmf2 : array-like
Array of GIRO NmF2 parameter in m-3.
ay_hmf2 : array-like
Array of GIRO hmF2 parameter in km.
ay_B0 : array-like
Array of GIRO B0 parameter in km.
ay_B1 : array-like
Array of GIRO B1 parameter, unitless.
ay_lon : array-like
Array of observation longitude in degrees.
ay_lat : array-like
Array of observation latitudes in degrees.
ay_name : array-like
Array of ionosonde names.
ay_time : array-like
Array of observation time.
units : dict
Dictionary with strings of units for the keys in dict y
G : array-like
Geometry matrix (N_obs, N_time, N_lat, N_lon)
ion_info : dict
Dictionary with ionosonde information
lon : array-like
Array of longitudes for the stations in degrees.
ay_lat : array-like
Array of latitudes latitudes for the stations in degrees.
name : array-like
Array of names of the stations.
"""
# Look for 15/2 min data around
adtime0 = adtime - datetime.timedelta(minutes=15) / 2.
adtime1 = adtime + datetime.timedelta(minutes=15) / 2.
# Empty arrays for concatenation
ay_hmf2 = np.empty((0))
ay_fof2 = np.empty((0))
ay_B0 = np.empty((0))
ay_B1 = np.empty((0))
ay_lon = np.empty((0))
ay_lat = np.empty((0))
ay_time = np.empty((0))
ay_name = np.empty((0))
# Cycle through the array of time frames in a selected day
PyVALION.logger.info('Forming observation vectors:')
for it in range(0, adtime.size):
a = np.where((data['dtime'] >= adtime0[it])
& (data['dtime'] < adtime1[it]))[0]
# Unique ionosondes for this time frame
un_ion = np.unique(data['name'][a])
# Cycle through unique ions in the selected time period
for iion in range(0, un_ion.size):
b = np.where(data['name'][a] == un_ion[iion])[0]
# If there is data populate arrays with mean ionosonde obs
if data['dtime'][a[b]].size > 0:
mean_fof2 = np.nanmean(data['fof2'][a[b]])
mean_hmf2 = np.nanmean(data['hmf2'][a[b]])
mean_B0 = np.nanmean(data['B0'][a[b]])
mean_B1 = np.nanmean(data['B1'][a[b]])
# Concatenate obs
ay_hmf2 = np.concatenate((ay_hmf2, mean_hmf2), axis=None)
ay_fof2 = np.concatenate((ay_fof2, mean_fof2), axis=None)
ay_B0 = np.concatenate((ay_B0, mean_B0), axis=None)
ay_B1 = np.concatenate((ay_B1, mean_B1), axis=None)
# Concatenate position and time arrays
ion_lon = adjust_lon(np.unique(data['lon'][a[b]])[0], 'to180')
ion_lat = np.unique(data['lat'][a[b]])[0]
ay_lon = np.concatenate((ay_lon, ion_lon), axis=None)
ay_lat = np.concatenate((ay_lat, ion_lat), axis=None)
ay_name = np.concatenate((ay_name, un_ion[iion]), axis=None)
ay_time = np.concatenate((ay_time, adtime[it]), axis=None)
# Convert frequency to density in cm-3
ay_nmf2 = PyVALION.library.freq2den(ay_fof2)
PyVALION.logger.info('Observation vector y has N_obs = ', ay_nmf2.size)
# Make array for geometry matrix
G = np.zeros((ay_name.size, adtime.size, alat.size, alon.size))
PyVALION.logger.info('Forming G:')
# Loop through all observations
for iob in range(0, ay_name.size):
it = np.where(adtime == ay_time[iob])[0][0]
# Update G matrix (bilinear interpolation)
if interp_method == 'bilinear':
# Perform bilinear interpolation to find the weights for the
# fournearest grid points
weights, ind_lat, ind_lon = bilinear_weights(alat, alon,
ay_lat[iob],
ay_lon[iob])
# Update G matrix with the weights for the four nearest obs
for w, ind_lat, ind_lon in zip(weights, ind_lat, ind_lon):
G[iob, it, ind_lat, ind_lon] = w
# Update G matrix (nearest neighbor)
elif interp_method == 'nearest':
# Select nearest model grid element to obs lat/lon
ind_lon = PyVALION.library.nearest_element(alon, ay_lon[iob])
ind_lat = PyVALION.library.nearest_element(alat, ay_lat[iob])
G[iob, it, ind_lat, ind_lon] = 1.
# Raise error if invalid interpolation method is provided
else:
raise ValueError(
"Invalid interpolation method: "
f"{interp_method}. Choose 'bilinear' or 'nearest'.")
PyVALION.logger.info('G has shape [N_obs, N_time, N_lat, N_lon] = ',
G.shape)
# Write observation vectors to the dictionary
y = {'fof2': ay_fof2, 'NmF2': ay_nmf2, 'hmF2': ay_hmf2, 'B0': ay_B0,
'B1': ay_B1, 'lon': ay_lon, 'lat': ay_lat,
'time': ay_time, 'name': ay_name}
units = {'fof2': 'Hz', 'NmF2': 'm$^{-3}$', 'hmF2': 'km', 'B0': 'km',
'B1': 'unitless', 'lon': '°', 'lat': '°',
'time': 'datetime object', 'name': 'unitless'}
# Save information about ionosonde stations
un_names = np.unique(y['name'])
# Open GIRO information file to pull lon and lat for these stations
file_ion_name = os.path.join(PyVALION.giro_names_dir, 'GIRO_Ionosondes.p')
with open(file_ion_name, 'rb') as fopen:
giro_name = pickle.load(fopen)
# Find elements from GIRO array of names as in un_names array
a = np.where(np.isin(giro_name['name'], un_names))[0]
un_lon = giro_name['lon'][a]
un_lat = giro_name['lat'][a]
ion_info = {'name': un_names, 'lon': un_lon, 'lat': un_lat}
return y, units, G, ion_info
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def find_model_data(field, G):
"""Find model data for the given field and forward operator.
Parameters
----------
field : array-like
2-D filed of a model parameter in shape [N_time, N_lat, N_lon].
G : array-like
Forward operator matrix for the ionosonde data in shape [N_obs, N_time,
N_lat, N_lon].
Returns
-------
model_data : array-like
Array of model data, the expected data according to the model.
"""
# Check that G is compatible with field
if (field.shape[0: 2] == G.shape[1:3]):
# Before G and filed can be multiplied, we need to reshape them into
# [N_obs, N_filed] and [N_field] where N_field is a size of a flattened
# array that combines time and horizontal grid dimmentions
nht = G.shape[1] * G.shape[2] * G.shape[3]
G_reshaped = np.reshape(G, (G.shape[0], nht))
field_reshaped = np.reshape(field, (nht))
# Replace nans with zeros (NIMO1 forecast has nans)
field_reshaped = np.nan_to_num(field_reshaped)
# Multiply G matrix and model field matrix
model_data = np.matmul(G_reshaped, field_reshaped)
else:
flag = 'Error: G and filed are not compatable.'
logger.error(flag)
return model_data
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def find_residuals(model, G, obs_data, obs_info, units):
"""Find model data for the given field and forward operator.
Parameters
----------
model : dict
Dictionary with model parameters in shape [N_time, N_lat, N_lon].
G : array-like
Forward operator matrix for the ionosonde data in shape [N_obs, N_time,
N_lat, N_lon].
obs_data : dict
Dictionary that contains observation vectors:
ay_nmf2 : array-like
Array of GIRO NmF2 parameter in m-3.
ay_hmf2 : array-like
Array of GIRO hmF2 parameter in km.
ay_B0 : array-like
Array of GIRO B0 parameter in km.
ay_B1 : array-like
Array of GIRO B1 parameter, unitless.
ay_lon : array-like
Array of observation longitude in degrees.
ay_lat : array-like
Array of observation latitudes in degrees.
ay_name : array-like
Array of ionosonde names.
ay_time : array-like
Array of observation time.
obs_info : dict
Dictionary with ionosonde information
lon : array-like
Array of longitudes for the stations in degrees.
ay_lat : array-like
Array of latitudes latitudes for the stations in degrees.
name : array-like
Array of names of the stations.
units : dict
Dictionary with strings of units for the keys in dict y
Returns
-------
model_data : dict
Array of model data, the expected data according to the model.
residuals : dict
Array of residuals.
model_units : dict
Dictionary with strings of units for the keys in dict model_data
res_ion : dict
Array of mean residuals for each ionosonde station.
"""
# The dictionary model_data will be used to store model predictions at
# observation points
model_data = {}
# The dictionary residuals will store the differences between the observed
# data and the model predictions
residuals = {}
# The dictionary model_units will store the units (as strings) for each key
# in the model_data dictionary
model_units = {}
# Loop through all parameters in the model dictionary to extract model data
# at observation points
for key in model:
model_data[key] = PyVALION.library.find_model_data(model[key], G)
residuals[key] = obs_data[key] - model_data[key]
model_units[key] = units[key]
# Create dictionary to save individual mean residuals for each ionosonde
res_ion = dict.fromkeys(residuals)
# Loop through all parameters in the model dictionary
for key in residuals:
# Make an array with the size of the number of ionosondes
res_ion[key] = np.zeros((obs_info['name'].size))
# Loop through all ionosondes
for i in range(0, obs_info['name'].size):
# Find mean residuals for each ionosonde
a = np.where(obs_data['name'] == obs_info['name'][i])[0]
res_ion[key][i] = np.nanmean(residuals[key][a])
return model_data, residuals, model_units, res_ion
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def download_Jason_TEC(time_start,
time_finish,
save_dir,
name_run='',
save_data_option=False,
include_neg=True,
sat_names=np.array(["JA2", "JA3"]),
jason_manifest_filename="jason_manifest.txt"):
"""Retrieve Jason ionospheric TEC from from www.ncei.noaa.gov THREDDS.
Parameters
----------
time_start : datetime.datetime
Start date and time for the validation period.
time_finish : datetime.datetime
End date and time for the validation period.
save_dir : str
Directory where to save the downloaded data and where Jason file
manifest is stored.
name_run : str
String to add to the name of the files for saved results. Defaults to
empty string.
save_data_option : bool
Option to save data as a pickle (.p) file into save_dir. Defaults to
False.
include_neg : bool
Option to include negative TEC values in the output. Defaults to True.
sat_names : array-like
String arrays of Jason satellite names ("JA2" and/or "JA3").
jason_manifest_filename : str
String to designate an alternative Jaosn file manifest filename.
Returns
-------
data_all : dict
Dictionary with all the data combined.
"""
# Create or update Jason file manifest
jason_manifest_path = os.path.join(save_dir, jason_manifest_filename)
create_or_update_manifest(jason_manifest_path)
# Cross-reference user-inputted dates with file manifest
matching_urls = filter_urls_by_timerange(jason_manifest_path, time_start,
time_finish, sat_names)
# Initialize data dictionary
data_all = make_empty_dict_data_jason()
# Loop and read through all matching urls
# for row in matching_urls:
PyVALION.logger.info(f"\nProcessing Jason TEC data from"
f" {len(matching_urls)} files.")
for row in tqdm(matching_urls, desc="Jason file progress",
unit="file"):
url = row[0]
# Perform different file reading routine for Jason-2 and Jason-2 files
if "JA3" in url and any(sat_names == "JA3"):
# data_file = read_jason3_file(url)
data_file = read_jason3_file(url, include_neg=include_neg)
elif "JA2" in url and any(sat_names == "JA2"):
data_file = read_jason2_file(url, include_neg=include_neg)
# Concatenate all fields in the dictionary
# data_all = {k: np.concatenate([data_all[k], data_file[k]])
# for k in data_all}
data_all = concat_data_dicts(data_all, data_file)
# Convert dtime type
data_all['dtime'] = pd.to_datetime(data_all['dtime'])
# Indices for data within timespan
t_match = ((data_all['dtime'] >= time_start)
& (data_all['dtime'] <= time_finish))
# Clear out data not within time spec
data_all = mask_dict(data_all, t_match)
if save_data_option:
# Jason raw data filename
file_str_raw = ('Jason_TEC_raw_' + name_run + '.p')
file_path_raw = os.path.join(save_dir, file_str_raw)
PyVALION.logger.info(f"Saving Raw Jason TEC data to: "
f"'{file_path_raw}'")
pickle.dump(data_all, open(file_path_raw, "wb"))
return data_all
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def create_or_update_manifest(output_file):
"""Create a new manifest or update an existing one.
Checks if the manifest file exists. If not, creates a new manifest;
otherwise, updates it with new files from recent cycles.
Parameters
----------
output_file : str
Path to the manifest file to create or update.
"""
if not os.path.exists(output_file):
create_manifest(output_file)
else:
update_manifest(output_file)
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def create_manifest(output_file):
"""Create a new manifest of all Jason-2 and Jason-3 .nc files.
Scans all THREDDS catalog URLs, collects netCDF file URLs, deduplicates
by filename, and saves the sorted list to the specified manifest file.
Parameters
----------
output_file : str
Path to the manifest file to create.
"""
# Print message to user
PyVALION.logger.info(f"Creating Jason data file manifest as "
f"'{output_file}'.")
# Jason-2 and Jason-3 catalog URLs (THREDDS):
catalog_urls = [
"https://www.ncei.noaa.gov/thredds-ocean/catalog/jason2/gdr/gdr/"
"catalog.xml",
"https://www.ncei.noaa.gov/thredds-ocean/catalog/jason3/gdr/gdr/"
"catalog.xml",
"https://www.ncei.noaa.gov/thredds-ocean/catalog/jason3/gdr_f/gdr/"
"catalog.xml"
]
PyVALION.logger.info("Counting total cycle catalogs across all Jason-2 "
"and Jason-3 sources...")
total_cycles = sum(count_cycle_subcatalogs(url) for url in
catalog_urls)
PyVALION.logger.info(f"Total of {total_cycles} cycle catalogs will "
"be scanned.")
all_nc_urls = []
# Use progress bar for combined processing
with tqdm(total=total_cycles, desc="Processing cycles") as progress:
for url in catalog_urls:
all_nc_urls.extend(list_nc_files(url, progress=progress))
# Deduplicate by filename
seen = {}
for url in all_nc_urls:
fname = os.path.basename(url)
if (fname not in seen) and "JA2_GPN_2Pf" not in fname:
seen[fname] = url # Save only first occurrence
# Sort by cycle number
sorted_urls = sorted(seen.values(), key=extract_first_date)
with open(output_file, "w") as f:
for url in sorted_urls:
f.write(url + "\n")
PyVALION.logger.info(f"{len(sorted_urls)} unique .nc URLs saved "
f"to '{output_file}'")
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def update_manifest(output_file):
"""Update a manifest file by appending new .nc files from newer cycles.
Scans 'cycleXXX' folders with cycle numbers equal to or higher than the
last entry and appends only files with timestamps newer than the most
recent entry in the manifest.
Parameters
----------
output_file : str
Path to the manifest file to update.
"""
# Print message to user
PyVALION.logger.info(f"File '{output_file}' already exists. "
"Checking for updates.")
# Load existing URLs
with open(output_file, "r") as f:
existing_urls = [line.strip() for line in f if line.strip()]
if not existing_urls:
PyVALION.logger.info("Jason file manifest is empty. "
"Creating new manifest")
create_manifest(output_file)
return
# Get cycle number and timestamp from last URL in manifest
last_url = existing_urls[-1]
last_cycle = extract_cycle_number(last_url)
last_timestamp = extract_first_date(last_url)
# Check that the final file is not a Jason-2 entry
if "JA2" in last_url:
PyVALION.logger.info("WARNING: Last listed file is a Jason-2 entry. "
"Rebuilding manifest...")
create_manifest(output_file)
return
# Hardcoded threshold datetime fir Jason-2 file end
# (do not update Jason-2 manifest listings)
threshold_str = "20191001_065045"
threshold_dt = datetime.datetime.strptime(threshold_str, "%Y%m%d_%H%M%S")
if last_timestamp < threshold_dt:
PyVALION.logger.info(f"WARNING: Last listed file timestamp "
f"{last_timestamp} is older than end of Jason-2"
" record. Rebuilding manifest...")
create_manifest(output_file)
return
PyVALION.logger.info(f"Last recorded data: cycle: {last_cycle}, "
f"timestamp: {last_timestamp}")
# Jason-3 THREDDS catalog URLs
catalog_urls = [
"https://www.ncei.noaa.gov/thredds-ocean/catalog/jason3/gdr/gdr/"
"catalog.xml",
"https://www.ncei.noaa.gov/thredds-ocean/catalog/jason3/gdr_f/gdr/"
"catalog.xml"
]
all_new_urls = []
all_cycles = []
# Collect all newer cycle references first
for root_url in catalog_urls:
try:
cat = TDSCatalog(root_url)
for ref in cat.catalog_refs.values():
if "cycle" in ref.title.lower():
cycle_num = extract_cycle_number(ref.title)
if cycle_num >= last_cycle:
all_cycles.append((cycle_num, ref.href))
except Exception as e:
PyVALION.logger.info(f"Failed to access {root_url}: {e}")
PyVALION.logger.info(f"{len(all_cycles)} total cycles will be scanned "
f"up to/including cycle {last_cycle})")
# Process each cycle with progress bar
with tqdm(total=len(all_cycles), desc="Processing cycles") as progress:
for cycle_num, cycle_url in sorted(all_cycles):
found_urls = list_nc_files(cycle_url)
for url in found_urls:
timestamp = extract_first_date(url)
if timestamp > last_timestamp:
all_new_urls.append(url)
progress.update(1)
# Deduplicate by filename
seen_filenames = set(os.path.basename(u) for u in existing_urls)
new_unique_urls = []
for url in all_new_urls:
fname = os.path.basename(url)
if fname not in seen_filenames:
new_unique_urls.append(url)
seen_filenames.add(fname)
# Sort by extracted date
new_unique_urls.sort(key=extract_first_date)
# Append to manifest
if new_unique_urls:
with open(output_file, "a") as f:
for url in new_unique_urls:
f.write(url + "\n")
PyVALION.logger.info(f"Added {len(new_unique_urls)} new entries "
"to manifest.")
else:
PyVALION.logger.info("No new files to add. Manifest is up to date.")
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def count_cycle_subcatalogs(url):
"""Count the number of cycle sub-catalogs in a THREDDS catalog.
Parameters
----------
url : str
Base catalog url from www.ncei.noaa.gov THREDDS containing "cycle"
sub-catalogs.
Returns
-------
cycle_num : int
Total number of cycle sub-catalogs in base url.
"""
try:
cat = TDSCatalog(url)
return sum("cycle" in ref.title.lower() for ref in
cat.catalog_refs.values())
except Exception:
return 0
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def list_nc_files(url, root_base=None, progress=None):
"""Recursively list netCDF files in a THREDDS catalog.
This function traverses a THREDDS catalog at the given URL and returns
all `.nc` file URLs served via the HTTPServer interface. If a `root_base`
is provided, it is used as the base URL for constructing full paths. An
optional `progress` object can be updated for each subcatalog processed.
Parameters
----------
url : str
URL of the THREDDS catalog (ending with `catalog.xml`).
root_base : str, optional
Base URL to use when constructing full dataset URLs. If None, the
base is inferred from `url`.
progress : object, optional
Progress indicator with an `update(n)` method (e.g., a tqdm instance)
to track progress while iterating through subcatalogs.
Returns
-------
nc_urls : list of str
List of `.nc` file URLs accessible via HTTPServer (converted to DODS/C
URLs for subsequent downloads).
"""
nc_urls = []
try:
cat = TDSCatalog(url)
if root_base is None:
root_base = url.rsplit('catalog.xml', 1)[0]
for _, ds in cat.datasets.items():
if 'HTTPServer' in ds.access_urls:
full_url = ds.access_urls['HTTPServer']
# Specify dodsC for subsequent downloads
full_url = full_url.replace("/fileServer/", "/dodsC/")
nc_urls.append(full_url)
for subcat_ref in cat.catalog_refs.values():
if "cycle" in subcat_ref.title.lower():
nc_urls.extend(list_nc_files(subcat_ref.href, root_base,
progress))
if progress:
progress.update(1)
except Exception:
pass
return nc_urls
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def filter_urls_by_timerange(manifest_filepath,
user_start,
user_finish,
sat_names):
"""Filter list of URLs by user timespan.
Reads a .txt file of URLs and returns only those where the file's time
range overlaps with the user-specified datetime window.
Parameters
----------
manifest_filepath : str
Path to Jason file manifest (.txt file containing URLs).
user_start : datetime.datetime
Start date and time for the validation period.
user_finish : datetime.datetime
End date and time for the validation period.
sat_names : np.ndarray
Array of satellite name substrings to include (e.g., "JA2", "JA3").
Returns
-------
matching_urls : list of tuples
Each tuple contains (url, start_datetime, end_datetime) for files
that overlap with the specified time window.
"""
# Regex pattern to extract start and end datetimes from the filename
pattern = r'_(\d{8})_(\d{6})_(\d{8})_(\d{6})\.nc'
# Initialize list to hold matching URL entries
matching_urls = []
# Open and read the input file line by line
with open(manifest_filepath, 'r') as file:
# Print out progress statements to user
PyVALION.logger.info(f"\nFiltering for overlap with user-specified "
f"time window: {user_start} to {user_finish}.")
lines = file.readlines()
PyVALION.logger.info(f"Searching {len(lines)} URLs from manifest.")
for line in tqdm(lines, desc="Filtering URLs", unit="file"):
url = line.strip()
# Skip if the URL does not contain any of the desired sat names
if not any(sat in url for sat in sat_names):
continue
# Try to extract date info from the filename
match = re.search(pattern, url)
if match:
start_str = match.group(1) + match.group(2)
end_str = match.group(3) + match.group(4)
start_dt = datetime.datetime.strptime(start_str,
'%Y%m%d%H%M%S')
end_dt = datetime.datetime.strptime(end_str, '%Y%m%d%H%M%S')
# Include file if it overlaps with the specified time window
if end_dt >= user_start and start_dt <= user_finish:
matching_urls.append((url, start_dt, end_dt))
# Return the list of matching URLs and their associated datetimes
PyVALION.logger.info(f"Found {len(matching_urls)} matching files.")
return matching_urls
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def make_empty_dict_data_jason():
"""Make empty dictionary to collect Jason-2 and -3 data.
Returns
-------
data : dict
Dictionary with empty elements.
"""
empty = np.empty((0))
data = {'dtime': np.empty(0, dtype='datetime64[s]'),
'TEC': empty,
'lon': empty,
'lat': empty,
'name': np.empty(0, dtype='<U3')}
return data
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def read_jason2_file(j2url, include_neg=True):
"""Read in relevant data from Jason-2 OPeNDAP URL.
See OSTM/Jason-2 Products Handbook Section 4.2.5 for details.
Parameters
----------
j2url : str
Jason-2 OPeNDAP url.
include_neg : bool
Option to include negative TEC values in the output. Defaults to True.
Returns
-------
data_all : dict
Dictionary with all the data from a single file combined.
"""
# Open Jason-2 netCDF with OPeNDAP URL
j2file = netCDF4.Dataset(j2url, 'r')
# Read spatiotemporal variables
time = j2file.variables['time'][:]
time = time.filled(np.nan)
# Define the reference (epoch)
epoch = np.datetime64('2000-01-01T00:00:00')
time = (epoch + time.astype('timedelta64[s]'))
lon = j2file.variables['lon'][:]
lon = lon.filled(np.nan)
lon = adjust_lon(lon, 'to180')
lat = j2file.variables['lat'][:]
lat = lat.filled(np.nan)
# Read Jason-2 ku-band delay and fill missing
iono_ku = j2file.variables['iono_corr_alt_ku'][:]
iono_ku = iono_ku.filled(np.nan)
# Flags
# Surface type flag
surf_flag = j2file.variables['surface_type'][:]
surf_flag = surf_flag.filled(np.nan)
# Open sea ice flag
ice_flag = j2file.variables['ice_flag'][:]
ice_flag = ice_flag.filled(np.nan)
# RMS of the "ocean" altimeter range
rms_flag = j2file.variables['range_rms_ku'][:]
rms_flag = rms_flag.filled(np.nan)
# Number of valid points used to compute the “ocean” altimeter range
points_flag = j2file.variables['range_numval_ku'][:]
points_flag = points_flag.filled(np.nan)
# Only measurements collected a) over the ocean, b) not on ice, c) for
# which the range at 1 Hz was computed with enough valid range observations
# at 20 Hz (RMS below a threshold 0.2 and a number of 20 Hz observations
# above a threshold of 10) and d) for which the ionospheric correction
# could be computed using equation 1 (its values different than Default
# Value (DV)) are selected.
valid = (
(surf_flag == 0)
& (ice_flag == 0)
& (rms_flag > 0)
& (rms_flag < 0.2)
& (points_flag > 10)
)
iono_ku = iono_ku.astype(float)
iono_ku[~valid] = np.nan
# Filter data
iono_ku_filt, filt_flag = robust_iterative_filter(iono_ku)
iono_ku_filt[filt_flag] = np.nan
# Compute TEC from Jason-2 ku-band delay
tec = compute_jason_tec(iono_ku_filt)
j2_sat_tec_bias = -3.5
tec = tec + j2_sat_tec_bias
if not include_neg:
tec[tec < 0] = 0
# Initialize empy data dictionary
data_all = make_empty_dict_data_jason()
# Remove entries with nans
delInd = np.isnan(time) | np.isnan(lat) | np.isnan(lon) | np.isnan(tec)
time = np.delete(time, delInd)
lat = np.delete(lat, delInd)
lon = np.delete(lon, delInd)
tec = np.delete(tec, delInd)
# Fill data dictionary
N = len(lat)
data_all['dtime'] = time
data_all['lat'] = lat
data_all['lon'] = lon
data_all['TEC'] = tec
data_all['name'] = np.full(N, 'JA2', dtype='<U3')
return data_all
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def read_jason3_file(j3url, include_neg=True):
"""Read in relevant data from Jason-3 OPeNDAP URL.
See OSTM/Jason-3 Products Handbook for details.
Parameters
----------
j3url : str
Jason-3 OPeNDAP url.
include_neg : bool
Option to include negative TEC values in the output. Defaults to True.
Returns
-------
data_all : dict
Dictionary with all the data from a single file combined.
"""
# Open Jason-3 netCDF with OPeNDAP URL
j3file = netCDF4.Dataset(j3url, 'r')
# Read spatiotemporal variables
time = j3file.variables['data_01%2ftime'][:]
time = time.filled(np.nan)
# Define the reference (epoch)
epoch = np.datetime64('2000-01-01T00:00:00')
time = (epoch + time.astype('timedelta64[s]'))
lon = j3file.variables['data_01%2flongitude'][:]
lon = lon.filled(np.nan)
lon = adjust_lon(lon, 'to180')
lat = j3file.variables['data_01%2flatitude'][:]
lat = lat.filled(np.nan)
# Read Jason-3 ku-band delay and fill missing
iono_ku = j3file.variables['data_01%2fku%2fiono_cor_alt'][:]
iono_ku = iono_ku.filled(np.nan)
# Flags
# Surface type flag
surf_flag = j3file.variables['data_01%2fsurface_classification_flag'][:]
surf_flag = surf_flag.filled(np.nan)
# Open sea ice flag
ice_flag = j3file.variables['data_01%2fice_flag'][:]
ice_flag = ice_flag.filled(np.nan)
# RMS of the "ocean" altimeter range
rms_flag = j3file.variables['data_01%2fku%2frange_ocean_rms'][:]
rms_flag = rms_flag.filled(np.nan)
# Number of valid points used to compute the “ocean” altimeter range
points_flag = j3file.variables['data_01%2fku%2frange_ocean_numval'][:]
points_flag = points_flag.filled(np.nan)
# Only measurements collected a) over the ocean, b) not on ice, c) for
# which the range at 1 Hz was computed with enough valid range observations
# at 20 Hz (RMS below a threshold 0.2 and a number of 20 Hz observations
# above a threshold of 10) and d) for which the ionospheric correction
# could be computed using equation 1 (its values different than Default
# Value (DV)) are selected.
valid = (
(surf_flag == 0)
& (ice_flag == 0)
& (rms_flag > 0)
& (rms_flag < 0.2)
& (points_flag > 10)
)
iono_ku = iono_ku.astype(float)
iono_ku[~valid] = np.nan
# Filter data
iono_ku_filt, filt_flag = robust_iterative_filter(iono_ku)
iono_ku_filt[filt_flag] = np.nan
# Compute TEC from Jason-3 ku-band delay
tec = compute_jason_tec(iono_ku_filt)
j3_sat_tec_bias = -1
tec = tec + j3_sat_tec_bias
if not include_neg:
tec[tec < 0] = 0
# Initialize empy data dictionary
data_all = make_empty_dict_data_jason()
# Remove entries with nans
delInd = np.isnan(time) | np.isnan(lat) | np.isnan(lon) | np.isnan(tec)
time = np.delete(time, delInd)
lat = np.delete(lat, delInd)
lon = np.delete(lon, delInd)
tec = np.delete(tec, delInd)
# Fill Jason-3 data dictionary
N = len(lat)
data_all['dtime'] = time
data_all['lat'] = lat
data_all['lon'] = lon
data_all['TEC'] = tec
data_all['name'] = np.full(N, 'JA3', dtype='<U3')
return data_all
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def robust_iterative_filter(
data_raw,
INIT_SIGMA=3,
DATA_GAP_MAX=0,
MEDIAN_NB_PTS=30,
MEDIAN_NB_PTS_MIN=30,
MEDIAN_NB_PTS_MEAN=0,
LANCZOS_NB_PTS=50,
LANCZOS_NB_PTS_CUTOFF=50,
LANCZOS_NB_PTS_MIN=10,
CONVERGENCE_THRESHOLD=0
):
"""Robust iterative filtering to remove outliers from a 1D signal.
This function performs a multi-step iterative filtering procedure to
remove outliers from the input data. Steps include:
1. Initial global sigma-based outlier detection.
2. Optional linear interpolation of small gaps.
3. Median filtering.
4. Lanczos filtering.
5. Residual-based outlier flagging.
The iteration continues until convergence criteria are met.
Parameters
----------
data_raw : np.ndarray
1D array of raw input data.
Returns
-------
data_lanczos : np.ndarray
Filtered 1D array after robust iterative filtering.
outlier_mask : np.ndarray of bool
Boolean mask indicating which points were flagged as outliers.
"""
data = data_raw.copy()
outlier_mask = np.zeros(len(data), dtype=bool)
# Step 2: Initial global sigma outlier detection
mean = np.nanmean(data)
std = np.nanstd(data)
init_outliers = np.abs(data - mean) > INIT_SIGMA * std
outlier_mask |= init_outliers
data[init_outliers] = np.nan
# prev_outlier_count = np.sum(outlier_mask)
# Iterate steps 3–7
while True:
# Step 3: Remove current outliers (already done by setting to NaN)
# Step 4: Fill small gaps
if DATA_GAP_MAX > 0:
data = fill_small_gaps(data, DATA_GAP_MAX)
# Step 5: Median Filter
data_median = apply_median_filter(data, MEDIAN_NB_PTS,
MEDIAN_NB_PTS_MIN,
MEDIAN_NB_PTS_MEAN)
# Step 6: Lanczos Filter
data_lanczos = lanczos_filter(data_median, LANCZOS_NB_PTS,
LANCZOS_NB_PTS_CUTOFF,
LANCZOS_NB_PTS_MIN)
# Step 7: Flag outliers based on residual
residual = data_raw - data_lanczos
residual_std = np.nanstd(residual)
new_outliers = np.abs(residual) > 3 * residual_std
new_outliers &= ~outlier_mask # Don't double-flag
outlier_mask |= new_outliers
data = data_raw.copy()
data[outlier_mask] = np.nan
# Step 8: Check convergence
percent_outliers = np.sum(outlier_mask) / len(data_raw)
if percent_outliers <= CONVERGENCE_THRESHOLD:
break
if np.sum(new_outliers) == 0:
break # Converged
return data_lanczos, outlier_mask
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def fill_small_gaps(data, max_gap_size):
"""Linearly interpolate gaps smaller than max_gap_size.
Parameters
----------
data : np.ndarray
1D array of data with NaN values representing gaps.
max_gap_size : int
Maximum consecutive NaN values to interpolate. Gaps longer than this
will remain as NaN.
Returns
-------
interp_all : np.ndarray
1D array with small gaps linearly interpolated, preserving larger gaps
as NaN.
"""
# data_filled = data.copy()
isnan = np.isnan(data)
idx = np.arange(len(data))
valid = ~isnan
# Interpolate all
interp_all = np.interp(idx, idx[valid], data[valid])
# Re-mask large gaps
nan_runs = np.flatnonzero(np.diff(np.concatenate(([0], isnan.view(np.int8),
[0]))))
gap_starts = nan_runs[::2]
gap_ends = nan_runs[1::2]
for start, end in zip(gap_starts, gap_ends):
gap_len = end - start
if gap_len > max_gap_size:
interp_all[start:end] = np.nan
return interp_all
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def lanczos_filter(signal, N, cutoff, min_valid=3):
"""Apply a Lanczos filter to a 1D signal, ignoring NaNs.
This function convolves the input signal with a symmetric Lanczos kernel
of half-width N and the specified cutoff frequency. Elements with NaN
values are ignored, and filtering is applied only if at least `min_valid`
non-NaN points are available in the kernel window.
Parameters
----------
signal : np.ndarray
1D array of data to be filtered.
N : int
Half-width of the Lanczos kernel.
cutoff : float
Cutoff frequency for the filter.
min_valid : int, optional
Minimum number of valid (non-NaN) points required to apply the filter
at each position. Default is 3.
Returns
-------
filtered : np.ndarray
1D array of the filtered signal, with the same shape as `signal`.
Positions with insufficient valid points remain NaN.
"""
kernel = compute_lanczos_kernel(N, cutoff)
filtered = np.full_like(signal, np.nan)
for i in range(len(signal)):
start = max(i - N, 0)
end = min(i + N + 1, len(signal))
segment = signal[start:end]
k_segment = kernel[N - (i - start):N + (end - i)]
valid_mask = ~np.isnan(segment)
if np.sum(valid_mask) >= min_valid:
filtered[i] = (np.nansum(segment[valid_mask]
* k_segment[valid_mask])
/ np.sum(k_segment[valid_mask]))
return filtered
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def compute_lanczos_kernel(N, cutoff):
"""Build a symmetric Lanczos kernel with half-width N and cutoff.
This function creates a normalized symmetric Lanczos kernel using a
sinc filter multiplied by a Lanczos window.
Parameters
----------
N : int
Half-width of the kernel.
cutoff : float
Cutoff frequency.
Returns
-------
kernel : np.ndarray
Normalized symmetric Lanczos kernel.
"""
x = np.arange(-N, N + 1)
sinc_filter = np.sinc(x / cutoff)
lanczos_window = np.sinc(x / N)
kernel = sinc_filter * lanczos_window
return kernel / kernel.sum()
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def compute_jason_tec(iono_ku_delay):
"""Compute Jason TEC.
See OSTM/Jason-2 Products Handbook Section 4.2.5 for details.
Parameters
----------
iono_ku_delay : float
Jason-2 or Jason-3 Ku-band ionospheric delay.
Returns
-------
tec : float
Total electron content in TECU.
"""
scale_topex = 13.575E9 * 13.575E9 / 40.3 # f^2/40.3
tec = -iono_ku_delay * scale_topex * 1E-16 # conversion to TECu
return tec
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def concat_data_dicts(a_dict, b_dict):
"""Concatenate two dictionaries with the same field names.
This function merges the contents of dictionaries A and B, assuming both
have identical field names. The data from A appears before B.
Parameters
----------
a_dict : dict
Dictionary with identical field names to B.
b_dict : dict
Dictionary with identical field names to A.
Returns
-------
c_dict : dict
Dictionary with all the data from A and B combined (A preceding B).
"""
c_dict = {}
for a_key in a_dict.keys():
a_val = a_dict[a_key]
b_val = b_dict[a_key]
if isinstance(a_val, np.ndarray) and isinstance(b_val, np.ndarray):
if a_val.dtype == b_val.dtype:
c_dict[a_key] = np.concatenate([a_val, b_val])
else:
raise TypeError(f"Field '{a_key}' dtype mismatch: "
f"{a_val.dtype} vs {b_val.dtype}")
else:
raise ValueError(f"Field '{a_key}' is not a numpy "
"array in both dicts")
return c_dict
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def mask_dict(data_raw, mask):
"""Mask data within all data dictionary fields.
This function removes elements from each field in the data dictionary
according to the mask provided: 1 for keep, 0 for discard.
Parameters:
-----------
data_raw : dict of arrays (np.ndarray or pd.Series)
Dictionary of data fields to downsample.
mask : 0s and 1s
1 for keep, 0 for discard.
Returns:
--------
dict_resamp : dict
Dictionary with cleaned out fields.
"""
data_clean = {key: data[mask] for key, data in data_raw.items()}
return data_clean
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def downsample_Jason_TEC(data_all,
ddeg,
save_dir='',
name_run='',
save_data_option=False):
"""Downsample all fields in a Jason data dictionary by a specified degree.
Parameters
----------
data_all : dict
Dictionary of raw Jason data.
ddeg : float
Model resolution in degrees. Data will be resampled to approximately
half this resolution in order to maintain adequate sampling.
save_dir : str, optional
Directory where the resampled data will be saved. Defaults to ''.
name_run : str, optional
String appended to the output filename. Defaults to ''.
save_data_option : bool, optional
If True, save data as a pickle (.p) file in `save_dir`. Defaults to
False.
Returns
-------
data_resamp : dict
Dictionary of Jason data with downsampled fields.
"""
# load spatiotemp data from dictionary
lat_j = data_all['lat']
lon_j = data_all['lon']
dtime_j = data_all['dtime']
# Adjust model resolution to be approximately half the specified resolution
ddeg = ddeg / 2
# Determine a resampling period estimate from an hour of data
sample_start = dtime_j[0]
sample_end = sample_start + pd.Timedelta(hours=2)
# Indices for an hour of data
sample_ind = ((dtime_j >= sample_start) & (dtime_j <= sample_end))
# Select just 1st hr of data
lon_sample = lon_j[sample_ind]
lat_sample = lat_j[sample_ind]
# Unique String Resampling
coor_str, _, _ = round_and_stringify(lat_sample,
lon_sample,
ddeg)
_, unq_ind = np.unique(coor_str, return_index=True)
# Determine estimate of resampling step
N_sample = len(lat_sample)
N_unq = len(unq_ind)
N_resamp = np.round(N_sample / N_unq).astype(int)
PyVALION.logger.info(f"\nResampling Jason TEC data for {ddeg:.2f} "
"degree resolution.")
data_resamp = downsample_dict(data_all, N_resamp)
if save_data_option:
# Jason resample data filename, change ddeg to original for clarity
ddeg_str = f"{ddeg * 2:.2f}"
file_str_resample = ('Jason_TEC_resampled_' + ddeg_str + 'res_'
+ name_run + '.p')
file_path_resample = os.path.join(save_dir, file_str_resample)
PyVALION.logger.info("Saving Downsampled Jason TEC data "
f"to: '{file_path_resample}'")
pickle.dump(data_resamp, open(file_path_resample, "wb"))
PyVALION.logger.info('\n')
return data_resamp
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def round_and_stringify(lat, lon, rounding_interval):
"""Round latitude and longitude values and generate string identifiers.
Rounds latitude and longitude values to the nearest multiple of a specified
model resolution and returns a string array used to determine the
resampling rate of Jason data.
Parameters
----------
lat : ndarray of float
Latitude values.
lon : ndarray of float
Longitude values.
rounding_interval : float
Model resolution in degrees.
Returns
-------
coor_str : ndarray of str
Array of strings containing rounded coordinates. Values are scaled to
the nearest integer by a scaling factor. For example, if
`rounding_interval=0.25`, values will be scaled by 4.
lat_rounded : ndarray of float
Array of rounded latitude values.
lon_rounded : ndarray of float
Array of rounded longitude values.
"""
# Round to nearest rounding_interval
lat_rounded = np.round(lat / rounding_interval) * rounding_interval
lon_rounded = np.round(lon / rounding_interval) * rounding_interval
# Scale to integer to avoid float issues
# (e.g., 12.5 -> 1250 if interval=0.01)
scale_factor = int(1 / rounding_interval) if rounding_interval < 1 else 1
lat_int = (lat_rounded * scale_factor).astype(int)
lon_int = (lon_rounded * scale_factor).astype(int)
# Compute required zero-padding width
pad_width = max(len(str(np.max(np.abs(lat_int)))),
len(str(np.max(np.abs(lon_int)))))
# Include another place in string for minus sign
pad_width = pad_width + 1
# Convert to strings with zero-padding
lat_str = np.char.zfill(lat_int.astype(str), pad_width)
lon_str = np.char.zfill(lon_int.astype(str), pad_width)
# Combine into coordinate string
coor_str = np.char.add(np.char.add(lat_str, '_'), lon_str)
return coor_str, lat_rounded, lon_rounded
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def downsample_dict(data_raw, N):
"""Downsample all fields in a dictionary by keeping every Nth point.
Parameters
----------
data_raw : dict of array-like (np.ndarray or pd.Series)
Dictionary of data fields to downsample.
N : int
Step size indicating how frequently to sample. Keeps every Nth point.
Returns
-------
dict_resamp : dict
Dictionary with downsampled fields.
"""
data_resamp = {key: data[::N] for key, data in data_raw.items()}
return data_resamp
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def find_Jason_G_and_y(adtime,
alon,
alat,
data,
interp_method='bilinear'):
"""Create geometry matrix G and observation vector y.
Parameters
----------
adtime : array-like of datetime.datetime
Array of time stamps for the model.
alon : array-like
Array of model longitudes in degrees (-180 to 180).
alat : array-like
Array of model latitudes in degrees (-90 to 90).
data : dict
Dictionary of Jason data produced by `download_Jason_TEC` or
`downsample_Jason_TEC`.
interp_method : str, optional
Method for interpolating model data to observation points. Options are
'bilinear' (default) and 'nearest'. 'bilinear' performs bilinear
interpolation using the four nearest grid points, while 'nearest'
assigns the value of the nearest model grid point to each observation.
Returns
-------
y : dict
Dictionary containing the following keys and values:
TEC : array-like
Jason total electron content in TECU.
lon : array-like
Observation longitudes in degrees.
lat : array-like
Observation latitudes in degrees.
name : array-like
Jason satellite names.
time : array-like
Observation times.
units : dict
Dictionary mapping keys in `y` to their units as strings.
G : ndarray
Geometry matrix with shape (N_obs, N_time, N_lat, N_lon).
"""
# Look for 15/2 min data around
adtime0 = adtime - datetime.timedelta(minutes=15) / 2.
adtime1 = adtime + datetime.timedelta(minutes=15) / 2.
# Empty arrays for concatenation
ay_tec = np.empty((0))
ay_lon = np.empty((0))
ay_lat = np.empty((0))
ay_time = np.empty((0))
ay_name = np.empty((0))
# Determine total number of samples in y
n_tot = np.where((data['dtime'] >= adtime0[0])
& (data['dtime'] < adtime1[-1]))[0]
n_tot = n_tot.size
# intialize G and sample iterator (i)
i = 0
# Make array for geometry matrix
G = np.zeros((n_tot, adtime.size, alat.size, alon.size))
# Cycle through the array of time frames in a selected day
for i_t in range(0, adtime.size):
# Find data within selected time range
a = np.where((data['dtime'] >= adtime0[i_t])
& (data['dtime'] < adtime1[i_t]))[0]
# Loop through observations within timeframe
for i_ob in range(0, a.size):
# Copy over obs data
ob_lat = data['lat'][a[i_ob]]
ob_lon = data['lon'][a[i_ob]]
ob_tec = data['TEC'][a[i_ob]]
ob_name = data['name'][a[i_ob]]
ob_time = data['dtime'][a[i_ob]]
# Concatenate obs, name, position, and time arrays
ay_tec = np.concatenate((ay_tec, ob_tec), axis=None)
ay_lat = np.concatenate((ay_lat, ob_lat), axis=None)
ay_lon = np.concatenate((ay_lon, ob_lon), axis=None)
ay_name = np.concatenate((ay_name, ob_name), axis=None)
ay_time = np.concatenate((ay_time, ob_time), axis=None)
# Update G matrix (bilinear interpolation)
if interp_method == 'bilinear':
# Perform bilinear interpolation to find the weights for the
# fournearest grid points
weights, i_lat, i_lon = bilinear_weights(alat, alon, ob_lat,
ob_lon)
# Update G matrix with the weights for the four nearest obs
for w, lat_idx, lon_idx in zip(weights, i_lat, i_lon):
G[i, i_t, lat_idx, lon_idx] = w
# Update G matrix (nearest neighbor)
elif interp_method == 'nearest':
# Select nearest model grid element to obs lat/lon
i_lat = nearest_element(alat, ob_lat)
i_lon = nearest_element(alon, ob_lon)
# Form G matrix
G[i, i_t, i_lat, i_lon] = 1.
# Raise error if invalid interpolation method is provided
else:
raise ValueError(
"Invalid interpolation method: "
f"{interp_method}. Choose 'bilinear' or 'nearest'.")
# Update sample iterator
i = i + 1
PyVALION.logger.info('G has shape [N_obs, N_time, N_lat, N_lon] = ',
G.shape)
# Write observation vectors to the dictionary
y = {'TEC': ay_tec, 'lon': ay_lon, 'lat': ay_lat,
'time': ay_time, 'name': ay_name}
units = {'TEC': 'TECU', 'lon': '°', 'lat': '°',
'time': 'datetime object', 'name': 'unitless'}
return y, units, G
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def bilinear_weights(alat, alon, ob_lat, ob_lon):
"""Compute bilinear interpolation weights for a lat/lon grid.
Parameters
----------
alat : array-like
1D array of model latitudes (monotonic).
alon : array-like
1D array of model longitudes (monotonic).
ob_lat : float
Observation latitude.
ob_lon : float
Observation longitude.
Returns
-------
weights : ndarray
Array of four bilinear interpolation weights.
i_lat : ndarray
Array of four latitude indices.
i_lon : ndarray
Array of four longitude indices.
"""
alat = np.asarray(alat)
alon = np.asarray(alon)
# Find insertion indices
lat_idx = np.searchsorted(alat, ob_lat) - 1
lon_idx = np.searchsorted(alon, ob_lon) - 1
# Clamp indices to valid range
lat_idx = np.clip(lat_idx, 0, len(alat) - 2)
lon_idx = np.clip(lon_idx, 0, len(alon) - 2)
lat0 = alat[lat_idx]
lat1 = alat[lat_idx + 1]
lon0 = alon[lon_idx]
lon1 = alon[lon_idx + 1]
# Handle degenerate grid spacing safely
if lat1 == lat0 or lon1 == lon0:
weights = np.array([1.0, 0.0, 0.0, 0.0])
i_lat = np.array([lat_idx] * 4)
i_lon = np.array([lon_idx] * 4)
return weights, i_lat, i_lon
# Compute normalized distances
t = (ob_lat - lat0) / (lat1 - lat0)
u = (ob_lon - lon0) / (lon1 - lon0)
# Bilinear weights
w00 = (1 - t) * (1 - u)
w01 = (1 - t) * u
w10 = t * (1 - u)
w11 = t * u
weights = np.array([w00, w01, w10, w11])
i_lat = np.array([lat_idx, lat_idx, lat_idx + 1, lat_idx + 1])
i_lon = np.array([lon_idx, lon_idx + 1, lon_idx, lon_idx + 1])
return weights, i_lat, i_lon
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def find_Jason_residuals(model, G, obs_data, units):
"""Find residuals from model data for the given field and forward operator.
Parameters
----------
model : dict
Dictionary with model parameters in shape [N_time, N_lat, N_lon].
G : array-like
Geometry matrix [N_obs, N_time, N_lat, N_lon].
obs_data : dict
Dictionary containing the following keys and values:
TEC : array-like
Array of Jason total electron content in TECU.
lon : array-like
Array of observation longitude in degrees.
lat : array-like
Array of observation latitudes in degrees.
name : array-like
Array of Jason satellite names.
time : array-like
Array of observation time.
units : dict
Dictionary with strings of units for the keys in dict y.
Returns
-------
model_data : dict
Array of model data, the expected data according to the model.
residuals : dict
Dictionary containing residuals (obs_data - model_data).
model_units : dict
Dictionary with strings of units for the keys in dict model_data.
"""
# The dictionary model_data will be used to store model predictions at
# observation points
model_data = {}
# The dictionary residuals will store the differences between the observed
# data and the model predictions
residuals = {}
# The dictionary model_units will store the units (as strings) for each key
# in the model_data dictionary
model_units = {}
# Loop through all parameters in the model dictionary to extract model data
# at observation points
for key in model:
PyVALION.logger.info('key= ', key)
model_data[key] = find_model_data(model[key], G)
residuals[key] = obs_data[key] - model_data[key]
model_units[key] = units[key]
return model_data, residuals, model_units
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]
def sza_data_space(dtime, alon, alat):
"""Compute solar zenith angles for a sequence of times and locations.
Parameters
----------
dtime : np.ndarray
Array of datetime objects.
alon : np.ndarray
Array of longitudes (degrees East).
alat : np.ndarray
Array of latitudes (degrees North).
Returns
-------
solzen : np.ndarray
Array of solar zenith angles in degrees.
Raises
-------
ValueError
If input arrays have different shapes
"""
# Make sure they are Numpy arrays
alon = np.asarray(alon)
alat = np.asarray(alat)
dtime = np.asarray(dtime)
# Shape check
if not (dtime.shape == alon.shape == alat.shape):
raise ValueError("Input arrays must have the same shape.")
# pd.to_datetime requires 1-D arrays, therefore we need to flatten them
# and record the initial shape to reshape back the result
initial_shape = dtime.shape
alon = np.reshape(alon, alon.size)
alat = np.reshape(alat, alat.size)
dtime = np.reshape(dtime, dtime.size)
# Once the arrays are flat make sure dtime array acts as dtime object
dtime = pd.to_datetime(dtime)
# Initialize result
solzen = np.zeros(shape=dtime.shape)
# Compute solar zenith angle for each entry
for i in range(0, dtime.size):
jday = PyIRI.main_library.juldat(dtime[i])
slon, slat = PyIRI.main_library.subsolar_point(jday)
solzen[i] = PyIRI.main_library.solar_zenith(slon,
slat,
alon[i],
alat[i])
# Reform the result to the original shape
solzen = np.reshape(solzen, initial_shape)
return solzen