From 1549ebe62e545b55af4af6bfc4a2960576feaa44 Mon Sep 17 00:00:00 2001 From: Ryan Kelly Date: Tue, 29 May 2018 07:27:39 +1000 Subject: [PATCH] Bug 988134: Drop the unique constraint on client_state. --- syncserver/staticnode.py | 80 ++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/syncserver/staticnode.py b/syncserver/staticnode.py index 1f6562a..bfc4142 100644 --- a/syncserver/staticnode.py +++ b/syncserver/staticnode.py @@ -8,8 +8,6 @@ This is a greatly-simplified node-assignment backend. It keeps user records in an SQL database, but does not attempt to do any node management. All users are implicitly assigned to a single, static node. -XXX TODO: move this into the tokenserver repo. - """ import time import urlparse @@ -19,7 +17,6 @@ from sqlalchemy import Column, Integer, String, BigInteger, Index from sqlalchemy import create_engine, Table, MetaData from sqlalchemy.pool import QueuePool from sqlalchemy.sql import text as sqltext -from sqlalchemy.exc import IntegrityError from tokenserver.assignment import INodeAssignment from zope.interface import implements @@ -40,13 +37,12 @@ users = Table( Column("created_at", BigInteger(), nullable=False), Column("replaced_at", BigInteger(), nullable=True), Index('lookup_idx', 'email', 'service', 'created_at'), - Index('clientstate_idx', 'email', 'service', 'client_state', unique=True), ) _GET_USER_RECORDS = sqltext("""\ select - uid, generation, client_state, created_at + uid, generation, client_state, created_at, replaced_at from users where @@ -111,10 +107,10 @@ class StaticNodeAssignment(object): } if self.driver == "sqlite": # We must mark it as safe to share sqlite connections between - # threads. The pool will ensure there's on race conditions. + # threads. The pool will ensure there's no race conditions. sqlkw["connect_args"]["check_same_thread"] = False - # If using a :memory: database, we must use a QueuePool of size - # 1 so that a single connection is shared by all threads. + # If using a :memory: database, we must use a QueuePool of + # size 1 so that a single connection is shared by all threads. if urlparse.urlparse(sqluri).path.lower() in ("/", "/:memory:"): sqlkw["pool_size"] = 1 sqlkw["max_overflow"] = 0 @@ -142,10 +138,22 @@ class StaticNodeAssignment(object): 'old_client_states': {} } # Any subsequent rows are due to old client-state values. - row = res.fetchone() - while row is not None: - user['old_client_states'][row.client_state] = True - row = res.fetchone() + old_row = res.fetchone() + update_replaced_at = False + while old_row is not None: + if old_row.client_state != user['client_state']: + user['old_client_states'][old_row.client_state] = True + # Make sure each old row is marked as replaced. + # They might not be, due to races in row creation. + if old_row.replaced_at is None: + update_replaced_at = True + old_row = res.fetchone() + if update_replaced_at: + self._engine.execute(_REPLACE_USER_RECORDS, { + 'service': service, + 'email': user['email'], + 'timestamp': row.created_at, + }).close() return user finally: res.close() @@ -156,22 +164,17 @@ class StaticNodeAssignment(object): 'service': service, 'email': email, 'generation': generation, 'client_state': client_state, 'timestamp': now } - try: - res = self._engine.execute(_CREATE_USER_RECORD, **params) - except IntegrityError: - raise - return self.get_user(service, email) - else: - res.close() - return { - 'email': email, - 'uid': res.lastrowid, - 'node': self.node_url, - 'generation': generation, - 'client_state': client_state, - 'first_seen_at': now, - 'old_client_states': {} - } + res = self._engine.execute(_CREATE_USER_RECORD, **params) + res.close() + return { + 'email': email, + 'uid': res.lastrowid, + 'node': self.node_url, + 'generation': generation, + 'client_state': client_state, + 'first_seen_at': now, + 'old_client_states': {} + } def update_user(self, service, user, generation=None, client_state=None): if client_state is None: @@ -202,19 +205,14 @@ class StaticNodeAssignment(object): 'generation': generation, 'client_state': client_state, 'timestamp': now, } - try: - res = self._engine.execute(_CREATE_USER_RECORD, **params) - except IntegrityError: - user.update(self.get_user(service, user['email'])) - else: - self.get_user(service, user['email']) - user['uid'] = res.lastrowid - user['generation'] = generation - user['old_client_states'][user['client_state']] = True - user['client_state'] = client_state - res.close() - # mark old records as having been replaced. - # if we crash here, they are unmarked and we may fail to + res = self._engine.execute(_CREATE_USER_RECORD, **params) + res.close() + user['uid'] = res.lastrowid + user['generation'] = generation + user['old_client_states'][user['client_state']] = True + user['client_state'] = client_state + # Mark old records as having been replaced. + # If we crash here, they are unmarked and we may fail to # garbage collect them for a while, but the active state # will be undamaged. params = {