Source code for genome_kit._twobitutils

# Copyright (C) 2016-2023 Deep Genomics Inc. All Rights Reserved.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from array import array
from itertools import product


# Copied from twobitreader
def _true_long_type():
    """
    OS X uses an 8-byte long, so make sure L (long) is the right size
    and switch to I (int) if needed
    """
    for type_ in ['L', 'I']:
        test_array = array(type_, [0])
        long_size = test_array.itemsize
        if long_size == 4:
            return type_
    raise ImportError("Couldn't determine a valid 4-byte long type to use as \
                       equivalent to _LONG")


_LONG = _true_long_type()

_TWOBIG_SIG_LILEND = 0x1A412743
_TWOBIG_SIG_BIGEND = 0x4327411A
_TWOBIG_VER_LILEND = 0x00000000
_TWOBIG_VER_BIGEND = 0x00000000

_BASES = ['T', 'C', 'A', 'G', 'N']

_BASE_AS_BITS = {
    'T': 0,
    't': 0,
    'C': 1,
    'c': 1,
    'A': 2,
    'a': 2,
    'G': 3,
    'g': 3,
    'N': 0,
    'n': 0,
}


def _quad_as_bits(s):
    """Packs a string of up to four bases (TCAG) into a byte representation."""
    if len(s) < 4:
        s += 'N' * (4 - len(s))
    return _BASE_AS_BITS[s[0]] << 6 | \
           _BASE_AS_BITS[s[1]] << 4 | \
           _BASE_AS_BITS[s[2]] << 2 | \
           _BASE_AS_BITS[s[3]]  # noqa


def _basecombos(reps):
    args = [_BASES] * reps  # List of lists
    for x in product(*args):
        yield "".join(x)


def _find_blocks(seq, includes):
    starts = []
    sizes = []

    # Loop over each position in the string, taking note of each
    # transition from "inside" a block to "outside" a block
    was_inside = False
    for i, c in enumerate(seq):
        is_inside = c in includes
        if is_inside != was_inside:
            if was_inside:
                sizes.append(i - starts[-1])
            else:
                starts.append(i)
            was_inside = is_inside

    if was_inside:
        sizes.append(len(seq) - starts[-1])

    assert len(starts) == len(sizes)
    return {'starts': starts, 'sizes': sizes}


[docs] def write2bit(outfile, seqs): """Write a 2bit file containing the given DNA sequences. Parameters ---------- outfile : str The name of the output file. seqs : dict A dictionary of named DNA sequence strings each of which will be represented by name. Sequence names can be arbitrary. Sequences may contain a mix of upper/lower case ACGTN characters. Examples -------- >>> seqs={ ... "chr1" : "AAAANNnnCCAAaattTT", ... "chr2" : "NNNnnnnggggGGGG", ... } >>> write2bit('mydna.2bit', seqs) """ # Lookup table for all possible substrings of QUAD_AS_BITS = {} QUAD_AS_BITS.update({q: _quad_as_bits(q) for q in _basecombos(4)}) QUAD_AS_BITS.update({q: _quad_as_bits(q) for q in _basecombos(3)}) QUAD_AS_BITS.update({q: _quad_as_bits(q) for q in _basecombos(2)}) QUAD_AS_BITS.update({q: _quad_as_bits(q) for q in _basecombos(1)}) # Convert each quad of 4 bases to a byte # Pad the resulting byte list with 0s until it's a multiple of 4 seq_to_bits = lambda seq: \ [QUAD_AS_BITS[seq[i:i+4]] for i in range(0, len(seq), 4)] \ + [0]*(((len(seq)+3)//4) % 4) # header layout on disk: # signature (4 bytes) # version (4 bytes) # sequence_count (4 bytes) # reserved (4 bytes) headsize = 16 # index layout on disk: # name_size (1 byte) # name (1 byte x name_size, no NULL terminator) # offset (4 bytes) idxsize = lambda name: 1 + len(name) + 4 # record layout on disk: # dna_size (4 bytes) # nblock_count (4 bytes) # nblock_starts (4 bytes x number of nblocks) # nblock_sizes (4 bytes x number of nblocks) # mask_count (4 bytes) # mask_starts (4 bytes x number of masks) # mask_sizes (4 bytes x number of masks) # reserved (4 bytes) # bits ((dna_size+3//4) bytes) recsize = lambda rec: 4 + \ 4+8*len(rec['nblocks']['starts']) + \ 4+8*len(rec['masks']['starts']) + \ 4 + \ len(rec['bits']) # noqa names = sorted(seqs.keys()) recs = [] for name in names: seq = seqs[name] rec = {} rec['dnasize'] = len(seq) rec['nblocks'] = _find_blocks(seq, 'Nn') rec['masks'] = _find_blocks(seq, "acgtn") rec['bits'] = seq_to_bits(seq.upper()) recs.append(rec) with open(outfile, 'wb') as f: # Write the header array(_LONG, [_TWOBIG_SIG_LILEND, _TWOBIG_VER_LILEND, len(seqs), 0]).tofile(f) # Write the index, one entry for each sequence for i, name in enumerate(names): rec_offset = headsize + sum(map(idxsize, names)) \ + sum(map(recsize, recs[:i])) # Write the name of this sequence array('B', [len(name)]).tofile(f) array('B', name.encode('ascii')).tofile(f) array(_LONG, [rec_offset]).tofile(f) # Write the sequence records themselves for rec, name in zip(recs, names): # Write the actual DNA sequence length array(_LONG, [rec['dnasize']]).tofile(f) # Write the nblocks array(_LONG, [len(rec['nblocks']['starts'])]).tofile(f) array(_LONG, rec['nblocks']['starts']).tofile(f) array(_LONG, rec['nblocks']['sizes']).tofile(f) # Write the masks array(_LONG, [len(rec['masks']['starts'])]).tofile(f) array(_LONG, rec['masks']['starts']).tofile(f) array(_LONG, rec['masks']['sizes']).tofile(f) # Write reserved DWORD array(_LONG, [0]).tofile(f) # reserved # Write DNA bits (size rounded up to nearest byte) array('B', rec['bits']).tofile(f)