Source code for genome_kit.diseq

from copy import deepcopy
import enum
from dataclasses import dataclass
from typing import Sequence, Literal

from .interval import Interval
from .genome_annotation import Transcript


[docs] class IndexDirection(enum.Enum): """Controls how indices are assigned to the 5' and 3' ends of a coordinate space. Changing this value will result in undefined behaviour for :py:class:`DisjointIntervalSequence` objects created under a different convention. ``TRANSCRIPT_FIVE_TO_THREE`` Index 0 is always at the coordinate transcript's 5' end, regardless of genomic strand. ``POSITIVE_STRAND_LEFT_TO_RIGHT`` Index 0 is always at the leftmost genomic position relative to the positive strand. On the negative strand, this means the 3' end is at index 0. """ TRANSCRIPT_FIVE_TO_THREE = "transcript_five_to_three" POSITIVE_STRAND_LEFT_TO_RIGHT = "positive_strand_left_to_right"
@dataclass(frozen=True) class _CoordinateMetadata: name: str | None reference_genome: str chromosome: str transcript_strand: Literal["+", "-"] @dataclass(frozen=True) class _SegmentMetadata: name: str | None on_coordinate_strand: bool
[docs] class DisjointIntervalSequence: """A flattened coordinate system over a sequence of disjoint genomic Intervals. A DIS represents two layers: - A **coordinate space** defined by a sequence of non-overlapping genomic :py:class:`~genome_kit.Interval` objects (e.g. the exons of a transcript), which are flattened into a contiguous 0-based index space. Indices for the coordinate space are assigned according to the current :py:class:`IndexDirection` value. - A **segment** within that coordinate space, defined by a 5' and 3' index. The segment may lie on the same, or opposite, strand as the coordinate space. Use :py:meth:`from_transcript` or :py:meth:`from_intervals` to construct instances rather than calling the constructor directly. """ _index_direction: IndexDirection = IndexDirection.TRANSCRIPT_FIVE_TO_THREE
[docs] @classmethod def set_index_direction(cls, direction: IndexDirection) -> None: """Set the index direction convention for all DIS instances. Parameters ---------- direction : :py:class:`IndexDirection` The index direction convention to use. """ cls._index_direction = direction
[docs] @classmethod def get_index_direction(cls) -> IndexDirection: """Get the current index direction convention. Returns ------- :py:class:`IndexDirection` """ return cls._index_direction
[docs] def __init__( self, coordinate_intervals: Sequence[Interval], *, coord_name: str | None = None, segment_name: str | None = None, on_coordinate_strand: bool = True, start: int | None = None, end: int | None = None, ): """Low-level constructor. Prefer :py:meth:`from_transcript` or :py:meth:`from_intervals` for public construction. Parameters ---------- coordinate_intervals : Sequence[:py:class:`~genome_kit.Interval`] Non-empty sequence of non-overlapping Intervals on the same chromosome, strand, and reference genome. coord_name : :py:class:`str` or None Optional name for the coordinate space. segment_name : :py:class:`str` or None Optional name for the segment. on_coordinate_strand : :py:class:`bool` Whether the segment is on the same strand as the coordinate intervals. Defaults to True. Can be used to represent a sequence that binds to the transcript if set to False. start : :py:class:`int` or None start index of the segment in the coordinate space. Defaults to 0 end : :py:class:`int` or None end index of the segment in the coordinate space. Defaults to the length of the coordinate space. Raises ------ ValueError If coordinate intervals are empty, inconsistent, overlapping, or if start is greater than end. TypeError If any element is not an Interval. """ if len(coordinate_intervals) == 0: raise ValueError("coordinate_intervals must be non-empty") for i, iv in enumerate(coordinate_intervals): if not isinstance(iv, Interval): raise TypeError( f"coordinate_intervals[{i}] is {type(iv).__name__}, expected Interval" ) if iv.anchor is not None: raise ValueError( f"coordinate_intervals[{i}] has an anchor set; " f"anchored Intervals are not supported" ) # Consistent chromosome, strand, reference_genome iv0 = coordinate_intervals[0] for iv in coordinate_intervals[1:]: if iv.chromosome != iv0.chromosome: raise ValueError( f"All intervals must share the same chromosome, " f"got {iv0.chromosome!r} and {iv.chromosome!r}" ) if iv.strand != iv0.strand: raise ValueError( f"All intervals must share the same strand, " f"got {iv0.strand!r} and {iv.strand!r}" ) if iv.reference_genome != iv0.reference_genome: raise ValueError( f"All intervals must share the same reference genome, " f"got {iv0.reference_genome!r} and {iv.reference_genome!r}" ) # Sort 5'->3' if iv0.strand == "+": sorted_intervals = sorted(coordinate_intervals, key=lambda iv: iv.start) else: # On negative strand end is the 5' end since start < end. # Sort by -end to get 5'->3' order. sorted_intervals = sorted(coordinate_intervals, key=lambda iv: -iv.end) # No overlaps (adjacent/touching OK) for i in range(len(sorted_intervals) - 1): cur_iv, next_iv = sorted_intervals[i], sorted_intervals[i + 1] plus_strand_overlap = iv0.strand == "+" and cur_iv.end > next_iv.start minus_strand_overlap = iv0.strand == "-" and cur_iv.start < next_iv.end if plus_strand_overlap or minus_strand_overlap: raise ValueError( f"Intervals must not overlap: [{cur_iv.start}, {cur_iv.end}) and [{next_iv.start}, {next_iv.end})" ) self._coordinate_intervals: tuple[Interval, ...] = tuple(sorted_intervals) self._coord_metadata = _CoordinateMetadata( name=coord_name, reference_genome=iv0.reference_genome, chromosome=iv0.chromosome, transcript_strand=iv0.strand, ) self._segment_metadata = _SegmentMetadata( name=segment_name, on_coordinate_strand=on_coordinate_strand, ) # Default segment start/end to span the full coordinate if start is None: start = 0 if end is None: end = self.coordinate_length # Validate that start is less than or equal to end if start > end: raise ValueError( f"start index {start} cannot be greater than end index {end}" ) self._start: int = start self._end: int = end
[docs] @classmethod def from_intervals( cls, intervals: Sequence[Interval], *, coord_name: str | None = None, segment_name: str | None = None, ) -> "DisjointIntervalSequence": """Construct a DIS from a sequence of Intervals (or :py:class:`~genome_kit.Exon`/:py:class:`~genome_kit.Cds`/:py:class:`~genome_kit.Utr` objects). Parameters ---------- intervals : Sequence[:py:class:`~genome_kit.Interval`] Sequence of Interval or annotation objects. coord_name : :py:class:`str` or None Optional name for the coordinate space. segment_name : :py:class:`str` or None Optional name for the segment. Returns ------- :py:class:`DisjointIntervalSequence` """ coord_intervals = [ iv.interval if hasattr(iv, "interval") else iv for iv in intervals ] return cls(coord_intervals, coord_name=coord_name, segment_name=segment_name)
[docs] @classmethod def from_transcript( cls, transcript: Transcript, *, region: Literal["exons", "cds", "utr5", "utr3"] = "exons", coord_name: str | None = None, segment_name: str | None = None, ) -> "DisjointIntervalSequence": """Construct a DIS from a transcript's exons, CDS, or UTR regions. Parameters ---------- transcript : :py:class:`~genome_kit.Transcript` The source Transcript object. region : :py:class:`str` Which region to extract — ``"exons"``, ``"cds"``, ``"utr5"``, or ``"utr3"``. coord_name : :py:class:`str` or None Optional name for the coordinate space. Defaults to ``transcript.id``. segment_name : :py:class:`str` or None Optional name for the segment. Defaults to ``transcript.id``. Returns ------- :py:class:`DisjointIntervalSequence` Raises ------ ValueError If region is not one of the allowed values. """ match region: case "exons": region_elements = transcript.exons case "cds": region_elements = transcript.cdss case "utr5": region_elements = transcript.utr5s case "utr3": region_elements = transcript.utr3s case _: raise ValueError(f"Invalid region: {region!r}") coord_intervals = [element.interval for element in region_elements] if coord_name is None: coord_name = transcript.id if segment_name is None: segment_name = transcript.id return cls(coord_intervals, coord_name=coord_name, segment_name=segment_name)
@property def coord_name(self) -> str | None: """Name of the coordinate space, or None.""" return self._coord_metadata.name @property def reference_genome(self) -> str: """Reference genome of the coordinate intervals.""" return self._coord_metadata.reference_genome @property def chromosome(self) -> str: """Chromosome of the coordinate intervals.""" return self._coord_metadata.chromosome @property def coord_strand(self) -> Literal["+", "-"]: """Strand of the coordinate intervals (the transcript strand).""" return self._coord_metadata.transcript_strand @property def name(self) -> str | None: """Name of the segment, or None.""" return self._segment_metadata.name @property def on_coordinate_strand(self) -> bool: """True if the segment is on the same strand as the coordinate intervals.""" return self._segment_metadata.on_coordinate_strand @property def strand(self) -> Literal["+", "-"]: """Effective strand of the segment, accounting for on_coordinate_strand.""" if self.on_coordinate_strand: return self.coord_strand # Segment is on opposite strand if self.coord_strand == "+": return "-" return "+" @property def coordinate_end5_index(self) -> int: """5' index of the coordinate space.""" if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: return 0 if self.coord_strand == "+": return 0 return self.coordinate_length @property def coordinate_end3_index(self) -> int: """3' index of the coordinate space.""" if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: return self.coordinate_length if self.coord_strand == "+": return self.coordinate_length return 0 @property def end5_index(self) -> int: """5' index of the segment.""" if self._upstream_index_step() == -1: return self._start return self._end @property def end3_index(self) -> int: """3' index of the segment.""" if self._upstream_index_step() == -1: return self._end return self._start @property def start(self) -> int: """Start index of the segment in the coordinate space.""" return self._start @property def end(self) -> int: """End index of the segment in the coordinate space.""" return self._end def _at_index( self, idx: int, on_coordinate_strand: bool ) -> "DisjointIntervalSequence": """Return a 0-length DIS at the given index position.""" return DisjointIntervalSequence( self._coordinate_intervals, coord_name=self._coord_metadata.name, on_coordinate_strand=on_coordinate_strand, start=idx, end=idx, ) @property def end5(self) -> "DisjointIntervalSequence": """0-length DIS at the segment's 5' end.""" return self._at_index( self.end5_index, on_coordinate_strand=self.on_coordinate_strand ) @property def end3(self) -> "DisjointIntervalSequence": """0-length DIS at the segment's 3' end.""" return self._at_index( self.end3_index, on_coordinate_strand=self.on_coordinate_strand ) @property def coord_end5(self) -> "DisjointIntervalSequence": """0-length DIS at the coordinate space's 5' end.""" return self._at_index(self.coordinate_end5_index, on_coordinate_strand=True) @property def coord_end3(self) -> "DisjointIntervalSequence": """0-length DIS at the coordinate space's 3' end.""" return self._at_index(self.coordinate_end3_index, on_coordinate_strand=True) @property def coordinate_intervals(self) -> tuple[Interval, ...]: """The underlying genomic intervals of the coordinate-space, sorted 5'->3'.""" # Deepcopy to preserve imutability of this DIS return deepcopy(self._coordinate_intervals) @property def coordinate_length(self) -> int: """Total length of the coordinate space in bases.""" return sum(len(iv) for iv in self._coordinate_intervals) @property def length(self) -> int: """Length of the segment on the coordinate space.""" return self.end - self.start def _upstream_index_step(self, on_coordinate_strand: bool | None = None) -> int: """Return +1 or -1 indicating the upstream direction in index space. Args: on_coordinate_strand: Override for which strand to compute the step for. Defaults to this segment's on_coordinate_strand. """ if on_coordinate_strand is None: on_coordinate_strand = self.on_coordinate_strand if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: return -1 if on_coordinate_strand else 1 # POSITIVE_STRAND_LEFT_TO_RIGHT: effective strand determines direction return -1 if self.strand == "+" else 1 def _validate_same_coordinate_space( self, other: "DisjointIntervalSequence" ) -> None: """Raise if other does not share the same coordinate space.""" if not isinstance(other, DisjointIntervalSequence): raise TypeError( f"Expected DisjointIntervalSequence, got {type(other).__name__}" ) if self._coordinate_intervals != other._coordinate_intervals: raise ValueError("DIS objects must share the same coordinate intervals") def _from_end_indices(self, end5: int, end3: int) -> "DisjointIntervalSequence": """Return a new DIS with the same coordinate space but different segment indices.""" # Validate end5 is upstream of or equal to end3 if self._upstream_index_step() == -1: if end5 > end3: raise ValueError( f"Invalid indices: end5 index {end5} is downstream of end3 index {end3}" ) if self._upstream_index_step() == 1: if end5 < end3: raise ValueError( f"Invalid indices: end5 index {end5} is downstream of end3 index {end3}" ) return DisjointIntervalSequence( self._coordinate_intervals, coord_name=self._coord_metadata.name, segment_name=self._segment_metadata.name, on_coordinate_strand=self.on_coordinate_strand, start=min(end5, end3), end=max(end5, end3), )
[docs] def shift(self, amount: int) -> "DisjointIntervalSequence": """Shift the segment downstream by amount (negative shifts upstream). The coordinate space is unchanged. Only the segment indices move. """ downstream_step = -self._upstream_index_step() delta = amount * downstream_step return self._from_end_indices( self.end5_index + delta, self.end3_index + delta, )
[docs] def expand( self, upstream: int, dnstream: int | None = None ) -> "DisjointIntervalSequence": """Expand the segment upstream and/or downstream. Negative values contract the segment. Raises ValueError if contraction would result in end5 being downstream of end3. Args: upstream: Bases to expand (or contract if negative) toward the 5' end. dnstream: Bases to expand (or contract if negative) toward the 3' end. Defaults to upstream (symmetric). """ if dnstream is None: dnstream = upstream up_step = self._upstream_index_step() down_step = -up_step new_end5 = self.end5_index + (upstream * up_step) new_end3 = self.end3_index + (dnstream * down_step) # Validate end5 is still upstream of or equal to end3 if (new_end5 - new_end3) * up_step < 0: raise ValueError( "Invalid expansion: end5 would be downstream of end3 " f"(end5={new_end5}, end3={new_end3})" ) return self._from_end_indices(new_end5, new_end3)
[docs] def upstream_of(self, other: "DisjointIntervalSequence") -> bool: """True if self is strictly upstream of other (no overlap). Requires the same coordinate space and same on_coordinate_strand. """ self._validate_same_coordinate_space(other) if self.on_coordinate_strand != other.on_coordinate_strand: raise ValueError( f"Cannot compare: self is on " f"{'same' if self.on_coordinate_strand else 'opposite'} " f"strand but other is on " f"{'same' if other.on_coordinate_strand else 'opposite'} strand" ) if self.length == 0 and other.length == 0 and self.start == other.start: return False if self._upstream_index_step() == -1: return self._end <= other.start return self._start >= other.end
[docs] def dnstream_of(self, other: "DisjointIntervalSequence") -> bool: """True if self is strictly downstream of other (no overlap). Requires the same coordinate space and same on_coordinate_strand. """ self._validate_same_coordinate_space(other) if self.on_coordinate_strand != other.on_coordinate_strand: raise ValueError( f"Cannot compare: self is on " f"{'same' if self.on_coordinate_strand else 'opposite'} " f"strand but other is on " f"{'same' if other.on_coordinate_strand else 'opposite'} strand" ) if self.length == 0 and other.length == 0 and self.start == other.start: return False if self._upstream_index_step() == -1: return self._start >= other.end return self._end <= other.start
[docs] def within(self, other: "DisjointIntervalSequence") -> bool: """True if self's segment is contained within other's segment. Requires the same coordinate space and same on_coordinate_strand. """ self._validate_same_coordinate_space(other) if self.on_coordinate_strand != other.on_coordinate_strand: raise ValueError( f"Cannot compare: self is on " f"{'same' if self.on_coordinate_strand else 'opposite'} " f"strand but other is on " f"{'same' if other.on_coordinate_strand else 'opposite'} strand" ) return self._start >= other.start and self._end <= other.end
[docs] def is_same_strand(self) -> bool: """True if the segment is on the same strand as the coordinate intervals. Returns ------- :py:class:`bool` """ return self.on_coordinate_strand
[docs] def is_positive_strand(self) -> bool: """True if the segment is on the positive strand. Returns ------- :py:class:`bool` """ if self.strand == "+": return True return False
[docs] def as_positive_strand(self) -> "DisjointIntervalSequence": """Return a DIS with the segment on the positive strand. Returns ``self`` if already on the positive strand. The coordinate intervals are unchanged; only the segment strand is affected. Returns ------- :py:class:`DisjointIntervalSequence` """ if self.is_positive_strand(): return self return self.flip_strand()
[docs] def as_negative_strand(self) -> "DisjointIntervalSequence": """Return a DIS with the segment on the negative strand. Returns ``self`` if already on the negative strand. The coordinate intervals are unchanged; only the segment strand is affected. Returns ------- :py:class:`DisjointIntervalSequence` """ if not self.is_positive_strand(): return self return self.flip_strand()
[docs] def as_opposite_strand(self) -> "DisjointIntervalSequence": """Return a DIS with the segment on the opposite strand. Returns ``self`` if already on the opposite strand. The coordinate intervals are unchanged; only the segment strand is affected. Returns ------- :py:class:`DisjointIntervalSequence` """ if not self.on_coordinate_strand: return self return self.flip_strand()
[docs] def as_same_strand(self) -> "DisjointIntervalSequence": """Return a DIS with the segment on the coordinate strand. Returns ``self`` if already on the coordinate strand. The coordinate intervals are unchanged; only the segment strand is affected. Returns ------- :py:class:`DisjointIntervalSequence` """ if self.on_coordinate_strand: return self return self.flip_strand()
[docs] def flip_strand(self) -> "DisjointIntervalSequence": """Return a new DIS with ``on_coordinate_strand`` toggled. The coordinate intervals are unchanged. The segment's ``on_coordinate_strand`` is flipped. Returns ------- :py:class:`DisjointIntervalSequence` """ return DisjointIntervalSequence( self._coordinate_intervals, coord_name=self._coord_metadata.name, segment_name=self._segment_metadata.name, on_coordinate_strand=not self.on_coordinate_strand, start=self._start, end=self._end, )
[docs] def __len__(self) -> int: """Return the length of the segment.""" return self.length
[docs] def __repr__(self) -> str: """Return a human-readable representation.""" return ( f"DisjointIntervalSequence(" f"coord_name={self._coord_metadata.name!r}, " f"name={self._segment_metadata.name!r}, " f"{self.chromosome}:{self.coord_strand}, " f"len={self.length}, " f"coord_intervals={self._coordinate_intervals}, " f"start={self._start}, " f"end={self._end}, " f"end5={self.end5_index}, " f"end3={self.end3_index})" )
[docs] def __eq__(self, other: object) -> bool: """Equality based on coordinate intervals, metadata, and index values.""" if not isinstance(other, DisjointIntervalSequence): return NotImplemented return ( self._coord_metadata == other._coord_metadata and self._segment_metadata == other._segment_metadata and self._start == other._start and self._end == other._end and self._coordinate_intervals == other._coordinate_intervals )