Serveur P2P simple en python



Un réseau peer-to-peer, ou simplement réseau P2P, est un réseau dans lequel tous les utilisateurs sont égaux et ont les mêmes droits. Une caractéristique distinctive de ces réseaux par rapport aux réseaux ordinaires est qu'ils ne disposent pas d'un serveur unique auquel les utilisateurs se connectent : les utilisateurs se connectent directement les uns aux autres. Il existe des variantes hybrides de ces réseaux, dans lesquelles un serveur n'effectue qu'un travail de coordination.



Aujourd'hui je souhaite proposer une implémentation simple d'un serveur P2P pour un tel réseau en python.



Contexte



1- . . , python, . .



. , 2 , . , P2P .





.



import socket
import rsa
from threading import Thread
from time import sleep
import datetime


# P2P 
class P2P:
    """Minimal peer-to-peer node: one listening socket plus a pool of outbound sockets.

    Every peer occupies one slot (an index) shared by the parallel lists
    ``clients_ip``, ``clients_logs``, ``client_sockets``, ``keys``, ``my_keys``
    and ``socket_busy``. Unused slots hold placeholders: ``""`` for the address
    and the bare classes ``Log``, ``rsa.key.PublicKey`` and ``rsa.key.PrivateKey``
    for the log and the keys.
    """

    def __init__(self, _port: int, _max_clients: int = 1):
        """Allocate the peer slots, open the listening socket and start the server log.

        :param _port: TCP port used both to listen locally and to connect to peers.
        :param _max_clients: number of peer slots (and listen backlog).
        """
        # Receive loops in create_session keep running only while this is True.
        self.running = True
        # Port used for listening and for outbound connections.
        self.port = _port
        # Number of peer slots.
        self.max_clients = _max_clients
        # Peer address per slot; "" marks a slot without a peer.
        self.clients_ip = ["" for i in range(self.max_clients)]
        # Decoded incoming messages, keyed by peer address.
        self.incoming_requests = {}
        # Per-slot log; the bare Log class is the placeholder for "no log yet".
        self.clients_logs = [Log for i in range(self.max_clients)]
        # One outbound socket per slot.
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        # Short timeout so blocking socket calls return regularly.
        for i in self.client_sockets:
            i.settimeout(0.2)
        # Public key of each peer (placeholder: the PublicKey class itself).
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        # Our private key for each peer (placeholder: the PrivateKey class itself).
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        # True while the slot's outbound socket is connected.
        self.socket_busy = [False for i in range(self.max_clients)]
        # Addresses we refuse to open sessions with; loopback is always included.
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        # Listening socket for inbound connections.
        self.server_socket = socket.socket()
        # Same short timeout so accept() does not block forever.
        self.server_socket.settimeout(0.2)
        # NOTE(review): bound to 'localhost'; confirm this is the intended interface
        # for a node that is supposed to accept connections from other hosts.
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

    # server control

    def create_session(self, _address: str):
        """Open a two-way session with ``_address`` and receive its messages until it ends.

        The outbound socket of a free slot connects to the peer in a helper thread
        while this call accepts an inbound connection on the server socket. Both
        sides then swap RSA public keys in PKCS#1 form; every later inbound chunk
        is decrypted with our private key and queued for ``get_request``.

        The call blocks for the lifetime of the session. It returns early, after
        logging, when the address is blacklisted, no slot is free, or connecting
        or accepting fails with ``OSError``. Errors raised during the key exchange
        or by ``rsa.decrypt`` propagate after the session has been cleaned up.

        :param _address: peer address to connect to.
        """
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        thread = None
        try:
            self.__add_user(_address)
            # Connect outwards in the background while we wait for the inbound side.
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            # NOTE(review): the accepted peer address is not compared with _address.
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            if thread is not None:
                # Let the connect attempt finish so it cannot mark the slot busy
                # after the reset below.
                thread.join(1)
            # Release the outbound socket: it may be connected or left in a failed state.
            self.__reload_socket(ind)
            self.__del_user(_address)
            return
        try:
            # Our key is sent on the outbound socket, so the connect attempt must be over.
            thread.join(1)
            # NOTE(review): 512-bit RSA keys are small by current standards; the size is
            # kept because the fixed 162-byte read below presumably matches it - confirm.
            my_key = rsa.newkeys(512)
            self.raw_send(_address, my_key[0].save_pkcs1())
            key = connection.recv(162).decode()
            self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
            key = rsa.PublicKey.load_pkcs1(key)
            self.__add_keys(_address, key, my_key[1])
            while self.running and self.socket_busy[ind]:
                try:
                    data = connection.recv(2048)
                except socket.timeout:
                    # Nothing arrived within the timeout; re-check the loop flags.
                    continue
                except OSError:
                    break
                if not data:
                    # recv() returning b"" means the peer closed its side; stop
                    # instead of spinning on an endless stream of empty reads.
                    break
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        finally:
            connection.close()
            try:
                self.close_connection(_address)
            except (TypeError, KeyError):
                # The session was already closed elsewhere (the slot lookup gives None).
                pass

    def __connect(self, _address: str, *args):
        """Connect the slot's outbound socket to ``_address`` on ``self.port``.

        :param _address: peer address; must already be registered in a slot.
        :param args: ignored.
        :return: True on success (the slot is then marked busy), False on ``OSError``.
        """
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False

    def __reload_socket(self, _ind: int):
        """Close the outbound socket of slot ``_ind`` and replace it with a fresh one.

        The replacement gets the same 0.2 s timeout as the sockets created in
        ``__init__`` and the slot is marked free.
        """
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.client_sockets[_ind].settimeout(0.2)
        self.socket_busy[_ind] = False

    def close_connection(self, _address: str):
        """Tear down the session with ``_address``: keys, outbound socket, user entry.

        Raises ``TypeError`` when the address is not registered, because the slot
        lookup then yields ``None``.
        """
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

    def kill_server(self):
        """Stop the server: clear the running flag, then close all sockets and logs."""
        self.running = False
        # Give running sessions time to notice the flag and clean up.
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                # Free slots hold the bare Log class; calling kill_log on it
                # without an instance raises TypeError.
                pass

    def send(self, _address: str, _message: str):
        """Encrypt ``_message`` with the peer's public key and send it.

        The plain text is written to the peer's log first. ``OSError`` while
        sending is logged, not raised. Raises ``TypeError`` when ``_address`` is
        not registered in any slot.
        """
        ind = self.__get_ind_by_address(_address)
        try:
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.client_sockets[ind].send(rsa.encrypt(_message.encode(), self.keys[ind]))
            self.log.save_data("Send message to {}".format(_address))
        except OSError:
            self.log.save_data("Can`t send message to {}".format(_address))

    def raw_send(self, _address: str, _message: bytes):
        """Send ``_message`` to the peer unencrypted (used for the key exchange).

        ``OSError`` while sending is logged, not raised. Raises ``TypeError`` when
        ``_address`` is not registered in any slot.
        """
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].send(_message)
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.log.save_data("Raw send message to {}".format(_address))
        except OSError:
            self.log.save_data("Raw send to {} Failed".format(_address))

    # add

    def __add_user(self, _address: str):
        """Register ``_address`` in the first free slot.

        Opens the log file ``<address>.log``, stores the address in the slot and
        creates an empty queue of incoming messages.
        """
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        """Store the peer's public key and our private key in the peer's slot.

        Does nothing when ``_address`` is not registered.
        """
        ind = self.__get_ind_by_address(_address)
        if ind is None:
            return
        self.keys[ind] = _key
        self.my_keys[ind] = _my_key

    def __add_request(self, _address: str, _message: bytes):
        """Queue a decrypted message from ``_address`` and record it in the logs."""
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

    # get

    # usage: if self.__get_free_socket() is not None: ...
    def __get_free_socket(self):
        """Return the index of the first slot not marked busy, or None if all are busy."""
        for i, busy in enumerate(self.socket_busy):
            if not busy:
                return i
        return None

    def __get_ind_by_address(self, _address: str):
        """Return the slot index holding ``_address``, or None when it is not registered."""
        for i, address in enumerate(self.clients_ip):
            if address == _address:
                return i
        return None

    def get_request(self, _address: str):
        """Remove and return the oldest queued message from ``_address``.

        Raises ``KeyError`` for an unknown address and ``IndexError`` when the
        queue is empty; call ``check_request`` first.
        """
        return self.incoming_requests[_address].pop(0)

    # check

    # usage: if self.check_request(_address): ...
    def check_request(self, _address: str):
        """Return True when at least one message from ``_address`` is queued."""
        return bool(self.incoming_requests.get(_address))

    def check_address(self, _address: str):
        """Return True if ``_address`` is stored in a slot, else False."""
        return _address in self.clients_ip

    # del

    def __del_user(self, _address: str):
        """Undo ``__add_user``: stop the peer's log, free its slot, drop its queue."""
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

    def __del_key(self, _address: str):
        """Reset both keys in the peer's slot to their placeholder classes."""
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey

    # others

    def __len__(self):
        """Return the number of slots that currently hold a peer address."""
        return sum(1 for address in self.clients_ip if address != "")

    def __bool__(self):
        """Return True when at least one slot holds a peer address."""
        return any(address != "" for address in self.clients_ip)


class Log:
    """Append-only text log stored in the file ``name``.

    The file is opened and closed again for every write, so no handle stays open
    between calls. ``file`` always refers to the most recently used (closed) handle.
    """

    def __init__(self, _name: str):
        """Remember the file name and write the "Log started" line.

        Append mode creates the file when it does not exist yet.

        :param _name: path of the log file.
        """
        self.name = _name
        self.save_data("Log started at " + str(datetime.datetime.now()))

    def save_data(self, _data: str):
        """Append ``_data`` followed by a newline to the log file."""
        # The with-block closes the handle even when the write fails.
        with open(self.name, "a") as file:
            self.file = file
            file.write("{}\n".format(_data))

    @staticmethod
    def read_and_return_list(_name: str):
        """Return the content of file ``_name`` split on newlines.

        A missing file gives an empty list. A trailing newline in the file gives a
        final empty string in the result.
        """
        try:
            with open(_name, "r") as file:
                data = file.read()
        except FileNotFoundError:
            return []
        return data.split("\n")

    def kill_log(self):
        """Append the "Log stopped" line (followed by a blank line) to the log file."""
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))


Le code de la classe P2P est assez long ; ses fonctions sont donc regroupées en plusieurs parties :



  • add
  • del
  • check
  • get
  • server control




init
    def __init__(self, _port: int, _max_clients: int = 1):
        """Allocate the peer slots, open the listening socket and start the server log.

        :param _port: TCP port used both to listen locally and to connect to peers.
        :param _max_clients: number of peer slots (and listen backlog).
        """
        # Receive loops keep running only while this flag is True.
        self.running = True
        self.port = _port
        self.max_clients = _max_clients
        slots = range(self.max_clients)
        # Peer address per slot; "" marks a slot without a peer.
        self.clients_ip = ["" for _ in slots]
        # Decoded incoming messages, keyed by peer address.
        self.incoming_requests = dict()
        # Per-slot log; the bare Log class is the placeholder for "no log yet".
        self.clients_logs = [Log for _ in slots]
        # One outbound socket per slot, each with a short timeout.
        self.client_sockets = [socket.socket() for _ in slots]
        for outbound in self.client_sockets:
            outbound.settimeout(0.2)
        # Peer public keys and our private keys; the key classes are placeholders.
        self.keys = [rsa.key.PublicKey for _ in slots]
        self.my_keys = [rsa.key.PrivateKey for _ in slots]
        # True while the slot's outbound socket is connected.
        self.socket_busy = [False for _ in slots]
        # Loopback is always refused, plus every line of blacklist.txt.
        listed = Log.read_and_return_list("blacklist.txt")
        self.blacklist = ["127.0.0.1"] + listed
        # Listening socket with the same short timeout, bound to localhost.
        listener = socket.socket()
        self.server_socket = listener
        listener.settimeout(0.2)
        listener.bind(('localhost', _port))
        listener.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")


, - , 1. :



  • -


:



  • Ip


, "127.0.0.1" " " ( localhost ), , . - listen().



add



. .



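add_user enregistre un nouvel utilisateur : elle ouvre son journal, mémorise son adresse dans le premier emplacement libre et crée sa file de messages entrants.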



add_user
    def __add_user(self, _address: str):
        """Register ``_address`` in the first free slot.

        Opens the log file ``<address>.log``, stores the address in the slot and
        creates an empty queue of incoming messages.
        """
        slot = self.__get_free_socket()
        peer_log = Log("{}.log".format(_address))
        self.clients_logs[slot] = peer_log
        self.clients_ip[slot] = _address
        self.incoming_requests[_address] = list()
        self.log.save_data("Added user {}".format(_address))


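add_keys enregistre la clé publique de l'utilisateur et notre clé privée dans l'emplacement associé à son adresse.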



add_keys
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return


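add_request ajoute un message reçu à la file des messages entrants de l'utilisateur et l'inscrit dans les journaux.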



add_request
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))


del



. , , .



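del_user est l'inverse de add_user. Elle ferme le journal de l'utilisateur, libère son emplacement et supprime sa file de messages entrants.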



del_user
    def __del_user(self, _address: str):
        """Undo ``__add_user``: stop the peer's log, free its slot, drop its queue."""
        slot = self.__get_ind_by_address(_address)
        peer_log = self.clients_logs[slot]
        peer_log.kill_log()
        # Put the placeholders back: the bare Log class and an empty address.
        self.clients_logs[slot] = Log
        self.clients_ip[slot] = ""
        del self.incoming_requests[_address]
        self.log.save_data("Deleted user {}".format(_address))


del_key réinitialise les deux clés associées à l'adresse de l'utilisateur.



del_key
    def __del_key(self, _address: str):
        """Reset both keys in the peer's slot to their placeholder classes."""
        slot = self.__get_ind_by_address(_address)
        blank_public = rsa.key.PublicKey
        self.keys[slot] = blank_public
        blank_private = rsa.key.PrivateKey
        self.my_keys[slot] = blank_private


check



.



check_request renvoie True s'il existe des messages entrants en attente pour l'adresse donnée, sinon False.



check_request
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))


check_address renvoie True si l'adresse donnée figure parmi les adresses enregistrées, et False sinon.



check_address
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False


get



get_free_socket renvoie l'indice du premier socket libre ou, si tous les sockets sont occupés, None.



get_free_socket
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None


get_ind_by_address renvoie l'indice de l'emplacement où l'adresse est enregistrée ou, si l'adresse est inconnue, None.



get_ind_by_address
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None


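get_request renvoie le plus ancien message en attente de l'utilisateur. Une fois renvoyé, le message est retiré de la file.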



get_request
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data


server control



, .



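La fonction principale — create_session — établit la session avec un utilisateur. Elle vérifie la liste noire et la disponibilité d'un socket, se connecte, échange les clés, puis reçoit et déchiffre les messages entrants.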



create_session
    def create_session(self, _address: str):
        """Open a two-way session with ``_address`` and receive its messages until it ends.

        The outbound socket of a free slot connects to the peer in a helper thread
        while this call accepts an inbound connection on the server socket. Both
        sides then swap RSA public keys in PKCS#1 form; every later inbound chunk
        is decrypted with our private key and queued for ``get_request``.

        The call blocks for the lifetime of the session. It returns early, after
        logging, when the address is blacklisted, no slot is free, or connecting
        or accepting fails with ``OSError``. Errors raised during the key exchange
        or by ``rsa.decrypt`` propagate after the session has been cleaned up.

        :param _address: peer address to connect to.
        """
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        thread = None
        try:
            self.__add_user(_address)
            # Connect outwards in the background while we wait for the inbound side.
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            # NOTE(review): the accepted peer address is not compared with _address.
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            if thread is not None:
                # Let the connect attempt finish so it cannot mark the slot busy
                # after the reset below.
                thread.join(1)
            # Release the outbound socket: it may be connected or left in a failed state.
            self.__reload_socket(ind)
            self.__del_user(_address)
            return
        try:
            # Our key is sent on the outbound socket, so the connect attempt must be over.
            thread.join(1)
            # NOTE(review): 512-bit RSA keys are small by current standards; the size is
            # kept because the fixed 162-byte read below presumably matches it - confirm.
            my_key = rsa.newkeys(512)
            self.raw_send(_address, my_key[0].save_pkcs1())
            key = connection.recv(162).decode()
            self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
            key = rsa.PublicKey.load_pkcs1(key)
            self.__add_keys(_address, key, my_key[1])
            while self.running and self.socket_busy[ind]:
                try:
                    data = connection.recv(2048)
                except socket.timeout:
                    # Nothing arrived within the timeout; re-check the loop flags.
                    continue
                except OSError:
                    break
                if not data:
                    # recv() returning b"" means the peer closed its side; stop
                    # instead of spinning on an endless stream of empty reads.
                    break
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        finally:
            connection.close()
            try:
                self.close_connection(_address)
            except (TypeError, KeyError):
                # The session was already closed elsewhere (the slot lookup gives None).
                pass


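connect tente de se connecter à l'utilisateur et renvoie True en cas de succès, False sinon. Elle est lancée dans un thread séparé.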



connect
    def __connect(self, _address: str, *args):
        """Connect the slot's outbound socket to ``_address`` on ``self.port``.

        :param _address: peer address; must already be registered in a slot.
        :param args: ignored.
        :return: True on success (the slot is then marked busy), False on ``OSError``.
        """
        slot = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[slot].connect((_address, self.port))
        except OSError:
            return False
        self.socket_busy[slot] = True
        return True


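close_connection ferme la connexion avec l'utilisateur : elle réinitialise les clés, recrée le socket et supprime l'utilisateur.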



close_connection
    def close_connection(self, _address: str):
        """Tear down the session with ``_address``.

        The slot is looked up first, then the keys are reset, the outbound socket
        is replaced and finally the user entry is removed.
        """
        slot = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(slot)
        self.__del_user(_address)


kill_server arrête le serveur : elle stoppe les sessions, puis ferme tous les sockets et tous les journaux.



kill_server
    def kill_server(self):
        """Stop the server: clear the running flag, then close all sockets and logs."""
        self.running = False
        # One-second pause before the sockets are closed.
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for outbound in self.client_sockets:
            outbound.close()
        for peer_log in self.clients_logs:
            try:
                peer_log.kill_log()
            except TypeError:
                # Free slots hold the bare Log class; calling kill_log on it
                # without an instance raises TypeError.
                continue


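reload_socket ferme le socket de l'emplacement indiqué, le remplace par un nouveau socket et marque l'emplacement comme libre.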



reload_socket
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False




bool renvoie True si au moins un utilisateur est enregistré, et False sinon.



bool
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False


len renvoie le nombre d'utilisateurs actuellement enregistrés.



len
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num




, . , . , .



, .



. , , . .



init
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()


save_data ajoute les données transmises à la fin du fichier journal.



save_data
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()


La fonction statique read_and_return_list ne nécessite pas d'objet de la classe pour être utilisée ; elle prend en paramètre le nom du fichier dont tout le contenu sera lu et renvoyé sous forme de liste.



read_and_return_list
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")


Et la dernière fonction kill_log écrit l'heure d'arrêt du journal dans le fichier et ferme le fichier.



kill_log
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()


Conclusion



Écrire un serveur peer-to-peer n'est pas difficile, mais il reste une large marge d'amélioration. Vous pouvez implémenter l'envoi de fichiers et d'images, en les téléchargeant en plusieurs parties à la fois à partir de plusieurs utilisateurs. Vous pouvez aussi améliorer le journal ou ajouter un chiffrement pour l'échange des clés de chiffrement et de déchiffrement.



Si vous avez des suggestions à ce sujet ou des idées pour améliorer le code, je serai heureux de les lire dans les commentaires.




All Articles