"""Array operations for hybrid_jp."""
import numpy as np
import pandas as pd
from .dtypes import arrfloat, arrint
[docs]def create_orthonormal_basis_from_vec(v: arrfloat) -> arrfloat:
"""create a basis of orthonormal vectors from an input vector `v`.
The original components of the vector (i,j,k) in the original basis are transformed
to a new orthonormal basis (e1,e2,e3) where the component (e1,0,0) in the new basis
points in the direction of `v` in the original. Therefore, e2 and e3 lie in the
plane normal to `v` and are mutually orthogonal.
Args:
v (hj.arrfloat): 1D vector length 3
Returns:
hj.arrfloat: square array shape (3,3). e_n=arr[n-1,:] where n=1,2,3
Examples:
>>> v = np.array([1, 2, 3], dtype=np.float64)
>>> basis = create_orthonormal_basis_from_vec(v)
>>> print("e1", basis[:, 0])
>>> print("e2", basis[:, 1])
>>> print("e3", basis[:, 2])
>>> print("||e1|| ||e2|| ||e3|| -> ", np.linalg.norm(basis, axis=1))
>>> print("e1 . e2 = 0?", np.allclose(np.dot(orth[:, 0], orth[:, 1]), 0))
>>> print("e1 . e3 = 0?", np.allclose(np.dot(orth[:, 0], orth[:, 2]), 0))
>>> print("e2 . e3 = 0?", np.allclose(np.dot(orth[:, 1], orth[:, 2]), 0)
"""
# v MUST be 1d
assert len(v.shape) == 1
# v MUST have shape 3
assert v.size == 3
# Make sure it's a unit vector
e1 = v / np.linalg.norm(v)
# 1. Generate a random vector e2
# e2 is NOT length 1 and is NOT orthogonal to `v`
e2 = np.random.rand(3)
# 2. Project e2 into a unit vector in the plane normal to `v`
# Done by subtracting the component parallel to `v` and normalising
e2 -= np.dot(e2, e1) * e1
e2 /= np.linalg.norm(e2)
# 3. Get a third basis vector perpendicular to both v and e2
e3 = np.cross(e1, e2)
return np.stack((e1, e2, e3), axis=0)
[docs]def rotate_arr_to_new_basis(basis: arrfloat, arr: arrfloat) -> arrfloat:
"""Rotate an array (nx, ny, 3) in i,j,k basis to a new `basis`.
Args:
basis (hj.arrfloat): (3,3) basis of (e1, e2, e3) where basis[0,:]=e1 etc.
arr (hj.arrfloat): (nx, ny, 3) array of vectors on a grid (nx, ny)
Returns:
hj.arrfloat: (nx, ny, 3) array of vectors in e1,e2,e3 basis
Example:
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> scale = 100
>>> xx, yy = np.mgrid[0:scale, 0:scale]
>>> grid = np.empty((*xx.shape, 3))
>>> grid[:, :, 0] = yy
>>> grid[:, :, 1] = xx
>>> # Add an oscillating part in k
>>> grid[:, :, 2] = np.sin(2 * np.pi * xx / xx.max())
>>> # e1=k in new basis
>>> vec = np.array([0, 0, 1], dtype=np.float64)
>>> print(f"{vec = }")
>>> basis = create_orthonormal_basis_from_vec(vec)
>>> print(f"{basis = }")
>>> rot = rotate_arr_to_new_basis(basis, grid)
>>> fig, axs = plt.subplots(2, 1)
>>> # Plot i and j streamlines coloured by k
>>> axs[0].streamplot(
>>> xx[:, 0],
>>> yy[0, :],
>>> grid[:, :, 0].T,
>>> grid[:, :, 1].T,
>>> density=2,
>>> arrowstyle="-",
>>> color=grid[:, :, 2].T,
>>> )
>>> # Visualise e1, should be a horizontal sine wave
>>> axs[1].pcolormesh(xx[:, 0], yy[0, :], rot[:, :, 0].T)
>>> axs[0].grid(False)
>>> axs[1].grid(False)
>>> fig.tight_layout()
>>> plt.show()
"""
# 1. Flatten the array into (3, nx*ny) where axis 0 is the ijk components
arr_flat = arr.reshape(-1, 3).T
# 2. Compute the dot product with the orthonormal basis
new_basis_flat = np.dot(basis, arr_flat).T
# 3. Reshape into oridinal dimension
out = new_basis_flat.reshape(arr.shape)
return out
[docs]def logspaced_edges(arr: arrfloat | arrint) -> arrfloat:
"""Expand a (possibly uneven but approximately) logarithmically spaced arr to edges.
`arr` is shape (N,), therefore the returned array is shape (N+1,). The end points
are given by n_0 - (n_1 - n_0)/2 and n_N + (n_N - n_{N-1})/2 for an array
(n_0...n_N) = log10(arr). The spacing between values of the array is preserved, this
is useful in the case of integer logspaced arrays where diff(log10(arr)) is not
constant do to integer rounding. So, each value in the new array is half of the
separation between the original values.
Args:
arr (arrfloat | arrint): Array of values.
Returns:
arrfloat: Array of edges.
Example:
>>> import matplotlib.pyplot as plt
>>> import numpy as np
>>> arr = np.unique(np.logspace(0, 2, 15, dtype=np.int32))
>>> brr = logspaced_edges(arr)
>>> fig, axs = plt.subplots(2, 1, figsize=(8, 2))
>>> axlin, axlog = axs
>>> orig = axlin.scatter(arr, np.zeros_like(arr), marker="x", color="k")
>>> new = axlin.scatter(brr, np.zeros_like(brr), marker="+", color="r")
>>> orig = axlog.scatter(arr, np.zeros_like(arr), marker="x", color="k")
>>> new = axlog.scatter(brr, np.zeros_like(brr), marker="+", color="r")
>>> axlog.set_xscale("log")
>>> axlin.set_title("Linear scale")
>>> axlog.set_title("Log scale")
>>> axlin.set_yticks([])
>>> axlog.set_yticks([])
>>> axlog.set_xlabel("'x' = original, '+' = bin edges")
>>> fig.tight_layout()
>>> plt.show()
"""
log_arr = np.log10(arr) # log scale array
log_diff = np.diff(log_arr) # log difference
# Add points on either side of log scaled array, equidistant
# Original: +....+....+..+....+....+..+....+....+
# New: +....+....+....+..+....+....+..+....+....+....+
log_wide = np.asarray(
[log_arr[0] - log_diff[0]] + log_arr.tolist() + [log_arr[-1] + log_diff[-1]]
)
log_wiff = np.diff(log_wide) # Difference of longer array
# Half of total difference between point i and i+2
# +....+....+....+..+....+....+..+....+....+....+
# Diff: 4 4 4 2 4 4 2 4 4 4
# Offset 4 4 4 2 4 4 2 4 4 4
# 1/2 diff: 4 4 3 3 4 3 3 8 8
log_diff = (log_wiff[:-1] + log_wiff[1:]) / 2
# First point in new arr is half way between first two points in wide arr or
# equivalently half of the difference between first and second points in original
# arr behind the first point.
first_point = (log_wide[0] + log_wide[1]) / 2
lags_wide = np.ones(log_arr.size + 1) * first_point
# Successive points created by adding the cumulative distance of that point from the
# first point
lags_wide[1:] = lags_wide[1:] + np.cumsum(log_diff)
lags_wide = 10 ** (lags_wide) # Rescale out of log space
return lags_wide
[docs]def trim_var(var: np.ndarray, slc: slice) -> np.ndarray:
"""Trim a variable to a slice.
Args:
var (np.ndarray): Variable to trim.
slc (slice): Slice to trim to.
Returns:
np.ndarray: Trimmed variable.
"""
return var[slc]
[docs]def trim_vars(vars_list: list[np.ndarray], slc: slice) -> list[np.ndarray]:
"""Trim a list of variables to a slice.
Args:
vars_list (list[np.ndarray]): List of variables to trim.
slc (slice): Slice to trim to.
Returns:
list[np.ndarray]: Trimmed variables.
Example:
>>> trim_vars([np.arange(10), np.arange(10)], slice(0, 5))
[array([0, 1, 2, 3, 4]), array([0, 1, 2, 3, 4])]
"""
return [trim_var(var, slc) for var in vars_list]
[docs]def df_to_rows_array(data: pd.DataFrame, cols: list[str]) -> np.ndarray:
"""Convert dataframe into 2d np array [x, columns].
Args:
data (pd.DataFrame): data
cols (list[str]): Column names to transform into array
Returns:
np.ndarray: 2d array of data
"""
arr = np.empty((len(data), len(cols)))
for i, col in enumerate(cols):
arr[:, i] = data[col].values
return arr
[docs]def interpolate_to_midpoints(arr: np.ndarray, width: int) -> np.ndarray:
"""Interpolate to midpoints.
returns an array of length len(arr) - width + 1.
Args:
arr (np.ndarray): array to interpolate.
width (int): width of moving average.
Returns:
np.ndarray: interpolated array.
"""
return np.linspace(arr[0], arr[-1], len(arr) - width + 1)
[docs]def mov_avg(data: np.ndarray, width: int) -> np.ndarray:
"""Perform a moving average.
Args:
data (np.ndarray): data to perform moving average on.
width (int): width of moving average.
Returns:
np.ndarray: moving average of data. length is len(data) - width + 1.
"""
return np.convolve(data, np.ones(width), "valid") / width