Source code for genome_kit.variant

# Copyright (C) 2016-2023 Deep Genomics Inc. All Rights Reserved.
from __future__ import annotations

import pickle
from bisect import bisect_left
from builtins import zip
from operator import attrgetter

from . import _cxx
from ._cxx_util import mock, mock_result, mock_unreachable, strip_mock_bases
from .interval import Interval

_NUCLEOTIDE_SYMBOLS = 'ACGTWSMKRYBDHVNZ'


[docs] @strip_mock_bases @_cxx.register class Variant(_cxx.Variant, Interval): """A variant. Bases: :py:class:`~genome_kit.Interval` """ __slots__ = () # <--- NEW SLOTS OK # def __new__(self): pass # <--- DO NOT OVERRIDE BASE CLASS IMPLEMENTATION # def __del__(self): pass # <--- DO NOT IMPLEMENT # noinspection PyMissingConstructor
[docs] @mock def __init__(self, chromosome, start, ref, alt, reference_genome): # pragma: no cover """Initialize a variant from a DNA0 position. Parameters ---------- chromosome : :py:class:`str` The chromosome name ('chr1', ...). start : :py:class:`int` The start of the reference genome spanned by `ref` (inclusive, 0-based). ref : :py:class:`str` The reference allele. alt : :py:class:`str` The alternate allele. reference_genome : :py:class:`str` | :py:class:`~genome_kit.Genome` The reference genome. See :py:class:`~genome_kit.Interval`. """ mock_unreachable()
@mock @property def interval(self): # pragma: no cover """The interval spanned by this variant in the reference genome. Note that :py:class:`~genome_kit.Variant` also inherits from :py:class:`~genome_kit.Interval`. Returns ------- :py:class:`~genome_kit.Interval` The interval spanned by this variant's `ref` sequence in the reference genome. """ return mock_result(Interval) @mock @property def position(self): # pragma: no cover """The integer position of the start (inclusive, 0-based). This property is a synonym for `start` and is provided for historical reasons. For consistency with other interval-based objects, prefer the `start` property. Returns ------- :py:class:`int` The start position of this interval within the reference genome. """ return mock_result(int) @mock @property def ref(self): # pragma: no cover """The reference allele as a DNA string. Returns ------- :py:class:`str` The sequence of the reference allele that is altered by this variant. """ return mock_result(str) @mock @property def alt(self): # pragma: no cover """The alternate allele as a DNA string. Returns ------- :py:class:`str` The sequence of the alternate allele that this variant represents. """ return mock_result(str)
[docs] def as_variant_string(self): """Returns a string representation of the variant. See Also -------- :py:meth:`~genome_kit.Variant.from_string` Returns ------- :py:class:`str` The variant in the format ``"chromosome:start:ref:alt"``, where start is in DNA1 coordinates. """ return '{}:{}:{}:{}'.format(self.chromosome, self.start + 1, self.ref, self.alt)
[docs] def __repr__(self): return '<Variant {}:{}:{}:{}:{}>'.format(self.chrom, self.start + 1, self.ref, self.alt, self.refg)
__str__ = as_variant_string
[docs] @staticmethod def from_string(variant, reference_genome): """Initialize a variant object from a variant string. Parameters ---------- variant : :py:class:`str` A variant in the format ``"chromosome:start:ref:alt"`` where start is in DNA1 coordinates. reference_genome : :py:class:`~genome_kit.Genome` The reference genome that `variant` diverges from. See Also -------- :py:meth:`~genome_kit.Variant.as_variant_string` """ try: components = variant.split(':') except (AttributeError, ValueError, SyntaxError): raise ValueError("Invalid variant: {}.".format(variant)) if not len(components) == 4: raise ValueError("Variant must have exactly four components: {}".format(variant)) chromosome, position, ref, alt = components try: # Allow position to have comma separators position = int(position.replace(',', '')) - 1 except ValueError: raise ValueError("Invalid position in variant: {}".format(variant)) chromosome, ref, alt = Variant._preprocess_variant(chromosome, ref, alt) variant_obj = Variant(chromosome, position, ref, alt, reference_genome) try: variant_obj._validate_variant(reference_genome) except ValueError as e: raise ValueError(f"Variant string was invalid: {variant}") from e return variant_obj
[docs] @staticmethod def spanning(x, y, variants=None): """Extends :meth:`~genome_kit.Interval.spanning` such that it is aware of variants and anchors. Example:: >>> from genome_kit import Genome, Variant, Interval >>> hg19 = Genome("hg19") >>> v = Variant("chr1", 100, "", 10 * "N", hg19) >>> a = Interval("chr1", "+", 100, 150, hg19, 100) >>> b = Interval("chr1", "+", 200, 250, hg19) >>> Interval.spanning(a, b) Interval("chr1", "+", 100, 250, "hg19") >>> Variant.spanning(a, b, [v]) Interval("chr1", "+", 100, 260, "hg19", 100) Parameters ---------- x : :py:class:`~genome_kit.Interval` an interval y : :py:class:`~genome_kit.Interval` another interval variants : :class:`list` of :class:`~genome_kit.Variant` or None, optional variants to apply to `x` and `y`; variants cannot overlap. Returns ------- :py:class:`~genome_kit.Interval` The resulting spanning interval. """ if x.anchor is None and y.anchor is None: return Interval.spanning(x, y) if not variants: raise ValueError("Anchored intervals (%s, %s) require a list of variants." % (x, y)) # verify input chrom = x.chrom if chrom != y.chrom: raise ValueError("Both intervals must be on the same chromosome, got: %s, %s" % (chrom, y.chrom)) strand = x.strand if strand != y.strand: raise ValueError("Both intervals must be on the same strand, got: %s, %s" % (strand, y.strand)) refg = x.refg if refg != y.refg: raise ValueError( "Both intervals must on on the same reference genome, got: %s, %s" % (refg, y.refg)) if x.anchor is None or (y.anchor is not None and ( x.anchor > y.anchor or x.anchor == y.anchor and x.anchor_offset > y.anchor_offset)): # simplify with x being the leftmost anchor x, y = y, x anchor = x.anchor anchor_offset = x.anchor_offset start, end = x.as_dna0() if anchor < start or anchor > end: raise ValueError("Outside anchors are unsupported: %s" % x) def clamp_anchor_offset(variants, starts, anchor, anchor_offset): i = bisect_left(starts, anchor) if i != len(starts) and starts[i] == anchor: v = variants[i] if not v.ref: return min(anchor_offset, len(v.alt)) return 0 variants = sorted((vv for n in (v._normalized_variant for v in variants if v.chrom == chrom) for vv in n), key=attrgetter('start')) starts = None if anchor_offset > 0: starts = [v.start for v in variants] anchor_offset = clamp_anchor_offset(variants, starts, anchor, anchor_offset) # map y into x's anchored interval and then union if y.anchor is None: if y.start < anchor: length = Variant._get_variant_length(variants, Interval(chrom, strand, y.start, anchor, refg)) length += anchor_offset start = min(start, anchor - length) if y.end > anchor: length = Variant._get_variant_length(variants, Interval(chrom, strand, anchor, y.end, refg)) length -= anchor_offset end = max(end, anchor + length) return Interval(chrom, strand, start, end, refg, anchor, x.anchor_offset) # map y's anchor (such that x.anchor + length = y.anchor) if y.anchor < y.start or y.anchor > y.end: raise ValueError("Outside anchors are unsupported: %s" % y) length = Variant._get_variant_length(variants, Interval(chrom, strand, anchor, y.anchor, refg)) length -= anchor_offset if y.anchor_offset > 0: if not starts: starts = [v.start for v in variants] length += clamp_anchor_offset(variants, starts, y.anchor, y.anchor_offset) delta_y_anchor = anchor + length - y.anchor start = min(start, delta_y_anchor + y.start) end = max(end, delta_y_anchor + y.end) return Interval(chrom, strand, start, end, refg, anchor, x.anchor_offset)
@property def _normalized_variant(self): ref = self.ref alt = self.alt overlap = min(len(ref), len(alt)) # strip padding left_pad = next((i for i, (x, y) in enumerate(zip(ref, alt)) if x != y), overlap) ref = ref[left_pad:] alt = alt[left_pad:] overlap -= left_pad right_pad = next((i for i, (x, y) in enumerate(zip(reversed(ref), zip(reversed(alt))))), overlap) ref = ref[:len(ref) - right_pad] alt = alt[:len(alt) - right_pad] start = self.start + left_pad if len(alt) == len(ref): return [] if not ref else [self if ref == self.ref else self._with_ref(start, ref, alt)] # Split up complex variants into a substitution # plus an insertion or deletion # Also used to normalize Clinvar variants if len(alt) > len(ref): # Split into substitution and insertion insertion = self._with_ref(start + len(ref), "", alt[len(ref):]) return [insertion] if not ref else [self._with_ref(start, ref, alt[:len(ref)]), insertion] # Split into substitution and deletion deletion = self._with_ref(start + len(alt), ref[len(alt):], "") return [deletion] if not alt else [self._with_ref(start, ref[:len(alt)], alt), deletion] def _with_ref(self, start, ref, alt): return Variant(self.chrom, start, ref, alt, self.refg) @staticmethod def _preprocess_variant(chromosome, ref, alt): if not chromosome.startswith('chr'): chromosome = 'chr' + chromosome ref = ref.upper() alt = alt.upper() if ref in '-.': ref = "" if alt in '-.': alt = "" return chromosome, ref, alt def _validate_variant(self, genome): for r in self.ref: if r not in _NUCLEOTIDE_SYMBOLS: raise ValueError("Invalid nucleotide in ref allele: {}".format(repr(self.ref))) for a in self.alt: if a not in _NUCLEOTIDE_SYMBOLS: raise ValueError("Invalid nucleotide in alt allele: {}".format(repr(self.alt))) interval = Interval(self.chromosome, '+', self.position, self.position + len(self.ref), genome) wt_seq = genome.dna(interval) if not wt_seq == self.ref: raise ValueError("Variant's ref sequence does not match the genome ({} != {})".format(wt_seq, self.ref)) @staticmethod def _get_variant_length(sorted_normalized_variants, interval): start = interval.start length = len(interval) # Match ordering used in _apply_variants_no_anchor for v in reversed(sorted_normalized_variants): v_start_rel = v.start - start if v_start_rel >= length: continue v_end_rel = v_start_rel + len(v.ref) if v_end_rel < 0: break alt_length = len(v.alt) if v_start_rel < 0: # Clip left side of variant alt_length += v_start_rel v_start_rel = 0 if v_end_rel > length: # Clip right side of variant alt_length -= length - v_end_rel v_end_rel = length assert v_end_rel >= 0 assert v_start_rel >= 0 length += alt_length - (v_end_rel - v_start_rel) return length
[docs] def __getstate__(self) -> bytes: return pickle.dumps([self.reference_genome, self.chromosome, self.start, self.ref, self.alt])
def __setstate__(self, state: bytes) -> None: refg, chrom, start, ref, alt = pickle.loads(state) self.__init__(chrom, start, ref, alt, refg)
########################################################################
[docs] @_cxx.register class VariantTable(_cxx.VariantTable): __slots__ = () # <--- NEW SLOTS OK # def __new__(self): pass # <--- DO NOT OVERRIDE BASE CLASS IMPLEMENTATION # def __del__(self): pass # <--- DO NOT IMPLEMENT
[docs] @mock def find_5p_aligned(self, interval): # pragma: no cover """Returns all variants that have 5' end aligned with 5' end of `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.end5 == interval.end5] The results will also be sorted by 5'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that have 5' end aligned to the 5' end of `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_3p_aligned(self, interval): # pragma: no cover """Returns all variants that have 3' end aligned with 3' end of `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.end3 == interval.end3] The results will also be sorted by 3'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that have 3' end aligned to the 3' end of `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_5p_within(self, interval): # pragma: no cover """Returns all variants that have 5'-most position within `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.end5.expand(0, 1) in interval] The results will also be sorted by 5'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that have 5'-most position falling entirely within `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_3p_within(self, interval): # pragma: no cover """Returns all variants that have 3'-most position within `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.end3.expand(1, 0) in interval] The results will also be sorted by 3'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that have 3'-most position falling entirely within `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_within(self, interval): # pragma: no cover """Returns all variants that fall entirely within `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v in interval] The results will also be sorted by 3'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that fall entirely within `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_overlapping(self, interval): # pragma: no cover """Returns all variants that overlap `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.overlaps(interval)] The results will also be sorted by 5'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that overlap `interval`. """ mock_unreachable() return [Variant()]
[docs] @mock def find_exact(self, interval): # pragma: no cover """Returns all variants that span exactly `interval`. This function is a faster version of the brute-force filter:: [v for v in variants if v.interval == interval] The results will also be sorted by 5' and then 3'. Parameters ---------- interval : :py:class:`~genome_kit.Interval` The query interval. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` All variants that span exactly `interval`. """ mock_unreachable() return [Variant()]
@mock @property def stranded(self): # pragma: no cover """If `True` then the strand is significant when calling the `find_x` methods. Returns ------- :py:class:`bool` Whether this table can contain negative stranded intervals. """ return mock_result(bool)
[docs] @mock def __getitem__(self, index): # pragma: no cover """Access to all variants. Allows iteration over all variants in the table:: for variant in table: ... The results will be ordered as in the source file. Parameters ---------- index : :py:class:`int` The integer index of the requested junction. Returns ------- :py:class:`~genome_kit.Variant` The variant at the given index. """ return mock_result(Variant)
[docs] @mock def where(self, mask): # pragma: no cover """Filter variants by numpy mask. Fast extraction of table elements based on a mask:: # Intended to be used like this. variants = table.where(mask) # It is faster but equivalent to this. variants = [table[i] for i in np.where(mask)[0]] Parameters ---------- mask : :py:class:`ndarray` A boolean mask the same size as the table. Returns ------- :py:class:`list` of :py:class:`~genome_kit.Variant` A list of variants selected by the mask. """ mock_unreachable() return [Variant()]
[docs] def __repr__(self): return "<VariantTable, len() = {}>".format(len(self))