arvados.commands.keepdocker
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""arv-keepdocker: upload Docker images to, or list them in, Arvados."""

import argparse
import collections
import datetime
import errno
import fcntl
import json
import logging
import os
import re
import subprocess
import sys
import tarfile
import tempfile

import ciso8601
from operator import itemgetter
from pathlib import Path
from stat import *

import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put

from arvados._internal import basedirs
from arvados._version import __version__
from typing import (
    Callable,
)

logger = logging.getLogger('arvados.keepdocker')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
                else logging.INFO)

EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)

DockerImage = collections.namedtuple(
    'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])

keepdocker_parser = argparse.ArgumentParser(add_help=False)
keepdocker_parser.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
keepdocker_parser.add_argument(
    '-f', '--force', action='store_true', default=False,
    help="Re-upload the image even if it already exists on the server")
keepdocker_parser.add_argument(
    '--force-image-format', action='store_true', default=False,
    help="Proceed even if the image format is not supported by the server")

_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
    '--pull', action='store_true', default=False,
    help="Try to pull the latest image from Docker registry")
_group.add_argument(
    '--no-pull', action='store_false', dest='pull',
    help="Use locally installed image only, don't pull image from Docker registry (default)")

# Combine keepdocker options listed above with run_opts options of arv-put.
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
    description="Upload or list Docker images in Arvados",
    parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])

arg_parser.add_argument(
    'image', nargs='?',
    help="Docker image to upload: repo, repo:tag, or hash")
arg_parser.add_argument(
    'tag', nargs='?',
    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")

class DockerError(Exception):
    pass


def popen_docker(cmd, *args, **kwargs):
    """Start a `docker` subprocess, falling back to `docker.io`."""
    manage_stdin = ('stdin' not in kwargs)
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', subprocess.PIPE)
    try:
        docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    except OSError:  # No docker in $PATH, try docker.io
        docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    if manage_stdin:
        # Close the pipe so the child never blocks waiting for input.
        docker_proc.stdin.close()
    return docker_proc

def check_docker(proc, description):
    """Wait for a docker subprocess; raise DockerError on nonzero exit."""
    proc.wait()
    if proc.returncode != 0:
        raise DockerError("docker {} returned status code {}".
                          format(description, proc.returncode))

def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                       stdout=subprocess.PIPE)
    try:
        image_id = next(cmd.stdout).decode('utf-8').strip()
        if image_id.startswith('sha256:'):
            return 'v2'
        elif ':' not in image_id:
            return 'v1'
        else:
            return 'unknown'
    finally:
        check_docker(cmd, "inspect")

def docker_image_compatible(api, image_hash):
    """Return True if the server accepts this image's registry format."""
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False

    fmt = docker_image_format(image_hash)
    if fmt in supported:
        return True
    else:
        logger.error("image format is {!r} " \
            "but server supports only {!r}".format(fmt, supported))
        return False

def docker_images():
    # Yield a DockerImage tuple for each installed image.
    list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    list_output = iter(list_proc.stdout)
    next(list_output)  # Ignore the header line
    for line in list_output:
        words = line.split()
        words = [word.decode('utf-8') for word in words]
        size_index = len(words) - 2
        repo, tag, imageid = words[:3]
        ctime = ' '.join(words[3:size_index])
        vsize = ' '.join(words[size_index:])
        yield DockerImage(repo, tag, imageid, ctime, vsize)
    list_proc.stdout.close()
    check_docker(list_proc, "images")

def find_image_hashes(image_search, image_tag=None):
    # Query for a Docker images with the repository and tag and return
    # the image ids in a list.  Returns empty list if no match is
    # found.

    list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)

    inspect = list_proc.stdout.read()
    list_proc.stdout.close()

    imageinfo = json.loads(inspect)

    return [i["Id"] for i in imageinfo]

def find_one_image_hash(image_search, image_tag=None):
    """Return the single matching image hash, or raise DockerError."""
    hashes = find_image_hashes(image_search, image_tag)
    hash_count = len(hashes)
    if hash_count == 1:
        return hashes.pop()
    elif hash_count == 0:
        raise DockerError("no matching image found")
    else:
        raise DockerError("{} images match {}".format(hash_count, image_search))

def stat_cache_name(image_file):
    # Path of the stat-cache sidecar file for a saved image file.
    return getattr(image_file, 'name', image_file) + '.stat'

def pull_image(image_name, image_tag):
    check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
                 "pull")

def save_image(image_hash, image_file):
    # Save the specified Docker image to image_file, then try to save its
    # stats so we can try to resume after interruption.
    check_docker(popen_docker(['save', image_hash], stdout=image_file),
                 "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        pass  # We won't resume from this cache.  No big deal.

def get_cache_dir(
        mkparent: Callable[[], Path]=basedirs.BaseDirectories('CACHE').storage_path,
) -> str:
    """Return (creating if needed) the docker image cache directory."""
    path = mkparent() / 'docker'
    path.mkdir(mode=0o700, exist_ok=True)
    return str(path)

def prep_image_file(filename):
    # Return a file object ready to save a Docker image,
    # and a boolean indicating whether or not we need to actually save the
    # image (False if a cached save is available).
    cache_dir = get_cache_dir()
    if cache_dir is None:
        image_file = tempfile.NamedTemporaryFile(suffix='.tar')
        need_save = True
    else:
        file_path = os.path.join(cache_dir, filename)
        try:
            with open(stat_cache_name(file_path)) as statfile:
                prev_stat = json.load(statfile)
            now_stat = os.stat(file_path)
            need_save = any(prev_stat[field] != now_stat[field]
                            for field in [ST_MTIME, ST_SIZE])
        except STAT_CACHE_ERRORS + (AttributeError, IndexError):
            need_save = True  # We couldn't compare against old stats
        image_file = open(file_path, 'w+b' if need_save else 'rb')
    return image_file, need_save

def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    """Create and return an Arvados link with the given class and name."""
    link_attrs.update({'link_class': link_class, 'name': link_name})
    return api_client.links().create(body=link_attrs).execute(
        num_retries=num_retries)

def docker_link_sort_key(link):
    """Build a sort key to find the latest available Docker image.

    To find one source collection for a Docker image referenced by
    name or image id, the API server looks for a link with the most
    recent `image_timestamp` property; then the most recent
    `created_at` timestamp.  This method generates a sort key for
    Docker metadata links to sort them from least to most preferred.
    """
    try:
        image_timestamp = ciso8601.parse_datetime_as_naive(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        image_timestamp = EARLIEST_DATETIME
    try:
        created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
    except ValueError:
        created_timestamp = None
    return (image_timestamp, created_timestamp)

def _get_docker_links(api_client, num_retries, **kwargs):
    # Fetch links and return them sorted most-preferred-first.
    links = list(arvados.util.keyset_list_all(
        api_client.links().list, num_retries=num_retries, **kwargs,
    ))
    for link in links:
        link['_sort_key'] = docker_link_sort_key(link)
    links.sort(key=itemgetter('_sort_key'), reverse=True)
    return links

def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
    # Prefer image_timestamp; fall back to created_at when the property
    # was missing (sort key slot 0 is then the EARLIEST_DATETIME sentinel).
    timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
    return {
        '_sort_key': link['_sort_key'],
        'timestamp': link['_sort_key'][timestamp_index],
        'collection': link['head_uuid'],
        'dockerhash': dockerhash,
        'repo': repo,
        'tag': tag,
    }

def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
    """List all Docker images known to the api_client with image_name and
    image_tag.  If no image_name is given, defaults to listing all
    Docker images.

    Returns a list of tuples representing matching Docker images,
    sorted in preference order (i.e. the first collection in the list
    is the one that the API server would use). Each tuple is a
    (collection_uuid, collection_info) pair, where collection_info is
    a dict with fields "dockerhash", "repo", "tag", and "timestamp".

    """
    search_filters = []
    repo_links = None
    hash_links = None

    project_filter = []
    if project_uuid is not None:
        project_filter = [["owner_uuid", "=", project_uuid]]

    if image_name:
        # Find images with the name the user specified.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
        if search_links:
            repo_links = search_links
        else:
            # Fall back to finding images with the specified image hash.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']]+project_filter)
            hash_links = search_links
        # Only list information about images that were found in the search.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # It should be reasonable to expect that each collection only has one
    # image hash (though there may be many links specifying this).  Find
    # the API server's most preferred image hash link for each collection.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Each collection may have more than one name (though again, one name
    # may be specified more than once).  Build an image listing from name
    # tags, sorted by API server preference.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']]+project_filter)
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        try:
            dockerhash = hash_link_map[collection_uuid]['name']
        except KeyError:
            dockerhash = '<unknown>'
        name_parts = link['name'].rsplit(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Find any image hash links that did not have a corresponding name link,
    # and add image listings for them, retaining the API server preference
    # sorting.
    images_start_size = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > images_start_size:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Remove any image listings that refer to unknown collections.
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
        api_client.collections().list,
        num_retries=num_retries,
        filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
        select=['uuid'],
    )}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]

def items_owned_by(owner_uuid, arv_items):
    return (item for item in arv_items if item['owner_uuid'] == owner_uuid)

def _uuid2pdh(api, uuid):
    return api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()['items'][0]['portable_data_hash']

def load_image_metadata(image_file):
    """Load an image manifest and config from an archive

    Given an image archive as an open binary file object, this function loads
    the image manifest and configuration, deserializing each from JSON and
    returning them in a 2-tuple of dicts.
    """
    image_file.seek(0)
    with tarfile.open(fileobj=image_file) as image_tar:
        with image_tar.extractfile('manifest.json') as manifest_file:
            image_manifest_list = json.load(manifest_file)
        # Because arv-keepdocker only saves one image, there should only be
        # one manifest.  This extracts that from the list and raises
        # ValueError if there's not exactly one.
        image_manifest, = image_manifest_list
        with image_tar.extractfile(image_manifest['Config']) as config_file:
            image_config = json.load(config_file)
    return image_manifest, image_config

def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1', num_retries=args.retries)

    if args.image is None or args.image == 'images':
        # No image given: list the images already in Arvados.
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    if '/' in args.image:
        hostport, path = args.image.split('/', 1)
        if hostport.endswith(':443'):
            # "docker pull host:443/asdf" transparently removes the
            # :443 (which is redundant because https is implied) and
            # after it succeeds "docker images" will list "host/asdf",
            # not "host:443/asdf".  If we strip the :443 then the name
            # doesn't change underneath us.
            args.image = '/'.join([hostport[:-4], path])

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)

    image_hash = None
    try:
        image_hash = find_one_image_hash(args.image, args.tag)
        if not docker_image_compatible(api, image_hash):
            if args.force_image_format:
                logger.warning("forcing incompatible image")
            else:
                logger.error("refusing to store " \
                    "incompatible format (use --force-image-format to override)")
                sys.exit(1)
    except DockerError as error:
        if images_in_arv:
            # We don't have Docker / we don't have the image locally,
            # use image that's already uploaded to Arvados
            image_hash = images_in_arv[0][1]['dockerhash']
        else:
            logger.error(str(error))
            sys.exit(1)

    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid,
                                  "properties": {"docker-image-repo-tag": image_repo_tag}},
                            ensure_unique_name=True
                        ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid': coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        # Don't fail when cached manifest is invalid, just ignore the cache.
        put_args += ['--batch']

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Managed properties could be already set
        coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
        coll_properties.update({"docker-image-repo-tag": image_repo_tag})
        api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)

        _, image_metadata = load_image_metadata(image_file)
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()

if __name__ == '__main__':
    main()
DockerImage(repo, tag, hash, created, vsize)
Raised when a Docker subprocess fails or produces output this tool cannot interpret.
def popen_docker(cmd, *args, **kwargs):
    """Start a `docker` subprocess, falling back to `docker.io`.

    Any of stdin/stdout/stderr not supplied by the caller defaults to a
    pipe.  When we created the stdin pipe ourselves, it is closed right
    away so the child never blocks waiting for input.
    """
    caller_owns_stdin = 'stdin' in kwargs
    for stream in ('stdin', 'stdout', 'stderr'):
        kwargs.setdefault(stream, subprocess.PIPE)
    try:
        proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    except OSError:
        # No docker in $PATH; some distributions install it as docker.io.
        proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    if not caller_owns_stdin:
        proc.stdin.close()
    return proc
def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    inspect_proc = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                                stdout=subprocess.PIPE)
    try:
        reported_id = next(inspect_proc.stdout).decode('utf-8').strip()
    finally:
        # Runs even if reading fails; raises DockerError on nonzero exit.
        check_docker(inspect_proc, "inspect")
    if reported_id.startswith('sha256:'):
        return 'v2'
    # v1 ids are bare hex digests with no algorithm prefix.
    return 'v1' if ':' not in reported_id else 'unknown'
Return the registry format ('v1' or 'v2') of the given image.
def docker_image_compatible(api, image_hash):
    """Return True when the server accepts this image's registry format.

    Reads the server's advertised dockerImageFormats; logs and returns
    False when the list is absent or does not include the image's format.
    """
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False
    fmt = docker_image_format(image_hash)
    if fmt not in supported:
        logger.error("image format is {!r} " \
            "but server supports only {!r}".format(fmt, supported))
        return False
    return True
def docker_images():
    """Yield a DockerImage tuple for each locally installed image."""
    proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    rows = iter(proc.stdout)
    next(rows)  # skip the column-header row
    for raw_row in rows:
        fields = [cell.decode('utf-8') for cell in raw_row.split()]
        repo, tag, image_id = fields[:3]
        # Columns between the id and the last two form the CREATED text
        # (e.g. "2 weeks ago"); the final two are the size (e.g. "1.2 GB").
        created = ' '.join(fields[3:-2])
        size = ' '.join(fields[-2:])
        yield DockerImage(repo, tag, image_id, created, size)
    proc.stdout.close()
    check_docker(proc, "images")
def find_image_hashes(image_search, image_tag=None):
    """Return ids of local images matching the repository (and tag).

    Runs `docker inspect` on "repo" (or "repo:tag" when image_tag is
    given) and returns the matching image ids as a list; empty when
    nothing matched.
    """
    target = "%s%s" % (image_search, ":"+image_tag if image_tag else "")
    inspect_proc = popen_docker(['inspect', target], stdout=subprocess.PIPE)
    raw_json = inspect_proc.stdout.read()
    inspect_proc.stdout.close()
    return [entry["Id"] for entry in json.loads(raw_json)]
def find_one_image_hash(image_search, image_tag=None):
    """Return the unique local image hash for a search.

    Raises DockerError when no local image matches, or when more than
    one does (the search is ambiguous).
    """
    matches = find_image_hashes(image_search, image_tag)
    if not matches:
        raise DockerError("no matching image found")
    if len(matches) > 1:
        raise DockerError("{} images match {}".format(len(matches), image_search))
    return matches[0]
def save_image(image_hash, image_file):
    """Dump a Docker image into image_file and record its stat info.

    The stat sidecar file lets a later run recognize an intact saved
    tarball and skip re-running `docker save`.
    """
    save_proc = popen_docker(['save', image_hash], stdout=image_file)
    check_docker(save_proc, "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        # Best effort: without the cache we simply can't resume later.
        pass
def prep_image_file(filename):
    """Open a file object to hold a saved Docker image.

    Returns (file_object, need_save).  need_save is False only when a
    cached copy exists in the cache directory and still matches the
    mtime/size recorded in its stat sidecar file.
    """
    cache_dir = get_cache_dir()
    if cache_dir is None:
        # No usable cache: hand back a throwaway temp file; always save.
        # NOTE(review): get_cache_dir appears to always return a str —
        # this fallback looks unreachable; confirm before removing.
        return tempfile.NamedTemporaryFile(suffix='.tar'), True
    file_path = os.path.join(cache_dir, filename)
    try:
        with open(stat_cache_name(file_path)) as statfile:
            cached_stat = json.load(statfile)
        current_stat = os.stat(file_path)
        need_save = any(cached_stat[field] != current_stat[field]
                        for field in (ST_MTIME, ST_SIZE))
    except STAT_CACHE_ERRORS + (AttributeError, IndexError):
        # No usable stats to compare against; save from scratch.
        need_save = True
    return open(file_path, 'w+b' if need_save else 'rb'), need_save
def docker_link_sort_key(link):
    """Build a sort key ranking Docker metadata links least-to-most preferred.

    The API server picks the link with the newest `image_timestamp`
    property, breaking ties with the newest `created_at`.  Sorting by
    this key ascending reproduces that ordering.
    """
    try:
        image_ts = ciso8601.parse_datetime_as_naive(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        # Missing/unparseable property: use the sentinel so the link
        # sorts before anything with a real timestamp.
        image_ts = EARLIEST_DATETIME
    try:
        created_ts = ciso8601.parse_datetime_as_naive(link['created_at'])
    except ValueError:
        created_ts = None
    return (image_ts, created_ts)
Build a sort key to find the latest available Docker image.

To find one source collection for a Docker image referenced by name or image id, the API server looks for a link with the most recent `image_timestamp` property; then the most recent `created_at` timestamp. This method generates a sort key for Docker metadata links to sort them from least to most preferred.
def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
    """List Docker images known to the API server.

    Without image_name, every Docker image is listed; otherwise the
    listing is restricted to images matching image_name (and image_tag,
    defaulting to 'latest').  project_uuid further restricts results to
    one project.

    Returns (collection_uuid, collection_info) tuples sorted in server
    preference order (the first entry is the one the API server would
    use); collection_info carries "dockerhash", "repo", "tag", and
    "timestamp" fields.
    """
    project_filter = []
    if project_uuid is not None:
        project_filter = [["owner_uuid", "=", project_uuid]]

    search_filters = []
    repo_links = None
    hash_links = None
    if image_name:
        # First try repo+tag links matching the requested name exactly.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
        if search_links:
            repo_links = search_links
        else:
            # No name match: treat image_name as an image hash prefix.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']]+project_filter)
            hash_links = search_links
        # Restrict all later queries to the collections found here.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # Map each collection to the server's most preferred hash link.
    # reversed() makes earlier (more preferred) links win the dict insert.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Build listings from repo+tag links, deduplicating repeated names
    # per collection; links already arrive in server preference order.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']]+project_filter)
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        hash_link = hash_link_map.get(collection_uuid)
        dockerhash = hash_link['name'] if hash_link is not None else '<unknown>'
        name_parts = link['name'].rsplit(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Hash links without any name link still get a listing; re-sort only
    # if we actually appended, to restore server preference order.
    listed_so_far = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > listed_so_far:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Drop listings whose collection no longer exists (or falls outside
    # the requested project).
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
        api_client.collections().list,
        num_retries=num_retries,
        filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
        select=['uuid'],
    )}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]
List all Docker images known to the api_client with image_name and image_tag. If no image_name is given, defaults to listing all Docker images.
Returns a list of tuples representing matching Docker images, sorted in preference order (i.e. the first collection in the list is the one that the API server would use). Each tuple is a (collection_uuid, collection_info) pair, where collection_info is a dict with fields "dockerhash", "repo", "tag", and "timestamp".
def load_image_metadata(image_file):
    """Read the manifest and config out of a Docker image archive.

    *image_file* is an open binary file object containing an image tar
    archive.  Returns a 2-tuple ``(manifest, config)`` of dicts, each
    deserialized from the corresponding JSON member of the archive.

    Raises ValueError if the archive's manifest.json does not describe
    exactly one image (arv-keepdocker only ever saves one).
    """
    def _member_json(tar, member_name):
        # Deserialize a single named member of the archive as JSON.
        with tar.extractfile(member_name) as member:
            return json.load(member)

    image_file.seek(0)
    with tarfile.open(fileobj=image_file) as image_tar:
        manifest_list = _member_json(image_tar, 'manifest.json')
        # One-element unpacking enforces "exactly one manifest" and
        # raises ValueError otherwise.
        [image_manifest] = manifest_list
        image_config = _member_json(image_tar, image_manifest['Config'])
    return image_manifest, image_config
Load an image manifest and config from an archive
Given an image archive as an open binary file object, this function loads the image manifest and configuration, deserializing each from JSON and returning them in a 2-tuple of dicts.
def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    """Run the arv-keepdocker command.

    Parameters:
    * arguments: command-line argument list; parsed with arg_parser, and
      re-filtered later to build the switches passed through to arv-put.
      Defaults to sys.argv[1:] for the arv-put pass-through.
    * stdout: stream where the image table / collection UUID is written.
    * install_sig_handlers: forwarded to arv_put.main().
    * api: optional pre-built Arvados API client; created here when None.

    Terminates with sys.exit() on the listing path, on errors, and when
    the image is already stored in Arvados; otherwise saves the image to
    a tar file, uploads it via arv-put, and creates docker_image_hash /
    docker_image_repo+tag links pointing at the new collection.
    """
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1', num_retries=args.retries)

    # No image argument (or the literal word 'images'): print a table of
    # Docker images already registered in Arvados, then exit.
    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            # Ignore a broken pipe (e.g. output piped into `head`).
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    # Split a trailing ":tag" off the image name; combining that with a
    # separate tag argument would be ambiguous, so it is an error.
    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    if '/' in args.image:
        hostport, path = args.image.split('/', 1)
        if hostport.endswith(':443'):
            # "docker pull host:443/asdf" transparently removes the
            # :443 (which is redundant because https is implied) and
            # after it succeeds "docker images" will list "host/asdf",
            # not "host:443/asdf". If we strip the :443 then the name
            # doesn't change underneath us.
            args.image = '/'.join([hostport[:-4], path])

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)

    image_hash = None
    try:
        image_hash = find_one_image_hash(args.image, args.tag)
        if not docker_image_compatible(api, image_hash):
            if args.force_image_format:
                logger.warning("forcing incompatible image")
            else:
                logger.error("refusing to store " \
                    "incompatible format (use --force-image-format to override)")
                sys.exit(1)
    except DockerError as error:
        if images_in_arv:
            # We don't have Docker / we don't have the image locally,
            # use image that's already uploaded to Arvados
            image_hash = images_in_arv[0][1]['dockerhash']
        else:
            logger.error(str(error))
            sys.exit(1)

    # If the image was specified by (a prefix of) its hash, there is no
    # repo:tag name to record.
    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time. Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    # Prefer a copy of the collection already owned by the
                    # target project; otherwise clone one into it.
                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid,
                                  "properties": {"docker-image-repo-tag": image_repo_tag}},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid': coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        # Don't fail when cached manifest is invalid, just ignore the cache.
        put_args += ['--batch']

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Managed properties could be already set
        coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
        coll_properties.update({"docker-image-repo-tag": image_repo_tag})
        api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)

        # Tag the uploaded collection with the image hash (and repo:tag,
        # when known), recording the image's creation timestamp if the
        # image config provides one.
        _, image_metadata = load_image_metadata(image_file)
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()