arvados.http_to_keep
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import calendar
import dataclasses
import datetime
import email.utils
import logging
import re
import time
import typing
import urllib.parse

import pycurl

import arvados
import arvados.collection
from arvados._pycurlhelper import PyCurlHelper

logger = logging.getLogger('arvados.http_import')

def _my_formatdate(dt):
    """Format a naive UTC datetime as an RFC 2822 date string (GMT)."""
    return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
                                  localtime=False, usegmt=True)

def _my_parsedate(text):
    """Parse an RFC 2822 date string into a naive UTC datetime.

    Returns the Unix epoch (1970-01-01) when the string cannot be parsed,
    so callers always get a datetime back.
    """
    parsed = email.utils.parsedate_tz(text)
    if parsed:
        if parsed[9]:
            # Adjust to UTC
            return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
        else:
            # TZ is zero or missing, assume UTC.
            return datetime.datetime(*parsed[:6])
    else:
        return datetime.datetime(1970, 1, 1)

def _fresh_cache(url, properties, now):
    """Return True if the headers cached for ``url`` say it is still fresh at ``now``.

    Applies HTTP caching rules in order: Cache-Control "immutable",
    Cache-Control "max-age"/"s-maxage", the Expires header, then a
    24-hour default when upstream set no caching headers at all.
    """
    pr = properties[url]
    expires = None

    logger.debug("Checking cache freshness for %s using %s", url, pr)

    if "Cache-Control" in pr:
        if re.match(r"immutable", pr["Cache-Control"]):
            return True

        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
        if g:
            expires = _my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))

    if expires is None and "Expires" in pr:
        expires = _my_parsedate(pr["Expires"])

    if expires is None:
        # Use a default cache time of 24 hours if upstream didn't set
        # any cache headers, to reduce redundant downloads.
        expires = _my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)

    if not expires:
        return False

    return (now < expires)

def _remember_headers(url, properties, headers, now):
    """Record the cache-relevant response headers for ``url`` into ``properties``."""
    properties.setdefault(url, {})
    for h in ("Cache-Control", "Etag", "Expires", "Date", "Content-Length"):
        if h in headers:
            properties[url][h] = headers[h]
    if "Date" not in headers:
        # Without a Date header we can't compute expiry later, so record
        # the time we received the response instead.
        properties[url]["Date"] = _my_formatdate(now)

@dataclasses.dataclass
class _Response:
    status_code: int
    headers: typing.Mapping[str, str]


class _Downloader(PyCurlHelper):
    """Fetch a URL with pycurl, streaming the response body into an Arvados collection."""

    # Wait up to 60 seconds for connection
    # How long it can be in "low bandwidth" state before it gives up
    # Low bandwidth threshold is 32 KiB/s
    DOWNLOADER_TIMEOUT = (60, 300, 32768)

    def __init__(self, apiclient):
        super(_Downloader, self).__init__(title_case_headers=True)
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.curl.setopt(pycurl.OPENSOCKETFUNCTION,
                    lambda *args, **kwargs: self._socket_open(*args, **kwargs))
        self.target = None
        self.apiclient = apiclient

    def head(self, url):
        """Issue a HEAD request for ``url``.

        Returns a _Response with the status code and response headers.
        Raises arvados.errors.HttpError on any transport-level failure.
        """
        get_headers = {'Accept': 'application/octet-stream'}
        self._headers = {}

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.NOBODY, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, True)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def download(self, url, headers):
        """Issue a GET request for ``url``, writing the body into a new collection.

        ``headers`` are extra request headers (e.g. If-None-Match).  The
        collection and output file are created lazily by headers_received()
        when the first body chunk arrives.  Returns a _Response; raises
        arvados.errors.HttpError on transport-level failure.
        """
        self.count = 0
        self.start = time.time()
        self.checkpoint = self.start
        self._headers = {}
        self._first_chunk = True
        self.collection = None
        self.parsedurl = urllib.parse.urlparse(url)

        get_headers = {'Accept': 'application/octet-stream'}
        get_headers.update(headers)

        self.curl.setopt(pycurl.URL, url.encode('utf-8'))
        self.curl.setopt(pycurl.HTTPHEADER, [
            '{}: {}'.format(k,v) for k,v in get_headers.items()])

        self.curl.setopt(pycurl.WRITEFUNCTION, self.body_write)
        self.curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)

        self.curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
        self.curl.setopt(pycurl.HTTPGET, True)
        self.curl.setopt(pycurl.FOLLOWLOCATION, True)

        self._setcurltimeouts(self.curl, self.DOWNLOADER_TIMEOUT, False)

        try:
            self.curl.perform()
        except Exception as e:
            raise arvados.errors.HttpError(0, str(e))
        finally:
            if self._socket:
                self._socket.close()
                self._socket = None

        return _Response(self.curl.getinfo(pycurl.RESPONSE_CODE), self._headers)

    def headers_received(self):
        """Called on the first body chunk: pick a filename and open the target file.

        The filename comes from Content-Disposition when available,
        otherwise from the last segment of the URL path.
        """
        self.collection = arvados.collection.Collection(api_client=self.apiclient)

        if "Content-Length" in self._headers:
            self.contentlength = int(self._headers["Content-Length"])
            logger.info("File size is %s bytes", self.contentlength)
        else:
            self.contentlength = None

        self.name = None
        if self._headers.get("Content-Disposition"):
            grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))',
                            self._headers["Content-Disposition"])
            # The header may be present but not match the filename
            # pattern; fall back to the URL path in that case instead of
            # crashing on grp.group().
            if grp:
                if grp.group(2):
                    self.name = grp.group(2)
                else:
                    self.name = grp.group(4)
        if not self.name:
            self.name = self.parsedurl.path.split("/")[-1]

        # Can't call curl.getinfo(pycurl.RESPONSE_CODE) until
        # perform() is done but we need to know the status before that
        # so we have to parse the status line ourselves.
        mt = re.match(r'^HTTP\/(\d(\.\d)?) ([1-5]\d\d) ([^\r\n\x00-\x08\x0b\x0c\x0e-\x1f\x7f]*)\r\n$', self._headers["x-status-line"])
        code = int(mt.group(3))

        if not self.name:
            logger.error("Cannot determine filename from URL or headers")
            return

        if code == 200:
            self.target = self.collection.open(self.name, "wb")

    def body_write(self, chunk):
        """pycurl WRITEFUNCTION: append ``chunk`` to the target file and log progress.

        Returning 0 (instead of consuming the whole chunk) tells libcurl
        to abort the transfer.
        """
        if self._first_chunk:
            self.headers_received()
            self._first_chunk = False

        self.count += len(chunk)

        if self.target is None:
            # "If this number is not equal to the size of the byte
            # string, this signifies an error and libcurl will abort
            # the request."
            return 0

        self.target.write(chunk)
        loopnow = time.time()
        # Only log progress every 20 seconds.
        if (loopnow - self.checkpoint) < 20:
            return

        bps = self.count / (loopnow - self.start)
        if self.contentlength is not None:
            logger.info("%2.1f%% complete, %6.2f MiB/s, %1.0f seconds left",
                        ((self.count * 100) / self.contentlength),
                        (bps / (1024.0*1024.0)),
                        ((self.contentlength-self.count) // bps))
        else:
            # Fixed: previously referenced bare "count", which is
            # undefined here and raised NameError whenever the server
            # sent no Content-Length and this branch was reached.
            logger.info("%d downloaded, %6.2f MiB/s", self.count, (bps / (1024.0*1024.0)))
        self.checkpoint = loopnow


def _changed(url, clean_url, properties, now, curldownloader):
    """HEAD ``url`` and report whether the content changed since it was cached.

    Re-keys the remembered headers from ``url`` to ``clean_url`` and
    returns False only when the server's Etag matches the cached one.
    """
    req = curldownloader.head(url)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # previous version of this code used "ETag", now we are
    # normalizing to "Etag", check for both.
    etag = properties[url].get("Etag") or properties[url].get("ETag")

    if url in properties:
        del properties[url]
    _remember_headers(clean_url, properties, req.headers, now)

    if "Etag" in req.headers and etag == req.headers["Etag"]:
        # Didn't change
        return False

    return True

def _etag_quote(etag):
    """Wrap ``etag`` in double quotes as required by If-None-Match, if not already quoted."""
    # if it already has leading and trailing quotes, do nothing
    if etag[0] == '"' and etag[-1] == '"':
        return etag
    else:
        # Add quotes.
        return '"' + etag + '"'


def check_cached_url(api, project_uuid, url, etags,
                     utcnow=datetime.datetime.utcnow,
                     varying_url_params="",
                     prefer_cached_downloads=False):
    """Look for a cached copy of ``url`` in Keep and decide whether it is usable.

    Strips the comma-separated ``varying_url_params`` from the query
    string to build a canonical "clean" URL, searches collections whose
    properties mention either URL, and applies HTTP cache-freshness
    rules (or an Etag HEAD check) to each hit.  Collects candidate Etag
    values into the caller-supplied ``etags`` dict for a later
    If-None-Match request.

    Returns (portable_data_hash, filename, collection_uuid, clean_url, now);
    the first three are None when no usable cached copy was found.
    """

    logger.info("Checking Keep for %s", url)

    varying_params = [s.strip() for s in varying_url_params.split(",")]

    parsed = urllib.parse.urlparse(url)
    query = [q for q in urllib.parse.parse_qsl(parsed.query)
             if q[0] not in varying_params]

    clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
                                         urllib.parse.urlencode(query, safe="/"), parsed.fragment))

    r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()

    if clean_url == url:
        items = r1["items"]
    else:
        r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
        items = r1["items"] + r2["items"]

    now = utcnow()

    curldownloader = _Downloader(api)

    for item in items:
        properties = item["properties"]

        if clean_url in properties:
            cache_url = clean_url
        elif url in properties:
            cache_url = url
        else:
            raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"])

        if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
            # HTTP caching rules say we should use the cache
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        if not _changed(cache_url, clean_url, properties, now, curldownloader):
            # Etag didn't change, same content, just update headers
            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

        for etagstr in ("Etag", "ETag"):
            if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
                etags[properties[cache_url][etagstr]] = item

    logger.debug("Found ETag values %s", etags)

    return (None, None, None, clean_url, now)


def http_to_keep(api, project_uuid, url,
                 utcnow=datetime.datetime.utcnow, varying_url_params="",
                 prefer_cached_downloads=False):
    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.

    Before downloading the URL, checks to see if the URL already
    exists in Keep and applies HTTP caching policy, the
    varying_url_params and prefer_cached_downloads flags in order to
    decide whether to use the version in Keep or re-download it.

    Returns (portable_data_hash, filename, collection_uuid, clean_url, now).
    Raises Exception when the server answers with anything other than
    200 or 304.
    """

    etags = {}
    cache_result = check_cached_url(api, project_uuid, url, etags,
                                    utcnow, varying_url_params,
                                    prefer_cached_downloads)

    # A non-None portable_data_hash means the cached copy is usable as-is.
    if cache_result[0] is not None:
        return cache_result

    clean_url = cache_result[3]
    now = cache_result[4]

    properties = {}
    headers = {}
    if etags:
        # Ask the server to skip the body if the content matches a known Etag.
        headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()])
    logger.debug("Sending GET request with headers %s", headers)

    logger.info("Beginning download of %s", url)

    curldownloader = _Downloader(api)

    req = curldownloader.download(url, headers)

    c = curldownloader.collection

    if req.status_code not in (200, 304):
        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))

    if curldownloader.target is not None:
        curldownloader.target.close()

    _remember_headers(clean_url, properties, req.headers, now)

    if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags:
        # Not modified: refresh the cached collection's headers and reuse it.
        item = etags[req.headers["Etag"]]
        item["properties"].update(properties)
        api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute()
        cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
        return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)

    logger.info("Download complete")

    collectionname = "Downloaded from %s" % urllib.parse.quote(clean_url, safe='')

    # max length - space to add a timestamp used by ensure_unique_name
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        # Keep the head and tail of the name; elide the middle.
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True)

    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()

    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
logger =
<Logger arvados.http_import (WARNING)>
def
check_cached_url( api, project_uuid, url, etags, utcnow=<built-in method utcnow of type object>, varying_url_params='', prefer_cached_downloads=False):
250def check_cached_url(api, project_uuid, url, etags, 251 utcnow=datetime.datetime.utcnow, 252 varying_url_params="", 253 prefer_cached_downloads=False): 254 255 logger.info("Checking Keep for %s", url) 256 257 varying_params = [s.strip() for s in varying_url_params.split(",")] 258 259 parsed = urllib.parse.urlparse(url) 260 query = [q for q in urllib.parse.parse_qsl(parsed.query) 261 if q[0] not in varying_params] 262 263 clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, 264 urllib.parse.urlencode(query, safe="/"), parsed.fragment)) 265 266 r1 = api.collections().list(filters=[["properties", "exists", url]]).execute() 267 268 if clean_url == url: 269 items = r1["items"] 270 else: 271 r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute() 272 items = r1["items"] + r2["items"] 273 274 now = utcnow() 275 276 curldownloader = _Downloader(api) 277 278 for item in items: 279 properties = item["properties"] 280 281 if clean_url in properties: 282 cache_url = clean_url 283 elif url in properties: 284 cache_url = url 285 else: 286 raise Exception("Shouldn't happen, got an API result for %s that doesn't have the URL in properties" % item["uuid"]) 287 288 if prefer_cached_downloads or _fresh_cache(cache_url, properties, now): 289 # HTTP caching rules say we should use the cache 290 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 291 return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now) 292 293 if not _changed(cache_url, clean_url, properties, now, curldownloader): 294 # Etag didn't change, same content, just update headers 295 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute() 296 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 297 return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now) 298 299 for etagstr in 
("Etag", "ETag"): 300 if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2: 301 etags[properties[cache_url][etagstr]] = item 302 303 logger.debug("Found ETag values %s", etags) 304 305 return (None, None, None, clean_url, now)
def
http_to_keep( api, project_uuid, url, utcnow=<built-in method utcnow of type object>, varying_url_params='', prefer_cached_downloads=False):
308def http_to_keep(api, project_uuid, url, 309 utcnow=datetime.datetime.utcnow, varying_url_params="", 310 prefer_cached_downloads=False): 311 """Download a file over HTTP and upload it to keep, with HTTP headers as metadata. 312 313 Before downloading the URL, checks to see if the URL already 314 exists in Keep and applies HTTP caching policy, the 315 varying_url_params and prefer_cached_downloads flags in order to 316 decide whether to use the version in Keep or re-download it. 317 """ 318 319 etags = {} 320 cache_result = check_cached_url(api, project_uuid, url, etags, 321 utcnow, varying_url_params, 322 prefer_cached_downloads) 323 324 if cache_result[0] is not None: 325 return cache_result 326 327 clean_url = cache_result[3] 328 now = cache_result[4] 329 330 properties = {} 331 headers = {} 332 if etags: 333 headers['If-None-Match'] = ', '.join([_etag_quote(k) for k,v in etags.items()]) 334 logger.debug("Sending GET request with headers %s", headers) 335 336 logger.info("Beginning download of %s", url) 337 338 curldownloader = _Downloader(api) 339 340 req = curldownloader.download(url, headers) 341 342 c = curldownloader.collection 343 344 if req.status_code not in (200, 304): 345 raise Exception("Failed to download '%s' got status %s " % (url, req.status_code)) 346 347 if curldownloader.target is not None: 348 curldownloader.target.close() 349 350 _remember_headers(clean_url, properties, req.headers, now) 351 352 if req.status_code == 304 and "Etag" in req.headers and req.headers["Etag"] in etags: 353 item = etags[req.headers["Etag"]] 354 item["properties"].update(properties) 355 api.collections().update(uuid=item["uuid"], body={"collection":{"properties": item["properties"]}}).execute() 356 cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api) 357 return (item["portable_data_hash"], list(cr.keys())[0], item["uuid"], clean_url, now) 358 359 logger.info("Download complete") 360 361 collectionname = "Downloaded from %s" % 
urllib.parse.quote(clean_url, safe='') 362 363 # max length - space to add a timestamp used by ensure_unique_name 364 max_name_len = 254 - 28 365 366 if len(collectionname) > max_name_len: 367 over = len(collectionname) - max_name_len 368 split = int(max_name_len/2) 369 collectionname = collectionname[0:split] + "…" + collectionname[split+over:] 370 371 c.save_new(name=collectionname, owner_uuid=project_uuid, ensure_unique_name=True) 372 373 api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute() 374 375 return (c.portable_data_hash(), curldownloader.name, c.manifest_locator(), clean_url, now)
Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
Before downloading the URL, checks to see if the URL already exists in Keep and applies HTTP caching policy, the varying_url_params and prefer_cached_downloads flags in order to decide whether to use the version in Keep or re-download it.