# Copyright (C) 2011-2015 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.

"""Unique ID generation.

Use these functions to create unique ids rather than inlining calls to hashlib
and whatnot.  These are better instrumented for testing purposes.
"""

__all__ = [
    'UniqueIDFactory',
    'factory',
    ]


import os
import time
import uuid
import errno
import random
import hashlib

from flufl.lock import Lock
from mailman.config import config
from mailman.model.uid import UID
from mailman.testing import layers


class UniqueIDFactory:
    """A factory for unique ids.

    Outside of testing mode the ids are random UUIDs.  In testing mode they
    are predictable, drawn from a counter that is kept in a lock-protected
    data file under `config.VAR_DIR`.
    """

    def __init__(self, context=None):
        """Create the factory.

        :param context: Optional suffix appended to the counter file name, so
            that factories with different contexts use different files.
        :type context: str or None
        """
        # We can't call reset() when the factory is created below, because
        # config.VAR_DIR will not be set at that time.  So initialize it at
        # the first use.
        self._uid_file = None
        self._lock_file = None
        self._lockobj = None
        self._context = context
        layers.MockAndMonkeyLayer.register_reset(self.reset)

    @property
    def _lock(self):
        """The lock guarding the counter file, created on first access.

        As a side effect, the first access also computes `_uid_file` and
        `_lock_file`, so callers must go through this property before they
        touch `_uid_file`.
        """
        if self._lockobj is None:
            # These will get automatically cleaned up by the test
            # infrastructure.
            self._uid_file = os.path.join(config.VAR_DIR, '.uid')
            if self._context:
                self._uid_file += '.' + self._context
            self._lock_file = self._uid_file + '.lock'
            self._lockobj = Lock(self._lock_file)
        return self._lockobj

    def new_uid(self):
        """Return a new UID.

        :return: The new uid
        :rtype: uuid.UUID
        """
        if layers.is_testing():
            # When in testing mode we want to produce predictable id, but we
            # need to coordinate this among separate processes.  We could use
            # the database, but I don't want to add schema just to handle this
            # case, and besides transactions could get aborted, causing some
            # ids to be recycled.  So we'll use a data file with a lock.  This
            # may still not be ideal due to race conditions, but I think the
            # tests will be serialized enough (and the ids reset between
            # tests) that it will not be a problem.  Maybe.
            return uuid.UUID(int=self._next_uid())
        while True:
            uid = uuid.uuid4()
            try:
                UID.record(uid)
            except ValueError:
                # The id was rejected (presumably because it has already
                # been recorded -- verify against UID.record), so draw
                # another one.
                pass
            else:
                return uid

    def _next_uid(self):
        """Return the current counter value and store its successor.

        A missing counter file is treated as a counter that starts at 1.

        :return: The integer to use for the next predictable id.
        :rtype: int
        :raises IOError: if the counter file cannot be read or written for
            any reason other than not existing yet.
        """
        with self._lock:
            try:
                with open(self._uid_file) as fp:
                    uid = int(fp.read().strip())
                    next_uid = uid + 1
                with open(self._uid_file, 'w') as fp:
                    fp.write(str(next_uid))
                return uid
            except IOError as error:
                if error.errno != errno.ENOENT:
                    raise
                # No counter file yet: hand out 1 and remember that 2 is
                # next.
                with open(self._uid_file, 'w') as fp:
                    fp.write('2')
                return 1

    def reset(self):
        """Restart the counter so that the next predictable id is 1.

        This is registered as a reset callback with the testing layer when
        the factory is created.
        """
        with self._lock:
            with open(self._uid_file, 'w') as fp:
                fp.write('1')


class TokenFactory(UniqueIDFactory):
    """A factory for unique tokens, with its own 'token' counter file."""

    def __init__(self):
        super(TokenFactory, self).__init__(context='token')

    def new_token(self):
        """Calculate a unique token.

        Algorithm vetted by the Timbot.  A random float is mixed with the
        fractional parts of the wall clock time and of a high resolution
        performance counter, and the result is hashed.  The time values
        basically help obscure the random number generator, as does the hash
        calculation.  The integral parts of the time values are discarded
        because they're the most predictable bits.

        In testing mode the token is instead the next predictable counter
        value, zero-padded to 40 characters.

        :return: A 40 character token.
        :rtype: str
        """
        if layers.is_testing():
            # When in testing mode we want to produce predictable tokens, see
            # UniqueIDFactory for a similar use case.
            return str(self._next_uid()).zfill(40)
        right_now = time.time()
        # time.clock() was removed in Python 3.8; time.perf_counter() is the
        # high resolution replacement and plays the same obscuring role here.
        x = random.random() + right_now % 1.0 + time.perf_counter() % 1.0
        # Use sha1 because it produces shorter strings.
        return hashlib.sha1(repr(x).encode('utf-8')).hexdigest()