arvados.http_to_keep
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import division 6from future import standard_library 7standard_library.install_aliases() 8 9import email.utils 10import time 11import datetime 12import re 13import arvados 14import arvados.collection 15import urllib.parse 16import logging 17import calendar 18import urllib.parse 19import pycurl 20import dataclasses 21import typing 22from arvados._pycurlhelper import PyCurlHelper 23 24logger = logging.getLogger('arvados.http_import') 25 26def _my_formatdate(dt): 27 return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()), 28 localtime=False, usegmt=True) 29 30def _my_parsedate(text): 31 parsed = email.utils.parsedate_tz(text) 32 if parsed: 33 if parsed[9]: 34 # Adjust to UTC 35 return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9]) 36 else: 37 # TZ is zero or missing, assume UTC. 38 return datetime.datetime(*parsed[:6]) 39 else: 40 return datetime.datetime(1970, 1, 1) 41 42def _fresh_cache(url, properties, now): 43 pr = properties[url] 44 expires = None 45 46 logger.debug("Checking cache freshness for %s using %s", url, pr) 47 48 if "Cache-Control" in pr: 49 if re.match(r"immutable", pr["Cache-Control"]): 50 return True 51 52 g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"]) 53 if g: 54 expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2))) 55 56 if expires is None and "Expires" in pr: 57 expires = _my_parsedate(pr["Expires"]) 58 59 if expires is None: 60 # Use a default cache time of 24 hours if upstream didn't set 61 # any cache headers, to reduce redundant downloads. 
62 expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24) 63 64 if not expires: 65 return False 66 67 return (now < expires) 68 69def _remember_headers(url, properties, headers, now): 70 properties.setdefault(url, {}) 71 for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"): 72 if h in headers: 73 properties[url][h] = headers[h] 74 if "Date" not in headers: 75 properties[url]["Date"] = _my_formatdate(now) 76 77@dataclasses.dataclass 78class _Response: 79 status_code: int 80 headers: typing.Mapping[str, str] 81 82 83class _Downloader(PyCurlHelper): 84 # Wait up to 60 seconds for connection 85 # How long it can be in "low bandwidth" state before it gives up 86 # Low bandwidth threshold is 32 KiB/s 87 DOWNLOADER_TIMEOUT = (60, 300, 32768) 88 89 def __init__(self, apiclient): 90 super(_Downloader, self).__init__(title_case_headers=True) 91 self.curl = pycurl.Curl() 92 self.curl.setopt(pycurl.NOSIGNAL, 1) 93 self.curl.setopt(pycurl.OPENSOCKETFUNCTION, 94 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 95 self.target = None 96 self.apiclient = apiclient 97 98 def head(self, url): 99 get_headers = {'Accept': 'application/octet-stream'} 100 self._headers = {} 101 102 self.curl.setopt(pycurl.URL, url.encode('utf-8')) 103 self.curl.setopt(pycurl.HTTPHEADER, [ 104 '{}: {}'.format(k,v) for k,v in get_headers.items()]) 105 106 self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 107 self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 108 self.curl.setopt(pycurl.NOBODY, True) 109 self.curl.setopt(pycurl.FOLLOWLOCATION, True) 110 111 self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True) 112 113 try: 114 self.curl.perform() 115 except Exception as e: 116 raise arvados.errors.HttpError(0, str(e)) 117 finally: 118 if self._socket: 119 self._socket.close() 120 self._socket = None 121 122 return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers) 123 124 def download(self, url, headers): 125 
self.count = 0 126 self.start = time.time() 127 self.checkpoint = self.start 128 self._headers = {} 129 self._first_chunk = True 130 self.collection = None 131 self.parsedurl = urllib.parse.urlparse(url) 132 133 get_headers = {'Accept': 'application/octet-stream'} 134 get_headers.update(headers) 135 136 self.curl.setopt(pycurl.URL, url.encode('utf-8')) 137 self.curl.setopt(pycurl.HTTPHEADER, [ 138 '{}: {}'.format(k,v) for k,v in get_headers.items()]) 139 140 self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write) 141 self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 142 143 self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 144 self.curl.setopt(pycurl.HTTPGET, True) 145 self.curl.setopt(pycurl.FOLLOWLOCATION, True) 146 147 self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False) 148 149 try: 150 self.curl.perform() 151 except Exception as e: 152 raise arvados.errors.HttpError(0, str(e)) 153 finally: 154 if self._socket: 155 self._socket.close() 156 self._socket = None 157 158 return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers) 159 160 def headers_received(self): 161 self.collection = arvados.collection.Collection(api_client=self.apiclient) 162 163 if "Content-Length" in self._headers: 164 self.contentlength = int(self._headers["Content-Length"]) 165 logger.info("File size is %s bytes", self.contentlength) 166 else: 167 self.contentlength = None 168 169 if self._headers.get("Content-Disposition"): 170 grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', 171 self._headers["Content-Disposition"]) 172 if grp.group(2): 173 self.name = grp.group(2) 174 else: 175 self.name = grp.group(4) 176 else: 177 self.name = self.parsedurl.path.split("/")[-1] 178 179 # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until 180 # perform() is done but we need to know the status before that 181 # so we have to parse the status line ourselves. 182 mt = re.match(r'^HTTP\/(\d(\.\d)?) 
([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"]) 183 code = int(mt.group(3)) 184 185 if not self.name: 186 logger.error("Cannot determine filename from URL or headers") 187 return 188 189 if code == 200: 190 self.target = self.collection.open(self.name, "wb") 191 192 def body_write(self, chunk): 193 if self._first_chunk: 194 self.headers_received() 195 self._first_chunk = False 196 197 self.count += len(chunk) 198 199 if self.target is None: 200 # "If this number is not equal to the size of the byte 201 # string, this signifies an error and libcurl will abort 202 # the request." 203 return 0 204 205 self.target.write(chunk) 206 loopnow = time.time() 207 if (loopnow - self.checkpoint) < 20: 208 return 209 210 bps = self.count / (loopnow - self.start) 211 if self.contentlength is not None: 212 logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left", 213 ((self.count * 100) / self.contentlength), 214 (bps / (1024.0*1024.0)), 215 ((self.contentlength-self.count) // bps)) 216 else: 217 logger.info("%d downloaded, %6.2f MiB/s", count, (bps / (1024.0*1024.0))) 218 self.checkpoint = loopnow 219 220 221def _changed(url, clean_url, properties, now, curldownloader): 222 req = curldownloader.head(url) 223 224 if req.status_code != 200: 225 # Sometimes endpoints are misconfigured and will deny HEAD but 226 # allow GET so instead of failing here, we'll try GET If-None-Match 227 return True 228 229 # previous version of this code used "ETag", now we are 230 # normalizing to "Etag", check for both. 
231 etag = properties[url].get("Etag") or properties[url].get("ETag") 232 233 if url in properties: 234 del properties[url] 235 _remember_headers(clean_url, properties, req.headers, now) 236 237 if "Etag" in req.headers and etag == req.headers["Etag"]: 238 # Didn't change 239 return False 240 241 return True 242 243def _etag_quote(etag): 244 # if it already has leading and trailing quotes, do nothing 245 if etag[0] == '"' and etag[-1] == '"': 246 return etag 247 else: 248 # Add quotes. 249 return '"' + etag + '"' 250 251 252def check_cached_url(api, project_uuid, url, etags, 253 utcnow=datetime.datetime.utcnow, 254 varying_url_params="", 255 prefer_cached_downloads=False): 256 257 logger.info("Checking Keep for %s", url) 258 259 varying_params = [s.strip() for s in varying_url_params.split(",")] 260 261 parsed = urllib.parse.urlparse(url) 262 query = [q for q in urllib.parse.parse_qsl(parsed.query) 263 if q[0] not in varying_params] 264 265 clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, 266 urllib.parse.urlencode(query, safe="/"), parsed.fragment)) 267 268 r1 = api.collections().list(filters=[["properties", "exists", url]]).execute() 269 270 if clean_url == url: 271 items = r1["items"] 272 else: 273 r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute() 274 items = r1["items"] + r2["items"] 275 276 now = utcnow() 277 278 curldownloader = _Downloader(api) 279 280 for item in items: 281 properties = item["properties"] 282 283 if clean_url in properties: 284 cache_url = clean_url 285 elif url in properties: 286 cache_url = url 287 else: 288 raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"]) 289 290 if prefer_cached_downloads or _fresh_cache(cache_url, properties, now): 291 # HTTP caching rules say we should use the cache 292 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 293 return 
(item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now) 294 295 if not _changed(cache_url, clean_url, properties, now, curldownloader): 296 # Etag didn't change, same content, just update headers 297 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute() 298 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 299 return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now) 300 301 for etagstr in ("Etag", "ETag"): 302 if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2: 303 etags[properties[cache_url][etagstr]] = item 304 305 logger.debug("Found ETag values %s", etags) 306 307 return (None, None, None, clean_url, now) 308 309 310def http_to_keep(api, project_uuid, url, 311 utcnow=datetime.datetime.utcnow, varying_url_params="", 312 prefer_cached_downloads=False): 313 """Download a file over HTTP and upload it to keep, with HTTP headers as metadata. 314 315 Before downloading the URL, checks to see if the URL already 316 exists in Keep and applies HTTP caching policy, the 317 varying_url_params and prefer_cached_downloads flags in order to 318 decide whether to use the version in Keep or re-download it. 
319 """ 320 321 etags = {} 322 cache_result = check_cached_url(api, project_uuid, url, etags, 323 utcnow, varying_url_params, 324 prefer_cached_downloads) 325 326 if cache_result[0] is not None: 327 return cache_result 328 329 clean_url = cache_result[3] 330 now = cache_result[4] 331 332 properties = {} 333 headers = {} 334 if etags: 335 headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()]) 336 logger.debug("Sending GET request with headers %s", headers) 337 338 logger.info("Beginning download of %s", url) 339 340 curldownloader = _Downloader(api) 341 342 req = curldownloader.download(url, headers) 343 344 c = curldownloader.collection 345 346 if req.status_code not in (200, 304): 347 raise Exception("Failed to download '%s' got status %s " % (url, req.status_code)) 348 349 if curldownloader.target is not None: 350 curldownloader.target.close() 351 352 _remember_headers(clean_url, properties, req.headers, now) 353 354 if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags: 355 item = etags[req.headers["Etag"]] 356 item["properties"].update(properties) 357 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute() 358 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 359 return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now) 360 361 logger.info("Download complete") 362 363 collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='') 364 365 # max length - space to add a timestamp used by ensure_unique_name 366 max_name_len = 254 - 28 367 368 if len(collectionname) > max_name_len: 369 over = len(collectionname) - max_name_len 370 split = int(max_name_len/2) 371 collectionname = collectionname[0:split] + "…" + collectionname[split+over:] 372 373 c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True) 374 375 
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute() 376 377 return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
logger =
<Logger arvados.http_import (WARNING)>
def
check_cached_url( api, project_uuid, url, etags, utcnow=<built-in method utcnow of type object>, varying_url_params='', prefer_cached_downloads=False):
def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):
    """Look for a collection in Keep that already holds the content of `url`.

    Query parameters named in the comma separated `varying_url_params`
    are removed from the URL to form the "clean" URL used as the cache
    key.  Collections with a property named after either the original
    or the clean URL are considered, in the order the API returns them.

    Returns a tuple `(portable_data_hash, filename, uuid, clean_url,
    now)`.  The first three elements are None when no usable cached
    copy was found; in that case `etags` has been filled in with a
    mapping from each candidate's stored Etag to its collection
    record, for use in a conditional GET.  `project_uuid` is not used
    by this function.
    """
    logger.info("Checking Keep for %s", url)

    varying_params = {s.strip() for s in varying_url_params.split(",")}

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        # _changed() rewrites `properties` in place (it drops the
        # cache_url entry and stores the fresh HEAD headers under
        # clean_url), so the Etag describing the content actually held
        # in Keep has to be captured before calling it.
        cached_headers = properties[cache_url]
        cached_etags = [cached_headers[key] for key in ("Etag", "ETag")
                        if key in cached_headers and len(cached_headers[key]) > 2]

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etag in cached_etags:
            etags[etag] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)
def
http_to_keep( api, project_uuid, url, utcnow=<built-in method utcnow of type object>, varying_url_params='', prefer_cached_downloads=False):
def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.

    Before downloading the URL, checks whether the URL already exists
    in Keep and applies the HTTP caching policy, together with the
    varying_url_params and prefer_cached_downloads flags, to decide
    whether to use the version in Keep or re-download it.

    Returns a tuple `(portable_data_hash, filename, collection_uuid,
    clean_url, now)`.  Raises an exception when the server answers with
    a status other than 200 or 304, or when no collection can be
    produced from the response.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        headers['If-None-Match'] = ', '.join(_etag_quote(k) for k in etags)
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if req.status_code == 200 and curldownloader.collection is None:
        # The collection is created from the write callback, which
        # libcurl is not guaranteed to invoke for an empty body.  Set
        # the (empty) output file up here in that case.
        curldownloader.headers_received()

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304:
        etag = req.headers.get("Etag")
        if etag not in etags:
            # Nothing was downloaded and we cannot tell which cached
            # collection the server is referring to.
            raise Exception("Got status 304 for '%s' but the response Etag does not match any cached collection" % url)
        item = etags[etag]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now)

    c = curldownloader.collection
    if c is None or curldownloader.target is None:
        raise Exception("Download of '%s' did not produce a file to save" % url)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Cut the excess out of the middle so both ends stay readable.
        over = len(collectionname) - max_name_len
        split = max_name_len // 2
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
Before downloading the URL, checks whether the URL already exists in Keep and applies the HTTP caching policy, together with the varying_url_params and prefer_cached_downloads flags, to decide whether to use the version in Keep or re-download it.