import logging
import numpy as np
from scipy.interpolate import interp1d
from scipy.linalg import toeplitz
logger = logging.getLogger(__name__)
[docs]def sph2sph(latP, lonP, latOld, lonOld, VtOld, VpOld):
"""Change coordinates and (horizontal part of) vectors from one spherical
coordinate system to another.
Parameters
----------
latP, lonP : float
Latitude and longitude of the new pole in the old coordinates, [deg]
latOld, lonOld : array
Latitudes and longitudes of the data points to be transformed in the
old coordinate system, [deg]
VtOld, VpOld : array
Theta- and phi-components of a vector field at locations
(latOld,lonOld). If you want to transform only coordinates make these
empty arrays (np.array([])).
Returns
-------
latNew, lonNew : array
Coordinates of the points (latOld,lonOld) in the new system, [deg],
same size as input parameters. Note that the old pole has longitude 0
and longitude is between 0 and 360.
VtNew, VpNew : array
Theta- and phi-components of the vector field in the new system. If no
input vector field was given, these are empty vectors [].
"""
# If pole of the new coordinate system is NaN, return NaN
if np.any(np.isnan([latP, lonP])):
latNew = np.full_like(latOld, np.nan)
lonNew = np.full_like(lonOld, np.nan)
VtNew = np.full_like(VtOld, np.nan)
VpNew = np.full_like(VpOld, np.nan)
return latNew, lonNew, VtNew, VpNew
# If pole of the new coordinate system is close enough to the old one, do
# nothing
if 90.0 - latP < 0.01:
return latOld, lonOld, VtOld, VpOld
# latitude --> co-latitude and deg --> rad
thetaP = np.radians(90 - latP)
phiP = np.radians(lonP)
thetaO = np.radians(90 - latOld)
phiO = np.radians(lonOld)
# Latitude of the data points in the new system
cosThetaPrime = np.cos(thetaO) * np.cos(thetaP) + np.sin(thetaO) * np.sin(
thetaP
) * np.cos(phiO - phiP)
latNew = 90 - np.degrees(np.arccos(cosThetaPrime))
# Find those data points that are NOT too close to the new coordinate pole.
# Points too close will get NaN in all other output parameters except
# latNew, which is set to 90.
# Except those points that have latOld or lonOld NaN, they are put to NaN.
ind = (90 - latNew) > 0.01
latNew[~ind] = 90
ii = np.isnan(latOld + lonOld)
latNew[ii] = np.nan
sinThetaPrime = np.full_like(latOld, np.nan) # np.nan + latOld
# this affects all other output variables
sinThetaPrime[ind] = np.sqrt(1 - cosThetaPrime[ind] ** 2.0)
# Longitude of the data points in the new system
# [old pole has longitude 0]
sinA = np.sin(thetaO) * np.sin(phiP - phiO) / sinThetaPrime
cosA = (np.cos(thetaO) - np.cos(thetaP) * cosThetaPrime) / (
np.sin(thetaP) * sinThetaPrime
)
lonNew = np.degrees(np.arctan2(sinA, cosA))
lonNew[lonNew < 0] += 360
# Horizontal vector components in the new system
if len(VtOld) == len(latOld):
sinC = np.sin(thetaP) * np.sin(phiP - phiO) / sinThetaPrime
cosC = (np.cos(thetaP) - np.cos(thetaO) * cosThetaPrime) / (
np.sin(thetaO) * sinThetaPrime
)
VtNew = cosC * VtOld - sinC * VpOld
VpNew = cosC * VpOld + sinC * VtOld
else:
# return empty variables
VtNew = np.array([])
VpNew = np.array([])
return latNew, lonNew, VtNew, VpNew
[docs]def sub_FindLongestNonZero(x):
"""Find the longest sequence of consecutive non-zero elements in an array.
Return the sequence and indices as y=x[ind].
Parameters
----------
x : array
Input array
Returns
-------
y : array
Longest sequence of consecutive non-zero elements
ind : array
Indices of the sequence
"""
# pad with zeros and find zero elements
x_aux = np.pad(x, (1, 1), "constant", constant_values=0)
zpos = np.nonzero(x_aux == 0)[0]
# Find the longest jump in zero positions
grpidx = np.argmax(np.diff(zpos))
# indices and values in the longest sequence
ind = np.arange(zpos[grpidx], zpos[grpidx + 1] - 1)
y = x[ind]
return y, ind
[docs]def sub_inversion(secsMat, regMat, epsSVD, alpha, magVec):
"""Solve matrix equation sysMat*resultVec = dataVec, using truncated
singular value decomposition.
Parameters
----------
secsMat : ndarray
Matrix giving the magnetic field from SECS amplitudes
regMat : ndarray
Regularization matrix for the SECS amplitudes
epsSVD : float
Pre-defined truncation parameter so that all singular values smaller
than epsSVD*(largest singular value) will be ignored
alpha : float
Parameter scaling the amount of regularization
magVec : array
Magnetic measurements to be fitted
Returns
-------
result_Vec : array
Vector of SECS amplitudes
"""
# Combine into one matrix and add zero constraint on the data vector
if regMat.size == 0 or np.isnan(alpha):
sysMat = secsMat
dataVec = magVec
else:
sysMat = np.concatenate((secsMat, alpha * regMat))
dataVec = np.concatenate((magVec, np.zeros(regMat.shape[0])))
# Calculate SVD
logger.info(
f"\n Calculating SVD of a [{sysMat.shape[0]},{sysMat.shape[1]}] " "matrix ... "
)
# works for 3x3 matrix, check what input matrix is and check again
svdU, svdS, svdVh = np.linalg.svd(sysMat, full_matrices=False)
svdV = svdVh.T
logger.info("done\n")
# Calculate the inverse matrix
lkmS = len(svdS)
slim = epsSVD * svdS[0]
ss = 1.0 / svdS
# ind = np.nonzero(svdS <= slim)
ss[svdS <= slim] = 0
logger.info(
f"epsilon = {epsSVD}, singular values range from {svdS[0]} to " f"{svdS[-1]} \n"
)
logger.info(
f"--> {np.count_nonzero(svdS <= slim)} values smaller than {slim} deleted (of {lkmS} "
"values)\n\n"
)
# Calculate the result vector
resultVec = svdU.T @ dataVec
# works with test example but should check again with real data####
resultVec = np.diagflat(ss)[:lkmS, :lkmS] @ resultVec
resultVec = svdV @ resultVec
return resultVec
[docs]def sub_LonInterp(lat, lon, intLat, method, ekstra=np.nan):
"""Robust interpolation of longitude by interpolating the sine and cosine
of the angle. This way a 360 degree jump does not matter.
Parameters
----------
lat, lon : ndarray
Original coordinates, [degree].
intLat : ndarray
Latitudes where original longitudes are interpolated to, [degree].
method : str
Interpolation method. Same as for scipy.interpolate.interp1d.
Returns
-------
intLon : ndarray
Interpolated longitudes at latitudes intLat, [degree]. -180 <= intlon <= 180.
"""
phi = np.radians(lon)
# check if this is always used with extrapolation
# interpolate sin and cos
fC = interp1d(lat, np.cos(phi), kind=method, fill_value=ekstra, bounds_error=False)
fS = interp1d(lat, np.sin(phi), kind=method, fill_value=ekstra, bounds_error=False)
intC = fC(intLat)
intS = fS(intLat)
# calculate interpolated longitude and convert to degree.
# NOTE: -180 <= intlon <= 180
intLon = np.degrees(np.arctan2(intS, intC))
return intLon
[docs]def sub_Swarm_grids_1D(lat1, lat2, Dlat1D, ExtLat1D):
"""Creates 1D grid around the two Swarm satellite paths.
Parameters
----------
lat1, lat2 : ndarray
Geographic latitudes of the satellites' paths, [degree].
Dlat1D : int or float
1D grid spacing in latitudinal direction, [degree].
ExtLat1D : int
Number of points to extend the 1D grid outside the data area in latitudinal
direction.
Returns
-------
lat1D : ndarray
Geographic latitudes of the 1D grid, [degree].
mat1Dsecond : ndarray
Second gradient matrix.
"""
# Latitudinal extent of the satellite data, only that part where there is
# data from both satellites.
maxlat = min(np.nanmax(lat1), np.nanmax(lat2))
minlat = max(np.nanmin(lat1), np.nanmin(lat2))
# Create 1D grid.
# Limit grid to latitudes -89 < lat2Dsecs < 89.
lat1D = np.arange(minlat - ExtLat1D * Dlat1D, maxlat + (ExtLat1D) * Dlat1D, Dlat1D)
ind = np.nonzero((lat1D > -89) & (lat1D < 89))
lat1D = lat1D[ind]
# Create 2nd gradient matrix
apu = np.ones(len(lat1D))
mat1Dsecond = np.diag(apu[1:], -1) - 2 * np.diag(apu) + np.diag(apu[1:], 1)
# [unit 1/deg^2] (NOTE: not quite same as north component of nabla^2)
mat1Dsecond = mat1Dsecond[1:-1, :] / Dlat1D**2
return lat1D, mat1Dsecond
[docs]def sub_Swarm_grids(lat1, lon1, lat2, lon2, Dlat2D, LonRatio, ExtLat2D, ExtLon2D):
"""Creates 2D grids around the two Swarm satellite paths.
Parameters
----------
lat1, lat2 : ndarray
Geographic latitudes of the satellites' paths, [degree].
lon1, lon2 : ndarray
Geographic longitudes of the satellites' paths, [degree].
Dlat2D : float
2D grid spacing in latitudinal direction, [degree].
LonRatio : float
Ratio between the satellite separation and longitudinal spacing of
the 2D grid.
ExtLat2D, ExtLon2D : int
Number of points to extend the 2D grid outside the data area in latitudinal
and longitudinal directions.
Returns
-------
lat2D, lon2D : ndarray
Geographic latitudes and longitudes of the 2D grid, [degree].
angle2D : ndarray
Half-angle of such a spherical cap that has the same area as the 2D grid
cell, [radian].
mat2DsecondLat : ndarray
Second gradient matrix in latitude.
"""
# Latitudinal extent of the satellite data, only that part where there is
# data from both satellites.
maxlat = min(np.nanmax(lat1), np.nanmax(lat2))
minlat = max(np.nanmin(lat1), np.nanmin(lat2))
# Average satellite path, and the satellite spacing in longitudinal direction.
ind1 = np.nonzero((lat1 >= minlat) & (lat1 <= maxlat))
# ind2 = np.nonzero((lat2 >= minlat) & (lat2 <= maxlat))
latA = lat1[ind1]
lonEro = sub_LonInterp(lat2, lon2, lat1[ind1], "linear", "extrapolate") - lon1[ind1]
lonEro[lonEro > 180] -= 360
lonEro[lonEro < -180] += 360
lonA = lon1[ind1] + lonEro / 2
# Latitudes of the 2D grid.
# Limit grids to latitudes -89 < lat2Dsecs < 89.
apulat = np.arange(
(minlat - ExtLat2D * Dlat2D), (maxlat + (ExtLat2D) * Dlat2D), Dlat2D
)
ind = np.nonzero((apulat > -89) & (apulat < 89))
apulat = apulat[ind]
# Number of points in the 2D grid
Nlat = len(apulat)
Nlon = 1 + 2 * ExtLon2D
# Preformat the 2D variables.
lat2D = np.full((Nlat, Nlon), np.nan)
lon2D = np.full((Nlat, Nlon), np.nan)
angle2D = np.full((Nlat, Nlon), np.nan)
dLon2D = np.full((Nlat, Nlon), np.nan)
# Make the 2D grid
# print(latA.shape)
# print(lonA.shape)
for n in range(Nlat):
lat2D[n, :] = apulat[n]
# Average satellite longitude at this latitude, and the
# grid spacing in longitudinal direction
apulona = sub_LonInterp(latA, lonA, apulat[n], "linear", "extrapolate")
# check interpolation
apuf = interp1d(latA, lonEro, kind="linear", fill_value="extrapolate")
apulonero = apuf(apulat[n])
# apulonero = interp1(latA,lonEro,apulat[n],'linear','extrap')
Dlon = np.abs(apulonero) / LonRatio
# 2D longitudes at this latitude
lon2D[n, :] = (
apulona + np.arange(-(Nlon - 1) / 2.0, (Nlon - 1) / 2.0 + 1.0) * Dlon
)
dLon2D[n, :] = Dlon
# Half-angle of such a spherical cap that has same area as the 2D grid
# cells at this latitude
theta = np.radians(90 - apulat[n])
angle2D[n, :] = np.arccos(
1
- Dlon
/ 360
* (
np.cos(theta - Dlat2D / 360 * np.pi)
- np.cos(theta + Dlat2D / 360 * np.pi)
)
)
# Make 2D second gradient matrix in latitude.
# NOTE: it might be more accurate to make the gradient matric in along-track direction, not latitude.
# Here we assume that poles are listed along latitude at each longitude.
# i.e. [(lat1,lon1) (lat2,lon1) ... (latN,lon1) (lat1,lon2) ...]
# This corresponds to lat2D(:) according to the above construction
# apu = np.ones(Nlat)
# mat2DsecondLat = np.zeros(((Nlat - 2) * Nlon, Nlon * Nlat))
# apumat = np.diag(apu[1:], -1) - 2 * np.diag(apu) + np.diag(apu[1:], 1)
# This gives 2nd deriv. for one column of lat2D
# apumat = apumat[1:-1, :] / Dlat2D**2
# for n in range(Nlon):
# i1 = np.arange(Nlat - 2) + (n - 1) * (Nlat - 2)
# i2 = np.arange(Nlat) + (n - 1) * Nlat
# i1, i2 = np.ix_(i1, i2)
# mat2DsecondLat[i1, i2] = apumat
# need C ordering drop numpy
c1 = np.zeros(((Nlat - 2) * Nlon,))
c1[0] = 1
# c1[Nlon]=-2
# c1[2*Nlon]=1
r1 = np.zeros((Nlat * Nlon,))
r1[0] = 1
r1[Nlon] = -2
r1[2 * Nlon] = 1
mat2DsecondLat = toeplitz(c=c1, r=r1) / Dlat2D**2
return lat2D, lon2D, angle2D, dLon2D, mat2DsecondLat
[docs]def get_eq(ds, lat_filter_max=60, ovals=None):
"""Splits data into a list of pieces suitable for DSECS analysis based on latitude.
Parameters
----------
ds : xarray
Input dataset.
lat_filter_max : int, optional
Maximum geographic latitude (absolute value). All data above this limit is ignored. Default: 60
ovals : list
list of pre-calculated numpy slices with which to split the data.
Returns
-------
out : list of xarray, or None, None by default
List of data segments split for DSECS analysis if they have been precalculated.
"""
# mask= (np.abs(ds.QDLat) > QD_filter_max) | (np.abs(ds.QDLat) < QD_filter_min)
if ovals is None:
mask = np.abs(ds.Latitude) > lat_filter_max
# mask[np.abs(ds.Latitude) > 60] = 1
ovals = np.ma.flatnotmasked_contiguous(np.ma.masked_array(mask, mask=mask))
# ovals=np.ma.flatnotmasked_contiguous(np.ma.masked_array(mask,mask=mask))
# return ovals,ds
# return ovals,ds
out = []
for d in ovals:
out.append(ds.isel(Timestamp=d))
out[-1] = out[-1].assign(unit_B_NEC_Model=_normalizev(out[-1]["B_NEC_Model"]))
out[-1] = out[-1].assign(
{
"B_para_res": (
"Timestamp",
np.einsum(
"ij,ij->i",
out[-1]["B_NEC"] - out[-1]["B_NEC_Model"],
out[-1]["unit_B_NEC_Model"],
),
)
}
)
out[-1] = out[-1].assign(B_NEC_res=out[-1]["B_NEC"] - out[-1]["B_NEC_Model"])
return out, ovals
def _normalizev(v):
"""Creates an unit vector.
Parameters
----------
v : ndarray
Input array.
Returns
-------
ndarray
Normalized unit vector.
"""
return (v.T / np.linalg.norm(v, axis=1)).T
[docs]def sub_points_along_fieldline(thetaSECS, Rsecs, L, minD):
"""Calculate points to be used in the Biot-Savart integral along field line
in function SECS_2D_CurlFree_AntiSym_magnetic_lineintegral
Parameters
----------
thetaSECS : float
Co-latitude of the CF SECS, [radian]
Rsecs : float
Radius of the sphere where CF SECS is located, [km]
L : float
L-value of the field line starting from the CF SECS pole, [km NOTE: this is in kilometers, so really L*Rsecs;
minD : float
minimum horizontal distance between the CF SECS pole and footpoints of
the points where magnetic field is needed, [km], SCALAR
Returns
-------
array
co-latitudes of the integration points (= end points of the current
elements), [radian]
"""
# Minimum length of integration steps (NOTE: actually the minimum average length)
# print('hupep7')
# print('thetaS ' + type(thetaSECS).__name__)
# print(thetaSECS)
minStep = 10 # step in km
# Adjust step according to horizontal distance
# NOTE: maybe should increase more rapidly than linearly
step = min(200, max(minStep, 0.1 * minD))
# print('hupep8')
# Length of the field line from one ionosphere to the other.
x = np.pi / 2 - thetaSECS
# print(x)
s = L * abs(
np.sin(x) * np.sqrt(3 * np.sin(x) ** 2 + 1)
+ 1 / np.sqrt(3) * np.arcsinh(np.sqrt(3) * np.sin(x))
)
# print(s)
# print('hupep9')
# Number of steps and step size in co-latitude assuming uniform horizontally adjusted step length
# print(np.ceil(s / step))
Nint = int(np.ceil(s / step))
# print('hupep10')
dt0 = abs(2 * thetaSECS - np.pi) / Nint
# Take larger steps at high altitudes.
# Altitude of Swarm is 450-520 km.
tmp = np.full((Nint,), np.nan)
tmp[0] = min(thetaSECS, np.pi - thetaSECS) # always start from north hemisphere
n = 0
while tmp[n] < np.pi / 2: # stop just before reaching the equator
h = L * np.sin(tmp[n]) ** 2 - Rsecs # height [km]
# adjust step according to altitude
# NOTE: maybe should increase more rapidly than linearly
dt = dt0 * min(500, max(1, h / 300 - 2)) # start to increase above 900 km
tmp[n + 1] = tmp[n] + dt
n += 1
# Make north and south hemispheres symmetric
tmp = tmp[:n]
# print(tmp)
t = np.hstack((tmp, np.pi / 2, np.pi - tmp[::-1]))
# Flip if start from south hemisphere
if thetaSECS > np.pi / 2:
t = t[::-1]
return t