Source code for pylim.smart

#!/usr/bin/env python
"""These are functions in relation with the SMART instrument.

"_" denotes internal functions which are called inside other functions.

The functions are explained in their docstring and can be tested using the main frame.
You can:

The reader functions were moved to :py:mod:`pylim.reader`.

* find the closest pixel and wavelength to any given wavelength for the given wavelength calibration file
* get information (date, measured property, channel) from the filename
* get the dark current for a specified measurement file with either option 1 or 2 and optionally plot it
* correct the raw measurement by the dark current
* plot the mean corrected measurement
* plot smart data either for one wavelength over time or for a range of or all wavelengths
* use the holoviews functions to create a dynamic map for interactive quicklooks in a jupyter notebook

*author*: Johannes Röttenbacher
"""

import logging
import os
import re
from typing import Tuple, Union

import holoviews as hv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from holoviews import opts

from pylim import helpers as h
from pylim import reader
from pylim.cirrus_hl import smart_lookup as meta_chl
from pylim.halo_ac3 import smart_lookup as meta_hac

hv.extension('bokeh')

log = logging.getLogger(__name__)

meta = {"cirrus-hl": meta_chl, "halo-ac3": meta_hac}  # setup up metadata dictionary for both campaigns


[docs]def get_info_from_filename(filename: str) -> Tuple[str, str, str]: """ Using regular expressions some information from the filename is extracted. Args: filename: string in the form yyyy_mm_dd_hh_MM.[F/I][up/dw]_[SWIR/VNIR].dat (eg. "2021_03_29_11_07.Fup_SWIR.dat") Returns: A date string, the channel and the direction of measurement including the quantity. """ # find date string and channel from file match = re.search(r"^(?P<date>\d{4}_\d{2}_\d{2}).*.(?P<direction>[FI][a-z]{2})_(?P<channel>[A-Z]{4})", filename) try: date_str = match.group('date') channel = match.group('channel') direction = match.group('direction') except AttributeError: log.info("No date, channel or direction information was found! Check filename!") raise return date_str, channel, direction
[docs]def find_pixel(df: pd.DataFrame, wavelength: float()) -> Tuple[int, float]: """ Given the dataframe with the pixel to wavelength mapping, return the pixel and wavelength closest to the requested wavelength. Args: df: Dataframe with column pixel and wavelength (from read_pixel_to_wavelength) wavelength: which wavelength are you interested in Returns: closest pixel number and wavelength corresponding to the given wavelength """ # find smallest and greatest wavelength in data frame and check if the given wavelength is in it min_wl = df["wavelength"].min() max_wl = df["wavelength"].max() assert max_wl >= wavelength >= min_wl, "Given wavelength not in data frame!" idx = h.arg_nearest(df["wavelength"], wavelength) pixel_nr, wavelength = df["pixel"].iloc[idx], df["wavelength"].iloc[idx] return pixel_nr, wavelength
[docs]def merge_vnir_swir_nc(vnir: xr.Dataset, swir: xr.Dataset) -> xr.Dataset: """ Merge the SMART nc files generated by smart_write_ncfile.py Args: vnir: VNIR data set swir: SWIR data set Returns: merged data set """ # rename variables to merge them vnir = vnir.rename(dict(Fdw_VNIR="Fdw")) swir = swir.rename(dict(Fdw_SWIR="Fdw")) # merge datasets all = xr.merge([swir, vnir]) all["Fdw_bb"] = all["Fdw_VNIR_bb"] + all["Fdw_SWIR_bb"] return all
def _plot_dark_current(wavelengths: Union[pd.Series, list], dark_current: Union[pd.Series, list], filename: str, **kwargs): """ Plot the dark current over the wavelengths from the specified spectrometer and channel. Args: wavelengths: series or list of wavelengths corresponding with the pixel numbers from read_pixel_to_wavelength dark_current: series or list with mean dark current for each pixel filename: name of file **kwargs: save_fig: whether to save the figure in the current directory (True) or just show it (False, default) Returns: """ save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False plt.plot(wavelengths, dark_current, color='k') plt.axhline(dark_current.mean(), color="orange", label="Mean") plt.grid() plt.title(f"Dark Current for {filename}") plt.xlabel("Wavelength (nm)") plt.ylabel("Netto Counts") plt.legend() if save_fig: plt.savefig(f"{filename}_dark_current.png") else: plt.show() plt.close()
[docs]def get_dark_current(flight: str, filename: str, option: int, **kwargs) -> Union[pd.Series, plt.figure]: """ Get the corresponding dark current for the specified measurement file to correct the raw SMART measurement. Args: flight: to which flight does the file belong to? (e.g. Flight_20210707a) filename: filename (e.g. "2021_03_29_11_07.Fup_SWIR.dat") of measurement file option: which option to use for VNIR (1, 2 or 3). Option 1: use measurements below 290 nm. Option 2: use dark measurements from transfer calibration (CIRRUS-HL folder structure expected). Option 3: use scaled dark current measurement from transfer calibration (HALO-AC3 folder structure expected). **kwargs: path (str): path to measurement file if not standard path from config.toml, plot (bool): show plot or not (default: True), date (str): yyyymmdd, date of transfer calibration with dark current measurement to use dark_filepath (str): complete path to dark current file to use campaign (str): campaign to which smart file belongs to Returns: pandas Series with the mean dark current measurements over time for each pixel and optionally a plot of it """ plot = kwargs["plot"] if "plot" in kwargs else False date = kwargs["date"] if "date" in kwargs else None dark_filepath = kwargs["dark_filepath"] if "dark_filepath" in kwargs else None campaign = kwargs["campaign"] if "campaign" in kwargs else "halo-ac3" path = h.get_path("raw", flight, campaign=campaign) path = kwargs["path"] if "path" in kwargs else path pixel_wl_path = h.get_path("pixel_wl") calib_path = h.get_path("calib", campaign=campaign) measurement = reader.read_smart_raw(path, filename) t_int = int(measurement["t_int"][0]) # get integration time date_str, channel, direction = get_info_from_filename(filename) spectrometer = meta[campaign][f"{direction}_{channel}"] pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, spectrometer) # calculate dark current depending on channel: # SWIR: use measurements during shutter phases # VNIR: Option 1: use measurements below 290 nm # Option 2: use dark measurements from transfer calibration (CIRRUS-HL folder structure expected) # Option 3: use scaled dark current measurement from transfer calibration (HALO-AC3 folder structure expected) if channel == "VNIR": if option == 1: last_dark_pixel, _ = find_pixel(pixel_wl, 290) dark_pixels = np.arange(1, last_dark_pixel + 1) # one could apply a column mean (axis=1) and subtract the mean dark current from every timestep but for # minutely files this shouldn't be of much importance # TODO: Test difference between column mean and mean over rows and columns dark_current = measurement.loc[:, dark_pixels].mean() wls = pixel_wl[pixel_wl["pixel"].isin(dark_pixels)]["wavelength"] elif option == 2: wls = pixel_wl["wavelength"] # read in cali file # get path depending on spectrometer and inlet instrument = re.search(r'ASP\d{2}', spectrometer)[0] inlet = re.search(r'J\d{1}', spectrometer)[0] # find right folder and right cali for dirpath, dirs, files in os.walk(calib_path): if re.search(f".*{instrument}.*dark.*", dirpath) is not None: d_path, d = os.path.split(dirpath) # check if the date of the calibration matches the date of the file date_check = True if date_str.replace("_", "") in d_path else False # overwrite date check if date is given if date is not None: date_check = True if date in d_path else False # check for the right integration time in folder name t_int_check = str(t_int) in d # ASP06 has 2 SWIR and 2 VNIR inlets thus search for the folder for the given inlet if instrument == "ASP06" and inlet[1] in d: run = True # ASP07 has only one SWIR and VNIR inlet -> no need to search else: run = True if run and date_check and t_int_check: i = 0 for file in files: if re.search(f'.*.{direction}_{channel}.dat', file) is not None: dark_dir, dark_file = dirpath, file log.info(f"Calibration file used:\n{os.path.join(dark_dir, dark_file)}") assert i == 0, f"More than one possible file was found!\n Check {dirpath}!" i += 1 try: dark_current = reader.read_smart_raw(dark_dir, dark_file) dark_current = dark_current.iloc[:, 2:].mean() except UnboundLocalError as e: raise RuntimeError("No dark current file found for measurement!") from e else: assert option == 3, "Option should be either 1, 2 or 3!" wls = pixel_wl["wavelength"] # check if an explicit dark current file is provided, # if not assume that a transfer calibration is provided -> this is the campaign mode if dark_filepath is None: # find dark current transfer calibration if date is None: raise NameError(f"If no 'dark_filepath' is provided 'date' for transfer calibration needs to be " f"given!") else: transfer_calib_dir = os.path.join(calib_path, f"{spectrometer[:-3]}_transfer_calib_{date}") # in the transfer calib directory find the dark current measurement folder with the correct # integration time dark_dir = [d for d in os.listdir(transfer_calib_dir) if "dark" in d and str(t_int) in d][0] dark_dir = os.path.join(transfer_calib_dir, dark_dir) dark_files = [f for f in os.listdir(dark_dir) if f"{direction}_{channel}.dat" in f] # the normal workflow is to merge all VNIR dark current files and delete all single files, thus only # one dark current VNIR file should be left in each dark current folder if len(dark_files) > 1: raise ValueError(f"Too many dark current files found for VNIR in {dark_dir}! " f"\nMerge VNIR files first and delete single files!") else: dark_file = dark_files[0] else: # get dark_dir and dark_file from given dark_filepath dark_dir, dark_file = os.path.dirname(dark_filepath), os.path.basename(dark_filepath) # read in dark current measurement, drop t_int and shutter column and take mean over time dark_current = reader.read_smart_raw(dark_dir, dark_file).iloc[:, 2:].mean() # scale dark current to the measured dark pixels dark_scale = dark_current * np.mean(measurement.mean().iloc[19:99]) / np.mean(dark_current.iloc[19:99]) # get fluctuations over the rolling mean of the scaled dark current dark_scale = dark_scale - dark_scale.rolling(20, min_periods=1).mean() # smooth dark current and add offset found in the first dark pixels dark_scale2 = dark_current.rolling(20, min_periods=1).mean() + ( np.mean(measurement.mean().iloc[19:99]) - np.mean((dark_current.iloc[19:99]))) dark_current = dark_scale2 + dark_scale # add both terms together elif channel == "SWIR": # check if the shutter flag was working: If all values are 1 -> shutter flag is probably not working # even if the flag is working, does not mean the shutter is working if np.sum(measurement.shutter == 1) != measurement.shutter.shape[0]: dark_current = measurement.where(measurement.shutter == 0).iloc[:, 2:].mean() wls = pixel_wl["wavelength"] else: raise ValueError(f"Shutter flag is probably wrong in {path}/{filename}!") else: raise ValueError(f"'channel' should be either 'VNIR' or 'SWIR' but is {channel}!") if plot: _plot_dark_current(wls, dark_current, filename, **kwargs) return dark_current
[docs]def plot_mean_corrected_measurement(campaign: str, flight: str, filename: str, measurement: Union[pd.Series, list], measurement_cor: Union[pd.Series, list], option: int, **kwargs): """ Plot the mean dark current corrected SMART measurement over time together with the raw measurement and the dark current. Args: campaign: campaign name (cirrus-hl or halo-ac3) flight: to which flight does the file belong to? (e.g. Flight_20210707a) filename: name of file measurement: raw SMART measurements for each pixel averaged over time measurement_cor: corrected SMART measurements for each pixel averaged over time option: which option was used for VNIR correction **kwargs: save_fig (bool): save figure to current directory or just show it Returns: plot """ save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False pixel_path = h.get_path("pixel_wl") date_str, channel, direction = get_info_from_filename(filename) path = h.get_path("raw", flight) spectrometer = meta[campaign][f"{direction}_{channel}"] dark_current = get_dark_current(path, filename, option, plot=False) wavelength = reader.read_pixel_to_wavelength(pixel_path, spectrometer)["wavelength"] if channel == "VNIR" and option == 1: plt.axhline(dark_current, label="Dark Current", color='k') else: plt.plot(wavelength, dark_current, label="Dark Current", color='k') plt.plot(wavelength, measurement, label="Measurement") plt.plot(wavelength, measurement_cor, label="Corrected Measurement") plt.grid() plt.title(f"Mean Corrected SMART Measurement {date_str.replace('_', '-')}\n" f"Spectrometer {spectrometer}, Channel: {channel}, Option: {option}") plt.xlabel("Wavelength (nm)") plt.ylabel("Netto Counts") plt.legend() if save_fig: plt.savefig(f"{date_str}_corrected_smart_measurement.png") else: plt.show() plt.close()
[docs]def correct_smart_dark_current(flight: str, smart_file: str, option: int, **kwargs) -> pd.Series: """ Correct the raw SMART measurement for the dark current of the spectrometer. Only returns data when the shutter was open. Args: flight: to which flight does the file belong to? (e.g. Flight_20210707a) smart_file: filename of file to correct option: which option should be used to get the dark current? Only relevant for channel "VNIR". kwargs: path (str): path to file if not raw file path as given in config.toml, date (str): (yyyymmdd) date from which the dark current measurement should be used for VNIR (necessary if \ no transfer calibration was made on a measurement day) campaign (str): campaign to which smart file belongs to Returns: Series with corrected smart measurement """ campaign = kwargs["campaign"] if "campaign" in kwargs else "halo-ac3" path = h.get_path("raw", flight, campaign=campaign) path = kwargs.pop("path") if "path" in kwargs else path date_str, channel, direction = get_info_from_filename(smart_file) smart = reader.read_smart_raw(path, smart_file) dark_current = get_dark_current(flight, smart_file, option, path=path, **kwargs) if channel == "VNIR" and option == 1: dark_current = dark_current.mean() # If get_dark_current returns a column mean, this can to be removed measurement = smart.where(smart.shutter == 1).iloc[:, 2:] # only use data when shutter is open measurement_cor = measurement - dark_current # drop nan values -> dark current measurements at beginning of file measurement_cor.dropna(inplace=True) return measurement_cor
[docs]def plot_smart_data(campaign: str, flight: str, filename: str, wavelength: Union[list, str], **kwargs) -> plt.axes: """ Plot SMART data in the given file. Either a time average over a range of wavelengths or all wavelengths, or a time series of one wavelength. Return an axes object to continue plotting or show it. TODO: add option to plot multiple files Args: campaign: campaign name (cirrus-hl or halo-ac3) flight: to which flight does the file belong to? (e.g. Flight_20210707a) filename: Standard SMART filename wavelength: list with either one or two wavelengths in nm or 'all' **kwargs: path (str): give path to filename if not default from config.toml, save_fig (bool): save figure? (default: False), plot_path (str): where to save figure (default: given in config.toml) ax (plt.axis): axes to already existing matplotlib axis Returns: Shows a figure or saves it to disc. """ raw_path = h.get_path("raw", flight, campaign) pixel_wl_path = h.get_path("pixel_wl", campaign) data_path = h.get_path("data", flight, campaign) calibrated_path = h.get_path("calibrated", flight, campaign) plot_path = h.get_path("plot", campaign=campaign) # read in keyword arguments raw_path = kwargs["path"] if "path" in kwargs else raw_path data_path = kwargs["path"] if "path" in kwargs else data_path calibrated_path = kwargs["path"] if "path" in kwargs else calibrated_path save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False plot_path = kwargs["plot_path"] if "plot_path" in kwargs else plot_path ax = kwargs["ax"] if "ax" in kwargs else None fig = plt.figure() if ax is None: ax = fig.add_subplot(111) date_str, channel, direction = get_info_from_filename(filename) if "calibrated" in filename: smart = reader.read_smart_cor(calibrated_path, filename) title = "\nCorrected for Dark Current and Calibrated" ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)" elif "cor" in filename: smart = reader.read_smart_cor(data_path, filename) title = "Corrected for Dark Current" ylabel = "Netto Counts" else: smart = reader.read_smart_raw(raw_path, filename) smart = smart.iloc[:, 2:] # remove columns t_int and shutter title = "Raw" ylabel = "Netto Counts" pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, meta[campaign][f"{direction}_{channel}"]) if len(wavelength) == 2: pixel_nr = [] wl_str = "" for wl in wavelength: pxl, wavel = find_pixel(pixel_wl, wl) pixel_nr.append(pxl) wl_str = f"{wl_str}_{wavel:.1f}" pixel_nr.sort() # make sure wavelengths are in ascending order smart_sel = smart.loc[:, pixel_nr[0]:pixel_nr[1]] # select range of wavelengths begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] # read out start and end time smart_mean = smart_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) # update the index to be numeric # join the measurement and pixel to wavelength data frames by pixel smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) smart_plot.plot(x="wavelength", y=0, legend=False, xlabel="Wavelength (nm)", ylabel=ylabel, ax=ax, title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n {begin_dt} - {end_dt}") figname = filename.replace('.dat', f'{wl_str}.png') elif len(wavelength) == 1: pixel_nr, wl = find_pixel(pixel_wl, wavelength[0]) smart_sel = smart.loc[:, pixel_nr].to_frame() begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] time_extend = end_dt - begin_dt smart_sel.plot(ax=ax, legend=False, xlabel="Time (UTC)", ylabel=ylabel, title=f"SMART Time Series {title} {direction} {channel}\n{wl:.3f} nm {begin_dt:%Y-%m-%d}") h.set_xticks_and_xlabels(ax, time_extend) figname = filename.replace('.dat', f'_{wl:.1f}nm.png') elif wavelength == "all": begin_dt, end_dt = smart.index[0], smart.index[-1] smart_mean = smart.mean().to_frame() smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) smart_plot.plot(x="wavelength", y=0, legend=False, xlabel="Wavelength (nm)", ylabel=ylabel, ax=ax, title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n " f"{begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}") figname = filename.replace('.dat', f'_{wavelength}.png') else: raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!") ax.grid() plt.tight_layout() if save_fig: plt.savefig(f"{plot_path}/{figname}", dpi=100) log.info(f"Saved {plot_path}/{figname}") plt.close() else: return ax
[docs]def plot_smart_spectra(path: str, campaign: str, filename: str, index: int, **kwargs) -> None: """ Plot a spectra from a SMART calibrated measurement file for a given index (time step) Args: path: where the file can be found campaign: campaign name (cirrus-hl or halo-ac3) filename: name of the file (standard SMART filename convention) index: which row to plot **kwargs: save_fig (bool): Save figure to plot path given in config.toml (default: False), plot_path (str): Where to save plot if not standard plot path Returns: Shows and or saves a plot """ save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False plot_path = kwargs["plot_path"] if "plot_path" in kwargs else h.get_path("plot", campaign=campaign) pixel_path = h.get_path("pixel_wl", campaign) df = reader.read_smart_cor(path, filename) date_str, channel, direction = get_info_from_filename(filename) spectrometer = meta[campaign][f"{direction}_{channel}"] pixel_wl = reader.read_pixel_to_wavelength(pixel_path, spectrometer) max_id = len(df) - 1 try: df_sel = df.iloc[index, :] except IndexError as e: log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id}'!") df_sel = df.iloc[max_id, :] time_stamp = df_sel.name # get time stamp which is selected pixel_wl[f"{direction}"] = df_sel.reset_index(drop=True) ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)" fig, ax = plt.subplots() ax.plot("wavelength", f"{direction}", data=pixel_wl, label=f"{direction}") ax.set_title(f"SMART Spectra {spectrometer} {direction} {channel} \n {time_stamp:%Y-%m-%d %H:%M:%S}") ax.set_xlabel("Wavelength (nm)") ax.set_ylabel(ylabel) ax.grid() plt.show() if save_fig: figname = filename.replace(".dat", f"_spectra_{time_stamp:%Y%m%d_%H%M%S}.png") figpath = f"{plot_path}/{figname}" plt.savefig(figpath, dpi=100) log.info(f"Saved {figpath}") plt.close()
[docs]def plot_complete_smart_spectra(path: str, campaign: str, filename: str, index: int, **kwargs) -> None: """ Plot the complete spectra given by both channels from SMART calibrated measurement files for a given index (time step) Args: path: where the file can be found campaign: campaign name (cirrus-hl or halo-ac3) filename: name of the file (standard SMART filename convention) index: which row to plot **kwargs: save_fig (bool): Save figure to plot path given in config.toml (default: False) plot_path (str): Where to save plot if not standard plot path Returns: Shows and or saves a plot """ save_fig = kwargs["save_fig"] if "save_fig" in kwargs else False plot_path = kwargs["plot_path"] if "plot_path" in kwargs else h.get_path("plot", campaign=campaign) pixel_path = h.get_path("pixel_wl", campaign) df1 = reader.read_smart_cor(path, filename) date_str, channel, direction = get_info_from_filename(filename) if channel == "SWIR": channel2 = "VNIR" else: channel2 = "SWIR" filename2 = filename.replace(channel, channel2) df2 = reader.read_smart_cor(path, filename2) spectrometer1 = meta[campaign][f"{direction}_{channel}"] spectrometer2 = meta[campaign][f"{direction}_{channel2}"] pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectrometer1) pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectrometer2) # merge pixel dfs and sort by wavelength # pixel_wl = pixel_wl1.append(pixel_wl2, ignore_index=True).sort_values(by="wavelength", ignore_index=True) max_id1 = len(df1) - 1 max_id2 = len(df2) - 1 try: df_sel1 = df1.iloc[index, :] df_sel2 = df2.iloc[index, :] except IndexError as e: log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id1}'!") df_sel1 = df1.iloc[max_id1, :] df_sel2 = df2.iloc[max_id2, :] time_stamp = df_sel1.name # get time stamp which is selected assert time_stamp == df_sel2.name, "Time stamps from VNIR and SWIR are not identical!" pixel_wl1[f"{direction}"] = df_sel1.reset_index(drop=True) pixel_wl2[f"{direction}"] = df_sel2.reset_index(drop=True) ylabel = "Irradiance (W$\\,$m$^{-2}\\,$nm$^{-1}$)" if "F" in filename else "Radiance (W$\\,$sr$^{-1}\\,$m$^{-2}\\,$nm$^{-1}$)" fig, ax = plt.subplots() ax.plot(pixel_wl1["wavelength"], pixel_wl1[f"{direction}"], label=f"{channel}") ax.plot(pixel_wl2["wavelength"], pixel_wl2[f"{direction}"], label=f"{channel2}") # ax.plot("wavelength", f"{direction}", data=pixel_wl1, label=f"{channel}") # ax.plot("wavelength", f"{direction}", data=pixel_wl2, label=f"{channel2}") ax.set_title(f"SMART Spectra {direction} {channel}/{channel2} \n {time_stamp:%Y-%m-%d %H:%M:%S}") ax.set_xlabel("Wavelength (nm)") ax.set_ylabel(ylabel) ax.grid() ax.legend() if save_fig: figname = filename.replace(".dat", f"_spectra_{time_stamp:%Y%m%d_%H%M%S}.png") figname = figname.replace(channel, "VNIR+SWIR") figpath = f"{plot_path}/{figname}" plt.savefig(figpath, dpi=100) log.info(f"Saved {figpath}") plt.show() plt.close()
[docs]def plot_complete_smart_spectra_interactive(path: str, campaign:str, filename: str, index: int) -> hv.Overlay: """ Plot the complete spectra given by both channels from SMART calibrated measurement files for a given index (time step) Args: path: where the file can be found campaign: campaign name (cirrus-hl or halo-ac3) filename: name of the file (standard SMART filename convention) index: which row to plot Returns: Shows and or saves a plot """ pixel_path = h.get_path("pixel_wl", campaign) df1 = reader.read_smart_cor(path, filename) date_str, channel, direction = get_info_from_filename(filename) if channel == "SWIR": channel2 = "VNIR" else: channel2 = "SWIR" filename2 = filename.replace(channel, channel2) df2 = reader.read_smart_cor(path, filename2) spectrometer1 = meta[campaign][f"{direction}_{channel}"] spectrometer2 = meta[campaign][f"{direction}_{channel2}"] pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectrometer1) pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectrometer2) # merge pixel dfs and sort by wavelength # pixel_wl = pixel_wl1.append(pixel_wl2, ignore_index=True).sort_values(by="wavelength", ignore_index=True) max_id1 = len(df1) - 1 max_id2 = len(df2) - 1 try: df_sel1 = df1.iloc[index, :] df_sel2 = df2.iloc[index, :] except IndexError as e: log.info(f"{e}\nGiven index '{index}' out-of-bounds! Using maximum index '{max_id1}'!") df_sel1 = df1.iloc[max_id1, :] df_sel2 = df2.iloc[max_id2, :] time_stamp = df_sel1.name # get time stamp which is selected assert time_stamp == df_sel2.name, "Time stamps from VNIR and SWIR are not identical!" pixel_wl1[f"{direction}"] = df_sel1.reset_index(drop=True) pixel_wl2[f"{direction}"] = df_sel2.reset_index(drop=True) ylabel = "Irradiance (W m^-2 nm^-1)" if "F" in filename else "Radiance (W sr^-1 m^-2 nm^-1)" curve1 = hv.Curve(pixel_wl1, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[(f"{direction}", ylabel)], label=f"{channel}") curve2 = hv.Curve(pixel_wl2, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[(f"{direction}", ylabel)], label=f"{channel2}") overlay = curve1 * curve2 overlay.opts( opts.Curve(height=500, width=900, fontsize=12)) overlay.opts(title=f"SMART Spectra {direction} {channel}/{channel2} {time_stamp:%Y-%m-%d %H:%M:%S.%f}", show_grid=True) return overlay
[docs]def plot_smart_data_interactive(campaign: str, flight: str, filename: str, wavelength: Union[list, str]) -> hv.Curve: """ Plot SMART data in the given file. Either a time average over a range of wavelengths or all wavelengths, or a time series of one wavelength. Args: campaign: campaign name (cirrus-hl or halo-ac3) flight: to which flight does the file belong to? (e.g. Flight_20210707a) filename: Standard SMART filename wavelength: list with either one or two wavelengths in nm or 'all' Returns: Creates an interactive figure """ raw_path = h.get_path("raw", flight, campaign) pixel_wl_path = h.get_path("pixel_wl", campaign) data_path = h.get_path("data", flight, campaign) calibrated_path = h.get_path("calibrated", flight, campaign) date_str, channel, direction = get_info_from_filename(filename) # make sure wavelength is a list if type(wavelength) != list and wavelength != "all": wavelength = [wavelength] if "calibrated" in filename: df = reader.read_smart_cor(calibrated_path, filename) title = "\nCorrected for Dark Current and Calibrated" ylabel = "Irradiance (W m^-2 nm^-1)" if "F" in filename else "Radiance (W sr^-1 m^-2 nm^-1)" elif "cor" in filename: df = reader.read_smart_cor(data_path, filename) title = "Corrected for Dark Current" ylabel = "Netto Counts" else: df = reader.read_smart_raw(raw_path, filename) df = df.iloc[:, 2:] # remove columns t_int and shutter title = "Raw" ylabel = "Netto Counts" pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, meta[campaign][f"{direction}_{channel}"]) if len(wavelength) == 2: pixel_nr = [] for wl in wavelength: pxl, wavel = find_pixel(pixel_wl, wl) pixel_nr.append(pxl) pixel_nr.sort() # make sure wavelengths are in ascending order smart_sel = df.loc[:, pixel_nr[0]:pixel_nr[1]] # select range of wavelengths begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] # read out start and end time smart_mean = smart_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) # update the index to be numeric # join the measurement and pixel to wavelength data frames by pixel smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) smart_plot.columns = ["value", "pixel", "wavelength"] curve = hv.Curve(smart_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)]) title=f"Time Averaged SMART Measurement {title} {direction} {channel}\n {begin_dt} - {end_dt}" elif len(wavelength) == 1: pixel_nr, wl = find_pixel(pixel_wl, wavelength[0]) smart_sel = df.loc[:, pixel_nr].to_frame() begin_dt, end_dt = smart_sel.index[0], smart_sel.index[-1] time_extend = end_dt - begin_dt smart_sel.reset_index(inplace=True) smart_sel.columns = ["time", "value"] curve = hv.Curve(smart_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)]) title=f"SMART Time Series {title} {direction} {channel}\n{wl:.3f} nm {begin_dt:%Y-%m-%d}" # ax = jr.set_xticks_and_xlabels(ax, time_extend) elif wavelength == "all": begin_dt, end_dt = df.index[0], df.index[-1] smart_mean = df.mean().to_frame() smart_mean = smart_mean.set_index(pd.to_numeric(smart_mean.index)) smart_plot = smart_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) smart_plot.columns = ["value", "pixel", "wavelength"] curve = hv.Curve(smart_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)]) title=f"Time Averaged SMART Measurement {title} {direction} {channel} {begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}" else: raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!") curve.opts( opts.Curve(height=500, width=900, fontsize=12)) curve.opts(title=title, show_grid=True) return curve
[docs]def plot_calibrated_irradiance_flux(campaign: str, filename: str, wavelength: Union[int, list, str], flight: str ) -> hv.Overlay: """ Plot upward and downward irradiance as a time averaged series over the wavelength or as a time series for one wavelength. Args: campaign: campaign name (cirrus-hl or halo-ac3) filename: Standard SMART filename wavelength: single or range of wavelength or "all" flight: flight folder (flight_xx) Returns: holoviews overlay plot with two curves """ # make sure wavelength is a list if type(wavelength) != list and wavelength != "all": wavelength = [wavelength] # get paths and define input path calibrated_path, pixel_path = h.get_path("calibrated", flight, campaign), h.get_path("pixel_wl", flight, campaign) date_str, channel, direction = get_info_from_filename(filename) direction2 = "Fdw" if direction == "Fup" else "Fup" # set opposite direction filename2 = filename.replace(direction, direction2) # read in both irradiance measurements df1 = reader.read_smart_cor(calibrated_path, filename) df2 = reader.read_smart_cor(calibrated_path, filename2) # get spectrometers from smart_lookup dictionary spectro1, spectro2 = meta[campaign][f"{direction}_{channel}"], meta[campaign][f"{direction2}_{channel}"] pixel_wl1 = reader.read_pixel_to_wavelength(pixel_path, spectro1) pixel_wl2 = reader.read_pixel_to_wavelength(pixel_path, spectro2) title = "Corrected for Dark Current and Calibrated" ylabel = "Irradiance (W m^-2 nm^-1)" if len(wavelength) == 2: pixel_nr1 = [] pixel_nr2 = [] for wl in wavelength: pxl, _ = find_pixel(pixel_wl1, wl) pixel_nr1.append(pxl) pxl, _ = find_pixel(pixel_wl2, wl) pixel_nr2.append(pxl) pixel_nr1.sort() # make sure wavelengths are in ascending order pixel_nr2.sort() df1_sel = df1.loc[:, pixel_nr1[0]:pixel_nr1[1]] # select range of wavelengths df1_sel.sort() # sort data frame df2_sel = df2.loc[:, pixel_nr2[0]:pixel_nr2[1]] # select range of wavelengths df2_sel.sort() # sort data frame begin_dt, end_dt = df1_sel.index[0], df1_sel.index[-1] # read out start and end time df1_mean = df1_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe df1_mean = df1_mean.set_index(pd.to_numeric(df1_mean.index)) # update the index to be numeric df2_mean = df2_sel.mean(axis=0).to_frame() # calculate mean over time and return a dataframe df2_mean = df2_mean.set_index(pd.to_numeric(df2_mean.index)) # update the index to be numeric # join the measurement and pixel to wavelength data frames by pixel df1_plot = df1_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) df1_plot.columns = ["value", "pixel", "wavelength"] df2_plot = df2_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) df2_plot.columns = ["value", "pixel", "wavelength"] curve1 = hv.Curve(df1_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)], label=direction) curve2 = hv.Curve(df2_plot, kdims=[("wavelength", "Wavelength (nm)")], vdims=[("value", ylabel)], label=direction2) title = f"Time Averaged SMART Measurement {title} {channel}\n {begin_dt} - {end_dt}" overlay = curve1 * curve2 elif len(wavelength) == 1: pixel_nr1, wl1 = find_pixel(pixel_wl1, wavelength[0]) df1_sel = df1.loc[:, pixel_nr1].to_frame() pixel_nr2, wl2 = find_pixel(pixel_wl2, wavelength[0]) df2_sel = df2.loc[:, pixel_nr2].to_frame() begin_dt, end_dt = df1_sel.index[0], df1_sel.index[-1] df1_sel.reset_index(inplace=True) df1_sel.columns = ["time", "value"] df2_sel.reset_index(inplace=True) df2_sel.columns = ["time", "value"] curve1 = hv.Curve(df1_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)], label=direction) curve2 = hv.Curve(df2_sel, kdims=[("time", "Time (UTC)")], vdims=[("value", ylabel)], label=direction2) title = f"SMART Time Series {title} {channel}\n{wl1:.3f} nm {begin_dt:%Y-%m-%d}" overlay = curve1 * curve2 elif wavelength == "all": begin_dt, end_dt = df1.index[0], df1.index[-1] df1_mean = df1.mean().to_frame() df1_mean = df1_mean.set_index(pd.to_numeric(df1_mean.index)) df1_plot = df1_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) df1_plot.columns = ["value", "pixel", "wavelength"] df2_mean = df2.mean().to_frame() df2_mean = df2_mean.set_index(pd.to_numeric(df2_mean.index)) df2_plot = df2_mean.join(pixel_wl.set_index(pixel_wl["pixel"])) df2_plot.columns = ["value", "pixel", "wavelength"] curve1 = hv.Curve(df1_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)], label=direction) curve2 = hv.Curve(df2_plot, kdims=[("wavelength", "Wavelenght (nm)")], vdims=[("value", ylabel)], label=direction2) title = f"Time Averaged SMART Irradiance Measurement {title} {channel} {begin_dt:%Y-%m-%d %H:%M:%S} - {end_dt:%Y-%m-%d %H:%M:%S}" overlay = curve1 * curve2 else: raise ValueError("wavelength has to be a list of length 1 or 2 or 'all'!") overlay.opts( opts.Curve(height=500, width=900, fontsize=12)) overlay.opts(title=title, show_grid=True) return overlay
def _set_adjust(nr_bands: int) -> float: """Define bottom adjust according to number of bands displayed Args: nr_bands: number of bands shown in plot Returns: bottom adjust """ if nr_bands < 4: b_adjust = 0.3 elif nr_bands in range(3, 7): b_adjust = 0.35 elif nr_bands in range(6, 10): b_adjust = 0.37 elif nr_bands in range(9, 13): b_adjust = 0.4 else: b_adjust = 0.43 return b_adjust
[docs]def plot_smart_ecrad_bands(smart: xr.Dataset, bands: list, path: str = None, save_fig: bool = False): """Plot banded upward and downward irradiance for given ecRad bands Args: smart: SMART data set containing variables fdn_banded and fup_banded bands: ecRad band numbers path: where to save figure to save_fig: whether to save figure Returns: figure """ for dir, var in zip(["downward", "upward"], ["fdn_banded", "fup_banded"]): fig, ax = plt.subplots() for i, band in enumerate(h.ecRad_bands): if i + 1 in bands: smart[var].loc[dict(band=i + 1)].plot.line(ax=ax, label=f"{i + 1}: {h.ecRad_bands[band]} nm", add_legend=False) plt.ylabel("Irradiance (W$\,$m$^{-2}$)") plt.xlabel("Time (UTC)") plt.title(f"SMART {dir} Irradiances integrated over ecRad Bands {str(smart.time[0].values)[:10]}") plt.grid() # set legend according to number of bands nr_bands = len(bands) b_adjust = _set_adjust(nr_bands) plt.legend(title="ecRad Band", bbox_to_anchor=[1.1, -0.3], ncol=3) plt.subplots_adjust(bottom=b_adjust) if save_fig: figname = os.path.join(path, f"{str(smart.time[0].values)[:10]}_SMART_ecrad_banded_{dir}_irradiance.png") plt.savefig(figname, dpi=100) log.info(f"Saved {figname}") else: plt.show() plt.close()
if __name__ == '__main__': # test read in functions path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/raw_only/ASP06_transfer_calib_20210616/Tint_500ms" filename = "2021_06_16_07_20.Fdw_SWIR.dat" # filename = "2021_03_29_11_15.Fup_SWIR.dat" smart = reader.read_smart_raw(path, filename) pixel_wl_path = h.get_path("pixel_wl") spectrometer = "ASP06_J3" pixel_wl = reader.read_pixel_to_wavelength(pixel_wl_path, spectrometer) # find pixel closest to given wavelength pixel_nr, wavelength = find_pixel(pixel_wl, 525) # input: spectrometer, filename, option # option = 2 # filename = "2021_03_29_11_07.Fup_VNIR.dat" # filename = "2021_03_29_11_07.Fup_SWIR.dat" # dark_current = get_dark_current(filename, option) # # correct raw measurement with dark current # # input: smart measurement, option # option = 2 # filename = "2021_03_29_11_07.Fup_VNIR.dat" # smart_cor = correct_smart_dark_current(filename, option) # # # plot mean corrected smart measurement # raw_path, _, _, _, _ = set_paths() # filename = "2021_03_29_11_07.Fup_VNIR.dat" # option = 2 # smart = read_smart_raw(raw_path, filename) # smart_cor = correct_smart_dark_current(filename, option=option) # measurement = smart.mean().iloc[2:] # measurement_cor = smart_cor.mean() # plot_mean_corrected_measurement(filename, measurement, measurement_cor, option) # # # read corrected file # _, _, _, data_path, _ = set_paths() # filename = "2021_03_29_11_07.Fup_VNIR_cor.dat" # smart_cor = read_smart_cor(data_path, filename) # # # plot any smart measurement given a range of wavelengths, one specific one or all # filename = "2021_03_29_11_07.Fup_VNIR_cor.dat" # raw_file = "2021_03_29_11_07.Fup_VNIR.dat" # wavelength = [525] # plot_smart_data(filename, wavelength) # plot_smart_data(raw_file, wavelength) # # # plot SMART spectra # file = "2021_06_04_13_40.Fdw_VNIR_cor_calibrated.dat" # path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/calibrated_data/flight_00" # index = 500 # plot_smart_spectra(path, file, index) # plot a complete spectra from both channels filename = "2021_06_21_08_41.Fdw_SWIR_cor_calibrated_norm.dat" path = f"{h.get_path('calibrated')}/flight_00" index = 500 plot_path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/plots/quicklooks/flight_00/spectra" plot_complete_smart_spectra(path, filename, index, save_fig=True, plot_path=plot_path) # working section raw_file = "2021_03_29_11_15.Fdw_SWIR.dat" calibrated_file = "2021_06_24_10_28.Fdw_VNIR_cor_calibrated_norm.dat" file = "2021_06_25_06_14.Iup_SWIR.dat" path = "C:/Users/Johannes/Documents/Doktor/campaigns/CIRRUS-HL/SMART/calib/ASP07_transfer_calib_20210625/dark_300ms" plot_smart_data("Flight_20210625a", file, "all", path=path) smart = reader.read_smart_raw(path, file) fig, ax = plt.subplots() smart.iloc[4:, 2:].plot(ax=ax) smart.iloc[:, 2:].plot(ax=ax, label="dark") (smart.iloc[2, 2:] - smart.iloc[21, 2:]).plot(ax=ax, label="diff") plt.legend() plt.grid() plt.show() plt.close()