Module arvados.http_to_keep
Source code
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import division
from future import standard_library
standard_library.install_aliases()
import email.utils
import time
import datetime
import re
import arvados
import arvados.collection
import urllib.parse
import logging
import calendar
import pycurl
import dataclasses
import typing
from arvados._pycurlhelper import PyCurlHelper
logger = logging.getLogger('arvados.http_import')
def _my_formatdate(dt):
return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
localtime=False, usegmt=True)
def _my_parsedate(text):
parsed = email.utils.parsedate_tz(text)
if parsed:
if parsed[9]:
# Adjust to UTC
return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
else:
# TZ is zero or missing, assume UTC.
return datetime.datetime(*parsed[:6])
else:
return datetime.datetime(1970, 1, 1)
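# Example (illustrative): _my_parsedate("Tue, 15 Nov 1994 08:12:31 GMT")
# returns datetime.datetime(1994, 11, 15, 8, 12, 31).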
def _fresh_cache(url, properties, now):
pr = properties[url]
expires = None
logger.debug("Checking cache freshness for %s using %s", url, pr)
if "Cache-Control" in pr:
if re.match(r"immutable", pr["Cache-Control"]):
return True
g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
if g:
expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
if expires is None and "Expires" in pr:
expires = _my_parsedate(pr["Expires"])
if expires is None:
# Use a default cache time of 24 hours if upstream didn't set
# any cache headers, to reduce redundant downloads.
expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
if not expires:
return False
return (now < expires)
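# Example (illustrative): with properties[url] == {"Cache-Control": "max-age=3600",
# "Date": "Tue, 15 Nov 1994 08:12:31 GMT"}, _fresh_cache treats the entry as
# fresh until 09:12:31 GMT that day, i.e. Date plus max-age seconds.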
def _remember_headers(url, properties, headers, now):
properties.setdefault(url, {})
for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
if h in headers:
properties[url][h] = headers[h]
if "Date" not in headers:
properties[url]["Date"] = _my_formatdate(now)
@dataclasses.dataclass
class _Response:
status_code: int
headers: typing.Mapping[str, str]
class _Downloader(PyCurlHelper):
    # Timeout tuple for _setcurltimeouts: wait up to 60 seconds for a
    # connection, and give up after 300 seconds in the "low bandwidth"
    # state, where "low bandwidth" means less than 32 KiB/s.
    DOWNLOADER_TIMEOUT = (60, 300, 32768)
def __init__(self, apiclient):
super(_Downloader, self).__init__(title_case_headers=True)
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.NOSIGNAL, 1)
self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
lambda *args, **kwargs: self._socket_open(*args, **kwargs))
self.target = None
self.apiclient = apiclient
def head(self, url):
get_headers = {'Accept': 'application/octet-stream'}
self._headers = {}
self.curl.setopt(pycurl.URL, url.encode('utf-8'))
self.curl.setopt(pycurl.HTTPHEADER, [
'{}: {}'.format(k,v) for k,v in get_headers.items()])
self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
self.curl.setopt(pycurl.NOBODY, True)
self.curl.setopt(pycurl.FOLLOWLOCATION, True)
self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)
try:
self.curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
finally:
if self._socket:
self._socket.close()
self._socket = None
return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
def download(self, url, headers):
self.count = 0
self.start = time.time()
self.checkpoint = self.start
self._headers = {}
self._first_chunk = True
self.collection = None
self.parsedurl = urllib.parse.urlparse(url)
get_headers = {'Accept': 'application/octet-stream'}
get_headers.update(headers)
self.curl.setopt(pycurl.URL, url.encode('utf-8'))
self.curl.setopt(pycurl.HTTPHEADER, [
'{}: {}'.format(k,v) for k,v in get_headers.items()])
self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
self.curl.setopt(pycurl.HTTPGET, True)
self.curl.setopt(pycurl.FOLLOWLOCATION, True)
self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)
try:
self.curl.perform()
except Exception as e:
raise arvados.errors.HttpError(0, str(e))
finally:
if self._socket:
self._socket.close()
self._socket = None
return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)
def headers_received(self):
self.collection = arvados.collection.Collection(api_client=self.apiclient)
if "Content-Length" in self._headers:
self.contentlength = int(self._headers["Content-Length"])
logger.info("File size is %s bytes", self.contentlength)
else:
self.contentlength = None
        grp = None
        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
        if grp and grp.group(2):
            self.name = grp.group(2)
        elif grp and grp.group(4):
            self.name = grp.group(4)
        else:
            # No usable filename in Content-Disposition; fall back to
            # the last component of the URL path.
            self.name = self.parsedurl.path.split("/")[-1]
        # We can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done, but we need to know the status before
        # that, so we have to parse the status line ourselves.
mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
code = int(mt.group(3))
if code == 200:
self.target = self.collection.open(self.name, "wb")
def body_write(self, chunk):
if self._first_chunk:
self.headers_received()
self._first_chunk = False
self.count += len(chunk)
self.target.write(chunk)
loopnow = time.time()
if (loopnow - self.checkpoint) < 20:
return
bps = self.count / (loopnow - self.start)
if self.contentlength is not None:
logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
((self.count * 100) / self.contentlength),
(bps / (1024.0*1024.0)),
((self.contentlength-self.count) // bps))
else:
logger.info("%d downloaded, %6.2f MiB/s", count, (bps / (1024.0*1024.0)))
self.checkpoint = loopnow
def _changed(url, clean_url, properties, now, curldownloader):
req = curldownloader.head(url)
if req.status_code != 200:
        # Some endpoints are misconfigured and deny HEAD but allow GET,
        # so instead of failing here we'll fall through and try a GET
        # with If-None-Match.
return True
    # A previous version of this code used "ETag"; we now normalize to
    # "Etag", so check for both.
etag = properties[url].get("Etag") or properties[url].get("ETag")
if url in properties:
del properties[url]
_remember_headers(clean_url, properties, req.headers, now)
if "Etag" in req.headers and etag == req.headers["Etag"]:
# Didn't change
return False
return True
def _etag_quote(etag):
# if it already has leading and trailing quotes, do nothing
if etag[0] == '"' and etag[-1] == '"':
return etag
else:
# Add quotes.
return '"' + etag + '"'
def http_to_keep(api, project_uuid, url,
utcnow=datetime.datetime.utcnow, varying_url_params="",
prefer_cached_downloads=False):
"""Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
Before downloading the URL, checks to see if the URL already
exists in Keep and applies HTTP caching policy, the
varying_url_params and prefer_cached_downloads flags in order to
decide whether to use the version in Keep or re-download it.
"""
logger.info("Checking Keep for %s", url)
varying_params = [s.strip() for s in varying_url_params.split(",")]
parsed = urllib.parse.urlparse(url)
query = [q for q in urllib.parse.parse_qsl(parsed.query)
if q[0] not in varying_params]
clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
urllib.parse.urlencode(query, safe="/"), parsed.fragment))
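    # Example (illustrative): for url "https://example.com/data?sig=xyz&ver=1"
    # and varying_url_params="sig", clean_url becomes
    # "https://example.com/data?ver=1".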
r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
if clean_url == url:
items = r1["items"]
else:
r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
items = r1["items"] + r2["items"]
now = utcnow()
etags = {}
curldownloader = _Downloader(api)
for item in items:
properties = item["properties"]
if clean_url in properties:
cache_url = clean_url
elif url in properties:
cache_url = url
else:
raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])
if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
# HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())))
if not _changed(cache_url, clean_url, properties, now, curldownloader):
# Etag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return (item["portable_data_hash"], next(iter(cr.keys())))
for etagstr in ("Etag", "ETag"):
if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
etags[properties[cache_url][etagstr]] = item
logger.debug("Found ETag values %s", etags)
properties = {}
headers = {}
if etags:
headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
logger.debug("Sending GET request with headers %s", headers)
logger.info("Beginning download of %s", url)
req = curldownloader.download(url, headers)
c = curldownloader.collection
if req.status_code not in (200, 304):
raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
if curldownloader.target is not None:
curldownloader.target.close()
_remember_headers(clean_url, properties, req.headers, now)
if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
item = etags[req.headers["Etag"]]
item["properties"].update(properties)
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return (item["portable_data_hash"], list(cr.keys())[0])
logger.info("Download complete")
collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')
# max length - space to add a timestamp used by ensure_unique_name
max_name_len = 254 - 28
if len(collectionname) > max_name_len:
over = len(collectionname) - max_name_len
split = int(max_name_len/2)
collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
return (c.portable_data_hash(), curldownloader.name)
Functions
def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params='', prefer_cached_downloads=False)
Download a file over HTTP and upload it to Keep, with the HTTP headers as metadata.
Before downloading the URL, checks whether the URL already exists in Keep and applies HTTP caching policy, together with the varying_url_params and prefer_cached_downloads flags, to decide whether to use the version in Keep or re-download it.
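A minimal usage sketch (illustrative only: the project UUID and URL are placeholders, and arvados.api() is assumed to read credentials from the standard environment variables):

import arvados
from arvados.http_to_keep import http_to_keep

api = arvados.api()  # assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set

# Placeholder project UUID and URL, for illustration only.
pdh, filename = http_to_keep(
    api,
    "zzzzz-j7d0g-xxxxxxxxxxxxxxx",
    "https://example.com/reference/genome.fa",
    varying_url_params="Signature,Expires",
    prefer_cached_downloads=False)
print("Stored %s in collection %s" % (filename, pdh))

The returned tuple is (portable data hash, file name within the collection), so the file can then be read back with arvados.collection.CollectionReader(pdh).open(filename).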