arvados.util
Arvados utilities
This module provides functions and constants that are useful across a variety
of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Arvados utilities

This module provides functions and constants that are useful across a variety
of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
"""

import errno
import fcntl
import functools
import hashlib
import httplib2
import os
import random
import re
import subprocess
import sys
import warnings

import arvados.errors

from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    TypeVar,
    Union,
)

T = TypeVar('T')

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
CR_FINAL = 'Final'
"""Constant `state` value for finalized container requests"""

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
"""Regular expression to match any Arvados job UUID

.. WARNING:: Deprecated
   Arvados job resources are deprecated and will be removed in a future
   release. Prefer the containers API instead.
"""

def _deprecated(version=None, preferred=None):
    """Mark a callable as deprecated in the SDK

    This will wrap the callable to emit as a DeprecationWarning
    and add a deprecation notice to its docstring.

    If the following arguments are given, they'll be included in the
    notices:

    * preferred: str | None --- The name of an alternative that users should
      use instead.

    * version: str | None --- The version of Arvados when the callable is
      scheduled to be removed.
    """
    # Pre-render the optional notice fragments once, so the decorator body
    # below only has to interpolate them.
    if version is None:
        version = ''
    else:
        version = f' and scheduled to be removed in Arvados {version}'
    if preferred is None:
        preferred = ''
    else:
        preferred = f' Prefer {preferred} instead.'
    def deprecated_decorator(func):
        fullname = f'{func.__module__}.{func.__qualname__}'
        parent, _, name = fullname.rpartition('.')
        # Deprecating a constructor deprecates the whole class: report the
        # class name, not `Class.__init__`.
        if name == '__init__':
            fullname = parent
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'
        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            # stacklevel=2 attributes the warning to the caller of the
            # deprecated callable, not to this wrapper.
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)
        # Get func's docstring without any trailing newline or empty lines.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = '' if match is None else match.group(1)
        warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
        # Make the deprecation notice the second "paragraph" of the
        # docstring if possible. Otherwise append it.
        docstring, count = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'{warning_doc}\n\n{indent}',
            func_doc,
            count=1,
        )
        if not count:
            docstring = f'{func_doc.lstrip()}{warning_doc}'
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper
    return deprecated_decorator
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))

def keyset_list_all(
    fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
    order_key: str="created_at",
    num_retries: int=0,
    ascending: bool=True,
    **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    # Secondary sort on uuid makes the order total, which the paging
    # filters below depend on.
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    # If the caller restricted `select`, make sure the fields we page on
    # are still returned.
    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging by uuid within a run of equal order keys;
                # that run is exhausted, so resume paging by order key.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    for ca_certs_path in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
    ]:
        if ca_certs_path and os.path.exists(ca_certs_path):
            return ca_certs_path
    return fallback

def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    rid = "req-"
    # 2**104 > 36**20 > 2**103
    # i.e. 104 random bits are enough to cover 20 base-36 digits.
    n = random.getrandbits(104)
    for _ in range(20):
        c = n % 36
        if c < 10:
            rid += chr(c+ord('0'))
        else:
            rid += chr(c+ord('a')-10)
        n = n // 36
    return rid

def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    if not svc._rootDesc.get('resources').get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config

def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API server limit; reserve 28 characters for an
    # ensure_unique_name suffix.
    max_name_len = 254 - 28

    if len(collectionname) > max_name_len:
        over = len(collectionname) - max_name_len
        split = int(max_name_len/2)
        collectionname = collectionname[0:split] + "…" + collectionname[split+over:]

    return collectionname

@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Iterate an Arvados API list method by offset, returning all items."""
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        items += c['items']
        items_available = c['items_available']
        offset = c['offset'] + len(c['items'])
    return items

@_deprecated('3.0')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run a subprocess, returning (stdout, stderr); raise on nonzero exit."""
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', sys.stderr)
    kwargs.setdefault('close_fds', True)
    kwargs.setdefault('shell', False)
    p = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = p.communicate(None)
    if p.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, p.returncode, stderrdata))
    return stdoutdata, stderrdata

@_deprecated('3.0')
def git_checkout(url, version, path):
    """Clone (if needed) and check out `version` of a git repository at `path`."""
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version],
                cwd=path)
    return path

@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
    """Start a `tar` subprocess extracting its stdin into `path`."""
    return subprocess.Popen(["tar",
                             "-C", path,
                             ("-x%sf" % decompress_flag),
                             "-"],
                            stdout=None,
                            stdin=subprocess.PIPE, stderr=sys.stderr,
                            shell=False, close_fds=True)
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # A `.locator` symlink records which collection was last extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the file from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # A `.locator` symlink records which collection was last extracted here.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # unzip cannot read from a pipe, so spool the archive to disk
            # first, then extract and delete it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same collection.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when `files` is empty; otherwise only the
            # requested names (compressed or decompressed) not yet written.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path

@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create `path` and any missing parents, like `mkdir -p`."""
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(path):
                # It is not an error if someone else creates the
                # directory between our exists() and makedirs() calls.
                pass
            else:
                raise

@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions to the same destination.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    allfiles = []
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path) and max_depth != 0:
            allfiles += listdir_recursive(
                ent_path, base=ent_base,
                max_depth=(max_depth-1 if max_depth else None))
        else:
            allfiles += [ent_base]
    return allfiles
Regular expression to match a hexadecimal string (case-insensitive)
Constant `state` value for uncommitted container requests
Constant state
value for committed container requests
Constant state
value for finalized container requests
Regular expression to match any Keep block locator
Regular expression to match any Keep block locator with an access token hint
Regular expression to match any collection portable data hash
Regular expression to match an Arvados collection manifest text
Regular expression to match a file path from a collection identified by portable data hash
Regular expression to match a `keep:` URI with a collection identified by portable data hash
Regular expression to match any Arvados object UUID
Regular expression to match any Arvados collection UUID
Regular expression to match any Arvados container UUID
Regular expression to match any Arvados group UUID
Regular expression to match any Arvados link UUID
Regular expression to match any Arvados user UUID
Regular expression to match any Arvados job UUID
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    # The module-level HEX_RE is anchored (^[0-9a-fA-F]+$), so a fullmatch
    # of the bare character class is equivalent; inlining it keeps this
    # function self-contained.
    return bool(good_len and re.fullmatch(r'[0-9a-fA-F]+', s))
Indicate whether a string is a hexadecimal number
This method returns true if all characters in the string are hexadecimal digits. It is case-insensitive.
You can also pass optional length arguments to check that the string has the expected number of digits. If you pass one integer, the string must have that length exactly, otherwise the method returns False. If you pass two integers, the string’s length must fall within that minimum and maximum (inclusive), otherwise the method returns False.
Arguments:
s: str — The string to check
length_args: int — Optional length limit(s) for the string to check
def keyset_list_all(
    fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
    order_key: str="created_at",
    num_retries: int=0,
    ascending: bool=True,
    **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    # Secondary sort on uuid makes the order total, which the paging
    # filters below depend on.
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    # If the caller restricted `select`, make sure the fields we page on
    # are still returned.
    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging by uuid within a run of equal order keys;
                # that run is exhausted, so resume paging by order key.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page. Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice. If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
Iterate all Arvados resources from an API list call
This method takes a method that represents an Arvados API list call, and iterates the objects returned by the API server. It can make multiple API calls to retrieve and iterate all objects available from the API server.
Arguments:
- fn: Callable[…, arvados.api_resources.ArvadosAPIRequest] — A function that wraps an Arvados API method that returns a list of objects. If you have an Arvados API client named `arv`, examples include `arv.collections().list` and `arv.groups().contents`. Note that you should pass the function without calling it.
- order_key: str — The name of the primary object field that objects should be sorted by. This name is used to build an `order` argument for `fn`. Default `'created_at'`.
- num_retries: int — This argument is passed through to `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See that method's docstring for details. Default 0 (meaning API calls will use the `num_retries` value set when the Arvados API client was constructed).
- ascending: bool — Used to build an `order` argument for `fn`. If True, all fields will be sorted in `'asc'` (ascending) order. Otherwise, all fields will be sorted in `'desc'` (descending) order.
Additional keyword arguments will be passed directly to `fn` for each API call. Note that this function sets `count`, `limit`, and `order` as part of its work.
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    candidates = (
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    for candidate in candidates:
        if candidate and os.path.exists(candidate):
            return candidate
    return fallback
Return the path of the best available source of CA certificates
This function checks various known paths that provide trusted CA certificates, and returns the first one that exists. It checks:
- the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
- `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
- `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based distributions
- `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based distributions

If none of these paths exist, this function returns the value of `fallback`.
Arguments:
- fallback: T — The value to return if none of the known paths exist. The default value is the certificate store of Mozilla’s trusted CAs included with the Python certifi package.
303def new_request_id() -> str: 304 """Return a random request ID 305 306 This function generates and returns a random string suitable for use as a 307 `X-Request-Id` header value in the Arvados API. 308 """ 309 rid = "req-" 310 # 2**104 > 36**20 > 2**103 311 n = random.getrandbits(104) 312 for _ in range(20): 313 c = n % 36 314 if c < 10: 315 rid += chr(c+ord('0')) 316 else: 317 rid += chr(c+ord('a')-10) 318 n = n // 36 319 return rid
Return a random request ID
This function generates and returns a random string suitable for use as an `X-Request-Id` header value in the Arvados API.
def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    try:
        return svc._cached_config
    except AttributeError:
        svc._cached_config = svc.configs().get().execute()
        return svc._cached_config
Return an Arvados cluster’s configuration, with caching
This function gets and returns the Arvados configuration from the API server. It caches the result on the client object and reuses it on any future calls.
Arguments:
- svc: arvados.api_resources.ArvadosAPIClient — The Arvados API client object to use to retrieve and cache the Arvados cluster configuration.
def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    try:
        return svc._cached_vocabulary
    except AttributeError:
        svc._cached_vocabulary = svc.vocabularies().get().execute()
        return svc._cached_vocabulary
Return an Arvados cluster’s vocabulary, with caching
This function gets and returns the Arvados vocabulary from the API server. It caches the result on the client object and reuses it on any future calls.
Arguments:
- svc: arvados.api_resources.ArvadosAPIClient — The Arvados API client object to use to retrieve and cache the Arvados cluster vocabulary.
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API's name limit; reserve 28 characters for a
    # uniquifying suffix appended by ensure_unique_name.
    max_name_len = 254 - 28
    excess = len(collectionname) - max_name_len
    if excess > 0:
        half = max_name_len // 2
        collectionname = f"{collectionname[:half]}…{collectionname[half + excess:]}"
    return collectionname
Limit the length of a name to fit within Arvados API limits
This function ensures that a string is short enough to use as an object name in the Arvados API, leaving room for text that may be added by the `ensure_unique_name` argument. If the source name is short enough, it is returned unchanged. Otherwise, this function returns a string with excess characters removed from the middle of the source string and replaced with an ellipsis.
Arguments:
- collectionname: str — The desired source name
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Fetch every page of an Arvados list API call and return all items."""
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    results = []
    offset = 0
    available = sys.maxsize
    while len(results) < available:
        page = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        results.extend(page['items'])
        available = page['items_available']
        offset = page['offset'] + len(page['items'])
    return results
@_deprecated('3.0')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so a failure raises with the actual error text.
        # (Previously stderr was inherited, so the message always said
        # "None".)
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        _, stderr = p.communicate()
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
Ensure the given directory (or TASK_TMPDIR if none given) exists and is empty.
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run a command, raising CommandFailedError on a nonzero exit.

    Returns the (stdout, stderr) pair from the finished process.
    """
    defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for option, value in defaults.items():
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = proc.communicate(None)
    if proc.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, stderrdata))
    return stdoutdata, stderrdata
@_deprecated('3.0')
def git_checkout(url, version, path):
    """Clone `url` into `path` if needed, then check out `version`.

    A relative `path` is resolved against the current job's temporary
    directory. Returns the absolute checkout path.
    """
    from arvados import current_job
    if not path.startswith('/'):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(
            ["git", "clone", url, path],
            cwd=os.path.dirname(path),
        )
    run_command(["git", "checkout", version], cwd=path)
    return path
@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
    """Start a `tar` child process that extracts its stdin into `path`.

    `decompress_flag` is inserted into the tar flags (e.g. 'j' or 'z',
    or '' for an uncompressed archive). Returns the Popen object; the
    caller feeds the archive to its stdin.
    """
    command = ["tar", "-C", path, "-x%sf" % decompress_flag, "-"]
    return subprocess.Popen(
        command,
        stdout=None,
        stdin=subprocess.PIPE,
        stderr=sys.stderr,
        shell=False,
        close_fds=True,
    )
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # A path that does not start with '/' is taken relative to the
    # current job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions to the same target with an
    # exclusive flock on a sibling .lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The .locator symlink inside the target records which collection
    # was last extracted there; if it matches, skip re-extracting.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose the tar decompression flag from the file extension.
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB
            # chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        # Record which collection was extracted, for the cache check
        # above on future calls.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the archive held exactly one top-level entry, return its path
    # instead of the extraction directory.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
Retrieve a tarball from Keep and extract it to a local directory. Return the absolute path where the tarball was extracted. If the top level of the tarball contained just one file or directory, return the absolute path of that single item.
tarball – collection locator path – where to extract the tarball: absolute, or relative to job tmp
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # A path that does not start with '/' is taken relative to the
    # current job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions to the same target with an
    # exclusive flock on a sibling .lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # The .locator symlink inside the target records which collection
    # was last extracted there; if it matches, skip re-extracting.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Copy the archive out of Keep into a local file in 1 MiB
            # chunks, then run unzip on that file.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            # The local copy of the archive is no longer needed.
            os.unlink(zip_filename)
        # Record which collection was extracted, for the cache check
        # above on future calls.
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the archive held exactly one top-level entry, return its path
    # instead of the extraction directory.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
Retrieve a zip archive from Keep and extract it to a local directory. Return the absolute path where the archive was extracted. If the top level of the archive contained just one file or directory, return the absolute path of that single item.
zipball – collection locator path – where to extract the archive: absolute, or relative to job tmp
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files
    decompress -- if True, write files under their decompressed names
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Use the bare hash part of a locator as the cache marker; otherwise
    # fall back to hashing the argument itself.
    # NOTE(review): hashlib.md5() requires bytes, so this fallback raises
    # TypeError for a str argument on Python 3 — presumably a Python 2
    # leftover; confirm before relying on the fallback path.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    # A path that does not start with '/' is taken relative to the
    # current job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions to the same target with an
    # exclusive flock on a sibling .lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    # NOTE(review): already_have_it is computed but never checked below,
    # so unlike tarball_extract/zipball_extract the data is re-extracted
    # on every call — confirm whether that is intentional.
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract a file when no specific files were requested, or
            # when its (possibly decompressed) name was requested and
            # has not been extracted yet.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                # Skip files that already exist at the destination.
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    # Record which collection was extracted for future calls.
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path
Retrieve a collection from Keep and extract it to a local directory. Return the absolute path where the collection was extracted.
collection – collection locator path – where to extract: absolute, or relative to job tmp
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create `path` and any missing parent directories, like `mkdir -p`."""
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as error:
        # It is not an error if someone else creates the
        # directory between our exists() and makedirs() calls.
        if not (error.errno == errno.EEXIST and os.path.isdir(path)):
            raise
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only these named files
    decompress -- if True, write files under their decompressed names
    """
    from arvados import current_job
    # A path that does not start with '/' is taken relative to the
    # current job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions to the same target with an
    # exclusive flock on a sibling .lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract a file when no specific files were requested, or when
        # its (possibly decompressed) name was requested and has not
        # been extracted yet.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            # Replace any existing file at the destination.
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
Retrieve a stream from Keep and extract it to a local directory. Return the absolute path where the stream was extracted.
stream – StreamReader object path – where to extract: absolute, or relative to job tmp
@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    found = []
    for name in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, name)
        rel_name = os.path.join(base, name) if base else name
        if os.path.isdir(full_path) and max_depth != 0:
            next_depth = max_depth - 1 if max_depth else None
            found.extend(listdir_recursive(full_path, base=rel_name, max_depth=next_depth))
        else:
            found.append(rel_name)
    return found
listdir_recursive(dirname, base, max_depth)
Return a list of file and directory names found under dirname.
If base is not None, prepend “{base}/” to each returned name.
If max_depth is None, descend into directories and return only the names of files found in the directory tree.
If max_depth is a non-negative integer, stop descending into directories at the given depth, and at that point return directory names instead.
If max_depth==0 (and base is None) this is equivalent to sorted(os.listdir(dirname)).