Source code for blockchain.client.miner

import os
import pickle
import hashlib
import logging
import requests
import jsonpickle

from datetime import timedelta
from multiprocessing import Process, Queue

from ..utils.constants import *
from import Data
from ..blockchain.block import Block
from ..client.server import start_server
from ..utils.errors import ChainNotValidError
from ..blockchain.blockchain import Blockchain
from ..utils.utils import split_url_string, create_proper_url_string, Job, encode_file_path_properly

logger = logging.getLogger(__name__)

[docs]class Miner(object): def __init__(self, path_to_chain: str, json_format: bool, port: int, difficulty: int, neighbours: list, force_new_chain: bool) -> None: """ Constructor for new ``Miner`` object. Args: path_to_chain (str): Path to chain for restore/ backup purposes. json_format (bool): Use JSON format for chain? Otherwise pickle is used. port (int): Port of neighbour. difficulty (int): Amount of trailing 0s for proof of work neighbours (list): List of known neighbours, e.g. ``["localhost:23456", "miner0815:6666"]`` force_new_chain (bool): Force miner to create a new chain instead of use the existing one. """"Create 'Miner' object ...") logger.debug(f"Arguments - path_to_chain: {path_to_chain}, json_format: {json_format}, port: {port}, difficulty: {difficulty}, neighbours: {neighbours}") logger.debug("Init parent Class.") super().__init__() logger.debug(f"Type checks: 'path_to_chain' ...") if not isinstance(path_to_chain, str): raise ValueError("'path_to_chain' has to be of type string!") logger.debug(f"Type checks: 'json_format' ...") if not isinstance(json_format, bool): raise ValueError("'json_format' has to be a boolean value!") logger.debug(f"Type checks: 'port' ...") if not (isinstance(port, int) and not isinstance(port, bool)) or port < 1 or port > 65535: raise ValueError("'port' is of wrong type or out of range!") logger.debug(f"Type checks: 'difficulty' ...") if not (isinstance(difficulty, int) and not isinstance(difficulty, bool)) or difficulty < 1: raise ValueError("'difficulty' is of wrong type or lower than 1!") logger.debug(f"Type checks: 'neighbours' ...") if not isinstance(neighbours, list): raise ValueError("'neighbours' has to be of type list!") for index, neighbour in enumerate(neighbours): if not isinstance(neighbour, str): raise ValueError("Elements of 'neighbours' has to be of type string!") try: split_url_string(neighbour) except: raise ValueError(f"Neighbour at index: {index} of 'neighbours' is not a valid 'ip:port' representation. (Maybe Port out of range or protocol is not in [http, https]?") logger.debug(f"Type checks done: all valid.") self._jobs = [] self._port = port self._queue = None self._neighbours = set() self._server_process = None self._difficulty = difficulty self._unprocessed_messages = set() self._blockchain = Blockchain(path_to_chain=encode_file_path_properly(path_to_chain), json_format=json_format, force_new_chain= force_new_chain) logger.debug(f"Check chain ...") # check if chain is valid if not self._is_chain_valid(): #TODO: test raise ChainNotValidError("Local chain is not valid!") logger.debug(f"Check chain: valid.") logger.debug(f"Create neighbours: ...") for neighbour in neighbours: if len(self.neighbours) < MAX_NEIGHBOURS: neighbour_internal = split_url_string(neighbour) self.neighbours.add(neighbour_internal)"Created 'Miner' object.") logger.debug(f"'Miner' object created.")
[docs] def start(self) -> None: """ Starts some background ``Job`` s for the Gossip Protocol, Chain syncing, Data syncing, communication thread as well as the server functionalities as process. Starts the blocking function ``mine()``. """"Configure and start 'Miner' background tasks ...") update_neighbour_job = ("Gossip Job", Job(interval=timedelta(seconds=GOSSIP_TIME_SECONDS), execute=self._update_neighbours)) logger.debug(f"Background thread configured: '{update_neighbour_job[0]}' - interval: {GOSSIP_TIME_SECONDS} seconds.") check_for_longest_chain_job = ("Sync Chain Job", Job(interval=timedelta(seconds=CHAIN_SYNC_TIME_SECONDS), execute=self._check_for_longest_chain)) logger.debug(f"Background thread configured: '{check_for_longest_chain_job[0]}' - interval: {CHAIN_SYNC_TIME_SECONDS} seconds.") fetch_unprocessed_data_job = ("Sync Unprocessed Data Job)", Job(interval=timedelta(seconds=UNPROCESSED_DATA_SYNC_TIME_SECONDS), execute=self._fetch_unprocessed_data)) logger.debug(f"Background thread configured: '{fetch_unprocessed_data_job[0]}' - interval: {UNPROCESSED_DATA_SYNC_TIME_SECONDS} seconds.") backup_local_chain_job = ("Backup Local Chain Job", Job(interval=timedelta(seconds=BACKUP_LOCAL_CHAIN_TIME_SECONDS), execute=self._backup_local_chain)) logger.debug(f"Background thread configured: '{backup_local_chain_job[0]}' - interval: {BACKUP_LOCAL_CHAIN_TIME_SECONDS} seconds.") communicate_job = ("Communication Job", Job(interval=timedelta(seconds=0), execute=self._communicate)) logger.debug(f"Background thread configured: '{communicate_job[0]}'.") self._queue = Queue() self._server_process = Process(target=start_server, args=[self.queue, self.port]) logger.debug(f"'Server Process' configured.") logger.debug("Start 'Miner' background threads ...") update_neighbour_job[1].start() logger.debug(f"'{update_neighbour_job[0]}' thread started.") check_for_longest_chain_job[1].start() logger.debug(f"'{check_for_longest_chain_job[0]}' thread started.") fetch_unprocessed_data_job[1].start() logger.debug(f"'{fetch_unprocessed_data_job[0]}' thread started.") backup_local_chain_job[1].start() logger.debug(f"'{backup_local_chain_job[0]}' thread started.") communicate_job[1].start() logger.debug(f"'{communicate_job[0]}' thread started.") self.server_process.start() logger.debug(f"'Server Process' started.")"All 'Miner' background tasks started.") logger.debug("Start mining ...") self._mine()
[docs] def stop(self) -> None: """ Function that gets called when Python was killed. Takes care to shutting down all threads/process and saves the chain to disc. """"Start shutting down routine.") for job_name, job in logger.debug(f"Shutting down '{job_name}' ...") job.stop() logger.debug(f"'{job_name}' Stopped.") logger.debug(f"Shutting down 'Server Process' ...") self.server_process.terminate() self.server_process.join() logger.debug(f"'Server Process' Stopped.") logger.debug(f"Saving local chain ...") self.blockchain.save_chain() logger.debug(f"Chain saved.")"Shutting down routine done.")
[docs] def _communicate(self) -> None: """ Periodical thread to communicate with server process. """ if not self._queue.empty(): message = self._queue.get_nowait() logger.debug(f"Processing message: '{message[0]}'' ...") if ADD_KEY == message[0]: logger.debug(f"Found handle for message with key: '{ADD_KEY}'") self._new_message(message[1]) elif SEND_CHAIN_KEY == message[0]: logger.debug(f"Found handle for message with key: '{SEND_CHAIN_KEY}'") message[1].send({ "chain": jsonpickle.encode(self.blockchain.chain.copy()), "length": len(self.blockchain.chain), }) elif SEND_NEIGHBOURS_KEY == message[0]: logger.debug(f"Found handle for message with key: '{SEND_NEIGHBOURS_KEY}'") message[1].send({ "neighbours": jsonpickle.encode(self.neighbours.copy()), "length": len(self.neighbours), }) elif SEND_DATA_KEY == message[0]: logger.debug(f"Found handle for message with key: '{SEND_DATA_KEY}'") message[1].send(jsonpickle.encode(self.unprocessed_data.copy())) else: logger.warning(f"Could not find handle for message: '{message[0]}'")
[docs] def _proof_of_work(self, last_proof: int, difficulty: int) -> int: """ Simple proof of work: Find a number ``p`` that when hashed with the previous ``block``’s solution a hash with ``difficulty`` trailing 0s is produced. Args: last_proof (int): Solution of the last blocks' proof of work difficulty (int): Amount of trailing 0s for a valid proof of work. Returns: int: Solution for this proof of work quiz. Raises: ValueError: Will be raised if ``difficulty`` is not a positive integer value. """ logger.debug(f"Do Proof of Work. - last_proof: {last_proof}, difficulty: {difficulty}.") if difficulty <= 0: raise ValueError("'difficulty' has to be a positive integer value.") proof = 0 while not self._is_proof_of_work_valid(last_proof, proof, difficulty): proof += 1 logger.debug(f"Found Proof of Work - last_proof: {last_proof}, difficulty: {difficulty}.")"Found a valid Proof of Work.") return proof
[docs] def _is_chain_valid(self, chain: list = None) -> bool: """ Checks if the given ``chain`` satisfies the following rules: 1. The first (genesis) block: - ``index`` = 0 - ``previous_hash`` = None - ``proof`` = None 2. each and every following block: - ``index``: step size 1 and monotonically increasing (1, 2, 3, 4, ...) - ``previous_hash``: SHA-256 of the string representation of the preceding block - ``proof``: has to be valid -> see: :meth:`~Miner.is_proof_of_work_valid` - ``timestamp``: higher than the timestamp of of preceding block Args: chain (list): Optional chain if ``None`` internal representation is used. Returns: bool: ``True`` if ``chain`` is valid, ``False`` otherwise. """ logger.debug(f"Check if chain is valid.") previous_block = None if chain == None: chain_to_check = self.blockchain.chain else: chain_to_check = chain for index, block in enumerate(chain_to_check): # rules for genesis block if index == 0: # correct genesis block? if block.index != 0 or block.previous_hash != None or block.proof != None: logger.debug(f"Genesis Block is not valid: -> What is wrong? index: {block.index != 0}, previous_hash: {block.previous_hash != None}, proof: {block.proof != None}.") # genesis block is not valid! => wrong chain return False # rules for any other block else: previous_hash = Miner._hash(previous_block) if block.index != index or block.previous_hash != previous_hash or not self._is_proof_of_work_valid(previous_block.proof, block.proof, self.difficulty) or previous_block.timestamp >= block.timestamp: logger.debug(f"Block with index: {block.index} ist not valid: -> What is wrong? index: {block.index != index}, previous_hash: {block.previous_hash != previous_hash}, PoW valid: {self._is_proof_of_work_valid(previous_block.proof, block.proof, self.difficulty)}, timestamp: {previous_block.timestamp >= block.timestamp}.") # block ist not valid! => wrong chain return False previous_block = block logger.debug(f"Chain is valid.") return True
[docs] def _new_message(self, message: str) -> None: """ Adds the new ``message`` to its local cache. Args: message (str): """ logger.debug(f"Create new unprocessed Data ... - message: '{message}' ...") data = Data(message) self.unprocessed_data.add(data) logger.debug(f"New unprocessed Data created. - message: '{data.message}', id: '{}'")"New message added. - message: '{data.message}', id: '{}'")
[docs] def _backup_local_chain(self) -> None: """ Periodical thread to backup the local chain to disc. """ logger.debug(f"Backup local chain ... - json_format: {self.blockchain.json_format}") hash_file_path = f"{os.path.splitext(self.blockchain.path_to_chain)[0]}.hash" def _do_backup(hash_file_path: str, encoded_chain_hash: str): """ Helper function that does the actual backup steps. Args: hash_file_path (str): String that describes the path to the hash file. encoded_chain_hash (str): Hash value of the actual local chain. """ if not os.path.isfile(hash_file_path): # if no hash file exists -> create one with actual hash value with open(hash_file_path, "w") as hash_file: logger.debug(f"No existing hash file. Write actual hash value.") hash_file.write(encoded_chain_hash)"Backed up chain -> Missing hash file (fixed)") else: # All as expected: check hash and backup if necessary with(open(hash_file_path, "r")) as chain_hash_file: saved_chain_hash = if saved_chain_hash != encoded_chain_hash: logger.debug(f"Hash of chain on disc differ from local chain hash.") logger.debug(f"Backup Chain.") self.blockchain.save_chain() logger.debug(f"Save new hash file.") with(open(hash_file_path, "w")) as chain_hash_file: chain_hash_file.write(encoded_chain_hash)"Backed up chain -> Chain saved.") else:"Backed up chain -> No backup needed.") if self.blockchain.json_format: logger.debug(f"Encode to JSON") encoded_chain = jsonpickle.encode(self.blockchain.chain) encoded_chain_hash = hashlib.sha256(encoded_chain.encode()).hexdigest() _do_backup(hash_file_path, encoded_chain_hash) else: logger.debug(f"Encode with pickle") encoded_chain = pickle.dumps(self.blockchain.chain) encoded_chain_hash = hashlib.sha256(encoded_chain).hexdigest() _do_backup(hash_file_path, encoded_chain_hash)
[docs] def _fetch_unprocessed_data(self) -> None: """ Periodical thread to get unprocessed data form neighbours. => Broadcasts unprocessed data around the network. """ logger.debug(f"Syncing unprocessed data ... - neighbours: '{self.neighbours}'") old_data = self.unprocessed_data # ask all neighbours for their data queues. for neighbour in self.neighbours.copy(): logger.debug(f"Fetch data of neighbour: '{neighbour}'") response = requests.get(create_proper_url_string(neighbour, DATA_ENDPOINT)) if response.status_code == HTTP_OK: logger.debug(f"Get data of neighbour: '{neighbour}'") data_queue = jsonpickle.decode(response.json()) self.unprocessed_data.update(data_queue) logger.debug(f"Data of neighbour: '{neighbour}' added.") else: logger.warning(f"Response of neighbour: '{neighbour}' has bad status_code: '{response.status_code}'") if old_data == self.unprocessed_data:"Synced unprocessed data -> No new data.") else:"Synced unprocessed data -> New data.") logger.debug(f"Syncing unprocessed data done.")
[docs] def _is_data_processed(self, data: Data) -> bool: """ Checks if ``data`` is already in local chain. Args: data (Data): ``Data`` object to check if it exists in the actual chain. Returns: bool: ``True`` if unprocessed. """ # TODO: speedup with batches: # in: list of Data objects to check # out: list of Data objects to mine logger.debug(f"Check if data is not processed ... - '{}', data.message: '{data.message}'") for block in self.blockchain.chain: if == data: logger.debug(f"Data is not processed: - '{}', data.message: '{data.message}'") return True logger.debug(f"Data is already processed: - '{}', data.message: '{data.message}'") return False
[docs] def _update_neighbours(self) -> None: """ Periodical thread to update neighbours if limit is not exceeded. """ logger.debug(f"Update neighbours ... - neighbours: '{self.neighbours}'") length_old_neighbours = len(self.neighbours) # TODO: Delete not accessible neighbours if len(self.neighbours) < MAX_NEIGHBOURS: logger.debug(f"Maximum amount of neighbours not exceeded. -> update ...") # ask all neighbours for their neighbours. for neighbour in self.neighbours.copy(): logger.debug(f"Fetch neighbours of neighbour: '{neighbour}'") response = requests.get(create_proper_url_string(neighbour, NEIGHBOURS_ENDPOINT)) if response.status_code == HTTP_OK: logger.debug(f"Get neighbours of neighbour: '{neighbour}'") new_neighbours = jsonpickle.decode(response.json()["neighbours"]) # TODO check response like in constructor... # Add unknown miner to 'neighbours', return when max amount of neighbours is reached for new_neighbour in new_neighbours: # Do not add own address self.neighbours.add(new_neighbour) if len(self.neighbours) >= MAX_NEIGHBOURS: logger.debug(f"Maximum amount of neighbours exceeded -> Stop syncing")"Updated neighbours -> New neighbours added.") return else: logger.warning(f"Response of neighbour: '{neighbour}' has bad status_code: '{response.status_code}'") if length_old_neighbours < len(self.neighbours):"Updated neighbours -> New neighbours added.") else:"Updated neighbours -> No new neighbours available.") logger.debug(f"Update neighbours done.")
[docs] def _check_for_longest_chain(self) -> None: """ Consensus Algorithm: Ask each ``neighbour`` for that ``neighbours``. Add all unknown miner to ``neighbours`` set until maximum amount of neighbours is reached. """ logger.debug(f"Syncing chain ... - neighbours: '{self.neighbours}'") new_chain = None old_chain = self.blockchain.chain # only longest chain is of interest max_length = len(self.blockchain.chain) for neighbour in self.neighbours.copy(): logger.debug(f"Fetch chain of neighbour: '{neighbour}'") response = requests.get(create_proper_url_string(neighbour, CHAIN_ENDPOINT)) if response.status_code == HTTP_OK: logger.debug(f"Get chain of neighbour: '{neighbour}'") chain = jsonpickle.decode(response.json()['chain']) length = len(chain) # chain longer and valid? if length > max_length and self._is_chain_valid(chain): logger.debug(f"New chain is longer. - neighbour: '{neighbour}', length of old chain: '{max_length}', length of chain: '{length}'") max_length = length new_chain = chain else: logger.warning(f"Response of neighbour: '{neighbour}' has bad status_code: '{response.status_code}'") # replace local chain with longest valid chain of all neighbours network if new_chain: self.blockchain.chain = new_chain logger.debug(f"Longer chain added.") if old_chain == self.blockchain.chain:"Synced chain -> Have already longest chain.") else:"Synced chain -> New (longer) chain added.") logger.debug(f"Syncing chain done.")
[docs] def _mine(self) -> None: """ Blocking Mining loop. If ``not_processed_messages`` are available it uses a random message an mines a new block. """"Start Mining ...") logger.debug(f"Start Mining ...") while True: if len(self.unprocessed_data) > 0: data = self.unprocessed_data.pop() logger.debug(f"There is local unprocessed data. - '{}', data.message: '{data.message}'") if not self._is_data_processed(data): logger.debug(f"Data is not processed -> mine new block. - '{}', data.message: '{data.message}'") last_block = self.blockchain.last_block last_proof = last_block.proof previous_hash = self._hash(last_block) # proof of work for new block proof = self._proof_of_work(last_proof, self.difficulty) block = self.blockchain.add_new_block(data=data, proof=proof, previous_hash=previous_hash) logger.debug(f"New Block mined. - block.index: {block.index}, block.proof: {block.proof}, block.previous_hash: {block.previous_hash}, block.timestamp: {block.timestamp}, {}, {}")"New block mined. - block.index: {block.index}, block.timestamp: {block.timestamp}")
[docs] @staticmethod def _hash(block: Block) -> str: """ Hash a ``Block`` object with SHA-256. Args: block (Block): Object of class ``Block`` to hash. Returns: str: Hex representation of ``block`` hash. Raises: ValueError: Will be raised if no ``Block`` object is passed. """ if not isinstance(block, Block): raise ValueError("Only `Block` objects are hashable!") logger.debug(f"Hashing block ... - block.index: {block.index}, block.proof: {block.proof}, block.previous_hash: {block.previous_hash}, block.timestamp: {block.timestamp}, {}, {}") hash_value = hashlib.sha256(bytes(block)).hexdigest() logger.debug(f"Hashing block done. - block hash: {hash_value}") return hash_value
[docs] @staticmethod def _is_proof_of_work_valid(last_proof: int, proof: int, difficulty: int) -> bool: """ Checks if the proof of work was correct. The hash value of ``last_proof`` concatenated with ``proof`` has to be ``difficulty`` trailing 0s. Args: last_proof (int): Value of the ``proof`` of the preceding block. proof (int): ``proof`` of the actual block. difficulty (int): Amount of trailing 0s. Returns: bool: ``True`` if proof of work is correct, ``False`` otherwise. Raises: ValueError: Will be raised if ``difficulty`` is not a positive integer value. """ if difficulty <= 0: raise ValueError("'difficulty' has to be a positive integer value.") guess = "{}{}".format(last_proof, proof).encode() guess_hash = hashlib.sha256(guess).hexdigest() # hash ends with `difficulty` trailing 0? return guess_hash[-difficulty:] == "0" * difficulty
@property def blockchain(self) -> Blockchain: return self._blockchain @property def difficulty(self) -> int: return self._difficulty @property def unprocessed_data(self) -> set: return self._unprocessed_messages @unprocessed_data.setter def unprocessed_data(self, unprocessed_messages: set) -> None: self._unprocessed_messages = unprocessed_messages @property def neighbours(self) -> set: return self._neighbours @property def port(self) -> int: return self._port @property def jobs(self) -> list: return self._jobs @property def server_process(self) -> Process: return self._server_process @property def queue(self) -> Queue: return self._queue