Bug 988134: Drop the unique constraint on client_state.

bug-988134-port
Ryan Kelly 6 years ago
parent 2865b19181
commit 1549ebe62e

@ -8,8 +8,6 @@ This is a greatly-simplified node-assignment backend. It keeps user records
in an SQL database, but does not attempt to do any node management. All users in an SQL database, but does not attempt to do any node management. All users
are implicitly assigned to a single, static node. are implicitly assigned to a single, static node.
XXX TODO: move this into the tokenserver repo.
""" """
import time import time
import urlparse import urlparse
@ -19,7 +17,6 @@ from sqlalchemy import Column, Integer, String, BigInteger, Index
from sqlalchemy import create_engine, Table, MetaData from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.pool import QueuePool from sqlalchemy.pool import QueuePool
from sqlalchemy.sql import text as sqltext from sqlalchemy.sql import text as sqltext
from sqlalchemy.exc import IntegrityError
from tokenserver.assignment import INodeAssignment from tokenserver.assignment import INodeAssignment
from zope.interface import implements from zope.interface import implements
@ -40,13 +37,12 @@ users = Table(
Column("created_at", BigInteger(), nullable=False), Column("created_at", BigInteger(), nullable=False),
Column("replaced_at", BigInteger(), nullable=True), Column("replaced_at", BigInteger(), nullable=True),
Index('lookup_idx', 'email', 'service', 'created_at'), Index('lookup_idx', 'email', 'service', 'created_at'),
Index('clientstate_idx', 'email', 'service', 'client_state', unique=True),
) )
_GET_USER_RECORDS = sqltext("""\ _GET_USER_RECORDS = sqltext("""\
select select
uid, generation, client_state, created_at uid, generation, client_state, created_at, replaced_at
from from
users users
where where
@ -111,10 +107,10 @@ class StaticNodeAssignment(object):
} }
if self.driver == "sqlite": if self.driver == "sqlite":
# We must mark it as safe to share sqlite connections between # We must mark it as safe to share sqlite connections between
# threads. The pool will ensure there's on race conditions. # threads. The pool will ensure there's no race conditions.
sqlkw["connect_args"]["check_same_thread"] = False sqlkw["connect_args"]["check_same_thread"] = False
# If using a :memory: database, we must use a QueuePool of size # If using a :memory: database, we must use a QueuePool of
# 1 so that a single connection is shared by all threads. # size 1 so that a single connection is shared by all threads.
if urlparse.urlparse(sqluri).path.lower() in ("/", "/:memory:"): if urlparse.urlparse(sqluri).path.lower() in ("/", "/:memory:"):
sqlkw["pool_size"] = 1 sqlkw["pool_size"] = 1
sqlkw["max_overflow"] = 0 sqlkw["max_overflow"] = 0
@ -142,10 +138,22 @@ class StaticNodeAssignment(object):
'old_client_states': {} 'old_client_states': {}
} }
# Any subsequent rows are due to old client-state values. # Any subsequent rows are due to old client-state values.
row = res.fetchone() old_row = res.fetchone()
while row is not None: update_replaced_at = False
user['old_client_states'][row.client_state] = True while old_row is not None:
row = res.fetchone() if old_row.client_state != user['client_state']:
user['old_client_states'][old_row.client_state] = True
# Make sure each old row is marked as replaced.
# They might not be, due to races in row creation.
if old_row.replaced_at is None:
update_replaced_at = True
old_row = res.fetchone()
if update_replaced_at:
self._engine.execute(_REPLACE_USER_RECORDS, {
'service': service,
'email': user['email'],
'timestamp': row.created_at,
}).close()
return user return user
finally: finally:
res.close() res.close()
@ -156,22 +164,17 @@ class StaticNodeAssignment(object):
'service': service, 'email': email, 'generation': generation, 'service': service, 'email': email, 'generation': generation,
'client_state': client_state, 'timestamp': now 'client_state': client_state, 'timestamp': now
} }
try: res = self._engine.execute(_CREATE_USER_RECORD, **params)
res = self._engine.execute(_CREATE_USER_RECORD, **params) res.close()
except IntegrityError: return {
raise 'email': email,
return self.get_user(service, email) 'uid': res.lastrowid,
else: 'node': self.node_url,
res.close() 'generation': generation,
return { 'client_state': client_state,
'email': email, 'first_seen_at': now,
'uid': res.lastrowid, 'old_client_states': {}
'node': self.node_url, }
'generation': generation,
'client_state': client_state,
'first_seen_at': now,
'old_client_states': {}
}
def update_user(self, service, user, generation=None, client_state=None): def update_user(self, service, user, generation=None, client_state=None):
if client_state is None: if client_state is None:
@ -202,19 +205,14 @@ class StaticNodeAssignment(object):
'generation': generation, 'client_state': client_state, 'generation': generation, 'client_state': client_state,
'timestamp': now, 'timestamp': now,
} }
try: res = self._engine.execute(_CREATE_USER_RECORD, **params)
res = self._engine.execute(_CREATE_USER_RECORD, **params) res.close()
except IntegrityError: user['uid'] = res.lastrowid
user.update(self.get_user(service, user['email'])) user['generation'] = generation
else: user['old_client_states'][user['client_state']] = True
self.get_user(service, user['email']) user['client_state'] = client_state
user['uid'] = res.lastrowid # Mark old records as having been replaced.
user['generation'] = generation # If we crash here, they are unmarked and we may fail to
user['old_client_states'][user['client_state']] = True
user['client_state'] = client_state
res.close()
# mark old records as having been replaced.
# if we crash here, they are unmarked and we may fail to
# garbage collect them for a while, but the active state # garbage collect them for a while, but the active state
# will be undamaged. # will be undamaged.
params = { params = {

Loading…
Cancel
Save