arvados.commands.federation_migrate

  1#!/usr/bin/env python3
  2# Copyright (C) The Arvados Authors. All rights reserved.
  3#
  4# SPDX-License-Identifier: Apache-2.0
  5
  6#
  7# Migration tool for merging user accounts belonging to the same user
  8# but on separate clusters to use a single user account managed by a
  9# specific cluster.
 10#
 11# If you're working on this, see
 12# arvados/sdk/python/tests/fed-migrate/README for information about
 13# the testing infrastructure.
 14
 15import arvados
 16import arvados.util
 17import arvados.errors
 18import csv
 19import sys
 20import argparse
 21import hmac
 22import urllib.parse
 23import os
 24import hashlib
 25import re
 26from arvados._version import __version__
 27from . import _util as arv_cmd
 28
 29EMAIL=0
 30USERNAME=1
 31UUID=2
 32HOMECLUSTER=3
 33
 34def connect_clusters(args):
 35    clusters = {}
 36    errors = []
 37    loginCluster = None
 38    if args.tokens:
 39        print("Reading %s" % args.tokens)
 40        with open(args.tokens, "rt") as f:
 41            for r in csv.reader(f):
 42                if len(r) != 2:
 43                    continue
 44                host = r[0]
 45                token = r[1]
 46                print("Contacting %s" % (host))
 47                arv = arvados.api(host=host, token=token, cache=False, num_retries=args.retries)
 48                clusters[arv._rootDesc["uuidPrefix"]] = arv
 49    else:
 50        arv = arvados.api(cache=False, num_retries=args.retries)
 51        rh = arv._rootDesc["remoteHosts"]
 52        tok = arv.api_client_authorizations().current().execute()
 53        token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
 54
 55        for k,v in rh.items():
 56            arv = arvados.api(host=v, token=token, cache=False, insecure=os.environ.get("ARVADOS_API_HOST_INSECURE"))
 57            clusters[k] = arv
 58
 59    for _, arv in clusters.items():
 60        config = arv.configs().get().execute()
 61        if config["Login"]["LoginCluster"] != "" and loginCluster is None:
 62            loginCluster = config["Login"]["LoginCluster"]
 63
 64    print("Checking that the federation is well connected")
 65    for arv in clusters.values():
 66        config = arv.configs().get().execute()
 67        if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
 68            errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
 69            continue
 70
 71        if arv._rootDesc["revision"] < "20200331":
 72            errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
 73            continue
 74
 75        try:
 76            cur = arv.users().current().execute()
 77        except arvados.errors.ApiError as e:
 78            errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
 79            continue
 80
 81        if not cur["is_admin"]:
 82            errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
 83            continue
 84
 85        for r in clusters:
 86            if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
 87                errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
 88        for r in arv._rootDesc["remoteHosts"]:
 89            if r != "*" and r not in clusters:
 90                print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))
 91
 92    return clusters, errors, loginCluster
 93
 94
 95def fetch_users(clusters, loginCluster):
 96    rows = []
 97    by_email = {}
 98    by_username = {}
 99
100    users = [
101        user
102        for prefix, arv in clusters.items()
103        for user in arvados.util.keyset_list_all(arv.users().list, bypass_federation=True)
104        if user['uuid'].startswith(prefix)
105    ]
106
107    # Users list is sorted by email
108    # Go through users and collect users with same email
109    # when we see a different email (or get to the end)
110    # call add_accum_rows() to generate the report rows with
111    # the "home cluster" set, and also fill in the by_email table.
112
113    users.sort(key=lambda u: (u["email"], u["username"] or "", u["uuid"]))
114
115    accum = []
116    lastemail = None
117
118    def add_accum_rows():
119        homeuuid = None
120        for a in accum:
121            uuids = set(a["uuid"] for a in accum)
122            homeuuid = ((len(uuids) == 1) and uuids.pop()) or ""
123        for a in accum:
124            r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
125            by_email.setdefault(a["email"], {})
126            by_email[a["email"]][a["uuid"]] = r
127            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
128            if homeuuid_and_username not in by_username:
129                by_username[homeuuid_and_username] = a["email"]
130            elif by_username[homeuuid_and_username] != a["email"]:
131                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
132                exit(1)
133            rows.append(r)
134
135    for u in users:
136        if u["uuid"].endswith("-anonymouspublic") or u["uuid"].endswith("-000000000000000"):
137            continue
138        if lastemail == None:
139            lastemail = u["email"]
140        if u["email"] == lastemail:
141            accum.append(u)
142        else:
143            add_accum_rows()
144            lastemail = u["email"]
145            accum = [u]
146
147    add_accum_rows()
148
149    return rows, by_email, by_username
150
151
def read_migrations(args, by_email, by_username):
    """Read the migration CSV named by ``args.migrate`` or ``args.dry_run``.

    Each data row is recorded in ``by_email`` (email -> uuid -> row) and its
    ``"<home cluster>::<username>"`` key is registered in ``by_username``.
    Blank lines and the ``email`` header row are skipped.

    Returns the list of data rows in file order.  Exits with status 1 when a
    row has fewer than four columns, or when one username on one home
    cluster is listed for two different emails.
    """
    rows = []
    # newline="" is what the csv module asks for when opening files.
    with open(args.migrate or args.dry_run, "rt", newline="") as f:
        reader = csv.reader(f)
        for r in reader:
            if not r:
                # csv.reader yields an empty list for a blank line.
                continue
            if len(r) <= HOMECLUSTER:
                print("ERROR: line %d has %d column(s), expected email, username, user uuid and home cluster" % (reader.line_num, len(r)))
                sys.exit(1)
            if r[EMAIL] == "email":
                continue
            by_email.setdefault(r[EMAIL], {})[r[UUID]] = r

            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
            if homeuuid_and_username not in by_username:
                by_username[homeuuid_and_username] = r[EMAIL]
            elif by_username[homeuuid_and_username] != r[EMAIL]:
                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
                sys.exit(1)

            rows.append(r)
    return rows
170
def update_username(args, email, user_uuid, username, migratecluster, migratearv):
    """Give ``user_uuid`` the username ``username`` on the ``migratearv`` cluster.

    The change is always announced; in dry-run mode nothing else happens.
    If another account already holds the username it is first renamed to
    ``<username>migrate``.  API errors are printed, not raised.
    """
    print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
    if args.dry_run:
        return

    def rename(uuid, new_name):
        # Issue one username update against the local cluster only.
        migratearv.users().update(uuid=uuid,
                                  bypass_federation=True,
                                  body={"user": {"username": new_name}}).execute()

    try:
        holders = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()["items"]
        if holders:
            # Someone already has this username: move that account out of the way.
            rename(holders[0]["uuid"], username+"migrate")
        rename(user_uuid, username)
    except arvados.errors.ApiError as e:
        print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
187
188
def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
    """Pick (or create) the account that ``old_user_uuid`` is migrated to.

    ``userhome`` is the wanted home: a cluster id, or a longer uuid prefix /
    full uuid to disambiguate.  Candidates are the rows listed for ``email``
    whose uuid starts with ``userhome``.  When there is none, a new user is
    created on the home cluster (in dry-run mode a placeholder uuid is used
    instead) and recorded in ``by_email``.

    Returns the chosen uuid, or ``None`` after printing the reason when the
    usernames conflict, the home cluster is unknown, the user cannot be
    created, or several candidates match.
    """
    candidates = []
    for b in by_email[email].values():
        # Row layout: (email, username, user uuid, home cluster).
        if b[2].startswith(userhome):
            candidates.append(b)
        if b[1] != username and b[3] == userhome:
            print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
            return None

    if not candidates:
        # The first five characters of the home are the cluster id; this
        # also holds when the home was given as a full uuid.
        newhomecluster = userhome[0:5]
        if newhomecluster not in clusters:
            print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
            return None
        print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
        if not args.dry_run:
            oldhomearv = clusters[old_user_uuid[0:5]]
            homearv = clusters[newhomecluster]
            try:
                olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
                conflicts = homearv.users().list(filters=[["username", "=", username]],
                                                 bypass_federation=True).execute()
                if conflicts["items"]:
                    # Free the username by renaming the account that holds it.
                    homearv.users().update(
                        uuid=conflicts["items"][0]["uuid"],
                        bypass_federation=True,
                        body={"user": {"username": username+"migrate"}}).execute()
                user = homearv.users().create(
                    body={"user": {
                        "email": email,
                        "first_name": olduser["first_name"],
                        "last_name": olduser["last_name"],
                        "username": username,
                        "is_active": olduser["is_active"]}}).execute()
            except arvados.errors.ApiError as e:
                print("(%s) Could not create user: %s" % (email, str(e)))
                return None

            tup = (email, username, user["uuid"], userhome)
        else:
            # dry run: nothing is created, use a recognisable fake uuid
            tup = (email, username, "%s-tpzed-xfakexfakexfake" % (newhomecluster), userhome)
        by_email[email][tup[2]] = tup
        candidates.append(tup)

    if len(candidates) > 1:
        print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
        return None
    return candidates[0][2]
242
243
def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
    """Make the new user known, and active if needed, on the migration cluster.

    A token owned by the new user is created on the home cluster, salted for
    the migration cluster and used once against it, so that the migration
    cluster knows the new user before any merge.  In dry-run mode no request
    that changes anything is made and placeholder values are used.

    Returns the new user's record as reported by the migration cluster,
    ``False`` when the old user does not exist on that cluster, or ``None``
    after printing the reason when the user cannot be migrated there.
    """
    migratecluster = migratearv._rootDesc["uuidPrefix"]

    if args.dry_run:
        newtok = {"uuid": "dry-run", "api_token": "12345"}
    else:
        try:
            newtok = homearv.api_client_authorizations().create(body={
                "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
        except arvados.errors.ApiError as e:
            print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
            return None

    try:
        matches = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()["items"]
    except arvados.errors.ApiError as e:
        print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
        return None
    if len(matches) == 0:
        # Nothing to migrate on this cluster.
        return False
    if len(matches) != 1:
        print("(%s) Unexpected result" % (email))
        return None
    olduser = matches[0]

    # Salted token: HMAC-SHA1 of the cluster id, keyed with the token secret.
    digest = hmac.new(newtok["api_token"].encode(),
                      msg=migratecluster.encode(),
                      digestmod=hashlib.sha1).hexdigest()
    salted = 'v2/' + newtok["uuid"] + '/' + digest
    try:
        ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
        if args.dry_run:
            newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
        else:
            newuser = arvados.api(host=ru.netloc, token=salted,
                                  insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
    except arvados.errors.ApiError as e:
        print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
        return None

    if olduser["is_active"] and not newuser["is_active"]:
        print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
        if not args.dry_run:
            try:
                migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
                                          body={"is_active": True}).execute()
            except arvados.errors.ApiError as e:
                print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
                return None

    if olduser["is_admin"] and not newuser["is_admin"]:
        print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
        return None

    return newuser
302
def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
    """Merge ``old_user_uuid`` into ``new_user_uuid`` on the ``migratearv`` cluster.

    Does nothing in dry-run mode.  With ``args.data_into_subproject`` a
    project owned by the new user is created first and passed to the merge
    as the new owner; otherwise the new user is the new owner.
    API errors are printed, with a dedicated hint for name collisions.
    """
    if args.dry_run:
        return
    try:
        if args.data_into_subproject:
            project = migratearv.groups().create(body={
                "owner_uuid": new_user_uuid,
                "name": "Migrated from %s (%s)" % (email, old_user_uuid),
                "group_class": "project"
            }, ensure_unique_name=True).execute()
            new_owner_uuid = project["uuid"]
        else:
            new_owner_uuid = new_user_uuid
        migratearv.users().merge(old_user_uuid=old_user_uuid,
                                 new_user_uuid=new_user_uuid,
                                 new_owner_uuid=new_owner_uuid,
                                 redirect_to_new_user=True).execute()
    except arvados.errors.ApiError as e:
        # Look for the error text of a violated (owner_uuid, name) unique key.
        name_collision = re.search(r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"', e._get_reason())
        if not name_collision:
            print("(%s) Skipping user migration because of error: %s" % (email, e))
            return
        target_owner, rsc_name, rsc_type = name_collision.groups()
        print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
326
327
def main():
    """Command-line entry point of the federation migration tool.

    Exactly one mode is required: ``--check`` validates the connections,
    ``--report FILE`` writes the user report CSV, and ``--migrate FILE`` /
    ``--dry-run FILE`` consume such a CSV and perform (or only describe) the
    migration.  Exits with status 1 when the connection checks report
    errors.
    """
    parser = argparse.ArgumentParser(
        description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html',
        parents=[arv_cmd.retry_opt],
    )
    parser.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
    parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
    group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
    group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
    group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
    args = parser.parse_args()

    clusters, errors, loginCluster = connect_clusters(args)

    if errors:
        for e in errors:
            print("ERROR: "+str(e))
        sys.exit(1)

    if args.check:
        print("Tokens file passed checks")
        sys.exit(0)

    rows, by_email, by_username = fetch_users(clusters, loginCluster)

    if args.report:
        # The context manager guarantees the report is flushed and closed.
        with open(args.report, "wt", newline="") as report:
            out = csv.writer(report)
            out.writerow(("email", "username", "user uuid", "home cluster"))
            out.writerows(rows)
        print("Wrote %s" % args.report)
        return

    if args.migrate or args.dry_run:
        if args.dry_run:
            print("Performing dry run")

        rows = read_migrations(args, by_email, by_username)

        for r in rows:
            email = r[EMAIL]
            username = r[USERNAME]
            old_user_uuid = r[UUID]
            userhome = r[HOMECLUSTER]

            if userhome == "":
                print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
                # Really skip: every uuid "starts with" the empty string, so
                # falling through would treat this row as already at home.
                continue
            if old_user_uuid.startswith(userhome):
                # The account already lives on its home cluster; only its
                # username may need to be brought in line with the CSV.
                migratecluster = old_user_uuid[0:5]
                migratearv = clusters[migratecluster]
                if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
                    update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
                continue

            new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
            if new_user_uuid is None:
                continue

            # First pass: make the new user known on every cluster, and
            # only merge if that succeeded everywhere.
            remote_users = {}
            got_error = False
            for migratecluster in clusters:
                # cluster where the migration is happening
                migratearv = clusters[migratecluster]

                # the user's new home cluster
                newhomecluster = userhome[0:5]
                homearv = clusters[newhomecluster]

                newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
                if newuser is None:
                    got_error = True
                remote_users[migratecluster] = newuser

            if not got_error:
                for migratecluster in clusters:
                    migratearv = clusters[migratecluster]
                    newuser = remote_users[migratecluster]
                    if newuser is False:
                        # The old user does not exist on this cluster.
                        continue

                    print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))

                    migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)

                    if newuser['username'] != username:
                        update_username(args, email, new_user_uuid, username, migratecluster, migratearv)
419
# Run the migration tool only when this module is executed as a script.
if __name__ == "__main__":
    main()
# Column positions within one report/migration CSV row, in the order the
# report header is written: email, username, user uuid, home cluster.
EMAIL, USERNAME, UUID, HOMECLUSTER = range(4)
def connect_clusters(args):
    """Connect to the clusters of the federation and sanity-check them.

    When ``args.tokens`` names a CSV file, every two-column ``host,token``
    row yields one API client; rows with any other number of columns are
    skipped.  Otherwise the default client's ``remoteHosts`` are contacted
    with the current token in ``v2/<uuid>/<secret>`` form.

    Returns a ``(clusters, errors, loginCluster)`` tuple.  ``clusters`` maps
    a cluster id to its API client, ``errors`` lists the problems found by
    the checks as printable strings, and ``loginCluster`` is the first
    non-empty ``Login.LoginCluster`` setting seen (``None`` if there is none).
    """
    clusters = {}
    errors = []
    loginCluster = None
    if args.tokens:
        print("Reading %s" % args.tokens)
        with open(args.tokens, "rt") as f:
            for r in csv.reader(f):
                if len(r) != 2:
                    continue
                host, token = r
                print("Contacting %s" % (host))
                arv = arvados.api(host=host, token=token, cache=False, num_retries=args.retries)
                clusters[arv._rootDesc["uuidPrefix"]] = arv
    else:
        arv = arvados.api(cache=False, num_retries=args.retries)
        rh = arv._rootDesc["remoteHosts"]
        tok = arv.api_client_authorizations().current().execute()
        token = "v2/%s/%s" % (tok["uuid"], tok["api_token"])
        insecure = os.environ.get("ARVADOS_API_HOST_INSECURE")

        for k, v in rh.items():
            # Honour --retries for remote clusters too, like the tokens branch.
            clusters[k] = arvados.api(host=v, token=token, cache=False,
                                      insecure=insecure,
                                      num_retries=args.retries)

    # Fetch each cluster's exported config once; it is needed both to find
    # the login cluster and for the per-cluster checks below.
    configs = {prefix: arv.configs().get().execute()
               for prefix, arv in clusters.items()}

    for config in configs.values():
        if config["Login"]["LoginCluster"] != "" and loginCluster is None:
            loginCluster = config["Login"]["LoginCluster"]

    print("Checking that the federation is well connected")
    for prefix, arv in clusters.items():
        config = configs[prefix]
        if loginCluster and config["Login"]["LoginCluster"] != loginCluster and config["ClusterID"] != loginCluster:
            errors.append("Inconsistent login cluster configuration, expected '%s' on %s but was '%s'" % (loginCluster, config["ClusterID"], config["Login"]["LoginCluster"]))
            continue

        # Revisions are compared as strings.
        if arv._rootDesc["revision"] < "20200331":
            errors.append("Arvados API server revision on cluster '%s' is too old, must be updated to at least Arvados 2.0.2 before running migration." % config["ClusterID"])
            continue

        try:
            cur = arv.users().current().execute()
        except arvados.errors.ApiError as e:
            errors.append("checking token for %s   %s" % (arv._rootDesc["rootUrl"], e))
            continue

        if not cur["is_admin"]:
            errors.append("User %s is not admin on %s" % (cur["uuid"], arv._rootDesc["uuidPrefix"]))
            continue

        # Every connected cluster must list every other one as a remote host.
        for r in clusters:
            if r != arv._rootDesc["uuidPrefix"] and r not in arv._rootDesc["remoteHosts"]:
                errors.append("%s is missing from remoteHosts of %s" % (r, arv._rootDesc["uuidPrefix"]))
        for r in arv._rootDesc["remoteHosts"]:
            if r != "*" and r not in clusters:
                print("WARNING: %s is federated with %s but %s is missing from the tokens file or the token is invalid" % (arv._rootDesc["uuidPrefix"], r, r))

    return clusters, errors, loginCluster
def fetch_users(clusters, loginCluster):
    """List the local users of every cluster and group them by email.

    ``clusters`` maps a cluster id to its API client.  Only users whose uuid
    starts with the id of the cluster that listed them are kept, and the
    ``-anonymouspublic`` and ``-000000000000000`` accounts are ignored.

    Returns ``(rows, by_email, by_username)``.  ``rows`` holds one
    ``(email, username, uuid, home cluster)`` tuple per user, ``by_email``
    maps email -> uuid -> row, and ``by_username`` maps
    ``"<home cluster>::<username>"`` to the email that owns it.  The home
    cluster is ``loginCluster`` when set; otherwise it is the user's own
    cluster when the email has a single account, and empty when it has
    several.

    Exits with status 1 if one username on one home cluster is listed for
    two different emails.
    """
    rows = []
    by_email = {}
    by_username = {}

    users = [
        user
        for prefix, arv in clusters.items()
        for user in arvados.util.keyset_list_all(arv.users().list, bypass_federation=True)
        if user['uuid'].startswith(prefix)
    ]

    # Sorting by email makes all accounts sharing an email adjacent, so a
    # single pass can collect them into groups.
    users.sort(key=lambda u: (u["email"], u["username"] or "", u["uuid"]))

    def add_accum_rows(accum):
        # Emit the report rows for one group of accounts sharing an email.
        uuids = {a["uuid"] for a in accum}
        homeuuid = uuids.pop() if len(uuids) == 1 else ""
        for a in accum:
            r = (a["email"], a["username"], a["uuid"], loginCluster or homeuuid[0:5])
            by_email.setdefault(a["email"], {})[a["uuid"]] = r
            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], a["username"])
            if homeuuid_and_username not in by_username:
                by_username[homeuuid_and_username] = a["email"]
            elif by_username[homeuuid_and_username] != a["email"]:
                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
                sys.exit(1)
            rows.append(r)

    accum = []
    for u in users:
        if u["uuid"].endswith(("-anonymouspublic", "-000000000000000")):
            continue
        if accum and u["email"] != accum[0]["email"]:
            # A new email starts: flush the group collected so far.
            add_accum_rows(accum)
            accum = []
        accum.append(u)

    add_accum_rows(accum)

    return rows, by_email, by_username
def read_migrations(args, by_email, by_username):
    """Read the migration CSV named by ``args.migrate`` or ``args.dry_run``.

    Each data row is recorded in ``by_email`` (email -> uuid -> row) and its
    ``"<home cluster>::<username>"`` key is registered in ``by_username``.
    Blank lines and the ``email`` header row are skipped.

    Returns the list of data rows in file order.  Exits with status 1 when a
    row has fewer than four columns, or when one username on one home
    cluster is listed for two different emails.
    """
    rows = []
    # newline="" is what the csv module asks for when opening files.
    with open(args.migrate or args.dry_run, "rt", newline="") as f:
        reader = csv.reader(f)
        for r in reader:
            if not r:
                # csv.reader yields an empty list for a blank line.
                continue
            if len(r) <= HOMECLUSTER:
                print("ERROR: line %d has %d column(s), expected email, username, user uuid and home cluster" % (reader.line_num, len(r)))
                sys.exit(1)
            if r[EMAIL] == "email":
                continue
            by_email.setdefault(r[EMAIL], {})[r[UUID]] = r

            homeuuid_and_username = "%s::%s" % (r[HOMECLUSTER], r[USERNAME])
            if homeuuid_and_username not in by_username:
                by_username[homeuuid_and_username] = r[EMAIL]
            elif by_username[homeuuid_and_username] != r[EMAIL]:
                print("ERROR: the username '%s' is listed for both '%s' and '%s' on cluster '%s'" % (r[USERNAME], r[EMAIL], by_username[homeuuid_and_username], r[HOMECLUSTER]))
                sys.exit(1)

            rows.append(r)
    return rows
def update_username(args, email, user_uuid, username, migratecluster, migratearv):
    """Give ``user_uuid`` the username ``username`` on the ``migratearv`` cluster.

    The change is always announced; in dry-run mode nothing else happens.
    If another account already holds the username it is first renamed to
    ``<username>migrate``.  API errors are printed, not raised.
    """
    print("(%s) Updating username of %s to '%s' on %s" % (email, user_uuid, username, migratecluster))
    if args.dry_run:
        return

    def rename(uuid, new_name):
        # Issue one username update against the local cluster only.
        migratearv.users().update(uuid=uuid,
                                  bypass_federation=True,
                                  body={"user": {"username": new_name}}).execute()

    try:
        holders = migratearv.users().list(filters=[["username", "=", username]], bypass_federation=True).execute()["items"]
        if holders:
            # Someone already has this username: move that account out of the way.
            rename(holders[0]["uuid"], username+"migrate")
        rename(user_uuid, username)
    except arvados.errors.ApiError as e:
        print("(%s) Error updating username of %s to '%s' on %s: %s" % (email, user_uuid, username, migratecluster, e))
def choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters):
    """Pick (or create) the account that ``old_user_uuid`` is migrated to.

    ``userhome`` is the wanted home: a cluster id, or a longer uuid prefix /
    full uuid to disambiguate.  Candidates are the rows listed for ``email``
    whose uuid starts with ``userhome``.  When there is none, a new user is
    created on the home cluster (in dry-run mode a placeholder uuid is used
    instead) and recorded in ``by_email``.

    Returns the chosen uuid, or ``None`` after printing the reason when the
    usernames conflict, the home cluster is unknown, the user cannot be
    created, or several candidates match.
    """
    candidates = []
    for b in by_email[email].values():
        # Row layout: (email, username, user uuid, home cluster).
        if b[2].startswith(userhome):
            candidates.append(b)
        if b[1] != username and b[3] == userhome:
            print("(%s) Cannot migrate %s, conflicting usernames %s and %s" % (email, old_user_uuid, b[1], username))
            return None

    if not candidates:
        # The first five characters of the home are the cluster id; this
        # also holds when the home was given as a full uuid.
        newhomecluster = userhome[0:5]
        if newhomecluster not in clusters:
            print("(%s) Cannot migrate %s, unknown home cluster %s (typo?)" % (email, old_user_uuid, userhome))
            return None
        print("(%s) No user listed with same email to migrate %s to %s, will create new user with username '%s'" % (email, old_user_uuid, userhome, username))
        if not args.dry_run:
            oldhomearv = clusters[old_user_uuid[0:5]]
            homearv = clusters[newhomecluster]
            try:
                olduser = oldhomearv.users().get(uuid=old_user_uuid).execute()
                conflicts = homearv.users().list(filters=[["username", "=", username]],
                                                 bypass_federation=True).execute()
                if conflicts["items"]:
                    # Free the username by renaming the account that holds it.
                    homearv.users().update(
                        uuid=conflicts["items"][0]["uuid"],
                        bypass_federation=True,
                        body={"user": {"username": username+"migrate"}}).execute()
                user = homearv.users().create(
                    body={"user": {
                        "email": email,
                        "first_name": olduser["first_name"],
                        "last_name": olduser["last_name"],
                        "username": username,
                        "is_active": olduser["is_active"]}}).execute()
            except arvados.errors.ApiError as e:
                print("(%s) Could not create user: %s" % (email, str(e)))
                return None

            tup = (email, username, user["uuid"], userhome)
        else:
            # dry run: nothing is created, use a recognisable fake uuid
            tup = (email, username, "%s-tpzed-xfakexfakexfake" % (newhomecluster), userhome)
        by_email[email][tup[2]] = tup
        candidates.append(tup)

    if len(candidates) > 1:
        print("(%s) Multiple users listed to migrate %s to %s, use full uuid" % (email, old_user_uuid, userhome))
        return None
    return candidates[0][2]
def activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid):
    """Make the new user known, and active if needed, on the migration cluster.

    A token owned by the new user is created on the home cluster, salted for
    the migration cluster and used once against it, so that the migration
    cluster knows the new user before any merge.  In dry-run mode no request
    that changes anything is made and placeholder values are used.

    Returns the new user's record as reported by the migration cluster,
    ``False`` when the old user does not exist on that cluster, or ``None``
    after printing the reason when the user cannot be migrated there.
    """
    migratecluster = migratearv._rootDesc["uuidPrefix"]

    if args.dry_run:
        newtok = {"uuid": "dry-run", "api_token": "12345"}
    else:
        try:
            newtok = homearv.api_client_authorizations().create(body={
                "api_client_authorization": {'owner_uuid': new_user_uuid}}).execute()
        except arvados.errors.ApiError as e:
            print("(%s) Could not create API token for %s: %s" % (email, new_user_uuid, e))
            return None

    try:
        matches = migratearv.users().list(filters=[["uuid", "=", old_user_uuid]], bypass_federation=True).execute()["items"]
    except arvados.errors.ApiError as e:
        print("(%s) Could not retrieve user %s from %s, user may have already been migrated: %s" % (email, old_user_uuid, migratecluster, e))
        return None
    if len(matches) == 0:
        # Nothing to migrate on this cluster.
        return False
    if len(matches) != 1:
        print("(%s) Unexpected result" % (email))
        return None
    olduser = matches[0]

    # Salted token: HMAC-SHA1 of the cluster id, keyed with the token secret.
    digest = hmac.new(newtok["api_token"].encode(),
                      msg=migratecluster.encode(),
                      digestmod=hashlib.sha1).hexdigest()
    salted = 'v2/' + newtok["uuid"] + '/' + digest
    try:
        ru = urllib.parse.urlparse(migratearv._rootDesc["rootUrl"])
        if args.dry_run:
            newuser = {"is_active": True, "username": email.split('@')[0], "is_admin": False}
        else:
            newuser = arvados.api(host=ru.netloc, token=salted,
                                  insecure=os.environ.get("ARVADOS_API_HOST_INSECURE")).users().current().execute()
    except arvados.errors.ApiError as e:
        print("(%s) Error getting user info for %s from %s: %s" % (email, new_user_uuid, migratecluster, e))
        return None

    if olduser["is_active"] and not newuser["is_active"]:
        print("(%s) Activating user %s on %s" % (email, new_user_uuid, migratecluster))
        if not args.dry_run:
            try:
                migratearv.users().update(uuid=new_user_uuid, bypass_federation=True,
                                          body={"is_active": True}).execute()
            except arvados.errors.ApiError as e:
                print("(%s) Could not activate user %s on %s: %s" % (email, new_user_uuid, migratecluster, e))
                return None

    if olduser["is_admin"] and not newuser["is_admin"]:
        print("(%s) Not migrating %s because user is admin but target user %s is not admin on %s. Please ensure the user admin status is the same on both clusters. Note that a federated admin account has admin privileges on the entire federation." % (email, old_user_uuid, new_user_uuid, migratecluster))
        return None

    return newuser
def migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid):
    """Merge the old user account into the new one on one cluster.

    Does nothing when ``args.dry_run`` is set.  Otherwise asks ``migratearv``
    to merge ``old_user_uuid`` into ``new_user_uuid``.  When
    ``args.data_into_subproject`` is set, a project owned by the new user is
    created first and passed as ``new_owner_uuid`` for the merge; otherwise
    the new user itself is passed.

    API errors are reported on stdout (prefixed with ``email``) rather than
    raised.  Always returns None.
    """
    if args.dry_run:
        return

    # Matched against the API error text to recognise an
    # (owner_uuid, name) uniqueness collision.
    collision_pattern = r'Key \(owner_uuid, name\)=\((.*?), (.*?)\) already exists\.\n.*UPDATE "(.*?)"'

    try:
        if args.data_into_subproject:
            project_attrs = {
                "owner_uuid": new_user_uuid,
                "name": "Migrated from %s (%s)" % (email, old_user_uuid),
                "group_class": "project",
            }
            subproject = migratearv.groups().create(
                body=project_attrs,
                ensure_unique_name=True,
            ).execute()
            destination_owner = subproject["uuid"]
        else:
            destination_owner = new_user_uuid

        merge_request = migratearv.users().merge(
            old_user_uuid=old_user_uuid,
            new_user_uuid=new_user_uuid,
            new_owner_uuid=destination_owner,
            redirect_to_new_user=True,
        )
        merge_request.execute()
    except arvados.errors.ApiError as e:
        collision = re.search(collision_pattern, e._get_reason())
        if not collision:
            print("(%s) Skipping user migration because of error: %s" % (email, e))
            return
        target_owner, rsc_name, rsc_type = collision.groups()
        # rsc_type[:-1] drops the last character of the captured UPDATE
        # target (presumably a plural table name -- confirm).
        print("(%s) Cannot migrate to %s because both origin and target users have a %s named '%s'. Please rename the conflicting items or use --data-into-subproject to migrate all users' data into a special subproject." % (email, target_owner, rsc_type[:-1], rsc_name))
def main():
329def main():
330    parser = argparse.ArgumentParser(
331        description='Migrate users to federated identity, see https://doc.arvados.org/admin/merge-remote-account.html',
332        parents=[arv_cmd.retry_opt],
333    )
334    parser.add_argument(
335        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
336        help='Print version and exit.')
337    parser.add_argument('--tokens', type=str, metavar='FILE', required=False, help="Read tokens from FILE. Not needed when using LoginCluster.")
338    parser.add_argument('--data-into-subproject', action="store_true", help="Migrate user's data into a separate subproject. This can be used to avoid name collisions from within an account.")
339    group = parser.add_mutually_exclusive_group(required=True)
340    group.add_argument('--report', type=str, metavar='FILE', help="Generate report .csv file listing users by email address and their associated Arvados accounts.")
341    group.add_argument('--migrate', type=str, metavar='FILE', help="Consume report .csv and migrate users to designated Arvados accounts.")
342    group.add_argument('--dry-run', type=str, metavar='FILE', help="Consume report .csv and report how user would be migrated to designated Arvados accounts.")
343    group.add_argument('--check', action="store_true", help="Check that tokens are usable and the federation is well connected.")
344    args = parser.parse_args()
345
346    clusters, errors, loginCluster = connect_clusters(args)
347
348    if errors:
349        for e in errors:
350            print("ERROR: "+str(e))
351        exit(1)
352
353    if args.check:
354        print("Tokens file passed checks")
355        exit(0)
356
357    rows, by_email, by_username = fetch_users(clusters, loginCluster)
358
359    if args.report:
360        out = csv.writer(open(args.report, "wt"))
361        out.writerow(("email", "username", "user uuid", "home cluster"))
362        for r in rows:
363            out.writerow(r)
364        print("Wrote %s" % args.report)
365        return
366
367    if args.migrate or args.dry_run:
368        if args.dry_run:
369            print("Performing dry run")
370
371        rows = read_migrations(args, by_email, by_username)
372
373        for r in rows:
374            email = r[EMAIL]
375            username = r[USERNAME]
376            old_user_uuid = r[UUID]
377            userhome = r[HOMECLUSTER]
378
379            if userhome == "":
380                print("(%s) Skipping %s, no home cluster specified" % (email, old_user_uuid))
381            if old_user_uuid.startswith(userhome):
382                migratecluster = old_user_uuid[0:5]
383                migratearv = clusters[migratecluster]
384                if migratearv.users().get(uuid=old_user_uuid).execute()["username"] != username:
385                    update_username(args, email, old_user_uuid, username, migratecluster, migratearv)
386                continue
387
388            new_user_uuid = choose_new_user(args, by_email, email, userhome, username, old_user_uuid, clusters)
389            if new_user_uuid is None:
390                continue
391
392            remote_users = {}
393            got_error = False
394            for migratecluster in clusters:
395                # cluster where the migration is happening
396                migratearv = clusters[migratecluster]
397
398                # the user's new home cluster
399                newhomecluster = userhome[0:5]
400                homearv = clusters[newhomecluster]
401
402                newuser = activate_remote_user(args, email, homearv, migratearv, old_user_uuid, new_user_uuid)
403                if newuser is None:
404                    got_error = True
405                remote_users[migratecluster] = newuser
406
407            if not got_error:
408                for migratecluster in clusters:
409                    migratearv = clusters[migratecluster]
410                    newuser = remote_users[migratecluster]
411                    if newuser is False:
412                        continue
413
414                    print("(%s) Migrating %s to %s on %s" % (email, old_user_uuid, new_user_uuid, migratecluster))
415
416                    migrate_user(args, migratearv, email, new_user_uuid, old_user_uuid)
417
418                    if newuser['username'] != username:
419                        update_username(args, email, new_user_uuid, username, migratecluster, migratearv)