arvados.commands.keepdocker
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""arv-keepdocker: upload local Docker images into Arvados Keep, or list
Docker images already stored there."""

import argparse
import collections
import datetime
import errno
import fcntl
import json
import logging
import os
import re
import subprocess
import sys
import tarfile
import tempfile

import ciso8601
from operator import itemgetter
from stat import *

import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put
from arvados._version import __version__

logger = logging.getLogger('arvados.keepdocker')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
                else logging.INFO)

# Sentinel timestamp used when a link carries no parseable image_timestamp.
EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
# Errors we tolerate when reading/writing the stat cache (cache is best-effort).
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)

DockerImage = collections.namedtuple(
    'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])

keepdocker_parser = argparse.ArgumentParser(add_help=False)
keepdocker_parser.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
keepdocker_parser.add_argument(
    '-f', '--force', action='store_true', default=False,
    help="Re-upload the image even if it already exists on the server")
keepdocker_parser.add_argument(
    '--force-image-format', action='store_true', default=False,
    help="Proceed even if the image format is not supported by the server")

_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
    '--pull', action='store_true', default=False,
    help="Try to pull the latest image from Docker registry")
_group.add_argument(
    '--no-pull', action='store_false', dest='pull',
    help="Use locally installed image only, don't pull image from Docker registry (default)")

# Combine keepdocker options listed above with run_opts options of arv-put.
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
    description="Upload or list Docker images in Arvados",
    parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])

arg_parser.add_argument(
    'image', nargs='?',
    help="Docker image to upload: repo, repo:tag, or hash")
arg_parser.add_argument(
    'tag', nargs='?',
    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")


class DockerError(Exception):
    """Raised when a docker CLI invocation fails or gives ambiguous results."""
    pass


def popen_docker(cmd, *args, **kwargs):
    """Start `docker cmd...` (falling back to `docker.io`) and return the Popen.

    stdin/stdout/stderr default to pipes; if the caller did not supply a
    stdin, the pipe is closed immediately so the child sees EOF.
    """
    manage_stdin = ('stdin' not in kwargs)
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', subprocess.PIPE)
    try:
        docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    except OSError:  # No docker in $PATH, try docker.io
        docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    if manage_stdin:
        docker_proc.stdin.close()
    return docker_proc

def check_docker(proc, description):
    """Wait for a docker subprocess; raise DockerError on nonzero exit."""
    proc.wait()
    if proc.returncode != 0:
        raise DockerError("docker {} returned status code {}".
                          format(description, proc.returncode))

def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                       stdout=subprocess.PIPE)
    try:
        image_id = next(cmd.stdout).decode('utf-8').strip()
        if image_id.startswith('sha256:'):
            return 'v2'
        elif ':' not in image_id:
            return 'v1'
        else:
            return 'unknown'
    finally:
        check_docker(cmd, "inspect")

def docker_image_compatible(api, image_hash):
    """Return True if the server advertises support for this image's format."""
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False

    fmt = docker_image_format(image_hash)
    if fmt in supported:
        return True
    else:
        logger.error("image format is {!r} " \
            "but server supports only {!r}".format(fmt, supported))
        return False

def docker_images():
    # Yield a DockerImage tuple for each installed image.
    list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    list_output = iter(list_proc.stdout)
    next(list_output)  # Ignore the header line
    for line in list_output:
        words = line.split()
        words = [word.decode('utf-8') for word in words]
        # Creation time may contain spaces ("2 weeks ago"), so split the
        # row around the fixed leading columns and trailing size column.
        size_index = len(words) - 2
        repo, tag, imageid = words[:3]
        ctime = ' '.join(words[3:size_index])
        vsize = ' '.join(words[size_index:])
        yield DockerImage(repo, tag, imageid, ctime, vsize)
    list_proc.stdout.close()
    check_docker(list_proc, "images")

def find_image_hashes(image_search, image_tag=None):
    # Query for a Docker images with the repository and tag and return
    # the image ids in a list.  Returns empty list if no match is
    # found.

    list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)

    inspect = list_proc.stdout.read()
    list_proc.stdout.close()

    imageinfo = json.loads(inspect)

    return [i["Id"] for i in imageinfo]

def find_one_image_hash(image_search, image_tag=None):
    """Return the single matching image hash, or raise DockerError."""
    hashes = find_image_hashes(image_search, image_tag)
    hash_count = len(hashes)
    if hash_count == 1:
        return hashes.pop()
    elif hash_count == 0:
        raise DockerError("no matching image found")
    else:
        raise DockerError("{} images match {}".format(hash_count, image_search))

def stat_cache_name(image_file):
    # Path of the sidecar file holding cached stat() results for image_file.
    return getattr(image_file, 'name', image_file) + '.stat'

def pull_image(image_name, image_tag):
    check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
                 "pull")

def save_image(image_hash, image_file):
    # Save the specified Docker image to image_file, then try to save its
    # stats so we can try to resume after interruption.
    check_docker(popen_docker(['save', image_hash], stdout=image_file),
                 "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        pass  # We won't resume from this cache.  No big deal.

def get_cache_dir():
    return arv_cmd.make_home_conf_dir(
        os.path.join('.cache', 'arvados', 'docker'), 0o700)

def prep_image_file(filename):
    # Return a file object ready to save a Docker image,
    # and a boolean indicating whether or not we need to actually save the
    # image (False if a cached save is available).
    cache_dir = get_cache_dir()
    if cache_dir is None:
        image_file = tempfile.NamedTemporaryFile(suffix='.tar')
        need_save = True
    else:
        file_path = os.path.join(cache_dir, filename)
        try:
            with open(stat_cache_name(file_path)) as statfile:
                prev_stat = json.load(statfile)
            now_stat = os.stat(file_path)
            need_save = any(prev_stat[field] != now_stat[field]
                            for field in [ST_MTIME, ST_SIZE])
        except STAT_CACHE_ERRORS + (AttributeError, IndexError):
            need_save = True  # We couldn't compare against old stats
        image_file = open(file_path, 'w+b' if need_save else 'rb')
    return image_file, need_save

def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    """Create an Arvados link record with the given class and name."""
    link_attrs.update({'link_class': link_class, 'name': link_name})
    return api_client.links().create(body=link_attrs).execute(
        num_retries=num_retries)

def docker_link_sort_key(link):
    """Build a sort key to find the latest available Docker image.

    To find one source collection for a Docker image referenced by
    name or image id, the API server looks for a link with the most
    recent `image_timestamp` property; then the most recent
    `created_at` timestamp.  This method generates a sort key for
    Docker metadata links to sort them from least to most preferred.
    """
    try:
        image_timestamp = ciso8601.parse_datetime_as_naive(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        image_timestamp = EARLIEST_DATETIME
    try:
        created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
    except ValueError:
        created_timestamp = None
    return (image_timestamp, created_timestamp)

def _get_docker_links(api_client, num_retries, **kwargs):
    # Fetch all matching links, annotate each with its sort key, and
    # return them sorted most-preferred first.
    links = list(arvados.util.keyset_list_all(
        api_client.links().list, num_retries=num_retries, **kwargs,
    ))
    for link in links:
        link['_sort_key'] = docker_link_sort_key(link)
    links.sort(key=itemgetter('_sort_key'), reverse=True)
    return links

def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
    # If the image_timestamp was missing/unparseable, fall back to the
    # link's created_at (second element of the sort key) for display.
    timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
    return {
        '_sort_key': link['_sort_key'],
        'timestamp': link['_sort_key'][timestamp_index],
        'collection': link['head_uuid'],
        'dockerhash': dockerhash,
        'repo': repo,
        'tag': tag,
    }

def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
    """List all Docker images known to the api_client with image_name and
    image_tag.  If no image_name is given, defaults to listing all
    Docker images.

    Returns a list of tuples representing matching Docker images,
    sorted in preference order (i.e. the first collection in the list
    is the one that the API server would use). Each tuple is a
    (collection_uuid, collection_info) pair, where collection_info is
    a dict with fields "dockerhash", "repo", "tag", and "timestamp".

    """
    search_filters = []
    repo_links = None
    hash_links = None

    project_filter = []
    if project_uuid is not None:
        project_filter = [["owner_uuid", "=", project_uuid]]

    if image_name:
        # Find images with the name the user specified.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
        if search_links:
            repo_links = search_links
        else:
            # Fall back to finding images with the specified image hash.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']]+project_filter)
            hash_links = search_links
        # Only list information about images that were found in the search.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # It should be reasonable to expect that each collection only has one
    # image hash (though there may be many links specifying this).  Find
    # the API server's most preferred image hash link for each collection.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Each collection may have more than one name (though again, one name
    # may be specified more than once).  Build an image listing from name
    # tags, sorted by API server preference.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']]+project_filter)
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        try:
            dockerhash = hash_link_map[collection_uuid]['name']
        except KeyError:
            dockerhash = '<unknown>'
        name_parts = link['name'].rsplit(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Find any image hash links that did not have a corresponding name link,
    # and add image listings for them, retaining the API server preference
    # sorting.
    images_start_size = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > images_start_size:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Remove any image listings that refer to unknown collections.
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
        api_client.collections().list,
        num_retries=num_retries,
        filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
        select=['uuid'],
    )}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]

def items_owned_by(owner_uuid, arv_items):
    # Generator of items whose owner_uuid matches the given owner.
    return (item for item in arv_items if item['owner_uuid'] == owner_uuid)

def _uuid2pdh(api, uuid):
    # Resolve a collection UUID to its portable data hash.
    return api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()['items'][0]['portable_data_hash']

def load_image_metadata(image_file):
    """Load an image manifest and config from an archive

    Given an image archive as an open binary file object, this function loads
    the image manifest and configuration, deserializing each from JSON and
    returning them in a 2-tuple of dicts.
    """
    image_file.seek(0)
    with tarfile.open(fileobj=image_file) as image_tar:
        with image_tar.extractfile('manifest.json') as manifest_file:
            image_manifest_list = json.load(manifest_file)
        # Because arv-keepdocker only saves one image, there should only be
        # one manifest.  This extracts that from the list and raises
        # ValueError if there's not exactly one.
        image_manifest, = image_manifest_list
        with image_tar.extractfile(image_manifest['Config']) as config_file:
            image_config = json.load(config_file)
    return image_manifest, image_config

def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1', num_retries=args.retries)

    # With no image argument (or the literal word "images"), just list
    # what is already stored in Arvados and exit.
    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    if '/' in args.image:
        hostport, path = args.image.split('/', 1)
        if hostport.endswith(':443'):
            # "docker pull host:443/asdf" transparently removes the
            # :443 (which is redundant because https is implied) and
            # after it succeeds "docker images" will list "host/asdf",
            # not "host:443/asdf".  If we strip the :443 then the name
            # doesn't change underneath us.
            args.image = '/'.join([hostport[:-4], path])

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)

    image_hash = None
    try:
        image_hash = find_one_image_hash(args.image, args.tag)
        if not docker_image_compatible(api, image_hash):
            if args.force_image_format:
                logger.warning("forcing incompatible image")
            else:
                logger.error("refusing to store " \
                    "incompatible format (use --force-image-format to override)")
                sys.exit(1)
    except DockerError as error:
        if images_in_arv:
            # We don't have Docker / we don't have the image locally,
            # use image that's already uploaded to Arvados
            image_hash = images_in_arv[0][1]['dockerhash']
        else:
            logger.error(str(error))
            sys.exit(1)

    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid,
                                  "properties": {"docker-image-repo-tag": image_repo_tag}},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid':  coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        # Don't fail when cached manifest is invalid, just ignore the cache.
        put_args += ['--batch']

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Managed properties could be already set
        coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
        coll_properties.update({"docker-image-repo-tag": image_repo_tag})
        api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)

        _, image_metadata = load_image_metadata(image_file)
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()

if __name__ == '__main__':
    main()
DockerImage(repo, tag, hash, created, vsize)
Create new instance of DockerImage(repo, tag, hash, created, vsize)
Inherited Members
- builtins.tuple
- index
- count
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
78def popen_docker(cmd, *args, **kwargs): 79 manage_stdin = ('stdin' not in kwargs) 80 kwargs.setdefault('stdin', subprocess.PIPE) 81 kwargs.setdefault('stdout', subprocess.PIPE) 82 kwargs.setdefault('stderr', subprocess.PIPE) 83 try: 84 docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs) 85 except OSError: # No docker in $PATH, try docker.io 86 docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs) 87 if manage_stdin: 88 docker_proc.stdin.close() 89 return docker_proc
97def docker_image_format(image_hash): 98 """Return the registry format ('v1' or 'v2') of the given image.""" 99 cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash], 100 stdout=subprocess.PIPE) 101 try: 102 image_id = next(cmd.stdout).decode('utf-8').strip() 103 if image_id.startswith('sha256:'): 104 return 'v2' 105 elif ':' not in image_id: 106 return 'v1' 107 else: 108 return 'unknown' 109 finally: 110 check_docker(cmd, "inspect")
Return the registry format ('v1' or 'v2') of the given image.
112def docker_image_compatible(api, image_hash): 113 supported = api._rootDesc.get('dockerImageFormats', []) 114 if not supported: 115 logger.warning("server does not specify supported image formats (see docker_image_formats in server config).") 116 return False 117 118 fmt = docker_image_format(image_hash) 119 if fmt in supported: 120 return True 121 else: 122 logger.error("image format is {!r} " \ 123 "but server supports only {!r}".format(fmt, supported)) 124 return False
126def docker_images(): 127 # Yield a DockerImage tuple for each installed image. 128 list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE) 129 list_output = iter(list_proc.stdout) 130 next(list_output) # Ignore the header line 131 for line in list_output: 132 words = line.split() 133 words = [word.decode('utf-8') for word in words] 134 size_index = len(words) - 2 135 repo, tag, imageid = words[:3] 136 ctime = ' '.join(words[3:size_index]) 137 vsize = ' '.join(words[size_index:]) 138 yield DockerImage(repo, tag, imageid, ctime, vsize) 139 list_proc.stdout.close() 140 check_docker(list_proc, "images")
142def find_image_hashes(image_search, image_tag=None): 143 # Query for a Docker images with the repository and tag and return 144 # the image ids in a list. Returns empty list if no match is 145 # found. 146 147 list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE) 148 149 inspect = list_proc.stdout.read() 150 list_proc.stdout.close() 151 152 imageinfo = json.loads(inspect) 153 154 return [i["Id"] for i in imageinfo]
156def find_one_image_hash(image_search, image_tag=None): 157 hashes = find_image_hashes(image_search, image_tag) 158 hash_count = len(hashes) 159 if hash_count == 1: 160 return hashes.pop() 161 elif hash_count == 0: 162 raise DockerError("no matching image found") 163 else: 164 raise DockerError("{} images match {}".format(hash_count, image_search))
173def save_image(image_hash, image_file): 174 # Save the specified Docker image to image_file, then try to save its 175 # stats so we can try to resume after interruption. 176 check_docker(popen_docker(['save', image_hash], stdout=image_file), 177 "save") 178 image_file.flush() 179 try: 180 with open(stat_cache_name(image_file), 'w') as statfile: 181 json.dump(tuple(os.fstat(image_file.fileno())), statfile) 182 except STAT_CACHE_ERRORS: 183 pass # We won't resume from this cache. No big deal.
189def prep_image_file(filename): 190 # Return a file object ready to save a Docker image, 191 # and a boolean indicating whether or not we need to actually save the 192 # image (False if a cached save is available). 193 cache_dir = get_cache_dir() 194 if cache_dir is None: 195 image_file = tempfile.NamedTemporaryFile(suffix='.tar') 196 need_save = True 197 else: 198 file_path = os.path.join(cache_dir, filename) 199 try: 200 with open(stat_cache_name(file_path)) as statfile: 201 prev_stat = json.load(statfile) 202 now_stat = os.stat(file_path) 203 need_save = any(prev_stat[field] != now_stat[field] 204 for field in [ST_MTIME, ST_SIZE]) 205 except STAT_CACHE_ERRORS + (AttributeError, IndexError): 206 need_save = True # We couldn't compare against old stats 207 image_file = open(file_path, 'w+b' if need_save else 'rb') 208 return image_file, need_save
215def docker_link_sort_key(link): 216 """Build a sort key to find the latest available Docker image. 217 218 To find one source collection for a Docker image referenced by 219 name or image id, the API server looks for a link with the most 220 recent `image_timestamp` property; then the most recent 221 `created_at` timestamp. This method generates a sort key for 222 Docker metadata links to sort them from least to most preferred. 223 """ 224 try: 225 image_timestamp = ciso8601.parse_datetime_as_naive( 226 link['properties']['image_timestamp']) 227 except (KeyError, ValueError): 228 image_timestamp = EARLIEST_DATETIME 229 try: 230 created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at']) 231 except ValueError: 232 created_timestamp = None 233 return (image_timestamp, created_timestamp)
Build a sort key to find the latest available Docker image.
To find one source collection for a Docker image referenced by
name or image id, the API server looks for a link with the most
recent `image_timestamp` property; then the most recent `created_at` timestamp. This method generates a sort key for
Docker metadata links to sort them from least to most preferred.
255def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None): 256 """List all Docker images known to the api_client with image_name and 257 image_tag. If no image_name is given, defaults to listing all 258 Docker images. 259 260 Returns a list of tuples representing matching Docker images, 261 sorted in preference order (i.e. the first collection in the list 262 is the one that the API server would use). Each tuple is a 263 (collection_uuid, collection_info) pair, where collection_info is 264 a dict with fields "dockerhash", "repo", "tag", and "timestamp". 265 266 """ 267 search_filters = [] 268 repo_links = None 269 hash_links = None 270 271 project_filter = [] 272 if project_uuid is not None: 273 project_filter = [["owner_uuid", "=", project_uuid]] 274 275 if image_name: 276 # Find images with the name the user specified. 277 search_links = _get_docker_links( 278 api_client, num_retries, 279 filters=[['link_class', '=', 'docker_image_repo+tag'], 280 ['name', '=', 281 '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter) 282 if search_links: 283 repo_links = search_links 284 else: 285 # Fall back to finding images with the specified image hash. 286 search_links = _get_docker_links( 287 api_client, num_retries, 288 filters=[['link_class', '=', 'docker_image_hash'], 289 ['name', 'ilike', image_name + '%']]+project_filter) 290 hash_links = search_links 291 # Only list information about images that were found in the search. 292 search_filters.append(['head_uuid', 'in', 293 [link['head_uuid'] for link in search_links]]) 294 295 # It should be reasonable to expect that each collection only has one 296 # image hash (though there may be many links specifying this). Find 297 # the API server's most preferred image hash link for each collection. 
298 if hash_links is None: 299 hash_links = _get_docker_links( 300 api_client, num_retries, 301 filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter) 302 hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)} 303 304 # Each collection may have more than one name (though again, one name 305 # may be specified more than once). Build an image listing from name 306 # tags, sorted by API server preference. 307 if repo_links is None: 308 repo_links = _get_docker_links( 309 api_client, num_retries, 310 filters=search_filters + [['link_class', '=', 311 'docker_image_repo+tag']]+project_filter) 312 seen_image_names = collections.defaultdict(set) 313 images = [] 314 for link in repo_links: 315 collection_uuid = link['head_uuid'] 316 if link['name'] in seen_image_names[collection_uuid]: 317 continue 318 seen_image_names[collection_uuid].add(link['name']) 319 try: 320 dockerhash = hash_link_map[collection_uuid]['name'] 321 except KeyError: 322 dockerhash = '<unknown>' 323 name_parts = link['name'].rsplit(':', 1) 324 images.append(_new_image_listing(link, dockerhash, *name_parts)) 325 326 # Find any image hash links that did not have a corresponding name link, 327 # and add image listings for them, retaining the API server preference 328 # sorting. 329 images_start_size = len(images) 330 for collection_uuid, link in hash_link_map.items(): 331 if not seen_image_names[collection_uuid]: 332 images.append(_new_image_listing(link, link['name'])) 333 if len(images) > images_start_size: 334 images.sort(key=itemgetter('_sort_key'), reverse=True) 335 336 # Remove any image listings that refer to unknown collections. 
337 existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all( 338 api_client.collections().list, 339 num_retries=num_retries, 340 filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter, 341 select=['uuid'], 342 )} 343 return [(image['collection'], image) for image in images 344 if image['collection'] in existing_coll_uuids]
List all Docker images known to the api_client with image_name and image_tag. If no image_name is given, defaults to listing all Docker images.
Returns a list of tuples representing matching Docker images, sorted in preference order (i.e. the first collection in the list is the one that the API server would use). Each tuple is a (collection_uuid, collection_info) pair, where collection_info is a dict with fields "dockerhash", "repo", "tag", and "timestamp".
def load_image_metadata(image_file):
    """Extract the manifest and config dicts from a Docker image archive.

    *image_file* must be an open binary file object holding a tar archive
    in ``docker save`` format; the file is rewound before reading.
    Returns a ``(manifest, config)`` 2-tuple of dicts, each deserialized
    from JSON.  Raises ValueError when the archive's manifest list does
    not hold exactly one image entry.
    """
    image_file.seek(0)
    with tarfile.open(fileobj=image_file) as archive:
        with archive.extractfile('manifest.json') as manifest_fp:
            manifests = json.load(manifest_fp)
        # arv-keepdocker only ever saves one image per archive.  Tuple
        # unpacking enforces that invariant: it raises ValueError if the
        # manifest list has zero or multiple entries.
        (manifest,) = manifests
        with archive.extractfile(manifest['Config']) as config_fp:
            config = json.load(config_fp)
    return manifest, config
Load an image manifest and config from an archive
Given an image archive as an open binary file object, this function loads the image manifest and configuration, deserializing each from JSON and returning them in a 2-tuple of dicts.
def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    """Command-line entry point for arv-keepdocker.

    With no image argument (or the literal argument 'images'), writes a
    table of Docker images already stored in Arvados to *stdout* and
    exits.  Otherwise, uploads the named Docker image to Keep (unless an
    equivalent collection already exists and --force was not given) and
    tags the resulting collection with 'docker_image_hash' and
    'docker_image_repo+tag' links.

    NOTE: exits the process via sys.exit() on the listing path and on
    several error paths, so programmatic callers should expect
    SystemExit.

    :arguments: command-line argument list; defaults to sys.argv[1:].
    :stdout: stream for normal output (collection UUID / image table).
    :install_sig_handlers: passed through to arv_put.main.
    :api: pre-built Arvados API client; created here when None.
    """
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1', num_retries=args.retries)

    # Listing mode: no image named, or the literal word 'images'.
    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            # EPIPE just means the reader (e.g. `head`) went away early;
            # that is not an error for a listing command.
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    # Split a trailing ":tag" off the image argument, if present.
    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    if '/' in args.image:
        hostport, path = args.image.split('/', 1)
        if hostport.endswith(':443'):
            # "docker pull host:443/asdf" transparently removes the
            # :443 (which is redundant because https is implied) and
            # after it succeeds "docker images" will list "host/asdf",
            # not "host:443/asdf". If we strip the :443 then the name
            # doesn't change underneath us.
            args.image = '/'.join([hostport[:-4], path])

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)

    image_hash = None
    try:
        image_hash = find_one_image_hash(args.image, args.tag)
        if not docker_image_compatible(api, image_hash):
            if args.force_image_format:
                logger.warning("forcing incompatible image")
            else:
                logger.error("refusing to store " \
                    "incompatible format (use --force-image-format to override)")
                sys.exit(1)
    except DockerError as error:
        if images_in_arv:
            # We don't have Docker / we don't have the image locally,
            # use image that's already uploaded to Arvados
            image_hash = images_in_arv[0][1]['dockerhash']
        else:
            logger.error(str(error))
            sys.exit(1)

    # image_repo_tag is None when the user addressed the image by hash,
    # i.e. the found hash starts with the (lowercased) image argument.
    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        # flock() is advisory; closing the file releases the lock (see
        # the finally clause below).
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid,
                                  "properties": {"docker-image-repo-tag": image_repo_tag}},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid':  coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    # Image already present: report its collection and stop.
                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        # Don't fail when cached manifest is invalid, just ignore the cache.
        put_args += ['--batch']

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Managed properties could be already set
        coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
        coll_properties.update({"docker-image-repo-tag": image_repo_tag})
        api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)

        # Record the image's own creation timestamp (if present in its
        # config) on the links we are about to create.
        _, image_metadata = load_image_metadata(image_file)
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        # Best-effort removal of the cached tar and its stat cache;
        # ENOENT is fine (nothing cached), anything else is re-raised.
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()