Module pyaurorax.tools.keogram

Generate keograms.

Functions

def create(images: numpy.ndarray,
timestamp: List[datetime.datetime],
axis: int = 0,
spectra: bool = False,
wavelength: numpy.ndarray | None = None,
spect_emission: Literal['green', 'red', 'blue', 'hbeta'] = 'green',
spect_band: Tuple[float, float] | None = None,
spect_band_bg: Tuple[float, float] | None = None) ‑> Keogram
Expand source code
def create(images: np.ndarray,
           timestamp: List[datetime.datetime],
           axis: int = 0,
           spectra: bool = False,
           wavelength: Optional[np.ndarray] = None,
           spect_emission: Literal["green", "red", "blue", "hbeta"] = "green",
           spect_band: Optional[Tuple[float, float]] = None,
           spect_band_bg: Optional[Tuple[float, float]] = None) -> Keogram:
    """
    Create a keogram from a set of images.

    Args:
        images (numpy.ndarray): 
            A set of images. Normally this would come directly from a data `read` call, but can also 
            be any arbitrary set of images. It is anticipated that the order of axes is [rows, cols, num_images]
            or [row, cols, channels, num_images]. If it is not, then be sure to specify the `axis` parameter
            accordingly.

        timestamp (List[datetime.datetime]): 
            A list of timestamps corresponding to each image.

        axis (int): 
            The axis to extract the keogram slice from. Default is `0`, meaning the rows (or Y) axis.

        spectra (bool): 
            Make a keogram out of spectrograph data, for a specific emission. Defaults to False (ASI data).

        wavelength (numpy.ndarray): 
            The wavelength array corresponding to spectrograph data. If spectra=True, this parameter
            must be supplied.

        spect_emission (str): 
            The emission (green, red, blue, hbeta) to prepare from spectrograph data. Default is 
            'green' (557.7 nm emission).

        spect_band (Tuple[float]): 
            Manual selection of the wavelength region to integrate for obtaining emissions. Use this
            to prepare emissions that are not available in spect_emission.

        spect_band_bg (Tuple[float]): 
            Manual selection of the wavelength region to subtract from integration for manually
            chosen emissions, via the spect_band argument.

    Returns:
        A `pyaurorax.tools.Keogram` object.

    Raises:
        ValueError: issue with supplied parameters.
    """

    # First check if we are dealing with spectrograph data
    if spectra:

        instrument_type = 'spectrograph'
        middle_column_idx = None

        if axis != 0:
            raise ValueError(f"Cannot create keogram for spectrograph data along axis other than 0, received axis: {axis}.")
        if wavelength is None:
            raise ValueError("Parameter 'wavelength' must be supplied when using spectrograph data.")

        # Determine integration bounds for spectrograph data
        wavelength_range = {
            'green': [557.0 - 1.5, 557.0 + 1.5],
            'red': [630.0 - 1.5, 630.0 + 1.5],
            'blue': [427.8 - 3.0, 427.8 + 0.5],
            'hbeta': [486.1 - 1.5, 486.1 + 1.5]
        }[spect_emission]

        wavelength_bg_range = {
            'green': [552.0 - 1.5, 552.0 + 1.5],
            'red': [625.0 - 1.5, 625.0 + 1.5],
            'blue': [430.0 - 1.0, 430.0 + 1.0],
            'hbeta': [480.0 - 1.0, 480.0 + 1.0]
        }[spect_emission]

        # Check if manual integration bands were supplied
        if spect_band is not None:
            wavelength_range = spect_band
            if spect_band_bg is None:
                warnings.warn(
                    "Wavelength band supplied without background band. No background subtraction will be performed.",
                    stacklevel=1,
                )
                wavelength_bg_range = None
            else:
                wavelength_bg_range = spect_band_bg

        # Extract wavelength from metadata, and get integration indices
        int_w = np.where((wavelength >= wavelength_range[0]) & (wavelength <= wavelength_range[1]))
        if wavelength_bg_range is not None:
            int_bg_w = np.where((wavelength >= wavelength_bg_range[0]) & (wavelength <= wavelength_bg_range[1]))

        # Integrate all spectrograph pixels to get emission
        n_wavelengths_in_spectra = images.shape[0]
        n_spatial_bins = images.shape[1]
        n_timestamps_in_spectra = images.shape[2]
        n_wavelengths = wavelength.shape[0]
        n_timestamps = len(timestamp)

        if n_timestamps != n_timestamps_in_spectra:
            raise ValueError(f"Mismatched timestamp dimensions. Received {n_timestamps} "
                             f"timestamps for spectrograph data with {n_timestamps_in_spectra} timestamps.")

        if n_wavelengths != n_wavelengths_in_spectra:
            raise ValueError(f"Mismatched wavelength dimensions. Received {n_wavelengths} "
                             f"wavelengths for spectrograph data with {n_wavelengths_in_spectra} wavelengths.")

        # set y-axis
        ccd_y = np.arange(0, n_spatial_bins)

        # Initialize keogram array
        keo_arr = np.full([n_spatial_bins, n_timestamps], 0, dtype=images.dtype)

        # Iterate through each timestamp and compute emissions for all spatial bins
        for i in range(0, n_timestamps):

            # Integrate over wavelengths to get Rayleighs
            iter_spectra = images[:, :, i]

            rayleighs = np.trapz(iter_spectra[int_w[0], :], x=wavelength[int_w[0]], axis=0)

            if wavelength_bg_range is not None:
                if int_bg_w is not None:  #type: ignore
                    rayleighs -= np.trapz(
                        iter_spectra[int_bg_w[0], :],  #type: ignore
                        x=wavelength[int_bg_w[0]],  #type: ignore
                        axis=0)

            rayleighs = np.nan_to_num(rayleighs, nan=0.0)
            rayleighs[np.where(rayleighs < 0.0)] = 0.0

            keo_arr[:, i] = rayleighs

    # Otherwise, for ASI data, slice keogram as required
    else:
        instrument_type = 'asi'

        # set y axis
        ccd_y = np.arange(0, images.shape[axis])

        # determine if we are single or 3 channel
        n_channels = 1
        if (len(images.shape) == 3):
            # single channel
            n_channels = 1
        elif (len(images.shape) == 4):
            # three channel
            n_channels = 3
        else:
            ValueError("Unable to determine number of channels based on the supplied images. Make sure you are supplying a " +
                       "[rows,cols,images] or [rows,cols,channels,images] sized array.")

        # initialize keogram data
        n_rows = images.shape[0]
        n_imgs = images.shape[-1]
        if (n_channels == 1):
            keo_arr = np.full([n_rows, n_imgs], 0, dtype=images.dtype)
        else:
            keo_arr = np.full([n_rows, n_imgs, n_channels], 0, dtype=images.dtype)

        # extract the keogram slices
        middle_column_idx = int(np.floor((images.shape[1]) / 2 - 1))
        for img_idx in range(0, n_imgs):
            if (n_channels == 1):
                # single channel
                frame = images[:, :, img_idx]
                frame_middle_slice = frame[:, middle_column_idx]
                keo_arr[:, img_idx] = frame_middle_slice
            else:
                # 3-channel
                frame = images[:, :, :, img_idx]
                frame_middle_slice = frame[:, middle_column_idx, :]
                keo_arr[:, img_idx, :] = frame_middle_slice

    # create the keogram object
    keo_obj = Keogram(data=keo_arr, slice_idx=middle_column_idx, timestamp=timestamp, ccd_y=ccd_y, instrument_type=instrument_type)

    # return
    return keo_obj

Create a keogram from a set of images.

Args

images : numpy.ndarray
A set of images. Normally this would come directly from a data read call, but can also be any arbitrary set of images. It is anticipated that the order of axes is [rows, cols, num_images] or [row, cols, channels, num_images]. If it is not, then be sure to specify the axis parameter accordingly.
timestamp : List[datetime.datetime]
A list of timestamps corresponding to each image.
axis : int
The axis to extract the keogram slice from. Default is 0, meaning the rows (or Y) axis.
spectra : bool
Make a keogram out of spectrograph data, for a specific emission. Defaults to False (ASI data).
wavelength : numpy.ndarray
The wavelength array corresponding to spectrograph data. If spectra=True, this parameter must be supplied.
spect_emission : str
The emission (green, red, blue, hbeta) to prepare from spectrograph data. Default is 'green' (557.7 nm emission).
spect_band : Tuple[float]
Manual selection of the wavelength region to integrate for obtaining emissions. Use this to prepare emissions that are not available in spect_emission.
spect_band_bg : Tuple[float]
Manual selection of the wavelength region to subtract from integration for manually chosen emissions, via the spect_band argument.

Returns

A Keogram object.

Raises

ValueError
issue with supplied parameters.
def create_custom(images: numpy.ndarray,
timestamp: List[datetime.datetime],
coordinate_system: Literal['ccd', 'mag', 'geo'],
width: int,
x_locs: List[float | int] | numpy.ndarray,
y_locs: List[float | int] | numpy.ndarray,
preview: bool = False,
skymap: pyucalgarysrs.data.classes.Skymap | None = None,
altitude_km: int | float | None = None,
metric: Literal['mean', 'median', 'sum'] = 'median') ‑> Keogram
Expand source code
def create_custom(
    images: np.ndarray,
    timestamp: List[datetime.datetime],
    coordinate_system: Literal["ccd", "geo", "mag"],
    width: int,
    x_locs: Union[List[Union[float, int]], np.ndarray],
    y_locs: Union[List[Union[float, int]], np.ndarray],
    preview: bool = False,
    skymap: Optional[Skymap] = None,
    altitude_km: Optional[Union[float, int]] = None,
    metric: Literal["mean", "median", "sum"] = "median",
) -> Keogram:
    """
    Create a keogram, from a custom slice of a set of images. The slice used is defined by a set of points, 
    in CCD, geographic, or geomagnetic coordinates, within the bounds of the image data. Keogram is created
    from the bottom up, meaning the first point will correspond to the bottom of the keogram data.

    Args:
        images (numpy.ndarray): 
            A set of images. Normally this would come directly from a data `read` call, but can also 
            be any arbitrary set of images. It is anticipated that the order of axes is [rows, cols, num_images]
            or [row, cols, channels, num_images]. If it is not, then be sure to specify the `axis` parameter
            accordingly.

        timestamp (List[datetime.datetime]): 
            A list of timestamps corresponding to each image.
        
        coordinate_system (str): 
            The coordinate system in which input points are defined. Valid options are "ccd", "geo", or "mag".
        
        width (int): 
            Width of the desired keogram slice, in CCD pixel units.

        x_locs (Sequence[float, int]): 
            Sequence of points giving the x-coordinates that define a path through the image data, from
            which to build the keogram.

        y_locs (Sequence[float, int]): 
            Sequence of points giving the y-coordinates that define a path through the image data, from
            which to build the keogram.

        preview (Optional[bool]): 
            When True, the first frame in images will be displayed, with the keogram slice plotted.

        skymap (Skymap): 
            The skymap to use in georeferencing when working in geographic or magnetic coordinates.

        altitude_km (float, int): 
            The altitude of the image data, in km, to use in georeferencing when working in goegraphic
            or magnetic coordinates.

        metric (str): 
            The metric used to compute values for each keogram pixel. Valid options are "median", "mean",
            and "sum". Defaults to "median".
        
    Returns:
        A `pyaurorax.tools.Keogram` object.

    Raises:
    """

    # If using CCD coordinates we don't need a skymao or altitude
    if (coordinate_system == 'ccd') and (skymap is not None or altitude_km is not None):
        raise ValueError("Confliction in passing a Skymap in when working in CCD coordinates. Skymap is obsolete.")

    # convert any lists to np.arrays  and check shape
    x_locs = np.array(x_locs)
    y_locs = np.array(y_locs)

    # determine if we are single or 3 channel
    n_channels = 1
    if (len(images.shape) == 3):
        # single channel
        n_channels = 1
    elif (len(images.shape) == 4):
        # three channel
        n_channels = 3
    else:
        ValueError("Unable to determine number of channels based on the supplied images. Make sure you are supplying a " +
                   "[rows,cols,images] or [rows,cols,channels,images] sized array.")

    # Initialize empty keogram array
    keo_arr = np.squeeze(np.full((x_locs.shape[0] - 1, len(timestamp), n_channels), 0))

    if len(x_locs.shape) != 1:
        raise ValueError(f"X coordinates may not be multidimensional. Sequence passed with shape {x_locs.shape}")
    if len(y_locs.shape) != 1:
        raise ValueError(f"Y coordinates may not be multidimensional. Sequence passed with shape {y_locs.shape}")
    if len(x_locs.shape) != len(y_locs.shape):
        raise ValueError(f"X and Y coordinates must have same length. Sequences passed with shapes {x_locs.shape} and {y_locs.shape}")

    # Convert lat/lon coordinates to CCD
    if coordinate_system == 'mag':
        if (skymap is None or altitude_km is None):
            raise ValueError("When magnetic coordinates, a Skymap object and Altitude must be passed in through the skymap argument.")
        x_locs, y_locs = __convert_latlon_to_ccd(x_locs, y_locs, timestamp, skymap, altitude_km, magnetic=True)
    elif coordinate_system == 'geo':
        if (skymap is None or altitude_km is None):
            raise ValueError("When geographic coordinates, a Skymap object and Altitude must be passed in through the skymap argument.")
        x_locs, y_locs = __convert_latlon_to_ccd(x_locs, y_locs, timestamp, skymap, altitude_km, magnetic=False)

    # We will use the first image as a preview
    if n_channels == 1:
        preview_img = images.copy()[:, :, 0]
    else:
        preview_img = images.copy()[:, :, :, 0]

    # Now working in CCD Coordinates
    x_max = images.shape[1] - 1
    y_max = images.shape[0] - 1

    # Remove any points that are not within the image CCD
    parsed_x_locs = []
    parsed_y_locs = []
    for i in range(x_locs.shape[0]):
        x = x_locs[i]
        y = y_locs[i]

        if x < 0 or x > x_max:
            continue
        if y < 0 or y > y_max:
            continue

        parsed_x_locs.append(x)
        parsed_y_locs.append(y)
    x_locs = np.array(parsed_x_locs)
    y_locs = np.array(parsed_y_locs)

    # Make sure all supplied points are within image bounds # NOTE: This should be good to remove as it shouldn't hit, testing needed
    if len(np.where(np.logical_or(x_locs < 0, x_locs > x_max))[0]) != 0:
        raise ValueError("The following CCD coordinates passed in through x_locs are outside of the CCD image range: " +
                         str(x_locs[np.where(np.logical_or(x_locs < 0, x_locs > x_max))]))
    if len(np.where(np.logical_or(y_locs < 0, y_locs > y_max))[0]) != 0:
        raise ValueError("The following CCD coordinates passed in through y_locs are outside of the CCD image range: " +
                         str(y_locs[np.where(np.logical_or(y_locs < 0, y_locs > y_max))]))

    # Iterate points in pairs of two
    path_counter = 0
    for i in range(x_locs.shape[0] - 1):

        # Points of concern for this iteration
        x_0 = x_locs[i]
        x_1 = x_locs[i + 1]
        y_0 = y_locs[i]
        y_1 = y_locs[i + 1]

        # Compute the unit vector between the two points
        dx = x_1 - x_0
        dy = y_1 - y_0
        length = np.sqrt(dx**2 + dy**2)
        if length == 0:
            continue

        dx /= length
        dy /= length

        # Compute orthogonal unit vector
        perp_dx = -dy
        perp_dy = dx

        # Calculate (+/-) offsets for each perpendicular direction
        offset1_x = perp_dx * width / 2
        offset1_y = perp_dy * width / 2
        offset2_x = -perp_dx * width / 2
        offset2_y = -perp_dy * width / 2

        # Calculate vertices in correct order for this polygon
        vertex1 = (int(x_0 + offset1_x), int(y_0 + offset1_y))
        vertex2 = (int(x_1 + offset1_x), int(y_1 + offset1_y))
        vertex3 = (int(x_1 + offset2_x), int(y_1 + offset2_y))
        vertex4 = (int(x_0 + offset2_x), int(y_0 + offset2_y))

        # Append vertices in the correct order to form a closed polygon
        vertices = [vertex1, vertex2, vertex3, vertex4]

        # Obtain the indexes into the image of this polygon
        indices_inside = __indices_in_polygon(vertices, (images.shape[0], images.shape[1]))

        if np.any(np.array(indices_inside.shape) == 0):
            continue

        row_idx, col_idx = zip(*indices_inside)

        if n_channels == 1:
            # Update the preview image
            preview_img[row_idx, col_idx] = np.iinfo(preview_img.dtype).max

            # Extract metric from all images and add to keogram
            if metric == 'median':
                pixel_keogram = np.median(images[row_idx, col_idx, :], axis=0)
            elif metric == 'mean':
                pixel_keogram = np.mean(images[row_idx, col_idx, :], axis=0)
            elif metric == 'sum':
                pixel_keogram = np.sum(images[row_idx, col_idx, :], axis=0)
            keo_arr[i, :] = pixel_keogram
        elif n_channels == 3:
            # Update the preview image
            preview_img[row_idx, col_idx, :] = np.iinfo(preview_img.dtype).max

            # Extract metric from all images and add to keogram
            if metric == 'median':
                r_pixel_keogram = np.floor(np.median(images[row_idx, col_idx, 0, :], axis=0))
                g_pixel_keogram = np.floor(np.median(images[row_idx, col_idx, 1, :], axis=0))
                b_pixel_keogram = np.floor(np.median(images[row_idx, col_idx, 2, :], axis=0))
            elif metric == 'mean':
                r_pixel_keogram = np.floor(np.mean(images[row_idx, col_idx, 0, :], axis=0))
                g_pixel_keogram = np.floor(np.mean(images[row_idx, col_idx, 1, :], axis=0))
                b_pixel_keogram = np.floor(np.mean(images[row_idx, col_idx, 2, :], axis=0))
            elif metric == 'sum':
                r_pixel_keogram = np.floor(np.sum(images[row_idx, col_idx, 0, :], axis=0))
                g_pixel_keogram = np.floor(np.sum(images[row_idx, col_idx, 1, :], axis=0))
                b_pixel_keogram = np.floor(np.sum(images[row_idx, col_idx, 2, :], axis=0))

            keo_arr[i, :, 0] = r_pixel_keogram
            keo_arr[i, :, 1] = g_pixel_keogram
            keo_arr[i, :, 2] = b_pixel_keogram

        path_counter += 1

    if path_counter == 0:
        raise ValueError("Could not form keogram path. First ensure that coordinates are within image range. Then " +
                         "try increasing 'width' or decreasing number of points in input coordinates.")

    # Create keogram object
    keo_obj = Keogram(data=keo_arr, timestamp=timestamp, instrument_type='asi')

    if preview:
        plt.figure()
        plt.imshow(preview_img, cmap='gray', origin='lower')
        plt.axis("off")
        plt.title("Keogram Domain Preview")
        plt.show()

    return keo_obj

Create a keogram, from a custom slice of a set of images. The slice used is defined by a set of points, in CCD, geographic, or geomagnetic coordinates, within the bounds of the image data. Keogram is created from the bottom up, meaning the first point will correspond to the bottom of the keogram data.

Args

images : numpy.ndarray
A set of images. Normally this would come directly from a data read call, but can also be any arbitrary set of images. It is anticipated that the order of axes is [rows, cols, num_images] or [row, cols, channels, num_images]. If it is not, then be sure to specify the axis parameter accordingly.
timestamp : List[datetime.datetime]
A list of timestamps corresponding to each image.
coordinate_system : str
The coordinate system in which input points are defined. Valid options are "ccd", "geo", or "mag".
width : int
Width of the desired keogram slice, in CCD pixel units.
x_locs : Sequence[float, int]
Sequence of points giving the x-coordinates that define a path through the image data, from which to build the keogram.
y_locs : Sequence[float, int]
Sequence of points giving the y-coordinates that define a path through the image data, from which to build the keogram.
preview : Optional[bool]
When True, the first frame in images will be displayed, with the keogram slice plotted.
skymap : Skymap
The skymap to use in georeferencing when working in geographic or magnetic coordinates.
altitude_km : float, int
The altitude of the image data, in km, to use in georeferencing when working in goegraphic or magnetic coordinates.
metric : str
The metric used to compute values for each keogram pixel. Valid options are "median", "mean", and "sum". Defaults to "median".

Returns

A Keogram object. Raises: