Source Code for Module arvados.commands.keepdocker

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from builtins import next
import argparse
import collections
import datetime
import errno
import json
import os
import re
import subprocess32 as subprocess
import sys
import tarfile
import tempfile
import shutil
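# Imported only for its side effect: loading _strptime up front is a common
# workaround (presumably the intent here) for a thread-safety race when
# datetime.datetime.strptime is first called from multiple threads
# (CPython issue 7980).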
import _strptime
import fcntl

from operator import itemgetter
from stat import *

import arvados
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put
from arvados.collection import CollectionReader
import ciso8601
import logging
import arvados.config

from arvados._version import __version__

logger = logging.getLogger('arvados.keepdocker')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
                else logging.INFO)

EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)

DockerImage = collections.namedtuple(
    'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])

keepdocker_parser = argparse.ArgumentParser(add_help=False)
keepdocker_parser.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
keepdocker_parser.add_argument(
    '-f', '--force', action='store_true', default=False,
    help="Re-upload the image even if it already exists on the server")
keepdocker_parser.add_argument(
    '--force-image-format', action='store_true', default=False,
    help="Proceed even if the image format is not supported by the server")

_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
    '--pull', action='store_true', default=False,
    help="Try to pull the latest image from the Docker registry")
_group.add_argument(
    '--no-pull', action='store_false', dest='pull',
    help="Use the locally installed image only; don't pull from the Docker registry (default)")

keepdocker_parser.add_argument(
    'image', nargs='?',
    help="Docker image to upload: repo, repo:tag, or hash")
keepdocker_parser.add_argument(
    'tag', nargs='?',
    help="Tag of the Docker image to upload (default 'latest'), if the image is given as an untagged repo name")

# Combine the keepdocker options listed above with the run_opts options of
# arv-put.  The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress, and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
        description="Upload or list Docker images in Arvados",
        parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])

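# For illustration, a hypothetical parse with the combined parser (the
# project UUID below is a made-up placeholder, not from this file):
#
#     args = arg_parser.parse_args(
#         ['--project-uuid', 'zzzzz-j7d0g-0123456789abcde', 'debian', '10'])
#     # args.image == 'debian', args.tag == '10', args.pull is False
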
class DockerError(Exception):
    pass

def popen_docker(cmd, *args, **kwargs):
    manage_stdin = ('stdin' not in kwargs)
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', sys.stderr)
    try:
        docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    except OSError:  # No docker.io in $PATH
        docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    if manage_stdin:
        docker_proc.stdin.close()
    return docker_proc

def check_docker(proc, description):
    proc.wait()
    if proc.returncode != 0:
        raise DockerError("docker {} returned status code {}".
                          format(description, proc.returncode))

def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                       stdout=subprocess.PIPE)
    try:
        image_id = next(cmd.stdout).decode().strip()
        if image_id.startswith('sha256:'):
            return 'v2'
        elif ':' not in image_id:
            return 'v1'
        else:
            return 'unknown'
    finally:
        check_docker(cmd, "inspect")

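# For example (illustrative IDs, not real images): a v2 image reports an ID
# like 'sha256:4e38e38c8ce0...' (digest algorithm prefix included), while a
# v1 image reports a bare hex ID with no ':' in it.
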
def docker_image_compatible(api, image_hash):
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False

    fmt = docker_image_format(image_hash)
    if fmt in supported:
        return True
    else:
        logger.error("image format is {!r} "
                     "but server supports only {!r}".format(fmt, supported))
        return False

def docker_images():
    # Yield a DockerImage tuple for each installed image.
    list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    list_output = iter(list_proc.stdout)
    next(list_output)  # Ignore the header line
    for line in list_output:
        words = line.split()
        size_index = len(words) - 2
        repo, tag, imageid = words[:3]
        ctime = ' '.join(words[3:size_index])
        vsize = ' '.join(words[size_index:])
        yield DockerImage(repo, tag, imageid, ctime, vsize)
    list_proc.stdout.close()
    check_docker(list_proc, "images")

def find_image_hashes(image_search, image_tag=None):
    # Given one argument, search for Docker images with matching hashes,
    # and return their full hashes in a set.
    # Given two arguments, also search for a Docker image with the
    # same repository and tag.  If one is found, return its hash in a
    # set; otherwise, fall back to the one-argument hash search.
    # Returns an empty set if no match is found; an ambiguous hash
    # search may return more than one hash.
    hash_search = image_search.lower()
    hash_matches = set()
    for image in docker_images():
        if (image.repo == image_search) and (image.tag == image_tag):
            return set([image.hash])
        elif image.hash.startswith(hash_search):
            hash_matches.add(image.hash)
    return hash_matches

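# e.g. find_image_hashes('debian', 'buster') returns the single hash of the
# installed debian:buster image, while find_image_hashes('4e38') collects
# every installed image whose full hash starts with '4e38'.
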
def find_one_image_hash(image_search, image_tag=None):
    hashes = find_image_hashes(image_search, image_tag)
    hash_count = len(hashes)
    if hash_count == 1:
        return hashes.pop()
    elif hash_count == 0:
        raise DockerError("no matching image found")
    else:
        raise DockerError("{} images match {}".format(hash_count, image_search))

def stat_cache_name(image_file):
    return getattr(image_file, 'name', image_file) + '.stat'

def pull_image(image_name, image_tag):
    check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
                 "pull")

def save_image(image_hash, image_file):
    # Save the specified Docker image to image_file, then try to save its
    # stats so we can try to resume after interruption.
    check_docker(popen_docker(['save', image_hash], stdout=image_file),
                 "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        pass  # We won't resume from this cache.  No big deal.

def get_cache_dir():
    return arv_cmd.make_home_conf_dir(
        os.path.join('.cache', 'arvados', 'docker'), 0o700)

def prep_image_file(filename):
    # Return a file object ready to save a Docker image,
    # and a boolean indicating whether or not we need to actually save the
    # image (False if a cached save is available).
    cache_dir = get_cache_dir()
    if cache_dir is None:
        image_file = tempfile.NamedTemporaryFile(suffix='.tar')
        need_save = True
    else:
        file_path = os.path.join(cache_dir, filename)
        try:
            with open(stat_cache_name(file_path)) as statfile:
                prev_stat = json.load(statfile)
            now_stat = os.stat(file_path)
            need_save = any(prev_stat[field] != now_stat[field]
                            for field in [ST_MTIME, ST_SIZE])
        except STAT_CACHE_ERRORS + (AttributeError, IndexError):
            need_save = True  # We couldn't compare against old stats
        image_file = open(file_path, 'w+b' if need_save else 'rb')
    return image_file, need_save

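# The stat cache written by save_image() above is simply os.fstat() output
# serialized as a JSON array; only the ST_MTIME and ST_SIZE fields are
# compared here to decide whether a previously saved tar file can be reused.
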
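# The next few definitions are collapsed in this source view.  The sketch
# below reconstructs them from how the rest of the module uses them
# (make_link and _get_docker_links are called in main() and
# list_images_in_arv(); the '_sort_key' they attach is consumed by
# _new_image_listing); treat the names and details as assumptions, not
# the verbatim originals.

def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    # Create a link object on the API server and return it.
    link_attrs.update({'link_class': link_class, 'name': link_name})
    return api_client.links().create(body=link_attrs).execute(
        num_retries=num_retries)

def docker_link_sort_key(link):
    # Build a sort key so the API server's most preferred image sorts
    # first: primarily the image timestamp recorded in the link properties
    # (EARLIEST_DATETIME when absent or unparseable), secondarily the
    # link's own creation time.
    try:
        image_timestamp = ciso8601.parse_datetime(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        image_timestamp = EARLIEST_DATETIME
    try:
        created_timestamp = ciso8601.parse_datetime(link['created_at'])
    except ValueError:
        created_timestamp = None
    return (image_timestamp, created_timestamp)

def _get_docker_links(api_client, num_retries, **kwargs):
    # Fetch all matching link records, attach the '_sort_key' used
    # throughout this module, and return them most-preferred-first.
    links = arvados.util.list_all(api_client.links().list,
                                  num_retries, **kwargs)
    for link in links:
        link['_sort_key'] = docker_link_sort_key(link)
    links.sort(key=itemgetter('_sort_key'), reverse=True)
    return links
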
def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
    timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
    return {
        '_sort_key': link['_sort_key'],
        'timestamp': link['_sort_key'][timestamp_index],
        'collection': link['head_uuid'],
        'dockerhash': dockerhash,
        'repo': repo,
        'tag': tag,
    }

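# An entry produced here looks like (illustrative values):
#     {'_sort_key': (<image datetime>, <link datetime>),
#      'timestamp': <datetime shown in image listings>,
#      'collection': 'zzzzz-4zz18-...', 'dockerhash': 'sha256:4e38...',
#      'repo': 'debian', 'tag': 'buster'}
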
def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
    """List all Docker images known to the api_client with image_name and
    image_tag.  If no image_name is given, defaults to listing all
    Docker images.

    Returns a list of tuples representing matching Docker images,
    sorted in preference order (i.e. the first collection in the list
    is the one that the API server would use).  Each tuple is a
    (collection_uuid, collection_info) pair, where collection_info is
    a dict with fields "dockerhash", "repo", "tag", and "timestamp".

    """
    search_filters = []
    repo_links = None
    hash_links = None
    if image_name:
        # Find images with the name the user specified.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]])
        if search_links:
            repo_links = search_links
        else:
            # Fall back to finding images with the specified image hash.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']])
            hash_links = search_links
        # Only list information about images that were found in the search.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # It should be reasonable to expect that each collection only has one
    # image hash (though there may be many links specifying this).  Find
    # the API server's most preferred image hash link for each collection.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']])
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Each collection may have more than one name (though again, one name
    # may be specified more than once).  Build an image listing from name
    # tags, sorted by API server preference.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']])
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        try:
            dockerhash = hash_link_map[collection_uuid]['name']
        except KeyError:
            dockerhash = '<unknown>'
        name_parts = link['name'].split(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Find any image hash links that did not have a corresponding name link,
    # and add image listings for them, retaining the API server preference
    # sorting.
    images_start_size = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > images_start_size:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Remove any image listings that refer to unknown collections.
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
        api_client.collections().list, num_retries,
        filters=[['uuid', 'in', [im['collection'] for im in images]]],
        select=['uuid'])}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]

def items_owned_by(owner_uuid, arv_items):
    return (item for item in arv_items if item['owner_uuid'] == owner_uuid)

def _uuid2pdh(api, uuid):
    return api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()['items'][0]['portable_data_hash']

def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1')

    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    try:
        image_hash = find_one_image_hash(args.image, args.tag)
    except DockerError as error:
        logger.error(str(error))
        sys.exit(1)

    if not docker_image_compatible(api, image_hash):
        if args.force_image_format:
            logger.warning("forcing incompatible image")
        else:
            logger.error("refusing to store "
                         "incompatible format (use --force-image-format to override)")
            sys.exit(1)

    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid': coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Read the image metadata and make Arvados links from it.
        image_file.seek(0)
        image_tar = tarfile.open(fileobj=image_file)
        image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
        if image_hash_type:
            json_filename = raw_image_hash + '.json'
        else:
            json_filename = raw_image_hash + '/json'
        json_file = image_tar.extractfile(image_tar.getmember(json_filename))
        image_metadata = json.load(json_file)
        json_file.close()
        image_tar.close()
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()

if __name__ == '__main__':
    main()
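
# Typical invocations, assuming an arv-keepdocker entry point that calls
# main() (illustrative examples, not from this file):
#
#     arv-keepdocker                    # list Docker images known to Arvados
#     arv-keepdocker debian buster      # upload the local debian:buster image
#     arv-keepdocker --pull debian      # pull debian:latest, then upload it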