307 lines
10 KiB
Python
307 lines
10 KiB
Python
import logging
|
|
import unittest
|
|
from copy import deepcopy, copy
|
|
from statistics import median
|
|
|
|
import time
|
|
from freezegun import freeze_time
|
|
|
|
from aucoin import consensus
|
|
from aucoin import config
|
|
from aucoin.transactions import Input, Transaction, CoinbaseTransaction, Output
|
|
from aucoin.block import Block
|
|
from aucoin.blockchain import Blockchain, ExtendingBranchType
|
|
from aucoin.mempool import Mempool
|
|
from aucoin.validation import InvalidBlockException, Validator
|
|
from aucoin.wallet import Wallet
|
|
from tests.helpers import mine
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
|
|
class TestBlockchain(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
# set up blockchain
|
|
cls.blockchain = Blockchain(clear=True)
|
|
|
|
def setUp(self):
|
|
self.blockchain._reset()
|
|
|
|
# generate wallet
|
|
self.wallet = Wallet()
|
|
self.wallet.generate()
|
|
|
|
# TODO: Setup utxo
|
|
|
|
self.mempool = Mempool()
|
|
|
|
# valid transaction used for testing
|
|
self.tx = Transaction(
|
|
inputs=[
|
|
Input(
|
|
prev_tx_hash=b"tx_hash",
|
|
txout_index=2
|
|
)
|
|
],
|
|
outputs=[
|
|
Output(
|
|
value=40,
|
|
address=self.wallet.address
|
|
)
|
|
]
|
|
)
|
|
|
|
self.tx.inputs[0].signature = self.wallet.sign(self.tx.hash)
|
|
|
|
self.coinbase = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=1
|
|
)
|
|
|
|
self.block = mine(
|
|
Block(
|
|
hash_prev_block=self.blockchain.genesis_block.hash,
|
|
target=bytes.fromhex("00ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
|
|
transactions=[self.coinbase],
|
|
branch_type="main_branch"
|
|
)
|
|
)
|
|
|
|
self.validator = Validator(self.blockchain, self.blockchain.mempool)
|
|
|
|
self.blockchain.add(self.block)
|
|
|
|
def test_has_utxo_of_coinbase(self):
|
|
utxos = self.blockchain.utxos_of_addresses([self.wallet.public_key])
|
|
self.assertTrue(len(utxos) > 0)
|
|
|
|
def test_doesnt_get_non_existing_block(self):
|
|
block = self.blockchain.block(b"nonexistent")
|
|
self.assertEqual(block, None)
|
|
|
|
def test_get_block(self):
|
|
block = self.blockchain.block(self.block.hash)
|
|
|
|
self.assertEqual(block.hash, self.block.hash)
|
|
self.assertEqual(block.merkle_root_hash, self.block.merkle_root_hash)
|
|
self.assertEqual(block.nonce, self.block.nonce)
|
|
|
|
def test_get_header(self):
|
|
block = self.blockchain.header
|
|
self.assertEqual(block.hash, self.block.hash)
|
|
self.assertEqual(block.merkle_root_hash, self.block.merkle_root_hash)
|
|
self.assertEqual(block.nonce, self.block.nonce)
|
|
|
|
def test_get_utxo_of_address(self):
|
|
utxos = self.blockchain.utxos_of_address(self.wallet.public_key)
|
|
self.assertEqual(1, len(utxos))
|
|
|
|
@freeze_time("2010-01-01")
|
|
def test_calculates_median_of_timestamps_of_blocks(self):
|
|
timestamps = []
|
|
hash_prev_block = self.block.hash
|
|
# Create a lot of blocks to the blockchain to calculate median for.
|
|
for i in range(1, consensus.block_median_timestamp_nblocks +1):
|
|
coinbase = CoinbaseTransaction(self.wallet.public_key,
|
|
value=100,
|
|
block_height=self.coinbase.block_height + i)
|
|
block = Block(hash_prev_block=hash_prev_block,
|
|
target=self.block.target,
|
|
transactions=[coinbase])
|
|
block.timestamp += i*111
|
|
block.total_work = self.coinbase.block_height + i*10
|
|
hash_prev_block = block.hash
|
|
self.blockchain.add(block)
|
|
timestamps.append(block.timestamp)
|
|
|
|
# Add a block to a side-branch to ensure that we only count blocks on the main-branch
|
|
coinbase = CoinbaseTransaction(self.wallet.public_key,
|
|
value=100,
|
|
block_height=2)
|
|
side_branch_block = Block(hash_prev_block=self.block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase])
|
|
side_branch_block.total_work = 1
|
|
side_branch_block.timestamp += 1000
|
|
|
|
self.blockchain.add(side_branch_block)
|
|
true_med = median(timestamps)
|
|
med = self.blockchain.get_median_timestamp(self.blockchain.header)
|
|
self.assertEqual(true_med, med)
|
|
|
|
def test_doesnt_add_invalid_block_to_blockchain(self):
|
|
coinbase = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=2
|
|
)
|
|
block = Block(
|
|
hash_prev_block=self.block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase]
|
|
)
|
|
self.assertRaises(InvalidBlockException, self.blockchain.add, block, self.validator)
|
|
|
|
@unittest.skip
|
|
def test_adds_block_to_blockchain(self):
|
|
coinbase = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=2
|
|
)
|
|
new_block = mine(
|
|
Block(
|
|
hash_prev_block=self.block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase],
|
|
)
|
|
)
|
|
self.assertTrue(self.blockchain.add(new_block, self.validator))
|
|
|
|
def test_block_is_extending_main_branch(self):
|
|
self.assertEqual(
|
|
self.block.branch_type,
|
|
ExtendingBranchType.EXTENDS_MAIN_BRANCH.value)
|
|
|
|
def test_block_is_extending_side_branch_becoming_new_main_branch(self):
|
|
coinbase = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100
|
|
)
|
|
block = mine(
|
|
Block(
|
|
hash_prev_block=self.blockchain.genesis_block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase],
|
|
timestamp=int(time.time() + 100)
|
|
)
|
|
)
|
|
coinbase2 = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100
|
|
)
|
|
block2 = mine(
|
|
Block(
|
|
hash_prev_block=block.hash,
|
|
target=block.target,
|
|
transactions=[coinbase2],
|
|
timestamp=int(time.time() + 110)
|
|
)
|
|
)
|
|
self.blockchain.add(block)
|
|
self.blockchain.add(block2)
|
|
|
|
self.assertEqual(
|
|
block2.branch_type,
|
|
ExtendingBranchType.EXTENDS_SIDE_BRANCH_NEW_MAIN_BRANCH.value
|
|
)
|
|
|
|
def test_block_is_extending_side_branch(self):
|
|
coinbase = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=2
|
|
)
|
|
side_block = mine(
|
|
Block(
|
|
hash_prev_block=self.blockchain.genesis_block.hash,
|
|
target=self.block.target,
|
|
transactions=[self.coinbase],
|
|
timestamp=int(time.time() + 100)
|
|
)
|
|
)
|
|
main_block = mine(
|
|
Block(
|
|
hash_prev_block=self.block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase],
|
|
timestamp=int(time.time() + 100)
|
|
)
|
|
)
|
|
|
|
self.blockchain.add(side_block)
|
|
self.blockchain.add(main_block)
|
|
self.assertEqual(
|
|
side_block.branch_type,
|
|
ExtendingBranchType.EXTENDS_SIDE_BRANCH.value
|
|
)
|
|
|
|
def test_reorganize_main_branch(self):
|
|
coinbase1a = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=2
|
|
)
|
|
coinbase2a = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=3
|
|
)
|
|
coinbase1b = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=2
|
|
)
|
|
coinbase2b = CoinbaseTransaction(
|
|
self.wallet.public_key,
|
|
value=100,
|
|
block_height=3
|
|
)
|
|
block1a = Block(
|
|
hash_prev_block=self.block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase1a],
|
|
timestamp=int(time.time() + 100)
|
|
)
|
|
block2a = Block(
|
|
hash_prev_block=block1a.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase2a],
|
|
timestamp=int(time.time() + 104)
|
|
)
|
|
block1b = Block(
|
|
hash_prev_block=self.blockchain.genesis_block.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase1b],
|
|
timestamp=int(time.time() + 101)
|
|
)
|
|
block2b = Block(
|
|
hash_prev_block=block1b.hash,
|
|
target=self.block.target,
|
|
transactions=[coinbase2b],
|
|
timestamp=int(time.time() + 102)
|
|
)
|
|
|
|
self.blockchain.add(block1a)
|
|
self.blockchain.add(block2a)
|
|
self.blockchain.add(block1b)
|
|
self.blockchain.add(block2b)
|
|
|
|
changed = self.blockchain.reorganize_main_branch(block2b)
|
|
|
|
self.assertTrue(changed)
|
|
block1a = self.blockchain.block(block1a.hash)
|
|
block1b = self.blockchain.block(block1b.hash)
|
|
block2a = self.blockchain.block(block2a.hash)
|
|
block2b = self.blockchain.block(block2b.hash)
|
|
|
|
self.assertEqual(block1a.branch_type, ExtendingBranchType.EXTENDS_SIDE_BRANCH.value)
|
|
self.assertEqual(block1a.transactions[0].branch_type, ExtendingBranchType.EXTENDS_SIDE_BRANCH.value)
|
|
self.assertEqual(block2a.branch_type, ExtendingBranchType.EXTENDS_SIDE_BRANCH.value)
|
|
self.assertEqual(block2a.transactions[0].branch_type, ExtendingBranchType.EXTENDS_SIDE_BRANCH.value)
|
|
self.assertEqual(block1b.branch_type, ExtendingBranchType.EXTENDS_MAIN_BRANCH.value)
|
|
self.assertEqual(block1b.transactions[0].branch_type, ExtendingBranchType.EXTENDS_MAIN_BRANCH.value)
|
|
self.assertEqual(block2b.branch_type, ExtendingBranchType.EXTENDS_MAIN_BRANCH.value)
|
|
self.assertEqual(block2b.transactions[0].branch_type, ExtendingBranchType.EXTENDS_MAIN_BRANCH.value)
|
|
|
|
def test_get_utxo_of_tx_hash_at_index(self):
|
|
utxo = self.blockchain.get_utxo_of_tx_hash_at_index(self.block.transactions[0].hash, 0)
|
|
self.assertTrue(utxo.unspent)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|