Package arvados :: Module util
[hide private]
[frames] | [no frames]

Source Code for Module arvados.util

  1  # Copyright (C) The Arvados Authors. All rights reserved. 
  2  # 
  3  # SPDX-License-Identifier: Apache-2.0 
  4   
  5  from __future__ import division 
  6  from builtins import range 
  7   
  8  import fcntl 
  9  import hashlib 
 10  import httplib2 
 11  import os 
 12  import random 
 13  import re 
 14  import subprocess 
 15  import errno 
 16  import sys 
 17   
 18  import arvados 
 19  from arvados.collection import CollectionReader 
 20   
# Matches a non-empty string made only of hexadecimal digits (either case).
# NOTE(review): "$" also matches just before a trailing "\n", so "abc\n"
# matches too; use a \Z anchor if that matters to a caller.
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')

# Keep locator: 32 lowercase hex digits, "+", a decimal number (presumably
# the block size), then optional "+hint" suffixes. Unanchored.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Same shape as keep_locator_pattern, but requires a "+A..." hint
# (presumably the permission signature) after the number.
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# 32 lowercase hex digits, "+", a decimal number; no hints. Unanchored.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# Generic UUID: 5, 5 and 15 lowercase alphanumerics separated by "-".
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
# The patterns below fix the middle (object type) segment of the UUID.
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# Manifest text. Each line is a non-space token, then one or more
# " <32 hex>+<digits>[+hints]" locators, then one or more " N:N:name"
# tokens. MULTILINE makes "$" match at the end of every line.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)

def clear_tmpdir(path=None):
    """Ensure the given directory exists and is empty.

    path -- directory to clear. When None, uses
            arvados.current_task().tmpdir.

    An existing directory is removed with `rm -rf` and then recreated
    with os.mkdir(). Raises Exception (including rm's stderr text) if
    `rm -rf` exits non-zero.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # "--" stops rm from treating a path that starts with "-" as an
        # option. stderr must be piped, otherwise communicate() returns
        # None for it and the error message below is useless.
        p = subprocess.Popen(['rm', '-rf', '--', path],
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            if isinstance(stderr, bytes):
                stderr = stderr.decode('utf-8', 'replace')
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)

def run_command(execargs, **kwargs):
    """Run execargs in a subprocess and wait for it to exit.

    Unless kwargs overrides them, stdin and stdout are pipes, stderr is
    sys.stderr, close_fds is True and shell is False.

    Returns the (stdout, stderr) pair produced by Popen.communicate().
    Raises arvados.errors.CommandFailedError on a non-zero exit status.
    """
    defaults = (
        ('stdin', subprocess.PIPE),
        ('stdout', subprocess.PIPE),
        ('stderr', sys.stderr),
        ('close_fds', True),
        ('shell', False),
    )
    for option, value in defaults:
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    out, err = proc.communicate(None)
    if proc.returncode == 0:
        return out, err
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err))

def git_checkout(url, version, path):
    """Clone url into path if path does not exist, then check out version.

    path -- absolute, or relative to the current job's tmpdir

    Returns the resolved checkout path.
    """
    if not path.startswith('/'):
        path = os.path.join(arvados.current_job().tmpdir, path)
    if not os.path.exists(path):
        # Clone from the parent directory so the checkout lands at path.
        run_command(["git", "clone", url, path],
                    cwd=os.path.dirname(path))
    run_command(["git", "checkout", version], cwd=path)
    return path

def tar_extractor(path, decompress_flag):
    """Start a `tar` process that extracts an archive read from its stdin.

    path -- directory to extract into (passed as tar -C)
    decompress_flag -- letter inserted into "-x?f" ('j', 'z', or '' for
                       an uncompressed archive)

    Returns the Popen object. The caller writes the archive bytes to
    p.stdin, closes it, and waits for the process.
    """
    mode = "-x%sf" % decompress_flag
    argv = ["tar", "-C", path, mode, "-"]
    return subprocess.Popen(argv,
                            stdin=subprocess.PIPE,
                            stdout=None,
                            stderr=sys.stderr,
                            shell=False,
                            close_fds=True)

def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp

    Concurrent callers are serialized by an exclusive flock on
    path + '.lock'. A '.locator' symlink inside path records which
    tarball was extracted, so a repeat call with the same locator skips
    extraction. Raises arvados.errors.AssertionError for a file that is
    not .tar/.tgz/.tar.gz/.tbz/.tar.bz2, and
    arvados.errors.CommandFailedError if tar exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # The `with` closes the lock file, and so releases the flock, on every
    # exit path, including the exceptions raised below.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        locator_path = os.path.join(path, '.locator')
        already_have_it = False
        try:
            if os.readlink(locator_path) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_path)
            except OSError:
                if os.path.exists(locator_path):
                    os.unlink(locator_path)

            for f in CollectionReader(tarball).all_files():
                name = f.name()
                # Raw strings with escaped dots: "tar.gz" must not match
                # e.g. "tarXgz".
                if re.search(r'\.(tbz|tar\.bz2)$', name):
                    p = tar_extractor(path, 'j')
                elif re.search(r'\.(tgz|tar\.gz)$', name):
                    p = tar_extractor(path, 'z')
                elif re.search(r'\.tar$', name):
                    p = tar_extractor(path, '')
                else:
                    raise arvados.errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % name)
                # Stream the archive into tar in 1 MiB chunks.
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            # Record the extracted locator only after every file succeeded.
            os.symlink(tarball, locator_path)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp

    Concurrent callers are serialized by an exclusive flock on
    path + '.lock'. A '.locator' symlink inside path records which
    archive was extracted, so a repeat call with the same locator skips
    extraction. Raises arvados.errors.NotImplementedError for a file not
    ending in .zip, and arvados.errors.CommandFailedError if unzip exits
    non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # The `with` closes the lock file, and so releases the flock, on every
    # exit path, including the exceptions raised below.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        locator_path = os.path.join(path, '.locator')
        already_have_it = False
        try:
            if os.readlink(locator_path) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(locator_path)
            except OSError:
                if os.path.exists(locator_path):
                    os.unlink(locator_path)

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise arvados.errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                # unzip needs a seekable file, so copy the archive to local
                # disk first (in 1 MiB chunks).
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                with open(zip_filename, 'wb') as zip_file:
                    while True:
                        buf = f.read(2**20)
                        if len(buf) == 0:
                            break
                        zip_file.write(buf)

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            # Record the extracted locator only after every file succeeded.
            os.symlink(zipball, locator_path)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path

def collection_extract(collection, path, files=None, decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files whose name (or, when
             decompress is true, decompressed name) is listed. Raises
             arvados.errors.AssertionError if fewer files were obtained.
             None (the default) or [] means extract everything.
    decompress -- write each file's decompressed content under its
                  decompressed name instead of the raw content

    Each stream is written under path/<stream name>/. Files that already
    exist locally are skipped, not overwritten. Concurrent callers are
    serialized by an exclusive flock on path + '.lock'. After a successful
    run, a '.locator' symlink pointing at the collection hash is created.
    """
    if files is None:
        files = []
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # hashlib only accepts bytes; on Python 3 a locator is str.
        if isinstance(collection, bytes):
            raw = collection
        else:
            raw = collection.encode('utf-8')
        collection_hash = hashlib.md5(raw).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # The `with` closes the lock file, and so releases the flock, on every
    # exit path, including the AssertionError raised below.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        locator_path = os.path.join(path, '.locator')

        # The .locator symlink is always removed and the streams are always
        # re-walked, even for a matching locator. A previous call may have
        # extracted a different `files` subset, and existing files are
        # skipped below anyway.
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(locator_path)
        except OSError:
            if os.path.exists(locator_path):
                os.unlink(locator_path)

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                        ((f.name() not in files_got) and
                         (f.name() in files or
                          (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    outpath = os.path.join(path, stream_name, outname)
                    if os.path.exists(outpath):
                        continue
                    mkdir_dash_p(os.path.dirname(outpath))
                    with open(outpath, 'wb') as outfile:
                        for buf in (f.readall_decompressed() if decompress
                                    else f.readall()):
                            outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        os.symlink(collection_hash, locator_path)

    return path

def mkdir_dash_p(path):
    """Create path and any missing parents, like `mkdir -p`.

    Does nothing if path is already a directory, and tolerates another
    process creating it between the check and os.makedirs(). Any other
    OSError is re-raised, for example when path exists but is not a
    directory.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as err:
        lost_creation_race = (err.errno == errno.EEXIST and
                              os.path.isdir(path))
        if not lost_creation_race:
            raise

def stream_extract(stream, path, files=None, decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- if non-empty, extract only files whose name (or, when
             decompress is true, decompressed name) is listed. Raises
             arvados.errors.AssertionError if fewer files were obtained.
             None (the default) or [] means extract everything.
    decompress -- write each file's decompressed content under its
                  decompressed name instead of the raw content

    Existing local files with the same name are deleted and rewritten.
    Concurrent callers are serialized by an exclusive flock on
    path + '.lock'.
    """
    if files is None:
        files = []
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # The `with` closes the lock file, and so releases the flock, on every
    # exit path, including the AssertionError raised below.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                outpath = os.path.join(path, outname)
                if os.path.exists(outpath):
                    os.unlink(outpath)
                mkdir_dash_p(os.path.dirname(outpath))
                with open(outpath, 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
    return path

def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    List the names found beneath dirname, in sorted order at each level.

    base -- when truthy, each returned name is prefixed with "{base}/".
    max_depth -- None means descend fully and return only non-directory
                 names. A non-negative integer stops descending after that
                 many levels and returns the directory names found there.
                 max_depth == 0 with no base is the same as
                 sorted(os.listdir(dirname)).
    """
    descend = max_depth != 0
    found = []
    for name in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, name)
        label = os.path.join(base, name) if base else name
        if descend and os.path.isdir(full_path):
            found.extend(listdir_recursive(
                full_path, base=label,
                max_depth=(max_depth - 1) if max_depth else None))
        else:
            found.append(label)
    return found

# Anchored with \A...\Z. HEX_RE ends in "$", which also matches just
# before a trailing newline, so it would accept "abc\n".
_HEX_FULLMATCH_RE = re.compile(r'\A[0-9a-fA-F]+\Z')


def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Return True if s is a non-empty string consisting only of hexadecimal
    digits (no trailing newline or other characters).
    If one length argument is given, the string must contain exactly
    that number of digits.
    If two length arguments are given, the string must contain a number of
    digits between those two lengths, inclusive.
    Return False otherwise.

    Raises arvados.errors.ArgumentError if more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    return bool(good_len and _HEX_FULLMATCH_RE.match(s))

def list_all(fn, num_retries=0, **kwargs):
    """Page through a list call and return every item it yields.

    fn -- callable invoked as fn(offset=..., **kwargs) that returns a
          request object; its .execute(num_retries=...) must return a dict
          with 'items', 'items_available' and 'offset' keys.
    num_retries -- passed through to execute().
    kwargs -- extra list parameters. 'limit' defaults to sys.maxsize.

    Stops once 'items_available' items have been collected, or when a page
    comes back empty. Without the empty-page check, a reported count larger
    than what the server actually returns would loop forever.
    """
    # Default limit to (effectively) api server's MAX_LIMIT
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        page = c['items']
        items += page
        items_available = c['items_available']
        if not page:
            break
        offset = c['offset'] + len(page)
    return items

def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certificate bundle.

    Checks, in order, the Arvados-specific, Debian and Red Hat bundle
    locations and returns the first that exists. If none exists, returns
    `fallback` (httplib2's CA certs by default).
    """
    candidates = (
        # Arvados specific:
        '/etc/arvados/ca-certificates.crt',
        # Debian:
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat:
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    return next((candidate for candidate in candidates
                 if os.path.exists(candidate)),
                fallback)

def new_request_id():
    """Return a new request ID: "req-" followed by 20 base-36 characters.

    The characters are the 20 lowest base-36 digits of 104 bits from
    random.getrandbits (2**104 > 36**20 > 2**103), written
    least-significant digit first using 0-9 then a-z.
    """
    alphabet = '0123456789abcdefghijklmnopqrstuvwxyz'
    remaining = random.getrandbits(104)
    digits = []
    for _ in range(20):
        remaining, index = divmod(remaining, 36)
        digits.append(alphabet[index])
    return "req-" + "".join(digits)
