# home *** CD-ROM | disk | FTP | other *** search
#   (navigation residue from the archive page this listing was copied from;
#    not part of the module)
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
-
- __revision__ = '$Id: randpool.py,v 1.14 2004/05/06 12:56:54 akuchling Exp $'
- import time
- import array
- import types
- import warnings
- import os.path as os
- from Crypto.Util.number import long_to_bytes
-
- try:
- import Crypto.Util.winrandom as winrandom
- except:
- winrandom = None
-
# Default number of stir() passes performed by RandomPool.stir_n().
STIRNUM = 3


class RandomPool:
    """randpool.py : Cryptographically strong random number generation.

    The implementation here is similar to the one in PGP.  To be
    cryptographically strong, it must be difficult to determine the RNG's
    output, whether in the future or the past.  This is done by using
    a cryptographic hash function to "stir" the random data.

    Entropy is gathered in the same fashion as PGP; the highest-resolution
    clock around is read and the data is added to the random number pool.
    A conservative estimate of the entropy is then kept.

    If a cryptographically secure random source is available (/dev/urandom
    on many Unixes, Windows CryptGenRandom on most Windows), then use
    it.

    Instance Attributes:
    bits : int
      Maximum size of pool in bits
    bytes : int
      Maximum size of pool in bytes
    entropy : int
      Number of bits of entropy in this pool.

    Methods:
    add_event([s]) : add some entropy to the pool
    get_bytes(int) : get N bytes of random data
    randomize([N]) : get N bytes of randomness from external source
    """

    def __init__(self, numbytes=160, cipher=None, hash=None):
        """Create a pool of `numbytes` bytes and seed it.

        numbytes -- size of the pool in bytes.
        cipher   -- ignored; passing anything but None only emits a warning.
        hash     -- hashing module providing new() and digest_size.  A string
                    is accepted for backward compatibility and resolved to
                    the module Crypto.Hash.<hash> (with a warning).  When
                    None, Crypto.Hash.SHA is used.
        """
        if hash is None:
            # Imported lazily so the dependency is only needed when the
            # caller does not supply a hash module.
            from Crypto.Hash import SHA as hash

        if cipher is not None:
            warnings.warn("'cipher' parameter is no longer used")

        if isinstance(hash, str):
            # A non-empty fromlist makes __import__ return the leaf module
            # rather than the top-level 'Crypto' package.
            hash = __import__('Crypto.Hash.' + hash, None, None, ['new'])
            warnings.warn("'hash' parameter should now be a hashing module")

        self.bytes = numbytes
        self.bits = self.bytes * 8
        self.entropy = 0
        self._hash = hash
        # The pool itself: an array of unsigned bytes, initially all zero.
        self._randpool = array.array('B', [0] * self.bytes)
        self._event1 = self._event2 = 0
        self._addPos = 0
        self._getPos = hash.digest_size
        self._lastcounter = time.time()
        self.__counter = 0
        self._measureTickSize()
        self._randomize()

    def _updateEntropyEstimate(self, nbits):
        """Add `nbits` (may be negative) to the estimate, clamped to [0, bits]."""
        self.entropy = max(0, min(self.bits, self.entropy + nbits))

    def _randomize(self, N=0, devname='/dev/urandom'):
        """_randomize(N, DEVNAME:device-filepath)
        collects N bits of randomness from some entropy source (e.g.,
        /dev/urandom on Unixes that have it, Windows CryptoAPI
        CryptGenRandom, etc)
        DEVNAME is optional, defaults to /dev/urandom.  You can change it
        to /dev/random if you want to block till you get enough
        entropy.

        When N <= 0 the request is sized to fill the pool's missing entropy.
        The pool is stirred afterwards whether or not any data was obtained.
        """
        # Local import: guarantees that ``os`` is the real os package here,
        # whatever the module-level name ``os`` happens to be bound to.
        import os.path

        # Bits -> bytes.  The '/' is deliberate: with integer operands under
        # Python 2 it truncates before the 0.5 is added.
        if N <= 0:
            nbytes = int((self.bits - self.entropy) / 8 + 0.5)
        else:
            nbytes = int(N / 8 + 0.5)

        data = ''
        if winrandom:
            data = winrandom.new().get_bytes(nbytes)
        elif os.path.exists(devname):
            try:
                with open(devname, 'rb') as device:
                    data = device.read(nbytes)
            except IOError as err:
                # errno 2 (ENOENT): the device vanished between exists() and
                # open(); treat it as "no source".  Anything else is real.
                if err.errno != 2:
                    raise

        if data:
            self._addBytes(data)
            # Credit exactly the number of bits read from the source.
            self._updateEntropyEstimate(8 * len(data))

        self.stir_n()

    def randomize(self, N=0):
        """randomize(N:int)
        use the class entropy source to get some entropy data.
        Subclasses in this module (KeyboardRandomPool) override this.
        """
        return self._randomize(N)

    def stir_n(self, N=STIRNUM):
        """stir_n(N)
        stirs the random pool N times
        """
        for _ in range(N):
            self.stir()

    def stir(self, s=''):
        """stir(s:string)
        Mix up the randomness pool.  This will call add_event() twice,
        but out of paranoia the entropy attribute will not be
        increased.  The optional 's' parameter is a string that will
        be hashed with the randomness pool.
        """
        saved_entropy = self.entropy
        self.add_event()

        # Hash the whole pool together with a running counter and XOR each
        # digest back in, once per digest-sized slice of the pool.
        for i in range(self.bytes // self._hash.digest_size):
            h = self._hash.new(self._randpool)
            h.update(str(self.__counter) + str(i) + str(self._addPos) + s)
            self._addBytes(h.digest())
            self.__counter = (self.__counter + 1) & 0xFFFFFFFF

        self._addPos = 0
        self._getPos = self._hash.digest_size
        self.add_event()
        # Undo whatever the two add_event() calls credited.
        self.entropy = saved_entropy

    def get_bytes(self, N):
        """get_bytes(N:int) : string
        Return N bytes of random data.

        The entropy estimate is reduced by 8*N bits.
        """
        chunks = []
        pos = self._getPos
        h = self._hash.new()
        dsize = self._hash.digest_size
        remaining = N
        while remaining > 0:
            h.update(self._randpool[pos:pos + dsize])
            chunks.append(h.digest())
            remaining -= dsize
            pos = (pos + dsize) % self.bytes
            if pos < dsize:
                # Read position wrapped around: stir before reusing the pool.
                self.stir()
                pos = self._getPos

        self._getPos = pos
        self._updateEntropyEstimate(-8 * N)
        return ''.join(chunks)[:N]

    def add_event(self, s=''):
        """add_event(s:string)
        Add an event to the random pool.  The current time is stored
        between calls and used to estimate the entropy.  The optional
        's' parameter is a string that will also be XORed into the pool.
        Returns the estimated number of additional bits of entropy gain.
        """
        event = time.time() * 1000
        delta = self._noise()
        s = s + long_to_bytes(event) + 4 * chr(170) + long_to_bytes(delta)
        self._addBytes(s)

        bits = 0
        if not (event == self._event1 and event == self._event2):
            # Credit one bit per significant bit of delta, at most 8.
            while delta:
                delta = delta >> 1
                bits = bits + 1
            if bits > 8:
                bits = 8
        # else: same timestamp as the two previous events -> no credit.

        # Shift the history.  Tuple assignment matters: _event2 must receive
        # the *previous* _event1, not the value just stored.
        self._event1, self._event2 = event, self._event1
        self._updateEntropyEstimate(bits)
        return bits

    def _noise(self):
        """Mix clock readings into the pool; return a timing delta in 0..254.

        The delta is the time since the previous call, scaled by the
        measured tick size, reduced modulo 255.
        """
        t = time.time()
        delta = ((t - self._lastcounter) / self._ticksize) * 1e+06
        self._lastcounter = t
        self._addBytes(long_to_bytes(long(1000 * time.time())))
        self._addBytes(long_to_bytes(long(1000 * time.clock())))
        self._addBytes(long_to_bytes(long(1000 * time.time())))
        self._addBytes(long_to_bytes(long(delta)))
        delta = delta % 255
        return int(delta)

    def _measureTickSize(self):
        """Estimate the resolution of time.time() and stir the samples in.

        Collects 100 non-zero microsecond differences between consecutive
        time.time() readings, stores element 50 of the sorted samples in
        self._ticksize, and stirs a hash of all readings into the pool.
        """
        interval = [None] * 100
        h = self._hash.new(repr((id(self), id(interval))))
        t = time.time()
        h.update(repr(t))
        i = 0  # samples stored so far
        j = 0  # clock reads so far (hashed only)
        while i < 100:
            t2 = time.time()
            h.update(repr((i, j, t2)))
            j += 1
            delta = int((t2 - t) * 1e+06)
            if delta:
                # Only readings where the clock visibly moved are samples.
                interval[i] = delta
                i += 1
                t = t2

        interval.sort()
        self._ticksize = interval[len(interval) // 2]
        h.update(repr((interval, self._ticksize)))
        self.stir(h.digest())

    def _addBytes(self, s):
        """XOR the contents of the string S into the random pool"""
        pos = self._addPos
        pool = self._randpool
        for ch in s:
            pool[pos] ^= ord(ch)
            pos = (pos + 1) % self.bytes  # wrap around the end of the pool
        self._addPos = pos

    def getBytes(self, N):
        """Deprecated alias for get_bytes()."""
        warnings.warn('getBytes() method replaced by get_bytes()', DeprecationWarning)
        return self.get_bytes(N)

    def addEvent(self, event, s=''):
        """Deprecated alias for add_event(); `event` is appended to `s` as text."""
        warnings.warn('addEvent() method replaced by add_event()', DeprecationWarning)
        return self.add_event(s + str(event))
-
-
-
class PersistentRandomPool(RandomPool):
    """A RandomPool that can be seeded from a file and saved back to it."""

    def __init__(self, filename=None, *args, **kwargs):
        """Build the pool, then mix in the contents of `filename` if given.

        filename -- path of the seed file; a false value skips seeding.
        Remaining arguments are passed to RandomPool.__init__ unchanged.
        A seed file that cannot be opened or read is silently ignored.
        """
        RandomPool.__init__(self, *args, **kwargs)
        self.filename = filename
        if filename:
            try:
                with open(filename, 'rb') as seed_file:
                    # add_event() is called around the read so that the
                    # clock readings at those points go into the pool too.
                    self.add_event()
                    data = seed_file.read()
                    self.add_event()
                self.stir(data)
            except IOError:
                # Deliberate best effort: a missing or unreadable seed file
                # just means the pool starts without it.
                pass

    def save(self):
        """Write the current pool contents to self.filename.

        The pool is stirred before and after writing.
        Raises ValueError when no filename has been set.
        """
        # Covers both '' and the default None, which used to fail inside
        # open() with an unrelated error.
        if not self.filename:
            raise ValueError('No filename set for this object')
        self.stir_n()
        with open(self.filename, 'wb') as out:
            self.add_event()
            out.write(self._randpool.tostring())
        self.add_event()
        self.stir()
-
-
# Select a non-echoing, unbuffered keyboard reader for this platform.
# _kb is 1 once a KeyboardEntry class has been defined, 0 otherwise.
_kb = 0

# Windows console: msvcrt.
if not _kb:
    try:
        import msvcrt
    except ImportError:
        pass
    else:
        class KeyboardEntry:
            """Read single keystrokes from the console via msvcrt."""

            def getch(self):
                """Return one keystroke; two-character sequences for
                keys whose first character is '\\x00' or '\\xe0'."""
                c = msvcrt.getch()
                if c in ('\x00', '\xe0'):
                    c += msvcrt.getch()
                return c

            def close(self, delay=0):
                """If `delay` is non-zero, sleep that many seconds and then
                discard any keystrokes that are still pending."""
                if delay:
                    time.sleep(delay)
                    while msvcrt.kbhit():
                        msvcrt.getch()

        _kb = 1

# POSIX terminal: termios.
if not _kb:
    try:
        import termios
    except ImportError:
        pass
    else:
        class KeyboardEntry:
            """Read single bytes from a terminal with ICANON and ECHO off."""

            def __init__(self, fd=0):
                """Save the terminal settings of `fd`, then clear the ICANON
                and ECHO local-mode flags on it."""
                self._fd = fd
                self._old = termios.tcgetattr(fd)
                new = termios.tcgetattr(fd)
                # Index 3 of the attribute list holds the local-mode flags.
                new[3] = new[3] & ~(termios.ICANON) & ~(termios.ECHO)
                termios.tcsetattr(fd, termios.TCSANOW, new)

            def getch(self):
                """Discard pending input on the terminal, then read one byte."""
                # Local import: guarantees os.read is reachable whatever the
                # module-level name ``os`` is bound to.
                import os
                # Flush the descriptor we actually read from (this was a
                # hard-coded 0, which is also the default fd).
                termios.tcflush(self._fd, termios.TCIFLUSH)
                return os.read(self._fd, 1)

            def close(self, delay=0):
                """Restore the saved terminal settings; when `delay` is
                non-zero, first sleep that long and flush pending input."""
                if delay:
                    time.sleep(delay)
                    termios.tcflush(self._fd, termios.TCIFLUSH)
                termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old)

        _kb = 1
-
-
-
class KeyboardRandomPool(PersistentRandomPool):
    """PersistentRandomPool that gathers entropy from keystroke timing."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to PersistentRandomPool.__init__."""
        PersistentRandomPool.__init__(self, *args, **kwargs)

    def randomize(self, N=0):
        """Collect entropy from the keyboard until enough has been credited.

        If N <= 0, collect enough to fill the pool (bits - entropy).
        Otherwise the target is N * 8 bits.
        NOTE(review): the historical docstring read "Adds N bits of entropy
        to random pool", but the code multiplies N by 8 -- confirm the
        intended unit before relying on either.

        Returns None.  Progress is written to file descriptor 1.
        """
        import os

        if N <= 0:
            bits = self.bits - self.entropy
        else:
            bits = N * 8
        if bits == 0:
            return None

        print('%s bits of entropy are now required. Please type on the keyboard' % (bits,))
        print('until enough randomness has been accumulated.')
        kb = KeyboardEntry()
        typed = ''  # every character typed so far; re-added on each event
        hash = self._hash
        gathered = 0

        try:
            while gathered < bits:
                # Show the remaining bit count in a 6-wide field, then
                # backspace over it so the next value overwrites it.
                os.write(1, str(bits - gathered).rjust(6))
                typed = typed + kb.getch()
                gathered += self.add_event(typed)
                os.write(1, 6 * chr(8))
            self.add_event(typed + hash.new(typed).digest())
        finally:
            kb.close()

        print('\n\x07 Enough. Please wait a moment.\n')
        self.stir_n()
        # Second close with a delay: lets the KeyboardEntry sleep and then
        # drop keystrokes typed after the target was reached.
        kb.close(4)
-
-
if __name__ == '__main__':
    # Interactive self-test: exercises the plain, keyboard and persistent
    # pools.  Needs a terminal, because KeyboardRandomPool reads keystrokes.
    import os
    import tempfile

    pool = RandomPool()
    print('random pool entropy %s bits' % (pool.entropy,))
    pool.add_event('something')
    print(repr(pool.get_bytes(100)))

    # mkstemp() creates the file atomically; mktemp() only returns a name
    # that another process could claim first.
    fd, fname = tempfile.mkstemp()
    os.close(fd)
    try:
        pool = KeyboardRandomPool(filename=fname)
        print('keyboard random pool entropy %s bits' % (pool.entropy,))
        pool.randomize()
        print('keyboard random pool entropy %s bits' % (pool.entropy,))
        pool.randomize(128)
        pool.save()
        with open(fname, 'rb') as saved_file:
            saved = saved_file.read()
        print('saved ' + repr(saved))
        print('pool  ' + repr(pool._randpool.tostring()))
        newpool = PersistentRandomPool(fname)
        # Report the pool that was just loaded (this used to print `pool`).
        print('persistent random pool entropy %s bits' % (newpool.entropy,))
    finally:
        os.remove(fname)
-
-