1
2
3
4
5 from __future__ import division
6 from builtins import range
7
8 import fcntl
9 import hashlib
10 import httplib2
11 import os
12 import random
13 import re
14 import subprocess
15 import errno
16 import sys
17
18 import arvados
19 from arvados.collection import CollectionReader
20
# A non-empty string consisting only of hexadecimal digits (either case).
# \Z anchors at the true end of the string; "$" would also match just before
# a trailing newline, which made e.g. "abc\n" look like valid hex.
HEX_RE = re.compile(r'^[0-9a-fA-F]+\Z')

# Keep locator: 32 lowercase hex digits, "+<size>", then optional "+<hint>"s.
# These locator patterns are unanchored.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*')
# Like keep_locator_pattern, but requires at least one "+A..." hint
# (per the name, a signed locator).
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+\d+(\+\S+)*\+A\S+(\+\S+)*')
# Hash plus size only.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# UUIDs: 5-char prefix, 5-char type infix, 15-char suffix. The typed
# patterns below pin the infix for one object type each.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
# One or more manifest lines (MULTILINE): a stream name, one or more
# block locators, then one or more "pos:size:name" file tokens.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
34
def clear_tmpdir(path=None):
    """
    Ensure the given directory (or TASK_TMPDIR if none given)
    exists and is empty.

    path -- directory to reset; when None, the current task's tmpdir
      (arvados.current_task().tmpdir) is used.

    Any existing *path* is removed with ``rm -rf`` and recreated with
    os.mkdir. Raises Exception (including rm's stderr) if rm fails, and
    OSError if mkdir fails.
    """
    if path is None:
        path = arvados.current_task().tmpdir
    if os.path.exists(path):
        # stderr must be piped: otherwise communicate() returns None for it
        # and the error message below carries no diagnostic at all.
        p = subprocess.Popen(['rm', '-rf', path], stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            if isinstance(stderr, bytes):
                stderr = stderr.decode('utf-8', 'replace')
            raise Exception('rm -rf %s: %s' % (path, stderr))
    os.mkdir(path)
48
def run_command(execargs, **kwargs):
    """Run a command to completion and return its captured output.

    execargs -- argv list handed to subprocess.Popen
    kwargs -- extra Popen options; any not supplied default to stdin and
      stdout piped, stderr sent to sys.stderr, close_fds=True, shell=False.

    Returns the ``(stdout, stderr)`` pair from communicate(); each element
    is None unless the corresponding stream was piped. Raises
    arvados.errors.CommandFailedError when the exit status is non-zero.
    """
    popen_defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for option, value in popen_defaults.items():
        kwargs.setdefault(option, value)
    proc = subprocess.Popen(execargs, **kwargs)
    out_data, err_data = proc.communicate(None)
    if proc.returncode == 0:
        return out_data, err_data
    raise arvados.errors.CommandFailedError(
        "run_command %s exit %d:\n%s" %
        (execargs, proc.returncode, err_data))
62
def git_checkout(url, version, path):
    """Clone a git repository if needed, then check out a version.

    url -- repository URL passed to ``git clone``
    version -- revision passed to ``git checkout``
    path -- checkout directory: absolute, or relative to the job tmpdir

    The clone is skipped when *path* already exists; the checkout always
    runs (inside *path*). Returns the resolved path. Errors from git
    surface as whatever run_command raises.
    """
    starts_with_slash = re.search('^/', path)
    if not starts_with_slash:
        path = os.path.join(arvados.current_job().tmpdir, path)
    needs_clone = not os.path.exists(path)
    if needs_clone:
        parent = os.path.dirname(path)
        run_command(["git", "clone", url, path], cwd=parent)
    run_command(["git", "checkout", version], cwd=path)
    return path
72
def tar_extractor(path, decompress_flag):
    """Start a ``tar`` process that extracts an archive read from stdin.

    path -- directory to extract into (``tar -C``)
    decompress_flag -- option letter spliced into ``-x<flag>f``; callers in
      this module pass 'j', 'z', or '' for an uncompressed tar.

    Returns the running subprocess.Popen. Its stdin is a pipe the caller
    feeds with archive bytes; stdout is inherited and stderr goes to
    sys.stderr.
    """
    mode = "-x%sf" % decompress_flag
    argv = ["tar", "-C", path, mode, "-"]
    return subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=sys.stderr,
        close_fds=True,
        shell=False,
    )
81
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single
    item.

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp

    Extraction is skipped when ``path/.locator`` is already a symlink whose
    target equals *tarball*. Raises arvados.errors.AssertionError for a
    file that is not .tar/.tgz/.tar.gz/.tbz/.tar.bz2, and
    arvados.errors.CommandFailedError when tar exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    # The exclusive flock serializes concurrent extractions into the same
    # directory. Closing the file (on any exit from this with-block,
    # including exceptions) releases the lock.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(locator_path) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:
            # Emulate "rm -f": a missing .locator is not an error.
            try:
                os.unlink(locator_path)
            except OSError:
                if os.path.exists(locator_path):
                    os.unlink(locator_path)

            for f in CollectionReader(tarball).all_files():
                name = f.name()
                if re.search(r'\.(tbz|tar\.bz2)$', name):
                    p = tar_extractor(path, 'j')
                elif re.search(r'\.(tgz|tar\.gz)$', name):
                    p = tar_extractor(path, 'z')
                elif re.search(r'\.tar$', name):
                    p = tar_extractor(path, '')
                else:
                    raise arvados.errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % f.name())
                # Stream the archive into tar in 1 MiB chunks.
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            # Record what was extracted so later calls can skip the work.
            os.symlink(tarball, locator_path)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
142
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single
    item.

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp

    Extraction is skipped when ``path/.locator`` is already a symlink whose
    target equals *zipball*. Each .zip file is copied next to its output,
    unzipped with ``unzip -q -o``, then deleted. Raises
    arvados.errors.NotImplementedError for non-.zip files and
    arvados.errors.CommandFailedError when unzip exits non-zero.
    """
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    # Closing the lock file at the end of this with-block, including on
    # exceptions, releases the exclusive flock.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(locator_path) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:
            # Emulate "rm -f": a missing .locator is not an error.
            try:
                os.unlink(locator_path)
            except OSError:
                if os.path.exists(locator_path):
                    os.unlink(locator_path)

            for f in CollectionReader(zipball).all_files():
                if not re.search(r'\.zip$', f.name()):
                    raise arvados.errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                # Copy the archive to local disk in 1 MiB chunks; unzip
                # needs a seekable file rather than a pipe.
                with open(zip_filename, 'wb') as zip_file:
                    while True:
                        buf = f.read(2**20)
                        if len(buf) == 0:
                            break
                        zip_file.write(buf)

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    raise arvados.errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            # Record what was extracted so later calls can skip the work.
            os.symlink(zipball, locator_path)
        tld_extracts = [f for f in os.listdir(path) if f != '.locator']
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
    return path
208
def collection_extract(collection, path, files=None, decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was
    extracted.

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    files -- names to extract; None or [] means every file
    decompress -- write decompressed contents under decompressed names

    Files that already exist on disk are left untouched. On success,
    ``path/.locator`` is (re)created as a symlink to the collection hash:
    the leading hex run of a locator, or the MD5 of *collection* otherwise.
    Raises arvados.errors.AssertionError if some requested files were not
    found.
    """
    if files is None:
        files = []
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    if matches:
        collection_hash = matches.group(1)
    else:
        # hashlib requires bytes on Python 3; encode text input first.
        if isinstance(collection, bytes):
            collection_bytes = collection
        else:
            collection_bytes = collection.encode('utf-8')
        collection_hash = hashlib.md5(collection_bytes).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    locator_path = os.path.join(path, '.locator')
    # Closing the lock file at the end of this with-block, including on
    # exceptions, releases the exclusive flock.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        # Emulate "rm -f": a missing .locator is not an error. The
        # collection is always re-walked; existing files are skipped below.
        try:
            os.unlink(locator_path)
        except OSError:
            if os.path.exists(locator_path):
                os.unlink(locator_path)

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                        ((f.name() not in files_got) and
                         (f.name() in files or
                          (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    outpath = os.path.join(path, stream_name, outname)
                    if os.path.exists(outpath):
                        continue
                    mkdir_dash_p(os.path.dirname(outpath))
                    with open(outpath, 'wb') as outfile:
                        for buf in (f.readall_decompressed() if decompress
                                    else f.readall()):
                            outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        os.symlink(collection_hash, locator_path)

    return path
271
def mkdir_dash_p(path):
    """Create *path* and any missing parent directories, like ``mkdir -p``.

    Does nothing if *path* is already a directory. Re-raises the OSError
    from os.makedirs unless the failure is EEXIST and *path* has turned
    out to be a directory.
    """
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError as err:
        # Someone else may have created the directory between the isdir()
        # check and makedirs(); that race counts as success.
        lost_race = err.errno == errno.EEXIST and os.path.isdir(path)
        if not lost_race:
            raise
283
def stream_extract(stream, path, files=None, decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was
    extracted.

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    files -- names to extract; None or [] means every file
    decompress -- write decompressed contents under decompressed names

    Existing output files are replaced. Raises
    arvados.errors.AssertionError if some requested files were not found.
    """
    if files is None:
        files = []
    if not re.search('^/', path):
        path = os.path.join(arvados.current_job().tmpdir, path)
    # Closing the lock file at the end of this with-block, including on
    # exceptions, releases the exclusive flock.
    with open(path + '.lock', 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                outpath = os.path.join(path, outname)
                if os.path.exists(outpath):
                    os.unlink(outpath)
                mkdir_dash_p(os.path.dirname(outpath))
                with open(outpath, 'wb') as outfile:
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
        if len(files_got) < len(files):
            raise arvados.errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
    return path
323
def listdir_recursive(dirname, base=None, max_depth=None):
    """listdir_recursive(dirname, base, max_depth)

    List the entries under *dirname*, recursing into subdirectories.

    Names are returned in sorted order, relative to *dirname*, and prefixed
    with "{base}/" when *base* is truthy.

    max_depth=None recurses without limit and yields only non-directory
    entries. A non-negative max_depth stops recursing after that many
    levels and reports the directories found there by name. max_depth=0
    (with no base) gives the same result as sorted(os.listdir(dirname)).
    """
    found = []
    for name in sorted(os.listdir(dirname)):
        full_path = os.path.join(dirname, name)
        rel_name = os.path.join(base, name) if base else name
        if max_depth == 0 or not os.path.isdir(full_path):
            found.append(rel_name)
            continue
        child_depth = max_depth - 1 if max_depth else None
        found.extend(listdir_recursive(full_path, base=rel_name,
                                       max_depth=child_depth))
    return found
352
def is_hex(s, *length_args):
    """is_hex(s[, length[, max_length]]) -> boolean

    Report whether *s* consists of hexadecimal digits, as judged by HEX_RE.
    With one extra argument, len(s) must equal it exactly. With two, len(s)
    must fall within that inclusive range. The length test runs first;
    HEX_RE is consulted only when it passes.
    Raises arvados.errors.ArgumentError if more than two lengths are given.
    """
    arg_count = len(length_args)
    if arg_count > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + arg_count))
    if arg_count == 2:
        low, high = length_args
        length_ok = low <= len(s) <= high
    elif arg_count == 1:
        length_ok = len(s) == length_args[0]
    else:
        length_ok = True
    return bool(length_ok and HEX_RE.match(s))
374
def list_all(fn, num_retries=0, **kwargs):
    """Collect every item from a paginated API list method.

    fn -- callable returning a request object with ``execute(num_retries=)``;
      it is called repeatedly with an increasing ``offset`` keyword.
    num_retries -- forwarded to each ``execute`` call
    kwargs -- extra arguments for *fn*; ``limit`` defaults to sys.maxsize.

    Each response must provide 'items', 'items_available' and 'offset'.
    Pages are fetched until 'items_available' items have been collected,
    or until a page comes back empty. Without that second check, a server
    that reports more items than it ever returns would loop forever.
    Returns the concatenated list of items.
    """
    kwargs.setdefault('limit', sys.maxsize)
    items = []
    offset = 0
    items_available = sys.maxsize
    while len(items) < items_available:
        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
        page = c['items']
        items += page
        items_available = c['items_available']
        offset = c['offset'] + len(page)
        if not page:
            # No progress possible: stop rather than re-request the same
            # offset forever.
            break
    return items
387
def ca_certs_path(fallback=httplib2.CA_CERTS):
    """Return the path of the best available CA certs source.

    A fixed list of system CA bundle locations is checked in order, and
    the first path that exists is returned. If none exists, *fallback* is
    returned instead (httplib2's CA certs by default).
    """
    candidates = (
        '/etc/arvados/ca-certificates.crt',
        '/etc/ssl/certs/ca-certificates.crt',
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    return next((candidate for candidate in candidates
                 if os.path.exists(candidate)), fallback)
406
def new_request_id():
    """Return a random request ID: "req-" followed by 20 base-36 characters.

    The characters are the base-36 digits (0-9, a-z) of a 104-bit random
    integer from random.getrandbits, least-significant digit first.
    """
    alphabet = '0123456789abcdefghijklmnopqrstuvwxyz'
    remaining = random.getrandbits(104)
    digits = []
    for _ in range(20):
        remaining, digit = divmod(remaining, 36)
        digits.append(alphabet[digit])
    return "req-" + "".join(digits)
419