Source code for jdaviz.configs.specviz2d.plugins.spectral_extraction.spectral_extraction
import numpy as np
from functools import cached_property
from traitlets import Bool, List, Unicode, observe
from jdaviz.configs.mosviz.plugins.viewers import Spectrum1DViewer
from jdaviz.core.events import (SnackbarMessage, NewViewerMessage,
ViewerVisibleLayersChangedMessage)
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin,
SelectPluginComponent,
DatasetSelect,
AddResults,
_populate_viewer_items,
skip_if_no_updates_since_last_active,
skip_if_not_tray_instance,
skip_if_not_relevant,
with_spinner)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.core.custom_traitlets import IntHandleEmpty, FloatHandleEmpty
from jdaviz.core.marks import PluginMarkCollection, PluginLine
from astropy.modeling import models
from astropy.nddata import StdDevUncertainty, VarianceUncertainty, UnknownUncertainty
import astropy.units as u
from specutils import Spectrum
from specreduce import tracing
from specreduce import background
from specreduce import extract
__all__ = ['SpectralExtraction2D']
_model_cls = {'Spline': models.Spline1D,
'Polynomial': models.Polynomial1D,
'Legendre': models.Legendre1D,
'Chebyshev': models.Chebyshev1D}
[docs]
@tray_registry('spectral-extraction-2d', label="2D Spectral Extraction",
category="data:reduction")
class SpectralExtraction2D(PluginTemplateMixin):
"""
The Spectral Extraction 2D plugin exposes specreduce methods for tracing,
background subtraction, and spectral extraction from 2D spectra.
See the :ref:`2D Spectral Extraction Plugin Documentation <specviz2d-spectral-extraction>`
for more details.
Only the following attributes and methods are available through the
:ref:`public plugin API <plugin-apis>`:
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray`
* ``interactive_extract``
Whether to automatically extract when parameters change.
* ``trace_dataset`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
controls the input dataset for generating the trace.
* ``trace_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
controls the type of trace to be generated.
* ``trace_peak_method`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
only applicable if ``trace_type`` is not ``Flat``.
* ``trace_pixel``
pixel of the trace. If ``trace_type`` is not ``Flat``, then this
is the "guess" for the automated trace.
* ``trace_do_binning``
only applicable if ``trace_type`` is not ``Flat``. Bin the input data when fitting the
trace.
* ``trace_bins``
only applicable if ``trace_type`` is not ``Flat`` and ``trace_do_binning``.
* ``trace_order``
order of the polynomial fit for the trace.
* ``trace_offset``
offset to apply to the trace.
* ``trace_trace`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
existing trace to use as a reference.
* ``trace_window``
full width of the trace.
* :meth:`import_trace`
* :meth:`export_trace`
* ``bg_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the background.
* ``bg_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
controls the type of background to be generated.
* ``bg_trace`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
existing trace to use as a reference for the background.
* ``bg_trace_pixel``
only applicable if ``bg_type`` is set to ``Manual``
* ``bg_separation``
only applicable if ``bg_type`` set set to ``OneSided`` or ``TwoSided``.
Separation from the referenced trace for the center of each of the background window(s).
* ``bg_width``
full width of each background window(s).
* ``bg_statistic`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
statistic to use when computing the background. 'Average' will account for partial pixel
weights, 'Median' will include all partial pixels.
* ``bg_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* ``bg_sub_add_results``
* :meth:`import_bg`
* :meth:`export_bg`
* :meth:`export_bg_spectrum`
* :meth:`export_bg_img`
* :meth:`export_bg_sub`
* ``ext_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the extracted spectrum. "From Plugin" will
use the background-subtracted image as defined by the background options above. To skip
background extraction, select the original input 2D spectrum.
* ``ext_trace`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`)
* ``ext_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
* ``ext_width``
full width of the extraction window.
* ``horne_ext_profile`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
For Horne extract, choice of 'Gaussian' or 'Self (interpolated)' to use
empirical profile from data.
* ``self_prof_n_bins``
Number of bins to use when computing the self-derived profile for Horne Extract.
* ``self_prof_interp_degree_x``
Interpolation degree (in X) to use when computing the self-derived profile
for Horne Extract.
* ``self_prof_interp_degree_y``
Interpolation degree (in Y) to use when computing the self-derived profile
for Horne Extract.
* ``ext_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* :meth:`import_extract`
* :meth:`export_extract`
* :meth:`export_extract_spectrum`
"""
dialog = Bool(False).tag(sync=True)
template_file = __file__, "spectral_extraction.vue"
uses_active_status = Bool(True).tag(sync=True)
active_step = Unicode().tag(sync=True)
# SETTINGS
interactive_extract = Bool(True).tag(sync=True)
# TRACE
trace_trace_items = List().tag(sync=True)
trace_trace_selected = Unicode().tag(sync=True)
trace_offset = IntHandleEmpty(0).tag(sync=True)
trace_dataset_items = List().tag(sync=True)
trace_dataset_selected = Unicode().tag(sync=True)
trace_type_items = List().tag(sync=True)
trace_type_selected = Unicode().tag(sync=True)
trace_pixel = FloatHandleEmpty(0).tag(sync=True)
trace_order = IntHandleEmpty(3).tag(sync=True)
trace_peak_method_items = List().tag(sync=True)
trace_peak_method_selected = Unicode().tag(sync=True)
trace_do_binning = Bool(True).tag(sync=True)
trace_bins = IntHandleEmpty(20).tag(sync=True)
trace_window = IntHandleEmpty(0).tag(sync=True)
trace_results_label = Unicode().tag(sync=True)
trace_results_label_default = Unicode().tag(sync=True)
trace_results_label_auto = Bool(True).tag(sync=True)
trace_results_label_invalid_msg = Unicode('').tag(sync=True)
trace_results_label_overwrite = Bool().tag(sync=True)
trace_add_to_viewer_items = List().tag(sync=True)
trace_add_to_viewer_selected = Unicode().tag(sync=True)
trace_add_to_viewer_create_new_items = List().tag(sync=True)
trace_add_to_viewer_create_new_selected = Unicode().tag(sync=True)
trace_add_to_viewer_label_value = Unicode().tag(sync=True)
trace_add_to_viewer_label_default = Unicode().tag(sync=True)
trace_add_to_viewer_label_auto = Bool(True).tag(sync=True)
trace_add_to_viewer_label_invalid_msg = Unicode('').tag(sync=True)
trace_spinner = Bool(False).tag(sync=True)
# BACKGROUND
bg_dataset_items = List().tag(sync=True)
bg_dataset_selected = Unicode().tag(sync=True)
bg_type_items = List().tag(sync=True)
bg_type_selected = Unicode().tag(sync=True)
bg_trace_items = List().tag(sync=True)
bg_trace_selected = Unicode().tag(sync=True)
bg_trace_pixel = FloatHandleEmpty(0).tag(sync=True)
bg_statistic_items = List().tag(sync=True)
bg_statistic_selected = Unicode().tag(sync=True)
bg_separation = FloatHandleEmpty(0).tag(sync=True)
bg_width = FloatHandleEmpty(0).tag(sync=True)
bg_results_label = Unicode().tag(sync=True)
bg_results_label_default = Unicode().tag(sync=True)
bg_results_label_auto = Bool(True).tag(sync=True)
bg_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_results_label_overwrite = Bool().tag(sync=True)
bg_add_to_viewer_items = List().tag(sync=True)
bg_add_to_viewer_selected = Unicode().tag(sync=True)
bg_add_to_viewer_create_new_items = List().tag(sync=True)
bg_add_to_viewer_create_new_selected = Unicode().tag(sync=True)
bg_add_to_viewer_label_value = Unicode().tag(sync=True)
bg_add_to_viewer_label_default = Unicode().tag(sync=True)
bg_add_to_viewer_label_auto = Bool(True).tag(sync=True)
bg_add_to_viewer_label_invalid_msg = Unicode('').tag(sync=True)
bg_img_spinner = Bool(False).tag(sync=True)
bg_spec_results_label = Unicode().tag(sync=True)
bg_spec_results_label_default = Unicode().tag(sync=True)
bg_spec_results_label_auto = Bool(True).tag(sync=True)
bg_spec_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_spec_results_label_overwrite = Bool().tag(sync=True)
bg_spec_add_to_viewer_items = List().tag(sync=True)
bg_spec_add_to_viewer_selected = Unicode().tag(sync=True)
bg_spec_add_to_viewer_create_new_items = List().tag(sync=True)
bg_spec_add_to_viewer_create_new_selected = Unicode().tag(sync=True)
bg_spec_add_to_viewer_label_value = Unicode().tag(sync=True)
bg_spec_add_to_viewer_label_default = Unicode().tag(sync=True)
bg_spec_add_to_viewer_label_auto = Bool(True).tag(sync=True)
bg_spec_add_to_viewer_label_invalid_msg = Unicode('').tag(sync=True)
bg_spec_spinner = Bool(False).tag(sync=True)
bg_sub_results_label = Unicode().tag(sync=True)
bg_sub_results_label_default = Unicode().tag(sync=True)
bg_sub_results_label_auto = Bool(True).tag(sync=True)
bg_sub_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_sub_results_label_overwrite = Bool().tag(sync=True)
bg_sub_add_to_viewer_items = List().tag(sync=True)
bg_sub_add_to_viewer_selected = Unicode().tag(sync=True)
bg_sub_add_to_viewer_create_new_items = List().tag(sync=True)
bg_sub_add_to_viewer_create_new_selected = Unicode().tag(sync=True)
bg_sub_add_to_viewer_label_value = Unicode().tag(sync=True)
bg_sub_add_to_viewer_label_default = Unicode().tag(sync=True)
bg_sub_add_to_viewer_label_auto = Bool(True).tag(sync=True)
bg_sub_add_to_viewer_label_invalid_msg = Unicode('').tag(sync=True)
bg_sub_spinner = Bool(False).tag(sync=True)
# EXTRACT
ext_dataset_items = List().tag(sync=True)
ext_dataset_selected = Unicode().tag(sync=True)
ext_trace_items = List().tag(sync=True)
ext_trace_selected = Unicode().tag(sync=True)
ext_type_items = List().tag(sync=True)
ext_type_selected = Unicode().tag(sync=True)
horne_ext_profile_items = List().tag(sync=True)
horne_ext_profile_selected = Unicode().tag(sync=True)
self_prof_n_bins = IntHandleEmpty(10).tag(sync=True)
self_prof_interp_degree_x = IntHandleEmpty(1).tag(sync=True)
self_prof_interp_degree_y = IntHandleEmpty(1).tag(sync=True)
ext_width = FloatHandleEmpty(0).tag(sync=True)
ext_uncert_warn = Bool(False).tag(sync=True)
ext_specreduce_err = Unicode().tag(sync=True)
ext_results_label = Unicode().tag(sync=True)
ext_results_label_default = Unicode().tag(sync=True)
ext_results_label_auto = Bool(True).tag(sync=True)
ext_results_label_invalid_msg = Unicode('').tag(sync=True)
ext_results_label_overwrite = Bool().tag(sync=True)
ext_add_to_viewer_items = List().tag(sync=True)
ext_add_to_viewer_selected = Unicode().tag(sync=True)
ext_add_to_viewer_create_new_items = List().tag(sync=True)
ext_add_to_viewer_create_new_selected = Unicode().tag(sync=True)
ext_add_to_viewer_label_value = Unicode().tag(sync=True)
ext_add_to_viewer_label_default = Unicode().tag(sync=True)
ext_add_to_viewer_label_auto = Bool(True).tag(sync=True)
ext_add_to_viewer_label_invalid_msg = Unicode('').tag(sync=True)
# uses default "spinner"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# description displayed under plugin title in tray
self._plugin_description = 'Extract 1D spectrum from 2D image.'
# TRACE
self.trace_trace = DatasetSelect(self,
'trace_trace_items',
'trace_trace_selected',
default_text='New Trace',
filters=['is_trace'])
self.trace_dataset = DatasetSelect(self,
'trace_dataset_items',
'trace_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.trace_dataset.get_data_cls = Spectrum
self.trace_type = SelectPluginComponent(self,
items='trace_type_items',
selected='trace_type_selected',
manual_options=['Flat', 'Polynomial',
'Legendre', 'Chebyshev',
'Spline'])
self.trace_peak_method = SelectPluginComponent(self,
items='trace_peak_method_items',
selected='trace_peak_method_selected',
manual_options=['Gaussian', 'Centroid', 'Max']) # noqa
self.trace_add_results = AddResults(self, 'trace_results_label',
'trace_results_label_default',
'trace_results_label_auto',
'trace_results_label_invalid_msg',
'trace_results_label_overwrite',
'trace_add_to_viewer_items',
'trace_add_to_viewer_selected',
'trace_add_to_viewer_create_new_items',
'trace_add_to_viewer_create_new_selected',
'trace_add_to_viewer_label_value',
'trace_add_to_viewer_label_default',
'trace_add_to_viewer_label_auto',
'trace_add_to_viewer_label_invalid_msg')
# Populate viewer items using _get_trace_supported_viewers
supported_viewers = self._get_trace_supported_viewers()
viewer_create_new_items, viewer_filter = _populate_viewer_items(
self, supported_viewers)
self.trace_add_to_viewer_create_new_items = viewer_create_new_items
self.trace_add_results.viewer.add_filter(viewer_filter)
self.trace_add_results.viewer.select_default()
self.trace_results_label_default = 'trace'
# BACKGROUND
self.bg_dataset = DatasetSelect(self,
'bg_dataset_items',
'bg_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.bg_type = SelectPluginComponent(self,
items='bg_type_items',
selected='bg_type_selected',
manual_options=['TwoSided', 'OneSided', 'Manual'])
self.bg_trace = DatasetSelect(self,
items='bg_trace_items',
selected='bg_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.bg_statistic = SelectPluginComponent(self,
items='bg_statistic_items',
selected='bg_statistic_selected',
manual_options=['Average', 'Median'])
self.bg_add_results = AddResults(self, 'bg_results_label',
'bg_results_label_default',
'bg_results_label_auto',
'bg_results_label_invalid_msg',
'bg_results_label_overwrite',
'bg_add_to_viewer_items',
'bg_add_to_viewer_selected',
'bg_add_to_viewer_create_new_items',
'bg_add_to_viewer_create_new_selected',
'bg_add_to_viewer_label_value',
'bg_add_to_viewer_label_default',
'bg_add_to_viewer_label_auto',
'bg_add_to_viewer_label_invalid_msg')
# Populate viewer items using _get_bg_supported_viewers
supported_viewers = self._get_bg_supported_viewers()
viewer_create_new_items, viewer_filter = _populate_viewer_items(
self, supported_viewers)
self.bg_add_to_viewer_create_new_items = viewer_create_new_items
self.bg_add_results.viewer.add_filter(viewer_filter)
self.bg_add_results.viewer.select_default()
self.bg_results_label_default = 'background'
self.bg_spec_add_results = AddResults(self, 'bg_spec_results_label',
'bg_spec_results_label_default',
'bg_spec_results_label_auto',
'bg_spec_results_label_invalid_msg',
'bg_spec_results_label_overwrite',
'bg_spec_add_to_viewer_items',
'bg_spec_add_to_viewer_selected',
'bg_spec_add_to_viewer_create_new_items',
'bg_spec_add_to_viewer_create_new_selected',
'bg_spec_add_to_viewer_label_value',
'bg_spec_add_to_viewer_label_default',
'bg_spec_add_to_viewer_label_auto',
'bg_spec_add_to_viewer_label_invalid_msg')
# Populate viewer items using _get_bg_spec_supported_viewers
supported_viewers = self._get_bg_spec_supported_viewers()
viewer_create_new_items, viewer_filter = _populate_viewer_items(
self, supported_viewers)
self.bg_spec_add_to_viewer_create_new_items = viewer_create_new_items
self.bg_spec_add_results.viewer.add_filter(viewer_filter)
self.bg_spec_add_results.viewer.select_default()
self.bg_spec_results_label_default = 'background-spectrum'
self.bg_sub_add_results = AddResults(self, 'bg_sub_results_label',
'bg_sub_results_label_default',
'bg_sub_results_label_auto',
'bg_sub_results_label_invalid_msg',
'bg_sub_results_label_overwrite',
'bg_sub_add_to_viewer_items',
'bg_sub_add_to_viewer_selected',
'bg_sub_add_to_viewer_create_new_items',
'bg_sub_add_to_viewer_create_new_selected',
'bg_sub_add_to_viewer_label_value',
'bg_sub_add_to_viewer_label_default',
'bg_sub_add_to_viewer_label_auto',
'bg_sub_add_to_viewer_label_invalid_msg')
# Populate viewer items using _get_bg_sub_supported_viewers
supported_viewers = self._get_bg_sub_supported_viewers()
viewer_create_new_items, viewer_filter = _populate_viewer_items(
self, supported_viewers)
self.bg_sub_add_to_viewer_create_new_items = viewer_create_new_items
self.bg_sub_add_results.viewer.add_filter(viewer_filter)
self.bg_sub_add_results.viewer.select_default()
self.bg_sub_results_label_default = 'background-subtracted'
# EXTRACT
self.ext_dataset = DatasetSelect(self,
'ext_dataset_items',
'ext_dataset_selected',
default_text='From Plugin',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.ext_trace = DatasetSelect(self,
items='ext_trace_items',
selected='ext_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.ext_type = SelectPluginComponent(self,
items='ext_type_items',
selected='ext_type_selected',
manual_options=['Boxcar', 'Horne'])
self.horne_ext_profile = SelectPluginComponent(self,
items='horne_ext_profile_items',
selected='horne_ext_profile_selected',
manual_options=['Gaussian', 'Self (interpolated)']) # noqa
self.ext_add_results = AddResults(self, 'ext_results_label',
'ext_results_label_default',
'ext_results_label_auto',
'ext_results_label_invalid_msg',
'ext_results_label_overwrite',
'ext_add_to_viewer_items',
'ext_add_to_viewer_selected',
'ext_add_to_viewer_create_new_items',
'ext_add_to_viewer_create_new_selected',
'ext_add_to_viewer_label_value',
'ext_add_to_viewer_label_default',
'ext_add_to_viewer_label_auto',
'ext_add_to_viewer_label_invalid_msg')
# Populate viewer items using _get_ext_supported_viewers
supported_viewers = self._get_ext_supported_viewers()
viewer_create_new_items, viewer_filter = _populate_viewer_items(
self, supported_viewers)
self.ext_add_to_viewer_create_new_items = viewer_create_new_items
self.ext_add_results.viewer.add_filter(viewer_filter)
self.ext_add_results.viewer.select_default()
# NOTE: defaults to overwriting original spectrum
self.ext_add_results.label_whitelist_overwrite = ['1D Spectrum', '2D Spectrum (auto-ext)']
self.ext_results_label_default = '2D Spectrum (auto-ext)'
self.app.hub.subscribe(self, ViewerVisibleLayersChangedMessage,
lambda _: self._update_plugin_marks())
if self.config == "deconfigged":
self.observe_traitlets_for_relevancy(traitlets_to_observe=['trace_dataset_items'])
def _get_trace_supported_viewers(self):
"""Return viewer types that can display trace data."""
return [{'label': '2D Spectrum', 'reference': 'spectrum-2d-viewer'}]
def _get_bg_supported_viewers(self):
"""Return viewer types that can display background 2D image."""
return [{'label': '2D Spectrum', 'reference': 'spectrum-2d-viewer'}]
def _get_bg_spec_supported_viewers(self):
"""Return viewer types that can display background spectrum."""
return [{'label': '1D Spectrum', 'reference': 'spectrum-1d-viewer'}]
def _get_bg_sub_supported_viewers(self):
"""Return viewer types that can display background-subtracted image."""
return [{'label': '2D Spectrum', 'reference': 'spectrum-2d-viewer'}]
def _get_ext_supported_viewers(self):
"""Return viewer types that can display extracted 1D spectrum."""
return [{'label': '1D Spectrum', 'reference': 'spectrum-1d-viewer'}]
@property
def user_api(self):
return PluginUserApi(self, expose=('interactive_extract',
'trace_bins', 'trace_dataset',
'trace_do_binning', 'trace_offset',
'trace_order', 'trace_peak_method',
'trace_pixel', 'trace_trace',
'trace_type', 'trace_window',
'import_trace', 'export_trace',
'bg_add_results', 'bg_dataset',
'bg_separation', 'bg_statistic',
'bg_sub_add_results', 'bg_trace',
'bg_trace_pixel', 'bg_type', 'bg_width',
'export_bg', 'export_bg_img',
'export_bg_spectrum', 'export_bg_sub',
'import_bg',
'ext_dataset', 'ext_trace', 'ext_type',
'ext_width', 'ext_add_results',
'horne_ext_profile',
'self_prof_n_bins',
'self_prof_interp_degree_x',
'self_prof_interp_degree_y',
'import_extract',
'export_extract', 'export_extract_spectrum'))
def _clear_default_inputs(self):
self.trace_pixel = 0
self.trace_window = 0
self.bg_trace_pixel = 0
self.bg_separation = 0
self.bg_width = 0
self.ext_width = 0
@observe('irrelevant_msg')
def _updates_when_becoming_relevant(self, msg):
if msg.get('new') != '':
return
# reset all defaults for the selected trace dataset, _trace_dataset_selected
# should be triggered shortly after
self._clear_default_inputs()
def _extract_in_new_instance(self, dataset=None, add_data=False):
# create a new instance of the 2D Spectral Extraction plugin (to not
# affect the instance in the tray) and extract the entire cube with defaults.
plg = self.new()
# all other settings remain at their plugin defaults
plg._clear_default_inputs()
plg.selected = self.selected if dataset is None else dataset
plg.trace_dataset.selected = self.trace_dataset.selected if dataset is None else dataset
plg.bg_dataset.selected = self.bg_dataset.selected if dataset is None else dataset
plg._trace_dataset_selected() # should only be necessary if default dataset
return plg.export_extract_spectrum(add_data=add_data)
@observe('trace_dataset_selected')
@skip_if_not_relevant()
def _trace_dataset_selected(self, msg=None):
if not hasattr(self, 'trace_dataset'):
# happens when first initializing plugin outside of tray
return
if not len(self.trace_dataset.selected):
return
trace_dataset = self.trace_dataset
# If we encouter the case where the 2d spectrum being loaded
# has spectral axis units incompatible with the selected spectral axis
# display unit (e.g UC plugin has the single option of 'pix' but the new
# dataset has 'um') do not use display units when estimating defaults.
use_display_units = True
orig = trace_dataset.get_selected_spectrum(use_display_units=False).spectral_axis.unit
display = self.app._get_display_unit('spectral')
if orig is not None and display is not None:
display = u.Unit(display)
unit_types = [str(x) for x in [orig.physical_type, display.physical_type]]
# check if we have one pixel/unknown unit and one known unit type,
# and if so, ignore setting of spectral axis display unit
if unit_types.count('unknown') == 1:
use_display_units = False
width = trace_dataset.get_selected_spectrum(use_display_units=use_display_units).shape[0]
# estimate the pixel number by taking the median of the brightest pixel index
# in each column, ignoring columns where the sum in that column is not
# positive (ie. columns of all zeros or nans)
trace_flux = self.trace_dataset.get_selected_spectrum(use_display_units).flux
trace_flux_ignore_zeros = trace_flux[:, np.nansum(trace_flux, axis=0) != 0]
if trace_flux_ignore_zeros.shape[1] == 0:
# default to trace in middle of image
brightest_pixel = int(trace_flux.shape[0]/2)
else:
brightest_pixel = int(np.nanmedian(np.nanargmax(trace_flux_ignore_zeros, axis=0)))
# do not allow to be an edge pixel
if brightest_pixel < 1:
brightest_pixel = 1
if brightest_pixel > width - 1:
brightest_pixel = width - 1
distance_from_edge = min(brightest_pixel, width-brightest_pixel)
# default width will be 10% of cross-dispersion "height",
# but no larger than distance from the edge
default_bg_width = int(np.ceil(width / 10))
default_width = min(default_bg_width, distance_from_edge * 2)
# sign for one-sided and single trace-pixel depending on whether the brightest pixel is
# above or below the middle of the image
if default_bg_width * 2 >= distance_from_edge:
sign = 1 if (brightest_pixel < width / 2) else -1
default_bg_separation = sign * default_bg_width * 2
else:
default_bg_separation = default_bg_width * 2
if self.trace_pixel == 0:
self.trace_pixel = brightest_pixel
if self.trace_window == 0:
self.trace_window = default_width
if self.bg_trace_pixel == 0:
self.bg_trace_pixel = brightest_pixel + default_bg_separation
if self.bg_separation == 0:
if default_bg_width * 2 >= distance_from_edge:
self.bg_type_selected = 'OneSided'
self.bg_separation = default_bg_separation
if self.bg_width == 0:
self.bg_width = default_bg_width
if self.ext_width == 0:
self.ext_width = default_width
[docs]
def update_marks(self, step=None):
"""
Manually update the live-preview marks for a given step in spectral extraction. This API
mimics opening the plugin and interacting with one of the steps.
Parameters
----------
step : str
Step in the extraction process to visualize. Must be one of: 'trace', 'bg', 'ext'.
"""
if step is not None:
if step == 'trace':
self._interaction_in_trace_step()
elif step == 'bg':
self._interaction_in_bg_step()
elif step == 'ext':
self._interaction_in_ext_step()
elif step == '':
return
else:
raise ValueError("step must be one of: trace, bg, ext")
@observe('ext_add_to_viewer_selected', 'ext_results_label_overwrite')
def _spectrum1d_viewer_changed(self, *args):
self.update_marks(self.active_step)
@observe('is_active', 'active_step')
@skip_if_not_tray_instance()
def _update_plugin_marks(self, msg={}):
if self.app._jdaviz_helper is None:
return
if not len(self.marks):
# plugin has never been opened, no need to create marks just to hide them,
# we'll create marks when the plugin is first opened
return
if not (self.is_active):
for step, mark in self.marks.items():
mark.clear()
return
if self.active_step == '':
# on load, default to 'extract' (this will then trigger the observe to update the marks)
self.active_step = 'ext'
return
marks_info = self.marks_info(include_mark_obj=False) # viewer + step info for each mark
viewers = {'1d': self.marks_viewers1d, '2d': self.marks_viewers2d}
for step, mark in self.marks.items():
visible = self.active_step in marks_info[step]['steps']
mark.set_for_viewers('visible', visible, viewers[marks_info[step]['viewer']])
mark.clear_if_not_in_viewers(viewers[marks_info[step]['viewer']])
@property
def marks_viewers2d(self):
if self.active_step == 'bg':
return self.bg_dataset.viewers_with_selected_visible
return self.trace_dataset.viewers_with_selected_visible
@property
def marks_viewers1d(self):
return self.ext_add_results.results_viewers
[docs]
def marks_info(self, include_mark_obj=True):
"""
A dictionary containing each marker name, the viewer(s) (2d/1d spectral)
it belongs in, which step (bg, trace, extract) and plotting style kwargs.
"""
markers = {'trace': {'viewer': '2d', 'steps': ['trace', 'bg', 'ext']},
'extract': {'viewer': '1d', 'steps': ['trace', 'bg', 'ext']},
'bg1_lower': {'viewer': '2d', 'steps': ['bg']},
'bg1_upper': {'viewer': '2d', 'steps': ['bg']},
'bg1_center': {'viewer': '2d', 'steps': ['bg'], 'kw': {'line_style': 'dotted'}},
'bg2_lower': {'viewer': '2d', 'steps': ['bg']},
'bg2_upper': {'viewer': '2d', 'steps': ['bg']},
'bg2_center': {'viewer': '2d', 'steps': ['bg'], 'kw': {'line_style': 'dotted'}},
'bg_spec': {'viewer': '1d', 'steps': ['bg'], 'kw': {'stroke_width': 1}},
'ext_lower': {'viewer': '2d', 'steps': ['ext']},
'ext_upper': {'viewer': '2d', 'steps': ['ext']}}
if include_mark_obj: # add PluginMarkCollection
for key, d in markers.items():
kwargs = {'visible': self.is_active} | d.get('kw', {})
d['mark'] = PluginMarkCollection(PluginLine, **kwargs)
return markers
@cached_property
def marks(self):
"""
Access the marks created by this plugin in both the spectrum-viewer
and spectrum-2d-viewer.
"""
if not self._tray_instance:
return {}
return {k: v['mark'] for k, v in self.marks_info().items()}
@observe('interactive_extract')
@skip_if_no_updates_since_last_active()
@skip_if_not_tray_instance()
@skip_if_not_relevant()
def _update_interactive_extract(self, event={}):
# also called by any of the _interaction_in_*_step
if self.interactive_extract:
try:
sp1d = self.export_extract_spectrum(add_data=False)
except Exception as e:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
# NOTE: FitTrace or manual background are often giving a
# "background regions overlapped" error from specreduce
self.ext_specreduce_err = repr(e)
self.marks['extract'].clear()
else:
self.ext_specreduce_err = ''
self.marks['extract'].update_xy(sp1d.spectral_axis.value,
sp1d.flux.value,
viewers=self.marks_viewers1d)
else:
self.marks['extract'].clear()
if self.interactive_extract and self.active_step == 'bg':
try:
spec = self.export_bg_spectrum()
except Exception:
self.marks['bg_spec'].clear()
else:
self.marks['bg_spec'].update_xy(spec.spectral_axis,
spec.flux,
viewers=self.marks_viewers1d)
else:
self.marks['bg_spec'].clear()
@observe('is_active', 'trace_dataset_selected', 'trace_type_selected',
'trace_trace_selected', 'trace_offset', 'trace_order',
'trace_pixel', 'trace_peak_method_selected',
'trace_do_binning', 'trace_bins', 'trace_window', 'active_step')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_trace_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'trace')
or not self.is_active):
return
try:
trace = self.export_trace(add_data=False)
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
self.marks['trace'].clear()
else:
self.marks['trace'].update_xy(range(len(trace.trace)),
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'solid'
self._update_interactive_extract(event)
self.active_step = 'trace'
@observe('is_active', 'bg_dataset_selected', 'bg_type_selected',
'bg_trace_selected', 'bg_trace_pixel',
'bg_separation', 'bg_width', 'bg_statistic_selected', 'active_step')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_bg_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'bg')
or not self.is_active):
return
try:
trace = self._get_bg_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'bg1_center', 'bg1_lower', 'bg1_upper',
'bg2_center', 'bg2_lower', 'bg2_upper', 'bg_spec']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'dashed'
if self.bg_type_selected in ['OneSided', 'TwoSided']:
self.marks['bg1_center'].update_xy(xs,
trace.trace+self.bg_separation,
viewers=self.marks_viewers2d)
self.marks['bg1_lower'].update_xy(xs,
trace.trace+self.bg_separation-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_separation+self.bg_width/2,
viewers=self.marks_viewers2d)
else:
self.marks['bg1_center'].clear()
self.marks['bg1_lower'].update_xy(xs,
trace.trace-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_width/2,
viewers=self.marks_viewers2d)
if self.bg_type_selected == 'TwoSided':
self.marks['bg2_center'].update_xy(xs,
trace.trace-self.bg_separation,
viewers=self.marks_viewers2d)
self.marks['bg2_lower'].update_xy(xs,
trace.trace-self.bg_separation-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg2_upper'].update_xy(xs,
trace.trace-self.bg_separation+self.bg_width/2,
viewers=self.marks_viewers2d)
else:
for mark in ['bg2_center', 'bg2_lower', 'bg2_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'bg'
@observe('is_active', 'ext_dataset_selected', 'ext_trace_selected',
'ext_type_selected', 'ext_width', 'active_step',
'horne_ext_profile_selected', 'self_prof_n_bins',
'self_prof_interp_degree_x', 'self_prof_interp_degree_y')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_ext_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step not in ('ext', '')) # noqa
or not self.is_active):
return
try:
trace = self._get_ext_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'ext_lower', 'ext_upper']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'dashed'
if self.ext_type_selected == 'Boxcar':
self.marks['ext_lower'].update_xy(xs,
trace.trace-self.ext_width/2,
viewers=self.marks_viewers2d)
self.marks['ext_upper'].update_xy(xs,
trace.trace+self.ext_width/2,
viewers=self.marks_viewers2d)
else:
for mark in ['ext_lower', 'ext_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'ext'
# TODO: remove this, the traitlet, and the row in spectral_extraction.vue
# when specutils handles the warning/exception
if self.ext_type_selected == 'Horne':
inp_sp2d = self._get_ext_input_spectrum()
self.ext_uncert_warn = isinstance(inp_sp2d.uncertainty, UnknownUncertainty)
else:
self.ext_uncert_warn = False
def _set_create_kwargs(self, **kwargs):
invalid_kwargs = [k for k in kwargs.keys() if not hasattr(self, k)]
if len(invalid_kwargs):
raise ValueError(f"{invalid_kwargs} are not valid attributes to pass as kwargs")
for k, v in kwargs.items():
setattr(self, k, v)
[docs]
def import_trace(self, trace):
"""
Import the input parameters from an existing specreduce Trace object into the plugin.
Parameters
----------
trace : specreduce.tracing.Trace
Trace object to import
"""
if not isinstance(trace, tracing.Trace): # pragma: no cover
raise TypeError("trace must be a specreduce.tracing.Trace object")
if isinstance(trace, tracing.FlatTrace):
self.trace_type_selected = 'Flat'
self.trace_pixel = trace.trace_pos
elif isinstance(trace, tracing.FitTrace):
self.trace_type_selected = trace.trace_model.__class__.__name__.strip('1D')
self.trace_pixel = trace.guess
self.trace_window = trace.window
self.trace_bins = trace.bins
self.trace_do_binning = True
if hasattr(trace.trace_model, 'degree'):
self.trace_order = trace.trace_model.degree
elif isinstance(trace, tracing.ArrayTrace): # pragma: no cover
raise NotImplementedError(f"cannot import ArrayTrace into plugin. Use viz.load instead") # noqa
else: # pragma: no cover
raise NotImplementedError(f"trace of type {trace.__class__.__name__} not supported")
[docs]
@with_spinner('trace_spinner')
def export_trace(self, add_data=False, **kwargs):
"""
Create a specreduce Trace object from the input parameters
defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting trace to the application, according to the options
defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'trace':
self.update_marks(step='trace')
if self.trace_trace_selected != 'New Trace':
# then we're offsetting an existing trace
# for FlatTrace, we can keep and expose a new FlatTrace (which has the advantage of
# being able to load back into the plugin)
orig_trace = self.trace_trace.selected_obj
if isinstance(orig_trace, tracing.FlatTrace):
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
orig_trace.trace_pos+self.trace_offset)
else:
trace = tracing.ArrayTrace(self.trace_dataset.selected_obj,
self.trace_trace.selected_obj.trace+self.trace_offset)
elif self.trace_type_selected == 'Flat':
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
self.trace_pixel)
elif self.trace_type_selected in _model_cls:
trace_model = _model_cls[self.trace_type_selected](degree=self.trace_order)
trace = tracing.FitTrace(self.trace_dataset.selected_obj,
guess=self.trace_pixel,
bins=int(self.trace_bins) if self.trace_do_binning else None,
window=self.trace_window,
peak_method=self.trace_peak_method_selected.lower(),
trace_model=trace_model)
else:
raise NotImplementedError(f"trace_type={self.trace_type_selected} not implemented")
if add_data:
self.trace_add_results.add_results_from_plugin(trace,
format='Trace',
replace=False)
return trace
def _get_bg_trace(self):
if self.bg_type_selected == 'Manual':
trace = tracing.FlatTrace(self.trace_dataset.get_selected_spectrum(
use_display_units=True),
self.bg_trace_pixel)
elif self.bg_trace_selected == 'From Plugin':
trace = self.export_trace(add_data=False)
else:
trace = self.bg_trace.get_selected_spectrum(use_disaply_units=True)
return trace
[docs]
def import_bg(self, bg):
"""
Import the input parameters from an existing specreduce Background object into the plugin.
Parameters
----------
bg : specreduce.background.Background
Background object to import
"""
if not isinstance(bg, background.Background): # pragma: no cover
raise TypeError("bg must be a specreduce.background.Background object")
# TODO: should we detect/set the referenced dataset?
trace = self._get_bg_trace()
if len(bg.traces) == 2:
# try to detect constant separation
seps1 = bg.traces[0].trace - trace.trace
seps2 = trace.trace - bg.traces[1].trace
if np.all(seps1 == seps1[0]) and np.all(seps2 == seps1[0]):
self.bg_type_selected = 'TwoSided'
self.bg_separation = abs(int(seps1[0]))
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
elif len(bg.traces) == 1:
# either one_sided or trace, let's see if its constant offset from the trace
seps = bg.traces[0].trace - trace.trace
if np.all(seps == seps[0]):
self.bg_type_selected = 'OneSided'
self.bg_separation = int(seps[0])
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
else: # pragma: no cover
raise NotImplementedError("backgrounds with more than 2 traces not supported")
self.bg_width = bg.width
[docs]
@with_spinner('bg_spinner')
def export_bg(self, **kwargs):
"""
Create a specreduce Background object from the input parameters defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'bg':
self.update_marks(step='bg')
trace = self._get_bg_trace()
if self.bg_type_selected == 'Manual':
bg = background.Background(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
[trace], width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'OneSided':
bg = background.Background.one_sided(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'TwoSided':
bg = background.Background.two_sided(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
else: # pragma: no cover
raise NotImplementedError(f"bg_type={self.bg_type_selected} not implemented")
return bg
[docs]
@with_spinner('bg_img_spinner')
def export_bg_img(self, add_data=False, **kwargs):
"""
Create a background 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_spec = self.export_bg(**kwargs).bkg_image()
if add_data:
self.bg_add_results.add_results_from_plugin(bg_spec,
format='2D Spectrum',
replace=True)
return bg_spec
[docs]
def vue_create_bg_img(self, *args):
try:
self.export_bg_img(add_data=True)
except Exception as e:
self.app.hub.broadcast(
SnackbarMessage(f"Specreduce background failed with the following error: {repr(e)}",
color='error', sender=self, traceback=e)
)
[docs]
@with_spinner('bg_spec_spinner')
def export_bg_spectrum(self, add_data=False, **kwargs):
"""
Create a background 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
spec = self.export_bg(**kwargs).bkg_spectrum()
if add_data:
self.bg_spec_add_results.add_results_from_plugin(spec,
format='1D Spectrum',
replace=False)
return spec
[docs]
@with_spinner('bg_sub_spinner')
def export_bg_sub(self, add_data=False, **kwargs):
"""
Create a background-subtracted 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_sub_spec = self.export_bg(**kwargs).sub_image()
if add_data:
self.bg_sub_add_results.add_results_from_plugin(bg_sub_spec,
format='2D Spectrum',
replace=True)
return bg_sub_spec
def _get_ext_trace(self):
if self.ext_trace_selected == 'From Plugin':
return self.export_trace(add_data=False)
else:
return self.ext_trace.get_selected_spectrum(use_display_units=True)
def _get_ext_input_spectrum(self):
if self.ext_dataset_selected == 'From Plugin':
return self.export_bg_sub(add_data=False)
else:
return self.ext_dataset.get_selected_spectrum(use_display_units=True)
[docs]
def import_extract(self, ext):
"""
Import the input parameters from an existing specreduce extract object into the plugin.
Parameters
----------
ext : specreduce.extract.BoxcarExtract
Extract object to import
"""
if isinstance(ext, extract.BoxcarExtract):
self.ext_type_selected = 'Boxcar'
self.ext_width = ext.width
elif isinstance(ext, extract.HorneExtract):
self.ext_type_selected = 'Horne'
else: # pragma: no cover
raise TypeError("ext must be a specreduce.extract.BoxcarExtract or specreduce.extract.HorneExtract object") # noqa
[docs]
def export_extract(self, **kwargs):
"""
Create a specreduce extraction object from the input parameters defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'ext':
self.update_marks(step='ext')
trace = self._get_ext_trace()
inp_sp2d = self._get_ext_input_spectrum()
if self.ext_type_selected == 'Boxcar':
ext = extract.BoxcarExtract(inp_sp2d, trace, width=self.ext_width)
elif self.ext_type_selected == 'Horne':
spatial_profile = None
if inp_sp2d.uncertainty is None:
inp_sp2d.uncertainty = VarianceUncertainty(np.ones_like(inp_sp2d.data))
if not hasattr(inp_sp2d.uncertainty, 'uncertainty_type'):
inp_sp2d.uncertainty = StdDevUncertainty(inp_sp2d.uncert)
if self.horne_ext_profile_selected == 'Self (interpolated)':
# check inputs
if self.self_prof_n_bins <= 0:
raise ValueError('`self_prof_n_bins` must be greater than 0.')
if self.self_prof_interp_degree_x <= 0:
raise ValueError('`self_prof_interp_degree_x` must be greater than 0.')
if self.self_prof_interp_degree_y <= 0:
raise ValueError('`self_prof_interp_degree_y` must be greater than 0.')
# setup dict of interpolation options
n_bins_interpolated_profile = self.self_prof_n_bins
interp_degree = (self.self_prof_interp_degree_x, self.self_prof_interp_degree_y)
spatial_profile = {'name': 'interpolated_profile',
'n_bins_interpolated_profile': n_bins_interpolated_profile,
'interp_degree': interp_degree}
elif self.horne_ext_profile_selected == 'Gaussian':
spatial_profile = 'gaussian'
else:
raise ValueError("Horne extraction profile must either be 'Gaussian' or 'Self (interpolated)'") # noqa
ext = extract.HorneExtract(inp_sp2d, trace, spatial_profile=spatial_profile)
else:
raise NotImplementedError(f"extraction type '{self.ext_type_selected}' not supported") # noqa
return ext
[docs]
@with_spinner('spinner')
def export_extract_spectrum(self, add_data=False, **kwargs):
"""
Create an extracted 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
extract = self.export_extract(**kwargs)
spectrum = extract.spectrum
if add_data:
# TODO: eventually generalize this logic into add_results_from_plugin
if not len(self.marks_viewers1d):
# no spectrum1d viewer, create one now and set the default viewer
viewer_ref = self.app.return_unique_name('1D Spectrum',
typ='viewer')
self.app._on_new_viewer(NewViewerMessage(Spectrum1DViewer,
data=None,
sender=self.app),
vid=viewer_ref, name=viewer_ref,
open_data_menu_if_empty=False)
self.ext_add_results.viewer = viewer_ref
self.ext_add_results.add_results_from_plugin(spectrum,
format='1D Spectrum',
replace=False)
return spectrum