arvados.util

Arvados utilities

This module provides functions and constants that are useful across a variety of Arvados resource types, or extend the Arvados API client (see arvados.api).

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4"""Arvados utilities
  5
  6This module provides functions and constants that are useful across a variety
  7of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
  8"""
  9
 10import errno
 11import fcntl
 12import functools
 13import hashlib
 14import httplib2
 15import os
 16import random
 17import re
 18import subprocess
 19import sys
 20import warnings
 21
 22import arvados.errors
 23
 24from typing import (
 25    Any,
 26    Callable,
 27    Dict,
 28    Iterator,
 29    TypeVar,
 30    Union,
 31)
 32
T = TypeVar('T')

HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
CR_FINAL = 'Final'
"""Constant `state` value for finalized container requests"""

keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
"""Regular expression to match any Arvados job UUID

.. WARNING:: Deprecated
   Arvados job resources are deprecated and will be removed in a future
   release. Prefer the containers API instead.
"""
 76
def _deprecated(version=None, preferred=None):
    """Mark a callable as deprecated in the SDK

    This will wrap the callable to emit as a DeprecationWarning
    and add a deprecation notice to its docstring.

    If the following arguments are given, they'll be included in the
    notices:

    * preferred: str | None --- The name of an alternative that users should
      use instead.

    * version: str | None --- The version of Arvados when the callable is
      scheduled to be removed.
    """
    # Pre-render the optional message fragments so the decorator body
    # only has to concatenate strings.
    if version is None:
        version = ''
    else:
        version = f' and scheduled to be removed in Arvados {version}'
    if preferred is None:
        preferred = ''
    else:
        preferred = f' Prefer {preferred} instead.'
    def deprecated_decorator(func):
        fullname = f'{func.__module__}.{func.__qualname__}'
        parent, _, name = fullname.rpartition('.')
        # When decorating a constructor, report the class as deprecated
        # rather than its __init__ method.
        if name == '__init__':
            fullname = parent
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'
        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            # stacklevel=2 attributes the warning to the caller's line,
            # not to this wrapper.
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)
        # Get func's docstring without any trailing newline or empty lines.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        # Infer the docstring's body indentation from its first indented line.
        match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = '' if match is None else match.group(1)
        warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
        # Make the deprecation notice the second "paragraph" of the
        # docstring if possible. Otherwise append it.
        docstring, count = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'{warning_doc}\n\n{indent}',
            func_doc,
            count=1,
        )
        if not count:
            docstring = f'{func_doc.lstrip()}{warning_doc}'
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper
    return deprecated_decorator
128
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    if num_length_args == 2:
        minimum, maximum = length_args
        good_len = minimum <= len(s) <= maximum
    elif num_length_args == 1:
        good_len = len(s) == length_args[0]
    else:
        good_len = True
    return bool(good_len and HEX_RE.match(s))
158
def keyset_list_all(
        fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
        order_key: str="created_at",
        num_retries: int=0,
        ascending: bool=True,
        **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    # count='none' skips the expensive items_available tally; paging is
    # driven entirely by filters below.
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    # uuid is used as a secondary sort key so that rows sharing the same
    # order_key value still have a stable, pageable order.
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging reads order_key and uuid from each row, so make sure any
        # caller-supplied select list includes them.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    tot = 0
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging through rows tied on order_key using uuid;
                # that set is exhausted, so move past the tied value and
                # resume normal order_key paging.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                # Genuinely no more rows.
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page.  Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice.  If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False
263
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    # Note: don't reuse the function's own name for the loop variable
    # (the original shadowed `ca_certs_path`, which would make a
    # recursive or repeated reference inside the loop surprising).
    for candidate in [
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
        ]:
        # environ.get may return None when SSL_CERT_FILE is unset; skip it.
        if candidate and os.path.exists(candidate):
            return candidate
    return fallback
301
def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    digits = '0123456789abcdefghijklmnopqrstuvwxyz'
    # 2**104 > 36**20 > 2**103, so 104 random bits cover 20 base-36 digits.
    n = random.getrandbits(104)
    chars = []
    for _ in range(20):
        n, d = divmod(n, 36)
        chars.append(digits[d])
    return 'req-' + ''.join(chars)
319
def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('configs', False):
        # Old API server version without the config export endpoint.
        return {}
    cached = getattr(svc, '_cached_config', None)
    if cached is None:
        cached = svc.configs().get().execute()
        svc._cached_config = cached
    return cached
338
def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    resources = svc._rootDesc.get('resources')
    if not resources.get('vocabularies', False):
        # Old API server version without the vocabulary export endpoint.
        return {}
    cached = getattr(svc, '_cached_vocabulary', None)
    if cached is None:
        cached = svc.vocabularies().get().execute()
        svc._cached_vocabulary = cached
    return cached
361
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API's name length limit; reserve 28 characters for the
    # suffix ensure_unique_name may append.
    max_name_len = 254 - 28
    if len(collectionname) <= max_name_len:
        return collectionname
    excess = len(collectionname) - max_name_len
    half = int(max_name_len / 2)
    return collectionname[:half] + "…" + collectionname[half + excess:]
384
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Return every item from an Arvados API list call as one list.

    Pages through `fn` using offset-based paging until all
    `items_available` have been collected.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    results = []
    offset = 0
    available = sys.maxsize
    while len(results) < available:
        page = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        results.extend(page['items'])
        available = page['items_available']
        offset = page['offset'] + len(page['items'])
    return results
398
@_deprecated('3.0')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    if path is None:
        path = current_task().tmpdir
    if os.path.exists(path):
        # Capture stderr so a failure message can include the actual
        # reason. (Previously stderr was not piped, so the exception
        # message always interpolated None.)
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
414
@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run an external command and return its (stdout, stderr) output.

    Raises arvados.errors.CommandFailedError if the command exits with
    a nonzero status.
    """
    defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for key, value in defaults.items():
        kwargs.setdefault(key, value)
    proc = subprocess.Popen(execargs, **kwargs)
    stdoutdata, stderrdata = proc.communicate(None)
    if proc.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, stderrdata))
    return stdoutdata, stderrdata
429
@_deprecated('3.0')
def git_checkout(url, version, path):
    """Clone `url` into `path` (if not already present) and check out `version`.

    A relative `path` is interpreted against the current job's tmpdir.
    Returns the absolute checkout path.
    """
    from arvados import current_job
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(
            ["git", "clone", url, path],
            cwd=os.path.dirname(path),
        )
    run_command(["git", "checkout", version], cwd=path)
    return path
441
@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
    """Start a tar subprocess that extracts its stdin into `path`.

    `decompress_flag` is a tar compression flag character ('j', 'z',
    or '' for no compression). Returns the Popen object; write archive
    data to its stdin.
    """
    command = ["tar", "-C", path, ("-x%sf" % decompress_flag), "-"]
    return subprocess.Popen(
        command,
        stdout=None,
        stdin=subprocess.PIPE,
        stderr=sys.stderr,
        shell=False,
        close_fds=True,
    )
451
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are interpreted against the current job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which tarball was last
        # extracted here; if it matches, skip re-extraction.
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose tar's decompression flag from the file extension.
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive from Keep into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        # Record the extracted locator for the "already have it" check above.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the tarball had exactly one top-level entry, return its path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
516
@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are interpreted against the current job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which archive was last
        # extracted here; if it matches, skip re-extraction.
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Copy the zip data from Keep to a local file in 1 MiB chunks
            # so unzip(1) can read it, then extract and delete the copy.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        # Record the extracted locator for the "already have it" check above.
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the archive had exactly one top-level entry, return its path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
585
@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Use the bare hash part of a locator as the '.locator' marker value;
    # for anything else, fall back to an md5 of the whole string.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    # Relative paths are interpreted against the current job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # NOTE(review): unlike tarball_extract/zipball_extract, already_have_it
    # is computed but never consulted below — extraction always proceeds,
    # relying only on the per-file os.path.exists() check. Confirm whether
    # this is intentional.

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract when no filter list was given, or when the file
            # matches the filter list (by plain or decompressed name)
            # and hasn't been extracted already.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    # Record the extracted collection hash in the '.locator' symlink.
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path
651
@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create `path` and any missing parent directories, like `mkdir -p`.

    It is not an error if the directory already exists.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except FileExistsError:
        # It is not an error if someone else creates the directory
        # between our isdir() and makedirs() calls; only re-raise when
        # the path exists but is not a directory.
        if not os.path.isdir(path):
            raise
664
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    # Relative paths are interpreted against the current job's tmpdir.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract when no filter list was given, or when the file matches
        # the filter list (by plain or decompressed name) and hasn't been
        # extracted already.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            # Replace any pre-existing file of the same name.
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path
706
@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    results = []
    for entry in sorted(os.listdir(dirname)):
        entry_path = os.path.join(dirname, entry)
        entry_name = os.path.join(base, entry) if base else entry
        if max_depth != 0 and os.path.isdir(entry_path):
            # Recurse, decrementing the remaining depth (None means
            # unlimited).
            next_depth = max_depth - 1 if max_depth else None
            results.extend(listdir_recursive(
                entry_path, base=entry_name, max_depth=next_depth))
        else:
            results.append(entry_name)
    return results
HEX_RE = re.compile('^[0-9a-fA-F]+$')

Regular expression to match a hexadecimal string (case-insensitive)

CR_UNCOMMITTED = 'Uncommitted'

Constant state value for uncommitted container requests

CR_COMMITTED = 'Committed'

Constant state value for committed container requests

CR_FINAL = 'Final'

Constant state value for finalized container requests

keep_locator_pattern = re.compile('[0-9a-f]{32}\\+[0-9]+(\\+\\S+)*')

Regular expression to match any Keep block locator

signed_locator_pattern = re.compile('[0-9a-f]{32}\\+[0-9]+(\\+\\S+)*\\+A\\S+(\\+\\S+)*')

Regular expression to match any Keep block locator with an access token hint

portable_data_hash_pattern = re.compile('[0-9a-f]{32}\\+[0-9]+')

Regular expression to match any collection portable data hash

manifest_pattern = re.compile('((\\S+)( +[a-f0-9]{32}(\\+[0-9]+)(\\+\\S+)*)+( +[0-9]+:[0-9]+:\\S+)+$)+', re.MULTILINE)

Regular expression to match an Arvados collection manifest text

keep_file_locator_pattern = re.compile('([0-9a-f]{32}\\+[0-9]+)/(.*)')

Regular expression to match a file path from a collection identified by portable data hash

keepuri_pattern = re.compile('keep:([0-9a-f]{32}\\+[0-9]+)/(.*)')

Regular expression to match a keep: URI with a collection identified by portable data hash

uuid_pattern = re.compile('[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')

Regular expression to match any Arvados object UUID

collection_uuid_pattern = re.compile('[a-z0-9]{5}-4zz18-[a-z0-9]{15}')

Regular expression to match any Arvados collection UUID

container_uuid_pattern = re.compile('[a-z0-9]{5}-dz642-[a-z0-9]{15}')

Regular expression to match any Arvados container UUID

group_uuid_pattern = re.compile('[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')

Regular expression to match any Arvados group UUID

user_uuid_pattern = re.compile('[a-z0-9]{5}-tpzed-[a-z0-9]{15}')

Regular expression to match any Arvados user UUID

job_uuid_pattern = re.compile('[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')

Regular expression to match any Arvados job UUID

def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check
    """
    arg_count = len(length_args)
    if arg_count > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + arg_count))
    if arg_count == 2:
        minimum, maximum = length_args
        length_ok = minimum <= len(s) <= maximum
    elif arg_count == 1:
        length_ok = len(s) == length_args[0]
    else:
        length_ok = True
    return length_ok and HEX_RE.match(s) is not None

Indicate whether a string is a hexadecimal number

This method returns true if all characters in the string are hexadecimal digits. It is case-insensitive.

You can also pass optional length arguments to check that the string has the expected number of digits. If you pass one integer, the string must have that length exactly, otherwise the method returns False. If you pass two integers, the string’s length must fall within that minimum and maximum (inclusive), otherwise the method returns False.

Arguments:

  • s: str — The string to check

  • length_args: int — Optional length limit(s) for the string to check

def keyset_list_all(
        fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
        order_key: str="created_at",
        num_retries: int=0,
        ascending: bool=True,
        **kwargs: Any,
) -> Iterator[Dict[str, Any]]:
    """Iterate all Arvados resources from an API list call

    This method takes a method that represents an Arvados API list call, and
    iterates the objects returned by the API server. It can make multiple API
    calls to retrieve and iterate all objects available from the API server.

    Arguments:

    * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
      function that wraps an Arvados API method that returns a list of
      objects. If you have an Arvados API client named `arv`, examples
      include `arv.collections().list` and `arv.groups().contents`. Note
      that you should pass the function *without* calling it.

    * order_key: str --- The name of the primary object field that objects
      should be sorted by. This name is used to build an `order` argument
      for `fn`. Default `'created_at'`.

    * num_retries: int --- This argument is passed through to
      `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
      that method's docstring for details. Default 0 (meaning API calls will
      use the `num_retries` value set when the Arvados API client was
      constructed).

    * ascending: bool --- Used to build an `order` argument for `fn`. If True,
      all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
      fields will be sorted in `'desc'` (descending) order.

    Additional keyword arguments will be passed directly to `fn` for each API
    call. Note that this function sets `count`, `limit`, and `order` as part of
    its work.
    """
    pagesize = 1000
    kwargs["limit"] = pagesize
    # count='none' skips the expensive items_available tally; paging below
    # detects the end of the list by receiving an empty page instead.
    kwargs["count"] = 'none'
    asc = "asc" if ascending else "desc"
    # Secondary sort on uuid makes the ordering total, so the keyset filters
    # built below can resume deterministically from the last item seen.
    kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
    other_filters = kwargs.get("filters", [])

    try:
        select = set(kwargs['select'])
    except KeyError:
        pass
    else:
        # Paging reads order_key and uuid off every item, so make sure they
        # are selected even when the caller narrowed `select`.
        select.add(order_key)
        select.add('uuid')
        kwargs['select'] = list(select)

    nextpage = []
    tot = 0
    expect_full_page = True
    seen_prevpage = set()
    seen_thispage = set()
    lastitem = None
    prev_page_all_same_order_key = False

    while True:
        kwargs["filters"] = nextpage+other_filters
        items = fn(**kwargs).execute(num_retries=num_retries)

        if len(items["items"]) == 0:
            if prev_page_all_same_order_key:
                # We were paging by uuid through a run of items that all
                # share one order_key value; that run is exhausted, so skip
                # past it and resume paging on order_key.
                nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
                prev_page_all_same_order_key = False
                continue
            else:
                return

        seen_prevpage = seen_thispage
        seen_thispage = set()

        for i in items["items"]:
            # In cases where there's more than one record with the
            # same order key, the result could include records we
            # already saw in the last page.  Skip them.
            if i["uuid"] in seen_prevpage:
                continue
            seen_thispage.add(i["uuid"])
            yield i

        firstitem = items["items"][0]
        lastitem = items["items"][-1]

        if firstitem[order_key] == lastitem[order_key]:
            # Got a page where every item has the same order key.
            # Switch to using uuid for paging.
            nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
            prev_page_all_same_order_key = True
        else:
            # Start from the last order key seen, but skip the last
            # known uuid to avoid retrieving the same row twice.  If
            # there are multiple rows with the same order key it is
            # still likely we'll end up retrieving duplicate rows.
            # That's handled by tracking the "seen" rows for each page
            # so they can be skipped if they show up on the next page.
            nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
            prev_page_all_same_order_key = False

Iterate all Arvados resources from an API list call

This method takes a method that represents an Arvados API list call, and iterates the objects returned by the API server. It can make multiple API calls to retrieve and iterate all objects available from the API server.

Arguments:

  • fn: Callable[…, arvados.api_resources.ArvadosAPIRequest] — A function that wraps an Arvados API method that returns a list of objects. If you have an Arvados API client named arv, examples include arv.collections().list and arv.groups().contents. Note that you should pass the function without calling it.

  • order_key: str — The name of the primary object field that objects should be sorted by. This name is used to build an order argument for fn. Default 'created_at'.

  • num_retries: int — This argument is passed through to arvados.api_resources.ArvadosAPIRequest.execute for each API call. See that method’s docstring for details. Default 0 (meaning API calls will use the num_retries value set when the Arvados API client was constructed).

  • ascending: bool — Used to build an order argument for fn. If True, all fields will be sorted in 'asc' (ascending) order. Otherwise, all fields will be sorted in 'desc' (descending) order.

Additional keyword arguments will be passed directly to fn for each API call. Note that this function sets count, limit, and order as part of its work.

def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    This function checks various known paths that provide trusted CA
    certificates, and returns the first one that exists. It checks:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default value is the certificate store of Mozilla's trusted CAs
      included with the Python [certifi][] package.

    [certifi]: https://pypi.org/project/certifi/
    """
    candidates = (
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    for candidate in candidates:
        if candidate and os.path.exists(candidate):
            return candidate
    return fallback

Return the path of the best available source of CA certificates

This function checks various known paths that provide trusted CA certificates, and returns the first one that exists. It checks:

  • the path in the SSL_CERT_FILE environment variable (used by OpenSSL)
  • /etc/arvados/ca-certificates.crt, respected by all Arvados software
  • /etc/ssl/certs/ca-certificates.crt, the default store on Debian-based distributions
  • /etc/pki/tls/certs/ca-bundle.crt, the default store on Red Hat-based distributions

If none of these paths exist, this function returns the value of fallback.

Arguments:

  • fallback: T — The value to return if none of the known paths exist. The default value is the certificate store of Mozilla’s trusted CAs included with the Python certifi package.
def new_request_id() -> str:
    """Return a random request ID

    This function generates and returns a random string suitable for use as a
    `X-Request-Id` header value in the Arvados API.
    """
    # 2**104 > 36**20 > 2**103, so 104 random bits are enough to fill
    # 20 base-36 digits with negligible bias toward low digits.
    n = random.getrandbits(104)
    digits = []
    for _ in range(20):
        n, d = divmod(n, 36)
        if d < 10:
            digits.append(chr(d + ord('0')))
        else:
            digits.append(chr(d + ord('a') - 10))
    return "req-" + ''.join(digits)

Return a random request ID

This function generates and returns a random string suitable for use as a X-Request-Id header value in the Arvados API.

def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    # Old API server versions have no config export endpoint.
    if not svc._rootDesc.get('resources').get('configs', False):
        return {}
    try:
        return svc._cached_config
    except AttributeError:
        svc._cached_config = svc.configs().get().execute()
        return svc._cached_config

Return an Arvados cluster’s configuration, with caching

This function gets and returns the Arvados configuration from the API server. It caches the result on the client object and reuses it on any future calls.

Arguments:

  • svc: arvados.api_resources.ArvadosAPIClient — The Arvados API client object to use to retrieve and cache the Arvados cluster configuration.

def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object and reuses it on any
    future calls.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    # Old API server versions have no vocabulary export endpoint.
    if not svc._rootDesc.get('resources').get('vocabularies', False):
        return {}
    try:
        return svc._cached_vocabulary
    except AttributeError:
        svc._cached_vocabulary = svc.vocabularies().get().execute()
        return svc._cached_vocabulary

Return an Arvados cluster’s vocabulary, with caching

This function gets and returns the Arvados vocabulary from the API server. It caches the result on the client object and reuses it on any future calls.

Arguments:

  • svc: arvados.api_resources.ArvadosAPIClient — The Arvados API client object to use to retrieve and cache the Arvados cluster vocabulary.

def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    This function ensures that a string is short enough to use as an object
    name in the Arvados API, leaving room for text that may be added by the
    `ensure_unique_name` argument. If the source name is short enough, it is
    returned unchanged. Otherwise, this function returns a string with excess
    characters removed from the middle of the source string and replaced with
    an ellipsis.

    Arguments:

    * collectionname: str --- The desired source name
    """
    # 254 is the API name-length limit; reserve 28 characters for a suffix
    # that ensure_unique_name may append.
    limit = 254 - 28
    if len(collectionname) <= limit:
        return collectionname
    excess = len(collectionname) - limit
    half = limit // 2
    return collectionname[:half] + "…" + collectionname[half + excess:]

Limit the length of a name to fit within Arvados API limits

This function ensures that a string is short enough to use as an object name in the Arvados API, leaving room for text that may be added by the ensure_unique_name argument. If the source name is short enough, it is returned unchanged. Otherwise, this function returns a string with excess characters removed from the middle of the source string and replaced with an ellipsis.

Arguments:

  • collectionname: str — The desired source name
@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
    """Fetch every item from a paginated Arvados list call into one list."""
    # Default limit to (effectively) the API server's MAX_LIMIT.
    kwargs.setdefault('limit', sys.maxsize)
    results = []
    offset = 0
    available = sys.maxsize
    while len(results) < available:
        page = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        results.extend(page['items'])
        available = page['items_available']
        offset = page['offset'] + len(page['items'])
    return results
@_deprecated('3.0')
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.
    """
    from arvados import current_task
    target = current_task().tmpdir if path is None else path
    if os.path.exists(target):
        # Remove the whole tree, then recreate it empty below.
        proc = subprocess.Popen(['rm', '-rf', target])
        _stdout, stderr = proc.communicate(None)
        if proc.returncode != 0:
            raise Exception('rm -rf %s: %s' % (target, stderr))
    os.mkdir(target)

Ensure the given directory (or TASK_TMPDIR if none given) exists and is empty.

@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
    """Run a subprocess and return (stdout, stderr); raise on nonzero exit."""
    defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for option, value in defaults.items():
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    out, err = proc.communicate(None)
    if proc.returncode != 0:
        raise arvados.errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, err))
    return out, err
@_deprecated('3.0')
def git_checkout(url, version, path):
    """Clone `url` into `path` (if not already present) and check out `version`."""
    from arvados import current_job
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        run_command(["git", "clone", url, path], cwd=os.path.dirname(path))
    run_command(["git", "checkout", version], cwd=path)
    return path
@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
    """Return a Popen running `tar -x` into `path`, reading archive data from stdin."""
    command = ["tar", "-C", path, "-x%sf" % decompress_flag, "-"]
    return subprocess.Popen(
        command,
        stdout=None,
        stdin=subprocess.PIPE,
        stderr=sys.stderr,
        shell=False,
        close_fds=True,
    )
@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory.  Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which tarball was last extracted
        # here; if it matches, skip re-extracting.
        if os.readlink(os.path.join(path, '.locator')) == tarball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(tarball).all_files():
            # Choose tar's decompression flag from the file extension.
            f_name = f.name()
            if f_name.endswith(('.tbz', '.tar.bz2')):
                p = tar_extractor(path, 'j')
            elif f_name.endswith(('.tgz', '.tar.gz')):
                p = tar_extractor(path, 'z')
            elif f_name.endswith('.tar'):
                p = tar_extractor(path, '')
            else:
                raise arvados.errors.AssertionError(
                    "tarball_extract cannot handle filename %s" % f.name())
            # Stream the archive into tar's stdin in 1 MiB chunks.
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                p.stdin.write(buf)
            p.stdin.close()
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "tar exited %d" % p.returncode)
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the tarball held a single top-level entry, return that entry's path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

Retrieve a tarball from Keep and extract it to a local directory. Return the absolute path where the tarball was extracted. If the top level of the tarball contained just one file or directory, return the absolute path of that single item.

tarball — collection locator

path — where to extract the tarball: absolute, or relative to job tmp

@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory.  Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which archive was last extracted
        # here; if it matches, skip re-extracting.
        if os.readlink(os.path.join(path, '.locator')) == zipball:
            already_have_it = True
    except OSError:
        pass
    if not already_have_it:

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        for f in CollectionReader(zipball).all_files():
            if not f.name().endswith('.zip'):
                raise arvados.errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Copy the zip data out of Keep to a local file in 1 MiB chunks
            # before invoking unzip on it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            while True:
                buf = f.read(2**20)
                if len(buf) == 0:
                    break
                zip_file.write(buf)
            zip_file.close()

            p = subprocess.Popen(["unzip",
                                  "-q", "-o",
                                  "-d", path,
                                  zip_filename],
                                 stdout=None,
                                 stdin=None, stderr=sys.stderr,
                                 shell=False, close_fds=True)
            p.wait()
            if p.returncode != 0:
                lockfile.close()
                raise arvados.errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    lockfile.close()
    # If the archive held a single top-level entry, return that entry's path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

Retrieve a zip archive from Keep and extract it to a local directory. Return the absolute path where the archive was extracted. If the top level of the archive contained just one file or directory, return the absolute path of that single item.

zipball — collection locator

path — where to extract the archive: absolute, or relative to job tmp

@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory.  Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    from arvados.collection import CollectionReader
    # Use the bare hash from a locator-shaped string as the cache key;
    # otherwise fall back to the MD5 of the whole string.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        collection_hash = hashlib.md5(collection).hexdigest()
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)
    already_have_it = False
    try:
        # The '.locator' symlink records which collection was last
        # extracted here.
        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
            already_have_it = True
    except OSError:
        pass

    # emulate "rm -f" (i.e., if the file does not exist, we win)
    try:
        os.unlink(os.path.join(path, '.locator'))
    except OSError:
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))

    files_got = []
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            # Extract everything when files == []; otherwise only the
            # requested names (also matching decompressed names when
            # decompress is on), skipping names already extracted.
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, stream_name, outname)):
                    continue
                mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                outfile = open(os.path.join(path, stream_name, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got,
             [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))

    lockfile.close()
    return path

Retrieve a collection from Keep and extract it to a local directory. Return the absolute path where the collection was extracted.

collection — collection locator

path — where to extract: absolute, or relative to job tmp

@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
    """Create `path` and any missing parent directories, like `mkdir -p`."""
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as e:
        # It is not an error if someone else creates the directory
        # between our isdir() check and the makedirs() call.
        if not (e.errno == errno.EEXIST and os.path.isdir(path)):
            raise
@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory.  Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    """
    from arvados import current_job
    # Relative paths are resolved against the job's temporary directory.
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize concurrent extractions of the same path with a lock file.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    try:
        os.stat(path)
    except OSError:
        os.mkdir(path)

    files_got = []
    for f in stream.all_files():
        # Extract everything when files == []; otherwise only the requested
        # names (also matching decompressed names when decompress is on),
        # skipping names already extracted.
        if (files == [] or
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, outname)):
                os.unlink(os.path.join(path, outname))
            mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
            outfile = open(os.path.join(path, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
                        else f.readall()):
                outfile.write(buf)
            outfile.close()
    if len(files_got) < len(files):
        raise arvados.errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
    lockfile.close()
    return path

Retrieve a stream from Keep and extract it to a local directory. Return the absolute path where the stream was extracted.

stream — StreamReader object

path — where to extract: absolute, or relative to job tmp

@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    Return a list of file and directory names found under dirname.

    If base is not None, prepend "{base}/" to each returned name.

    If max_depth is None, descend into directories and return only the
    names of files found in the directory tree.

    If max_depth is a non-negative integer, stop descending into
    directories at the given depth, and at that point return directory
    names instead.

    If max_depth==0 (and base is None) this is equivalent to
    sorted(os.listdir(dirname)).
    """
    found = []
    for entry in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, entry)
        rel_name = os.path.join(base, entry) if base else entry
        if os.path.isdir(full_path) and max_depth != 0:
            next_depth = max_depth - 1 if max_depth else None
            found.extend(listdir_recursive(
                full_path, base=rel_name, max_depth=next_depth))
        else:
            found.append(rel_name)
    return found

listdir_recursive(dirname, base, max_depth)

Return a list of file and directory names found under dirname.

If base is not None, prepend “{base}/” to each returned name.

If max_depth is None, descend into directories and return only the names of files found in the directory tree.

If max_depth is a non-negative integer, stop descending into directories at the given depth, and at that point return directory names instead.

If max_depth==0 (and base is None) this is equivalent to sorted(os.listdir(dirname)).