arvados.commands.run

# Copyright (C) The Arvados Authors. All rights reserved.
# Copyright (C) 2018 Genome Research Ltd.
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from __future__ import absolute_import
from builtins import range
from past.builtins import basestring
from builtins import object
import arvados
import arvados.commands.ws as ws
import argparse
import json
import re
import os
import stat
from . import put
import time
import subprocess
import logging
import sys
import errno
import arvados.commands._util as arv_cmd
import arvados.collection
import arvados.config as config

from arvados._version import __version__

logger = logging.getLogger('arvados.arv-run')
logger.setLevel(logging.INFO)

class ArvFile(object):
    def __init__(self, prefix, fn):
        self.prefix = prefix
        self.fn = fn

    def __hash__(self):
        return (self.prefix+self.fn).__hash__()

    def __eq__(self, other):
        return (self.prefix == other.prefix) and (self.fn == other.fn)

class UploadFile(ArvFile):
    pass

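# Illustrative sketch (not part of the original module): ArvFile hashes and
# compares by prefix+fn, so duplicate references to the same file collapse
# when collected into a set.
#
# >>> a = ArvFile("-i ", "tmp/data.txt")
# >>> b = ArvFile("-i ", "tmp/data.txt")
# >>> a == b
# True
# >>> len({a, b})
# 1
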
# Determine if a file is in a collection, and return a tuple consisting of the
# portable data hash and the path relative to the root of the collection.
# Return (None, None) if the path isn't within an arv-mount collection or
# there was an error.
def is_in_collection(root, branch):
    try:
        if root == "/":
            return (None, None)
        fn = os.path.join(root, ".arvados#collection")
        if os.path.exists(fn):
            with open(fn, 'r') as f:
                c = json.load(f)
            return (c["portable_data_hash"], branch)
        else:
            sp = os.path.split(root)
            return is_in_collection(sp[0], os.path.join(sp[1], branch))
    except (IOError, OSError):
        return (None, None)

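# Illustrative sketch (not part of the original module): given a file under an
# arv-mount FUSE mount, is_in_collection() walks upward until it finds the
# ".arvados#collection" pseudofile, then reports the collection's portable
# data hash and the path relative to the collection root. The mount path and
# <pdh> below are hypothetical.
#
# >>> is_in_collection("/mnt/keep/by_id/<pdh>/subdir", "data.txt")
# ('<pdh>', 'subdir/data.txt')
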
# Determine the project in which to place the output of this command by
# searching upward for the arv-mount pseudofile indicating the project.  If
# the cwd isn't within an arv-mount project or there is an error, return
# current_user.
def determine_project(root, current_user):
    try:
        if root == "/":
            return current_user
        fn = os.path.join(root, ".arvados#project")
        if os.path.exists(fn):
            # open() rather than the Python 2-only file() builtin
            with open(fn, 'r') as f:
                c = json.load(f)
            if 'writable_by' in c and current_user in c['writable_by']:
                return c["uuid"]
            else:
                return current_user
        else:
            sp = os.path.split(root)
            return determine_project(sp[0], current_user)
    except (IOError, OSError):
        return current_user

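# Illustrative sketch (not part of the original module): called from a working
# directory inside an arv-mount project mount, determine_project() returns the
# owning project's UUID when the current user can write to it, and otherwise
# falls back to the user's own UUID. The UUIDs below are hypothetical.
#
# >>> determine_project(os.getcwd(), "zzzzz-tpzed-xxxxxxxxxxxxxxx")
# 'zzzzz-j7d0g-xxxxxxxxxxxxxxx'
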
# Determine if a string corresponds to a file, and if that file is part of an
# arv-mounted collection or only local to the machine.  Returns one of
# ArvFile() (file already exists in a collection), UploadFile() (file needs to
# be uploaded to a collection), or simply returns prefix+fn (which yields the
# original parameter string).
def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
    absfn = os.path.abspath(fn)
    try:
        st = os.stat(absfn)
        sp = os.path.split(absfn)
        (pdh, branch) = is_in_collection(sp[0], sp[1])
        if pdh:
            if stat.S_ISREG(st.st_mode):
                return ArvFile(prefix, fnPattern % (pdh, branch))
            elif stat.S_ISDIR(st.st_mode):
                return ArvFile(prefix, dirPattern % (pdh, branch))
            else:
                raise Exception("%s is not a regular file or directory" % absfn)
        else:
            # trim leading '/' for path prefix test later
            return UploadFile(prefix, absfn[1:])
    except OSError as e:
        if e.errno == errno.ENOENT and not raiseOSError:
            pass
        else:
            raise

    return prefix+fn

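# Illustrative sketch (not part of the original module): statfile() has three
# possible outcomes depending on where (and whether) the path exists. The
# paths and <pdh> below are hypothetical.
#
#   statfile("-i ", "/mnt/keep/.../x.txt")  # file inside an arv-mount
#       -> ArvFile("-i ", "$(file <pdh>/x.txt)")
#   statfile("-i ", "/home/user/x.txt")     # local file that needs uploading
#       -> UploadFile("-i ", "home/user/x.txt")   # leading '/' trimmed
#   statfile("-i ", "no-such-file")         # nonexistent path
#       -> "-i no-such-file"                # original string, unchanged
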
def write_file(collection, pathprefix, fn, flush=False):
    with open(os.path.join(pathprefix, fn), "rb") as src:
        dst = collection.open(fn, "wb")
        r = src.read(1024*128)
        while r:
            dst.write(r)
            r = src.read(1024*128)
        dst.close(flush=flush)

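# Illustrative sketch (not part of the original module): write_file() streams
# a local file into an open Collection in 128 KiB chunks; flush=True pushes
# the buffered data out when the file handle closes rather than waiting for a
# final save of the whole collection.
#
#   c = arvados.collection.Collection()
#   write_file(c, "/tmp", "report/output.txt", flush=False)
#   # c now contains "report/output.txt"
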
def uploadfiles(files, api, dry_run=False, num_retries=0,
                project=None,
                fnPattern="$(file %s/%s)",
                name=None,
                collection=None,
                packed=True):
    # Find the deepest common parent directory of all the files that need to
    # be uploaded.  Starting at the root, iteratively move one directory level
    # at a time into pathprefix, stripping it from each file path, until the
    # paths no longer share a common parent.
    if files:
        n = True
        pathprefix = "/"
        while n:
            pathstep = None
            for c in files:
                if pathstep is None:
                    sp = c.fn.split('/')
                    if len(sp) < 2:
                        # no parent directories left
                        n = False
                        break
                    # path step takes next directory
                    pathstep = sp[0] + "/"
                else:
                    # check if pathstep is common prefix for all files
                    if not c.fn.startswith(pathstep):
                        n = False
                        break
            if n:
                # pathstep is common parent directory for all files, so remove the prefix
                # from each path
                pathprefix += pathstep
                for c in files:
                    c.fn = c.fn[len(pathstep):]

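        # Worked example (illustrative): for files "tmp/run1/a.txt" and
        # "tmp/run1/b/c.txt", the loop strips "tmp/" and then "run1/",
        # leaving pathprefix == "/tmp/run1/" and the file names "a.txt" and
        # "b/c.txt".  A third file "var/x" would stop it at pathprefix "/".
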
        logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))

    if dry_run:
        logger.info("$(input) is %s", pathprefix.rstrip('/'))
        pdh = "$(input)"
    else:
        files = sorted(files, key=lambda x: x.fn)
        if collection is None:
            collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
        prev = ""
        for f in files:
            localpath = os.path.join(pathprefix, f.fn)
            if prev and localpath.startswith(prev+"/"):
                # If this path is inside an already uploaded subdirectory,
                # don't redundantly re-upload it.
                # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
                # skip it because it starts with "/tmp/foo/"
                continue
            prev = localpath
            if os.path.isfile(localpath):
                write_file(collection, pathprefix, f.fn, not packed)
            elif os.path.isdir(localpath):
                for root, dirs, iterfiles in os.walk(localpath):
                    root = root[len(pathprefix):]
                    for src in iterfiles:
                        write_file(collection, pathprefix, os.path.join(root, src), not packed)

        pdh = None
        if len(collection) > 0:
            # non-empty collection
            filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
            name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
            if name:
                filters.append(["name", "=", name_pdh])
            if project:
                filters.append(["owner_uuid", "=", project])

            # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
            # and there is a potential race with other workflows that may have created the collection
            # between when we list it and find it does not exist and when we attempt to create it.
            tries = 2
            while pdh is None and tries > 0:
                exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)

                if exists["items"]:
                    item = exists["items"][0]
                    pdh = item["portable_data_hash"]
                    logger.info("Using collection %s (%s)", pdh, item["uuid"])
                else:
                    try:
                        collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
                        pdh = collection.portable_data_hash()
                        logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
                    except arvados.errors.ApiError as ae:
                        tries -= 1
            if pdh is None:
                # Something weird is going on here, probably a collection
                # with a conflicting name but the wrong PDH.  We won't be
                # able to reuse it, but we still need to save our collection,
                # so save it with a unique name.
                logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
                collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
                pdh = collection.portable_data_hash()
                logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
        else:
            # empty collection
            pdh = collection.portable_data_hash()
            assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
            logger.debug("Using empty collection %s", pdh)

    for c in files:
        c.keepref = "%s/%s" % (pdh, c.fn)
        c.fn = fnPattern % (pdh, c.fn)

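# Illustrative sketch (not part of the original module): a typical call
# uploads the local files named by UploadFile objects, then rewrites each
# object's .fn in place to a "$(file pdh/path)" reference and records the
# raw Keep reference in .keepref. The project UUID below is hypothetical.
#
#   api = arvados.api('v1')
#   files = [UploadFile("", "home/user/input.txt")]
#   uploadfiles(files, api, num_retries=3,
#               project="zzzzz-j7d0g-xxxxxxxxxxxxxxx",
#               name="my inputs")
#   # files[0].fn is now "$(file <pdh>/input.txt)"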

def main(arguments=None):
    raise Exception("Legacy arv-run removed.")

if __name__ == '__main__':
    main()