arvados.commands.run
# Copyright (C) The Arvados Authors. All rights reserved.
# Copyright (C) 2018 Genome Research Ltd.
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from __future__ import absolute_import
from builtins import range
from past.builtins import basestring
from builtins import object
import arvados
import arvados.commands.ws as ws
import argparse
import json
import re
import os
import stat
from . import put
import time
import subprocess
import logging
import sys
import errno
import arvados.commands._util as arv_cmd
import arvados.collection
import arvados.config as config

from arvados._version import __version__

logger = logging.getLogger('arvados.arv-run')
logger.setLevel(logging.INFO)

class ArvFile(object):
    def __init__(self, prefix, fn):
        self.prefix = prefix
        self.fn = fn

    def __hash__(self):
        return (self.prefix+self.fn).__hash__()

    def __eq__(self, other):
        return (self.prefix == other.prefix) and (self.fn == other.fn)

class UploadFile(ArvFile):
    pass

# Determine if a file is in a collection, and return a tuple consisting of the
# portable data hash and the path relative to the root of the collection.
# Return (None, None) if the path isn't within an arv-mount collection or an
# error occurs.
def is_in_collection(root, branch):
    try:
        if root == "/":
            return (None, None)
        fn = os.path.join(root, ".arvados#collection")
        if os.path.exists(fn):
            with open(fn, 'r') as f:
                c = json.load(f)
            return (c["portable_data_hash"], branch)
        else:
            sp = os.path.split(root)
            return is_in_collection(sp[0], os.path.join(sp[1], branch))
    except (IOError, OSError):
        return (None, None)

# Determine the project in which to place the output of this command by
# searching upward for the arv-mount pseudofile indicating the project. If the
# cwd isn't within an arv-mount project or there is an error, return
# current_user.
def determine_project(root, current_user):
    try:
        if root == "/":
            return current_user
        fn = os.path.join(root, ".arvados#project")
        if os.path.exists(fn):
            with open(fn, 'r') as f:
                c = json.load(f)
            if 'writable_by' in c and current_user in c['writable_by']:
                return c["uuid"]
            else:
                return current_user
        else:
            sp = os.path.split(root)
            return determine_project(sp[0], current_user)
    except (IOError, OSError):
        return current_user

# Determine if a string corresponds to a file, and if that file is part of an
# arv-mounted collection or only local to the machine.  Returns one of
# ArvFile() (file already exists in a collection), UploadFile() (file needs to
# be uploaded to a collection), or simply returns prefix+fn (which yields the
# original parameter string).
def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
    absfn = os.path.abspath(fn)
    try:
        st = os.stat(absfn)
        sp = os.path.split(absfn)
        (pdh, branch) = is_in_collection(sp[0], sp[1])
        if pdh:
            if stat.S_ISREG(st.st_mode):
                return ArvFile(prefix, fnPattern % (pdh, branch))
            elif stat.S_ISDIR(st.st_mode):
                return ArvFile(prefix, dirPattern % (pdh, branch))
            else:
                raise Exception("%s is not a regular file or directory" % absfn)
        else:
            # trim leading '/' for path prefix test later
            return UploadFile(prefix, absfn[1:])
    except OSError as e:
        if e.errno == errno.ENOENT and not raiseOSError:
            pass
        else:
            raise

    return prefix+fn

def write_file(collection, pathprefix, fn, flush=False):
    with open(os.path.join(pathprefix, fn), "rb") as src:
        dst = collection.open(fn, "wb")
        r = src.read(1024*128)
        while r:
            dst.write(r)
            r = src.read(1024*128)
        dst.close(flush=flush)

def uploadfiles(files, api, dry_run=False, num_retries=0,
                project=None,
                fnPattern="$(file %s/%s)",
                name=None,
                collection=None,
                packed=True):
    # Find the smallest path prefix that includes all the files that need to
    # be uploaded. This starts at the root and iteratively removes common
    # parent directory prefixes until all file paths no longer have a common
    # parent.
    if files:
        n = True
        pathprefix = "/"
        while n:
            pathstep = None
            for c in files:
                if pathstep is None:
                    sp = c.fn.split('/')
                    if len(sp) < 2:
                        # no parent directories left
                        n = False
                        break
                    # path step takes next directory
                    pathstep = sp[0] + "/"
                else:
                    # check if pathstep is common prefix for all files
                    if not c.fn.startswith(pathstep):
                        n = False
                        break
            if n:
                # pathstep is common parent directory for all files, so remove
                # the prefix from each path
                pathprefix += pathstep
                for c in files:
                    c.fn = c.fn[len(pathstep):]

        logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))

    if dry_run:
        logger.info("$(input) is %s", pathprefix.rstrip('/'))
        pdh = "$(input)"
    else:
        files = sorted(files, key=lambda x: x.fn)
        if collection is None:
            collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
        prev = ""
        for f in files:
            localpath = os.path.join(pathprefix, f.fn)
            if prev and localpath.startswith(prev+"/"):
                # If this path is inside an already uploaded subdirectory,
                # don't redundantly re-upload it.
                # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar;
                # skip it because it starts with "/tmp/foo/"
                continue
            prev = localpath
            if os.path.isfile(localpath):
                write_file(collection, pathprefix, f.fn, not packed)
            elif os.path.isdir(localpath):
                for root, dirs, iterfiles in os.walk(localpath):
                    root = root[len(pathprefix):]
                    for src in iterfiles:
                        write_file(collection, pathprefix, os.path.join(root, src), not packed)

        pdh = None
        if len(collection) > 0:
            # non-empty collection
            filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
            name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
            if name:
                filters.append(["name", "=", name_pdh])
            if project:
                filters.append(["owner_uuid", "=", project])

            # Do the list / create in a loop with up to 2 tries, as we are
            # using `ensure_unique_name=False` and there is a potential race
            # with other workflows that may have created the collection
            # between when we list it and find it does not exist and when we
            # attempt to create it.
            tries = 2
            while pdh is None and tries > 0:
                exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)

                if exists["items"]:
                    item = exists["items"][0]
                    pdh = item["portable_data_hash"]
                    logger.info("Using collection %s (%s)", pdh, item["uuid"])
                else:
                    try:
                        collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
                        pdh = collection.portable_data_hash()
                        logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
                    except arvados.errors.ApiError:
                        tries -= 1
            if pdh is None:
                # Something weird is going on here, probably a collection with
                # a conflicting name but the wrong PDH.  We won't be able to
                # reuse it, but we still need to save our collection, so save
                # it with a unique name.
                logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
                collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
                pdh = collection.portable_data_hash()
                logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
        else:
            # empty collection
            pdh = collection.portable_data_hash()
            assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
            logger.debug("Using empty collection %s", pdh)

    for c in files:
        c.keepref = "%s/%s" % (pdh, c.fn)
        c.fn = fnPattern % (pdh, c.fn)


def main(arguments=None):
    raise Exception("Legacy arv-run removed.")

if __name__ == '__main__':
    main()
logger = <Logger arvados.arv-run (INFO)>
class ArvFile(object):
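ArvFile is a small value type pairing a command-line prefix with a file path; it hashes and compares by content, so duplicates collapse in sets. UploadFile is a marker subclass for files that still need uploading. A minimal illustration of those semantics (the paths are hypothetical):

    a = ArvFile("-f ", "tmp/data.txt")
    b = ArvFile("-f ", "tmp/data.txt")
    assert a == b and len({a, b}) == 1      # equal content, one set entry
    assert isinstance(UploadFile("-f ", "tmp/data.txt"), ArvFile)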
def is_in_collection(root, branch):
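Walks upward from root toward "/", accumulating path components into branch, until it finds a directory containing the .arvados#collection pseudofile. A hedged usage sketch (the mount point and the <pdh> placeholder are hypothetical):

    # Assumes an arv-mount; the path and <pdh> placeholder are hypothetical.
    pdh, rel = is_in_collection("/mnt/keep/by_id/<pdh>/subdir", "input.fastq")
    # -> ("<pdh>", "subdir/input.fastq") when the pseudofile is found,
    #    (None, None) for a path outside any mounted collection.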
def determine_project(root, current_user):
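Typically called with the current working directory and the caller's user UUID as the fallback owner. A sketch, assuming a configured Arvados API client:

    import os
    import arvados

    api = arvados.api('v1')
    me = api.users().current().execute()["uuid"]
    owner = determine_project(os.getcwd(), me)
    # owner is the project UUID when cwd is inside a writable arv-mounted
    # project, otherwise the current user's UUID.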
def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
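The three possible return shapes, sketched with hypothetical paths:

    statfile("-f ", "/mnt/keep/by_id/<pdh>/a.txt")  # ArvFile("-f ", "$(file <pdh>/a.txt)")
    statfile("-f ", "/tmp/local.txt")               # UploadFile("-f ", "tmp/local.txt")
    statfile("-f ", "no-such-file")                 # "-f no-such-file" (string unchanged)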
def write_file(collection, pathprefix, fn, flush=False):
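Streams the local file pathprefix/fn into the collection in 128 KiB chunks; flush=True pushes the data to Keep when the handle is closed rather than leaving it buffered for a packed commit. A minimal sketch, assuming an API client and a local file data/reads.fastq:

    import os
    import arvados
    import arvados.collection

    api = arvados.api('v1')
    coll = arvados.collection.Collection(api_client=api)
    write_file(coll, os.path.abspath("data"), "reads.fastq", flush=True)
    coll.save_new(name="write_file sketch")  # persist the new collection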
def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None, collection=None, packed=True):
def main(arguments=None):