# Source code for nidata.core.fetchers.aws_fetcher

"""
"""

import os
import os.path as op
import time
import warnings
from functools import partial

import nibabel as nib
import numpy as np

from .base import chunk_report, Fetcher


def test_cb(cur_bytes, total_bytes, t0=None, **kwargs):
    """Progress callback for boto downloads.

    Forwards the running byte counts to ``chunk_report`` so that download
    progress gets reported.  Any extra keyword arguments supplied by boto
    are accepted and ignored.

    Parameters
    ----------
    cur_bytes : int
        Number of bytes received so far.
    total_bytes : int
        Total size of the download, in bytes.
    t0 : float, optional
        Download start time (as returned by ``time.time()``), used by
        ``chunk_report`` to estimate transfer speed.
    """
    report_args = dict(bytes_so_far=cur_bytes,
                       total_size=total_bytes,
                       initial_size=0,
                       t0=t0)
    return chunk_report(**report_args)
class AmazonS3Fetcher(Fetcher):
    """Fetcher that downloads files from Amazon S3 via boto.

    Credentials come either from an explicit access key pair or from a
    named boto profile; at least one of the two must be supplied.
    """
    # boto is required in addition to the base fetcher's dependencies.
    dependencies = ['boto'] + Fetcher.dependencies

    def __init__(self, data_dir=None, access_key=None,
                 secret_access_key=None, profile_name=None):
        """
        Parameters
        ----------
        data_dir : str, optional
            Local directory where downloaded files are stored (handled by
            the ``Fetcher`` base class).
        access_key : str, optional
            AWS access key id; must be paired with ``secret_access_key``.
        secret_access_key : str, optional
            AWS secret access key.
        profile_name : str, optional
            Name of a boto credentials profile; an alternative to passing
            the key pair directly.

        Raises
        ------
        ValueError
            If neither a profile name nor a complete key pair is given.
        """
        if not (profile_name or (access_key and secret_access_key)):
            raise ValueError("profile_name or access_key / secret_access_key "
                             "must be provided.")
        super(AmazonS3Fetcher, self).__init__(data_dir=data_dir)
        self.access_key = access_key
        self.secret_access_key = secret_access_key
        self.profile_name = profile_name

    def fetch(self, files, force=False, check=False, verbose=1):
        """Download the requested files from Amazon S3.

        Parameters
        ----------
        files : list
            File specifications accepted by ``Fetcher.reformat_files``;
            each entry resolves to ``(local_path, remote_key, opts)`` where
            ``opts`` may contain a ``'bucket'`` name.
        force : bool, optional
            If True, re-download even when the target file already exists.
        check : bool, optional
            If True, verify that existing files load with nibabel and
            re-download corrupted ones.
        verbose : int, optional
            Verbosity level; values > 0 print progress messages.

        Returns
        -------
        list
            Local paths of the fetched files, grouped by bucket; ``None``
            for any remote key that could not be found.
        """
        assert (self.profile_name or
                (self.access_key and self.secret_access_key))
        files = Fetcher.reformat_files(files)  # allows flexibility

        import boto
        if self.profile_name is not None:
            s3 = boto.connect_s3(profile_name=self.profile_name)
        elif (self.access_key is not None and
              self.secret_access_key is not None):
            s3 = boto.connect_s3(self.access_key, self.secret_access_key)

        # BUG FIX: the original used np.unique here, which sorts its input;
        # sorting a mix of None (no bucket requested) and str bucket names
        # raises TypeError on Python 3.  Dedupe without sorting instead.
        bucket_names = list(dict.fromkeys(
            opts.get('bucket') for _, _, opts in files))

        files_ = []
        for bucket_name in bucket_names:  # loop over bucket names: efficient
            if bucket_name:  # bucket requested
                buck = s3.get_bucket(bucket_name)
            else:  # default to first bucket
                buck = s3.get_all_buckets()[0]

            for file_, remote_key, opts in files:
                if opts.get('bucket') != bucket_name:
                    continue  # get all files from the current bucket only.

                target_file = op.join(self.data_dir, file_)
                key = buck.get_key(remote_key)
                if not key:
                    warnings.warn('Failed to find key: %s' % remote_key)
                    files_.append(None)
                else:
                    do_download = force or not op.exists(target_file)
                    try:
                        # Optionally sanity-check an existing image file;
                        # a load failure triggers a re-download below.
                        do_download = (do_download or
                                       (check and nib.load(
                                           target_file).get_data() is None))
                    except IOError as ioe:
                        if verbose > 0:
                            print("Warning: %s corrupted, re-downloading "
                                  "(Error=%s)" % (target_file, ioe))
                        do_download = True

                    if do_download:
                        # Ensure destination directory exists
                        destination_dir = op.dirname(target_file)
                        if not op.isdir(destination_dir):
                            if verbose > 0:
                                print("Creating base directory %s" % (
                                    destination_dir))
                            os.makedirs(destination_dir)

                        if verbose > 0:
                            print("Downloading [%s]/%s to %s." % (
                                bucket_name or 'default bucket',
                                remote_key, target_file))
                        with open(target_file, 'wb') as fp:
                            cb = partial(test_cb, t0=time.time())
                            key.get_contents_to_file(fp, cb=cb, num_cb=None)
                    files_.append(target_file)
        return files_