#!/usr/bin/env python
"""These are functions in relation with the SMART instrument.
"_" denotes internal functions which are called inside other functions.
The functions are explained in their docstring and can be tested using the main frame.
You can:
The reader functions were moved to :py:mod:`pylim.reader`.
* find the closest pixel and wavelength to any given wavelength for the given wavelength calibration file
* get information (date, measured property, channel) from the filename
* get the dark current for a specified measurement file with either option 1 or 2 and optionally plot it
* correct the raw measurement by the dark current
* plot the mean corrected measurement
* plot smart data either for one wavelength over time or for a range of or all wavelengths
* use the holoviews functions to create a dynamic map for interactive quicklooks in a jupyter notebook
*author*: Johannes Röttenbacher
"""
import logging
import os
import re
from typing import Tuple, Union
import holoviews as hv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from holoviews import opts
from pylim import helpers as h
from pylim import reader
from pylim.cirrus_hl import smart_lookup as meta_chl
from pylim.halo_ac3 import smart_lookup as meta_hac
hv.extension('bokeh')
log = logging.getLogger(__name__)
meta = {"cirrus-hl": meta_chl, "halo-ac3": meta_hac} # setup up metadata dictionary for both campaigns
[docs]def get_info_from_filename(filename: str) -> Tuple[str, str, str]:
"""
Using regular expressions some information from the filename is extracted.
Args:
filename: string in the form yyyy_mm_dd_hh_MM.[F/I][up/dw]_[SWIR/VNIR].dat (eg. "2021_03_29_11_07.Fup_SWIR.dat")
Returns: A date string, the channel and the direction of measurement including the quantity.
"""
# find date string and channel from file
match = re.search(r"^(?P<date>\d{4}_\d{2}_\d{2}).*.(?P<direction>[FI][a-z]{2})_(?P<channel>[A-Z]{4})", filename)
try:
date_str = match.group('date')
channel = match.group('channel')
direction = match.group('direction')
except AttributeError:
log.info("No date, channel or direction information was found! Check filename!")
raise
return date_str, channel, direction
[docs]def find_pixel(df: pd.DataFrame, wavelength: float()) -> Tuple[int, float]:
"""
Given the dataframe with the pixel to wavelength mapping, return the pixel and wavelength closest to the requested
wavelength.
Args:
df: Dataframe with column pixel and wavelength (from read_pixel_to_wavelength)
wavelength: which wavelength are you interested in
Returns: closest pixel number and wavelength corresponding to the given wavelength
"""
# find smallest and greatest wavelength in data frame and check if the given wavelength is in it
min_wl = df["wavelength"].min()
max_wl = df["wavelength"].max()
assert max_wl >= wavelength >= min_wl, "Given wavelength not in data frame!"
idx = h.arg_nearest(df["wavelength"], wavelength)
pixel_nr, wavelength = df["pixel"].iloc[idx], df["wavelength"].iloc[idx]
return pixel_nr, wavelength
[docs]def merge_vnir_swir_nc(vnir: xr.Dataset, swir: xr.Dataset) -> xr.Dataset:
"""
Merge the SMART nc files generated by smart_write_ncfile.py
Args:
vnir: VNIR data set
swir: SWIR data set
Returns: merged data set
"""
# rename variables to merge them
vnir = vnir.rename(dict(Fdw_VNIR="Fdw"))
swir = swir.rename(dict(Fdw_SWIR="Fdw"))
# merge datasets
all = xr.merge([swir, vnir])
all["Fdw_bb"] = all["Fdw_VNIR_bb"] + all["Fdw_SWIR_bb"]
return all
def _plot_dark_current(wavelengths: Union[pd.Series, list], dark_current: Union[pd.Series, list], filename: str,
**kwargs):
"""
Plot the dark current over the wavelengths from the specified spectrometer and channel.
Args:
wavelengths: series or list of wavelengths corresponding with the pixel numbers from read_pixel_to_wavelength
dark_current: series or list with mean dark current for each pixel
filename: name of file
**kwargs: save_fig: whether to save the figure in the current directory (True) or just show it (False, default)
Returns:
"""
save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False
plt.plot(wavelengths, dark_current, color='k')
plt.axhline(dark_current.mean(), color="orange", label="Mean")
plt.grid()
plt.title(f"Dark Current for {filename}")
plt.xlabel("Wavelength (nm)")
plt.ylabel("Netto Counts")
plt.legend()
if save_fig:
plt.savefig(f"{filename}_dark_current.png")
else:
plt.show()
plt.close()
[docs]def get_dark_current(flight: str, filename: str, option: int, **kwargs) -> Union[pd.Series, plt.figure]:
"""
Get the corresponding dark current for the specified measurement file to correct the raw SMART measurement.
Args:
flight: to which flight does the file belong to? (e.g. Flight_20210707a)
filename: filename (e.g. "2021_03_29_11_07.Fup_SWIR.dat") of measurement file
option: which option to use for VNIR (1, 2 or 3).
Option 1: use measurements below 290 nm.
Option 2: use dark measurements from transfer calibration (CIRRUS-HL folder structure expected).
Option 3: use scaled dark current measurement from transfer calibration (HALO-AC3 folder structure expected).
**kwargs:
path (str): path to measurement file if not standard path from config.toml,
plot (bool): show plot or not (default: True),
date (str): yyyymmdd, date of transfer calibration with dark current measurement to use
dark_filepath (str): complete path to dark current file to use
campaign (str): campaign to which smart file belongs to
Returns: pandas Series with the mean dark current measurements over time for each pixel and optionally a plot of it
"""
plot = kwargs["plot"] if "plot" in kwargs else False
date = kwargs["date"] if "date" in kwargs else None
dark_filepath = kwargs["dark_filepath"] if "dark_filepath" in kwargs else None
campaign = kwargs["campaign"] if "campaign" in kwargs else "halo-ac3"
path = h.get_path("raw", flight, campaign=campaign)
path = kwargs["path"] if "path" in kwargs else path
pixel_wl_path = h.get_path("pixel_wl")
calib_path = h.get_path("calib", campaign=campaign)
measurement = reader.read_smart_raw(path, filename)
t_int = int(measurement["t_int"][0]) # get integration time
date_str, channel, direction = get_info_from_filename(filename)
spectrometer = meta[campaign][f"{direction}_{channel}"]
pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, spectrometer)
# calculate dark current depending on channel:
# SWIR: use measurements during shutter phases
# VNIR: Option 1: use measurements below 290 nm
# Option 2: use dark measurements from transfer calibration (CIRRUS-HL folder structure expected)
# Option 3: use scaled dark current measurement from transfer calibration (HALO-AC3 folder structure expected)
if channel == "VNIR":
if option == 1:
last_dark_pixel, _ = find_pixel(pixel_wl, 290)
dark_pixels = np.arange(1, last_dark_pixel + 1)
# one could apply a column mean (axis=1) and subtract the mean dark current from every timestep but for
# minutely files this shouldn't be of much importance
# TODO: Test difference between column mean and mean over rows and columns
dark_current = measurement.loc[:, dark_pixels].mean()
wls = pixel_wl[pixel_wl["pixel"].isin(dark_pixels)]["wavelength"]
elif option == 2:
wls = pixel_wl["wavelength"]
# read in cali file
# get path depending on spectrometer and inlet
instrument = re.search(r'ASP\d{2}', spectrometer)[0]
inlet = re.search(r'J\d{1}', spectrometer)[0]
# find right folder and right cali
for dirpath, dirs, files in os.walk(calib_path):
if re.search(f".*{instrument}.*dark.*", dirpath) is not None:
d_path, d = os.path.split(dirpath)
# check if the date of the calibration matches the date of the file
date_check = True if date_str.replace("_", "") in d_path else False
# overwrite date check if date is given
if date is not None:
date_check = True if date in d_path else False
# check for the right integration time in folder name
t_int_check = str(t_int) in d
# ASP06 has 2 SWIR and 2 VNIR inlets thus search for the folder for the given inlet
if instrument == "ASP06" and inlet[1] in d:
run = True
# ASP07 has only one SWIR and VNIR inlet -> no need to search
else:
run = True
if run and date_check and t_int_check:
i = 0
for file in files:
if re.search(f'.*.{direction}_{channel}.dat', file) is not None:
dark_dir, dark_file = dirpath, file
log.info(f"Calibration file used:\n{os.path.join(dark_dir, dark_file)}")
assert i == 0, f"More than one possible file was found!\n Check {dirpath}!"
i += 1
try:
dark_current = reader.read_smart_raw(dark_dir, dark_file)
dark_current = dark_current.iloc[:, 2:].mean()
except UnboundLocalError as e:
raise RuntimeError("No dark current file found for measurement!") from e
else:
assert option == 3, "Option should be either 1, 2 or 3!"
wls = pixel_wl["wavelength"]
# check if an explicit dark current file is provided,
# if not assume that a transfer calibration is provided -> this is the campaign mode
if dark_filepath is None:
# find dark current transfer calibration
if date is None:
raise NameError(f"If no 'dark_filepath' is provided 'date' for transfer calibration needs to be "
f"given!")
else:
transfer_calib_dir = os.path.join(calib_path, f"{spectrometer[:-3]}_transfer_calib_{date}")
# in the transfer calib directory find the dark current measurement folder with the correct
# integration time
dark_dir = [d for d in os.listdir(transfer_calib_dir) if "dark" in d and str(t_int) in d][0]
dark_dir = os.path.join(transfer_calib_dir, dark_dir)
dark_files = [f for f in os.listdir(dark_dir) if f"{direction}_{channel}.dat" in f]
# the normal workflow is to merge all VNIR dark current files and delete all single files, thus only
# one dark current VNIR file should be left in each dark current folder
if len(dark_files) > 1:
raise ValueError(f"Too many dark current files found for VNIR in {dark_dir}! "
f"\nMerge VNIR files first and delete single files!")
else:
dark_file = dark_files[0]
else:
# get dark_dir and dark_file from given dark_filepath
dark_dir, dark_file = os.path.dirname(dark_filepath), os.path.basename(dark_filepath)
# read in dark current measurement, drop t_int and shutter column and take mean over time
dark_current = reader.read_smart_raw(dark_dir, dark_file).iloc[:, 2:].mean()
# scale dark current to the measured dark pixels
dark_scale = dark_current * np.mean(measurement.mean().iloc[19:99]) / np.mean(dark_current.iloc[19:99])
# get fluctuations over the rolling mean of the scaled dark current
dark_scale = dark_scale - dark_scale.rolling(20, min_periods=1).mean()
# smooth dark current and add offset found in the first dark pixels
dark_scale2 = dark_current.rolling(20, min_periods=1).mean() + (
np.mean(measurement.mean().iloc[19:99]) - np.mean((dark_current.iloc[19:99])))
dark_current = dark_scale2 + dark_scale # add both terms together
elif channel == "SWIR":
# check if the shutter flag was working: If all values are 1 -> shutter flag is probably not working
# even if the flag is working, does not mean the shutter is working
if np.sum(measurement.shutter == 1) != measurement.shutter.shape[0]:
dark_current = measurement.where(measurement.shutter == 0).iloc[:, 2:].mean()
wls = pixel_wl["wavelength"]
else:
raise ValueError(f"Shutter flag is probably wrong in {path}/{filename}!")
else:
raise ValueError(f"'channel' should be either 'VNIR' or 'SWIR' but is {channel}!")
if plot:
_plot_dark_current(wls, dark_current, filename, **kwargs)
return dark_current
[docs]def plot_mean_corrected_measurement(campaign: str, flight: str, filename: str, measurement: Union[pd.Series, list],
measurement_cor: Union[pd.Series, list], option: int, **kwargs):
"""
Plot the mean dark current corrected SMART measurement over time together with the raw measurement and the dark
current.
Args:
campaign: campaign name (cirrus-hl or halo-ac3)
flight: to which flight does the file belong to? (e.g. Flight_20210707a)
filename: name of file
measurement: raw SMART measurements for each pixel averaged over time
measurement_cor: corrected SMART measurements for each pixel averaged over time
option: which option was used for VNIR correction
**kwargs: save_fig (bool): save figure to current directory or just show it
Returns: plot
"""
save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False
pixel_path = h.get_path("pixel_wl")
date_str, channel, direction = get_info_from_filename(filename)
path = h.get_path("raw", flight)
spectrometer = meta[campaign][f"{direction}_{channel}"]
dark_current = get_dark_current(path, filename, option, plot=False)
wavelength = reader.read_pixel_to_wavelength(pixel_path, spectrometer)["wavelength"]
if channel == "VNIR" and option == 1:
plt.axhline(dark_current, label="Dark Current", color='k')
else:
plt.plot(wavelength, dark_current, label="Dark Current", color='k')
plt.plot(wavelength, measurement, label="Measurement")
plt.plot(wavelength, measurement_cor, label="Corrected Measurement")
plt.grid()
plt.title(f"Mean Corrected SMART Measurement {date_str.replace('_', '-')}\n"
f"Spectrometer {spectrometer}, Channel: {channel}, Option: {option}")
plt.xlabel("Wavelength (nm)")
plt.ylabel("Netto Counts")
plt.legend()
if save_fig:
plt.savefig(f"{date_str}_corrected_smart_measurement.png")
else:
plt.show()
plt.close()
[docs]def correct_smart_dark_current(flight: str, smart_file: str, option: int, **kwargs) -> pd.Series:
"""
Correct the raw SMART measurement for the dark current of the spectrometer.
Only returns data when the shutter was open.
Args:
flight: to which flight does the file belong to? (e.g. Flight_20210707a)
smart_file: filename of file to correct
option: which option should be used to get the dark current? Only relevant for channel "VNIR".
kwargs: path (str): path to file if not raw file path as given in config.toml,
date (str): (yyyymmdd) date from which the dark current measurement should be used for VNIR (necessary if \
no transfer calibration was made on a measurement day)
campaign (str): campaign to which smart file belongs to
Returns: Series with corrected smart measurement
"""
campaign = kwargs["campaign"] if "campaign" in kwargs else "halo-ac3"
path = h.get_path("raw", flight, campaign=campaign)
path = kwargs.pop("path") if "path" in kwargs else path
date_str, channel, direction = get_info_from_filename(smart_file)
smart = reader.read_smart_raw(path, smart_file)
dark_current = get_dark_current(flight, smart_file, option, path=path, **kwargs)
if channel == "VNIR" and option == 1:
dark_current = dark_current.mean() # If get_dark_current returns a column mean, this can to be removed
measurement = smart.where(smart.shutter == 1).iloc[:, 2:] # only use data when shutter is open
measurement_cor = measurement - dark_current
# drop nan values -> dark current measurements at beginning of file
measurement_cor.dropna(inplace=True)
return measurement_cor
[docs]def plot_smart_data(campaign: str, flight: str, filename: str, wavelength: Union[list, str], **kwargs) -> plt.axes:
"""
Plot SMART data in the given file. Either a time average over a range of wavelengths or all wavelengths,
or a time series of one wavelength. Return an axes object to continue plotting or show it.
TODO: add option to plot multiple files
Args:
campaign: campaign name (cirrus-hl or halo-ac3)
flight: to which flight does the file belong to? (e.g. Flight_20210707a)
filename: Standard SMART filename
wavelength: list with either one or two wavelengths in nm or 'all'
**kwargs: path (str): give path to filename if not default from config.toml,
save_fig (bool): save figure? (default: False),
plot_path (str): where to save figure (default: given in config.toml)
ax (plt.axis): axes to already existing matplotlib axis
Returns: Shows a figure or saves it to disc.
"""
raw_path = h.get_path("raw", flight, campaign)
pixel_wl_path = h.get_path("pixel_wl", campaign)
data_path = h.get_path("data", flight, campaign)
calibrated_path = h.get_path("calibrated", flight, campaign)
plot_path = h.get_path("plot", campaign=campaign)
# read in keyword arguments
raw_path = kwargs["path"] if "path" in kwargs else raw_path
data_path = kwargs["path"] if "path" in kwargs else data_path
calibrated_path = kwargs["path"] if "path" in kwargs else calibrated_path
save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False
plot_path = kwargs["plot_path"] if "plot_path" in kwargs else plot_path
ax = kwargs["ax"] if "ax" in kwargs else None
fig = plt.figure()
if ax is None:
ax = fig.add_subplot(111)
date_str, channel, direction = get_info_from_filename(filename)
if "calibrated" in filename:
smart = reader.read_smart_cor(calibrated_path, filename)
title = "\nCorrected for Dark Current and Calibrated"
ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)"
elif "cor" in filename:
smart = reader.read_smart_cor(data_path, filename)
title = "Corrected for Dark Current"
ylabel = "Netto Counts"
else:
smart = reader.read_smart_raw(raw_path, filename)
smart = smart.iloc[:, 2:] # remove columns t_int and shutter
title = "Raw"
ylabel = "Netto Counts"
pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, meta[campaign][f"{direction}_{channel}"])
if len(wavelength) == 2:
pixel_nr = []
wl_str = ""
for wl in wavelength:
pxl, wavel = find_pixel(pixel_wl, wl)
pixel_nr.append(pxl)
wl_str = f"{wl_str}_{wavel:.1f}"
pixel_nr.sort() # make sure wavelengths are in ascending order
smart_sel = smart.loc[:, pixel_nr[0]:pixel_nr[1]] # select range of wavelengths
begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] # read out start and end time
smart_mean = smart_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe
smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) # update the index to be numeric
# join the measurement and pixel to wavelength data frames by pixel
smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
smart_plot.plot(x="wavelength", y=0, legend=False, xlabel="Wavelength (nm)", ylabel=ylabel, ax=ax,
title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n {begin_dt} - {end_dt}")
figname = filename.replace('.dat', f'{wl_str}.png')
elif len(wavelength) == 1:
pixel_nr, wl = find_pixel(pixel_wl, wavelength[0])
smart_sel = smart.loc[:, pixel_nr].to_frame()
begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1]
time_extend = end_dt - begin_dt
smart_sel.plot(ax=ax, legend=False, xlabel="Time (UTC)", ylabel=ylabel,
title=f"SMART Time Series {title} {direction} {channel}\n{wl:.3f} nm {begin_dt:%Y-%m-%d}")
h.set_xticks_and_xlabels(ax, time_extend)
figname = filename.replace('.dat', f'_{wl:.1f}nm.png')
elif wavelength == "all":
begin_dt, end_dt = smart.index[0], smart.index[-1]
smart_mean = smart.mean().to_frame()
smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index))
smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
smart_plot.plot(x="wavelength", y=0, legend=False, xlabel="Wavelength (nm)", ylabel=ylabel, ax=ax,
title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n "
f"{begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}")
figname = filename.replace('.dat', f'_{wavelength}.png')
else:
raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!")
ax.grid()
plt.tight_layout()
if save_fig:
plt.savefig(f"{plot_path}/{figname}", dpi=100)
log.info(f"Saved {plot_path}/{figname}")
plt.close()
else:
return ax
[docs]def plot_smart_spectra(path: str, campaign: str, filename: str, index: int, **kwargs) -> None:
"""
Plot a spectra from a SMART calibrated measurement file for a given index (time step)
Args:
path: where the file can be found
campaign: campaign name (cirrus-hl or halo-ac3)
filename: name of the file (standard SMART filename convention)
index: which row to plot
**kwargs: save_fig (bool): Save figure to plot path given in config.toml (default: False),
plot_path (str): Where to save plot if not standard plot path
Returns: Shows and or saves a plot
"""
save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False
plot_path = kwargs["plot_path"] if "plot_path" in kwargs else h.get_path("plot", campaign=campaign)
pixel_path = h.get_path("pixel_wl", campaign)
df = reader.read_smart_cor(path, filename)
date_str, channel, direction = get_info_from_filename(filename)
spectrometer = meta[campaign][f"{direction}_{channel}"]
pixel_wl = reader.read_pixel_to_wavelength(pixel_path, spectrometer)
max_id = len(df) - 1
try:
df_sel = df.iloc[index, :]
except IndexError as e:
log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id}'!")
df_sel = df.iloc[max_id, :]
time_stamp = df_sel.name # get time stamp which is selected
pixel_wl[f"{direction}"] = df_sel.reset_index(drop=True)
ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)"
fig, ax = plt.subplots()
ax.plot("wavelength", f"{direction}", data=pixel_wl, label=f"{direction}")
ax.set_title(f"SMART Spectra {spectrometer} {direction} {channel} \n {time_stamp:%Y-%m-%d %H:%M:%S}")
ax.set_xlabel("Wavelength (nm)")
ax.set_ylabel(ylabel)
ax.grid()
plt.show()
if save_fig:
figname = filename.replace(".dat", f"_spectra_{time_stamp:%Y%m%d_%H%M%S}.png")
figpath = f"{plot_path}/{figname}"
plt.savefig(figpath, dpi=100)
log.info(f"Saved {figpath}")
plt.close()
[docs]def plot_complete_smart_spectra(path: str, campaign: str, filename: str, index: int, **kwargs) -> None:
"""
Plot the complete spectra given by both channels from SMART calibrated measurement files for a given index (time step)
Args:
path: where the file can be found
campaign: campaign name (cirrus-hl or halo-ac3)
filename: name of the file (standard SMART filename convention)
index: which row to plot
**kwargs: save_fig (bool): Save figure to plot path given in config.toml (default: False)
plot_path (str): Where to save plot if not standard plot path
Returns: Shows and or saves a plot
"""
save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False
plot_path = kwargs["plot_path"] if "plot_path" in kwargs else h.get_path("plot", campaign=campaign)
pixel_path = h.get_path("pixel_wl", campaign)
df1 = reader.read_smart_cor(path, filename)
date_str, channel, direction = get_info_from_filename(filename)
if channel == "SWIR":
channel2 = "VNIR"
else:
channel2 = "SWIR"
filename2 = filename.replace(channel, channel2)
df2 = reader.read_smart_cor(path, filename2)
spectrometer1 = meta[campaign][f"{direction}_{channel}"]
spectrometer2 = meta[campaign][f"{direction}_{channel2}"]
pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectrometer1)
pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectrometer2)
# merge pixel dfs and sort by wavelength
# pixel_wl = pixel_wl1.append(pixel_wl2, ignore_index=True).sort_values(by="wavelength", ignore_index=True)
max_id1 = len(df1) - 1
max_id2 = len(df2) - 1
try:
df_sel1 = df1.iloc[index, :]
df_sel2 = df2.iloc[index, :]
except IndexError as e:
log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id1}'!")
df_sel1 = df1.iloc[max_id1, :]
df_sel2 = df2.iloc[max_id2, :]
time_stamp = df_sel1.name # get time stamp which is selected
assert time_stamp == df_sel2.name, "Time stamps from VNIR and SWIR are not identical!"
pixel_wl1[f"{direction}"] = df_sel1.reset_index(drop=True)
pixel_wl2[f"{direction}"] = df_sel2.reset_index(drop=True)
ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)"
fig, ax = plt.subplots()
ax.plot(pixel_wl1["wavelength"], pixel_wl1[f"{direction}"], label=f"{channel}")
ax.plot(pixel_wl2["wavelength"], pixel_wl2[f"{direction}"], label=f"{channel2}")
# ax.plot("wavelength", f"{direction}", data=pixel_wl1, label=f"{channel}")
# ax.plot("wavelength", f"{direction}", data=pixel_wl2, label=f"{channel2}")
ax.set_title(f"SMART Spectra {direction} {channel}/{channel2} \n {time_stamp:%Y-%m-%d %H:%M:%S}")
ax.set_xlabel("Wavelength (nm)")
ax.set_ylabel(ylabel)
ax.grid()
ax.legend()
if save_fig:
figname = filename.replace(".dat", f"_spectra_{time_stamp:%Y%m%d_%H%M%S}.png")
figname = figname.replace(channel, "VNIR+SWIR")
figpath = f"{plot_path}/{figname}"
plt.savefig(figpath, dpi=100)
log.info(f"Saved {figpath}")
plt.show()
plt.close()
[docs]def plot_complete_smart_spectra_interactive(path: str, campaign:str, filename: str, index: int) -> hv.Overlay:
"""
Plot the complete spectra given by both channels from SMART calibrated measurement files for a given index (time step)
Args:
path: where the file can be found
campaign: campaign name (cirrus-hl or halo-ac3)
filename: name of the file (standard SMART filename convention)
index: which row to plot
Returns: Shows and or saves a plot
"""
pixel_path = h.get_path("pixel_wl", campaign)
df1 = reader.read_smart_cor(path, filename)
date_str, channel, direction = get_info_from_filename(filename)
if channel == "SWIR":
channel2 = "VNIR"
else:
channel2 = "SWIR"
filename2 = filename.replace(channel, channel2)
df2 = reader.read_smart_cor(path, filename2)
spectrometer1 = meta[campaign][f"{direction}_{channel}"]
spectrometer2 = meta[campaign][f"{direction}_{channel2}"]
pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectrometer1)
pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectrometer2)
# merge pixel dfs and sort by wavelength
# pixel_wl = pixel_wl1.append(pixel_wl2, ignore_index=True).sort_values(by="wavelength", ignore_index=True)
max_id1 = len(df1) - 1
max_id2 = len(df2) - 1
try:
df_sel1 = df1.iloc[index, :]
df_sel2 = df2.iloc[index, :]
except IndexError as e:
log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id1}'!")
df_sel1 = df1.iloc[max_id1, :]
df_sel2 = df2.iloc[max_id2, :]
time_stamp = df_sel1.name # get time stamp which is selected
assert time_stamp == df_sel2.name, "Time stamps from VNIR and SWIR are not identical!"
pixel_wl1[f"{direction}"] = df_sel1.reset_index(drop=True)
pixel_wl2[f"{direction}"] = df_sel2.reset_index(drop=True)
ylabel = "Irradiance (W m^-2 nm^-1)" if "F" in filename else "Radiance (W sr^-1 m^-2 nm^-1)"
curve1 = hv.Curve(pixel_wl1, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[(f"{direction}", ylabel)], label=f"{channel}")
curve2 = hv.Curve(pixel_wl2, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[(f"{direction}", ylabel)], label=f"{channel2}")
overlay = curve1 * curve2
overlay.opts(
opts.Curve(height=500, width=900, fontsize=12))
overlay.opts(title=f"SMART Spectra {direction} {channel}/{channel2} {time_stamp:%Y-%m-%d %H:%M:%S.%f}",
show_grid=True)
return overlay
[docs]def plot_smart_data_interactive(campaign: str, flight: str, filename: str, wavelength: Union[list, str]) -> hv.Curve:
"""
Plot SMART data in the given file. Either a time average over a range of wavelengths or all wavelengths,
or a time series of one wavelength.
Args:
campaign: campaign name (cirrus-hl or halo-ac3)
flight: to which flight does the file belong to? (e.g. Flight_20210707a)
filename: Standard SMART filename
wavelength: list with either one or two wavelengths in nm or 'all'
Returns: Creates an interactive figure
"""
raw_path = h.get_path("raw", flight, campaign)
pixel_wl_path = h.get_path("pixel_wl", campaign)
data_path = h.get_path("data", flight, campaign)
calibrated_path = h.get_path("calibrated", flight, campaign)
date_str, channel, direction = get_info_from_filename(filename)
# make sure wavelength is a list
if type(wavelength) != list and wavelength != "all":
wavelength = [wavelength]
if "calibrated" in filename:
df = reader.read_smart_cor(calibrated_path, filename)
title = "\nCorrected for Dark Current and Calibrated"
ylabel = "Irradiance (W m^-2 nm^-1)" if "F" in filename else "Radiance (W sr^-1 m^-2 nm^-1)"
elif "cor" in filename:
df = reader.read_smart_cor(data_path, filename)
title = "Corrected for Dark Current"
ylabel = "Netto Counts"
else:
df = reader.read_smart_raw(raw_path, filename)
df = df.iloc[:, 2:] # remove columns t_int and shutter
title = "Raw"
ylabel = "Netto Counts"
pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, meta[campaign][f"{direction}_{channel}"])
if len(wavelength) == 2:
pixel_nr = []
for wl in wavelength:
pxl, wavel = find_pixel(pixel_wl, wl)
pixel_nr.append(pxl)
pixel_nr.sort() # make sure wavelengths are in ascending order
smart_sel = df.loc[:, pixel_nr[0]:pixel_nr[1]] # select range of wavelengths
begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] # read out start and end time
smart_mean = smart_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe
smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) # update the index to be numeric
# join the measurement and pixel to wavelength data frames by pixel
smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
smart_plot.columns = ["value", "pixel", "wavelength"]
curve = hv.Curve(smart_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)])
title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n {begin_dt} - {end_dt}"
elif len(wavelength) == 1:
pixel_nr, wl = find_pixel(pixel_wl, wavelength[0])
smart_sel = df.loc[:, pixel_nr].to_frame()
begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1]
time_extend = end_dt - begin_dt
smart_sel.reset_index(inplace=True)
smart_sel.columns = ["time", "value"]
curve = hv.Curve(smart_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)])
title=f"SMART Time Series {title} {direction} {channel}\n{wl:.3f} nm {begin_dt:%Y-%m-%d}"
# ax = jr.set_xticks_and_xlabels(ax, time_extend)
elif wavelength == "all":
begin_dt, end_dt = df.index[0], df.index[-1]
smart_mean = df.mean().to_frame()
smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index))
smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
smart_plot.columns = ["value", "pixel", "wavelength"]
curve = hv.Curve(smart_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)])
title=f"Time Averaged SMART Measurement {title} {direction} {channel} {begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}"
else:
raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!")
curve.opts(
opts.Curve(height=500, width=900, fontsize=12))
curve.opts(title=title, show_grid=True)
return curve
[docs]def plot_calibrated_irradiance_flux(campaign: str, filename: str, wavelength: Union[int, list, str], flight: str
) -> hv.Overlay:
"""
Plot upward and downward irradiance as a time averaged series over the wavelength or as a time series for one
wavelength.
Args:
campaign: campaign name (cirrus-hl or halo-ac3)
filename: Standard SMART filename
wavelength: single or range of wavelength or "all"
flight: flight folder (flight_xx)
Returns: holoviews overlay plot with two curves
"""
# make sure wavelength is a list
if type(wavelength) != list and wavelength != "all":
wavelength = [wavelength]
# get paths and define input path
calibrated_path, pixel_path = h.get_path("calibrated", flight, campaign), h.get_path("pixel_wl", flight, campaign)
date_str, channel, direction = get_info_from_filename(filename)
direction2 = "Fdw" if direction == "Fup" else "Fup" # set opposite direction
filename2 = filename.replace(direction, direction2)
# read in both irradiance measurements
df1 = reader.read_smart_cor(calibrated_path, filename)
df2 = reader.read_smart_cor(calibrated_path, filename2)
# get spectrometers from smart_lookup dictionary
spectro1, spectro2 = meta[campaign][f"{direction}_{channel}"], meta[campaign][f"{direction2}_{channel}"]
pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectro1)
pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectro2)
title = "Corrected for Dark Current and Calibrated"
ylabel = "Irradiance (W m^-2 nm^-1)"
if len(wavelength) == 2:
pixel_nr1 = []
pixel_nr2 = []
for wl in wavelength:
pxl, _ = find_pixel(pixel_wl1, wl)
pixel_nr1.append(pxl)
pxl, _ = find_pixel(pixel_wl2, wl)
pixel_nr2.append(pxl)
pixel_nr1.sort() # make sure wavelengths are in ascending order
pixel_nr2.sort()
df1_sel = df1.loc[:, pixel_nr1[0]:pixel_nr1[1]] # select range of wavelengths
df1_sel.sort() # sort data frame
df2_sel = df2.loc[:, pixel_nr2[0]:pixel_nr2[1]] # select range of wavelengths
df2_sel.sort() # sort data frame
begin_dt, end_dt = df1_sel.index[0], df1_sel.index[-1] # read out start and end time
df1_mean = df1_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe
df1_mean = df1_mean.set_index(pd.to_numeric(df1_mean.index)) # update the index to be numeric
df2_mean = df2_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe
df2_mean = df2_mean.set_index(pd.to_numeric(df2_mean.index)) # update the index to be numeric
# join the measurement and pixel to wavelength data frames by pixel
df1_plot = df1_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
df1_plot.columns = ["value", "pixel", "wavelength"]
df2_plot = df2_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
df2_plot.columns = ["value", "pixel", "wavelength"]
curve1 = hv.Curve(df1_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)],
label=direction)
curve2 = hv.Curve(df2_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)],
label=direction2)
title = f"Time Averaged SMART Measurement {title} {channel}\n {begin_dt} - {end_dt}"
overlay = curve1 * curve2
elif len(wavelength) == 1:
pixel_nr1, wl1 = find_pixel(pixel_wl1, wavelength[0])
df1_sel = df1.loc[:, pixel_nr1].to_frame()
pixel_nr2, wl2 = find_pixel(pixel_wl2, wavelength[0])
df2_sel = df2.loc[:, pixel_nr2].to_frame()
begin_dt, end_dt = df1_sel.index[0], df1_sel.index[-1]
df1_sel.reset_index(inplace=True)
df1_sel.columns = ["time", "value"]
df2_sel.reset_index(inplace=True)
df2_sel.columns = ["time", "value"]
curve1 = hv.Curve(df1_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)], label=direction)
curve2 = hv.Curve(df2_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)], label=direction2)
title = f"SMART Time Series {title} {channel}\n{wl1:.3f} nm {begin_dt:%Y-%m-%d}"
overlay = curve1 * curve2
elif wavelength == "all":
begin_dt, end_dt = df1.index[0], df1.index[-1]
df1_mean = df1.mean().to_frame()
df1_mean = df1_mean.set_index(pd.to_numeric(df1_mean.index))
df1_plot = df1_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
df1_plot.columns = ["value", "pixel", "wavelength"]
df2_mean = df2.mean().to_frame()
df2_mean = df2_mean.set_index(pd.to_numeric(df2_mean.index))
df2_plot = df2_mean.join(pixel_wl.set_index(pixel_wl["pixel"]))
df2_plot.columns = ["value", "pixel", "wavelength"]
curve1 = hv.Curve(df1_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)], label=direction)
curve2 = hv.Curve(df2_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)], label=direction2)
title = f"Time Averaged SMART Irradiance Measurement {title} {channel} {begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}"
overlay = curve1 * curve2
else:
raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!")
overlay.opts(
opts.Curve(height=500, width=900, fontsize=12))
overlay.opts(title=title, show_grid=True)
return overlay
def _set_adjust(nr_bands: int) -> float:
"""Define bottom adjust according to number of bands displayed
Args:
nr_bands: number of bands shown in plot
Returns: bottom adjust
"""
if nr_bands < 4:
b_adjust = 0.3
elif nr_bands in range(3, 7):
b_adjust = 0.35
elif nr_bands in range(6, 10):
b_adjust = 0.37
elif nr_bands in range(9, 13):
b_adjust = 0.4
else:
b_adjust = 0.43
return b_adjust
[docs]def plot_smart_ecrad_bands(smart: xr.Dataset, bands: list, path: str = None, save_fig: bool = False):
"""Plot banded upward and downward irradiance for given ecRad bands
Args:
smart: SMART data set containing variables fdn_banded and fup_banded
bands: ecRad band numbers
path: where to save figure to
save_fig: whether to save figure
Returns: figure
"""
for dir, var in zip(["downward", "upward"], ["fdn_banded", "fup_banded"]):
fig, ax = plt.subplots()
for i, band in enumerate(h.ecRad_bands):
if i + 1 in bands:
smart[var].loc[dict(band=i + 1)].plot.line(ax=ax, label=f"{i + 1}: {h.ecRad_bands[band]} nm",
add_legend=False)
plt.ylabel("Irradiance (W$\,$m$^{-2}$)")
plt.xlabel("Time (UTC)")
plt.title(f"SMART {dir} Irradiances integrated over ecRad Bands {str(smart.time[0].values)[:10]}")
plt.grid()
# set legend according to number of bands
nr_bands = len(bands)
b_adjust = _set_adjust(nr_bands)
plt.legend(title="ecRad Band", bbox_to_anchor=[1.1, -0.3], ncol=3)
plt.subplots_adjust(bottom=b_adjust)
if save_fig:
figname = os.path.join(path, f"{str(smart.time[0].values)[:10]}_SMART_ecrad_banded_{dir}_irradiance.png")
plt.savefig(figname, dpi=100)
log.info(f"Saved {figname}")
else:
plt.show()
plt.close()
if __name__ == '__main__':
# test read in functions
path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/raw_only/ASP06_transfer_calib_20210616/Tint_500ms"
filename = "2021_06_16_07_20.Fdw_SWIR.dat"
# filename = "2021_03_29_11_15.Fup_SWIR.dat"
smart = reader.read_smart_raw(path, filename)
pixel_wl_path = h.get_path("pixel_wl")
spectrometer = "ASP06_J3"
pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, spectrometer)
# find pixel closest to given wavelength
pixel_nr, wavelength = find_pixel(pixel_wl, 525)
# input: spectrometer, filename, option
# option = 2
# filename = "2021_03_29_11_07.Fup_VNIR.dat"
# filename = "2021_03_29_11_07.Fup_SWIR.dat"
# dark_current = get_dark_current(filename, option)
# # correct raw measurement with dark current
# # input: smart measurement, option
# option = 2
# filename = "2021_03_29_11_07.Fup_VNIR.dat"
# smart_cor = correct_smart_dark_current(filename, option)
#
# # plot mean corrected smart measurement
# raw_path, _, _, _, _ = set_paths()
# filename = "2021_03_29_11_07.Fup_VNIR.dat"
# option = 2
# smart = read_smart_raw(raw_path, filename)
# smart_cor = correct_smart_dark_current(filename, option=option)
# measurement = smart.mean().iloc[2:]
# measurement_cor = smart_cor.mean()
# plot_mean_corrected_measurement(filename, measurement, measurement_cor, option)
#
# # read corrected file
# _, _, _, data_path, _ = set_paths()
# filename = "2021_03_29_11_07.Fup_VNIR_cor.dat"
# smart_cor = read_smart_cor(data_path, filename)
#
# # plot any smart measurement given a range of wavelengths, one specific one or all
# filename = "2021_03_29_11_07.Fup_VNIR_cor.dat"
# raw_file = "2021_03_29_11_07.Fup_VNIR.dat"
# wavelength = [525]
# plot_smart_data(filename, wavelength)
# plot_smart_data(raw_file, wavelength)
#
# # plot SMART spectra
# file = "2021_06_04_13_40.Fdw_VNIR_cor_calibrated.dat"
# path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/calibrated_data/flight_00"
# index = 500
# plot_smart_spectra(path, file, index)
# plot a complete spectra from both channels
filename = "2021_06_21_08_41.Fdw_SWIR_cor_calibrated_norm.dat"
path = f"{h.get_path('calibrated')}/flight_00"
index = 500
plot_path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/plots/quicklooks/flight_00/spectra"
plot_complete_smart_spectra(path, filename, index, save_fig=True, plot_path=plot_path)
# working section
raw_file = "2021_03_29_11_15.Fdw_SWIR.dat"
calibrated_file = "2021_06_24_10_28.Fdw_VNIR_cor_calibrated_norm.dat"
file = "2021_06_25_06_14.Iup_SWIR.dat"
path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/calib/ASP07_transfer_calib_20210625/dark_300ms"
plot_smart_data("Flight_20210625a", file, "all", path=path)
smart = reader.read_smart_raw(path, file)
fig, ax = plt.subplots()
smart.iloc[4:, 2:].plot(ax=ax)
smart.iloc[:, 2:].plot(ax=ax, label="dark")
(smart.iloc[2, 2:] - smart.iloc[21, 2:]).plot(ax=ax, label="diff")
plt.legend()
plt.grid()
plt.show()
plt.close()