"""
"""
import contextlib
import os
import os.path as op
import tarfile
import zipfile
import sys
import shutil
import time
import hashlib
import fnmatch
from .._utils.compat import cPickle, _urllib, md5_hash
from .base import chunk_report, Fetcher, md5_sum_file
def movetree(src, dst):
"""Move an entire tree to another directory. Any existing file is
overwritten"""
names = os.listdir(src)
# Create destination dir if it does not exist
if not op.exists(dst):
os.makedirs(dst)
errors = []
for name in names:
srcname = op.join(src, name)
dstname = op.join(dst, name)
try:
if op.isdir(srcname) and op.isdir(dstname):
movetree(srcname, dstname)
os.rmdir(srcname)
else:
shutil.move(srcname, dstname)
except (IOError, os.error) as why:
errors.append((srcname, dstname, str(why)))
# catch the Error from the recursive movetree so that we can
# continue with other files
except Exception as err:
errors.extend(err.args[0])
if errors:
raise Exception(errors)
def _tree(path, pattern=None, dictionary=False):
""" Return a directory tree under the form of a dictionaries and list
Parameters:
-----------
path: string
Path browsed
pattern: string, optional
Pattern used to filter files (see fnmatch)
dictionary: boolean, optional
If True, the function will return a dict instead of a list
"""
files = []
dirs = [] if not dictionary else {}
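# Walk the directory once: recurse into sub-directories and keep the
# files whose names match the pattern.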
for file_ in os.listdir(path):
file_path = op.join(path, file_)
if op.isdir(file_path):
if not dictionary:
dirs.append((file_, _tree(file_path, pattern)))
else:
dirs[file_] = _tree(file_path, pattern)
else:
if pattern is None or fnmatch.fnmatch(file_, pattern):
files.append(file_path)
files = sorted(files)
if not dictionary:
return sorted(dirs) + files
if len(dirs) == 0:
return files
if len(files) > 0:
dirs['.'] = files
return dirs
def _chunk_read_(response, local_file, chunk_size=8192, report_hook=None,
initial_size=0, total_size=None, verbose=1):
"""Download a file chunk by chunk and show advancement
Parameters
----------
response: _urllib.response.addinfourl
Response to the download request in order to get file size
local_file: file
Hard disk file where data should be written
chunk_size: int, optional
Size of downloaded chunks. Default: 8192
report_hook: bool
Whether or not to show downloading advancement. Default: None
initial_size: int, optional
If resuming, indicate the initial size of the file
total_size: int, optional
Expected final size of download (None means it is unknown).
verbose: int, optional
verbosity level (0 means no message).
Returns
-------
data: string
The downloaded file.
"""
if total_size is None:
total_size = response.info().get('Content-Length', '110000000').strip()
try:
total_size = int(total_size) + initial_size
except Exception as e:
if verbose > 1:
print("Warning: total size could not be determined.")
if verbose > 2:
print("Full stack trace: %s" % e)
total_size = None
bytes_so_far = initial_size
t0 = time.time()
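# Read the HTTP response in fixed-size chunks, writing each chunk to
# local_file and optionally reporting progress after each one.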
while True:
chunk = response.read(chunk_size)
bytes_so_far += len(chunk)
if not chunk:
if report_hook:
sys.stderr.write('\n')
break
local_file.write(chunk)
if report_hook:
chunk_report(bytes_so_far, total_size, initial_size, t0)
return
def _uncompress_file(file_, delete_archive=True, verbose=1):
"""Uncompress files contained in a data_set.
Parameters
----------
file_: string
Path of the file to be uncompressed.
delete_archive: bool, optional
Whether or not to delete archive once it is uncompressed.
Default: True
verbose: int, optional
verbosity level (0 means no message).
Notes
-----
This handles zip, tar, gzip and bzip files only.
"""
if verbose > 0:
print('Extracting data from %s...' % file_)
data_dir = op.dirname(file_)
# We first try to see if it is a zip file
try:
filename, ext = op.splitext(file_)
with open(file_, "rb") as fd:
header = fd.read(4)
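# The leading bytes are used to detect gzip streams whose file name
# does not end in '.gz' (gzip magic number: 0x1f 0x8b).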
processed = False
if zipfile.is_zipfile(file_):
z = zipfile.ZipFile(file_)
z.extractall(data_dir)
z.close()
processed = True
elif ext == '.gz' or header.startswith(b'\x1f\x8b'):
import gzip
gz = gzip.open(file_)
if ext == '.tgz':
filename = filename + '.tar'
out = open(filename, 'wb')
shutil.copyfileobj(gz, out, 8192)
gz.close()
out.close()
# If the file is a .tar.gz, the tar part is handled in the next case
if delete_archive:
os.remove(file_)
file_ = filename
filename, ext = op.splitext(file_)
processed = True
if tarfile.is_tarfile(file_):
with contextlib.closing(tarfile.open(file_, "r")) as tar:
tar.extractall(path=data_dir)
processed = True
if not processed:
raise IOError("[Uncompress] unknown archive file format: "
"%s" % file_)
if delete_archive:
os.remove(file_)
if verbose > 0:
print(' ...done.')
except Exception as e:
if verbose > 0:
print('Error uncompressing file: %s' % e)
raise
def _fetch_file(url, data_dir, resume=True, overwrite=False,
md5sum=None, username=None, passwd=None,
handlers=None, headers=None, cookies=None, verbose=1):
"""Load requested file, downloading it if needed or requested.
Parameters
----------
url: string
Contains the url of the file to be downloaded.
data_dir: string
Path of the data directory where the downloaded file is stored.
Created if it does not exist.
resume: bool, optional
If true, try to resume partially downloaded files.
overwrite: bool, optional
If true and the file already exists, delete it first.
md5sum: string, optional
MD5 sum of the file. If provided, the downloaded file is checked
against it.
username: string, optional
Username used for HTTP authentication.
passwd: string, optional
Password used for HTTP authentication.
handlers: list of BaseHandler, optional
urllib handlers passed to urllib.request.build_opener. Used by
advanced users to customize request handling.
headers: dict, optional
HTTP headers added to the request.
cookies: dict, optional
Cookies added to the request via the 'Cookie' header.
verbose: int, optional
verbosity level (0 means no message).
Returns
-------
full_name: string
Absolute path of the downloaded file.
Notes
-----
If, for any reason, the download procedure fails, all downloaded files are
removed.
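Examples
--------
Illustrative only; the URL below is hypothetical:
_fetch_file('https://example.org/archive.zip', '/tmp/data', resume=True)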
"""
if handlers is None:
handlers = []
if headers is None:
headers = dict()
if cookies is None:
cookies = dict()
# Determine data path
if not op.exists(data_dir):
os.makedirs(data_dir)
# Determine filename using URL
parse = _urllib.parse.urlparse(url)
file_name = op.basename(parse.path)
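# URLs that end with a slash have no basename; fall back to a hash of
# the URL path so the download still gets a stable file name.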
if file_name == '':
file_name = md5_hash(parse.path)
temp_file_name = file_name + ".part"
full_name = op.join(data_dir, file_name)
temp_full_name = op.join(data_dir, temp_file_name)
if op.exists(full_name):
if overwrite:
os.remove(full_name)
else:
return full_name
if op.exists(temp_full_name):
if overwrite:
os.remove(temp_full_name)
t0 = time.time()
local_file = None
initial_size = 0
try:
# Download data
if username:
# Make sure we're secure, basic auth is unencrypted
if parse.scheme and parse.scheme != 'https':
raise ValueError("Specifying username currently requires using"
" a secure (https) URL (%s)." % url)
password_mgr = _urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, url, username, passwd)
# Don't append, don't want to update caller's list with this!
handlers = ([_urllib.request.HTTPBasicAuthHandler(password_mgr)] +
handlers)
url_opener = _urllib.request.build_opener(*handlers)
# Prep the request (add headers, cookies)
request = _urllib.request.Request(url)
request.add_header('Connection', 'Keep-Alive')
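# Cookies are folded into a 'Cookie' header by hand rather than going
# through a CookieJar, merging with any caller-provided 'Cookie' header.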
if cookies:
if 'Cookie' in headers:
headers['Cookie'] += ';'
else:
headers['Cookie'] = ''
headers['Cookie'] += ';'.join(['%s=%s' % (k, v)
for k, v in cookies.items()])
for header_name, header_val in headers.items():
request.add_header(header_name, header_val)
if verbose > 0:
displayed_url = url.split('?')[0] if verbose == 1 else url
print('Downloading data from %s ...' % displayed_url)
if not resume or not op.exists(temp_full_name):
# Simple case: no resume
data = url_opener.open(request)
local_file = open(temp_full_name, "wb")
else:
# Complex case: download has been interrupted, we try to resume it.
local_file_size = op.getsize(temp_full_name)
# If the file exists, then only download the remainder
request.add_header("Range", "bytes=%s-" % (local_file_size))
try:
data = url_opener.open(request)
content_range = data.info().get('Content-Range')
if (content_range is None or not content_range.startswith(
'bytes %s-' % local_file_size)):
raise IOError('Server does not support resuming')
except Exception:
# Many kinds of errors can be raised here (HTTPError,
# URLError, ...), so catch them all and retry the download
# without resuming.
if verbose > 0:
print('Resuming failed, trying to download the whole file.')
return _fetch_file(
url, data_dir, resume=False, overwrite=overwrite,
md5sum=md5sum, username=username, passwd=passwd,
handlers=handlers, headers=headers, cookies=cookies,
verbose=verbose)
else:
local_file = open(temp_full_name, "ab")
initial_size = local_file_size
# Download the file.
_chunk_read_(data, local_file, report_hook=(verbose > 0),
initial_size=initial_size, verbose=verbose)
# temp file must be closed prior to the move
if not local_file.closed:
local_file.close()
shutil.move(temp_full_name, full_name)
dt = time.time() - t0
if verbose > 0:
print('...done. (%i seconds, %i min)' % (dt, dt // 60))
except _urllib.error.HTTPError as e:
if verbose > 0:
print("Error while fetching file %s. "
"Dataset fetching aborted." % (file_name))
if verbose > 1:
print("HTTP Error: %s, %s" % (e, url))
raise
except _urllib.error.URLError as e:
if verbose > 0:
print("Error while fetching file %s. "
"Dataset fetching aborted." % (file_name))
if verbose > 1:
print("URL Error: %s, %s" % (e, url))
raise
finally:
if local_file is not None and not local_file.closed:
local_file.close()
if md5sum is not None:
if (md5_sum_file(full_name) != md5sum):
raise ValueError("File %s checksum verification has failed."
" Dataset fetching aborted." % full_name)
return full_name
def fetch_files(data_dir, files, resume=True, force=False, verbose=1,
delete_archive=True):
"""Load requested dataset, downloading it if needed or requested.
This function retrieves files from the hard drive or downloads them from
the given urls. Note to developers: all the files are first downloaded
into a sandbox directory and, if everything goes well, they are then
moved into the dataset folder. This prevents corrupting previously
downloaded data. For a big dataset, do not hesitate to make several
calls if needed.
Parameters
----------
data_dir: string
Path of the data directory where files are stored. Created if it
does not exist.
files: list of (string, string, dict)
List of files and their corresponding url. The dictionary contains
options regarding the files. Options supported are 'uncompress' to
indicate that the file is an archive, 'md5sum' to check the md5 sum
of the file, and 'move' if renaming the file or moving it to a
subfolder is needed.
resume: bool, optional
If true, try resuming the download if possible.
force: bool, optional
If true, re-download the files even if they are already present.
verbose: int, optional
verbosity level (0 means no message).
delete_archive: bool, optional
Whether or not to delete archives once they have been uncompressed.
Default: True
Returns
-------
files: list of string
Absolute paths of downloaded files on disk
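Examples
--------
Illustrative only; the URL and file names are hypothetical. The first
element of each tuple is the path, relative to data_dir, of the file
expected once the archive has been uncompressed:
fetch_files('/tmp/my_dataset',
            [('images/img_001.nii', 'https://example.org/images.tar.gz',
              {'uncompress': True})])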
"""
# We may be in a global read-only repository. If so, we cannot
# download files.
if not os.access(data_dir, os.W_OK):
raise ValueError('Dataset files are missing but dataset'
' repository is read-only. Contact your data'
' administrator to solve the problem')
# Create destination dirs
if not op.exists(data_dir):
os.makedirs(data_dir)
files_ = []
for file_, url, opts in files:
# There are two working directories here:
# - data_dir is the destination directory of the dataset
# - temp_dir is a temporary directory dedicated to this fetching call.
# All files that must be downloaded will be in this directory. If a
# corrupted file is found, or a file is missing, this working
# directory will be deleted.
files_pickle = cPickle.dumps(url)
files_md5 = hashlib.md5(files_pickle).hexdigest()
temp_dir = op.join(data_dir, files_md5)
# 3 possibilities:
# - the file exists in data_dir: nothing to do.
# - the file does not exist: we download it into temp_dir.
# - the file exists in temp_dir: this can happen if an archive has been
# downloaded there already. There is nothing to do.
# Target file in the data_dir
target_file = op.join(data_dir, file_)
if force or not op.exists(target_file):
# Fetch the file, if it doesn't already exist.
fetched_file = _fetch_file(url, temp_dir,
resume=resume,
overwrite=force,
verbose=verbose,
md5sum=opts.get('md5sum'),
username=opts.get('username'),
passwd=opts.get('passwd'),
handlers=opts.get('handlers', []),
headers=opts.get('headers', dict()),
cookies=opts.get('cookies', dict()))
# First, uncompress.
if opts.get('uncompress'):
target_files = _uncompress_file(fetched_file, verbose=verbose,
delete_archive=False)
else:
target_files = [fetched_file]
if opts.get('move'):
raise NotImplementedError("The 'move' option has been removed.")
# XXX: here, move is supposed to be a dir, it can be a name
move_dir = op.join(temp_dir, opts['move'])
if len(target_files) > 1:
target_files = [op.join(op.dirname(move_dir),
op.basename(f))
for f in target_files]
# Do the move
else:
if not op.exists(move_dir):
os.makedirs(move_dir)
shutil.move(fetched_file, move_dir)
target_files = [move_dir]
# Let's examine our work
if not op.exists(target_file):
if op.exists(fetched_file):
target_dir = op.dirname(target_file)
if not op.exists(target_dir):
os.makedirs(target_dir)
shutil.move(fetched_file, target_file)
else:
raise Exception("An error occurred while fetching %s; "
"the expected target file cannot be found."
" (%s)\nDebug info: %s" % (
file_, target_file,
{'fetched_file': fetched_file,
'target_files': target_files}))
if opts.get('uncompress') and delete_archive:
os.remove(fetched_file)
# If needed, move files from the temp directory to the final directory.
if op.exists(temp_dir):
# XXX We could move only the files that were requested
# XXX movetree can go wrong
movetree(temp_dir, data_dir)
shutil.rmtree(temp_dir)
files_.append(target_file)
return files_
def copytree(src, dst, symlinks=False, ignore=None):
"""Recursively copy a directory tree, preserving metadata.
Unlike shutil.copytree, the destination directory is allowed to already
exist. When symlinks is True, symbolic links are recreated as links."""
import stat
if not op.exists(dst):
os.makedirs(dst)
shutil.copystat(src, dst)
lst = os.listdir(src)
if ignore:
excl = ignore(src, lst)
lst = [x for x in lst if x not in excl]
for item in lst:
s = op.join(src, item)
d = op.join(dst, item)
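# Recreate symbolic links as links (instead of copying their targets)
# when requested, and try to preserve the link's own permission bits.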
if symlinks and op.islink(s):
if op.lexists(d):
os.remove(d)
os.symlink(os.readlink(s), d)
try:
st = os.lstat(s)
mode = stat.S_IMODE(st.st_mode)
os.lchmod(d, mode)
except (AttributeError, NotImplementedError, OSError):
pass # lchmod is not available on this platform
elif op.isdir(s):
copytree(s, d, symlinks, ignore)
else:
shutil.copy2(s, d)
class HttpFetcher(Fetcher):
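"""Fetcher that retrieves files over HTTP(S), applying optional basic
authentication credentials to every download."""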
def __init__(self, data_dir=None, username=None, passwd=None):
super(HttpFetcher, self).__init__(data_dir=data_dir)
self.username = username
self.passwd = passwd
def fetch(self, files, force=False, resume=True, check=False, verbose=1,
delete_archive=True):
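"""Download the given files via fetch_files.
The files description is first normalized with self.reformat_files; if
the fetcher was created with a username, the credentials are added to
each file's options unless already present.
"""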
files = self.reformat_files(files) # allows flexibility
if self.username is not None:
for tgt, src, opts in files:
opts['username'] = opts.get('username', self.username)
opts['passwd'] = opts.get('passwd', self.passwd)
return fetch_files(self.data_dir, files, resume=resume, force=force,
verbose=verbose, delete_archive=delete_archive)