# Copyright (C) 2016-2023 Deep Genomics Inc. All Rights Reserved.
from __future__ import annotations
import numpy as np
from . import _cxx
from ._cxx_util import mock, mock_result, mock_unreachable
from .interval import Interval
#########################################################################
[docs]
@_cxx.register
class GenomeTrackBuilder(_cxx.GenomeTrackBuilder):
"""Builds a .gtrack file.
Creating a .gtrack file with :py:class:`~genome_kit.GenomeTrackBuilder`
generally involves four steps:
1. Open the file for writing.
2. Set further configuration options.
3. Set data on intervals, either as numpy arrays or from disk.
4. Call :py:meth:`~genome_kit.GenomeTrackBuilder.finalize`.
For example, assuming we had a list of genomic intervals and
corresponding data arrays we wanted to fill them with::
# 1. Open a .gtrack 8-bit encoding of floats in range [0, 1]
track = GenomeTrackBuilder("foo.gtrack", "f8", "single_stranded", Genome("hg19"))
# 2. Configure the track further
track.set_default_value(0) # Fill gaps in the data with 0
# 3. Set data for each interval
for interval, data in entries:
track.set_data(interval, data)
# 4. Finish writing the file
track.finalize()
Tracks can have any dimension (the `dim` argument) or resolution
(the `resolution` argument).
Resolution allows track data to be specified and stored at coarser
than 1bp resolution.
For example, a track with `resolution=5` can
load a WIG file with step/span of 5bp::
fixedStep chrom=chr1 start=1 step=5 span=5
0.57
0.23
...
The resulting track is equivalent to the following::
0 1 2 3 4 5 6 7 8 9 ... <-- position
0.57 0.57 0.57 0.57 0.57 0.23 0.23 0.23 0.23 0.23 ... <-- decoded value
"""
__slots__ = () # <--- ADDING SLOTS IS OK
[docs]
@mock
def __init__(self, outfile, etype, strandedness, reference_genome, dim=1, resolution=1): # pragma: no cover
"""Open a .gtrack for writing.
Parameters
----------
outfile : :py:class:`str`
The .gtrack file to create.
etype : :py:class:`str`
The encoding format.
See :py:class:`~genome_kit.GenomeTrack` for details.
strandedness : :py:class:`str`
Determines the order by which the data is applied.
* ``"single_stranded"``: both strands share the same data. The data is applied in Interval coordinate (reference strand) order.
* ``"strand_unaware"``: ignores the Interval strand, data is applied in Interval coordinate (reference strand) order.
* ``"strand_aware"``: data is applied from 5" end to 3" end (sense strand order).
Negative strand data from bedgraph or wig files must be provided from a separate file. When calling
:py:meth:`~genome_kit.GenomeTrackBuilder.set_data_from_bedgraph` or :py:meth:`~genome_kit.GenomeTrackBuilder.set_data_from_wig`, set ``"strand_unaware"``.
reference_genome : :class:`~genome_kit.Genome`
The reference genome for the track data to build.
dim : :py:class:`int`
Optional. The number of dimensions in the resulting track.
resolution : :py:class:`int`
Optional. The resolution of the track data, in genomic positions.
"""
[docs]
@mock
def set_default_value(self, value): # pragma: no cover
"""Set the fill value for gaps in the track.
The default value does not need to be encodable.
For example, a default value of -1.0 for a dictionary that
does not contain -1.0 is fine.
Parameters
----------
value : :py:class:`float`
The value to fill gaps with.
"""
mock_unreachable()
[docs]
@mock
def set_sparsity(self, min_run=48, min_delta=0.0): # pragma: no cover
"""Enable automatic sparsification of data intervals.
Reduces file size of tracks that are dominated by `default_value`,
with non-default values appearing at sparse intervals.
*Highly recommended for variableStep WIG files.*
The default behaviour of a track is to encode all data
passed to :py:meth:`~genome_kit.GenomeTrackBuilder.set_data`.
After calling :py:meth:`~genome_kit.GenomeTrackBuilder.set_sparsity`,
the encoder will analyze the data arrays passed in and avoid
encoding runs of `default_value`.
Specifically, if there exists a run of at least `min_run`
values such that each ``abs(data[i] - default_value) <= min_delta``,
then that entire run will be treated as `default_value` and
not explicitly stored in the file.
Note that, in order to exclude a run from the track, a 12-byte entry
must be addd to the track index, so setting `min_run` to very small values
can actually increase file size and will definitely increase query time.
Parameters
----------
min_run : :py:class:`int`
Optional. The minimum length of a run before it's excluded.
min_delta : :py:class:`float`
Optional. The minimum delta for a value to be considered distinct
from `default_value`.
"""
mock_unreachable()
[docs]
@mock
def set_clamping(self): # pragma: no cover
"""Clamp all values to the encodable range.
By default, a value outside the encodable range will trigger
an error, to notify the user that his/her data may not be encoded
properly by the current format.
Calling this method shows intent that values should be clamped
to the encodable range. Currently allowed only for f-type encodings.
"""
mock_unreachable()
[docs]
@mock
def set_dict(self, dict): # pragma: no cover
"""Set the dictionary for etypes `f2..8`.
The dictionary must be a float16/32 numpy array.
For etype f<n>, there must be `2**n` entries in the array.
Numerical values must be in non-decreasing order.
The last entry in `dict` can be :py:data:`np.nan`
if you wish to be able to encode NaN at specific positions.
Parameters
----------
dict : :py:class:`np.ndarray`
The array of values comprising the dictionary.
"""
mock_unreachable()
[docs]
@mock
def set_restriction(self, restriction): # pragma: no cover
"""Restrict data to a specific interval.
The restriction interval acts as a allowed region, so that all
data outside that region is either ignored or cropped away by
:py:meth:`~genome_kit.GenomeTrackBuilder.set_data`.
The restriction interval's strand is ignored, i.e. the restriction
always allows data on either strand.
This method is useful for making small version of full-sized data pipeline,
for the sake of creating unit tests or iterating faster during development.
If the restriction interval is not aligned with the track resolution, some
then the restriction interval will be expanded until it is aligned.
Parameters
----------
restriction : :py:class:`~genome_kit.Interval`
The interval on which to keep data.
"""
mock_unreachable()
[docs]
@mock
def set_data(self, interval, data): # pragma: no cover
"""Set the track data on a specific interval.
If the interval overlaps a previously-specified interval,
an error is raised.
The `dtype` of the data array must be compatible with the
`etype` of the track being built.
Normally, `data` must be an NxM array where N is
the length of `interval` and M is `dim`.
If track resolution R is greater than 1, then `data`
must be (N/R)xM where N is the length of `interval`.
The start and end of `interval` must be aligned to R.
In other words, data is supplied (and stored) at
a coarsened resolution.
Parameters
----------
interval : :py:class:`~genome_kit.Interval`
The interval to fill with data.
data : :py:class:`np.ndarray`
The data array, ordered according to the strandedness argument passed to the
builder's constructor.
"""
mock_unreachable()
[docs]
@mock
def set_data_from_wig(self, pos_strand_file, neg_strand_file=None): # pragma: no cover
"""Load all data from a .wig or .wig.gz file.
Loads all data from a fixedStep or variableStep WIG file.
Set ``strandedness="strand_unaware"`` in :py:meth:`~genome_kit.GenomeTrackBuilder.__init__`.
The number of data columns in the WIG must match the track dimension.
The span of the data must match the track resolution, with the exception
of the very last datum on a chromosome.
Adjacent WIG intervals will automatically be merged into a single GTRACK
interval, with different data encoded for each spanned position.
Consider calling :py:meth:`~genome_kit.GenomeTrackBuilder.set_sparsity`.
Parameters
----------
pos_strand_file : :py:class:`str`
Path to the .wig file for positive strand.
neg_strand_file : :py:class:`str`
Optional. Path to the .wig file for negative strand, if stranded.
"""
mock_unreachable()
[docs]
@mock
def set_data_from_bedgraph(self, pos_strand_file, neg_strand_file=None): # pragma: no cover
"""Load all data from a .bedgraph or .bedgraph.gz file.
Loads all data from a BEDGRAPH file.
A direct analogue of :py:meth:`~genome_kit.GenomeTrackBuilder.set_data_from_wig`.
Set ``strandedness="strand_unaware"`` in :py:meth:`~genome_kit.GenomeTrackBuilder.__init__`.
Parameters
----------
pos_strand_file : :py:class:`str`
Path to the .bedgraph file for positive strand.
neg_strand_file : :py:class:`str`
Optional. Path to the .bedgraph file for negative strand, if stranded.
"""
mock_unreachable()
[docs]
@mock
def set_data_from_bed(self, bedfile, categories=None): # pragma: no cover
"""Load all data from a .bed or .bed.gz file.
Loads each interval from a BED file and incorporates it into the track.
If `categories` is not specified, then the value contained in the `score`
column of the BED file will be stored in the corresponding interval.
If `categories` is specified, then the `name` column of the BED file
determines the value stored on each interval. The interval's `name` will
be used to find an index in `categories`, and that index will be stored.
The strandedness of the track must match the BED file (i.e. an unstranded
track expects the `strand` column to contain ``'.'``).
The track dimension must be 1.
The start and end of each interval must match the track resolution,
with the exception of the very last position of a chromosome.
Adjacent BED intervals will automatically be merged into a single contiguous
GTRACK interval.
In a GTRACK file, the data value of an interval is repeated for the size of the
interval, unlike in a BED file where it's specified only once per interval.
This can result in large GTRACK files, especially when at 1bp resolution (the default).
Consider using :py:meth:`~genome_kit.GenomeTrackBuilder.set_default_value`
and :py:meth:`~genome_kit.GenomeTrackBuilder.set_sparsity` to manage file size.
Parameters
----------
bedfile : :py:class:`str`
Path to the .bed file.
categories : :py:class:`list` of :py:class:`str`
Optional. List of categories to match with the `name` column
in the BED file.
"""
mock_unreachable()
[docs]
@mock
def flush(self): # pragma: no cover
"""Flush the currently accumulated data to disk.
This function can free up memory while building a track,
but should only be called immediately after an entire chromosome
has had its data set. Once this is called all chromosomes
that currently have *any* data on them become fixed.
"""
mock_unreachable()
[docs]
@mock
def finalize(self): # pragma: no cover
"""Finish writing the file and close it.
This function must be called last, to complete creation of the GTRACK file.
"""
mock_unreachable()
@mock
@property
def dim(self): # pragma: no cover
"""The dimensionality of the track.
Returns
-------
:py:class:`int`
The number of columns in each returned array.
"""
return mock_result(int)
@mock
@property
def resolution(self): # pragma: no cover
"""The resolution of the track.
Returns
-------
:py:class:`int`
The number of genomic positions spanned by each datum.
"""
return mock_result(int)
@mock
@property
def stranded(self): # pragma: no cover
"""The strandedness of the track.
Returns
-------
:py:class:`bool`
Whether the negative strand stores data unique from the positive strand.
"""
return mock_result(bool)
@mock
@property
def etype(self): # pragma: no cover
"""The encoding type of the track.
Returns
-------
:py:class:`str`
A string identifying the encoding type.
"""
return mock_result(str)
@mock
@property
def dtype(self): # pragma: no cover
"""The default dtype for the track.
Returns
-------
:py:class:`type`
The numpy dtype that this track will decode to by default.
"""
return mock_result(type)
@mock
@property
def reference_genome(self): # pragma: no cover
"""The name of the reference genome the track will be mapped to.
Returns
-------
:py:class:`str`
The name of the reference genome, e.g. "hg38".
"""
return mock_result(str)
@mock
@property
def filename(self): # pragma: no cover
"""The path to `.gtrack` file being written to.
Returns
-------
:py:class:`str`
The path to the file, e.g. "/data/phastcons.gtrack"
"""
return mock_result(str)
@mock
@property
def data_size(self): # pragma: no cover
"""The number of positions spanned by data in the GTRACK file.
If sparsity is disabled, the data size is the number of positions
spanned by data via
:py:meth:`~genome_kit.GenomeTrackBuilder.set_data`.
Enabling sparsity may decrease the data size.
Using resolution > 1bp will also decrease the data size, since data is
encoded into a coarse coordinate system, not at full genomic resolution.
The number of bits that the data occupies in the GTRACK file is
approximately `data_size * dim * bits_per_datum`, but the exact
computation is non-trivial, so portion of GTRACK file dedicated
to data may be higher or lower than this rough estimate.
Returns
-------
:py:class:`int`
The number of data spanned by encoded data.
"""
return mock_result(int)
@mock
@property
def index_size(self): # pragma: no cover
"""The number of separate intervals in the GTRACK index.
If sparsity is disabled, the index size is the number of times
:py:meth:`~genome_kit.GenomeTrackBuilder.set_data` was called.
Enabling sparsity may increase the index size.
Returns
-------
:py:class:`int`
The number of unique intervals in the index.
"""
return mock_result(int)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
[docs]
def __repr__(self):
return "<GenomeTrackBuilder '{}'>".format(self.filename)
#########################################################################
[docs]
@_cxx.register
class GenomeTrack(_cxx.GenomeTrack):
"""Access to a genomic data track.
A :py:class:`~genome_kit.GenomeTrack` provides a read-only view of a `.gtrack` file.
A track can be easily opened and queried::
>>> interval = Interval('chr17', '+', 41246881, 41246886, 'hg19')
>>> track = GenomeTrack("hg19.phastcons_mammal.f4.gtrack")
>>> track(interval)
array([[ 1. ],
[ 0.857],
[ 0.571],
[ 0.357],
[ 0.286]], dtype=float16)
>>> track(interval.as_opposite_strand())
array([[ 0.286],
[ 0.357],
[ 0.571],
[ 0.857],
[ 1. ]], dtype=float16)
You can also open a track with a specific scope::
>>> with GenomeTrack("foo.gtrack") as track:
... values = track(interval)
Tracks can be encoded in formats called `etypes`.
The available etypes trade representation power
for storage size and query speed::
etypes = [ # DECODABLE TYPES RANGE ENCODED TYPE
"m0", # float16/32, uint8, bool {1} 0-bit (mask)
"u1", # float16/32, uint8, bool {0..1} 1-bit
"u2", # float16/32, uint8 {0..3} 2-bit
"u3", # float16/32, uint8 {0..7} 3-bit
"u4", # float16/32, uint8 {0..15} 4-bit
"u5", # float16/32, uint8 {0..31} 5-bit
"u6", # float16/32, uint8 {0..63} 6-bit
"u8", # float16/32, uint8 {0..255} 8-bit
"i8", # float16/32, int8 {-128..127} 8-bit
"f2", # float16/32 [-65504,65504] 2-bit dictionary
"f3", # float16/32 [-65504,65504] 3-bit dictionary
"f4", # float16/32 [-65504,65504] 4-bit dictionary
"f5", # float16/32 [-65504,65504] 5-bit dictionary
"f6", # float16/32 [-65504,65504] 6-bit dictionary
"f8", # float16/32 [-65504,65504] 8-bit dictionary
"f16", # float16/32 [-65504,65504] 16-bit float
"f32", # float16/32 [-1.2e-38, 3.4e38] 32-bit float
]
The default dictionary for `f2..8` types is the range [0,1] computed
as ``np.linspace(0.0, 1.0, 2**num_bits, dtype=np.float16)``.
Decoded types are called `dtypes`, and are indeed represented by
numpy dtypes::
dtypes = [
np.bool_, # {False, True}
np.uint8, # {0..255}
np.int8, # {-128..127}
np.float16, # [-65504, 65504] IEEE 754 half-precision
np.float32, # [-1.2e-38, 3.4e38]
]
New tracks can be can created using :py:class:`~genome_kit.GenomeTrackBuilder`.
"""
__slots__ = () # <--- ADDING SLOTS IS OK
[docs]
@mock
def __init__(self, infile): # pragma: no cover
"""Open a .gtrack file.
Parameters
----------
infile : :py:class:`str`
The .gtrack file to open.
"""
[docs]
@mock
def __call__(self, interval, dtype=None, out=None): # pragma: no cover
"""Extract data from a genomic track.
Data is always returned in sense-strand order, meaning it is
always sensitive to the strand of the interval,
even on an unstranded track.
You can optionally specify the dtype that a track should be
decoded to.
Parameters
----------
interval : :py:class:`~genome_kit.Interval`
The stranded query interval.
dtype : :py:class:`type`
Optional. The numpy dtype of the resulting array. Ignored if
``out`` is provided.
out : :py:class:`~numpy.ndarray`
Optional. A numpy array to hold the result. If provided, it
overrides ``dtype`` and it must have a shape that the inputs
broadcast to. If not provided or None, a freshly-allocated array
is returned.
Returns
-------
:py:class:`~numpy.ndarray`
The decoded track data with ``len(interval)`` rows and ``dim`` columns.
"""
return mock_result(np.ndarray)
@mock
@property
def dim(self): # pragma: no cover
"""The dimensionality of the track.
Returns
-------
:py:class:`int`
The number of columns in each returned array.
"""
return mock_result(int)
@mock
@property
def resolution(self): # pragma: no cover
"""The resolution of the track.
Returns
-------
:py:class:`int`
The number of genomic positions spanned by each datum.
"""
return mock_result(int)
@mock
@property
def stranded(self): # pragma: no cover
"""The strandedness of the track.
Returns
-------
:py:class:`bool`
Whether the negative strand stores data unique from the positive strand.
"""
return mock_result(bool)
@mock
@property
def etype(self): # pragma: no cover
"""The encoding type of the track.
Returns
-------
:py:class:`str`
A string identifying the encoding type.
"""
return mock_result(str)
@mock
@property
def dtype(self): # pragma: no cover
"""The default dtype for the track.
Returns
-------
:py:class:`type`
The numpy dtype that this track will decode to by default.
"""
return mock_result(type)
@mock
@property
def reference_genome(self): # pragma: no cover
"""The name of the reference genome this track is mapped to.
Returns
-------
:py:class:`str`
The name of the reference genome, e.g. "hg38".
"""
return mock_result(str)
@mock
@property
def filename(self): # pragma: no cover
"""The path to `.gtrack` file from which data is extracted.
Returns
-------
:py:class:`str`
The path to the file, e.g. "/data/phastcons.gtrack"
"""
return mock_result(str)
[docs]
@mock
def intervals(self): # pragma: no cover
"""The list of intervals covered by this track.
Returns
-------
:py:class:`list` of :py:class:`~genome_kit.Interval`
The list of intervals with data in this track.
"""
return mock_result(str)
[docs]
def histogram(self, region, separate_dims=False):
"""Generate a histogram of the values in the track.
Returns a dictionary where each key is a float appearing
at least once in the track, and each value is a count of the
number of times it occurred. NaN values are omitted.
If `region` is a :py:class:`~genome_kit.Genome`, computes
a genome-wide histogram.
If `region` is an interval or list of intervals, computes the
histogram over that region only.
If `separate_dims` is true, returns a list of
:py:attr:`~genome_kit.GenomeTrack.dim` histograms,
one for each track dimension.
Parameters
----------
region : :py:class:`~genome_kit.Genome` | :py:class:`~genome_kit.Interval` | \
:py:class:`list` of :py:class:`~genome_kit.Interval`
Specific interval(s) upon which to compute
the histogram.
separate_dims : :py:class:`bool`
Optional. Whether to generate a single histogram over all
dimensions, or to return a separate histogram for each.
Returns
-------
:py:class:`dict` | :py:class:`list` of :py:class:`dict`
A dictionary ``{ value : count }`` where the `value`
appearing in the track `count` times. Zero
"""
# Get `intervals` list from the `interval` arg
from .genome import Genome
if isinstance(region, Genome):
intervals = self._intervals_for_genome(region)
elif isinstance(region, Interval):
intervals = [region]
elif isinstance(region, list):
intervals = region
else:
raise TypeError("Expected region to be a Genome, Interval, or list of Interval")
# Build an array where counters[i,j] is the number of times in dimension i that
# the float16 value represented by j=0..2^16 appeared.
counters = np.zeros((self.dim, 2**16), dtype=np.int64)
for interval in intervals:
counters += self._histogram_counters_for_interval(interval)
# Collapse each dimension into one single histogram by summing up the counts unless separate_dims=True
if separate_dims:
return [self._histogram_dict_from_counters(row) for row in counters]
else:
return self._histogram_dict_from_counters(counters.sum(axis=0))
def _intervals_for_genome(self, genome):
"""Returns a list of intervals, one for each chromosome, which effectively partition the entire reference genome
"""
if genome.refg != self.refg:
raise ValueError("Track reference genome '{}' does not match argument '{}'".format(self.refg, genome.refg))
strands = ('+', '-') if self.stranded else ('+', )
genome_intervals = []
for chrom in genome.chromosomes:
chrom_size = genome.chromosome_size(chrom)
for strand in strands:
genome_intervals.append(Interval(chrom, strand, 0, chrom_size, genome))
return genome_intervals
def _histogram_counters_for_interval(self, interval, slice_size=100000):
"""Returns an int64 array `counters` of size (dim, 2**16) where `counters[i,j]` is
the number of times float16 value indexed by j appeared in dimension i in the
given interval."""
counters = np.zeros((self.dim, 2**16), dtype=np.int64) # int64 for compatibility with np.bincount
# Outer loop partitions each interval into chunks, to keep peak memory consistent
for start in range(interval.start, interval.end, slice_size):
end = min(start + slice_size, interval.end)
# build an interval for this particular slice, and read the track values for it.
interval_slice = Interval(interval.chromosome, interval.strand, start, end, interval.refg)
slice_data_float16 = self(interval_slice, dtype=np.float16)
try:
slice_data_uint16 = np.frombuffer(
memoryview(slice_data_float16), dtype=np.uint16).reshape(slice_data_float16.shape)
except AttributeError:
slice_data_uint16 = np.frombuffer(
np.getbuffer(slice_data_float16), dtype=np.uint16).reshape(slice_data_float16.shape)
# increment the appropriate key in counters (int_indexes[dim_index]) for each dimension
# corresponding to the value of the track for that dimension
for dim in range(self.dim):
counters[dim] += np.bincount(slice_data_uint16[:, dim].astype(np.int64), minlength=2**16)
return counters
def _histogram_dict_from_counters(self, counters):
"""Converts a counters array into a dict, where each key is a float and each value is a non zero count.
"""
# counters[i] is the number of times the float16 value float_keys[i]
# appeared, for i=0..65536. Build a histogram of all non-nan keys.
# merge the count for -0.0 into the count for +0.0
pos_zero_index = 0 # index of +0.0
neg_zero_index = 2**15 # index of -0.0
counters[pos_zero_index] += counters[neg_zero_index]
counters[neg_zero_index] = 0
# generate key for all 65536 possible float16 values, so that
# float_keys[i] is the float16 value counted by counters[i].
int_keys = np.arange(2**16, dtype=np.uint16)
try:
float_keys = np.frombuffer(memoryview(int_keys), dtype=np.float16)
except AttributeError:
float_keys = np.frombuffer(np.getbuffer(int_keys), dtype=np.float16)
# make dict of all non-nan keys and all non-zero values
mask = np.logical_and(~np.isnan(float_keys), counters != 0)
histogram = dict(zip(float_keys[mask], counters[mask]))
return histogram
[docs]
@mock
def close(self): # pragma: no cover
"""Close the file handle.
Note that if you use a `with` statement the file handle is automatically closed::
# Open the file
with GenomeTrack('foo.gtrack') as track:
...
# <-- File is now closed
"""
mock_unreachable()
[docs]
@mock
@staticmethod
def gtrack_version(): # pragma: no cover
"""The `gtrack` file format version that this build supports.
Attempting to open a `gtrack` file of a mismatched version will raise an :py:exc:`IOError`.
Returns
-------
:py:class:`int`
The version number.
"""
return mock_result(int)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
[docs]
def __repr__(self):
return "<GenomeTrack '{}'>".format(self.filename)