1
2
3
4
5
6 import argparse
7 import hashlib
8 import os
9 import re
10 import string
11 import sys
12 import logging
13
14 import arvados
15 import arvados.commands._util as arv_cmd
16 import arvados.util as util
17
18 from arvados._version import __version__
19
# Arvados API client shared by main(); created there on first use so
# that callers (e.g. tests) may install their own client beforehand.
api_client = None
logger = logging.getLogger('arvados.arv-get')

# Command-line interface. The retry option is inherited from the shared
# arv_cmd.retry_opt parent parser.
parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")

# Progress reporting: at most one of these styles may be requested.
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# Hashing: --md5sum is shorthand for --hash md5 (both store into args.hash).
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")
parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")

# Only -f and --skip-existing contradict each other (they are opposite
# policies for files that already exist). -v and --strip-manifest are
# independent options, so they are registered on the parser itself;
# putting them in the exclusive group made e.g. "-v -f" an error.
# Registration order is kept so the --help listing order is unchanged.
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
parser.add_argument('-v', action='count', default=0,
                    help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
parser.add_argument('--strip-manifest', action='store_true', default=False,
                    help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")
100
def parse_arguments(arguments, stdout, stderr):
    """Parse the command line and normalize the result for main().

    Arguments:

    * arguments -- argument list handed to ``parser.parse_args()``.
    * stdout, stderr -- output streams; only their ``isatty()`` methods
      are consulted, to decide whether progress output is on by default.

    Returns the parsed argparse namespace with these adjustments:

    * ``r`` is turned on when the locator ends with a path separator.
    * In single-file mode, a destination that is a directory (or ends
      with a separator) has the locator's basename appended.
    * ``/dev/stdout`` is rewritten to ``-``; writing to ``-`` implies ``f``.
    * Trailing separators are stripped from any other destination.
    * ``progress`` is turned on by default when stderr is a tty and the
      data is not going to a tty via stdout.

    An invalid combination (recursive download to a non-directory) is
    reported through ``parser.error()``.
    """
    args = parser.parse_args(arguments)

    # endswith() rather than [-1]: an empty locator or destination must
    # not crash with IndexError before it can be reported properly.
    if args.locator.endswith(os.sep):
        args.r = True
    if (args.r and
        not args.n and
        not (args.destination and
             os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination.endswith(os.path.sep)):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Writing to stdout never needs an explicit -f, even though the
        # "file" already exists.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and the output is
    # going either to a named file or (via stdout) to something that is
    # not a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args
138
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get: copy data from Keep to a file or pipe.

    Arguments:

    * arguments -- command-line argument list, passed on to
      parse_arguments().
    * stdout -- stream that receives data when the destination is "-".
    * stderr -- stream that receives progress and hash output.

    Returns 0 on success and 1 after an error has been logged. A
    KeyboardInterrupt during a download removes the partially written
    file, stops the transfer, and still returns 0.
    """
    global api_client

    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # Downloaded data is written as bytes, so use the binary stream
        # underneath the text-mode sys.stdout when there is one.
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v lowers the threshold one level: WARNING -> INFO -> DEBUG.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    # Split the locator at its first "/" into the collection part and an
    # optional path (which keeps its leading slash).
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # No path given: the locator names a whole manifest or a single
    # block, which write_block_or_manifest() fetches in one piece.
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                open_flags |= os.O_EXCL
            else:
                # os.fdopen() does not truncate, so without O_TRUNC
                # overwriting a longer existing file would leave stale
                # bytes after the new content.
                open_flags |= os.O_TRUNC
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries)
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Build the list of (stream, file, destination path) triples to
    # download, refusing up front if anything would be overwritten
    # without permission.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, (arvados.collection.Subcollection,
                             arvados.collection.CollectionReader)):
            # A whole (sub)collection was requested.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1

            for s, f in files_in_collection(item):
                # NOTE(review): the slice presumably drops the
                # "." + get_prefix portion of the stream path so files
                # land relative to the destination -- confirm against
                # stream_name()'s format.
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.', dest_path)
                    return 1
                todo.append((s, f, dest_path))
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo.append((item.parent, item, args.destination))
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    if args.hash:
        # Reject an unsupported algorithm before any output file is
        # opened; otherwise hashlib.new() would fail mid-loop, leaking
        # the file handle and leaving an empty file behind.
        try:
            hashlib.new(args.hash)
        except ValueError as error:
            logger.error("unsupported hash algorithm '{}': {}".format(args.hash, error))
            return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Only regular files and directories are refused
                    # here; other existing paths are opened for writing.
                    logger.error('Local file %s already exists.', outfilename)
                    return 1
                if args.r:
                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s', outfilename, error)
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes==0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s  %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partial download, but never a standard stream
            # (file descriptors 0-2).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile is not None and outfile is not stdout:
                outfile.close()

    if args.progress:
        # Finish the "\r"-rewritten progress line.
        stderr.write('\n')
    return 0
299
311
322