Magnetic Spectrum#

This notebook demonstrates the algorithm behind obtaining magnetic spectra.

Imports#

First we import the necessary libraries.

import time
from multiprocessing import set_start_method

from dotenv import dotenv_values
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.collections import LineCollection
from matplotlib.colors import LogNorm, Normalize
from numpy import typing as npt
from phdhelper import mpl
from phdhelper.colours import sim as colours
from scipy import optimize
from tqdm import tqdm

import hybrid_jp as hj
import hybrid_jp.analysis as hja

Setting up the data#

Some parameters are set using environment variables. We load the deck first then the .sdf files. This is so we can set the timestep, dt, which is defined in the deck output. We also scale the magnetic field from its SI base units, Teslas \(T\), to nanoTestlas \(nT\), and the same for the grid midpoints, \(m\rightarrow km\). Finally we create a CentredShock object that holds the simulation data and has methods for splitting the data held indside sdf files and grouping them into ‘chunks’, based on distance upstream and downstream of the shock.

env = dotenv_values("../../../.env")
START, END = 20, 200
set_start_method("fork")
mpl.format()
data_folder = str(env["DATA_DIR"])
deck = hja.load_deck(data_dir=data_folder)
SDFs, fpaths = hja.load_sdfs_para(
    sdf_dir=data_folder,
    dt=deck.output.dt_snapshot,
    threads=7,
    start=START,
    stop=END,
)
for SDF in SDFs:
    SDF.mag *= 1e9  # Convert to nT
    SDF.mid_grid *= 1e-3  # Convert to km
cs = hja.CenteredShock(SDFs, deck)
100%|██████████| 181/181 [00:06<00:00, 29.36it/s]

Getting values in a ‘frame’#

A ‘frame’ is an array of values at a given time and in a given chunk. The method cs.get_qty_in_frame(func, chunk, t_idx) will slice the data returned by the function func, at timestep t_idx, at chunk chunk. func must have the signature Callable[[SDF], NDArray[np.float64]], i.e. it must take exactly one parameter, an SDF object, and return an array. Below wer define the function get_bx() which returns the \(b_x\) component from the SDF. This returns the whole array, which get_qty_in_frame() then slices to the correct range in \(x\) to line up with the specified chunk.

Important

the property cs.n_chunks must be set before calling most methods on cs, otherwise an NChunksNotSetError will be raised.

Note

Alternatively, we could have obtained the meadian value of \(b_x\) in each cell, i.e. \(\left<b_x\right>_y\), using a function such as:

def get_median_bx(sdf: SDF) -> NDArray[np.float64]:
    return np.median(sdf.mag.bx, axis=1)
N_CHUNKS = 10
cs.n_chunks = N_CHUNKS
shock_chunk = cs.downstream_start_chunk


def get_bx(sdf: hj.sdf_files.SDF) -> npt.NDArray[np.float64]:
    return sdf.mag.bx


t = int(np.argmax(cs.valid_chunks[shock_chunk, :]))
qty, slc = cs.get_qty_in_frame(get_bx, shock_chunk, t)
plt.pcolormesh(
    cs.grid.x[slice(*slc)] - cs.grid.x[cs.get_x_offset_for_frame(shock_chunk, t)],
    cs.grid.y,
    qty.T,
)
plt.ylabel(r"$y\ [km]$")
plt.xlabel(r"Distance downstream of shock, $x\ [km]$")
plt.tight_layout()
plt.show()
../_images/88259f91154cd3144188899f58df97bc444ba6b5af55199b75af1e59444df920.png

2D FFT and Subdivisions#

We now perform a 2d Fourier transform on the frame. Under the hood, this uses np.fft.fft2() function. Calling the function

bz_func = lambda x: x.mag.bz
bz = cs.get_qty_in_frame(bz_func, shock_chunk, t)[0]
pxy, kx, ky = hja.ffts.power_xy(bz, cs.dx, cs.dy)
axs: list[plt.Axes]
fig, axs = plt.subplots(1, 2, figsize=(10, 5))  # type: ignore
axs[0].pcolormesh(kx, ky, pxy.T, norm=LogNorm())
axs[0].set_yscale("log")
axs[0].set_xscale("log")
axs[0].set_aspect("equal")

pxy4, kx4, ky4 = hja.ffts.subdivide_repeat(4, pxy, kx, ky)
axs[1].pcolormesh(kx4, ky4, pxy4.T, norm=LogNorm())
axs[1].set_yscale("log")
axs[1].set_xscale("log")
axs[1].set_aspect("equal")
fig.tight_layout()
plt.show()
../_images/f5cb662ac0063fae568dc5b98abbd152d803389d59187668f4b2baa383abd64e.png
n_r_bins: int = 100
n_sub: int = 8
Pr4, kr = hja.ffts.radial_power(pxy, kx, ky, n_sub, n_r_bins)
kr = np.logspace(np.log10(kr[0]), np.log10(kr[-1]), n_r_bins)
print(Pr4.shape, kr.shape, n_r_bins * n_sub)

fig, ax = plt.subplots()  # type: ignore
ax.plot(kr, Pr4, c="k", ls="-")
ax.set_xscale("log")
ax.set_yscale("log")

fig.tight_layout()
plt.show()
(100,) (100,) 800
../_images/c0c5e58a271266e0239c38a90c7c238ca059b3aa2ca47186e67f37af4634c75d.png
pr, kr = hja.frame_power(cs, shock_chunk, t, n_sub, n_r_bins)
fig, ax = plt.subplots()
ax.loglog(kr, pr, c="k", ls="-")
fig.tight_layout()
plt.show()
../_images/49739adc16409846665875c75fc912ed7c4f5d2c311b08612069c21c20a40d66.png
vc = cs.valid_chunks
val: list[list[int]] = [list(np.nonzero(vc[i, :])[0]) for i in range(vc.shape[0])]


def get_power_para(
    cs: hja.CenteredShock,
    val: list[list[int]],
    n_r_bins: int,
    n_sub: int,
):
    assert cs.n_chunks is not None

    kr = np.empty(n_r_bins)
    avg_pwr = np.empty((n_r_bins, cs.n_chunks))

    start_time = time.time()
    for i, v in enumerate(val):
        out = np.empty((n_r_bins, len(v)))
        for j, t in tqdm(enumerate(v), total=len(v), desc=f"Chunk {i}/{cs.n_chunks-1}"):
            out[:, j], kr = hja.frame_power(cs, i, t, n_sub, n_r_bins)
        avg_pwr[:, i] = out.mean(axis=1)

    end_time = time.time()
    print(f"Time taken: {end_time - start_time:.1f}s")

    return avg_pwr.T, kr


avg_pwr, kr = get_power_para(cs, val, n_r_bins, n_sub)
Chunk 0/9: 100%|██████████| 36/36 [00:04<00:00,  7.65it/s]
Chunk 1/9: 100%|██████████| 74/74 [00:09<00:00,  7.73it/s]
Chunk 2/9: 100%|██████████| 108/108 [00:13<00:00,  7.77it/s]
Chunk 3/9: 100%|██████████| 153/153 [00:19<00:00,  7.76it/s]
Chunk 4/9: 100%|██████████| 181/181 [00:23<00:00,  7.78it/s]
Chunk 5/9: 100%|██████████| 155/155 [00:19<00:00,  7.78it/s]
Chunk 6/9: 100%|██████████| 127/127 [00:16<00:00,  7.79it/s]
Chunk 7/9: 100%|██████████| 88/88 [00:11<00:00,  7.78it/s]
Chunk 8/9: 100%|██████████| 47/47 [00:06<00:00,  7.74it/s]
Chunk 9/9: 100%|██████████| 8/8 [00:01<00:00,  7.76it/s]
Time taken: 125.8s

fig, axs = plt.subplots(1, 2, gridspec_kw=dict(width_ratios=[97, 3]))  # type: ignore
ax = axs[0]
cax = axs[1]


segments = np.array([np.stack([kr] * cs.n_chunks, axis=0), avg_pwr])
segments = np.moveaxis(segments, [0, 1, 2], [2, 0, 1])
norm = Normalize(vmin=0, vmax=cs.n_chunks)
lc = LineCollection(
    segments,  # type: ignore
    array=np.arange(cs.n_chunks),
    norm=norm,
    cmap=mpl.Cmaps.isoluminant,
)  # type: ignore
ax.add_collection(lc)  # type: ignore
ax.loglog(*segments[shock_chunk].T, c=colours.red())
ax.set_xscale("log")
ax.set_yscale("log")

fig.colorbar(lc, cax=cax, label="Chunk #")
cax.axhline(shock_chunk, c=colours.red(), lw=2)

ax.set_xlabel("$k$ [$km^{-1}$]")
ax.set_ylabel("$P(k)$")

fig.tight_layout()
fig.subplots_adjust(wspace=0)
plt.show()
../_images/8ff4871cdda91eb07401f216a85707c2ce6f4403b36085cc322294b379979da5.png
# Slopes
def powerlaw(x, amplitude, exponent, constant):
    return amplitude * x**exponent + constant


def line(x, m, c):
    return m * x + c


min_x = 2e-3
max_x = 2e-2


def fitline(arr, min_x, max_x, func):
    # arr.shape = (n_chunks, 2) where 2 is (x, y)
    mask = (arr[:, 0] >= min_x) & (arr[:, 0] <= max_x)
    arr = np.log10(arr[mask, :])
    fit, _ = optimize.curve_fit(func, arr[:, 0], arr[:, 1])
    return fit


fig, ax = plt.subplots()
slopes = np.empty((cs.n_chunks, 2))
for i, s in enumerate(segments):
    slopes[i, :] = fitline(s, min_x, max_x, line)
    ax.plot(*np.log10(s).T, c="k", ls="-", lw=0.5)
    xr = np.log10(np.linspace(min_x, max_x, 2))
    ax.plot(xr, line(xr, *slopes[i, :]), c="r", ls="--")

plt.show()
../_images/394dfb6dfbe37a646ae0c7109bd9cc80f5c3b0bdd8b4f0f28570b3fa613c5d42.png
fig, ax = plt.subplots()

dists = (
    (np.arange(cs.n_chunks) - shock_chunk)
    * cs.dx
    * cs.chunk_i
    / (deck.constant.di * 1e-3)
)
ax.scatter(dists, slopes[:, 0], c="k", marker="x")  # type: ignore

ax.set_xlabel("Distance from shock [$d_i$]")
ax.set_ylabel("Power law index")


fig.tight_layout()
plt.show()
../_images/c69287feee1796b9201d87fa5f9297ccf516864ceef55d35a4f5d3f72769de2f.png