import warnings, yaml, re
import astropy.units as u
import numpy as np
from atlast_sc.utils import DataHelper, Decorators
from atlast_sc.exceptions import CalculatedValueInvalidWarning
from atlast_sc.exceptions import ValueOutOfRangeException
from atlast_sc.exceptions import InstrumentNotApplicableException
from atlast_sc.parameter_setup import ParameterSetup
from atlast_sc.parameters.user_input_parameters import UserInputParameters
from atlast_sc.parameters.telescope_and_environment_parameters import TelescopeAndEnvironmentParameters
from atlast_sc.parameters.derived_parameters import DerivedParameters
[docs]
class Calculator:
"""
Calculator class that provides an interface to the main
calculator functionality and performs the core calculations
to determine the output sensitivity or integration time.
:param user_input: Dictionary containing user-defined input parameters
:type user_input: dict
:param instrument_setup: Dictionary containing instrument setup parameters.
**NB: usage not tested, and may not be supported in future.**
:type instrument_setup: dict
"""
def __init__(self, user_input={}):
if user_input:
self._param_setup = ParameterSetup(user_input=user_input)
# self.calculator = self._create_calculator(self.param_setup)
else: # use the default values
self._param_setup = ParameterSetup()
# self.calculator = self._create_calculator(self.param_setup)
# Parameter setup class that contains models with default values
# self._param_setup = self.param_setup
# Special classes for customisation of models
self._user_input = UserInputParameters(self._param_setup)
self._telescope_and_environment = TelescopeAndEnvironmentParameters(self._param_setup)
self._derived_parameters = DerivedParameters(self._param_setup)
# Calculated value variables of calculation result model
self._calculated_sensitivity = self._param_setup.calculation_results.calculated_sensitivity
self._calculated_t_int = self._param_setup.calculation_results.calculated_t_int
# @staticmethod
# def _create_calculator(param_setup):
# return Calculator(param_setup)
@property
def user_input(self):
"""
User inputs to the calculation
"""
return self._user_input
@property
def telescope_and_environment(self):
"""
Telescope and environment parameters
"""
return self._telescope_and_environment
@property
def derived_parameters(self):
"""
Derived parameters
"""
return self._derived_parameters
@property
def calculated_sensitivity(self):
"""
Calculated sensitivity value
"""
return self._calculated_sensitivity.value
@calculated_sensitivity.setter
@Decorators.validate_value
def calculated_sensitivity(self, value):
self._calculated_sensitivity.value = value
@property
def calculated_t_int(self):
"""
Calculated integration time value
"""
return self._calculated_t_int.value
@calculated_t_int.setter
@Decorators.validate_value
def calculated_t_int(self, value):
self._calculated_t_int.value = value
@property
def chosen_instrument(self):
"""
Name of chosen instrument
"""
return self._param_setup.chosen_instrument.name
@chosen_instrument.setter
def chosen_instrument(self, instrument_name):
old_inst_name = self._param_setup.chosen_instrument.name
instrument_name = instrument_name.capitalize()
try:
requested_inst_name = \
self._param_setup.loaded_instruments[instrument_name].name
# Check if the requested instrument can be selected given the
# existing user input parameters
requested_inst_is_applicable = \
self.requested_inst_is_applicable(requested_inst_name)
try:
# User inputted obs_freq and bandwidth are in range
if requested_inst_is_applicable:
self._param_setup.chosen_instrument = (
self._param_setup.loaded_instruments[requested_inst_name]
)
# Recalculate derived parameters because new
# instrument has been chosen
self._param_setup._calculate_derived_parameters()
new_inst_name = self._param_setup.chosen_instrument.name
if old_inst_name != new_inst_name:
print("Instrument has been changed from " + old_inst_name + " to " + \
new_inst_name + ".")
else:
# User inputted obs_freq and bandwidth are not
# in range of the requested instrument
raise InstrumentNotApplicableException(
requested_inst_name,
self.chosen_instrument
)
except InstrumentNotApplicableException as e:
raise
except KeyError as e:
print('Instrument name provided is not available. '\
'Proceeding with an applicable instrument from '\
'the list of instruments.')
@property
def loaded_instruments(self):
"""
Dictionary of each loaded instrument with its respective
specified observing frequency and bandwidth ranges
"""
loaded_instrument_dict = {}
loaded_instrument_modules = self._param_setup.loaded_instruments
for inst_name, inst_module in loaded_instrument_modules.items():
inst_obs_freq_list = inst_module.obs_freq_ranges_and_unit
inst_bandwidth_list = inst_module.bandwidth_ranges_and_unit
loaded_instrument_dict[inst_name] = {
'obs_freq': inst_obs_freq_list,
'bandwidth': inst_bandwidth_list}
return loaded_instrument_dict
#################################################
# Public methods for performing sensitivity and #
# integration time calculations #
#################################################
[docs]
def calculate_sensitivity(self, t_int=None, update_calculator=True):
"""
Calculates the telescope sensitivity (mJy) for a
given integration time `t_int`.
:param t_int: integration time. Optional. Defaults to the internally
stored value
:type t_int: astropy.units.Quantity
:param update_calculator: True if the calculator should be updated with
the specified integration time and calculated sensitivity.
Optional. Defaults to True
:type update_calculator: bool
:return: sensitivity in mJy
:rtype: astropy.units.Quantity
"""
if t_int is not None:
if update_calculator:
self.user_input.t_int = t_int
else:
DataHelper.validate(self.user_input, 't_int', t_int)
else:
t_int = self.user_input.t_int
sensitivity_result = \
self.derived_parameters.sefd / \
(self.derived_parameters.eta_s * np.sqrt(self.user_input.n_pol * self.user_input.bandwidth * t_int))
# Convert the output to the most convenient units
sensitivity_result = sensitivity_result.to(u.mJy)
if sensitivity_result < 1*u.mJy:
sensitivity_result = sensitivity_result.to(u.uJy)
elif (sensitivity_result >= 1*u.mJy) & (sensitivity_result < 1000*u.mJy):
sensitivity_result = sensitivity_result.to(u.mJy)
elif sensitivity_result >= 1000*u.mJy:
sensitivity_result = sensitivity_result.to(u.Jy)
# Try to update the sensitivity stored in the calculator
if update_calculator:
try:
self.calculated_sensitivity = sensitivity_result
except ValueOutOfRangeException as e:
# This point is actually unreachable, but it's sensible to
# have the code in place in case the permitted range of
# the sensitivity changes and becomes possible to achieve with
# the right combination of input parameters.
message = \
Calculator._calculated_value_error_msg(sensitivity_result, e)
warnings.warn(message, CalculatedValueInvalidWarning)
return sensitivity_result
[docs]
def calculate_t_integration(self, sensitivity=None,
update_calculator=True):
"""
Calculates the integration time required for a given `sensitivity`
to be reached.
:param sensitivity: required sensitivity. Optional. Defaults
to the internally stored value
:type sensitivity: astropy.units.Quantity
:param update_calculator: True if the calculator should be updated with
the specified sensitivity and calculated integration time.
Optional. Defaults to True
:type update_calculator: bool
:return: integration time in seconds
:rtype: astropy.units.Quantity
"""
if sensitivity is not None:
if update_calculator:
self.user_input.sensitivity = sensitivity
else:
DataHelper.validate(self, 'calculated_sensitivity', sensitivity)
else:
sensitivity = self.user_input.sensitivity
t_int_result = (self.derived_parameters.sefd / (sensitivity * self.derived_parameters.eta_s)) ** 2 \
/ (self.user_input.n_pol * self.user_input.bandwidth)
# Convert the output to the most convenient units
t_int_result = t_int_result.to(u.s)
if t_int_result < 60*u.s:
t_int_result = t_int_result.to(u.s)
elif (t_int_result >= 60*u.s) & (t_int_result < 3600*u.s):
t_int_result = t_int_result.to(u.min)
elif t_int_result >= 3600*u.s:
t_int_result = t_int_result.to(u.h)
# Try to update the integration time stored in the calculator
if update_calculator:
try:
self.calculated_t_int = t_int_result
except ValueOutOfRangeException as e:
message = Calculator._calculated_value_error_msg(t_int_result, e)
warnings.warn(message, CalculatedValueInvalidWarning)
return t_int_result
###################
# Utility methods #
###################
[docs]
def reset(self):
"""
Resets all calculator parameters to their initial values.
"""
# Reset the _param_setup calculation inputs to their original values
self._param_setup.reset()
# Recalculate the derived parameters
self._param_setup._calculate_derived_parameters()
[docs]
def requested_inst_is_applicable(self, requested_inst_name):
"""
Check if the requested instrument can be selected to be used in the
calculations. The already existing user input parameters will be
cross checked with the applicable ranges of the requested instrument.
:param requested_inst_name: name of the requested instrument
:type requested_inst_name: String
:return: applicability of requested instrument
:rtype: boolean
"""
inst_applicable = False
obs_freq_applicable = False
bandwidth_applicable = False
# See if the requested instrument fits the existent user input values
user_obs_freq = self.user_input.obs_freq
# Check if obs_freq units are the same
user_obs_freq_unit = str(self.user_input.obs_freq.unit)
requested_inst_obs_freq_unit = self.loaded_instruments[requested_inst_name]['obs_freq']['unit']
# If the units are not the same, convert user input obs_freq to the unit
# of the requested instrument obs_freq ranges
if user_obs_freq_unit != requested_inst_obs_freq_unit:
user_obs_freq = user_obs_freq.to(u.Unit(requested_inst_obs_freq_unit))
user_obs_freq = user_obs_freq.value
# Convert bandwidth value to Hz
user_bandwidth = self.user_input.bandwidth
user_bandwidth = user_bandwidth.to(u.Hz)
user_bandwidth = user_bandwidth.value
# Requested instrument ranges
instrument_obs_freqs = \
self.loaded_instruments[requested_inst_name]['obs_freq']['ranges']
instrument_bandw_vals = \
self.loaded_instruments[requested_inst_name]['bandwidth']['ranges']
# Check if user inputted observing frequency value falls in
# the range of the requested instrument ranges
for range in instrument_obs_freqs:
range = re.findall(r"[\d.]+", range)
min_freq = float(range[0])
max_freq = float(range[1])
if user_obs_freq >= min_freq and user_obs_freq <= max_freq:
obs_freq_applicable = True
# Check if user inputted bandwidth value falls in the range of
# the requested instrument ranges
if requested_inst_name != 'Default':
for range in instrument_bandw_vals:
range = re.findall(r"[\d.]+", range)
min_bandw = float(range[0])
max_bandw = float(range[1])
if user_bandwidth >= min_bandw and user_bandwidth <= max_bandw:
bandwidth_applicable = True
else: # If the requested instrument is Default
bandwidth_applicable = True
# If both user inputted parameters fall in the requested
# instrument range
if obs_freq_applicable and bandwidth_applicable:
inst_applicable = True
return inst_applicable
[docs]
def list_instruments(self):
"""
Show loaded instruments and their observing frequency and bandwidth ranges
in a pretty format.
"""
output = "\n" + "="*70 + "\n"
output += "AVAILABLE INSTRUMENTS\n"
output += "="*70 + "\n\n"
for inst_name, inst_info in self.loaded_instruments.items():
output += f"* {inst_name}\n"
obs_freq_ranges = inst_info['obs_freq']['ranges']
obs_freq_unit = inst_info['obs_freq']['unit']
output += f" Observing Frequency: {obs_freq_ranges} {obs_freq_unit}\n"
bandwidth_ranges = inst_info['bandwidth']['ranges']
bandwidth_unit = inst_info['bandwidth']['unit']
if inst_name == 'Default':
bandwidth_ranges = "[Any positive value]"
else:
# Convert bandwidth ranges to easily readible units if needed
bandwidth_ranges, bandwidth_unit = \
self._format_bandwidth_ranges(bandwidth_ranges, bandwidth_unit)
output += f" Bandwidth: {bandwidth_ranges} {bandwidth_unit}\n\n"
output += "-"*70 + "\n"
output += "To select an instrument:\n"
output += ' calculator.chosen_instrument = "Finer"\n'
output += "-"*70 + "\n"
print(output)
def _format_bandwidth_ranges(self, bandwidth_ranges, bandwidth_unit):
"""
Format bandwidth ranges, converting to MHz or kHz if the values are too
large to be easily read in Hz.
:param bandwidth_ranges: list or string of bandwidth range(s)
:type bandwidth_ranges: list or str
:param bandwidth_unit: the unit of the bandwidth ranges
:type bandwidth_unit: str
:return: formatted ranges and appropriate unit
:rtype: tuple(str, str)
"""
if bandwidth_unit.lower() != 'hz':
# If not in Hz, return as-is
return bandwidth_ranges, bandwidth_unit
# Determine the appropriate unit based on max values
max_val = 0
if isinstance(bandwidth_ranges, list):
for range_str in bandwidth_ranges:
matches = re.findall(r"[\d.]+", range_str)
if len(matches) >= 2:
val = float(matches[1])
max_val = max(max_val, val)
else:
matches = re.findall(r"[\d.]+", str(bandwidth_ranges))
if len(matches) >= 2:
max_val = float(matches[1])
# Determine display unit
if max_val >= 1e6:
display_unit = "MHz"
divisor = 1e6
elif max_val >= 1e3:
display_unit = "kHz"
divisor = 1e3
else:
display_unit = "Hz"
divisor = 1
# Convert ranges if needed
if divisor != 1:
formatted_ranges = []
if isinstance(bandwidth_ranges, list):
for range_str in bandwidth_ranges:
matches = re.findall(r"[\d.]+", range_str)
if len(matches) >= 2:
min_val = float(matches[0]) / divisor
max_val = float(matches[1]) / divisor
formatted_ranges.append(f"({min_val}-{max_val})")
else:
formatted_ranges.append(range_str)
return formatted_ranges, display_unit
else:
matches = re.findall(r"[\d.]+", str(bandwidth_ranges))
if len(matches) >= 2:
min_val = float(matches[0]) / divisor
max_val = float(matches[1]) / divisor
return f"({min_val}-{max_val})", display_unit
return bandwidth_ranges, bandwidth_unit
@staticmethod
def _calculated_value_error_msg(calculated_value, validation_error):
"""
The message displayed when a calculated value (t_int or sensitivity) is
outside the permitted range.
:param calculated_value: the calculated value of the target parameter
:type calculated_value: astropy.units.Quantity
:param validation_error: the error raised when validating the
calculated parameter value
:type validation_error: atlast_sc.exceptions.ValueOutOfRangeException
"""
message = f"The calculated value {calculated_value.round(4)} " \
f"is outside of the permitted range " \
f"for parameter '{validation_error.parameter}'. " \
f"{validation_error.message} " \
f"The Calculator will not be updated with the new value. " \
f"Please adjust the input parameters and recalculate."
return message
###################################################
# Methods to throw an error for old functionality #
###################################################
def __getattr__(self, name):
"""
Handle deprecated way of accessing parameters.
"""
deprecated_params = {
# user input parameters
'obs_freq': 'user_input.obs_freq',
'bandwidth': 'user_input.bandwidth',
'sensitivity': 'user_input.sensitivity',
't_int': 'user_input.t_int',
'n_pol': 'user_input.n_pol',
'weather': 'user_input.weather',
'elevation': 'user_input.elevation',
# telescope and environment parameters
'surface_rms': 'telescope_and_environment.surface_rms',
'dish_radius': 'telescope_and_environment.dish_radius',
'eta_eff': 'telescope_and_environment.eta_eff',
'eta_ill': 'telescope_and_environment.eta_ill',
'eta_spill': 'telescope_and_environment.eta_spill',
'eta_block': 'telescope_and_environment.eta_block',
'eta_pol': 'telescope_and_environment.eta_pol',
'T_cmb': 'telescope_and_environment.T_cmb',
'T_amb': 'telescope_and_environment.T_amb',
'T_amb': 'telescope_and_environment.T_amb'
}
if name in deprecated_params:
raise RuntimeError(
f"calculator.{name} is not a valid input. "
f"Use calculator.{deprecated_params[name]} instead."
)
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __setattr__(self, name, value):
"""
Handle deprecated way of setting parameters.
"""
deprecated_params = {
# user input parameters
'obs_freq': 'user_input.obs_freq',
'bandwidth': 'user_input.bandwidth',
'sensitivity': 'user_input.sensitivity',
't_int': 'user_input.t_int',
'n_pol': 'user_input.n_pol',
'weather': 'user_input.weather',
'elevation': 'user_input.elevation',
# telescope and environment parameters
'surface_rms': 'telescope_and_environment.surface_rms',
'dish_radius': 'telescope_and_environment.dish_radius',
'eta_eff': 'telescope_and_environment.eta_eff',
'eta_ill': 'telescope_and_environment.eta_ill',
'eta_spill': 'telescope_and_environment.eta_spill',
'eta_block': 'telescope_and_environment.eta_block',
'eta_pol': 'telescope_and_environment.eta_pol',
'T_cmb': 'telescope_and_environment.T_cmb',
'T_amb': 'telescope_and_environment.T_amb',
'T_amb': 'telescope_and_environment.T_amb'
}
if name in deprecated_params:
raise RuntimeError(
f"calculator.{name} is not a valid input. "
f"Use calculator.{deprecated_params[name]} instead."
)
super().__setattr__(name, value)