Source code for genome_kit.gk_data

# Copyright (C) 2016-2023 Deep Genomics Inc. All Rights Reserved.
""" This module automatically fetches GenomeKit data.

A user should first use :py:func:`~genome_kit.gk_data.upload_file` to upload a
file to make it available to other GenomeKit users.

After the upload, the file can be used by any GenomeKit function via
:py:func:`~genome_kit.gk_data.get_file`, which will download the file on demand
and return its local path.

Example
-------
>>> upload_file('/local/path/hg38.2bit', 'hg38.2bit')
>>> get_file('hg38.2bit')
"/Users/example/Application Support/genome_kit/hg38.2bit"

"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from typing import Dict
from importlib_metadata import entry_points


from ._util import makedirs
from ._gk_data_config import _config
from . import _cxx
from .data_manager import DataManager, DefaultDataManager, ProgressPercentage
import tempfile
import ftplib
import hashlib
import calendar
import time

from urllib.parse import urlparse
from urllib.request import urlopen


# Look up an optional plugin that supplies a DataManager implementation.
# If no plugin is registered (IndexError on the empty list) or the plugin
# fails to load, fall back to the built-in DefaultDataManager.
eps = entry_points(group="genomekit.plugins.data_manager")
try:
    DataManagerImpl = list(eps)[0].load()
except Exception:
    # Deliberately broad: any plugin failure falls back to the default, but
    # KeyboardInterrupt / SystemExit are no longer swallowed.
    DataManagerImpl = DefaultDataManager
# Register an implementation of DataManager if you need to upload
# new data
data_manager: DataManager = DataManagerImpl(_config["DATA_DIR"])


def get_file(filename):
    """Fetch a GenomeKit data file through the active data manager.

    Compatibility wrapper for :py:meth:`~genome_kit.DataManager.get_file`;
    the call is forwarded unchanged to the module-level ``data_manager``.
    """
    local_path = data_manager.get_file(filename)
    return local_path
def upload_file(filepath: str, filename: str, metadata: Dict[str, str] = None):  # pragma: no cover
    """compatibility wrapper for :py:meth:`~genome_kit.DataManager.upload_file`

    When `filename` ends with ``.chrom.sizes`` the upload is treated as a
    reference genome: a small ``<hash>.hash`` lookup file containing the
    reference genome name is created and uploaded first, then the requested
    file is uploaded.

    Parameters
    ----------
    filepath : :py:class:`str`
        Local path of the file to upload.
    filename : :py:class:`str`
        Name under which the file is uploaded.
    metadata : :py:class:`dict`
        Optional metadata forwarded as-is to the data manager.

    Returns
    -------
    Whatever ``data_manager.upload_file`` returns for the main file.
    """
    chrom_sizes_ext = ".chrom.sizes"
    if filename.endswith(chrom_sizes_ext):
        refg_name = filename[:-len(chrom_sizes_ext)]
        hashval = _cxx.Genome._refg_hash(refg_name)
        print(f"Detected upload of a refg. Creating and uploading a hash lookup file {hashval}.hash")

        # delete=False so the file can be closed and then re-opened by name
        # for the upload; we are therefore responsible for removing it.
        tmpfile = tempfile.NamedTemporaryFile(mode='w', delete=False)
        try:
            with tmpfile:
                tmpfile.write(refg_name)
            # NOTE(review): assumes upload_file has finished reading the file
            # by the time it returns -- confirm for custom DataManager plugins.
            data_manager.upload_file(tmpfile.name, f"{hashval}.hash", metadata)
        finally:
            # Best-effort cleanup; never mask an upload error with a
            # failure to remove the temporary file.
            try:
                os.unlink(tmpfile.name)
            except OSError:
                pass

    return data_manager.upload_file(filepath, filename, metadata)
@_cxx.register
def resolve_datafile_path(path):
    """Convert an abstract file path into a concrete one, possibly resolving
    to a GenomeKit data file.

    If path is a regular path, then it is simply returned.

    If path contains ``'{GENOMEKIT_DATA_DIR}'``, then the file is presumed to
    reside in the GenomeKit data file repository. In that case, `get_file`
    will be attempted on the file name, so that the file may be downloaded
    and versioned if necessary. The path to the final file is returned.
    """
    # If this path includes the special GENOMEKIT_DATA_DIR signifier,
    # process the path and try to download the file if missing.
    # Otherwise just return the file name as-is.
    if "{GENOMEKIT_DATA_DIR}" in path:  # pragma: no cover
        rel_path = path.replace("{GENOMEKIT_DATA_DIR}", "").lstrip('/\\')
        path = get_file(rel_path)

    return path


def _try_ftp_last_modified_date(url):
    """Try to determine the last-modified date of a remote file over FTP.

    The date is queried with the FTP ``MDTM`` command and converted to UTC
    seconds since the epoch.

    Parameters
    ----------
    url : :py:class:`str`
        The URL of the remote file.

    Returns
    -------
    :py:class:`int` or None
        The last-modified time as UTC seconds since the epoch, or None if
        `url` does not use the ``ftp`` scheme.

    Raises
    ------
    Errors raised by :py:mod:`ftplib` while connecting or running ``MDTM``
    (e.g. the path does not exist or is not a file) are propagated.
    """
    # Parse URL and check that it's FTP
    parsed = urlparse(url)
    if parsed.scheme != 'ftp':
        return None

    # NOTE(review): netloc is passed verbatim as the host, so URLs carrying an
    # explicit port or credentials are not handled here.
    # The context manager quits/closes the connection even when MDTM fails.
    with ftplib.FTP(parsed.netloc, user='anonymous', timeout=20) as ftp:
        # Run the MDTM command to get time in "YYYYMMDDhhmmss" format
        # where MM is 01-12, DD is 01-31, hh is 00-23
        last_modified_str = ftp.sendcmd("MDTM " + parsed.path).split()[-1]

    # Servers may append fractional seconds ("YYYYMMDDhhmmss.sss"); drop them
    # since strptime's format below has no place for them.
    last_modified_str = last_modified_str.split('.')[0]

    # Convert to UTC as epoch
    timestruct = time.strptime(last_modified_str, "%Y%m%d%H%M%S")
    return calendar.timegm(timestruct)
def wget(url, dst=None, timestamping=False, progress=False):
    """Download a file from the URL.

    If `dst` is unspecified, the file will be downloaded to the system temp
    directory under a name derived from the URL (a hash of the full URL plus
    the URL's final path component).

    If the last-modified time of the remote file is available, it will be
    assigned to the downloaded file.

    The content is first written to ``dst + ".part"`` and moved onto `dst`
    once the download has completed.

    Parameters
    ----------
    url : :py:class:`str`
        The URL of the source file.

    dst : :py:class:`str`
        The path to the local destination.

    timestamping: :py:class:`bool`
        If true, only download when the last-modified date of the remote file
        is newer than the local file. If the server does not provide the
        last-modified date, the local file will be kept by default.

    progress: :py:class:`bool`
        If true, show a progress bar.

    Returns
    -------
    :py:class:`str`
        The path to the local version of the file.

    Raises
    ------
    :py:exc:`TypeError`
        `url` or `dst` is not a :py:class:`str`.
    :py:exc:`IOError`
        There was an error opening `dst`.
    :py:exc:`HTTPError`
        There was an error opening the URL.
    :py:exc:`socket.timeout`
        The connection timed out or was interrupted.
    """
    if not isinstance(url, str):
        raise TypeError("url must be str")

    # Get path to output file.
    if dst is None:
        # Use an auto-generated filename based on the URL filename plus a hash of full URL
        tempdir = tempfile.gettempdir()
        filename = "genomekit.{}.{}".format(hashlib.sha1(url.encode('ascii')).hexdigest(), url.split('/')[-1])
        dst = os.path.join(tempdir, filename)
    elif not isinstance(dst, str):
        raise TypeError("dst must be str")

    # Name of partial download file
    dst_part = dst + ".part"

    # Open the URL and check header for timestamp
    with urlopen(url, timeout=20) as request:

        # Fetch remote last-modified time
        remote_last_modified = request.headers['last-modified']
        if remote_last_modified:
            # UTC time
            remote_last_modified = calendar.timegm(time.strptime(remote_last_modified, '%a, %d %b %Y %H:%M:%S %Z'))
        else:
            remote_last_modified = _try_ftp_last_modified_date(url)

        # Fetch remote file size
        remote_file_size = request.headers.get("Content-Length", None)
        if remote_file_size is not None:
            remote_file_size = int(remote_file_size)

        # If dst exists, only proceed with download if can determine that remote file is newer
        if timestamping and os.path.exists(dst):
            local_last_modified = os.path.getmtime(dst)
            if remote_last_modified is None or local_last_modified >= remote_last_modified:
                return dst

        # Download the file content
        try:
            # dirname is '' for a bare filename; there is nothing to create then.
            dst_dir = os.path.dirname(dst)
            if dst_dir:
                makedirs(dst_dir)

            with open(dst_part, 'wb') as f:
                # Create a progress bar if requested (kept in its own local so
                # the `progress` argument is not rebound).
                reporter = None
                if progress:
                    reporter = ProgressPercentage(dst, remote_file_size, '[GenomeKit] downloading')  # pragma: no cover

                while True:
                    # Read a chunk into dst
                    buffer = request.read(2**18)
                    if not buffer:
                        break
                    f.write(buffer)

                    # Report progress
                    if reporter is not None:
                        reporter(len(buffer))  # pragma: no cover
        finally:
            # Set the modification time on the file. This should happen whether
            # the download completed successfully or not.
            # The existence check keeps a failure to create the partial file
            # from being masked by a FileNotFoundError raised here.
            if remote_last_modified is not None and os.path.exists(dst_part):  # pragma: no cover
                os.utime(dst_part, (remote_last_modified, remote_last_modified))

    # Move the completed download into place, overwriting any existing file.
    os.replace(dst_part, dst)

    return dst