This repository has been archived on 2025-03-01. You can view files and clone it, but cannot push or open issues or pull requests.
silverstream/silverstream/bittorrent/dht/routing.py
Casper V. Kristensen 49eb78d5c0
Publish.
2019-04-19 02:33:31 +02:00

236 lines
8.1 KiB
Python

from __future__ import annotations
import asyncio
import logging
import random
import typing
from collections import OrderedDict
from contextlib import suppress
from typing import Tuple, List, Dict, Set, Iterator
from . import config
from .peer import PeerStatus
from .util import closest
from ...util import b2i, i2b, log2
if typing.TYPE_CHECKING: # PyCharm
from .node import Node
from .peer import Contact, Peer
logger = logging.getLogger(__name__)
class ReplacementCache:
    """
    Bounded cache of replacement peers, ordered from least to most recently seen.

    Peers are keyed by their ``compact`` attribute. Once the cache holds ``maxlen`` peers, adding an unknown peer
    evicts the least recently seen one. A cache with a non-positive ``maxlen`` never stores anything.
    """

    def __init__(self, maxlen: int) -> None:
        """
        :param maxlen: The maximum number of peers kept in the cache.
        """
        self.maxlen = maxlen
        self.peers: OrderedDict = OrderedDict()  # compact -> peer; oldest entry first, newest entry last

    def add(self, peer: Peer) -> None:
        """
        Add the given peer to the cache, or refresh it (moving it to the most-recent end) if it is already present.
        """
        if self.maxlen <= 0:
            # A zero-capacity cache cannot hold anything; without this guard the eviction below would try to pop
            # from an empty dict and raise a confusing KeyError.
            return
        compact = peer.compact
        try:
            self.peers.move_to_end(compact, last=True)  # the cache should be kept sorted by time last seen
        except KeyError:
            if len(self.peers) >= self.maxlen:
                self.peers.popitem(last=False)  # remove oldest if full to make room for the new one
        self.peers[compact] = peer  # save peer in cache (or update its status)

    def pop(self) -> Peer:
        """
        Remove and return the most recently seen peer.

        :raises KeyError: If the cache is empty.
        """
        _, peer = self.peers.popitem(last=True)
        return peer

    def __len__(self) -> int:
        return len(self.peers)

    def __iter__(self) -> Iterator[Peer]:
        """
        Iterate over the cached peers from least to most recently seen.
        """
        yield from self.peers.values()

    def __repr__(self) -> str:
        return f"{type(self).__name__}(maxlen={self.maxlen!r}, peers={list(self.peers.values())!r})"
class Bucket(dict):
    """
    A bucket of the routing table: a mapping of contact -> peer for IDs in the half-open range [min, max).

    Holds at most ``size`` peers; peers that do not fit are kept in a replacement cache of the same capacity.
    """
    size = config.k

    def __init__(self, min: int, max: int) -> None:
        super().__init__()
        self.replacements: ReplacementCache = ReplacementCache(maxlen=self.size)
        self.min = min
        self.max = max

    @property
    def full(self) -> bool:
        """
        Whether the bucket holds at least ``size`` peers.
        """
        return self.size <= len(self)

    @property
    def fresh(self) -> bool:
        """
        Whether at least one peer in the bucket has status GOOD.
        """
        for peer in self.values():
            if peer.status == PeerStatus.GOOD:
                return True
        return False

    @property
    def depth(self) -> int:
        """
        Return the depth of the bucket, i.e. the number of prefix bits shared by all contacts in this bucket.
        """
        span = self.max - self.min
        return int(log2(config.id_space // span))

    def fits(self, id: bytes) -> bool:
        """
        Return whether or not the given node/peer id lies within this bucket's range (min inclusive, max exclusive).
        """
        value = b2i(id)
        return self.min <= value and value < self.max

    def add(self, peer: Peer) -> bool:
        """
        Add the given peer to the bucket.

        A peer that cannot be stored directly is put in the replacement cache, after which the bucket tries to
        swap out bad peers for cached ones.

        :return: True if the peer was added to the bucket, otherwise False.
        """
        contact = peer.contact
        if contact not in self and self.full:
            # No room for an unknown peer: remember it as a replacement and see if a bad peer can make way
            self.replacements.add(peer)
            return self.fill()

        # Either the peer is already known (update it) or there is free space (insert it)
        self[contact] = peer
        return True

    def fill(self) -> bool:
        """
        Fill the bucket using peers from the replacement cache if it isn't full. Also replaces bad peers if any.

        :return: True if any new peer was added to the bucket, otherwise False.
        """
        added = False

        # First use up free slots, most recently seen replacement first
        while self.replacements and not self.full:
            self.add(self.replacements.pop())
            added = True

        # Then evict peers known to be bad for as long as replacements are available
        bad_contacts = {contact for contact, peer in self.items() if peer.status == PeerStatus.BAD}
        while bad_contacts and self.replacements:
            evicted = bad_contacts.pop()
            del self[evicted]
            self.add(self.replacements.pop())
            added = True

        return added

    def split(self) -> Tuple[Bucket, Bucket]:
        """
        Split the bucket in two at the midpoint of its range, dividing the contents between them.

        :return: The two new buckets (lower half, upper half).
        """
        logger.debug("Splitting bucket")
        midpoint = (self.min + self.max) // 2
        lower, upper = Bucket(self.min, midpoint), Bucket(midpoint, self.max)

        # Peers go to whichever half covers their ID
        for peer in self.values():
            target = lower if lower.fits(peer.id) else upper
            target.add(peer)

        # Same for the replacement peers; iterating oldest-first keeps their relative order intact
        for peer in self.replacements:
            target = lower if lower.fits(peer.id) else upper
            target.replacements.add(peer)

        # Top up both halves from their replacement caches
        for part in (lower, upper):
            part.fill()

        return lower, upper

    def __repr__(self) -> str:
        template = "Bucket(min=2^{min}, max=2^{max}, fresh={fresh}, full={full}, peers={peers}," \
                   " replacements={replacements})"
        return template.format(min=log2(self.min),
                               max=log2(self.max),
                               fresh=self.fresh,
                               full=self.full,
                               peers=list(self.values()),
                               replacements=self.replacements)
class RoutingTable:
    """
    Routing table of a node: a list of buckets which together cover the ID space.

    The table starts out with a single bucket spanning ``[0, config.id_space)``; buckets are split on demand as
    peers are added.
    """

    def __init__(self, node: Node) -> None:
        """
        :param node: The node owning this routing table; used for its id and for find_nodes/bootstrap calls.
        """
        self.node = node
        self.buckets: List[Bucket] = [Bucket(0, config.id_space)]  # initially, the table has a single bucket of the entire ID space

    @property
    def peers(self) -> Set[Peer]:
        """
        All peers currently stored in any bucket (replacement caches excluded).
        """
        return {peer
                for bucket in self.buckets
                for peer in bucket.values()}

    @property
    def contacts(self) -> Dict[Contact, Peer]:
        """
        Mapping of contact -> peer across all buckets. Rebuilt on every access.
        """
        return {contact: peer
                for bucket in self.buckets
                for contact, peer in bucket.items()}

    @property
    def replacements(self) -> Set[Peer]:
        """
        All peers currently held in the replacement caches of any bucket.
        """
        return {peer
                for bucket in self.buckets
                for peer in bucket.replacements}

    def find_bucket(self, id: bytes) -> Bucket:
        """
        Find the appropriate bucket for the given id.

        :raises StopIteration: If no bucket covers the id.
        """
        return next(bucket for bucket in self.buckets if bucket.fits(id))

    def add(self, peer: Peer) -> None:
        """
        Add the given peer to the appropriate bucket in the routing table.

        A peer carrying our own node id is ignored. If the bucket does not accept the peer, the bucket is split
        when its range includes our own id (or its depth is not a multiple of config.b) and the insertion is
        retried; otherwise the peer is left to the bucket's replacement handling.
        """
        if peer.id == self.node.id:
            return

        logger.debug("Adding %s to routing table", peer)
        bucket = self.find_bucket(peer.id)

        # Try to add the peer to the appropriate bucket
        if bucket.add(peer):
            return

        # Otherwise, if the bucket's range includes our own ID, it is split into two and the insertion attempt repeated
        if bucket.fits(self.node.id) or bucket.depth % config.b != 0:
            self.buckets.remove(bucket)
            self.buckets.extend(bucket.split())
            self.add(peer)

    def closest(self, id: bytes, k=config.k, status: typing.Optional[PeerStatus] = None) -> List[Peer]:
        """
        Return a list with the k closest peers to the given id in our buckets, optionally with given status.

        :param id: The id to measure closeness to.
        :param k: The maximum number of peers to return.
        :param status: If given, only peers with exactly this status are considered.
        """
        peers = self.peers
        if status is not None:
            peers = {peer for peer in peers if peer.status == status}
        return closest(k, id, peers)

    async def refresh_bucket(self, bucket: Bucket) -> None:
        """
        Refresh the bucket by picking a random ID in the range of the bucket and performing a find_nodes search on it.
        """
        logger.debug("Refreshing bucket %s", bucket)
        # Buckets cover the half-open range [min, max), so the upper bound must be excluded: randint() would include
        # bucket.max, which lies outside the bucket and, for the topmost bucket, does not fit in 20 bytes.
        random_id = i2b(random.randrange(bucket.min, bucket.max), length=20)  # 20 bytes * 8 = 160 bits
        await self.node.find_nodes(random_id)

    async def refresh_table(self) -> None:
        """
        Refresh all buckets that are not fresh, i.e. that do not contain any peer with status GOOD.

        If every peer in the table is BAD (which is also the case when the table is empty), the node is
        bootstrapped again first. Exceptions raised by individual bucket refreshes are collected by gather()
        rather than propagated.
        """
        if all(peer.status == PeerStatus.BAD for peer in self.peers):
            logger.warning("All peers in routing table have gone bad; bootstrapping again")
            await self.node.bootstrap()

        await asyncio.gather(*[self.refresh_bucket(bucket)
                               for bucket in self.buckets
                               if not bucket.fresh],
                             return_exceptions=True)

    def update_peer_status(self, contact: Contact, status: PeerStatus) -> None:
        """
        Update the peer status for a peer given its contact information.

        Unknown contacts are silently ignored. After the update, the peer's bucket gets a chance to replace bad
        peers from its replacement cache.
        """
        with suppress(KeyError):  # the contact may not be in the routing table at all
            peer = self.contacts[contact]
            peer.status = status
            logger.debug("Updated status for %s to %s", contact, status.name)
            self.find_bucket(peer.id).fill()