Source code for pitchscapes.scapes

from math import isclose
from warnings import warn
import numpy as np


def normalize_non_zero(a, axis=None, replace_zeros=False, skip_type_check=False):
    """For the given ND array, normalise along the dimensions given by 'axis' wherever the sum over those
    dimensions is non-zero. Normalisation is performed in place."""
    # check that dtype is float (in-place division of integer arrays silently rounds)
    if not skip_type_check:
        if not np.issubdtype(a.dtype, np.floating):
            raise TypeError(f"Cannot guarantee that normalisation works as expected on array of type '{a.dtype}'. "
                            f"Use 'skip_type_check=True' to skip this check.")
    # normalise along last axis per default
    if axis is None:
        axis = a.ndim - 1
    # make axis a tuple if it isn't
    if not isinstance(axis, tuple):
        axis = (axis,)
    # compute sum along axis, keeping dimensions
    s = a.sum(axis=axis, keepdims=True)
    # check for non-zero entries
    non_zero = (s != 0)
    if not np.any(non_zero):
        # directly return if there are no non-zero entries
        return a
    # construct an index tuple to select the appropriate entries for normalisation (the dimensions specified by
    # axis have to be replaced by full slices ':' to broadcast normalisation along these dimensions)
    non_zero_arr = tuple(slice(None) if idx in axis else n for idx, n in enumerate(non_zero.nonzero()))
    # in-place replace non-zero entries by their normalised values
    a[non_zero_arr] = a[non_zero_arr] / s[non_zero_arr]
    # add uniform distribution for zeros if requested
    if replace_zeros:
        zero_arr = tuple(slice(None) if idx in axis else n
                         for idx, n in enumerate(np.logical_not(non_zero).nonzero()))
        a[zero_arr] += 1 / np.array(a.shape)[np.array(axis)].prod()
    # return array
    return a
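
# --- Usage sketch (not part of the library source): normalising hypothetical
# --- count data in place with normalize_non_zero. Each row is normalised along
# --- axis 1; the all-zero row is replaced by a uniform distribution.
counts = np.array([[2., 2., 0.],
                   [0., 0., 0.]])
normalize_non_zero(counts, axis=1, replace_zeros=True)
print(counts)
# [[0.5        0.5        0.        ]
#  [0.33333333 0.33333333 0.33333333]]
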
class Scape:
    """
    Abstract base class for scape objects. A scape has a minimum time (min_time), a maximum time (max_time), and
    a well-defined value for any window (t1, t2) with min_time <= t1 < t2 <= max_time. The value can be retrieved
    via scape[t1, t2].
    """
    def __init__(self, min_time, max_time):
        """
        :param min_time: minimum time
        :param max_time: maximum time
        """
        self.min_time = min_time
        self.max_time = max_time
    def __getitem__(self, item):
        """
        Return the value of the scape for the given time window.
        :param item: (start, end) with start < end
        :return: value of the scape for the time window (start, end)
        """
        raise NotImplementedError
    def assert_valid_time_window(self, start, end):
        if start < self.min_time and not isclose(start, self.min_time):
            raise ValueError(f"Window start is less than minimum time ({start} < {self.min_time})")
        if end > self.max_time and not isclose(end, self.max_time):
            raise ValueError(f"Window end is greater than maximum time ({end} > {self.max_time})")
        if start > end:
            raise ValueError(f"Window start is greater than window end ({start} > {end})")
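
# --- Usage sketch (not part of the library source): a minimal, hypothetical
# --- Scape subclass that returns the window duration for any valid window,
# --- illustrating the abstract interface.
class DurationScape(Scape):
    def __getitem__(self, item):
        start, end = item
        self.assert_valid_time_window(start, end)
        return end - start

duration_scape = DurationScape(min_time=0.0, max_time=4.0)
print(duration_scape[1.0, 3.5])  # 2.5
# duration_scape[0.0, 5.0] would raise a ValueError (window end > max_time)
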
class DiscretePitchScape_(Scape):
    """
    Discrete-time scape that is only well-defined at specific points in time. It computes the weighted sum
    (optionally the mean) over the given time window, i.e. it sums up the values weighted by the corresponding
    time spans and (optionally) divides by the total time interval.
    """
    def __init__(self, values, times=None, prior_counts=None, strategy="left",
                 normalise=False, normalise_values=False, skip_type_check=False, dtype=None):
        """
        Initialise the scape.
        :param values: values for each time slot (first dimension of size N runs over time intervals)
        :param times: [optional] boundaries of the time intervals (array-like of length N+1).
        Defaults to [0, 1, ..., N].
        :param prior_counts: scalar prior counts added to the aggregated values. Non-zero prior counts correspond
        to a Bayesian estimate with a uniform prior; the magnitude adjusts the prior strength. A value of None
        (default) does not add any prior counts. If return values are normalised (normalise=True) there is a
        difference between None and 0: A value of 0 is interpreted as the limiting case of non-zero prior counts,
        which means that outputs will always be normalised, and if no data is available (zero counts, e.g. for
        zero-width time intervals or time intervals containing only zero values), a uniform distribution is
        returned. For a value of None, you will get an unnormalised all-zero output instead.
        :param strategy: one of ["left", "right", "center"] (default: "left"). Determines how a time-index
        interval [K, L] is split in the recursive computations. For "left": [K,L-1]+[L-1,L]; for "right":
        [K,K+1]+[K+1,L]; for "center": [K,K+1]+[K+1,L-1]+[L-1,L]. Also see the comments for parse_bottom_up
        concerning efficiency and run time.
        :param normalise: whether to compute the weighted sum or mean (default: False, i.e. compute the weighted
        sum)
        :param normalise_values: whether to normalise the values per time interval (default: False)
        :param skip_type_check: If the values are integers, there may be problems with in-place normalisation or
        with adding floating-type prior counts. An error is raised in these cases, suggesting to use a floating
        type for the values. This check can be disabled (which is generally a bad idea) by setting
        skip_type_check=True. Note that no error is raised for integer values if no normalisation is performed
        and the prior counts are None or not of a floating type.
        :param dtype: type to use for the values
        """
        self.prior_counts = prior_counts
        self.strategy = strategy
        self.normalise = normalise
        # set times and initialise super class
        if times is None:
            times = list(range(len(values) + 1))
        self._len = len(times)
        self.times = np.array(times)
        if not np.all(self.times[:-1] <= self.times[1:]):
            raise ValueError(f"Times are not sorted: {self.times}")
        # get reverse time indices
        self.indices = {time: idx for idx, time in enumerate(self.times)}
        self.min_index = 0
        self.max_index = self.times.shape[0]
        super().__init__(min_time=self.times.min(), max_time=self.times.max())
        # values
        if dtype is None:
            self.values = np.array(values)
        else:
            self.values = np.array(values, dtype=dtype)
        # raise for non-float types
        if not skip_type_check and not np.issubdtype(self.values.dtype, np.floating):
            if normalise_values or normalise:
                raise TypeError(f"Using non-floating values of type '{self.values.dtype}' may lead to errors or "
                                f"unexpected results in combination with normalisation. Either use float values; or "
                                f"don't use normalisation (normalise_values=False and normalise=False); or set "
                                f"skip_type_check=True to take the risk.")
            if np.issubdtype(type(self.prior_counts), np.floating):
                raise TypeError(f"Combining non-floating values of type '{self.values.dtype}' with floating-type "
                                f"prior counts of type '{type(self.prior_counts)}' may lead to errors or unexpected "
                                f"results. Use float values; or set skip_type_check=True to take the risk.")
        # normalise values
        if normalise_values:
            normalize_non_zero(self.values, skip_type_check=skip_type_check)
        # pre-compute the overall number of value dimensions (for prior counts and normalisation)
        self.n_value_dim = np.prod(self.values.shape[1:])
        # check dimensions
        if self.values.shape[0] != self.times.shape[0] - 1:
            raise ValueError(f"There should be n-1 values for n time steps "
                             f"(values shape: {self.values.shape}, times shape: {self.times.shape})")
        # initialise scape
        if self.normalise:
            self._accumulated_values = {(idx, idx + 1): v for idx, v in enumerate(self.values)}
        else:
            self._accumulated_values = {(idx, idx + 1): v * (self.times[idx + 1] - self.times[idx])
                                        for idx, v in enumerate(self.values)}
    def assert_valid_time_window(self, start, end):
        super().assert_valid_time_window(start, end)
        if start not in self.indices:
            raise ValueError(f"Window start ({start}) is not a valid time (not in {self.indices})")
        if end not in self.indices:
            raise ValueError(f"Window end ({end}) is not a valid time (not in {self.indices})")
    def assert_valid_index_window(self, start, end):
        if not (self.min_index <= start <= end <= self.max_index):
            raise ValueError(f"Invalid start/end index, should be "
                             f"{self.min_index} <= {start} <= {end} <= {self.max_index}")
    def __getitem__(self, item):
        start, end = item
        self.assert_valid_time_window(start, end)
        start_idx, end_idx = self.indices[start], self.indices[end]
        self.assert_valid_index_window(start_idx, end_idx)
        # check for zero-size window or return accumulated value
        if start == end:
            return self.zero_size_value()
        else:
            value = self.get_value_at_index(start_idx, end_idx)
            delta_t = self.times[end_idx] - self.times[start_idx]
            self.add_prior_counts(value, delta_t)
            return value
    def recursive_indices(self, start_idx, end_idx):
        if end_idx - start_idx < 2:
            raise ValueError(f"Recursion only defined for windows with size > 1 "
                             f"(start: {start_idx}, end: {end_idx}, size: {end_idx - start_idx})")
        if self.strategy == "center":
            index_list = [(start_idx, start_idx + 1), (end_idx - 1, end_idx)]
            if end_idx - start_idx > 2:
                index_list.append((start_idx + 1, end_idx - 1))
        elif self.strategy == "left":
            index_list = [(start_idx, end_idx - 1), (end_idx - 1, end_idx)]
        elif self.strategy == "right":
            index_list = [(start_idx, start_idx + 1), (start_idx + 1, end_idx)]
        else:
            raise ValueError(f"Unknown recursion strategy '{self.strategy}'")
        return index_list
    def zero_size_value(self):
        # return zeros, uniform distribution, or prior counts (depending on the value of prior_counts)
        ret = np.zeros(self.values.shape[1:], dtype=self.values.dtype)
        if self.normalise:
            if self.prior_counts is not None:
                # for any value of prior counts (except None) return a uniform distribution
                ret += 1 / self.n_value_dim
        else:
            if self.prior_counts is not None and self.prior_counts != 0:
                ret += self.prior_counts
        return ret
    def get_value_at_index(self, start, end):
        # if the value is not stored in memory: compute recursively
        if (start, end) not in self._accumulated_values:
            # accumulate indices to compute
            indices_to_compute = [(start, end)]
            unprocessed_indices = [(start, end)]
            while unprocessed_indices:
                unprocessed_start, unprocessed_end = unprocessed_indices.pop()
                for new_start, new_end in self.recursive_indices(unprocessed_start, unprocessed_end):
                    if (new_start, new_end) not in self._accumulated_values:
                        unprocessed_indices.append((new_start, new_end))
                        indices_to_compute.append((new_start, new_end))
            # compute
            for new_start, new_end in reversed(indices_to_compute):
                # double-check before computing
                # (might have reoccurred higher in the stack and already been computed)
                if (new_start, new_end) not in self._accumulated_values:
                    sum_val = None
                    for new_new_start, new_new_end in self.recursive_indices(new_start, new_end):
                        # compute value
                        val = self._accumulated_values[new_new_start, new_new_end]
                        if self.normalise:
                            # get unnormalised value by multiplying with the respective time interval
                            val = val * (self.times[new_new_end] - self.times[new_new_start])
                        # initialise or update sum
                        if sum_val is None:
                            sum_val = val
                        else:
                            sum_val = sum_val + val
                    if self.normalise:
                        # if the output should be normalised, store normalised values
                        sum_val /= self.times[new_end] - self.times[new_start]
                    self._accumulated_values[new_start, new_end] = sum_val
        # return a copy of the stored value
        return self._accumulated_values[start, end].copy()
    def add_prior_counts(self, value, delta_t):
        """Add prior counts ensuring correct normalisation. NOTE: value is modified in place."""
        if self.normalise:
            # get unnormalised value from normalised one to correctly add prior counts
            value *= delta_t
        if self.prior_counts is not None and self.prior_counts != 0:
            # add prior counts
            value += self.prior_counts
        if self.normalise:
            # renormalise
            if self.prior_counts == 0 and np.all(value == 0):
                # edge case of prior counts --> 0: return uniform distribution
                value += 1 / self.n_value_dim
            elif self.prior_counts is None:
                # prior counts ignored in normalisation
                value /= delta_t
            else:
                # prior counts included in normalisation
                value /= delta_t + self.n_value_dim * self.prior_counts
            if not np.all(value == 0):
                # compensate floating-point errors
                n = value.sum()
                value /= n
                if not np.isclose(n, 1, atol=1e-2):
                    warn(f"Inexact normalisation corrected ({n})", RuntimeWarning)
    def parse_bottom_up(self):
        """
        Compute all values bottom-up. This performs an efficient dynamic programming pass through the entire
        scape and avoids time-consuming recursive look-ups for computing single values (run time ~O(n^2)). It can
        be used if all values within the scape are expected to be needed. For high-resolution scapes where only
        few values need to be computed, this may be inefficient, as a single lookup has only ~O(n) run time. So
        if fewer than n values are needed, separate lookups will be more efficient. Moreover, if multiple values
        have the same start or end time, the overall run time for computing these values is only ~O(n). Depending
        on whether the start or end time is in common, the "left" or "right" strategy is more efficient.
        """
        for width in range(2, self.max_index - self.min_index):
            for start_idx in range(self.min_index, self.max_index - width):
                self.get_value_at_index(start_idx, start_idx + width)
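
# --- Usage sketch (not part of the library source): querying a
# --- DiscretePitchScape_ built from hypothetical two-bin counts; times default
# --- to [0, 1, 2, 3].
values = np.array([[1., 0.],
                   [0., 1.],
                   [1., 1.]])
discrete_scape = DiscretePitchScape_(values, normalise=True, normalise_values=True)
print(discrete_scape[0, 3])  # mean distribution over all three slots: [0.5 0.5]
print(discrete_scape[1, 2])  # single slot: [0. 1.]
# The splitting strategies decompose the index window (0, 3) as follows:
for strategy in ["left", "right", "center"]:
    discrete_scape.strategy = strategy
    print(strategy, discrete_scape.recursive_indices(0, 3))
# left [(0, 2), (2, 3)]
# right [(0, 1), (1, 3)]
# center [(0, 1), (2, 3), (1, 2)]
# Precompute all windows at once; subsequent lookups are dictionary hits:
discrete_scape.parse_bottom_up()
print(len(discrete_scape._accumulated_values))  # 6 == 3 * 4 / 2 windows
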
class DiscreteScape(Scape):
    """
    Discrete-time scape that is only well-defined at specific points in time. It computes the weighted sum over
    the given time window, i.e. it sums up the values weighted by the corresponding time spans.
    """
    def __init__(self, values, times=None, weights=None, strategy="left"):
        """
        Initialise the scape.
        :param values: values for each time slot (first dimension of size N runs over time intervals)
        :param times: [optional] boundaries of the time intervals (array-like of length N+1).
        Defaults to [0, 1, ..., N].
        :param weights: [optional] non-negative weights for the values; if not provided, the size of the
        respective time interval is used.
        :param strategy: one of ["left", "right", "center"] (default: "left"). Determines how a time-index
        interval [K, L] is split in the recursive computations. For "left": [K,L-1]+[L-1,L]; for "right":
        [K,K+1]+[K+1,L]; for "center": [K,K+1]+[K+1,L-1]+[L-1,L]. Also see the comments for parse_bottom_up
        concerning efficiency and run time.
        """
        self.strategy = strategy
        # set times and initialise super class
        if times is None:
            times = list(range(len(values) + 1))
        self._len = len(times)
        self.times = np.array(times)
        if not np.all(self.times[:-1] <= self.times[1:]):
            raise ValueError(f"Times are not sorted: {self.times}")
        # get reverse time indices
        self.indices = {time: idx for idx, time in enumerate(self.times)}
        self.min_index = 0
        self.max_index = self.times.shape[0]
        super().__init__(min_time=self.times.min(), max_time=self.times.max())
        # get weights
        if weights is None:
            self.weights = self.times[1:] - self.times[:-1]
        else:
            self.weights = np.array(weights)
            if np.any(self.weights < 0):
                raise ValueError(f"Weights must be non-negative: {self.weights}")
        # values
        self.values = np.array(values)
        # value for windows of zero width
        self.zero_size_value = np.zeros(self.values.shape[1:], dtype=self.values.dtype)
        # check dimensions
        if self.values.shape[0] != self.times.shape[0] - 1:
            raise ValueError(f"There should be n-1 values for n time steps "
                             f"(values shape: {self.values.shape}, times shape: {self.times.shape})")
        if self.weights.shape[0] != self.values.shape[0]:
            raise ValueError(f"There should be as many weights as values "
                             f"(values shape: {self.values.shape}, weights shape: {self.weights.shape})")
        # initialise bottom level of scape with weighted values
        self._accumulated_values = {(idx, idx + 1): v * w
                                    for idx, (v, w) in enumerate(zip(self.values, self.weights))}
    def assert_valid_time_window(self, start, end):
        super().assert_valid_time_window(start, end)
        if start not in self.indices:
            raise ValueError(f"Window start ({start}) is not a valid time (not in {self.indices})")
        if end not in self.indices:
            raise ValueError(f"Window end ({end}) is not a valid time (not in {self.indices})")
    def assert_valid_index_window(self, start, end):
        if not (self.min_index <= start <= end <= self.max_index):
            raise ValueError(f"Invalid start/end index, should be "
                             f"{self.min_index} <= {start} <= {end} <= {self.max_index}")
    def __getitem__(self, item):
        start, end = item
        self.assert_valid_time_window(start, end)
        start_idx, end_idx = self.indices[start], self.indices[end]
        self.assert_valid_index_window(start_idx, end_idx)
        return self.get_value_at_index(start_idx, end_idx)
    def recursive_indices(self, start_idx, end_idx):
        if end_idx - start_idx < 2:
            raise ValueError(f"Recursion only defined for windows with size > 1 "
                             f"(start: {start_idx}, end: {end_idx}, size: {end_idx - start_idx})")
        if self.strategy == "center":
            index_list = [(start_idx, start_idx + 1), (end_idx - 1, end_idx)]
            if end_idx - start_idx > 2:
                index_list.append((start_idx + 1, end_idx - 1))
        elif self.strategy == "left":
            index_list = [(start_idx, end_idx - 1), (end_idx - 1, end_idx)]
        elif self.strategy == "right":
            index_list = [(start_idx, start_idx + 1), (start_idx + 1, end_idx)]
        else:
            raise ValueError(f"Unknown recursion strategy '{self.strategy}'")
        return index_list
    def get_value_at_index(self, start, end):
        # zero-size window
        if start == end:
            return self.zero_size_value.copy()
        # if the value is not stored in memory: compute recursively
        if (start, end) not in self._accumulated_values:
            # accumulate indices to compute
            indices_to_compute = [(start, end)]
            unprocessed_indices = [(start, end)]
            while unprocessed_indices:
                unprocessed_start, unprocessed_end = unprocessed_indices.pop()
                for new_start, new_end in self.recursive_indices(unprocessed_start, unprocessed_end):
                    if (new_start, new_end) not in self._accumulated_values:
                        unprocessed_indices.append((new_start, new_end))
                        indices_to_compute.append((new_start, new_end))
            # compute
            for new_start, new_end in reversed(indices_to_compute):
                # double-check before computing
                # (might have reoccurred higher in the stack and already been computed)
                if (new_start, new_end) not in self._accumulated_values:
                    sum_val = None
                    for new_new_start, new_new_end in self.recursive_indices(new_start, new_end):
                        # compute value
                        val = self._accumulated_values[new_new_start, new_new_end]
                        # initialise or update sum
                        if sum_val is None:
                            sum_val = val
                        else:
                            sum_val = sum_val + val
                    self._accumulated_values[new_start, new_end] = sum_val
        # return a copy of the stored value
        return self._accumulated_values[start, end].copy()
    def parse_bottom_up(self):
        """
        Compute all values bottom-up. This performs an efficient dynamic programming pass through the entire
        scape and avoids time-consuming recursive look-ups for computing single values (run time ~O(n^2)). It can
        be used if all values within the scape are expected to be needed. For high-resolution scapes where only
        few values need to be computed, this may be inefficient, as a single lookup has only ~O(n) run time. So
        if fewer than n values are needed, separate lookups will be more efficient. Moreover, if multiple values
        have the same start or end time, the overall run time for computing these values is only ~O(n). Depending
        on whether the start or end time is in common, the "left" or "right" strategy is more efficient.
        """
        for width in range(2, self.max_index - self.min_index):
            for start_idx in range(self.min_index, self.max_index - width):
                self.get_value_at_index(start_idx, start_idx + width)
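
# --- Usage sketch (not part of the library source): a DiscreteScape over
# --- hypothetical values with custom weights; the middle slot is masked out by
# --- a zero weight and does not contribute to the sum.
values = np.array([[1., 0.],
                   [5., 5.],
                   [0., 1.]])
weighted_scape = DiscreteScape(values, weights=[1., 0., 1.])
print(weighted_scape[0, 3])  # weighted sum: [1. 1.]
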
class PitchScape_(Scape):
    """
    Continuous-time scape that takes a discrete-time scape and interpolates linearly for points that lie in
    between the discrete time points. This is the exact continuous generalisation when using
    DiscretePitchScape_ as discrete-time scape.
    """
    def __init__(self, values=None, scape=None, **kwargs):
        """
        Initialise PitchScape_ either from count values or from a DiscretePitchScape_ object.
        :param values: pitch-class counts (do not provide together with scape)
        :param scape: DiscretePitchScape_ object (do not provide together with values)
        :param kwargs: keyword arguments passed on to initialise DiscretePitchScape_ (use only when also
        providing values)
        """
        if (values is None) == (scape is None):
            raise ValueError("Please specify EITHER 'values' (and optional keyword arguments) OR 'scape'")
        if scape is None:
            scape = DiscretePitchScape_(values=values, **kwargs)
        elif kwargs:
            raise TypeError("Cannot take keyword arguments if scape object is given.")
        self.scape = scape
        self.normalise = scape.normalise
        super().__init__(min_time=self.scape.min_time, max_time=self.scape.max_time)
    def __getitem__(self, item):
        try:
            return self.scape[item]
        except ValueError:
            return self.interpolate(*item)
    @staticmethod
    def get_adjacent_indices(start, end, times):
        # get upper/lower adjacent indices
        upper_start_idx = np.searchsorted(times, start, side='right')
        lower_start_idx = upper_start_idx - 1
        upper_end_idx = np.searchsorted(times, end, side='left')
        lower_end_idx = upper_end_idx - 1
        n_times = len(times)
        # handle floating-point round-off errors
        if lower_start_idx == -1 and isclose(start, times[0]):
            lower_start_idx = 0
            upper_start_idx = 1
            start = times[0]
        if upper_end_idx == n_times and isclose(end, times[-1]):
            lower_end_idx = n_times - 2
            upper_end_idx = n_times - 1
            end = times[-1]
        # check bounds
        if lower_start_idx < 0:
            raise ValueError("Start below valid times")
        if upper_end_idx >= n_times:
            raise ValueError("End beyond valid times")
        return start, end, lower_start_idx, upper_start_idx, lower_end_idx, upper_end_idx
    def interpolate(self, start, end):
        # check for zero-size windows
        if start == end:
            return self.scape.zero_size_value()
        # get adjacent indices
        (start, end,
         lower_start_idx, upper_start_idx,
         lower_end_idx, upper_end_idx) = self.get_adjacent_indices(start, end, self.scape.times)
        # get corresponding times
        upper_start_time = self.scape.times[upper_start_idx]
        lower_start_time = self.scape.times[lower_start_idx]
        upper_end_time = self.scape.times[upper_end_idx]
        lower_end_time = self.scape.times[lower_end_idx]
        if lower_start_idx == lower_end_idx and upper_start_idx == upper_end_idx:
            # window start/end lie in the same time interval
            if self.normalise:
                value = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            else:
                value = (end - start) / (upper_start_time - lower_start_time) * \
                        self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
        elif upper_start_time == lower_end_time:
            # window start/end lie in between two adjacent time intervals
            e = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            f = self.scape.get_value_at_index(lower_end_idx, upper_end_idx)
            if self.normalise:
                f1 = upper_start_time - start
                f2 = end - lower_end_time
                value = (f1 * e + f2 * f) / (f1 + f2)
            else:
                f1 = (upper_start_time - start) / (upper_start_time - lower_start_time)
                f2 = (end - lower_end_time) / (upper_end_time - lower_end_time)
                value = f1 * e + f2 * f
        else:
            # at least one complete time interval lies in between window start/end
            assert lower_start_time <= start <= upper_start_time, \
                f"NOT {lower_start_time} <= {start} <= {upper_start_time}"
            assert lower_end_time <= end <= upper_end_time, \
                f"NOT {lower_end_time} <= {end} <= {upper_end_time}"
            d = self.scape.get_value_at_index(upper_start_idx, lower_end_idx)
            e = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            f = self.scape.get_value_at_index(lower_end_idx, upper_end_idx)
            if self.normalise:
                f1 = upper_start_time - start
                f2 = end - lower_end_time
                f3 = lower_end_time - upper_start_time
                value = (f3 * d + f1 * e + f2 * f) / (f1 + f2 + f3)
            else:
                f1 = (upper_start_time - start) / (upper_start_time - lower_start_time)
                f2 = (end - lower_end_time) / (upper_end_time - lower_end_time)
                value = d + f1 * e + f2 * f
        # add prior counts
        self.scape.add_prior_counts(value, end - start)
        return value
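
# --- Usage sketch (not part of the library source): continuous windows on a
# --- PitchScape_ are linearly interpolated between the discrete time points;
# --- the data is hypothetical.
values = np.array([[1., 0.],
                   [0., 1.]])
pitch_scape = PitchScape_(values=values, normalise=True, normalise_values=True)
print(pitch_scape[0.0, 2.0])  # exact grid window: [0.5 0.5]
print(pitch_scape[0.0, 1.5])  # interpolated: (1.0 * [1, 0] + 0.5 * [0, 1]) / 1.5
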
class ContinuousScape(Scape):
    """
    Continuous-time scape that takes a discrete-time scape and interpolates linearly for points that lie in
    between the discrete time points. This is the exact continuous generalisation when using DiscreteScape as
    discrete-time scape.
    """
    def __init__(self, values=None, scape=None, **kwargs):
        """
        Initialise ContinuousScape either from values or from a DiscreteScape object.
        :param values: values per time slot (do not provide together with scape)
        :param scape: DiscreteScape object (do not provide together with values)
        :param kwargs: keyword arguments passed on to initialise DiscreteScape (use only when also providing
        values)
        """
        if (values is None) == (scape is None):
            raise ValueError("Please specify EITHER 'values' (and optional keyword arguments) OR 'scape'")
        if scape is None:
            scape = DiscreteScape(values=values, **kwargs)
        elif kwargs:
            raise TypeError("Cannot take keyword arguments if scape object is given.")
        self.scape = scape
        super().__init__(min_time=self.scape.min_time, max_time=self.scape.max_time)
    def __getitem__(self, item):
        try:
            return self.scape[item]
        except ValueError:
            return self.interpolate(*item)
    @staticmethod
    def get_adjacent_indices(start, end, times):
        # get upper/lower adjacent indices
        upper_start_idx = np.searchsorted(times, start, side='right')
        lower_start_idx = upper_start_idx - 1
        upper_end_idx = np.searchsorted(times, end, side='left')
        lower_end_idx = upper_end_idx - 1
        n_times = len(times)
        # handle floating-point round-off errors
        if lower_start_idx == -1 and isclose(start, times[0]):
            lower_start_idx = 0
            upper_start_idx = 1
            start = times[0]
        if upper_end_idx == n_times and isclose(end, times[-1]):
            lower_end_idx = n_times - 2
            upper_end_idx = n_times - 1
            end = times[-1]
        # check bounds
        if lower_start_idx < 0:
            raise ValueError("Start below valid times")
        if upper_end_idx >= n_times:
            raise ValueError("End beyond valid times")
        return start, end, lower_start_idx, upper_start_idx, lower_end_idx, upper_end_idx
    def interpolate(self, start, end):
        # check for zero-size windows
        if start == end:
            return self.scape.zero_size_value.copy()
        # get adjacent indices
        (start, end,
         lower_start_idx, upper_start_idx,
         lower_end_idx, upper_end_idx) = self.get_adjacent_indices(start, end, self.scape.times)
        # get corresponding times
        upper_start_time = self.scape.times[upper_start_idx]
        lower_start_time = self.scape.times[lower_start_idx]
        upper_end_time = self.scape.times[upper_end_idx]
        lower_end_time = self.scape.times[lower_end_idx]
        if lower_start_idx == lower_end_idx and upper_start_idx == upper_end_idx:
            # window start/end lie in the same time interval
            value = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            value *= (end - start) / (upper_start_time - lower_start_time)
        elif upper_start_time == lower_end_time:
            # window start/end lie in between two adjacent time intervals
            e = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            f = self.scape.get_value_at_index(lower_end_idx, upper_end_idx)
            f1 = (upper_start_time - start) / (upper_start_time - lower_start_time)
            f2 = (end - lower_end_time) / (upper_end_time - lower_end_time)
            value = f1 * e + f2 * f
        else:
            # at least one complete time interval lies in between window start/end
            assert lower_start_time <= start <= upper_start_time, \
                f"NOT {lower_start_time} <= {start} <= {upper_start_time}"
            assert lower_end_time <= end <= upper_end_time, \
                f"NOT {lower_end_time} <= {end} <= {upper_end_time}"
            d = self.scape.get_value_at_index(upper_start_idx, lower_end_idx)
            e = self.scape.get_value_at_index(lower_start_idx, upper_start_idx)
            f = self.scape.get_value_at_index(lower_end_idx, upper_end_idx)
            f1 = (upper_start_time - start) / (upper_start_time - lower_start_time)
            f2 = (end - lower_end_time) / (upper_end_time - lower_end_time)
            value = d + f1 * e + f2 * f
        return value
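
# --- Usage sketch (not part of the library source): a ContinuousScape returns
# --- unnormalised weighted sums, interpolated for off-grid windows; the data
# --- is hypothetical.
values = np.array([[1., 0.],
                   [0., 1.]])
continuous_scape = ContinuousScape(values=values)
print(continuous_scape[0.0, 2.0])  # full weighted sum: [1. 1.]
print(continuous_scape[0.5, 1.5])  # half of each interval: [0.5 0.5]
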
class PitchScape(Scape):
    """
    Continuous-time pitch scape with optional prior counts and normalisation. Wraps a ContinuousScape (or builds
    one from pitch-class counts) and post-processes the returned values.
    """
    def __init__(self, values=None, scape=None, prior_counts=0, normalise=False, **kwargs):
        self.prior_counts = prior_counts
        self.normalise = normalise
        if (values is None) == (scape is None):
            raise ValueError("Please specify EITHER 'values' (and optional keyword arguments) OR 'scape'")
        # custom weights not allowed with normalisation
        if self.normalise and "weights" in kwargs:
            raise ValueError("Custom weights are not supported in conjunction with normalisation.")
        # initialise underlying scape
        if scape is None:
            # use float values
            values = np.array(values, dtype=float)
            # if required: normalise along all axes except the first
            if self.normalise:
                axis = tuple(range(1, len(values.shape)))
                normalize_non_zero(values, axis=axis, replace_zeros=True)
            scape = ContinuousScape(values=values, **kwargs)
        elif kwargs:
            raise TypeError("Cannot take keyword arguments if scape object is given.")
        self.scape = scape
        # dimensionality of the output (read from the values of the underlying discrete scape)
        self.n_dim = np.prod(scape.scape.values.shape[1:])
        # value of the uniform distribution
        self.uniform = 1 / self.n_dim
        # init super
        super().__init__(min_time=self.scape.min_time, max_time=self.scape.max_time)
    def __getitem__(self, item):
        # get value from the underlying scape
        value = self.scape[item]
        # handle zeros
        if np.all(value == 0):
            if self.prior_counts is None:
                # if prior_counts is None: just return the all-zero value
                pass
            elif self.normalise:
                # if prior_counts is not None and normalisation is requested: return a uniform distribution
                value += self.uniform
            else:
                # if prior_counts is not None and NO normalisation is requested: add prior counts
                value += self.prior_counts
            return value
        # add prior counts
        if self.prior_counts is not None:
            value += self.prior_counts
        # normalise
        if self.normalise:
            # get time interval for normalisation
            start, end = item
            delta_t = end - start
            # if prior_counts is not None: include them in the normalisation
            if self.prior_counts is None:
                value /= delta_t
            else:
                value /= delta_t + self.n_dim * self.prior_counts
        return value
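
# --- Usage sketch (not part of the library source): end-to-end use of
# --- PitchScape with hypothetical pitch-class counts (reduced to two bins for
# --- brevity) and normalised output.
values = np.array([[2., 0.],
                   [0., 0.],
                   [0., 2.]])
scape = PitchScape(values=values, prior_counts=0, normalise=True)
print(scape[0.0, 1.0])  # first slot only: [1. 0.]
print(scape[0.0, 3.0])  # whole piece: [0.5 0.5]
print(scape[1.0, 2.0])  # empty slot, replaced by a uniform distribution: [0.5 0.5]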