arvados.http_to_keep

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def _my_parsedate(text):
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def _fresh_cache(url, properties, now):
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return (now < expires)

def _remember_headers(url, properties, headers, now):
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            if grp.group(2):
                self.name = grp.group(2)
            else:
                self.name = grp.group(4)
        else:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET, so instead of failing here we'll try GET with If-None-Match
        return True

    # A previous version of this code stored "ETag"; we now normalize to
    # "Etag", so check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


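# check_cached_url returns a 5-tuple: (portable data hash, name of the first
# file in the collection, collection UUID, clean_url, now).  The first three
# elements are None when no usable cached copy of the URL was found in Keep.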
def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to Keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether the URL already exists
    in Keep and applies HTTP caching policy, along with the
    varying_url_params and prefer_cached_downloads flags, to decide
    whether to use the version in Keep or re-download it.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s', got status %s" % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
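
A minimal usage sketch (not part of the module source), assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured in the environment; the project UUID, URL, and query-parameter name below are placeholders. It checks Keep for a cached copy, downloads the file if needed, and stores it in a new collection owned by the given project:

import arvados
from arvados.http_to_keep import http_to_keep

api = arvados.api('v1')  # client built from ARVADOS_API_HOST / ARVADOS_API_TOKEN
pdh, filename, coll_uuid, clean_url, fetched_at = http_to_keep(
    api,
    "zzzzz-j7d0g-0123456789abcde",    # placeholder project (group) UUID
    "https://example.com/data/reference.fa?sig=abc123",
    varying_url_params="sig",          # ignore this query parameter when matching the cache
    prefer_cached_downloads=True)      # reuse any copy already in Keep without revalidating
print("Stored %s as %s in collection %s" % (clean_url, filename, coll_uuid))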