# Source code for pydsd.DropSizeDistribution

# -*- coding: utf-8 -*-
"""
The Drop Size Distribution model contains the DropSizeDistribution class.
This class represents drop size distributions returned from the various
readers in the io module. The class knows how to perform scattering
simulations on itself.
"""

import numpy as np
import pytmatrix
import scipy
from scipy.optimize import curve_fit
from math import gamma

from pytmatrix.tmatrix import Scatterer
from pytmatrix.psd import PSDIntegrator
from pytmatrix import orientation, radar, tmatrix_aux, refractive
from datetime import date
from .utility.expfit import expfit, expfit2

from . import DSR
from .utility import dielectric
from .utility import configuration
from .utility import filter

SPEED_OF_LIGHT = 299792458


class DropSizeDistribution(object):
    """
    DropSizeDistribution class to hold DSD's and calculate parameters
    and relationships. Should be returned from the disdrometer*reader
    style functions.

    Attributes
    ----------
    time: array_like
        An array of times corresponding to the time each dsd was sampled in
        minutes relative to time_start.
    time_start: datetime
        A datetime object indicated start of disdrometer recording.
    fields: dictionary
        Dictionary of scattered components.
    Nd : 2d Array
        A list of drop size distributions
    spread: array_like
        Array giving the bin spread size for each size bin of the
        disdrometer.
    velocity: array_like
        Terminal Fall Velocity for each size bin. This is based on the
        disdrometer assumptions.
    Z: array_like
        The equivalent reflectivity factory from the disdrometer. Often
        taken as D**6.
    bin_edges: array_like
        N+1 sized array of the boundaries of each size bin. For 30 bins
        for instance, there will be 31 different bin boundaries.
    diameter: array_like
        The center size for each dsd bin.
    """

    def __init__(self, reader, time_start=None, location=None):
        """Initializer for the DropSizeDistribution class.

        The DropSizeDistribution class holds dsd's returned from the various
        readers in the io module.

        Parameters
        ----------
        reader: object
            Object returned by package readers.
        time_start: datetime
            Recording Start time.
        location: tuple
            (Latitude, Longitude) pair in decimal format.

        Returns
        -------
        dsd: `DropSizeDistribution` instance
            Drop Size Distribution instance.
        """
        self.config = configuration.Configuration()

        self.time = reader.time
        self.Nd = reader.fields["Nd"]
        self.spread = reader.spread

        # Optional reader-provided data: readers are not required to supply
        # these, so fall back to None when absent. Dictionary lookups raise
        # KeyError and attribute accesses raise AttributeError; catching only
        # those (instead of the original bare `except:`) avoids masking
        # unrelated failures.
        try:
            self.rain_rate = reader.fields["rain_rate"]
        except KeyError:
            self.rain_rate = None
        try:
            self.Z = reader.fields["reflectivity"]
        except KeyError:
            self.Z = None
        try:
            self.num_particles = reader.fields["num_particles"]
        except KeyError:
            self.num_particles = None
        try:
            self.bin_edges = reader.bin_edges
        except AttributeError:
            self.bin_edges = None
        try:
            self.diameter = reader.diameter
        except AttributeError:
            self.diameter = None

        self.fields = reader.fields
        self.time_start = time_start

        try:
            self.info = reader.info
        except AttributeError:
            self.info = {}

        self.numt = len(reader.time["data"])

        # BUG FIX: the original reset `location = {}` immediately before this
        # test, so the caller-supplied location was always discarded and
        # self.location was never set.
        if location:
            self.location = {"latitude": location[0], "longitude": location[1]}

        self.scattering_table_consistent = False
        self.scattering_params = {}

        self.set_scattering_temperature_and_frequency()
        self.set_canting_angle()

        try:
            # We need to make sure this is a dictionary
            self.velocity = reader.fields["terminal_velocity"]
        except KeyError:
            self.velocity = None

        if self.velocity is None:
            self.calculate_fall_speed(self.diameter["data"], inplace=True)

        # Wrap a raw ndarray in the dictionary form the rest of the class
        # expects.
        if type(self.velocity) is np.ndarray:
            self.velocity = {"data": self.velocity}

        if "drop_spectrum" in reader.fields:
            # BUG FIX: `reader.spectrum_fall_velocity` is an attribute access,
            # so a missing attribute raises AttributeError, which the original
            # `except KeyError` never caught.
            try:
                self.spectrum_fall_velocity = reader.spectrum_fall_velocity
            except (AttributeError, KeyError):
                print(
                    "Spectrum is stored, but associated velocity is missing. Please fix this in the reader.\
                We will continue but this will likely cause errors down the road."
                )

        try:
            self.effective_sampling_area = reader.effective_sampling_area
        except AttributeError:
            self.effective_sampling_area = None
[docs] def set_scattering_temperature_and_frequency( self, scattering_temp=10, scattering_freq=9.7e9 ): """ Change the scattering temperature. After use, re-run calculate_radar_parameters to see the effect this has on the parameters. Temperatures are in Celsius. Defaults to 10C X-band. Parameters ---------- scattering_temp: optional, float Scattering temperature [C]. scattering_freq: optional, float Scattering frequency [Hz]. """ self.scattering_params["scattering_freq"] = scattering_freq self.scattering_params["scattering_temp"] = scattering_temp self.scattering_params["m_w"] = dielectric.get_refractivity( scattering_freq, scattering_temp ) self.scattering_table_consistent = False
[docs] def set_canting_angle(self, canting_angle=20): """ Change the canting angle for scattering calculations. Requires scattering table to be regenerated afterwards. """ self.scattering_params["canting_angle"] = canting_angle self.scattering_table_consistent = False
[docs] def calculate_radar_parameters( self, dsr_func=DSR.bc, scatter_time_range=None, max_diameter=9.0, scatter_table_filename=None, ): """ Calculates radar parameters for the Drop Size Distribution. Calculates the radar parameters and stores them in the object. Defaults to X-Band,Beard and Chuang 10C setup. Sets the dictionary parameters in fields dictionary: Zh, Zdr, Kdp, Ai(Attenuation) Parameters: ---------- wavelength: optional, pytmatrix wavelength Wavelength to calculate scattering coefficients at. dsr_func: optional, function Drop Shape Relationship function. Several are availab le in the `DSR` module. Defaults to Beard and Chuang scatter_time_range: optional, tuple Parameter to restrict the scattering to a time interval. The first element is the start time, while the second is the end time. """ if self.scattering_table_consistent is False: self._setup_scattering( SPEED_OF_LIGHT / self.scattering_params["scattering_freq"] * 1000.0, dsr_func, max_diameter, scatter_table_filename=scatter_table_filename, ) self._setup_empty_fields() if scatter_time_range is None: self.scatter_start_time = 0 self.scatter_end_time = self.numt else: if scatter_time_range[0] < 0: print("Invalid Start time specified, aborting") return self.scatter_start_time = scatter_time_range[0] self.scatter_end_time = scatter_time_range[1] if scatter_time_range[1] > self.numt: print( "End of Scatter time is greater than end of file." + "Scattering to end of included time." ) self.scatter_end_time = self.numt self.scatterer.set_geometry( tmatrix_aux.geom_horiz_back ) # We break up scattering to avoid regenerating table. 
for t in range(self.scatter_start_time, self.scatter_end_time): if np.sum(self.Nd["data"][t]) is 0: continue BinnedDSD = pytmatrix.psd.BinnedPSD( self.bin_edges["data"], self.Nd["data"][t] ) self.scatterer.psd = BinnedDSD self.fields["Zh"]["data"][t] = 10 * np.log10(radar.refl(self.scatterer)) self.fields["Zdr"]["data"][t] = 10 * np.log10(radar.Zdr(self.scatterer)) self.fields["delta_co"]["data"][t] = ( radar.delta_hv(self.scatterer) * 180.0 / np.pi ) self.scatterer.set_geometry(tmatrix_aux.geom_horiz_forw) for t in range(self.scatter_start_time, self.scatter_end_time): BinnedDSD = pytmatrix.psd.BinnedPSD( self.bin_edges["data"], self.Nd["data"][t] ) self.scatterer.psd = BinnedDSD self.fields["Kdp"]["data"][t] = radar.Kdp(self.scatterer) self.fields["Ai"]["data"][t] = radar.Ai(self.scatterer) self.fields["Adr"]["data"][t] = radar.Ai(self.scatterer) - radar.Ai( self.scatterer, h_pol=False )
def _setup_empty_fields(self): """ Preallocate arrays of zeros for the radar moments """ params_list = ["Zh", "Zdr", "delta_co", "Kdp", "Ai", "Adr"] for param in params_list: self.fields[param] = self.config.fill_in_metadata( param, np.ma.zeros(self.numt) ) def _setup_scattering( self, wavelength, dsr_func, max_diameter, scatter_table_filename=None ): """ Internal Function to create scattering tables. This internal function sets up the scattering table. It takes a wavelength as an argument where wavelength is one of the pytmatrix accepted wavelengths. Parameters: wavelength : tmatrix wavelength PyTmatrix wavelength. dsr_func : function Drop Shape Relationship function. Several built-in are available in the `DSR` module. max_diameter: float Maximum drop diameter to generate scattering table for. """ self.scatterer = Scatterer( wavelength=wavelength, m=self.scattering_params["m_w"] ) self.scatterer.psd_integrator = PSDIntegrator() self.scatterer.psd_integrator.axis_ratio_func = lambda D: 1.0 / dsr_func(D) self.dsr_func = dsr_func self.scatterer.psd_integrator.D_max = max_diameter self.scatterer.psd_integrator.geometries = ( tmatrix_aux.geom_horiz_back, tmatrix_aux.geom_horiz_forw, ) self.scatterer.or_pdf = orientation.gaussian_pdf( self.scattering_params["canting_angle"] ) self.scatterer.orient = orientation.orient_averaged_fixed if scatter_table_filename is None: self.scatterer.psd_integrator.init_scatter_table(self.scatterer) else: self.scatterer.psd_integrator.load_scatter_table(scatter_table_filename) self.scattering_table_consistent = True def _calc_mth_moment(self, m): """Calculates the mth moment of the drop size distribution. Returns the mth moment of the drop size distribution E[D^m]. 
Parameters: ----------- m: float order of the moment """ if len(self.spread["data"]) > 0: bin_width = self.spread["data"] else: bin_width = [ self.bin_edges["data"][i + 1] - self.bin_edges["data"][i] for i in range(0, len(self.bin_edges["data"]) - 1) ] mth_moment = np.ma.zeros(self.numt) for t in range(0, self.numt): dmth = np.power(self.diameter["data"], m) mth_moment[t] = np.dot(np.multiply(dmth, self.Nd["data"][t]), bin_width) return mth_moment
[docs] def calculate_dsd_parameterization(self, method="bringi"): """Calculates DSD Parameterization. This calculates the dsd parameterization and stores the result in the fields dictionary. This includes the following parameters: Nt, W, D0, Nw, Dmax, Dm, N0, mu Parameters: ----------- method: optional, string Method to use for DSD estimation Further Info: ------ For D0 and Nw we use the method due to Bringi and Chandrasekar. """ params_list = ["D0", "Dmax", "Dm", "Nt", "Nw", "N0", "W", "mu", "Lambda"] for param in params_list: self.fields[param] = self.config.fill_in_metadata( param, np.ma.zeros(self.numt) ) rho_w = 1e-03 # grams per mm cubed Density of Water vol_constant = np.pi / 6.0 * rho_w self.fields["Dm"]["data"] = np.divide( self._calc_mth_moment(4), self._calc_mth_moment(3) ) for t in range(0, self.numt): if np.sum(self.Nd["data"][t]) == 0: continue self.fields["Nt"]["data"][t] = np.dot( self.spread["data"], self.Nd["data"][t] ) self.fields["W"]["data"][t] = vol_constant * np.dot( np.multiply(self.Nd["data"][t], self.spread["data"]), np.array(self.diameter["data"]) ** 3, ) self.fields["D0"]["data"][t] = self._calculate_D0(self.Nd["data"][t]) self.fields["Nw"]["data"][t] = ( 256.0 / (np.pi * rho_w) * np.divide( self.fields["W"]["data"][t], self.fields["Dm"]["data"][t] ** 4 ) ) self.fields["Dmax"]["data"][t] = self.__get_last_nonzero(self.Nd["data"][t]) self.fields["mu"]["data"][:] = list( map(self._estimate_mu, list(range(0, self.numt))) ) Lambda, N0 = self._calculate_exponential_params() self.fields["Lambda"]["data"] = Lambda self.fields["N0"]["data"] = N0
def __get_last_nonzero(self, N): """ Gets last nonzero entry in an array. Gets last non-zero entry in an array. Parameters ---------- N: array_like Array to find nonzero entry in Returns ------- max: int last nonzero entry in an array. """ if np.ma.count(N): return self.diameter["data"][np.max(N.nonzero())] else: return 0 def _calculate_D0(self, N): """ Calculate Median Drop diameter. Calculates the median drop diameter for the array N. This assumes diameter and bin widths in the dsd object have been properly set. Parameters: ----------- N: array_like Array of drop counts for each size bin. Notes: ------ This works by calculating the two bins where cumulative water content goes over 0.5, and then interpolates the correct D0 value between these two bins. """ rho_w = 1e-3 W_const = rho_w * np.pi / 6.0 if np.nansum(N) == 0: return 0 if ( np.count_nonzero(N[np.isfinite(N)]) == 1 ): # If there is only one nonzero/nan element, return that diameter. return self.diameter["data"][ np.nanargmax(N) ] # This gets around weirdness with only one valid point. cum_W = W_const * np.nancumsum( [ N[k] * self.spread["data"][k] * (self.diameter["data"][k] ** 3) for k in range(0, len(N)) ] ) cross_pt = list(cum_W < (cum_W[-1] * 0.5)).index(False) - 1 slope = (cum_W[cross_pt + 1] - cum_W[cross_pt]) / ( self.diameter["data"][cross_pt + 1] - self.diameter["data"][cross_pt] ) run = (0.5 * cum_W[-1] - cum_W[cross_pt]) / slope return self.diameter["data"][cross_pt] + run def _calculate_exponential_params(self, moment_1=2, moment_2=4): """ Calculate Exponential DSD parameters. Calculate Exponential DSD parameters using method of moments. The choice of moments is given in the parameters. Uses method from [1] Parameters: moment_1: float First moment to use. moment_2: float Second moment to use. References: ------ [1] Zhang, et. al., 2008, Diagnosing the Intercept Parameter for Exponential Raindrop Size Distribution Based on Video Disdrometer Observations: Model Development. J. Appl. Meteor. 
Climatol., https://doi.org/10.1175/2008JAMC1876.1 """ m1 = self._calc_mth_moment(moment_1) m2 = self._calc_mth_moment(moment_2) num = m1 * gamma(moment_2 + 1) den = m2 * gamma(moment_1 + 1) Lambda = np.power(np.divide(num, den), (1 / (moment_2 - moment_1))) N0 = m1 * np.power(Lambda, moment_1 + 1) / gamma(moment_1 + 1) return Lambda, N0
[docs] def set_air_density(self, air_density): """ Set air density at ground level """ self.air_density = 1000.0
[docs] def calculate_fall_speed(self, diameter, density=1000, inplace=False): """ Calculate terminal fall velocity for drops[1] adjusted by the density of the air[2] Parameters ---------- diameter: array_like[float] Array of diameters to calculate fall speed for. density: float, optional Density of the air in millibars. Defaults to 1000mb. Returns ------- terminal_fall_speed: array_like[float] Array of fall speeds matching size of diameter, adjusted for air density. References ---------- [1] Atlas, D., Srivastava, R. C., and Sekhon, R. S. (1973), Doppler radar characteristics of precipitation at vertical incidence, Rev. Geophys., 11( 1), 1– 35, doi:10.1029/RG011i001p00001. [2] Foote, G. B. and duToit, P. S.: Terminal velocity of raindrops aloft, J. Appl. Meteorol., 8, 249–253, doi:10.1175/1520- 0450(1969)008<0249:TVORA>2.0.CO;2, 1969. """ self.set_air_density(density) velocity = 9.65 - 10.3 * np.exp(-0.6 * diameter) velocity[0] = 0.0 speed_adjustment = (density / 1000.0) ** 0.4 # Based on Yu 2016 terminal_fall_speed = velocity * speed_adjustment if self.velocity is None: self.velocity = {} self.velocity["data"] = terminal_fall_speed self.velocity["air_density"] = self.air_density if inplace is True: self.velocity["data"] = terminal_fall_speed self.velocity["air_density"] = self.air_density return terminal_fall_speed
[docs] def calculate_RR(self): """Calculate instantaneous rain rate. This calculates instantaneous rain rate based on the flux of water. """ self.fields["rain_rate"] = {"data": np.ma.zeros(self.numt)} for t in range(0, self.numt): # self.rain_rate['data'][t] = 0.6*3.1415 * 10**(-3) * np.dot(np.multiply(self.rain_rate['data'],np.multiply(self.Nd['data'][t],self.spread['data'] )), # np.array(self.diameter['data'])**3) self.fields["rain_rate"]["data"][t] = ( 0.6 * np.pi * 1e-03 * np.sum( self._mmultiply( self.velocity["data"], self.Nd["data"][t], self.spread["data"], np.array(self.diameter["data"]) ** 3, ) ) )
[docs] def calculate_R_Kdp_relationship(self): """ calculate_R_kdp_relationship calculates a power fit for the rainfall-kdp relationship based upon the calculated radar parameters(which should have already been run). It returns the scale and exponential parameter a and b in the first tuple, and the second returned argument gives the covariance matrix of the fit. """ if "rain_rate" in list(self.fields.keys()): filt = np.logical_and( self.fields["Kdp"]["data"] > 0, self.fields["rain_rate"]["data"] > 0 ) popt, pcov = expfit( self.fields["Kdp"]["data"][filt], self.fields["rain_rate"]["data"][filt] ) return popt, pcov else: print("Please run calculate_RR() function first.") return None
[docs] def calculate_R_Zh_relationship(self): """ calculate_R_Zh_relationship calculates the power law fit for Zh based upon scattered radar parameters. It returns the scale and exponential parameter a and b in the first tuple, and the second returned argument gives the covariance matrix of the fit. Returns: -------- popt: tuple a,b,c fits for relationship. pcov: array Covariance matrix of fits. """ popt, pcov = expfit( np.power( 10, 0.1 * self.fields["Zh"]["data"][self.fields["rain_rate"]["data"] > 0], ), self.fields["rain_rate"]["data"][self.fields["rain_rate"]["data"] > 0], ) return popt, pcov
[docs] def calculate_R_Zh_Zdr_relationship(self): """ calculate_R_Zh_Zdr_relationship calculates the power law fit for Zh,Zdr based upon scattered radar parameters. It returns the scale and exponential parameters a, b, and c in the first tuple, and the second returned argument gives the covariance matrix of the fit. Uses a set of filters to remove bad data: rain_rate > 0 Zdr > 0 Kdp > 0 """ filt = np.logical_and( np.logical_and( self.fields["rain_rate"]["data"] > 0, np.greater(self.fields["Zdr"]["data"], 0), ), self.fields["Kdp"]["data"] > 0, ) popt, pcov = expfit2( [ self._idb(self.fields["Zh"]["data"][filt]), self._idb(self.fields["Zdr"]["data"][filt]), ], self.fields["rain_rate"]["data"][filt], ) return popt, pcov
[docs] def calculate_R_Zh_Kdp_relationship(self): """ calculate_R_Zh_Kdp_relationship calculates the power law fit for Zh,Kdp based upon scattered radar parameters. It returns the scale and exponential parameters a, b, and c in the first tuple, and the second returned argument gives the covariance matrix of the fit. rain_rate > 0 Zdr > 0 Kdp > 0 """ filt = np.logical_and( np.logical_and( self.fields["rain_rate"]["data"] > 0, self.fields["Zdr"]["data"] > 0 ), self.fields["Kdp"]["data"] > 0, ) popt, pcov = expfit2( [ self._idb(self.fields["Zh"]["data"][filt]), self.fields["Kdp"]["data"][filt], ], self.fields["rain_rate"]["data"][filt], ) return popt, pcov
[docs] def calculate_R_Zdr_Kdp_relationship(self): """ calculate_R_Zdr_Kdp_relationship calculates the power law fit for Zdr,Kdp based upon scattered radar parameters. It returns the scale and exponential parameters a, b, and c in the first tuple, and the second returned argument gives the covariance matrix of the fit. rain_rate > 0 Zdr > 0 Kdp > 0 """ filt = np.logical_and( np.logical_and( self.fields["rain_rate"]["data"] > 0, self.fields["Zdr"]["data"] > 0 ), self.fields["Kdp"]["data"] > 0, ) popt, pcov = expfit2( [ self._idb(self.fields["Zdr"]["data"][filt]), self.fields["Kdp"]["data"][filt], ], self.fields["rain_rate"]["data"][filt], ) return popt, pcov
[docs] def calculate_dsd_from_spectrum(self, effective_sampling_area=None, replace=True): """ Calculate N(D) from the drop spectrum based on the effective sampling area. Updates the entry for ND in fields. Requires that drop_spectrum be present in fields, and that the dsd has spectrum_fall_velocity defined. Parameters ---------- effective_sampling_area: function Function that returns the effective sampling area as a function of diameter. Optionally a array with effective sampling area matched to diameter dimension can be provided as an array. replace: boolean Whether to replace Nd with the newly calculated one. If true, no return value to save memory. """ D = self.diameter["data"] if effective_sampling_area is not None: if callable(effective_sampling_area): A = effective_sampling_area(D) else: A = np.array(effective_sampling_area) elif self.effective_sampling_area is not None: A = self.effective_sampling_area["data"] else: print( "Defaulting to Parsivel Sampling Area. This is probably wrong. Make sure effective_sampling_area variable is set" ) A = filter.parsivel_sampling_area(D) delta_t = np.mean(np.diff(self.time["data"][0:4])) # Sampling time in seconds velocity = self.spectrum_fall_velocity["data"] spread = self.spread["data"] if replace: self.fields["Nd"]["data"] = ( 1e6 * np.dot( np.swapaxes(self.fields["drop_spectrum"]["data"], 1, 2), 1 / velocity, ) / (A * spread * delta_t) ) self.fields["Nd"]["source"] = "Calculated from spectrum." else: return ( 1e6 * np.dot( np.swapaxes(self.fields["drop_spectrum"]["data"], 1, 2), 1 / velocity, ) / (A * spread * delta_t) )
[docs] def save_scattering_table(self, scattering_filename): """ Save scattering table used by PyDSD to be reloaded later. Note this should only be used on disdrometers with the same setup for scattering (frequency, bins, max size, etc). This feature is currently experimental and may be removed in the future if it turns out to not work correctly or to cause issues. """ if hasattr(self.scatterer, "psd_integrator"): self.scatterer.psd_integrator.save_scatter_table(scattering_filename) else: raise AttributeError( "ERROR: No scattering object has been generated. Please calculate scattering table first." )
def _idb(self, db): """ Converts dB to linear scale. """ return np.power(10, np.multiply(0.1, db)) def _mmultiply(self, *args): """ _mmultiply extends numpy multiply to arbitrary number of same sized matrices. Multiplication is elementwise. Parameters: ----------- *args: matrices Matrices to multiply. Must be same shape. """ i_value = np.ones(len(args[0])) for i in args: i_value = np.multiply(i_value, i) return i_value def _estimate_mu(self, idx): """ Estimate $\mu$ for a single drop size distribution Estimate the shape parameter $\mu$ for the drop size distribution `Nd`. This uses the method due to Bringi and Chandrasekar. It is a minimization of the MSE error of a created gamma and measured DSD. Parameters ---------- Nd : array_like A drop size distribution D0: optional, float Median drop diameter in mm. If none is given, it will be estimated. Nw: optional, float Normalized Intercept Parameter. If none is given, it will be estimated. Returns ------- mu: integer Best estimate for DSD shape parameter $\mu$. """ if np.sum(self.Nd["data"][idx]) == 0: return np.nan res = scipy.optimize.minimize_scalar( self._mu_cost, bounds=(-10, 20), args=(idx,), method="bounded" ) if self._mu_cost(res.x, idx) == np.nan or res.x > 20: return np.nan else: return res.x def _mu_cost(self, mu, idx): """ Cost function for goodness of fit of a distribution. Calculates the MSE cost comparison of two distributions to fit $\mu$. Parameters ---------- idx: integer index into DSD field mu: float Potential Mu value """ gdsd = pytmatrix.psd.GammaPSD( self.fields["D0"]["data"][idx], self.fields["Nw"]["data"][idx], mu ) return np.sqrt( np.nansum( np.power(np.abs(self.Nd["data"][idx] - gdsd(self.diameter["data"])), 2) ) )