"""
Geosoft databases for line-oriented spatial data.
:Classes:
:`Geosoft_gdb`: Geosoft line database
:`Line`: line handling
:`Channel`: channel handling
:Constants:
:LINE_TYPE_NORMAL: `geosoft.gxapi.DB_LINE_TYPE_NORMAL`
:LINE_TYPE_BASE: `geosoft.gxapi.DB_LINE_TYPE_BASE`
:LINE_TYPE_TIE: `geosoft.gxapi.DB_LINE_TYPE_TIE`
:LINE_TYPE_TEST: `geosoft.gxapi.DB_LINE_TYPE_TEST`
:LINE_TYPE_TREND: `geosoft.gxapi.DB_LINE_TYPE_TREND`
:LINE_TYPE_SPECIAL: `geosoft.gxapi.DB_LINE_TYPE_SPECIAL`
:LINE_TYPE_RANDOM: `geosoft.gxapi.DB_LINE_TYPE_RANDOM`
:LINE_CATEGORY_FLIGHT: `geosoft.gxapi.DB_CATEGORY_LINE_FLIGHT`
:LINE_CATEGORY_GROUP: `geosoft.gxapi.DB_CATEGORY_LINE_GROUP`
:LINE_CATEGORY_NORMAL: `geosoft.gxapi.DB_CATEGORY_LINE_NORMAL`
:FORMAT_NORMAL: `geosoft.gxapi.DB_CHAN_FORMAT_NORMAL`
:FORMAT_EXP: `geosoft.gxapi.DB_CHAN_FORMAT_EXP`
:FORMAT_TIME: `geosoft.gxapi.DB_CHAN_FORMAT_TIME`
:FORMAT_DATE: `geosoft.gxapi.DB_CHAN_FORMAT_DATE`
:FORMAT_GEOGR: `geosoft.gxapi.DB_CHAN_FORMAT_GEOGR`
:FORMAT_SIGDIG: `geosoft.gxapi.DB_CHAN_FORMAT_SIGDIG`
:FORMAT_HEX: `geosoft.gxapi.DB_CHAN_FORMAT_HEX`
:CHAN_ALL: None
:CHAN_NORMAL: 0
:CHAN_ARRAY: 1
:CHAN_DISPLAYED: 2
:SYMB_LINE_NORMAL: `geosoft.gxapi.DB_CATEGORY_LINE_NORMAL`
:SYMB_LINE_FLIGHT: `geosoft.gxapi.DB_CATEGORY_LINE_FLIGHT`
:SYMB_LINE_GROUP: `geosoft.gxapi.DB_CATEGORY_LINE_GROUP`
:SELECT_INCLUDE: `geosoft.gxapi.DB_LINE_SELECT_INCLUDE`
:SELECT_EXCLUDE: `geosoft.gxapi.DB_LINE_SELECT_EXCLUDE`
:COMP_NONE: `geosoft.gxapi.DB_COMP_NONE`
:COMP_SPEED: `geosoft.gxapi.DB_COMP_SPEED`
:COMP_SIZE: `geosoft.gxapi.DB_COMP_SIZE`
:READ_REMOVE_DUMMYROWS: 1
:READ_REMOVE_DUMMYCOLUMNS: 2
:SYMBOL_LOCK_NONE: `geosoft.gxapi.DB_LOCK_NONE`
:SYMBOL_LOCK_READ: `geosoft.gxapi.DB_LOCK_READONLY`
:SYMBOL_LOCK_WRITE: `geosoft.gxapi.DB_LOCK_READWRITE`
:DRAW_AS_POINTS: 0
:DRAW_AS_LINES: 1
.. seealso:: `geosoft.gxapi.GXDB`, `geosoft.gxapi.GXEDB`,
`geosoft.gxapi.GXDBREAD`, `geosoft.gxapi.GXDBWRITE`
.. note::
Regression tests provide usage examples:
`Tests <https://github.com/GeosoftInc/gxpy/blob/master/geosoft/gxpy/tests/test_gdb.py>`_
"""
import os
import sys
import math
import numpy as np
import pandas as pd
import geosoft
import geosoft.gxapi as gxapi
from . import vv as gxvv
from . import va as gxva
from . import utility as gxu
from . import gx as gx
from . import coordinate_system as gxcs
from . import metadata as gxmeta
from . import map as gxmap
from . import view as gxview
from . import group as gxgroup
from . import geometry as gxgeo
# module version mirrors the version of the parent `geosoft` package
__version__ = geosoft.__version__
def _t(s):
    """Return `s` after passing it through the gxpy system translation function."""
    translated = geosoft.gxpy.system.translate(s)
    return translated
# line types, re-exported from the GX API
LINE_TYPE_NORMAL = gxapi.DB_LINE_TYPE_NORMAL
LINE_TYPE_BASE = gxapi.DB_LINE_TYPE_BASE
LINE_TYPE_TIE = gxapi.DB_LINE_TYPE_TIE
LINE_TYPE_TEST = gxapi.DB_LINE_TYPE_TEST
LINE_TYPE_TREND = gxapi.DB_LINE_TYPE_TREND
LINE_TYPE_SPECIAL = gxapi.DB_LINE_TYPE_SPECIAL
LINE_TYPE_RANDOM = gxapi.DB_LINE_TYPE_RANDOM

# line categories
LINE_CATEGORY_FLIGHT = gxapi.DB_CATEGORY_LINE_FLIGHT
LINE_CATEGORY_GROUP = gxapi.DB_CATEGORY_LINE_GROUP
LINE_CATEGORY_NORMAL = gxapi.DB_CATEGORY_LINE_NORMAL

# channel display formats
FORMAT_NORMAL = gxapi.DB_CHAN_FORMAT_NORMAL
FORMAT_EXP = gxapi.DB_CHAN_FORMAT_EXP
FORMAT_TIME = gxapi.DB_CHAN_FORMAT_TIME
FORMAT_DATE = gxapi.DB_CHAN_FORMAT_DATE
FORMAT_GEOGR = gxapi.DB_CHAN_FORMAT_GEOGR
FORMAT_SIGDIG = gxapi.DB_CHAN_FORMAT_SIGDIG
FORMAT_HEX = gxapi.DB_CHAN_FORMAT_HEX

# channel list filters (module-defined values, not GX API constants)
CHAN_ALL = None
CHAN_NORMAL = 0
CHAN_ARRAY = 1
CHAN_DISPLAYED = 2

# line symbol categories; these bind the same GX API values as LINE_CATEGORY_* above
SYMB_LINE_NORMAL = gxapi.DB_CATEGORY_LINE_NORMAL
SYMB_LINE_FLIGHT = gxapi.DB_CATEGORY_LINE_FLIGHT
SYMB_LINE_GROUP = gxapi.DB_CATEGORY_LINE_GROUP

# line selection modes
SELECT_INCLUDE = gxapi.DB_LINE_SELECT_INCLUDE
SELECT_EXCLUDE = gxapi.DB_LINE_SELECT_EXCLUDE

# database compression settings
COMP_NONE = gxapi.DB_COMP_NONE
COMP_SPEED = gxapi.DB_COMP_SPEED
COMP_SIZE = gxapi.DB_COMP_SIZE

# read options (module-defined values)
READ_REMOVE_DUMMYROWS = 1
READ_REMOVE_DUMMYCOLUMNS = 2

# symbol lock modes
SYMBOL_LOCK_NONE = gxapi.DB_LOCK_NONE
SYMBOL_LOCK_READ = gxapi.DB_LOCK_READONLY
SYMBOL_LOCK_WRITE = gxapi.DB_LOCK_READWRITE

# drawing modes (module-defined values)
DRAW_AS_POINTS = 0
DRAW_AS_LINES = 1
class GdbException(geosoft.GXRuntimeError):
    """
    Error raised by the functions and classes of `geosoft.gxpy.gdb`.

    .. versionadded:: 9.1
    """
def _gdb_name(name):
name = name.strip()
name_ext = os.path.splitext(name)
if name_ext[1].lower() == '.gdb':
return name
else:
return os.path.normpath(name + ".gdb")
def _va_width(data):
if len(data.shape) == 1:
width = 1
elif len(data.shape) == 2:
width = data.shape[1]
else:
raise GdbException(_t("Only one or two-dimensional data allowed."))
return width
def is_valid_line_name(name):
    """
    Return True if this is a valid line name.

    The name is converted to a string first. Anything that parses as an integer is rejected
    without consulting the database API; every other name is checked with
    `geosoft.gxapi.GXDB.is_line_name`.

    See also `create_line_name`

    .. versionadded:: 9.3
    """
    text = str(name)
    try:
        int(text)
    except ValueError:
        return bool(gxapi.GXDB.is_line_name(text))
    return False
def create_line_name(number=0, line_type=LINE_TYPE_NORMAL, version=0):
    """
    Build a valid database line name from its component parts.

    :param number:      line number, or a string, default is 0
    :param line_type:   one of LINE_TYPE constants, default is LINE_TYPE_NORMAL
    :param version:     version number, default is 0
    :return:            string line name

    Line names follow the line naming convention, for example:

        ======  =======================================
        L10.4   LINE_TYPE_NORMAL, number 10, version 4
        B10.4   LINE_TYPE_BASE, number 10, version 4
        D10.4   LINE_TYPE_RANDOM, number 10, version 4
        P10.4   LINE_TYPE_SPECIAL, number 10, version 4
        T10.4   LINE_TYPE_TIE, number 10, version 4
        S10.4   LINE_TYPE_TEST, number 10, version 4
        R10.4   LINE_TYPE_TREND, number 10, version 4
        ======  =======================================

    .. versionadded:: 9.3
    """
    name_ref = gxapi.str_ref()
    number_text = str(number)
    gxapi.GXDB.set_line_name2(number_text, line_type, version, name_ref)
    return name_ref.value
def delete_files(file_name):
    """
    Delete all files associated with this database name.

    The name is normalised the same way as when a database is opened (whitespace stripped,
    '.gdb' appended when missing), and it is the normalised database file and its '.xml'
    companion that are removed.

    :param file_name: name of the database; `None` does nothing
    :raises: GdbException if the normalised name does not have a gdb extension

    .. versionadded:: 9.3
    """
    if file_name is None:
        return

    path = _gdb_name(file_name)
    ext = os.path.splitext(os.path.basename(path))[1]
    if ext.lower() != '.gdb':
        raise GdbException(_t('File is not a Geosoft database file (no gdb extension): {}'.format(file_name)))

    # delete the normalised path, never the raw argument: a name such as 'data.txt' refers to
    # the database 'data.txt.gdb' and must not remove the unrelated file 'data.txt'
    gxu.delete_file(path)
    gxu.delete_file(path + '.xml')
[docs]class Geosoft_gdb(gxgeo.Geometry):
"""
Class to work with Geosoft databases. This class wraps many of the functions found in
`geosoft.gxapi.GXDB`.
:Constructors:
========= =========================================================================
`open` open an existing file, or if not specified open/lock the current database
`new` create a new database
========= =========================================================================
**Some typical programming patterns**
Python Oasis extension opens and reads through all data in the current database:
.. code::
import os,sys
import numpy as np
import gxpy.gx as gxp
import gxpy.gdb as gxdb
# open the current database in the open project
gdb = gxdb.Geosoft_gdb.open()
for line in gdb.list_lines():
npd,ch,fid = gdb.read_line(line)
# npd is a 2D numpy array to all data in this line.
# ch is a list of the channels, one channel for each column in npd.
# Array channels are expanded with channel names "name[0]", "name[1]" ...
# fid is a tuple (start,increment) fiducial, which will be the minimum start and smallest increment.
# ... do something with the data in npd ...
External Python program to open and read through all data in a database:
.. code::
import os,sys
import numpy as np
import gxpy.gx as gx
import gxpy.gdb as gxdb
# initialize the gx environment - required for external programs.
gxp = gx.GXpy()
# open a database
gdb = gxdb.Geosoft_gdb.open('test.gdb')
for line in gdb.list_lines():
npd,ch,fid = gdb.read_line(line)
# npd is a 2D numpy array to all data in this line.
# ch is a list of the channels, one channel for each column in npd.
# Array channels are expanded with channel names "name[0]", "name[1]" ...
# fid is a tuple (start,increment) fiducial, which will be the minimum start and smallest increment.
# ... do something with the data in npd ...
The following creates a new channel that is the distance from the origin to the X,Y,Z location of every point.
.. code::
...
gdb = gxdb.Geosoft_gdb.open('test.gdb')
for line in gdb.list_lines():
npd,ch,fid = gdb.read_line(line, channels=['X','Y','Z'])
npd = np.square(npd)
distance_from_origin = np.sqrt(npd[:, 0] + npd[:, 1] + npd[:, 2])
gdb.write_channel(line, 'distance', distance_from_origin, fid)
.. versionadded:: 9.1
.. versionchanged:: 9.3 float numpy arrays use np.nan for dummies so dummy filtering no longer necessary.
.. versionchanged:: 9.3.1 inherits from `geosoft.gxpy.geometry.Geometry`
"""
def __enter__(self):
return self
def __exit__(self, _type, _value, _traceback):
self.__del__()
def __del__(self):
if hasattr(self, '_close'):
self._close()
def _close(self, pop=True, discard=False):
if hasattr(self, '_open'):
if self._open:
if self._db:
if self._edb is not None:
if self._edb.is_locked():
self._edb.un_lock()
discard = False
self._edb = None
if not discard and self._xmlmetadata_changed:
with open(self._file_name + '.xml', 'w+') as f:
f.write(gxu.xml_from_dict(self._xmlmetadata))
self._db.sync()
self._db = None
if discard:
gxu.delete_files_by_root(self._file_name)
if pop:
gx.pop_resource(self._open)
self._open = None
def __repr__(self):
return "{}({})".format(self.__class__, self.__dict__)
def __str__(self):
return '{}({} lines, {} channels)'.format(os.path.basename(self.name), self.used_lines, self.used_channels)
def __init__(self, name=None, db=None):
    """
    Set up instance state and register the instance with gx resource tracking.

    :param name:    database name; only its base name is passed to the base class. When
                    omitted, the name is taken from `db` if provided, else '_gdb_' is used.
    :param db:      an existing database handle, or None
    """
    self._lst = gxapi.GXLST.create(2000)
    self._file_name = None
    self._db = db
    self._edb = None
    self._xmlmetadata = None
    self._xmlmetadata_changed = False
    self._xmlmetadata_root = ''
    self._extent = {'xyz': None, 'extent': None}

    if name is not None:
        name = os.path.basename(name)
    elif self._db:
        # derive the file and instance names from the database handle
        name_ref = gxapi.str_ref()
        self._db.get_name(gxapi.DB_NAME_FILE, name_ref)
        self._file_name = os.path.normpath(name_ref.value)
        name = os.path.basename(self._file_name)
    else:
        name = '_gdb_'

    super().__init__(name=name)

    class_name = self.__class__.__name__
    self._open = gx.track_resource(class_name, self._file_name)
def close(self, discard=False):
    """
    Close the database and release its resources.

    :param discard: True to discard the database file(s) after closing.

    .. versionadded:: 9.4
    """
    should_discard = discard
    self._close(discard=should_discard)
@classmethod
def open(cls, name=None):
    """
    Open an existing database.

    :param name: name of the database, default is the current project database
    :returns: `Geosoft_gdb` instance

    .. versionadded:: 9.1
    """
    gdb = cls(name)

    if name is not None:
        gdb._edb = None
        gdb._db = gxapi.GXDB.open(_gdb_name(name), 'SUPER', '')
    else:
        # no name: lock the current editor database
        gdb._edb = gxapi.GXEDB.current()
        gdb._db = gxapi.GXEDB.lock(gdb._edb)

    # record the file name as reported by the opened database
    file_ref = gxapi.str_ref()
    gdb._db.get_name(gxapi.DB_NAME_FILE, file_ref)
    gdb._file_name = os.path.normpath(file_ref.value)
    return gdb
@classmethod
def new(cls, name=None, max_lines=500, max_channels=200, max_blobs=0, page_size=1024,
        comp=None, overwrite=False):
    """
    Create a new database.

    :param name:            database name, if None a temporary database is created
    :param max_lines:       maximum number of lines, default 500, at least 10
    :param max_channels:    maximum number of channels, default 200, at least 25
    :param max_blobs:       maximum number of blobs, raised to at least
                            max_lines + max_channels + 20
    :param comp:            compression:
                            | COMP_NONE
                            | COMP_SPEED (default, used when `comp` is None)
                            | COMP_SIZE
    :param overwrite:       `True` to overwrite existing database. Default is `False`, GdbException if file exists.
    :param page_size:       page size (default is 1024), which limits the amount of compressed data that can be
                            stored in a single channel on a line. The maximum compressed data size for a channel
                            will be this number * 65534 (default 1024 * 65534 = 64 MB of compressed data).
                            This will be forced to a power of 2 between 64 and 4096, which would allow for a
                            maximum of 256 MB compressed data per channel per line.
    :returns:               `Geosoft_gdb` instance
    :raises:                GdbException if the page size exceeds 4096, or the database exists and
                            `overwrite` is False

    .. versionadded:: 9.1

    .. versionchanged:: 9.3
        added parameter `overwrite=False`

    .. versionchanged:: 9.4 `name=None` creates a temporary database
    """
    max_lines = max(10, max_lines)
    max_channels = max(25, max_channels)
    min_blobs = max_channels + max_lines + 20
    max_blobs = max(min_blobs, max_blobs)

    # only an omitted compression selects the default; a truthiness test would also replace
    # an explicit compression constant whose value is falsy (such as a zero-valued COMP_NONE)
    if comp is None:
        comp = COMP_SPEED

    # validate page_size: round up to the next power of 2, starting from 64
    ps = 64
    while ps < page_size:
        ps *= 2
    if ps > 4096:
        raise GdbException(_t('Page size cannot be larger than 4096 (256 MB per line-channel).'))
    page_size = ps

    if name is None:
        name = gx.gx().temp_file('gdb')
    name = _gdb_name(name)

    if not overwrite and os.path.isfile(name):
        raise GdbException(_t('Cannot overwrite existing database \'{}\''.format(name)))

    # remove any existing files that share this root before creating the database
    gxu.delete_files_by_root(name)
    gxapi.GXDB.create_comp(name,
                           max_lines, max_channels, max_blobs, 10, 100,
                           'SUPER', '',
                           page_size, comp)
    return cls.open(name)
def commit(self):
    """
    Commit pending database changes.

    .. versionadded:: 9.1
    """
    db = self._db
    db.commit()
def discard(self):
    """
    Discard pending database changes.

    .. versionadded:: 9.1
    """
    db = self._db
    db.discard()
# ============================================================================
# internal helper functions
def exist_symb_(self, symb, symb_type):
    """
    Check if a symbol exists and is of the required type.

    :param symb:        symbol name, number or instance
    :param symb_type:   one of DB_SYMB_TYPE
    :returns:           result of the database name check for strings and of the symbol validity
                        check for integers; `True` for a `Line` or `Channel` instance matching
                        `symb_type`; `False` otherwise

    .. versionadded:: 9.1
    """
    if isinstance(symb, str):
        return self._db.exist_symb(symb, symb_type)
    if isinstance(symb, int):
        return self._db.valid_symb(symb, symb_type)

    # instances are accepted only when they match the requested symbol type
    if isinstance(symb, Line) and (symb_type == gxapi.DB_SYMB_LINE):
        return True
    if isinstance(symb, Channel) and (symb_type == gxapi.DB_SYMB_CHAN):
        return True
    return False
# ============================================================================
# Information
@property
def gxdb(self):
    """The underlying `geosoft.gxapi.GXDB` instance handle."""
    handle = self._db
    return handle
@property
def xyz_channels(self):
    """
    The currently identified (x, y, z) channels. Methods that work on spatial locations will use these
    channels for locating the data at each fiducial of the data. Can be set using a tuple of two or
    three strings. For example:

    .. code::

        gdb.xyz_channels = ('Easting', 'Northing')
        gdb.xyz_channels = ('Easting', 'Northing', 'Elevation')

    Any of the three names that is not a channel in the database is reported as None.

    .. versionadded:: 9.2
    """
    name_ref = gxapi.str_ref()
    names = []
    for axis in range(3):
        self.gxdb.get_xyz_chan(axis, name_ref)
        names.append(name_ref.value)

    x, y, z = [n if self.is_channel(n) else None for n in names]
    return x, y, z

@xyz_channels.setter
def xyz_channels(self, xyz):
    # the z channel is optional; each channel given must exist or is_channel raises
    if len(xyz) < 3:
        x, y = xyz
        z = None
    else:
        x, y, z = xyz
        self.is_channel(z, True)
    self.is_channel(x, True)
    self.is_channel(y, True)

    self.gxdb.set_xyz_chan(0, x)
    self.gxdb.set_xyz_chan(1, y)
    if z:
        self.gxdb.set_xyz_chan(2, z)

    # the cached extent depends on which channels locate the data
    self.clear_extent()
def _init_xmlmetadata(self):
if not self._xmlmetadata:
self._xmlmetadata = gxu.geosoft_metadata(self._file_name)
self._xmlmetadata_root = tuple(self._xmlmetadata.items())[0][0]
@property
def metadata(self):
    """
    The database XML metadata as a dictionary. Can be set, in which case
    the dictionary items passed will be added to, or replace existing XML metadata.

    .. versionadded:: 9.2
    """
    self._init_xmlmetadata()
    root = self._xmlmetadata_root
    return self._xmlmetadata[root]

@metadata.setter
def metadata(self, meta):
    self._init_xmlmetadata()
    root = self._xmlmetadata_root
    merged = gxu.merge_dict(self._xmlmetadata[root], meta)
    self._xmlmetadata[root] = merged
    # flag the change so the metadata is written out when the database is closed
    self._xmlmetadata_changed = True
@property
def file_name(self):
    """Absolute path of the database file."""
    stored = self._file_name
    return os.path.abspath(stored)
@property
def coordinate_system(self):
    """
    Coordinate system of the current `xyz_channels`.
    Can be set from any `geosoft.gxpy.coordinate_system.Coordinate_system` constructor.

    A default `Coordinate_system()` is returned when a GdbException is raised while
    reading the coordinate system of the x channel.

    .. versionchanged:: 9.3
        added setter
    """
    try:
        x_name = self.xyz_channels[0]
        ipj = gxapi.GXIPJ.create()
        x_symb = self.channel_name_symb(x_name)[1]
        self.gxdb.get_ipj(x_symb, ipj)
        return gxcs.Coordinate_system(ipj)
    except GdbException:
        return gxcs.Coordinate_system()

@coordinate_system.setter
def coordinate_system(self, cs):
    if not isinstance(cs, gxcs.Coordinate_system):
        cs = gxcs.Coordinate_system(cs)

    x, y, z = self.xyz_channels
    x_symb = self.channel_name_symb(x)[1]
    y_symb = self.channel_name_symb(y)[1]
    self.gxdb.set_ipj(x_symb, y_symb, cs.gxipj)

    x, _, z = self.xyz_channels
    if z:
        # a z channel without a unit of measure takes the unit of the x channel
        z = Channel(self, z)
        if not z.unit_of_measure:
            z.unit_of_measure = Channel(self, x).unit_of_measure
@property
def max_blobs(self):
    """Maximum number of blobs allowed."""
    what = gxapi.DB_INFO_BLOBS_MAX
    return self._db.get_info(what)

@property
def max_lines(self):
    """Maximum number of lines allowed."""
    what = gxapi.DB_INFO_LINES_MAX
    return self._db.get_info(what)

@property
def max_channels(self):
    """Maximum number of channels allowed."""
    what = gxapi.DB_INFO_CHANS_MAX
    return self._db.get_info(what)

@property
def used_blobs(self):
    """Number of blobs used."""
    what = gxapi.DB_INFO_BLOBS_USED
    return self._db.get_info(what)

@property
def used_lines(self):
    """Number of lines used."""
    what = gxapi.DB_INFO_LINES_USED
    return self._db.get_info(what)

@property
def used_channels(self):
    """Number of channels used."""
    what = gxapi.DB_INFO_CHANS_USED
    return self._db.get_info(what)

@property
def max_compressed_channel_bytes(self):
    """Maximum compressed data per channel per line in bytes (database page size * 65534)."""
    page_size = self._db.get_info(gxapi.DB_INFO_PAGE_SIZE)
    return page_size * 65534

@property
def number_of_blocks(self):
    """Number of blocks."""
    what = gxapi.DB_INFO_DATA_SIZE
    return self._db.get_info(what)

@property
def lost_blocks(self):
    """Lost blocks that might be freed."""
    what = gxapi.DB_INFO_LOST_SIZE
    return self._db.get_info(what)

@property
def free_blocks(self):
    """Number of free blocks."""
    what = gxapi.DB_INFO_FREE_SIZE
    return self._db.get_info(what)

@property
def compression(self):
    """Database compression setting."""
    what = gxapi.DB_INFO_COMP_LEVEL
    return self._db.get_info(what)

@property
def pages_for_blobs(self):
    """Pages consumed by blobs."""
    what = gxapi.DB_INFO_BLOB_SIZE
    return self._db.get_info(what)

@property
def db_size_kb(self):
    """Database size in kb."""
    what = gxapi.DB_INFO_FILE_SIZE
    return self._db.get_info(what)

@property
def index_size_kb(self):
    """Index size in kb."""
    what = gxapi.DB_INFO_INDEX_SIZE
    return self._db.get_info(what)

@property
def max_block_size_bytes(self):
    """Maximum block size in bytes."""
    what = gxapi.DB_INFO_MAX_BLOCK_SIZE
    return self._db.get_info(what)

@property
def data_has_changed(self):
    """`True` if data has changed (the value of the DB_INFO_CHANGESLOST database info item)."""
    what = gxapi.DB_INFO_CHANGESLOST
    return self._db.get_info(what)
def is_line(self, line, raise_err=False):
    """
    Returns `True` if the named line exists in the database.

    :param line:        line name (converted to a string for the lookup)
    :param raise_err:   True to raise an error if it does not exist
    :raises:            GdbException if `raise_err` and the line is not in the database

    .. versionadded:: 9.1
    """
    found = self._db.find_symb(str(line), gxapi.DB_SYMB_LINE)
    exist = found != gxapi.NULLSYMB
    if exist or not raise_err:
        return exist
    raise GdbException(_t('"{}" is not a line in the database'.format(line)))
def is_channel(self, chan, raise_err=False):
    """
    Returns `True` if the channel name exists in the database.

    :param chan:        channel name
    :param raise_err:   True to raise an error if it does not exist
    :raises:            GdbException if `raise_err` and the channel is not in the database

    .. versionadded:: 9.1
    """
    found = self._db.find_chan(chan)
    exist = found != gxapi.NULLSYMB
    if exist or not raise_err:
        return exist
    raise GdbException(_t('"{}" is not a channel in the database'.format(chan)))
@property
def extent(self):
    """
    Return the spatial extent of all selected data in the database as a `geosoft.gxpy.geometry.Point2`.

    The result is cached against the current `xyz_channels`; use `clear_extent` to force the
    extent to be recalculated.

    :returns: `geosoft.gxpy.geometry.Point2` of minimum, maximum, or None if there are no
              selected lines or the x or y channel is not defined.

    .. versionadded:: 9.2
    """

    def expand(_min, _max, _data):
        """Grow the running (_min, _max) range to include the non-NaN values of _data."""
        if np.isnan(_data).all():
            return _min, _max
        dmin = np.nanmin(_data)
        dmax = np.nanmax(_data)
        if _min is None:
            return dmin, dmax
        # test both ends independently: one line can extend the minimum and the maximum
        if dmin < _min:
            _min = dmin
        if dmax > _max:
            _max = dmax
        return _min, _max

    lines = self.lines()
    if not len(lines):
        return None

    xyz = self.xyz_channels

    # key the cache on the full (x, y, z) tuple, which is what later look-ups compare against,
    # so that a database without a z channel can also reuse its cached extent
    cache_key = str(xyz)
    if cache_key == self._extent['xyz']:
        return self._extent['extent']

    xmin = xmax = ymin = ymax = zmin = zmax = None
    if None in xyz:
        if None in xyz[0:2]:
            return None
        xyz = xyz[0:2]

    for l in lines:
        data = self.read_line(l, channels=xyz)[0]
        xmin, xmax = expand(xmin, xmax, data[:, 0])
        ymin, ymax = expand(ymin, ymax, data[:, 1])
        if data.shape[1] > 2:
            zmin, zmax = expand(zmin, zmax, data[:, 2])

    ext = gxgeo.Point2((xmin, ymin, zmin, xmax, ymax, zmax), coordinate_system=self.coordinate_system)
    self._extent['xyz'] = cache_key
    self._extent['extent'] = ext
    return ext
def _get(self, s, fn):
self.lock_read_(s)
try:
v = fn(s)
finally:
self.unlock_(s)
return v
def lock_set_(self, s, fn, v):
    """
    Call `fn(s, v)` while symbol `s` is write-locked.

    The symbol is unlocked again even if `fn` raises.

    :param s:   symbol
    :param fn:  setter called with the symbol and the value
    :param v:   value to set
    """
    self.lock_write_(s)
    try:
        fn(s, v)
    finally:
        self.unlock_(s)
def line_name_symb(self, line, create=False):
    """
    Return line name, symbol

    :param line:    line name, symbol number, or `Line` instance
    :param create:  `True` to create a line if a named line does not exist
    :returns:       line name, symbol
    :raises:        GdbException if a named line is not found and `create` is False

    .. versionadded:: 9.1
    """
    if isinstance(line, Line):
        return line.name, line.symbol

    if not isinstance(line, str):
        # anything else is treated as a symbol number; ask the database for its name
        name_ref = gxapi.str_ref()
        self._db.get_symb_name(line, name_ref)
        return name_ref.value, line

    if self.exist_symb_(line, gxapi.DB_SYMB_LINE):
        return line, self._db.find_symb(line, gxapi.DB_SYMB_LINE)
    if not create:
        raise GdbException(_t('Line \'{}\' not found'.format(line)))
    return line, self.new_line(line)
def channel_name_symb(self, chan):
    """
    Return channel name, symbol

    :param chan:    channel name, or symbol number or Channel instance
    :returns:       channel name, symbol
    :raises:        GdbException if channel does not exist

    .. versionadded:: 9.1
    """
    if isinstance(chan, Channel):
        return chan.name, chan.symbol

    if isinstance(chan, str):
        found = self._db.find_symb(chan, gxapi.DB_SYMB_CHAN)
        if found != -1:
            return chan, found
        raise GdbException(_t('Channel \'{}\' not found'.format(chan)))

    # anything else must be a valid channel symbol number
    if self.exist_symb_(chan, gxapi.DB_SYMB_CHAN):
        name_ref = gxapi.str_ref()
        self._db.get_symb_name(chan, name_ref)
        return name_ref.value, chan
    raise GdbException(_t('Channel symbol \'{}\' not found'.format(chan)))
def channel_width(self, channel):
    """
    Channel array width, 1 for normal channels, >1 for VA channels.

    :param channel: channel symbol or name
    :returns:       array dimension, 1 for non-array channels

    .. versionadded:: 9.1
    """
    symb = self.channel_name_symb(channel)[1]
    return self._get(symb, self._db.get_col_va)
def list_channels(self, chan=None):
    """
    Return a dict of channels in the database.

    :param chan: channel filter, default CHAN_ALL:

        =============== ============================
        CHAN_ALL        all channels, normal and VA
        CHAN_NORMAL     normal channels only
        CHAN_ARRAY      VA channels only
        =============== ============================

        Any value other than CHAN_ALL and CHAN_ARRAY returns normal channels only.

    :returns: dictionary {channel_names: channel_symbols}

    .. versionadded:: 9.1
    """

    def clean_chan_dict():
        """ returns channels without any temporary VA sliced channels (names containing '[') """
        self._db.chan_lst(self._lst)
        everything = gxu.dict_from_lst(self._lst)
        return {name: everything.get(name) for name in everything if '[' not in name}

    if chan == CHAN_ALL:
        selected = clean_chan_dict()
    else:
        self._db.array_lst(self._lst)
        arrays = gxu.dict_from_lst(self._lst)
        if chan == CHAN_ARRAY:
            selected = arrays
        else:
            # keep only the channels that are not VA channels
            normal = clean_chan_dict()
            array_names = list(arrays)
            selected = {name: normal.get(name) for name in normal if name not in array_names}

    # symbols come back as strings, convert to ints
    for name in selected:
        selected[name] = int(selected.get(name))
    return selected
def lines(self, select=True):
    """
    .. deprecated:: 9.2 use list_lines()
    """
    listed = self.list_lines(select)
    return listed
def list_lines(self, select=True):
    """
    List of lines in the database, returned as a {name: symbol} dictionary

    :param select:  `True` to return selected lines, `False` to return all lines
    :returns:       dictionary (line name: symbol), symbols as ints

    .. versionadded:: 9.1
    """
    fill_list = self._db.selected_line_lst if select else self._db.line_lst
    fill_list(self._lst)

    listed = gxu.dict_from_lst(self._lst)
    for name in listed:
        listed[name] = int(listed.get(name))
    return listed
def line_details(self, line):
    """
    Return dictionary of line details

    :param line:    line name, symbol or `Line` instance
    :returns:       dictionary:

        =========== ==============================================================
        Key         Meaning
        =========== ==============================================================
        name        line name
        symbol      line symbol
        type        line type, one of gxapi.DB_LINE_TYPE
        category    one of SYMB_LINE
        date        date of the line
        number      numeric line number
        flight      flight number
        version     line version number
        groupclass  class name for grouped lines, None if not a grouped line
        =========== ==============================================================

    .. versionadded:: 9.1
    """

    def text_detail(getter):
        """String detail read through a str_ref, '' if the API call fails."""
        try:
            ref = gxapi.str_ref()
            getter(ls, ref)
            return ref.value
        except geosoft.gxapi.GXAPIError:
            return ''

    ln, ls = self.line_name_symb(line)
    db = self._db
    self.lock_read_(ls)
    try:
        detail = {
            'name': ln,
            'symbol': ls,
            'category': db.line_category(ls),
            'date': db.line_date(ls),
            'flight': db.line_flight(ls),
            'number': db.line_number(ls),
            'version': db.line_version(ls),
            'type': db.line_type(ls),
        }
        is_group = db.line_category(ls) == gxapi.DB_CATEGORY_LINE_GROUP
        detail['groupclass'] = text_detail(db.get_group_class) if is_group else None
    finally:
        self.unlock_(ls)
    return detail
def channel_details(self, channel):
    """
    Return dictionary of channel details

    :param channel: channel name or symbol
    :returns:       dictionary:

        ======= ==============================================================
        Key     Meaning
        ======= ==============================================================
        name    channel name
        symbol  channel symbol
        class   class name
        format  format, one of gxapi.DB_CHAN_FORMAT constants
        width   display width in characters
        decimal decimal places to display
        unit    measurement unit
        label   channel label, which can be different from the channel name
        protect protection: 0 can be modified; 1 protected from modification
        array   number data columns, 1 for normal channels, n for VA channels
        type    data type, one of gxapi.DB_CATEGORY_CHAN constants
        ======= ==============================================================

    .. versionadded:: 9.1
    """

    def text_detail(getter):
        """String detail read through a str_ref."""
        ref = gxapi.str_ref()
        getter(cs, ref)
        return ref.value

    cn, cs = self.channel_name_symb(channel)
    db = self._db
    self.lock_read_(cs)
    try:
        detail = {
            'name': cn,
            'symbol': cs,
            'class': text_detail(db.get_chan_class),
            'format': db.get_chan_format(cs),
            'width': db.get_chan_width(cs),
            'decimal': db.get_chan_decimal(cs),
            'unit': text_detail(db.get_chan_unit),
            'label': text_detail(db.get_chan_label),
            'protect': db.get_chan_protect(cs),
            'array': self.channel_width(cs),
            'type': db.get_chan_type(cs),
        }
    finally:
        self.unlock_(cs)
    return detail
def set_channel_details(self, channel, detail):
    """
    Set/change channel details from dictionary

    Only the keys 'class', 'format', 'width', 'decimal', 'unit', 'label' and 'protect' are
    applied, in that order; keys that are missing or None are left unchanged.

    :param channel: channel name or symbol
    :param detail:  dictionary, see channel_details

    .. versionadded:: 9.1
    """
    cs = self.channel_name_symb(channel)[1]
    self.lock_write_(cs)
    try:
        db = self._db
        setters = (
            ('class', db.set_chan_class),
            ('format', db.set_chan_format),
            ('width', db.set_chan_width),
            ('decimal', db.set_chan_decimal),
            ('unit', db.set_chan_unit),
            ('label', db.set_chan_label),
            ('protect', db.set_chan_protect),
        )
        for key, setter in setters:
            value = detail.get(key)
            if value is not None:
                setter(cs, value)
    finally:
        self.unlock_(cs)
def channel_dtype(self, channel):
    """
    Returns channel numpy dtype

    :param channel: channel name or symbol
    :returns:       numpy dtype

    .. versionadded:: 9.1
    """
    symb = self.channel_name_symb(channel)[1]
    gx_type = self._db.get_chan_type(symb)
    return gxu.dtype_gx(gx_type)
def channel_fid(self, line, channel):
    """
    Return the fiducial of a line, channel

    :param line:    line name, symbol or Line
    :param channel: channel name, symbol or channel
    :returns:       (start,increment)
    """
    line_symb = self.line_name_symb(line)[1]
    chan_symb = self.channel_name_symb(channel)[1]

    # the channel symbol is read-locked while the fiducial is queried
    self.lock_read_(chan_symb)
    try:
        fid = (self._db.get_fid_start(line_symb, chan_symb),
               self._db.get_fid_incr(line_symb, chan_symb))
    finally:
        self.unlock_(chan_symb)
    return fid
# ========================================================================================
# management
def new_channel(self, name, dtype=np.float64, array=1, dup=None, details=None):
    """
    Return a channel symbol, create if it does not exist.

    :param name:        channel name
    :param dtype:       numpy dtype (ie. np.int64)
    :param array:       array columns (default is 1), values below 1 are treated as 1
    :param dup:         duplicate properties of this channel (name, symbol, channel)
    :param details:     dictionary containing channel details, see channel_details()
    :returns:           channel symbol

    An existing channel is returned unchanged. A created channel that is neither a duplicate
    nor given `details` gets a display width of 12 and 2 decimals.

    Examples:

    .. code::

        symb = gdb.new_channel('X')
        symb = gdb.new_channel('X', dtype=np.float64, details={'decimal':4})

    .. versionadded:: 9.1

    .. versionchanged:: 9.3
        added support for duplication an existing channel via dup=
    """
    symb = self._db.find_symb(name, gxapi.DB_SYMB_CHAN)
    array = max(array, 1)

    if symb != gxapi.NULLSYMB:
        return symb

    if dup:
        source = self.channel_name_symb(dup)[1]
        symb = self._db.dup_symb_no_lock(source, name)
    else:
        symb = self._db.create_symb_ex(name,
                                       gxapi.DB_SYMB_CHAN,
                                       gxapi.DB_OWN_SHARED,
                                       gxu.gx_dtype(dtype),
                                       array)

    if details:
        self.set_channel_details(symb, details)
    elif not dup:
        self.set_channel_details(symb, {'width': 12, 'decimal': 2})
    return symb
def new_line(self, line, linetype=None, group=None, dup=None):
    """
    Create a new line symbol.  If line exists an error is raised.

    :param line:        line name
    :param linetype:    line type for creating a new line, ignored if group defines

        =================  =========================================
        SYMB_LINE_NORMAL   normal lines, name is a string
        SYMB_LINE_FLIGHT   flight lines, first letter is line type
        =================  =========================================

    :param group:   group name for a grouped class
    :param dup:     duplicate from an existing line (name, symbol of Line)
    :returns:       line symbol
    :raises:        GdbException if the line exists, or if the name is not a valid line name
                    and neither `group` nor `dup` is given

    .. seealso:: function `create_line_name` to create a valid line name.

    .. versionadded:: 9.1
    """
    if group is None and dup is None and not is_valid_line_name(line):
        raise GdbException(_t('Invalid line name \'{}\'. Use create_line_name() to create a valid name.'.
                              format(line)))

    symb = self._db.find_symb(line, gxapi.DB_SYMB_LINE)
    if symb != gxapi.NULLSYMB:
        # routed through _t() like every other message raised by this module
        raise GdbException(_t('Cannot create existing line \'{}\''.format(line)))

    if dup:
        dup_symb = self.line_name_symb(dup)[1]
        symb = self._db.dup_line_symb(dup_symb, line)
    else:
        if group:
            linetype = SYMB_LINE_GROUP
        elif not linetype:
            linetype = SYMB_LINE_NORMAL
        symb = self._db.create_symb_ex(line,
                                       gxapi.DB_SYMB_LINE,
                                       gxapi.DB_OWN_SHARED,
                                       linetype,
                                       0)

    if group:
        Line(self, symb).group = group

    # the set of lines changed, so any cached extent is out of date
    self.clear_extent()
    return symb
def clear_extent(self):
    """
    Clear the extent cache.

    Only the cache key is reset, which forces the next `extent` request to recalculate.

    .. versionadded:: 9.3.1
    """
    cache = self._extent
    cache['xyz'] = None
def delete_channel(self, channels):
    """
    Delete channel(s) by name or symbol.

    Channels that raise a GdbException are skipped. Protected channels are not deleted;
    they are reported in a single GdbException after all other channels are processed.

    :param channels: channel name or symbol, or a list of channel names or symbols
    :raises: GdbException naming any protected channels that were not deleted

    .. versionadded:: 9.1
    """
    if isinstance(channels, (str, int)):
        channels = [channels]

    protected_channels = []
    for item in channels:
        try:
            target = Channel(self, item)
            if not target.protect:
                target.delete()
            else:
                protected_channels.append(target.name)
        except GdbException:
            continue

    if len(protected_channels):
        raise GdbException(_t('Cannot delete protected channels: {}'.format(protected_channels)))
def delete_line(self, lines):
    """
    Delete line(s) by name or symbol.

    Line names that do not exist in the database are ignored. The cached extent is cleared
    because the set of lines contributing to it may have changed.

    :param lines: line name/symbol/`Line` instance, or a list of these

    .. versionadded:: 9.1
    """
    if isinstance(lines, (str, int)):
        lines = [lines]

    try:
        for s in lines:
            if isinstance(s, str) and not self.exist_symb_(s, gxapi.DB_SYMB_LINE):
                continue
            # integers are already symbols; names and Line instances are resolved to a symbol
            ls = s if isinstance(s, int) else self.line_name_symb(s)[1]

            # release any lock on the line, then take the write lock before deleting
            self.unlock_(ls)
            self.lock_write_(ls)
            self._db.delete_symb(ls)
    finally:
        # invalidate the cache even if a deletion fails part way through the list
        self.clear_extent()
def delete_line_data(self, lines):
    """
    Delete all data in line(s) by name or symbol but keep the line.

    The cached extent is cleared because the data contributing to it has been removed.

    :param lines: line name/symbol/`Line` instance, or a list of these

    .. versionadded:: 9.6
    """
    if isinstance(lines, (str, int)):
        lines = [lines]

    try:
        for s in lines:
            # integers are already symbols; names and Line instances are resolved to a symbol
            ls = s if isinstance(s, int) else self.line_name_symb(s)[1]
            self._delete_line_data(ls)
    finally:
        # invalidate the cache even if a line fails part way through the list
        self.clear_extent()
def _delete_line_data(self, ls):
    """
    Overwrite every channel of line symbol `ls` with a newly constructed, data-less
    VV (width 1) or VA (array channels) of the channel's own dtype.
    """
    for chan in self.sorted_chan_list():
        symb = self.channel_name_symb(chan)[1]
        dtype = self.channel_dtype(symb)
        width = self.channel_width(symb)
        if width != 1:
            self.write_channel_va(ls, symb, gxva.GXva(width=width, dtype=dtype))
        else:
            self.write_channel_vv(ls, symb, gxvv.GXvv(dtype=dtype))
def select_lines(self, selection='', select=True):
    """
    Change selected state of a line, or group of lines.

    :param selection: string representing selection, comma-delimit multiple selections, or provide a list
                      of selections.
    :param select:    `True` to select, `False` to deselect

    "L99:800" will select all lines of type "L" in range 99 through 800.

    | Use a "T" prefix for Tie lines.
    | Use an "F" prefix to specify lines of a specific flight.
    | For example, "F10" would select all lines of flight 10.
    | Use an empty string ("") to select/deselect ALL lines.

    Invalid line names are ignored.

    The extent cache is cleared after the selection has been applied.

    .. versionadded:: 9.1
    """
    if isinstance(selection, str):
        selection = selection.split(',')

    mode = gxapi.DB_LINE_SELECT_INCLUDE if select else gxapi.DB_LINE_SELECT_EXCLUDE
    for pattern in selection:
        self._db.select(pattern, mode)

    self.clear_extent()
# =====================================================================================
# reading and writing
def _to_string_chan_list(self, channels):
    """
    Normalise a channel specification to a list of channel names.

    :param channels: a channel name, a comma-delimited string of names, a channel symbol (int),
                     or an iterable mixing names and symbols
    :returns: list in which every integer symbol has been replaced by the name returned by
              `channel_name_symb`; strings are passed through (comma-split items are stripped)
    """
    if isinstance(channels, str):
        if ',' in channels:
            channels = [c.strip() for c in channels.split(',')]
        else:
            channels = [channels]
    elif isinstance(channels, int):
        channels = [channels]

    # test each item, not the container: by this point `channels` is always a sequence
    return [self.channel_name_symb(c)[0] if isinstance(c, int) else c for c in channels]
def sorted_chan_list(self, channels=None):
    """
    Get a list of sorted channels from Gdb, placing x, y and z channels (if defined) at front of list.

    When `channels` is given its order is kept (apart from moving x, y, z to the front); only the
    full channel list read from the database is sorted, case-insensitively.

    :param channels: list of channels, strings or symbol number. If None, read all channels
    :returns: list containing channel names

    .. versionadded:: 9.6
    """
    if channels is None:
        names = sorted(self.list_channels(), key=str.lower)
    else:
        names = self._to_string_chan_list(channels)
    known_lower = [n.lower() for n in names]

    # lower-case names of the x, y, z channels; '' while a coordinate channel is undefined
    xyz_lower = ['', '', '']
    front = []
    for slot, which in enumerate((gxapi.DB_CHAN_X, gxapi.DB_CHAN_Y, gxapi.DB_CHAN_Z)):
        symb = self._db.get_xyz_chan_symb(which)
        if symb == -1:
            continue
        axis_name = self.channel_name_symb(symb)[0]
        xyz_lower[slot] = axis_name.lower()
        if xyz_lower[slot] in known_lower:
            front.append(axis_name)

    # everything else follows, in its existing order
    return front + [n for n in names if n.lower() not in xyz_lower]
def _expand_chan_list(self, channels):
    """
    Expand array (VA) channels to one entry per element.

    :param channels: iterable of channel names or symbols
    :returns: (names, symbols, types) - three parallel lists. An array channel of width w
              contributes the w entries resolved from 'name[0]' .. 'name[w-1]', each carrying
              the type of the parent channel.
    """
    names, symbols, types = [], [], []
    for chan in channels:
        base_name, base_symb = self.channel_name_symb(chan)
        width = self.channel_width(base_symb)
        if width == 1:
            members = [(base_name, base_symb)]
        else:
            members = [self.channel_name_symb("{}[{}]".format(base_name, i)) for i in range(width)]
        for member_name, member_symb in members:
            names.append(member_name)
            symbols.append(member_symb)
            types.append(self._db.get_chan_type(base_symb))
    return names, symbols, types
def lock_read_(self, s):
    """
    Internal function to read-lock symbol `s`, waiting indefinitely for the lock.

    :raises: `GdbException` if the lock call raises a `GdbException`
    """
    mode, wait = SYMBOL_LOCK_READ, gxapi.DB_WAIT_INFINITY
    try:
        self._db.lock_symb(s, mode, wait)
    except GdbException:
        message = _t('Cannot read lock symbol {}'.format(s))
        raise GdbException(message)
def lock_write_(self, s):
    """
    Internal function to write-lock symbol `s`, waiting indefinitely for the lock.

    :raises: `GdbException` if the lock call raises a `GdbException`
    """
    mode, wait = SYMBOL_LOCK_WRITE, gxapi.DB_WAIT_INFINITY
    try:
        self._db.lock_symb(s, mode, wait)
    except GdbException:
        message = _t('Cannot write lock symbol {}'.format(s))
        raise GdbException(message)
def unlock_(self, s):
    """Internal function to unlock symbol `s`; does nothing when the symbol holds no lock."""
    state = self._db.get_symb_lock(s)
    if state == SYMBOL_LOCK_NONE:
        return
    self._db.un_lock_symb(s)
def unlock_all(self):
    """
    Release the locks on all locked symbols in the database.

    .. versionadded:: 9.3
    """
    database = self._db
    database.un_lock_all_symb()
def read_channel_vv(self, line, channel, dtype=None):
    """
    Read data from a single channel, return in a vv.

    The line is resolved with `create=True`. The returned vv carries the channel's
    unit of measure.

    :param line:    line name or symbol
    :param channel: channel name or symbol
    :param dtype:   type wanted, default same as the channel data
    :returns:       vv
    :raises:        `GdbException` if the channel is an array (VA) channel

    .. versionadded:: 9.2
    """
    ls = self.line_name_symb(line, create=True)[1]
    cs = self.channel_name_symb(channel)[1]

    if self.channel_width(cs) != 1:
        raise GdbException(_t("Cannot read a VA channel into a VV."))

    wanted = self.channel_dtype(cs) if dtype is None else dtype
    vv = gxvv.GXvv(dtype=wanted)

    # the channel stays read-locked only for the duration of the transfer
    self.lock_read_(cs)
    try:
        self._db.get_chan_vv(ls, cs, vv.gxvv)
    finally:
        self.unlock_(cs)

    vv.unit_of_measure = Channel(self, cs).unit_of_measure
    return vv
def read_channel_va(self, line, channel, dtype=None):
    """
    Read VA data from a single channel, return in a va.

    The line is resolved with `create=True`. The returned va has the channel's width
    and carries the channel's unit of measure.

    :param line:    line name or symbol
    :param channel: channel name or symbol
    :param dtype:   type wanted, default same as the channel data
    :returns:       va

    .. versionadded:: 9.2
    """
    ls = self.line_name_symb(line, create=True)[1]
    cs = self.channel_name_symb(channel)[1]

    wanted = self.channel_dtype(cs) if dtype is None else dtype
    va = gxva.GXva(width=self.channel_width(cs), dtype=wanted)

    # the channel stays read-locked only for the duration of the transfer
    self.lock_read_(cs)
    try:
        self._db.get_chan_va(ls, cs, va.gxva)
    finally:
        self.unlock_(cs)

    va.unit_of_measure = Channel(self, cs).unit_of_measure
    return va
def read_channel(self, line, channel, dtype=None):
    """
    Read data from a single channel.

    Channels of width 1 are read through `read_channel_vv`, array channels through
    `read_channel_va`.

    :param line:    line name or symbol
    :param channel: channel name or symbol
    :param dtype:   type wanted, default same as the channel data
    :returns:       numpy data, fid (start, increment)

    For floating-point dtypes dummy values will be np.nan. For integer types dummy values
    will be the Geosoft dummy values.

    .. versionadded:: 9.1
    """
    if self.channel_width(channel) == 1:
        holder = self.read_channel_vv(line, channel, dtype)
    else:
        holder = self.read_channel_va(line, channel, dtype)
    return holder.get_data(holder.dtype)[0], holder.fid
def read_line_vv(self, line, channels=None, dtype=None, fid=None, common_fid=False, chan_dtypes=False):
    """
    Read a line of data into VVs, returned as a list of (channel_name, vv) tuples.

    :param line:        line to read, string or symbol number
    :param channels:    list of channels, strings or symbol number. If None, read all channels
    :param dtype:       numpy data type for the array, default np.float64 for multi-channel data (unless
                        chan_dtypes is `True`), data type for single channel data. Use "<Unnn" for string type.
    :param fid:         fiducial (start, increment) to resample to when `common_fid` is `True`; by default
                        the smallest start and smallest increment found in the data
    :param common_fid:  `True` to resample all channels to a common fiducial
    :param chan_dtypes: `True` to determine dtype for each vv from channel type, default `False`
    :returns:           list of tuples [(channel_name, vv), ...]

    If a requested channel is a VA, it is returned with channel names 'name[0]', 'name[1]', etc.

    Examples:

    .. code::

        # data - list of (channel_name, vv), array channels expanded to array[0], array[1], ...

        data = gdb.read_line_vv('L100')                           # read all channels in line "L100"
        data = gdb.read_line_vv(681)                              # read all channels in line symbol 681
        data = gdb.read_line_vv('L100','X')                       # read channel 'X' from line 'L100'
        data = gdb.read_line_vv('L100',2135)                      # read channel symbol 2135 from 'L100"
        data = gdb.read_line_vv('L100',channels=['X','Y','Z'])    # read a list of channels
        data = gdb.read_line_vv('L100','X',np.int32)              # read channel 'X' into integer vv

    .. versionadded:: 9.2
    """
    ls = self.line_name_symb(line)[1]

    if channels is None:
        channels = self.sorted_chan_list()
    else:
        channels = self._to_string_chan_list(channels)

    # make up channel list, expanding VA channels
    ch_names = self._expand_chan_list(channels)[0]

    if chan_dtypes:
        dtype = None
    elif dtype is None:
        dtype = np.float64

    # read the data into vv
    chvv = []
    for c in ch_names:
        cs = self._db.find_symb(c, gxapi.DB_SYMB_CHAN)
        chvv.append((c, self.read_channel_vv(ls, cs, dtype=dtype)))

    # resample?
    if common_fid:

        # determine fiducial range from data
        start = gxapi.GS_R8MX
        incr = gxapi.GS_R8MX
        fend = gxapi.GS_R8MN
        for _, vv in chvv:
            if vv.length > 0:
                fd = vv.fid
                if fd[0] != gxapi.rDUMMY:
                    if fd[0] < start:
                        start = fd[0]
                    if fd[1] < incr:
                        incr = fd[1]
                    # last fiducial of THIS channel, from its own start and increment, so the
                    # common range does not depend on the order in which channels are visited
                    dend = fd[0] + fd[1] * (vv.length - 1)
                    if dend > fend:
                        fend = dend

        no_data = (start == gxapi.GS_R8MX)
        if fid is None:
            fid = (0.0, 1.0) if no_data else (start, incr)
        if no_data:
            nvd = 0
        else:
            nvd = math.ceil(max((fend - fid[0] - sys.float_info.epsilon), 0) / fid[1]) + 1

        for _, vv in chvv:
            vv.refid(fid, nvd)

    return chvv
def scan_line_fid(self, line, channels=None):
    """
    Scan channels in a line and return the smallest common fid, line length, data width, list of channels.

    :param line:        line to read, string or symbol number
    :param channels:    list of channels, strings or symbol number. If None, scan all channels
    :returns:           (fid_start, fid_increment, fid_last, data_width, channel_list).
                        `(0, 1., 0, 0, [])` is returned for an empty channel list, and
                        `(0., 1., 0., 0, channel_list)` when no channel has a defined fiducial.

    .. versionadded:: 9.4
    """

    def channel_length(symb):
        # read-lock the channel for the query and always release the lock, even on failure
        self.lock_read_(symb)
        try:
            return self.gxdb.get_channel_length(ls, symb)
        finally:
            self.unlock_(symb)

    if channels is None:
        channels = self.sorted_chan_list()
    else:
        channels = self._to_string_chan_list(channels)

    if len(channels) == 0:
        return 0, 1., 0, 0, []

    ls = self.line_name_symb(line)[1]

    # the first channel seeds the running range
    cs = self.channel_name_symb(channels[0])[1]
    fid_start, fid_increment = self.channel_fid(ls, cs)
    nrows = channel_length(cs)
    if nrows == 0:
        fid_last = fid_start
    else:
        fid_last = fid_start + fid_increment * (nrows - 1)
    n_width = self.channel_width(cs)

    # widen the range with every remaining channel that has a defined fiducial
    for c in channels[1:]:
        cs = self.channel_name_symb(c)[1]
        n_width += self.channel_width(cs)
        c_start, c_increment = self.channel_fid(ls, cs)
        if c_start != gxapi.rDUMMY:
            c_last = c_start + c_increment * (channel_length(cs) - 1)
            if fid_start == gxapi.rDUMMY or c_start < fid_start:
                fid_start = c_start
            if fid_increment == gxapi.rDUMMY or c_increment < fid_increment:
                fid_increment = c_increment
            if c_last > fid_last:
                fid_last = c_last

    if fid_start == gxapi.rDUMMY or fid_increment == gxapi.rDUMMY:
        return 0., 1., 0., 0, channels

    return fid_start, fid_increment, fid_last, n_width, channels
def readLine(self, *args, **kwargs):
    """
    Forward all arguments to `read_line` and return its result.

    .. deprecated:: 9.2 use read_line()
    """
    result = self.read_line(*args, **kwargs)
    return result
@classmethod
def _num_rows_from_fid(cls, src_fid_start, src_fid_last, fid):
    """
    Number of rows needed to reach `src_fid_last` when sampling at `fid` (start, increment).

    `src_fid_start` is accepted but not used in the calculation.
    """
    fid_start, fid_increment = fid[0], fid[1]
    steps = (src_fid_last - fid_start) / fid_increment
    return int(steps + 1.5)
def read_line(self, line, channels=None, dtype=None, fid=None, dummy=None):
    """
    Read a line of data into a numpy array.

    :param line:        line to read, string or symbol number
    :param channels:    list of channels, strings or symbol number. If empty, read all channels
    :param dtype:       numpy data type for the array, default np.float64 for multi-channel data,
                        data type for single channel data. Use "<Unnn" for string type.
    :param fid:         required fiducial as tuple (start,incr), default smallest in data
    :param dummy:       dummy_handling for multi-channel read, default leaves dummies in place:

            ========================  ===================================================
            READ_REMOVE_DUMMYROWS     remove rows with dummies, fiducials lose meaning
            READ_REMOVE_DUMMYCOLUMNS  remove columns with dummies
            ========================  ===================================================

    :returns:           2D numpy array shape(records,channels), list of channel names, (fidStart,fidIncr)
    :raises:            `GdbException` if `dummy` is not a recognised dummy-handling option

    VA channels are expanded by element with channel names name[0], name[1], etc.

    After READ_REMOVE_DUMMYROWS the returned fid is reset to (0.0, 1.0).

    This method is intended for relatively simple databases in relatively simple applications.
    If your database has a lot of channels, or wide array channels it will be more efficient
    to read and work with just the channels you need.  See `read_channel`, `read_channel_vv`
    and `read_channel_va`.

    Examples:

    .. code::

        # npd - returned numpy array shape (n, number of channels)
        # ch  - list of returned channels names, array channels expanded to array[0], array[1], ...
        # fid - tuple (fidStart,fidIncrement), channels resampled as necessary

        npd,ch,fid = gdb.read_line('L100')                           # read all channels in line "L100"
        npd,ch,fid = gdb.read_line(681)                              # read all channels in line symbol 681
        npd,ch,fid = gdb.read_line('L100','X')                       # read channel 'X' from line 'L100'
        npd,ch,fid = gdb.read_line('L100',2135)                      # read channel symbol 2135 from 'L100"
        npd,ch,fid = gdb.read_line('L100',channels=['X','Y','Z'])    # read a list of channels to (n,3) array
        npd,ch,fid = gdb.read_line('L100','X',np.int32)              # read channel 'X' into integer array

    .. versionadded:: 9.1
    """
    ls = self.line_name_symb(line)[1]
    fid_start, fid_incr, fid_last, ncols, channels = self.scan_line_fid(line, channels)

    if fid is None:
        fid = (fid_start, fid_incr)
    nrows = self._num_rows_from_fid(fid_start, fid_last, fid)

    # nothing to read: return an empty array shaped for the requested channels
    if nrows == 0 or ncols == 0:
        if len(channels) == 0:
            data = np.array([], dtype=dtype)
        else:
            data = np.array([], dtype=dtype).reshape((-1, len(channels)))
        return data, channels, fid

    # read to a numpy array
    npd = np.empty((nrows, ncols), dtype=dtype)
    if npd.dtype == np.float32 or npd.dtype == np.float64:
        dummy_value = np.nan
    else:
        dummy_value = gxu.gx_dummy(npd.dtype)

    all_empty = True
    ch_names = []
    icol = 0
    for ch in channels:
        cn, cs = self.channel_name_symb(ch)
        w = self.channel_width(cs)
        if w == 1:
            vv = self.read_channel_vv(ls, cs, dtype=npd.dtype)
            if vv.length > 0:
                all_empty = False
            vv.refid(fid, nrows)
            npd[:, icol] = vv.np
            icol += 1
            ch_names.append(cn)
        else:
            va = self.read_channel_va(ls, cs, dtype=npd.dtype)
            if va.length > 0:
                all_empty = False
            va.refid(fid, nrows)
            npd[:, icol:icol + w] = va.np
            icol += w
            for i in range(w):
                ch_names.append('{}[{}]'.format(cn, str(i)))

    if all_empty:
        npd = np.empty((0, ncols), dtype=dtype)

    elif dummy:

        # dummy handling
        if dummy not in (READ_REMOVE_DUMMYCOLUMNS, READ_REMOVE_DUMMYROWS):
            raise GdbException(_t('Unrecognized dummy={}').format(dummy))

        # boolean map of the cells that hold a dummy
        if np.isnan(dummy_value):
            is_dummy = np.isnan(npd)
        else:
            is_dummy = (npd == dummy_value)

        if dummy == READ_REMOVE_DUMMYCOLUMNS:
            # keep only the columns (and their names) that contain no dummy
            keep = ~is_dummy.any(axis=0)
            if not keep.all():
                npd = npd[:, keep]
                ch_names = [name for name, ok in zip(ch_names, keep) if ok]
        else:
            # keep only the rows that contain no dummy; fiducials lose their meaning
            npd = npd[~is_dummy.any(axis=1), :]
            fid = (0.0, 1.0)

    return npd, ch_names, fid
def read_line_dataframe(self, line, channels=None, fid=None):
    """
    Read a line of data into a Pandas DataFrame.

    :param line:        line to read, string or symbol number
    :param channels:    list of channels, strings or symbol number. If empty, read all channels
    :param fid:         required fiducial as tuple (start,incr), default smallest in data
    :returns:           Pandas DataFrame, list of channel names, (fidStart,fidIncr)

    VA channels are expanded by element with channel names name[0], name[1], etc.

    This method can be used to conveniently get a table structure of all data corresponding to the
    native types of the channels. It is however not necessarily the most efficient way to get at the data.
    If your database has a lot of channels, or wide array channels it will be more efficient
    to read and work with just the channels you need.  See `read_channel`, `read_channel_vv`
    and `read_channel_va`. This method also does not currently support dummy removal in the same
    way as `read_line`.

    Examples:

    .. code::

        # df  - Pandas DataFrame
        # ch  - list of returned channels names
        # fid - tuple (fidStart,fidIncrement), channels resampled as necessary

        df,ch,fid = gdb.read_line_dataframe('L100')                         # read all channels in line "L100"
        df,ch,fid = gdb.read_line_dataframe(681)                            # read all channels in line symbol 681
        df,ch,fid = gdb.read_line_dataframe('L100','X')                     # read channel 'X' from line 'L100'
        df,ch,fid = gdb.read_line_dataframe('L100',2135)                    # read channel symbol 2135 from 'L100"
        df,ch,fid = gdb.read_line_dataframe('L100',channels=['X','Y','Z'])  # read a list of channels

    .. versionadded:: 9.5
    """
    df = pd.DataFrame()
    ls = self.line_name_symb(line)[1]
    fid_start, fid_incr, fid_last, ncols, channels = self.scan_line_fid(line, channels)

    ch_names = []
    if fid is None:
        fid = (fid_start, fid_incr)
    nrows = self._num_rows_from_fid(fid_start, fid_last, fid)

    # nothing to read: build an empty frame that still has one column per (expanded) channel
    if nrows == 0 or ncols == 0:
        for ch in channels:
            cn, cs = self.channel_name_symb(ch)
            w = self.channel_width(cs)
            if w == 1:
                df[cn] = ()
                ch_names.append(cn)
            else:
                for i in range(w):
                    va_cn = '{}[{}]'.format(cn, str(i))
                    df[va_cn] = ()
                    ch_names.append(va_cn)
        return df, ch_names, fid

    all_empty = True
    for ch in channels:
        cn, cs = self.channel_name_symb(ch)
        w = self.channel_width(cs)
        if w == 1:
            # channel dtype is used, so each column keeps the native type of its channel
            vv = self.read_channel_vv(ls, cs)
            if vv.length > 0:
                all_empty = False
            vv.refid(fid, nrows)
            df[cn] = vv.np
            ch_names.append(cn)
        else:
            va = self.read_channel_va(ls, cs)
            if va.length > 0:
                all_empty = False
            va.refid(fid, nrows)
            for i in range(w):
                va_cn = '{}[{}]'.format(cn, str(i))
                df[va_cn] = va.np[:, i]
                ch_names.append(va_cn)

    if all_empty:
        # Delete one and only row
        df = df.drop([0])

    return df, ch_names, fid
def write_channel_vv(self, line, channel, vv):
    """
    Write data to a single channel.

    The line is resolved with `create=True`. A channel given by name that cannot be resolved is
    created with the vv's dtype. Writing to one of the `xyz_channels` clears the extent cache.
    A non-empty unit of measure on the vv is copied to the channel.

    :param line:    line name or symbol
    :param channel: channel name or symbol
    :param vv:      vv data to write

    .. versionadded:: 9.2
    """
    ls = self.line_name_symb(line, create=True)[1]

    try:
        cn, cs = self.channel_name_symb(channel)
    except GdbException:
        # only a channel given by name can be created on the fly
        if type(channel) is not str:
            raise
        cs = self.new_channel(channel, vv.dtype)
        cn = channel

    if cn in self.xyz_channels:
        self.clear_extent()

    self.lock_write_(cs)
    try:
        self._db.put_chan_vv(ls, cs, vv.gxvv)
    finally:
        self.unlock_(cs)

    unit = vv.unit_of_measure
    if unit:
        Channel(self, cs).unit_of_measure = unit
def write_channel_va(self, line, channel, va):
    """
    Write VA data to a single channel.

    The line is resolved with `create=True`. A channel given by name that cannot be resolved is
    created as an array channel with the va's dtype and width. A non-empty unit of measure on
    the va is copied to the channel.

    :param line:    line name or symbol
    :param channel: channel name or symbol
    :param va:      va data to write

    .. versionadded:: 9.2
    """
    ls = self.line_name_symb(line, create=True)[1]

    try:
        cs = self.channel_name_symb(channel)[1]
    except GdbException:
        # only a channel given by name can be created on the fly
        if type(channel) is not str:
            raise
        cs = self.new_channel(channel, va.dtype, array=va.width)

    self.lock_write_(cs)
    try:
        self._db.put_chan_va(ls, cs, va.gxva)
    finally:
        self.unlock_(cs)

    unit = va.unit_of_measure
    if unit:
        Channel(self, cs).unit_of_measure = unit
def writeDataChan(self, *args, **kwargs):
    """
    Forward all arguments to `write_channel`; nothing is returned.

    .. deprecated:: 9.2 use `write_channel`
    """
    forward = self.write_channel
    forward(*args, **kwargs)
def write_channel(self, line, channel, data, fid=(0.0, 1.0), unit_of_measure=None):
    """
    Write data to a single channel.

    The line is resolved with `create=True`. A channel given by name is passed to `new_channel`
    with the data's dtype and width. Writing to one of the `xyz_channels` clears the extent
    cache. If the data has width 0 nothing is written and the unit of measure is not set.

    :param line:            line name or symbol
    :param channel:         channel name or symbol
    :param data:            numpy array (2D for VA channel), or a list
    :param fid:             tuple (fid start, increment), default (0.0,1.0)
    :param unit_of_measure: data unit of measurement
    :raises:                `GdbException` if the data width differs from the channel width

    .. versionchanged:: 9.3 support for setting channel from a list
        added unit_of_measure

    .. versionadded:: 9.1
    """
    ls = self.line_name_symb(line, create=True)[1]

    if not isinstance(data, np.ndarray):
        data = np.array(data)

    if isinstance(channel, str):
        cn = channel
        cs = self.new_channel(channel, data.dtype, array=_va_width(data))
    else:
        cn, cs = self.channel_name_symb(channel)

    if cn in self.xyz_channels:
        self.clear_extent()

    if _va_width(data) == 0:
        # no data to write
        return

    w = self.channel_width(cs)
    if w != _va_width(data):
        raise GdbException(
            _t("Array data width {} does not fit into channel '{}' with width {}").
            format(_va_width(data), cn, w))

    if w == 1:
        # 1D channel: wrap the data in a VV
        vv = gxvv.GXvv(data, fid=fid)
        put, handle = self._db.put_chan_vv, vv.gxvv
    else:
        # array channel: wrap the data in a VA
        va = gxva.GXva(data, fid=fid)
        put, handle = self._db.put_chan_va, va.gxva

    self.lock_write_(cs)
    try:
        put(ls, cs, handle)
    finally:
        self.unlock_(cs)

    if unit_of_measure:
        Channel(self, cs).unit_of_measure = unit_of_measure
def write_line_vv(self, line, chan_data):
    """
    Write data to multiple channels in a line, the complement of `read_line_vv`.

    Each entry is written with `write_channel_vv`, in the order given.

    :param line:        line to write to, name or symbol
    :param chan_data:   list of tuples [(channel_name, vv), ]

    .. note::

        chan_data may contain VA data, which is defined by slice (ie. name[0], name[4]...).
        If VA data is included the VA channels must already exist.

    .. versionadded:: 9.2
    """
    for entry in chan_data:
        self.write_channel_vv(line, entry[0], entry[1])
def write_line(self, line, data, channels=None, fid=(0.0, 1.0)):
    """
    Write data to multiple channels in a line.  If no channel list is provided it assumes that the
    data is for all channels from the line, the complement of read_line().

    :param line:        line to write to, name or symbol
    :param data:        numpy array shape (records,channels).  If single dimension, one channel
    :param channels:    channel name or symbol list, or a single name/symbol. If a single name is specified
                        for multi-column data, a VA channel is assumed. If None, a sorted list of all channels
                        is assumed.
    :param fid:         option fid tuple (start, increment), default (0.0,1.0)
    :raises:            `GdbException` if the number of data columns does not match the total width
                        of the channels

    .. versionadded:: 9.1
    """

    def resolve(chan):
        # (write target, width): an unresolvable channel is treated as a new width-1 channel
        try:
            symb = self.channel_name_symb(chan)[1]
            return symb, self.channel_width(symb)
        except GdbException:
            return chan, 1

    # a single channel name takes all of the data
    if type(channels) is str:
        self.write_channel(line, channels, data, fid=fid)
        return

    if channels is None:
        channels = self.sorted_chan_list()
    else:
        channels = self._to_string_chan_list(channels)

    if not isinstance(data, np.ndarray):
        data = np.array(data)
    if data.ndim == 1:
        data = data.reshape((-1, 1))

    # ensure data matches channels
    columns_needed = sum(resolve(chan)[1] for chan in channels)
    if data.shape[1] != columns_needed:
        raise GdbException(_t('Data dimension ({}) does not match data required by channels ({}).').
                           format(data.shape, channels))

    # all good, write the data one channel-wide column slice at a time
    first = 0
    for chan in channels:
        target, width = resolve(chan)
        self.write_channel(line, target, data[:, first: first + width], fid=fid)
        first += width
def list_values(self, chan, umax=1000, selected=True, dupl=50, progress=None, stop=None):
    """
    Build a list of unique values in a channel.  Uniqueness depends on the current display format for
    the field.

    :param chan:        channel to scan
    :param umax:        maximum values allowed, once this maximum is reached scanning stops, default 1000
    :param selected:    `True` to scan only selected lines
    :param dupl:        Stop growing list after this many lines fail to grow the list, 0 scans all lines
    :param progress:    progress reporting function
    :param stop:        stop check function
    :returns:           list of values, represented as a string. An empty list is returned when no
                        line yields any data.

    Lines are scanned in case-insensitive name order; lines that raise a `GdbException` on read
    are skipped.

    .. versionadded:: 9.1
    """
    lines = sorted(self.list_lines(select=selected), key=str.lower)
    cn, cs = self.channel_name_symb(chan)
    details = self.channel_details(cs)
    dtype = np.dtype('<U{}'.format(details.get('width')))

    # start from an empty ARRAY (not a list) so the result is valid even if nothing is read
    vset = np.array([], dtype=dtype)
    n = 0
    nset = -1
    ndup = 0

    for l in lines:

        try:
            d = self.read_line(l, cs, dtype=dtype)[0]
        except GdbException:
            continue

        if d.shape[0] == 0:
            continue

        vset = np.unique(np.append(vset, np.unique(d)))

        if vset.shape[0] > umax:
            break

        if dupl > 0:
            if vset.shape[0] == nset:
                # this line added nothing new
                ndup += 1
                if ndup > dupl:
                    break
            else:
                ndup = 0
                nset = vset.shape[0]

        n += 1

        if progress:
            progress('Scanning unique values in "{}", {}'.format(cn, str(l)), (n * 100.0) / len(lines))

        if stop:
            if stop():
                return vset.tolist()

    if vset.shape[0] > umax:
        vset = vset[:umax]

    return vset.tolist()
class Channel:
    """
    Class to work with database channels.  Use constructor `Channel.new` to create a new channel.
    Use instance properties to work with channel properties.

    :param gdb:     database instance
    :param name:    channel name string, must exist - see `new()` to create a new channel

    .. versionadded:: 9.3
    """

    def _get(self, fn):
        """Return `fn(symbol)`, evaluated while the channel symbol is read-locked."""
        self.gdb.lock_read_(self._symb)
        try:
            return fn(self._symb)
        finally:
            self.gdb.unlock_(self._symb)

    def _get_str(self, fn):
        """Return the string that `fn(symbol, str_ref)` stores, read under a read lock."""
        self.gdb.lock_read_(self._symb)
        try:
            sr = gxapi.str_ref()
            fn(self._symb, sr)
            return sr.value
        finally:
            self.gdb.unlock_(self._symb)

    def lock_set_(self, fn, v):
        """write_lock, set and release a gdb attribute that requires locking to write."""
        self.gdb.lock_write_(self._symb)
        try:
            fn(self._symb, v)
        finally:
            self.gdb.unlock_(self._symb)

    def __repr__(self):
        return "{}({})".format(self.__class__, self.__dict__)

    def __str__(self):
        return self.name

    def __init__(self, gdb, name):
        self.gdb = gdb
        name, self._symb = gdb.channel_name_symb(name)

    @classmethod
    def new(cls, gdb, name, dtype=np.float64, array=1, dup=None, details=None, replace=False, unit_of_measure=None):
        """
        Create a new channel.

        :param gdb:             Geosoft_gdb instance
        :param name:            channel name
        :param dtype:           numpy data type, default np.float64
        :param array:           array size, default 1
        :param dup:             duplicate properties of this channel (name, symbol or Channel)
        :param details:         dictionary of other channel properties - see `Geosoft_gdb.set_channel_details`
        :param replace:         `True` to replace existing channel. Existing channel information and data is lost.
                                default is `False`.
        :param unit_of_measure: unit of measurement of the data
        :return:                Channel instance
        :raises:                `GdbException` if the channel exists and `replace` is not `True`
        """
        if gdb.exist_symb_(name, gxapi.DB_SYMB_CHAN):
            if replace:
                gdb.delete_channel(name)
            else:
                raise GdbException(_t("Cannot replace existing channel '{}'".format(name)))

        symb = gdb.new_channel(name, dtype, array=array, dup=dup)
        if details:
            gdb.set_channel_details(symb, details)

        chan = cls(gdb, name)
        if unit_of_measure:
            chan.unit_of_measure = unit_of_measure
        return chan

    @property
    def name(self):
        """
        Channel name. Can be set, which renames the channel.

        .. versionadded:: 9.3
        """
        return self._get_str(self.gdb.gxdb.get_chan_name)

    @name.setter
    def name(self, name):
        name = str(name)
        if name != self.name:
            if not self.gdb.gxdb.is_chan_name(name):
                raise GdbException(_t('Invalid channel name \'{}\''.format(name)))
            if self.gdb.exist_symb_(name, gxapi.DB_SYMB_CHAN):
                raise GdbException(_t('Cannot rename to an existing channel name \'{}\''.format(name)))
            self.lock_set_(self.gdb.gxdb.set_chan_name, name)

    @property
    def symbol(self):
        """
        Channel symbol

        .. versionadded:: 9.3
        """
        return self._symb

    @property
    def array(self):
        """
        Array channel width, 1 for non-array channels

        .. versionadded:: 9.3
        """
        return self.gdb.channel_width(self._symb)

    @property
    def is_array(self):
        """
        `True` if this is an array channel

        .. versionadded:: 9.3
        """
        return bool(self.array > 1)

    @property
    def decimal(self):
        """
        Number of displayed decimal places, can be set.

        .. versionadded:: 9.3
        """
        return self.gdb.gxdb.get_chan_decimal(self._symb)

    @decimal.setter
    def decimal(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_decimal, value)

    @property
    def format(self):
        """
        Channel display format, can be set:

        =============  ========================================
        FORMAT_NORMAL  normal decimal or integer format
        FORMAT_EXP     exponential
        FORMAT_TIME    geosoft time (HH:MM:SS.ssss)
        FORMAT_DATE    date (YYYY/MM/DD)
        FORMAT_GEOGR   geographic (deg.mm.ss.ssss)
        FORMAT_SIGDIG  decimals is number of significant digits
        FORMAT_HEX     hexadecimal
        =============  ========================================

        .. versionadded:: 9.3
        """
        return self.gdb.gxdb.get_chan_format(self._symb)

    @format.setter
    def format(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_format, value)

    @property
    def label(self):
        """
        Channel label used in display graphics, normally the same as the channel name.
        Can be set.

        .. versionadded:: 9.3
        """
        sr = gxapi.str_ref()
        self.gdb.gxdb.get_chan_label(self._symb, sr)
        return sr.value

    @label.setter
    def label(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_label, value)

    @property
    def type(self):
        """
        Geosoft data type.

        .. versionadded:: 9.3
        """
        return self.gdb.gxdb.get_chan_type(self._symb)

    @property
    def unit_of_measure(self):
        """
        Unit of measure, can be set.

        .. versionadded:: 9.3
        """
        sr = gxapi.str_ref()
        self.gdb.gxdb.get_chan_unit(self._symb, sr)
        return sr.value

    @unit_of_measure.setter
    def unit_of_measure(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_unit, value)

    @property
    def width(self):
        """
        Display window width in characters.
        Can be set.

        .. versionadded:: 9.3
        """
        return self.gdb.gxdb.get_chan_width(self._symb)

    @width.setter
    def width(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_width, value)

    @property
    def class_(self):
        """
        Class name to which this channel is associated.
        Can be set.

        .. versionadded:: 9.3
        """
        sr = gxapi.str_ref()
        self.gdb.gxdb.get_chan_class(self._symb, sr)
        return sr.value

    @class_.setter
    def class_(self, value):
        self.lock_set_(self.gdb.gxdb.set_chan_class, value)

    @property
    def protect(self):
        """
        `True` if this channel is protected from modification.
        Can be set.

        .. versionadded:: 9.3
        """
        return bool(self.gdb.gxdb.get_chan_protect(self._symb))

    @protect.setter
    def protect(self, value):
        # the database flag is stored as 1/0
        self.lock_set_(self.gdb.gxdb.set_chan_protect, 1 if value else 0)

    @property
    def locked(self):
        """
        True if symbol is locked.  Use property :any:`lock` to determine if read or write lock, or to
        set the lock.

        Setting to `False` unlocks the symbol.

        .. versionadded:: 9.3
        """
        return self.lock != SYMBOL_LOCK_NONE

    @locked.setter
    def locked(self, value):
        if not value:
            self.gdb.unlock_(self._symb)
        else:
            raise GdbException(_t('Use property \'lock\' to set SYMBOL_READ or SYMBOL_WRITE lock.'))

    @property
    def lock(self):
        """
        Lock setting:

        | -1 unlocked (SYMBOL_LOCK_NONE)
        | 0  read-locked (SYMBOL_LOCK_READ)
        | 1  write-locked (SYMBOL_LOCK_WRITE)

        Can be set. Setting SYMBOL_LOCK_NONE releases the lock.

        .. versionadded:: 9.3
        """
        return self.gdb.gxdb.get_symb_lock(self.symbol)

    @lock.setter
    def lock(self, value):
        if self.lock != value:
            self.gdb.unlock_(self.symbol)
            # SYMBOL_LOCK_NONE is a state, not a lock mode: releasing the lock is all that is needed
            if value != SYMBOL_LOCK_NONE:
                self.gdb.gxdb.lock_symb(self.symbol, value, gxapi.DB_WAIT_INFINITY)

    def delete(self):
        """
        Delete the channel and all associated data.  After calling this method this
        channel instance is no longer valid.

        :raises: `GdbException` if the channel is protected

        .. versionadded:: 9.3
        """
        if self.protect:
            raise GdbException(_t("Cannot delete protected channel '{}'".format(self.name)))
        self.lock = SYMBOL_LOCK_WRITE
        self.gdb.gxdb.delete_symb(self._symb)
        self._symb = gxapi.NULLSYMB
[docs]class Line:
"""
Class to work with database lines. Use constructor `Line.new` to create a new line.
Use instance properties to work with line properties.
:param gdb: `Geosoft_gdb` instance
:param name: line name string, must exist - see `new()` to create a new line
.. versionadded:: 9.3
"""
def _get(self, fn):
    """Return `fn(symbol)`, evaluated while the line symbol is read-locked."""
    gdb = self.gdb
    gdb.lock_read_(self._symb)
    try:
        value = fn(self._symb)
    finally:
        gdb.unlock_(self._symb)
    return value
def _get_str(self, fn):
    """Return the string that `fn(symbol, str_ref)` stores, read while the line symbol is read-locked."""
    gdb = self.gdb
    gdb.lock_read_(self._symb)
    try:
        holder = gxapi.str_ref()
        fn(self._symb, holder)
        text = holder.value
    finally:
        gdb.unlock_(self._symb)
    return text
def lock_set_(self, fn, v):
    """Write-lock the line symbol, call `fn(symbol, v)`, then release the lock (also on failure)."""
    gdb = self.gdb
    gdb.lock_write_(self._symb)
    try:
        fn(self._symb, v)
    finally:
        gdb.unlock_(self._symb)
def __repr__(self):
    """Debug representation: the instance class followed by its attribute dictionary."""
    cls, attributes = self.__class__, self.__dict__
    return "{}({})".format(cls, attributes)
def __str__(self):
    """The string form of a line is its name."""
    line_name = self.name
    return line_name
def __init__(self, gdb, name):
    """
    Bind to an existing line of `gdb`.

    :param gdb:  database instance
    :param name: line name or symbol, resolved with `gdb.line_name_symb`
    """
    self.gdb = gdb
    resolved = gdb.line_name_symb(name)
    self._symb = resolved[1]
@classmethod
def new(cls, gdb, name, linetype=None, group=None, dup=None, replace=False):
    """
    Create a new line.

    :param gdb:         `Geosoft_gdb` instance
    :param name:        line name
    :param linetype:    line type for creating a new line, ignored if group defines

        =================  =========================================
        SYMB_LINE_NORMAL   normal lines, name is a string
        SYMB_LINE_FLIGHT   flight lines, first letter is line type
        =================  =========================================

    :param group:   group name for a grouped class
    :param dup:     duplicate properties of this line (name, symbol or Line).
    :param replace: `True` to replace line if it exists. Default is `False` .
    :returns:       Line instance
    :raises:        `GdbException` for an invalid line name (checked only when neither `group`
                    nor `dup` is given), or when the line exists and `replace` is not `True`

    .. versionadded:: 9.3
    """
    plain_line = group is None and dup is None
    if plain_line and not is_valid_line_name(name):
        raise GdbException(_t('Invalid line name: {}'.format(name)))

    if gdb.exist_symb_(name, gxapi.DB_SYMB_LINE):
        if not replace:
            raise GdbException(_t("Cannot replace existing line '{}'".format(name)))
        gdb.delete_line(name)

    gdb.new_line(name, linetype, group=group, dup=dup)
    return cls(gdb, name)
@property
def name(self):
"""
Line name, consistent with names constructed by `create_line_name`.
To change a line name change the type, number or version.
.. versionadded:: 9.3
"""
return self._get_str(self.gdb.gxdb.get_symb_name)
@property
def symbol(self):
"""
Line symbol
.. versionadded:: 9.3
"""
return self._symb
@property
def type(self):
"""
Line type, which can be set:
| LINE_TYPE_NORMAL
| LINE_TYPE_BASE
| LINE_TYPE_TIE
| LINE_TYPE_TEST
| LINE_TYPE_TREND
| LINE_TYPE_SPECIAL
| LINE_TYPE_RANDOM
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_type)
@type.setter
def type(self, value):
self.lock_set_(self.gdb.gxdb.set_line_type, value)
@property
def category(self):
"""
Line category, which can be set:
| LINE_CATAGORY_FLIGHT
| LINE_CATEGORY_GROUP
| LINE_CATEGORY_NORMAL
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_category)
@property
def date(self):
"""
Line date. Can be set.
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_date)
@date.setter
def date(self, value):
self.lock_set_(self.gdb.gxdb.set_line_date, value)
@property
def flight(self):
"""
Line flight number (flight/cruise/survey event). Can be set.
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_flight)
@flight.setter
def flight(self, value):
self.lock_set_(self.gdb.gxdb.set_line_flight, value)
@property
def number(self):
"""
Line number. Can be set
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_number)
@number.setter
def number(self, value):
self.lock_set_(self.gdb.gxdb.set_line_num, int(value))
@property
def version(self):
"""
Line version number. Can be set.
.. versionadded:: 9.3
"""
return self._get(self.gdb.gxdb.line_version)
@version.setter
def version(self, value):
self.lock_set_(self.gdb.gxdb.set_line_ver, value)
@property
def grouped(self):
    """
    `True` when the line category is LINE_CATEGORY_GROUP, `False` otherwise.

    .. versionadded:: 9.3
    """
    is_group = self.category == LINE_CATEGORY_GROUP
    return is_group
@property
def group(self):
    """
    The line's group class name. Returns `None` if the line is not a grouped line
    (its category is not LINE_CATEGORY_GROUP).

    Can be set, but only for grouped lines; setting it on any other line raises
    `GdbException`.

    .. versionadded:: 9.3
    """
    if self.category == LINE_CATEGORY_GROUP:
        return self._get_str(self.gdb.gxdb.get_group_class)
    return None

@group.setter
def group(self, value):
    if self.category != LINE_CATEGORY_GROUP:
        # translate the template first, then substitute, so the lookup key is constant
        raise GdbException(_t("Line '{}' is not a grouped line.").format(self.name))
    self.lock_set_(self.gdb.gxdb.set_group_class, value)
@property
def selected(self):
    """
    `True` if the line selection state is `geosoft.gxapi.DB_LINE_SELECT_INCLUDE`.

    Can be set: a true value selects the line (DB_LINE_SELECT_INCLUDE), a false value
    de-selects it (DB_LINE_SELECT_EXCLUDE).
    """
    state = self.gdb.gxdb.get_line_selection(self._symb)
    return state == gxapi.DB_LINE_SELECT_INCLUDE

@selected.setter
def selected(self, value):
    gxdb = self.gdb.gxdb
    if not bool(value):
        gxdb.set_line_selection(self._symb, gxapi.DB_LINE_SELECT_EXCLUDE)
        return
    gxdb.set_line_selection(self._symb, gxapi.DB_LINE_SELECT_INCLUDE)
@property
def locked(self):
    """
    True if symbol is locked. Use property :any:`lock` to determine if read or write lock, or to
    set the lock.

    Setting to `False` unlocks the symbol. Setting to a true value raises `GdbException`,
    because the kind of lock must be chosen through property :any:`lock`.

    .. versionadded:: 9.3
    """
    return self.lock != SYMBOL_LOCK_NONE

@locked.setter
def locked(self, value):
    if value:
        # the message names the module's actual lock constants
        raise GdbException(_t("Use property 'lock' to set SYMBOL_LOCK_READ or SYMBOL_LOCK_WRITE lock."))
    self.gdb.unlock_(self._symb)
@property
def lock(self):
"""
Lock setting:
| -1 unlocked (SYMBOL_LOCK_NONE)
| 0 read-locked (SYMBOL_LOCK_READ)
| 1 write-locked (SYMBOL_LOCK_WRITE)
Can be set.
.. versionadded 9.3
"""
return self.gdb.gxdb.get_symb_lock(self.symbol)
@lock.setter
def lock(self, value):
if self.lock != value:
self.gdb.unlock_(self.symbol)
self.gdb.gxdb.lock_symb(self.symbol, value, gxapi.DB_WAIT_INFINITY)
def delete(self):
    """
    Delete the line and all data associated with the line.

    The instance must not be used after this call: its symbol is reset to
    `geosoft.gxapi.NULLSYMB`.

    .. versionadded:: 9.3
    """
    doomed = self.symbol
    self.gdb.delete_line(doomed)
    self._symb = gxapi.NULLSYMB
[docs] def delete_data(self):
"""
Delete all data in a line but keep the line
.. versionadded:: 9.6
"""
self.gdb.delete_line_data(self.symbol)
# =================================
# methods that work with line data
def bearing(self):
    """
    Return bearing of a line based on location of the first and last point in the line.
    Returns None if the line is empty or first and last points are the same.

    The computed value is also stored on the line through `set_line_bearing`; this
    happens before the dummy check, so a dummy result is stored as well.

    .. versionadded:: 9.3
    """
    # only the x and y channels are needed; the third entry is ignored
    x_name, y_name, _ = self.gdb.xyz_channels
    x = self.gdb.channel_name_symb(x_name)[1]
    y = self.gdb.channel_name_symb(y_name)[1]

    # Nested try/finally: x must be released even if locking y fails, and y is
    # released before x, as in the normal path.
    self.gdb.lock_read_(x)
    try:
        self.gdb.lock_read_(y)
        try:
            bearing = gxapi.GXDU.direction(self.gdb.gxdb, self._symb, x, y)
        finally:
            self.gdb.unlock_(y)
    finally:
        self.gdb.unlock_(x)

    self.lock_set_(self.gdb.gxdb.set_line_bearing, bearing)
    if bearing == gxapi.rDUMMY:
        return None
    return bearing