Source code for geosoft.gxpy.grid_fft

"""
Geosoft Fast Fourier Transform processes for 2D gridded data.

:Classes:
    :`GridFFT`: Grid FFT

Note that 'wavenumber' in this module refers to cycles/unit_distance. Multiply by 2*math.pi for the
angular wavenumber.

.. seealso:: :class:`geosoft.gxapi.GXFFT2`

.. note::

    Regression tests provide usage examples:     
    `Tests <https://github.com/GeosoftInc/gxpy/blob/master/geosoft/gxpy/tests/test_grid_fft.py>`_
    
"""
import numpy as np
import math

import geosoft
import geosoft.gxapi as gxapi
from . import gx as gx
from . import grid as gxgrd
from . import grid_utility as gxgrdu
from . import utility as gxu
from . import vv as gxvv

__version__ = geosoft.__version__


def _t(s):
    return geosoft.gxpy.system.translate(s)


FILL_MAXIMUM_ENTROPY = 0
FILL_MINIMUM_CURVATURE = 1

TRN_SOURCE = 0
TRN_FILTERED = 1

I_WAVENUMBER = 0
I_SAMPLE_COUNT = 1
I_LOG_POWER = 2
I_DEPTH_3 = 3
I_DEPTH_5 = 4
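
# A worked illustration of the module's wavenumber convention (see the module docstring above):
# on a grid in metres, a 2000 m wavelength corresponds to a wavenumber of 1. / 2000. cycles/m,
# or an angular wavenumber of 2. * math.pi / 2000. radians/m.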


class GridFFTException(geosoft.GXRuntimeError):
    """
    Exceptions from this module.
    """
    pass


class GridFFT:
    """
    Discrete Fourier Transform of a grid.

    :param grid:            grid file name or a `geosoft.gxpy.grid.Grid` instance.
    :param expand:          minimum expansion percent to create a periodic function. The default is 10.
    :param trend_order:     trend order to remove, default is 1.
    :param fill_method:     FILL_MAXIMUM_ENTROPY (default) or FILL_MINIMUM_CURVATURE. Maximum entropy
                            prediction fills the expanded area in a way that preserves the character of the
                            radially-averaged power spectrum so that spectral analysis based on the shape of
                            the spectrum will be more reliable.

    The following parameters only apply for maximum-entropy prediction. The defaults will be fine in all but
    exceptional situations where edge effects unduly distort the result.

    :param buffer:          percentage buffer area, default 2%. The buffer expands the size of the grid
                            footprint before filling internal space with a minimum-curvature surface. This
                            minimizes edge effects from high-amplitude features at the edge of the grid.
    :param buff_iterations: maximum iterations to resolve the minimum-curvature surface for the internal fill.
    :param filter_length:   maximum entropy filter length.
    :param amplitude_limit: amplitude limiting, which starts at half this setting. Default is no limiting.
    :param edge_limit:      edge amplitude limiting, starting at half this value. Default is no limiting.
    :param edge_limit_cells: if edge limiting, start this many cells from the edge.
    :param smooth:          `True` (default) to smooth the filled expanded area.
    :param feather:         `True` to feather expanded data to the mean value at the expanded edges. If `False`
                            the data will be periodic at the edges. Feathering may be useful should the
                            prediction function introduce unreasonable edge effects.

    .. versionadded:: 9.4
    """

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.__del__()

    def __del__(self):
        if hasattr(self, '_close'):
            self._close()

    def _close(self):
        if hasattr(self, '_open'):
            if self._open:
                self._source_transform.close(discard=True)
                if self._filtered_transform:
                    self._filtered_transform.close(discard=True)
                self._prep_grid.close(discard=True)
                self._source_grid = None
                self._trend = None
                gx.pop_resource(self._open)
                self._open = None

    def __repr__(self):
        return "{}({})".format(self.__class__, self.__dict__)

    def __str__(self):
        return '<class GridFFT>: {} ({}, {})'.format(self._name,
                                                     self._source_grid.nx,
                                                     self._source_grid.ny)

    def __init__(self, grid,
                 buffer=2.,
                 buff_iterations=250,
                 buffer_tolerance=None,
                 trend_order=1,
                 trend_edge=1,
                 expand=10.,
                 fill_method=FILL_MAXIMUM_ENTROPY,
                 filter_length=0,
                 amplitude_limit=0.,
                 edge_limit=-1.,
                 edge_limit_cells=0,
                 smooth=1,
                 feather=False):

        def max_entropy_fill(btol, melen, feath):

            def tpg_rows(n):
                if n >= grid.ny:
                    return None
                tpg.read_row(n, 0, 0, rvv.gxvv)
                xyv[:, 1] = grid.y0 + n * grid.dy
                xyv[:, 2] = rvv.np
                return xyv

            xyv = np.empty((grid.nx, 3))
            rvv = gxvv.GXvv()

            # expand buffer and fill
            if buffer == 0.:
                buffer_cells = 0
            else:
                buffer_cells = max(int(0.5 + min(grid.nx, grid.ny) * buffer / 100.), 1)
            gxc.log(_t('Internal fill with {} cell buffer...').format(buffer_cells))
            expanded_area = (grid.x0 - buffer_cells * grid.dx,
                             grid.y0 - buffer_cells * grid.dy,
                             grid.x0 + (grid.nx + buffer_cells - 1) * grid.dx,
                             grid.y0 + (grid.ny + buffer_cells - 1) * grid.dy)
            rvv.length = grid.nx
            xyv[:, 0] = [(grid.x0 + i * grid.dx) for i in range(grid.nx)]
            bkd = max(grid.nx, grid.ny) * grid.dx
            if btol is None:
                btol = grid.statistics()['sd'] * 0.001
            buffer_grid = gxgrd.Grid.minimum_curvature(tpg_rows,
                                                       cs=grid.dx,
                                                       area=expanded_area,
                                                       bkd=bkd,
                                                       itrmax=buff_iterations,
                                                       pastol=99.,
                                                       tol=btol,
                                                       icgr=16,
                                                       max_segments=grid.ny)

            # expand for periodic function
            gxc.log(_t('Expand from ({}, {})').format(grid.nx, grid.ny))
            xpg = gxapi.GXPG.create(1, 1, tpg.e_type())
            gxapi.GXPGU.expand(buffer_grid.gxpg(), xpg, expand, 1, 0, 0)
            xnx = xpg.n_cols()
            xny = xpg.n_rows()
            gxc.log(_t(' to ({}, {})...').format(xnx, xny))

            # fill
            gxc.log(_t('Maximum-entropy prediction fill...'))
            reference_file = gx.gx().temp_file('grd')
            gxapi.GXPGU.ref_file(xpg, reference_file)
            gxapi.GXPGU.fill(xpg,
                             2,                  # roll-off weighting option: 1 - linear, 2 - square
                             gxapi.rDUMMY,       # the value to roll off to, GS_R8DM for the line mean
                             0,                  # roll-off distance in cells, 0 for none, -1 for the default
                             melen,              # max. filter length, -1 for no max. entropy, 0 for the default
                             0,                  # max. pred. sample, 0 for the default of 2*lMxf
                             amplitude_limit,    # limit amplitudes to this level, starting at half, 0. for none
                             edge_limit,         # limit edge amplitudes to this level, < 0. for none
                             edge_limit_cells,   # edge limit width in cells, 0 for the default
                             int(bool(smooth)),  # pass smooth filter, 0 or 1
                             reference_file)
            gxu.delete_files_by_root(reference_file)

            # prepped grid
            properties = grid.properties()
            properties['x0'], properties['y0'] = grid.xy_from_index((grid.nx - xpg.n_cols()) / 2.,
                                                                    (grid.ny - xpg.n_rows()) / 2.)
            prep_grid = gxgrd.Grid.from_data_array(xpg, properties=properties)
            if feath:
                _xx, _xy = (xpg.n_cols() - grid.nx) // 2, (xpg.n_rows() - grid.ny) // 2
                prep_grid = gxgrdu.feather(prep_grid, min(_xx, _xy))

            return prep_grid

        # let's do this...
        gxc = gx.gx()  # for logging

        if not isinstance(grid, gxgrd.Grid):
            grid = gxgrd.Grid.open(grid)
        self._source_grid = grid
        self._name = self._source_grid.name

        if grid.rot != 0.:
            # TODO: add support for rotated grids
            raise GridFFTException(_t('Rotated grids are not supported.'))

        if grid.dx != grid.dy:
            raise GridFFTException(_t('Cell size must be square'))

        gxc.log(_t('\nGridFFT from: {}').format(grid.file_name))

        # remove trend
        method = _t('edge') if trend_edge == 1 else _t('all')
        gxc.log(_t('Remove {} order trend determined from {} data ...').format(trend_order, method))
        self._trend = gxapi.GXTR.create(trend_order)
        tpg = gxapi.GXPG.create(grid.ny, grid.nx, self._source_grid.gxtype)
        gxapi.GXPGU.trend(grid.gxpg(), tpg, self._trend, 0, trend_edge,
                          grid.x0, grid.y0, grid.dx, grid.dy)

        if fill_method == FILL_MAXIMUM_ENTROPY:
            self._prep_grid = max_entropy_fill(buffer_tolerance, filter_length, feather)

        else:  # minimum-curvature
            gxc.log(_t('Expand from ({}, {})').format(grid.nx, grid.ny))
            ppg = gxapi.GXPG.create(1, 1, tpg.e_type())
            gxapi.GXPGU.expand(grid.gxpg(), ppg, expand, 1, 0, 0)
            gxc.log(_t(' to ({}, {})...').format(ppg.n_cols(), ppg.n_rows()))
            props = grid.properties()
            xx, xy = (ppg.n_cols() - grid.nx) // 2, (ppg.n_rows() - grid.ny) // 2
            props['x0'], props['y0'] = grid.xy_from_index(-xx, -xy)
            exp_grid = gxgrd.Grid.from_data_array(ppg, properties=props)
            gxc.log(_t('Minimum-curvature surface fill...'))
            self._prep_grid = gxgrdu.feather(gxgrdu.flood(exp_grid), min(xx, xy))

        self._prep_grid.gximg.set_tr(self._trend)

        # fft
        gxc.log(_t('FFT...'))
        fpg = self._prep_grid.gxpg(True)
        fpg.re_allocate(self._prep_grid.ny, self._prep_grid.nx + 2)
        gxapi.GXFFT2.trans_pg(fpg, gxapi.FFT2_PG_FORWARD)
        trn_file = gx.gx().temp_file('.trn(GRD)')
        self._source_transform = gxgrd.Grid.from_data_array(fpg, file_name=trn_file,
                                                            properties=self._prep_grid.properties())
        self._source_transform.gximg.set_tr(self._trend)
        self._filtered_transform = None
        self._next = 0
        self._ny2 = self._source_transform.ny // 2
        self._u = None
        self._u2 = None
        self._source_spectrum = None
        self._filtered_spectrum = None
        self._source_average_spectral_density = None
        self._filtered_average_spectral_density = None

        # track
        self._open = gx.track_resource(self.__class__.__name__, str(self))

    def uv_row_from_tr(self, i):
        """
        Returns the (u, v) space row index of a transform row.

        .. versionadded:: 9.4
        """
        return (i + self._ny2) if i < self._ny2 else (i - self._ny2)

    def tr_row_from_uv(self, i):
        """
        Returns the transform row index from a (u, v) space row index.

        .. versionadded:: 9.4
        """
        return (i - self._ny2) if i >= self._ny2 else (i + self._ny2)

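    # A small worked example of the folding above: for a transform with nv = 8 rows
    # (so _ny2 = 4), transform rows [0, 1, 2, 3, 4, 5, 6, 7] hold (u, v) rows
    # [4, 5, 6, 7, 0, 1, 2, 3]; the same shift maps back again, so
    # tr_row_from_uv(uv_row_from_tr(i)) == i for every row i.
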
    def read_uv_row(self, row, trn=TRN_SOURCE):
        """
        Read a row (constant wavenumber v) from the (u, v) transform.

        :param row: row number in (u, v) space, row 0 is the minimum v
        :param trn: `TRN_SOURCE` from the source transform (default) or `TRN_FILTERED`
        :return:    (u_array, v, real_array, imaginary_array)

        To calculate a wavenumber array: wavenumber = np.sqrt(u_array**2 + v**2).

        Upward continuation example:

        .. code::

            import math
            import numpy as np
            import geosoft.gxpy.gx as gx
            import geosoft.gxpy.grid_fft as gxfft

            gxc = gx.GXpy(log=print)

            with gxfft.GridFFT('some_mag_grid_file.grd') as fft:

                # for each row v in (u, v)
                for vrow in range(fft.nv):

                    # read the row
                    u, v, r, i = fft.read_uv_row(vrow)

                    # angular wavenumber along the row
                    wn = np.sqrt(u**2 + v**2) * 2. * math.pi

                    # upward continue 500 grid distance units
                    continuation_filter = np.exp(wn * -500.)
                    r *= continuation_filter
                    i *= continuation_filter

                    # write the filtered result to the TRN_FILTERED transform
                    fft.write_uv_row(r, i, vrow, trn=gxfft.TRN_FILTERED)

                # create an output grid of the upward-continued result
                fft.result_grid(file_name='upward_continued_500.grd')

        .. seealso:: `write_uv_row()`

        .. versionadded:: 9.4
        """

        if trn == TRN_SOURCE:
            tr = self.source_transform
        else:
            tr = self.filtered_transform
        data = tr.read_row(self.tr_row_from_uv(row)).np
        r = data[0::2]
        i = data[1::2]
        if self._u is None:
            self._u = np.arange(len(r)) * self.dv
            self._u2 = self._u**2
        v = (row - self._ny2) * self.dv
        return self._u, v, r, i

    def write_uv_row(self, r, i, row, trn=TRN_SOURCE):
        """
        Write a row (constant wavenumber v) to the (u, v) transform.

        :param r:   reals as a numpy array, length half the width of the transform
                    (as returned from `read_uv_row`).
        :param i:   imaginaries as a numpy array, matching r.
        :param row: row number in (u, v) space, row 0 is the minimum v
        :param trn: `TRN_SOURCE` to write to the source transform (default) or `TRN_FILTERED`

        .. seealso:: `read_uv_row()`

        .. versionadded:: 9.4
        """

        if trn == TRN_SOURCE:
            tr = self.source_transform
        else:
            tr = self.filtered_transform
        data = np.empty(len(r) * 2, dtype=tr.dtype)
        data[0::2] = r
        data[1::2] = i
        tr.write_row(data, self.tr_row_from_uv(row))
        if row == tr.ny - 1:
            if trn == TRN_SOURCE:
                self._source_transform = gxgrd.reopen(tr)
            else:
                self._filtered_transform = gxgrd.reopen(tr)

    @property
    def du(self):
        """ Wavenumber increment in the grid X direction (cycles / grid distance uom). """
        return 1.0 / (self._source_transform.dx * (self._source_transform.nx - 2))

    @property
    def dv(self):
        """ Wavenumber increment in the grid Y direction (cycles / grid distance uom). """
        return 1.0 / (self._source_transform.dy * self._source_transform.ny)

    @property
    def nu(self):
        """
        Number of discrete wavenumbers in the grid X direction.

        The transform is folded in the X direction, so this is half the transform width + 1.
        """
        return self._source_transform.nx // 2

    @property
    def nv(self):
        """ Number of discrete wavenumbers in the grid Y direction. """
        return self._source_transform.ny

    @property
    def u0(self):
        """ First u (X-direction) wavenumber, always 0. """
        return 0.

    @property
    def v0(self):
        """ First v (Y-direction) wavenumber. """
        return -(self.nv // 2) * self.dv

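    # For example (an illustrative sketch only), a full wavenumber-magnitude array for the
    # (u, v) transform can be assembled from the properties above, where `fft` is an open
    # GridFFT instance:
    #
    #     u = np.arange(fft.nu) * fft.du
    #     v = (np.arange(fft.nv) - fft.nv // 2) * fft.dv
    #     k = np.sqrt(u[np.newaxis, :]**2 + v[:, np.newaxis]**2)  # shape (nv, nu), cycles/uom
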
    def filter(self, filters=None, trn=TRN_SOURCE,
               height='', mag_inclination='', mag_declination='', mag_strength=''):
        """
        Apply a pre-defined filter.  See the filter reference:
        https://github.com/GeosoftInc/gxc/blob/master/reference/con_files/magmap.con

        :param filters: list of filters to apply.  Each filter can be a string, or a tuple with the
                        first item being the filter name followed by the filter parameters. See `magmap.con`
                        referenced above for the full list of filters.
        :param trn:     `TRN_SOURCE` to apply to the source transform (default), or `TRN_FILTERED` to
                        apply to the current filtered transform.

        The following parameters are the defaults for magnetic field filters such as pole/equator
        reduction and apparent susceptibility.

        :param height:          survey ground clearance in grid distance units
        :param mag_inclination: magnetic field inclination
        :param mag_declination: magnetic field declination
        :param mag_strength:    total magnetic field strength for converting magnetization to susceptibility

        Example: upward continuation by 500 grid distance units followed by a first vertical derivative:

        .. code::

            import geosoft.gxpy.gx as gx
            import geosoft.gxpy.grid_fft as gxfft

            gxc = gx.GXpy(log=print)

            with gxfft.GridFFT('some_mag_grid_file.grd') as fft:

                # apply the filters
                fft.filter(['CNUP 500', 'DRVZ 1'])  # equivalent to `fft.filter([('CNUP', 500), ('DRVZ', 1)])`

                # create an output grid of the upward-continued result
                fft.result_grid(file_name='upward_continued_500.grd')

        .. versionadded:: 9.4
        """

        if (trn == TRN_SOURCE) or (self._filtered_transform is None):
            transform = self._source_transform
        else:
            transform = self._filtered_transform
        tpg = transform.gxpg(True)
        transform.gximg.get_tr(self._trend)

        # control file
        con_file = gx.gx().temp_file('con')
        with open(con_file, 'x') as cf:

            # control-file header parameters:
            cf.write('\n')  # title not used
            cf.write('{} /\n'.format(height))
            cf.write('{} /\n'.format(mag_inclination))
            cf.write('{} /\n'.format(mag_declination))
            cf.write('{} /\n'.format(mag_strength))

            if isinstance(filters, str):
                filters = [filters]

            # filters
            if filters:
                for f in filters:
                    if isinstance(f, str):
                        cf.write('{} /\n'.format(f))
                    else:
                        for p in f:
                            cf.write('{} '.format(p))
                        cf.write('/\n')

        # filter
        gxapi.GXFFT2.filter_pg(tpg, con_file, self._trend,
                               self._source_grid.dx, self._source_grid.dy, self._source_grid.rot)
        gxu.delete_file(con_file)

        file_name = gx.gx().temp_file('.trn(GRD)')
        self._filtered_transform = gxgrd.Grid.from_data_array(tpg, file_name=file_name,
                                                              properties=self._source_transform.properties())
        self._filtered_transform.gximg.set_tr(self._trend)
        self._filtered_average_spectral_density = None
        self._filtered_spectrum = None

    def radially_averaged_spectrum(self, trn=TRN_SOURCE):
        """
        Radially averaged spectrum as a numpy array shaped (n_wavenumbers, 5).

        :param trn: `TRN_SOURCE` (default) to return the spectrum of the source data, or `TRN_FILTERED`
                    to return the spectrum of the current filtered state.

        .. note:: The returned numpy array is shaped (n_wavenumbers, 5), where each row contains:
            [wavenumber, sample_count, log_power, 3-point_depth, 5-point_depth], with the wavenumber in
            cycles per 1000 * distance unit of measure (cycles/km for metres), and log_power the natural
            log of the power.  Point depths are calculated by dividing the local slope (3 points and
            5 points) of the log_power by (4 * pi) (see Spector and Grant, 1970).

            For code clarity, the following index constants can be used to reference columns in the
            spectrum array:

                ============== ===
                I_WAVENUMBER     0
                I_SAMPLE_COUNT   1
                I_LOG_POWER      2
                I_DEPTH_3        3
                I_DEPTH_5        4
                ============== ===

        .. versionadded:: 9.4
        """

        if trn == TRN_SOURCE:
            if self._source_spectrum is not None:
                return self._source_spectrum
            tr = self.source_transform
        else:
            if self._filtered_spectrum is not None:
                return self._filtered_spectrum
            tr = self.filtered_transform

        # spectrum
        spec_file = gx.gx().temp_file()
        try:
            gxapi.GXFFT2.rad_spc(tr.gximg, spec_file)
        except geosoft.gxapi.GXAPIError:
            tpg = gxapi.GXPG.create(tr.ny, tr.nx, gxapi.GS_FLOAT)
            tpg.copy(tr.gxpg())
            with gxgrd.Grid.from_data_array(tpg, properties=tr.properties()) as tgd:
                tgd.delete_files()
                gxapi.GXFFT2.rad_spc(tgd.gximg, spec_file)

        length = max(tr.nx, tr.ny) // 2
        spectrum = np.zeros((length, 5))
        wavenumber = spectrum[:, 0]
        n_sample = spectrum[:, 1]
        log_power = spectrum[:, 2]
        depth_3 = spectrum[:, 3]
        depth_5 = spectrum[:, 4]
        i = 0
        asd = None
        with open(spec_file) as f:
            for sl in f:
                if sl:
                    if sl[0] == '/':
                        if '=' in sl:
                            try:
                                asd = float(sl.split('=')[1])
                            except ValueError:
                                asd = None
                    else:
                        pv = sl.split()
                        wavenumber[i] = float(pv[0])
                        n_sample[i] = float(pv[1])
                        log_power[i] = float(pv[2])
                        try:
                            depth_3[i] = float(pv[3])
                        except ValueError:
                            depth_3[i] = np.nan
                        try:
                            depth_5[i] = float(pv[4])
                        except ValueError:
                            depth_5[i] = np.nan
                        i += 1
        spectrum = spectrum[:i]
        gxu.delete_file(spec_file)

        # add the average spectral density back into the log_power
        spectrum[:, I_LOG_POWER] += asd

        if trn == TRN_SOURCE:
            self._source_spectrum = spectrum
            self._source_average_spectral_density = asd
        else:
            self._filtered_spectrum = spectrum
            self._filtered_average_spectral_density = asd

        return spectrum

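    # A minimal sketch of using the column constants to pull a depth estimate out of the
    # radially averaged spectrum; `fft` is an open GridFFT instance and the wavenumber band
    # (0.05 to 0.5 cycles/km) is an arbitrary choice for illustration:
    #
    #     spec = fft.radially_averaged_spectrum()
    #     band = (spec[:, I_WAVENUMBER] >= 0.05) & (spec[:, I_WAVENUMBER] <= 0.5)
    #     mean_depth = np.nanmean(spec[band, I_DEPTH_5])  # average 5-point depth in the band
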
    def log_average_spectral_density(self, trn=TRN_SOURCE):
        """
        Log of the average spectral density of the transform.

        :param trn: `TRN_SOURCE` (default) for the source data spectrum, or `TRN_FILTERED` for the
                    current filtered transform.

        .. versionadded:: 9.4
        """

        if trn == TRN_SOURCE:
            if self._source_average_spectral_density:
                return self._source_average_spectral_density
        else:
            if self._filtered_average_spectral_density:
                return self._filtered_average_spectral_density

        # estimate from the radial spectrum data
        rspec = self.radially_averaged_spectrum(trn)
        tot_samples = np.sum(rspec[:, I_SAMPLE_COUNT])
        tot_energy = np.sum(np.exp(rspec[:, I_LOG_POWER]))
        asd = math.log(tot_energy / tot_samples)

        if trn == TRN_SOURCE:
            self._source_average_spectral_density = asd
        else:
            self._filtered_average_spectral_density = asd

        return asd

    def spectrum_grid(self, trn=TRN_SOURCE, file_name=None, overwrite=False):
        """
        Return the 2D log(power) amplitude as a grid in the wavenumber domain (u, v).

        Amplitude = log(real**2 + imaginary**2)

        :param trn:         `TRN_SOURCE` for the source spectrum (default) or `TRN_FILTERED` for the
                            filtered spectrum
        :param file_name:   name for the grid file, default is a temporary grid
        :param overwrite:   `True` to overwrite an existing grid
        :return:            `geosoft.gxpy.grid.Grid` instance

        .. versionadded:: 9.4
        """

        if trn == TRN_SOURCE:
            tr = self._source_transform
        else:
            tr = self._filtered_transform

        du = 1.0 / (tr.dx * (tr.nx - 2))
        dv = 1.0 / (tr.dy * tr.ny)
        props = tr.properties()
        props['nx'] = tr.nx // 2
        props['ny'] = tr.ny
        props['x0'] = 0
        props['y0'] = -(tr.ny // 2) * dv
        props['dx'] = du
        props['dy'] = dv
        nperr = {}
        sgrd = gxgrd.Grid.new(file_name=file_name, properties=props, overwrite=overwrite)
        try:
            nperr = np.seterr(under='ignore')
            for row in range(tr.ny):
                data = tr.read_row(row).np
                r = np.clip(data[0::2]**2, 1.0e-20, None)
                i = np.clip(data[1::2]**2, 1.0e-20, None)
                sgrd.write_row(np.log(r + i), self.uv_row_from_tr(row))
        finally:
            np.seterr(**nperr)

        return sgrd

    @property
    def source_grid(self):
        """ Source grid as a `geosoft.gxpy.grid.Grid` instance. """
        return self._source_grid

    @property
    def expanded_filled_grid(self):
        """ Expanded and filled grid as a `geosoft.gxpy.grid.Grid` instance. """
        return self._prep_grid

    @property
    def source_transform(self):
        """ Folded discrete Fourier transform as a `geosoft.gxpy.grid.Grid` instance. """
        return self._source_transform

    @property
    def filtered_transform(self):
        """ Folded discrete Fourier transform after filters are applied. """
        if self._filtered_transform is None:
            self._filtered_transform = gxgrd.Grid.new(properties=self._source_transform.properties())
        return self._filtered_transform

    def result_grid(self, file_name=None, overwrite=False):
        """
        Produce a filter result grid.

        :param file_name:   result grid file, default creates a temporary grid
        :param overwrite:   `True` to overwrite an existing grid
        :return:            `geosoft.gxpy.grid.Grid` instance

        .. versionadded:: 9.4
        """

        if self._filtered_transform is None:
            self._source_transform = gxgrd.reopen(self._source_transform)
            trn = self._source_transform
        else:
            self._filtered_transform = gxgrd.reopen(self._filtered_transform)
            trn = self._filtered_transform
        tpg = trn.gxpg(True)

        # fft result
        gxapi.GXFFT2.trans_pg(tpg, gxapi.FFT2_PG_INVERSE)

        # reduce
        ix0 = ((tpg.n_cols() - 2) - self._source_grid.nx) // 2
        iy0 = (tpg.n_rows() - self._source_grid.ny) // 2
        rpg = gxapi.GXPG.create(self._source_grid.ny, self._source_grid.nx, self._source_grid.gxtype)
        rpg.copy_subset(tpg, 0, 0, iy0, ix0, self._source_grid.ny, self._source_grid.nx)

        # put back the trend, which may have been filtered
        gxapi.GXIMG.get_tr(trn.gximg, self._trend)
        result_pg = gxapi.GXPG.create(rpg.n_rows(), rpg.n_cols(), rpg.e_type())
        gxapi.GXPGU.trend(rpg, result_pg, self._trend, 2, 1,
                          self._source_grid.x0, self._source_grid.y0,
                          self._source_grid.dx, self._source_grid.dy)
        result = gxgrd.Grid.from_data_array(result_pg,
                                            properties=self._source_grid.properties(),
                                            file_name=file_name,
                                            overwrite=overwrite)

        # mask against the original grid
        result.mask(self.source_grid)

        return result
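

# A minimal end-to-end usage sketch of the class above; 'mag.grd' and the output file name
# are hypothetical, and the filter strings follow the magmap.con reference noted in
# GridFFT.filter().
if __name__ == '__main__':

    gxc = gx.GXpy(log=print)

    with GridFFT('mag.grd') as fft:

        # upward continue 250 grid distance units, then take the first vertical derivative
        fft.filter(['CNUP 250', 'DRVZ 1'])

        # radially averaged spectrum of the source data, columns indexed by the I_* constants
        spec = fft.radially_averaged_spectrum(TRN_SOURCE)
        gxc.log('first wavenumber: {}, log power: {}'.format(spec[0, I_WAVENUMBER],
                                                             spec[0, I_LOG_POWER]))

        # write the filtered result as a grid
        fft.result_grid(file_name='mag_up250_dz.grd', overwrite=True)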