#!/usr/bin/env python
"""Functions to process and plot libRadtran simulation files
*author*: Johannes Röttenbacher
"""
import datetime
import logging
import os
import re
from subprocess import Popen
import pandas as pd
from geopy.distance import geodesic
from joblib import cpu_count
from tqdm import tqdm
# %% module import
from pylim.cirrus_hl import coordinates, radiosonde_stations
log = logging.getLogger(__name__)
[docs]
def get_netcdf_global_attributes(campaign: str, flight: str, experiment: str):
"""
Return a dictionary with globa attributes for a libRadtran simulation
Args:
campaign: which campaign (cirrus-hl, halo-ac3)
flight: which flight
experiment: which experiment has been run
Returns: dictionary with global attributes
"""
global_attributes = {
"cirrus-hl": dict(
title=f"Simulated downward and upward irradiance along flight track for experiment {experiment}",
Conventions="CF-1.9",
camapign_id=f"{campaign.swapcase()}",
platform_id="HALO",
version_id="1",
comment=f"CIRRUS-HL Campaign, Oberpfaffenhofen, Germany, {flight}",
contact="PI: m.wendisch@uni-leipzig.de, Data: johannes.roettenbacher@uni-leipzig.de",
history=f"Created {datetime.datetime.now(datetime.UTC):%c} UTC",
institution="Leipzig Institute for Meteorology, Leipzig University, Stephanstr.3, 04103 Leipzig, Germany",
source="libRadtran 2.0.4",
references="Emde et al. 2016, 10.5194/gmd-9-1647-2016",
),
"halo-ac3": dict(
title=f"Simulated downward and upward irradiance along flight track for experiment {experiment}",
Conventions="CF-1.9",
campaign_id=f"{campaign.swapcase()}",
platform_id="HALO",
version_id="1",
institution="Leipzig Institute for Meteorology, Leipzig, Germany, Stephanstr.3, 04103 Leipzig, Germany",
history=f"created {datetime.datetime.now(datetime.UTC):%c} UTC",
contact="Johannes Röttenbacher, johannes.roettenbacher@uni-leipzig.de",
PI="André Ehrlich, a.ehrlich@uni-leipzig.de",
source="libRadtran 2.0.4",
references="Emde et al. 2016, 10.5194/gmd-9-1647-2016",
),
}
return global_attributes[campaign]
[docs]
def get_netcdf_variable_attributes(
solar_flag: bool, integrate_str: str, wavelength_str: str
):
"""
Return a dictionary with variable attributes for a libRadtran simulation
Args:
solar_flag: whether the simulation was a solar or a terrestrial simulation
integrate_str: whether the simulations was integrated at the end or not
wavelength_str: wavelength range of the simulation
Returns: dictionary with variable attributes
"""
# set up metadata for general variables which are shared between solar and terrestrial simulations
general_variables = dict(
latitude=dict(
units="degrees_north", long_name="latitude", standard_name="latitude"
),
longitude=dict(
units="degrees_east", long_name="longitude", standard_name="longitude"
),
saa=dict(
units="degree",
long_name="solar azimuth angle",
standard_name="soalr_azimuth_angle",
comment="clockwise from north",
),
sza=dict(
units="degree",
long_name="solar zenith angle",
standard_name="solar_zenith_angle",
comment="0 deg = zenith",
),
CLWD=dict(
units="g m^-3",
long_name="cloud liquid water density",
standard_name="mass_concentration_of_cloud_liquid_water_in_air",
),
CIWD=dict(
units="g m^-3",
long_name="cloud ice water density",
standard_name="mass_concentration_of_cloud_ice_water_in_air",
),
p=dict(
units="hPa", long_name="atmospheric pressure", standard_name="air_pressure"
),
T=dict(units="K", long_name="air temperature", standard_name="air_temperature"),
wavelength=dict(
units="nm", long_name="wavelength", standard_name="radiation_wavelength"
),
re_ice=dict(units="mum", long_name="input ice effective radius"),
iwc=dict(units="g m^-3", long_name="input ice water content"),
)
# set up metadata dictionaries for solar (shortwave) flux
solar_variables = dict(
albedo=dict(
units="1", long_name="surface albedo", standard_name="surface_albedo"
),
altitude=dict(
units="m", long_name="height above mean sea level", standard_name="altitude"
),
direct_fraction=dict(
units="1",
long_name="direct fraction of downward irradiance",
comment=wavelength_str,
),
edir=dict(
units="W m-2",
long_name=f"{integrate_str}direct beam irradiance",
standard_name="direct_downwelling_shortwave_flux_in_air",
comment=wavelength_str,
),
edn=dict(
units="W m-2",
long_name=f"{integrate_str}diffuse downward irradiance",
standard_name="diffuse_downwelling_shortwave_flux_in_air_assuming_clear_sky",
comment=wavelength_str,
),
eup=dict(
units="W m-2",
long_name=f"{integrate_str}diffuse upward irradiance",
standard_name="surface_upwelling_shortwave_flux_in_air_assuming_clear_sky",
comment=wavelength_str,
),
eglo=dict(
units="W m-2",
long_name=f"{integrate_str}global solar downward irradiance",
standard_name="solar_irradiance",
comment=wavelength_str,
),
enet=dict(
units="W m-2",
long_name=f"{integrate_str}net irradiance",
comment=wavelength_str,
),
heat=dict(units="K day-1", long_name="heating rate"),
)
# set up metadata dictionaries for terrestrial (longwave) flux
terrrestrial_variables = dict(
albedo=dict(
units="1", long_name="surface albedo", standard_name="surface_albedo"
),
altitude=dict(
units="m", long_name="height above mean sea level", standard_name="altitude"
),
direct_fraction=dict(
units="1",
long_name="direct fraction of downward irradiance",
comment=wavelength_str,
),
edir=dict(
units="W m-2",
long_name=f"{integrate_str}direct beam irradiance",
standard_name="direct_downwelling_longwave_flux_in_air",
comment=wavelength_str,
),
edn=dict(
units="W m-2",
long_name=f"{integrate_str}downward irradiance",
standard_name="downwelling_longwave_flux_in_air_assuming_clear_sky",
comment=wavelength_str,
),
eup=dict(
units="W m-2",
long_name=f"{integrate_str}upward irradiance",
standard_name="surface_upwelling_longwave_flux_in_air_assuming_clear_sky",
comment=wavelength_str,
),
)
if solar_flag:
attributes_dict = {**general_variables, **solar_variables}
else:
attributes_dict = {**general_variables, **terrrestrial_variables}
return attributes_dict
[docs]
def find_closest_radiosonde_station(latitude: float, longitude: float):
"""
Given longitude and latitude, find the closest radiosonde station from the campaign dictionary
Args:
latitude: in decimal degrees (N=positive)
longitude: in decimal degrees (E=positive)
Returns: Name of the closest radiosonde station
"""
distances = dict()
station_names = [
station[:-6] for station in radiosonde_stations
] # read out station names from list
# loop through stations and save distance in km
for station_name in station_names:
lon_lat = coordinates[station_name]
distances[station_name] = geodesic(
(lon_lat[1], lon_lat[0]), (latitude, longitude)
).km
min_distance = min(distances.values()) # get minimum distance
closest_station = [s for s in station_names if distances[s] == min_distance][0]
closest_station = [s for s in radiosonde_stations if closest_station in s][
0
] # get complete station name
log.info(
f"Closest Radiosonde station {closest_station} is {min_distance:.1f} km away."
)
return closest_station
[docs]
def run_uvspec_parallel(input_files: list, uvspec_exe: str):
"""
Run libRadtran simulations for all input files in parallel and check if an output has been created.
Rerun the failed simulations a couple of times before quitting.
Args:
input_files: list with full paths to each input file
uvspec_exe: path to uvspec executable
Returns: Writes output and log files in directory of the input files
Raises: UsesWarning if a simulation fails more than 10 times
"""
# create output and log file names
output_files = [f.replace(".inp", ".out") for f in input_files]
error_logs = [f.replace(".inp", ".log") for f in input_files]
# call uvspec for all files
processes = set()
max_processes = cpu_count() - 4
tqdm_desc = f"libRadtran simulations"
for infile, outfile, log_file in zip(
tqdm(input_files, desc=tqdm_desc), output_files, error_logs
):
with open(infile, "r") as ifile, open(outfile, "w") as ofile, open(
log_file, "w"
) as lfile:
processes.add(Popen([uvspec_exe], stdin=ifile, stdout=ofile, stderr=lfile))
if len(processes) >= max_processes:
os.wait()
processes.difference_update([p for p in processes if p.poll() is not None])
# wait for all simulations to finish
while len(processes) > 0:
os.wait()
# this will remove elements of the set which are also in the list.
# the list has only terminated processes in it,
# p.poll returns a non None value if the process is still running
processes.difference_update([p for p in processes if p.poll() is not None])
# check if all simulations created an output and rerun them if not
file_check = sum([os.path.getsize(file) == 0 for file in output_files])
# if file size is 0 -> file is empty
counter = 0 # add a counter to terminate loop if necessary
while file_check > 0:
files_to_rerun = [
f for f in input_files if os.path.getsize(f.replace(".inp", ".out")) == 0
]
# rerun simulations
for infile in tqdm(files_to_rerun, desc="redo libRadtran simulations"):
with open(infile, "r") as ifile, open(
infile.replace(".inp", ".out"), "w"
) as ofile, open(infile.replace(".inp", ".log"), "w") as lfile:
processes.add(
Popen([uvspec_exe], stdin=ifile, stdout=ofile, stderr=lfile)
)
if len(processes) >= max_processes:
os.wait()
processes.difference_update(
[p for p in processes if p.poll() is not None]
)
# wait for all simulations to finish
while len(processes) > 0:
# this will remove elements of the set which are also in the list.
# the list has only terminated processes in it,
# p.poll returns a non None value if the process is still running
processes.difference_update([p for p in processes if p.poll() is not None])
# update file_check
file_check = sum([os.path.getsize(file) == 0 for file in output_files])
counter += 1
if counter > 10:
raise UserWarning(
f"Simulation of {files_to_rerun} does not compute!\nCheck for other errors!"
)
return None