arvados.http_to_keep

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future import standard_library
standard_library.install_aliases()

import email.utils
import time
import datetime
import re
import arvados
import arvados.collection
import arvados.errors
import arvados.util
import urllib.parse
import logging
import calendar
import pycurl
import dataclasses
import typing
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

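# Illustrative round trip for the two date helpers above (example values,
# not from the original source):
#   _my_formatdate(datetime.datetime(2024, 1, 2, 3, 4, 5))
#     -> 'Tue, 02 Jan 2024 03:04:05 GMT'
#   _my_parsedate('Tue, 02 Jan 2024 03:04:05 GMT')
#     -> datetime.datetime(2024, 1, 2, 3, 4, 5)
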
def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        # Use re.search, not re.match: Cache-Control directives such as
        # "immutable" or "max-age" may appear anywhere in the header,
        # e.g. "public, max-age=3600, immutable".
        if re.search(r"immutable", pr["Cache-Control"]):
            return True

        g = re.search(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    return (now < expires)

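# Illustrative freshness decision (values hypothetical): given
#   properties = {"https://example.com/f": {
#       "Date": "Tue, 02 Jan 2024 00:00:00 GMT",
#       "Cache-Control": "max-age=3600"}}
# the entry expires at 01:00:00 GMT, so _fresh_cache() returns True for a
# `now` of 00:30:00 GMT and False for a `now` of 02:00:00 GMT.
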
def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                         lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k, v) for k, v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp is None:
                # No recognizable filename parameter; fall back to the URL path.
                self.name = self.parsedurl.path.split("/")[-1]
            elif grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # A previous version of this code used "ETag"; we now normalize to
    # "Etag", so check for both.
    pr = properties.get(url, {})
    etag = pr.get("Etag") or pr.get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # If it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


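# Example (illustrative): _etag_quote('abc') -> '"abc"', while an already
# quoted value such as '"abc"' is returned unchanged, matching the quoted
# entity-tag form that If-None-Match requires.
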
def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


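# Note: check_cached_url() and http_to_keep() both return the tuple
# (portable_data_hash, file_name, collection_uuid, clean_url, now);
# check_cached_url() returns None for the first three fields on a cache miss.
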
def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether the URL already exists
    in Keep and applies HTTP caching policy, along with the
    varying_url_params and prefer_cached_downloads flags, to decide
    whether to use the version in Keep or re-download it.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k in etags])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s', got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # Maximum name length, minus space for the timestamp that
    # ensure_unique_name may append.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)