#!/usr/bin/env python
"""General helper functions and general information
*author*: Johannes Röttenbacher
"""
import os
import shutil
import sys
from itertools import groupby
import toml
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import colors
import matplotlib.dates as mdates
import datetime
import logging
import cmasher as cmr
from tqdm import tqdm
import xarray as xr
import pandas as pd
log = logging.getLogger(__name__)
cm = 1 / 2.54  # convert centimeters to inches (matplotlib figure sizes are given in inches)
# ecRad bands in nanometers
ecRad_bands = dict(Band1=(3077, 3846), Band2=(2500, 3076), Band3=(2150, 2500), Band4=(1942, 2150), Band5=(1626, 1941),
Band6=(1299, 1625), Band7=(1242, 1298), Band8=(778, 1241), Band9=(625, 777), Band10=(442, 624),
Band11=(345, 442), Band12=(263, 344), Band13=(200, 262), Band14=(3846, 12195))
# sea ice albedo bands in micrometers
ci_bands = [(0.185, 0.25), (0.25, 0.44), (0.44, 0.69), (0.69, 1.19), (1.19, 2.38), (2.38, 4.0)]
# sea ice albedo in six spectral intervals for each month
ci_albedo = np.empty((12, 6))
# Sea ice surf. albedo for 0.185-0.25 micron (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 0] = (0.975, 0.975, 0.975, 0.975,
0.975, 0.876, 0.778, 0.778,
0.975, 0.975, 0.975, 0.975)
# Sea ice surf. albedo for 0.25-0.44 micron (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 1] = (0.975, 0.975, 0.975, 0.975,
0.975, 0.876, 0.778, 0.778,
0.975, 0.975, 0.975, 0.975)
# Sea ice surf. albedo for 0.44-0.69 micron (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 2] = (0.975, 0.975, 0.975, 0.975,
0.975, 0.876, 0.778, 0.778,
0.975, 0.975, 0.975, 0.975)
# Sea ice surf. albedo for 0.69-1.19 micron (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 3] = (0.832, 0.832, 0.832, 0.832,
0.832, 0.638, 0.443, 0.443,
0.832, 0.832, 0.832, 0.832)
# Sea ice surf. albedo for 1.19-2.38 micron (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 4] = (0.250, 0.250, 0.250, 0.250,
0.250, 0.153, 0.055, 0.055,
0.250, 0.250, 0.250, 0.250)
# Sea ice surf. albedo for 2.38-4.00 microns (snow covered; Ebert and Curry, 1993)
ci_albedo[:, 5] = (0.025, 0.025, 0.025, 0.025,
0.025, 0.030, 0.036, 0.036,
0.025, 0.025, 0.025, 0.025)
# create a DataArray from the sea ice albedo parameterization
ci_albedo_da = xr.DataArray(
data=ci_albedo,
dims=["time", "sw_albedo_band"],
name="ci_albedo",
coords=dict(
sw_albedo_band=np.arange(1, 7),
time=pd.date_range("2022-01-15",
periods=12,
freq=pd.offsets.DateOffset(months=1))),
attrs=dict(band_bounds=f"{ci_bands}"))
# Sea ice direct surface albedo for the six sea ice albedo bands (snow covered; Ebert and Curry, 1993)
ci_albedo_direct = np.array([0.980, 0.980, 0.980, 0.902, 0.384, 0.053])
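# A short usage sketch (not part of the original module): the DataArray above allows a
# nearest-neighbour lookup of the parameterized albedo for an arbitrary date, e.g.
# >>> ci_albedo_da.sel(time="2022-04-15", method="nearest")
# The reduced June-August values reflect the melting snow cover on the sea ice.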
# ozone sonde files for each flight
ozone_files = dict(Flight_20210629a="sc210624.b11",
RF16="ny220413.b16", RF17="ny220413.b16", RF18="ny220413.b16")
# plotting metadata
figsize_wide = (24 * cm, 12 * cm)
figsize_equal = (12 * cm, 12 * cm)
plot_units = dict(cloud_fraction="", clwc=r"mg$\,$kg$^{-1}$", ciwc=r"mg$\,$kg$^{-1}$", cswc=r"mg$\,$kg$^{-1}$",
q_ice=r"mg$\,$kg$^{-1}$", q_liquid=r"mg$\,$kg$^{-1}$", iwp=r"g$\,$m$^{-2}$", iwc=r"mg$\,$m$^{-3}$",
crwc=r"mg$\,$kg$^{-1}$", t="K", q=r"g$\,$kg$^{-1}$", re_ice=r"$\mu$m", re_liquid=r"$\mu$m",
heating_rate_sw=r"K$\,$day$^{-1}$", heating_rate_lw=r"K$\,$day$^{-1}$",
heating_rate_net=r"K$\,$day$^{-1}$",
o3_mmr=r"kg$\,$kg$^{-1}$", o3_vmr=r"ppmv",
flux_dn_sw=r"W$\,$m$^{-2}$", flux_dn_lw=r"W$\,$m$^{-2}$", flux_up_sw=r"W$\,$m$^{-2}$",
flux_up_lw=r"W$\,$m$^{-2}$", flux_net_sw=r"W$\,$m$^{-2}$", flux_net_lw=r"W$\,$m$^{-2}$",
cre_sw=r"W$\,$m$^{-2}$", cre_lw=r"W$\,$m$^{-2}$", cre_total=r"W$\,$m$^{-2}$",
transmissivity_sw="", transmissivity_lw="", reflectivity_sw="", reflectivity_lw="",
od="", scat_od="", od_mean="", scat_od_mean="", g="", g_mean="", od_int="", scat_od_int="", g_int="",
absorption="", absorption_int="",
eglo=r"W$\,$m$^{-2}\,$nm$^{-1}$", eglo_int=r"W$\,$m$^{-2}$", eup=r"W$\,$m$^{-2}\,$nm$^{-1}$",
eup_int=r"W$\,$m$^{-2}$",
aerosol_mmr=r"kg$\,$kg$^{-1}$")
cbarlabels = dict(cloud_fraction="Cloud fraction", clwc="Cloud liquid water content", ciwc="Cloud ice water content",
cswc="Cloud snow water content", crwc="Cloud rain water content", t="Temperature",
q="Specific humidity", q_ice="Ice mass mixing ratio", q_liquid="Liquid mass mixing ratio",
iwp="Ice water path", iwc="Ice water content", o3_mmr="Ozone mass mixing ratio",
o3_vmr="Ozone volume mixing ratio",
re_ice="Ice effective radius", re_liquid="Liquid effective radius",
heating_rate_sw="Solar heating rate", heating_rate_lw="Terrestrial heating rate",
heating_rate_net="Net heating rate",
transmissivity_sw="Solar transmissivity", transmissivity_lw="Terrestrial transmissivity",
reflectivity_sw="Solar reflectivity", reflectivity_lw="Terrestrial reflectivity",
flux_dn_sw="Downward solar irradiance", flux_up_sw="Upward solar irradiance",
flux_dn_lw="Downward terrestrial irradiance", flux_up_lw="Upward terrestrial irradiance",
flux_net_lw="Net terrestrial irradiance", flux_net_sw="Net solar irradiance",
cre_sw="Solar cloud radiative effect", cre_lw="Terrestrial cloud radiative effect",
cre_total="Total cloud radiative effect",
od=f"Total optical depth", scat_od=f"Scattering optical depth", od_mean=f"Mean total optical depth",
scat_od_mean=f"Mean scattering optical depth", g=f"Asymmetry factor", g_mean="Mean asymmetry factor",
od_int="Integrated total optical depth", scat_od_int="Integrated scattering optical depth",
absorption="Absorption", absorption_int="Integrated absorption",
eglo="Spectral global downward irradiance", eglo_int="Global downward irradiance",
eup="Spectral diffuse upward irradiance", eup_int="Diffuse upward irradiance",
aerosol_mmr="Aerosol mass mixing ratio")
scale_factors = dict(cloud_fraction=1, clwc=1e6, ciwc=1e6, cswc=1e6, crwc=1e6, t=1, q=1000, re_ice=1e6,
re_liquid=1e6, q_ice=1e6, q_liquid=1e6, iwp=1000, iwc=1e6, o3_vmr=1e6)
cmaps = dict(t=cmr.prinsenvlag_r,
ciwc=cmr.get_sub_cmap("cmr.freeze", .25, 0.85), cswc=cmr.get_sub_cmap("cmr.freeze", .25, 0.85),
crwc=cmr.get_sub_cmap("cmr.freeze", .25, 0.85), q_ice=cmr.get_sub_cmap("cmr.freeze", .25, 0.85),
iwp=cmr.get_sub_cmap("cmr.freeze", .25, 0.85), iwc=cmr.get_sub_cmap("cmr.freeze", .25, 0.85),
cloud_fraction=cmr.cosmic,
re_ice=cmr.cosmic_r, re_liquid=cmr.cosmic_r,
heating_rate_sw=cmr.get_sub_cmap("cmr.ember_r", 0, 0.75), heating_rate_lw=cmr.fusion_r,
heating_rate_net=cmr.fusion_r,
cre_total=cmr.fusion_r,
flux_dn_sw=cmr.get_sub_cmap("cmr.torch", 0.2, 1))
norms = dict(t=colors.TwoSlopeNorm(vmin=200, vcenter=235, vmax=280), clwc=colors.LogNorm(),
heating_rate_lw=colors.TwoSlopeNorm(vmin=-3, vcenter=0, vmax=1.5),
heating_rate_net=colors.TwoSlopeNorm(vmin=-2.5, vcenter=0, vmax=2),
             od=colors.LogNorm(vmax=10), scat_od=colors.LogNorm(),
od_int=colors.LogNorm(vmax=10), scat_od_int=colors.LogNorm(),
cre_total=colors.TwoSlopeNorm(vcenter=0))
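# A hedged sketch of how these plotting dictionaries work together
# (``ds`` is a hypothetical xarray Dataset containing a 2D field ``ciwc``):
# >>> key = "ciwc"
# >>> plot_data = ds[key] * scale_factors[key]
# >>> plot_data.plot(cmap=cmaps[key], norm=norms.get(key),
# ...                cbar_kwargs={"label": f"{cbarlabels[key]} ({plot_units[key]})"})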
# plotting dictionaries for BACARDI
bacardi_labels = dict(F_down_solar=r"$F^{\downarrow}_{\mathrm{sol}}$",
F_down_terrestrial=r"$F^{\downarrow}_{\mathrm{ter}}$",
F_up_solar=r"$F^{\uparrow}_{\mathrm{sol}}$",
F_up_terrestrial=r"$F^{\uparrow}_{\mathrm{ter}}$",
F_net_solar=r"$F_{\mathrm{net, sol}}$",
F_net_terrestrial=r"$F_{\mathrm{net, ter}}$",
CRE_solar=r"CRE$_{\mathrm{sol}}$",
CRE_terrestrial=r"CRE$_{\mathrm{ter}}$",
CRE_total=r"CRE$_{\mathrm{total}}$")
cb_color_cycles = dict(
cartocolor=["#88CCEE", "#CC6677", "#DDCC77", "#332288", "#AA4499", "#44AA99", "#999933",
"#882255", "#661100", "#6699CC", "#117733", "#888888"],
# orange, skyblue, bluishgreen, yellow, blue, vermillion, reddishpurple, gray, black
okabe_ito=["#E69F00", "#56B4E9", "#009E73", "#F0E442", "#0072B2", "#D55E00",
"#CC79A7", "#999999", "#000000"],
petroff_6=["#5790fc", "#f89c20", "#e42536", "#964a8b", "#9c9ca1", "#7a21dd"],
petroff_8=["#1845fb", "#ff5e02", "#c91f16", "#c849a9", "#adad7d", "#86c8dd", "#578dff", "#656364"],
petroff_10=["#3f90da", "#ffa90e", "#bd1f01", "#94a4a2", "#832db6", "#a96b59", "#e76300",
"#b9ac70", "#717581", "#92dadd"])
"""
.. image:: ./figures/cb_color_cycles.png
:alt: Image of the colors
Colorblind friendly color cycles from the `rcartocolor <https://github.com/Nowosad/rcartocolor>`_ package,
the Okabe-Ito palette :cite:p:`zotero-1251` and the survey from :cite:t:`petroff2021`.
"""
def get_path(key: str, flight: str = None, campaign: str = "cirrus-hl", instrument: str = None) -> str:
"""
Read paths from the toml file according to the current working directory.
Args:
        key: which path to return, see config.toml for possible values
        flight: flight for which the path should be returned (e.g. Flight_20210625a for CIRRUS-HL or HALO-AC3_20220311_HALO_RF01 for HALO-AC3)
        campaign: campaign for which the paths should be generated
        instrument: if key == 'all', the instrument to generate the path for (e.g. BAHAMAS)
Returns: Path to specified data
"""
# make sure to search for the config file in the project directory
wk_dir = os.getcwd()
log.debug(f"Searching for config.toml in {wk_dir}")
if wk_dir.startswith("C"):
config = toml.load(f"{wk_dir}/config.toml")[campaign]["jr_local"]
elif wk_dir.startswith("/mnt"):
config = toml.load(f"{wk_dir}/config.toml")[campaign]["jr_ubuntu"]
else:
config = toml.load(f"{wk_dir}/config.toml")[campaign]["lim_server"]
flight = "" if flight is None else flight
paths = dict()
base_dir = config.pop("base_dir")
paths["base"] = base_dir
for k in config:
paths[k] = os.path.join(base_dir, flight, config[k])
for k in ["calib", "pixel_wl", "lamp", "panel"]:
try:
paths[k] = config[k]
except KeyError:
log.debug(f"Found no path for key: {k} and campaign {campaign}")
if key == 'all':
paths['all'] = os.path.join(base_dir, config[key], instrument)
return paths[key]
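# A hedged usage sketch: requires a config.toml in the current working directory;
# the key and flight name below are hypothetical examples.
# >>> data_path = get_path("bahamas", flight="HALO-AC3_20220411_HALO_RF17", campaign="halo-ac3")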
def make_dir(folder: str) -> None:
"""
Creates folder if it doesn't exist already.
Args:
folder: folder name or full path
Returns: nothing, but creates a new folder if possible
"""
    os.makedirs(folder, exist_ok=True)
def delete_folder_contents(folder: str) -> None:
"""
Deletes all files and subfolders in a folder.
From: https://stackoverflow.com/questions/185936/how-to-delete-the-contents-of-a-folder
Args:
folder: folder name or full path
Returns: nothing, but deletes all files in a folder
"""
for filename in tqdm(os.listdir(folder)):
file_path = os.path.join(folder, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except OSError as e:
print(f"Failed to delete {file_path}.\n Reason: {e}")
def arg_nearest(array, value):
"""
Find the index of the nearest value in an array.
Args:
array: Input has to be convertible to an ndarray
value: Value to search for
Returns: index of closest value
"""
array = np.asarray(array)
idx = np.nanargmin(np.abs(array - value))
return idx
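# Example: the nearest value to 5.3 in [1, 5, 9] is at index 1.
# >>> arg_nearest([1.0, 5.0, 9.0], 5.3)  # -> 1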
# from pyLARDA.Transformations
def set_xticks_and_xlabels(ax: plt.Axes, time_extend: datetime.timedelta) -> plt.Axes:
    """Set the ticks and labels of the x-axis (only when the x-axis is time in UTC).

    Options:
        - time_extend > 30 days: no changes
        - 30 days > time_extend >= 7 days: major ticks every 2 days, minor ticks every 12 hours
        - 7 days > time_extend >= 2 days: major ticks every day, minor ticks every 6 hours
        - 2 days > time_extend >= 25 hours: major ticks every 12 hours, minor ticks every 3 hours
        - 25 hours > time_extend >= 12 hours: major ticks every 2 hours, minor ticks every 30 minutes
        - 12 hours > time_extend >= 6 hours: major ticks every hour, minor ticks every 30 minutes
        - 6 hours > time_extend >= 2 hours: major ticks every hour, minor ticks every 15 minutes
        - 2 hours > time_extend >= 45 minutes: major ticks every 15 minutes, minor ticks every 5 minutes
        - 45 minutes > time_extend >= 5 minutes: major ticks every 5 minutes, minor ticks every minute
        - else: major ticks every minute, minor ticks every 10 seconds
Args:
ax: axis in which the x-ticks and labels have to be set
time_extend: time difference of t_end - t_start (format datetime.timedelta)
Returns:
ax - axis with new ticks and labels
"""
if time_extend > datetime.timedelta(days=30):
pass
elif datetime.timedelta(days=30) > time_extend >= datetime.timedelta(days=7):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
ax.xaxis.set_major_locator(mdates.DayLocator(bymonthday=range(1, 32, 2)))
ax.xaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 12)))
elif datetime.timedelta(days=7) > time_extend >= datetime.timedelta(days=2):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
ax.xaxis.set_major_locator(mdates.HourLocator(byhour=[0]))
ax.xaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 6)))
elif datetime.timedelta(days=2) > time_extend >= datetime.timedelta(hours=25):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d\n%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(byhour=range(0, 24, 12)))
ax.xaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 3)))
elif datetime.timedelta(hours=25) > time_extend >= datetime.timedelta(hours=12):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(byhour=range(0, 24, 2)))
ax.xaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 30)))
elif datetime.timedelta(hours=12) > time_extend >= datetime.timedelta(hours=6):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator())
ax.xaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 30)))
elif datetime.timedelta(hours=6) > time_extend >= datetime.timedelta(hours=2):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=1))
ax.xaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 15)))
elif datetime.timedelta(hours=2) > time_extend >= datetime.timedelta(minutes=45):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.MinuteLocator(byminute=range(0, 60, 15)))
ax.xaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 5)))
elif datetime.timedelta(minutes=45) > time_extend >= datetime.timedelta(minutes=5):
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.MinuteLocator(byminute=range(0, 60, 5)))
ax.xaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 1)))
else:
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.MinuteLocator(byminute=range(0, 60, 1)))
ax.xaxis.set_minor_locator(mdates.SecondLocator(bysecond=range(10, 60, 10)))
return ax
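# A minimal usage sketch for a three-hour time series (hourly major, 15-min minor ticks):
# >>> fig, ax = plt.subplots()
# >>> ax.plot(pd.date_range("2022-04-11 10:00", "2022-04-11 13:00", freq="1min"), range(181))
# >>> ax = set_xticks_and_xlabels(ax, datetime.timedelta(hours=3))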
def set_yticks_and_ylabels(ax: plt.Axes, time_extend: datetime.timedelta) -> plt.Axes:
    """Set the ticks and labels of the y-axis (only when the y-axis is time in UTC).

    Options:
        - time_extend > 30 days: no changes
        - 30 days > time_extend >= 7 days: major ticks every 2 days, minor ticks every 12 hours
        - 7 days > time_extend >= 2 days: major ticks every day, minor ticks every 6 hours
        - 2 days > time_extend >= 25 hours: major ticks every 12 hours, minor ticks every 3 hours
        - 25 hours > time_extend >= 12 hours: major ticks every 2 hours, minor ticks every 30 minutes
        - 12 hours > time_extend >= 6 hours: major ticks every hour, minor ticks every 30 minutes
        - 6 hours > time_extend >= 2 hours: major ticks every hour, minor ticks every 15 minutes
        - 2 hours > time_extend >= 5 minutes: major ticks every 15 minutes, minor ticks every 5 minutes
        - else: major ticks every minute, minor ticks every 10 seconds
Args:
ax: axis in which the y-ticks and labels have to be set
time_extend: time difference of t_end - t_start (format datetime.timedelta)
Returns:
ax - axis with new ticks and labels
"""
if time_extend > datetime.timedelta(days=30):
pass
elif datetime.timedelta(days=30) > time_extend >= datetime.timedelta(days=7):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
ax.yaxis.set_major_locator(mdates.DayLocator(bymonthday=range(1, 32, 2)))
ax.yaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 12)))
elif datetime.timedelta(days=7) > time_extend >= datetime.timedelta(days=2):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
ax.yaxis.set_major_locator(mdates.HourLocator(byhour=[0]))
ax.yaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 6)))
elif datetime.timedelta(days=2) > time_extend >= datetime.timedelta(hours=25):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%b %d\n%H:%M'))
ax.yaxis.set_major_locator(mdates.HourLocator(byhour=range(0, 24, 12)))
ax.yaxis.set_minor_locator(mdates.HourLocator(byhour=range(0, 24, 3)))
elif datetime.timedelta(hours=25) > time_extend >= datetime.timedelta(hours=12):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.yaxis.set_major_locator(mdates.HourLocator(byhour=range(0, 24, 2)))
ax.yaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 30)))
elif datetime.timedelta(hours=12) > time_extend >= datetime.timedelta(hours=6):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.yaxis.set_major_locator(mdates.HourLocator())
ax.yaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 30)))
elif datetime.timedelta(hours=6) > time_extend >= datetime.timedelta(hours=2):
ax.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.yaxis.set_major_locator(mdates.HourLocator(interval=1))
ax.yaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 15)))
    elif datetime.timedelta(hours=2) > time_extend >= datetime.timedelta(minutes=5):
        ax.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
        ax.yaxis.set_major_locator(mdates.MinuteLocator(byminute=range(0, 60, 15)))
        ax.yaxis.set_minor_locator(mdates.MinuteLocator(byminute=range(0, 60, 5)))
else:
ax.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.yaxis.set_major_locator(mdates.MinuteLocator(byminute=range(0, 60, 1)))
ax.yaxis.set_minor_locator(mdates.SecondLocator(bysecond=range(10, 60, 10)))
return ax
def set_cb_friendly_colors(name: str = "cartocolor"):
"""
Set new colorblind friendly color cycle.
Args:
name: Name of color cycle
Returns: Modifies the standard pyplot color cycle
"""
try:
new_color_cycle = cb_color_cycles[name]
except KeyError:
raise KeyError(f"No color cycle with name '{name}' found! "
f"Use one of {cb_color_cycles.keys()}")
plt.rcParams['axes.prop_cycle'] = plt.cycler(color=new_color_cycle)
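# Example: switch the default matplotlib color cycle to the Okabe-Ito palette.
# >>> set_cb_friendly_colors("okabe_ito")
# >>> plt.plot([0, 1], [0, 1])  # first line is now drawn in Okabe-Ito orange (#E69F00)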
def get_cb_friendly_colors(name: str = "cartocolor") -> list:
"""
Get colorblind friendly color cycle.
Args:
name: Name of color cycle
Returns: List with colorblind friendly colors
"""
try:
return cb_color_cycles[name]
except KeyError:
raise KeyError(f"No color cycle with name '{name}' found! "
f"Use one of {cb_color_cycles.keys()}")
def nested_dict_values_iterator(dict_obj: dict):
""" Loop over all values in a nested dictionary
See: https://thispointer.com/python-iterate-loop-over-all-nested-dictionary-values/
Args:
dict_obj: nested dictionary
Returns: Each value in a nested dictionary
"""
# Iterate over all values of dict argument
for value in dict_obj.values():
# Check if value is of dict type
if isinstance(value, dict):
# If value is dict then iterate over all its values
for v in nested_dict_values_iterator(value):
yield v
else:
# If value is not dict type then yield the value
yield value
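# Example:
# >>> list(nested_dict_values_iterator({"a": 1, "b": {"c": 2}}))  # -> [1, 2]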
def nested_dict_pairs_iterator(dict_obj: dict):
""" Loop over all values in a nested dictionary and return the key, value pair
See: https://thispointer.com/python-how-to-iterate-over-nested-dictionary-dict-of-dicts/
Args:
dict_obj: nested dictionary
Returns: Each value in a nested dictionary with its key
"""
# Iterate over all values of dict argument
for key, value in dict_obj.items():
# Check if value is of dict type
if isinstance(value, dict):
# If value is dict then iterate over all its values
for pair in nested_dict_pairs_iterator(value):
yield key, *pair
else:
# If value is not dict type then yield the key, value pair
yield key, value
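# Example: nested keys are chained into one tuple per leaf value.
# >>> list(nested_dict_pairs_iterator({"a": 1, "b": {"c": 2}}))  # -> [('a', 1), ('b', 'c', 2)]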
def setup_logging(dir: str, file: str = None, custom_string: str = None):
"""
    Set up logging to a file and the console if a script file is given via ``file``. If the code is executed in an
    interactive console, where ``__file__`` is undefined, log only to the console.
Args:
dir: Directory where to save logging file. Gets created if it doesn't exist yet.
file: Name of the file which called the function. Should be given via the __file__ attribute.
custom_string: Custom String to append to logging file name. Logging file always starts with date and the name
of the script being called.
Returns: Logger
"""
log = logging.getLogger("pylim")
# remove existing handlers, necessary when function is called in a loop
log.handlers = []
if file is not None:
file = os.path.basename(file)
log.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
os.makedirs(dir, exist_ok=True)
fh = logging.FileHandler(f'{dir}/{datetime.datetime.now(datetime.UTC):%Y%m%d}_{file[:-3]}_{custom_string}.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s : %(levelname)s - %(message)s', datefmt="%c")
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
log.addHandler(ch)
log.addHandler(fh)
else:
# __file__ is undefined if script is executed in console, set a normal logger instead
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
return log
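# A hedged usage sketch at the top of a processing script (``./logs`` and the
# custom string are hypothetical):
# >>> log = setup_logging("./logs", __file__, "RF17")
# >>> log.info("Starting processing...")  # goes to the console and to the dated log file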
# from pyLARDA.SpectraProcessing
def seconds_to_fstring(time_diff):
    """Format a duration given in seconds as an ``H:MM:SS`` string."""
    return str(datetime.timedelta(seconds=time_diff))
def read_command_line_args():
"""
Read out command line arguments and save them to a dictionary. Expects arguments in the form key=value.
Returns: dictionary with command line arguments as dict[key] = value
"""
args = dict()
for arg in sys.argv[1:]:
if arg.count('=') == 1:
key, value = arg.split('=')
args[key] = value
return args
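# Example: called as ``python script.py flight=RF17 test=True``, this returns
# {'flight': 'RF17', 'test': 'True'} (values stay strings; see strtobool below for conversion).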
def set_cdo_path(path: str = "/home/jroettenbacher/.conda/envs/phd_base/bin/cdo"):
# add cdo path to python environment
os.environ["CDO"] = path
def generate_specific_rows(filePath, userows=None):
    """Yield only the given (0-based) line numbers from a file (used for trajectory plotting)."""
    userows = userows if userows is not None else []
with open(filePath) as f:
for i, line in enumerate(f):
if i in userows:
yield line
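# A hedged usage sketch: read only the header and two data rows with pandas
# (the file name and row numbers are hypothetical).
# >>> import io
# >>> rows = generate_specific_rows("trajectory.txt", userows=[0, 10, 11])
# >>> df = pd.read_csv(io.StringIO("".join(rows)))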
def make_flag(boolean_array, name: str):
"""
Make a list of flag values for plotting using a boolean array as input
Args:
        boolean_array: array-like input with True and False values
name: replace True with this string
Returns: list with as many strings as there are True values in the input array
"""
array = np.array(boolean_array) # convert to numpy.ndarray
return [str(a).replace("True", name) for a in array if a]
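# Example: two True values yield two flag strings.
# >>> make_flag([True, False, True], "below_cloud")  # -> ['below_cloud', 'below_cloud']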
def find_bases_tops(mask, rg_list):
"""
This function finds cloud bases and tops for a provided binary cloud mask.
Args:
        mask (np.ndarray, dtype=bool): boolean array with False = signal and True = no signal
        rg_list (np.ndarray): array of range values
    Returns:
        cloud_prop (list): list containing a dict for every time step with cloud base/top indices, range values and widths
        cloud_mask (np.ndarray): integer array containing +1 for cloud tops, -1 for cloud bases and 0 elsewhere
"""
cloud_prop = []
cloud_mask = np.full(mask.shape, 0, dtype=int)
    # bug fix: add an empty first range gate to detect cloud bases of clouds which start at the first range gate
mask = np.hstack((np.full_like(mask, fill_value=True)[:, 0:1], mask))
for iT in range(mask.shape[0]):
cloud = [(k, sum(1 for j in g)) for k, g in groupby(mask[iT, :])]
idx_cloud_edges = np.cumsum([prop[1] for prop in cloud])
bases, tops = idx_cloud_edges[0:][::2][:-1], idx_cloud_edges[1:][::2]
if tops.size > 0:
tops = [t - 1 for t in tops] # reduce top indices by 1 to account for the introduced row
if tops[-1] == cloud_mask.shape[1]:
tops[-1] = cloud_mask.shape[1] - 1 # account for python starting counting at 0
if bases.size > 0:
bases = [b - 1 for b in bases] # reduce base indices by 1 to account for the introduced row
cloud_mask[iT, bases] = -1
cloud_mask[iT, tops] = +1
cloud_prop.append({'idx_cb': bases, 'val_cb': rg_list[bases], # cloud bases
'idx_ct': tops, 'val_ct': rg_list[tops], # cloud tops
'width': [ct - cb for ct, cb in zip(rg_list[tops], rg_list[bases])]
})
return cloud_prop, cloud_mask
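# A minimal sketch with hypothetical input: one time step, clouds (False = signal)
# at range gates 0-1 and 4.
# >>> mask = np.array([[False, False, True, True, False, True]])
# >>> props, cmask = find_bases_tops(mask, np.arange(6) * 100.0)
# >>> cmask  # -> array([[-1,  0,  1,  0, -1,  1]])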
def longitude_values_for_gaussian_grid(latitudes: np.ndarray,
                                       n_points: np.ndarray,
                                       longitude_boundaries: np.ndarray = None) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate the longitude values for each latitude circle on a reduced Gaussian grid.
If the longitude boundaries are given only the longitude values within these boundaries are returned.
    The ECMWF uses regular and reduced Gaussian grids to represent its model data.
    These grids have a fixed number of latitude rings between the equator and each pole, with either the same number of
    longitude points on every latitude ring (regular) or a number of points that decreases towards the poles (reduced).
    For more information on Gaussian grids as used by the ECMWF see: https://confluence.ecmwf.int/display/FCST/Gaussian+grids
    When data on a reduced Gaussian grid is retrieved, the exact longitude values are not included in the data set and
    have to be calculated from the grid definition.
    This requires the latitude rings (latitudes) and the number of longitude points on each ring (n_points).
    Since one rarely retrieves the whole model domain, the longitude boundaries can be supplied to return only the
    longitude values within them.
Args:
latitudes: The latitude values of the Gaussian grid starting in the North
n_points: The number of longitude points on each latitude circle (needs to be of same length as latitudes)
        longitude_boundaries: The longitude boundaries (west, east) in degrees within [-180, 180]
Returns: Two arrays with repeating latitude values and the corresponding longitude values
"""
assert len(latitudes) == len(n_points), "Number of latitudes does not match number of points given!"
lon_values = [np.linspace(0, 360, num=points, endpoint=False) for points in n_points]
lon_values_out = np.array([])
lon_values_list = list()
for i, lons in enumerate(lon_values):
all_lons = np.where(lons > 180, (lons + 180) % 360 - 180, lons)
assert len(all_lons) == len(np.unique(all_lons)), f"Non unique longitude values found for {i}! Check input!"
if longitude_boundaries is not None:
all_lons = all_lons[(all_lons >= longitude_boundaries[0]) & (all_lons <= longitude_boundaries[1])]
all_lons.sort()
lon_values_out = np.concatenate([lon_values_out, all_lons])
lon_values_list.append(all_lons)
# create list of latitude values as coordinate
lat_values_out = np.array([])
for i, lon_array in enumerate(lon_values_list):
lat = np.repeat(latitudes[i], lon_array.size)
lat_values_out = np.concatenate([lat_values_out, lat])
return lat_values_out, lon_values_out
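# A minimal sketch with a toy reduced grid: two latitude rings carrying 4 and 8
# longitude points, cut to the boundaries (-90, 90).
# >>> lats, lons = longitude_values_for_gaussian_grid(
# ...     np.array([80.0, 70.0]), np.array([4, 8]), longitude_boundaries=np.array([-90, 90]))
# lats -> [80, 80, 80, 70, 70, 70, 70, 70]
# lons -> [-90, 0, 90, -90, -45, 0, 45, 90]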
def hellinger_distance(p, q):
"""
Compute the Hellinger distance between two probability distributions.
    Args:
        p (np.ndarray): Probability distribution 1 (e.g., histogram).
        q (np.ndarray): Probability distribution 2 (e.g., histogram).
Returns:
float: Hellinger distance between the two distributions.
Reference:
- https://en.wikipedia.org/wiki/Hellinger_distance
"""
# Ensure that p and q have the same shape
assert p.shape == q.shape, "Input distributions must have the same shape."
# Calculate the Hellinger distance
h = (1.0 / np.sqrt(2.0)) * np.sqrt(np.sum((np.sqrt(p) - np.sqrt(q)) ** 2))
return h
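# Example: the Hellinger distance between an even and a fully concentrated
# two-bin distribution.
# >>> hellinger_distance(np.array([0.5, 0.5]), np.array([1.0, 0.0]))  # -> ~0.5412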
def strtobool(value):
"""
Convert a string representation of a boolean value to a Python boolean.
Args:
value (str): String representation of a boolean value.
Returns:
bool: Python boolean value.
"""
if value.lower() in ('true', 't', 'yes', 'y', 'on', '1'):
return True
elif value.lower() in ('false', 'f', 'no', 'n', 'off', '0'):
return False
else:
raise ValueError(f'Invalid boolean value: {value}')
_COLORS = {
"green": "#3cb371",
"darkgreen": "#253A24",
"lightgreen": "#70EB5D",
"yellowgreen": "#C7FA3A",
"yellow": "#FFE744",
"orange": "#ffa500",
"pink": "#B43757",
"red": "#F57150",
"shockred": "#E64A23",
"seaweed": "#646F5E",
"seaweed_roll": "#748269",
"white": "#ffffff",
"lightblue": "#6CFFEC",
"blue": "#209FF3",
"skyblue": "#CDF5F6",
"darksky": "#76A9AB",
"darkpurple": "#464AB9",
"lightpurple": "#6A5ACD",
"purple": "#BF9AFF",
"darkgray": "#2f4f4f",
"lightgray": "#ECECEC",
"gray": "#d3d3d3",
"lightbrown": "#CEBC89",
"lightsteel": "#a0b0bb",
"steelblue": "#4682b4",
"mask": "#C8C8C8",
}
_CLABEL = {
"target_classification": (
("_Clear sky", _COLORS["white"]),
("Droplets", _COLORS["lightblue"]),
("Drizzle or rain", _COLORS["blue"]),
("Drizzle & droplets", _COLORS["purple"]),
("Ice", _COLORS["lightsteel"]),
("Ice & droplets", _COLORS["darkpurple"]),
("Melting ice", _COLORS["orange"]),
("Melting & droplets", _COLORS["yellowgreen"]),
("Aerosols", _COLORS["lightbrown"]),
("Insects", _COLORS["shockred"]),
("Aerosols & insects", _COLORS["pink"]),
("No data", _COLORS["mask"]),
),
"detection_status": (
("_Clear sky", _COLORS["white"]),
("Lidar only", _COLORS["yellow"]),
("Uncorrected atten.", _COLORS["seaweed_roll"]),
("Radar & lidar", _COLORS["green"]),
("_No radar but unknown atten.", _COLORS["purple"]),
("Radar only", _COLORS["lightgreen"]),
("_No radar but known atten.", _COLORS["orange"]),
("Corrected atten.", _COLORS["skyblue"]),
("Clutter", _COLORS["shockred"]),
("_Lidar molecular scattering", _COLORS["pink"]),
("No data", _COLORS["mask"]),)
}