Parcourir la source

完成了raft算法部分,接下来做前端

Shellmiao il y a 4 ans
Parent
commit
3109f14d68
9 fichiers modifiés avec 524 ajouts et 2 suppressions
  1. 6 0
      .idea/vcs.xml
  2. 55 0
      logger.py
  3. 42 2
      main.py
  4. 38 0
      network_api.py
  5. 312 0
      node.py
  6. 14 0
      test.py
  7. 19 0
      test1.py
  8. 19 0
      test2.py
  9. 19 0
      test3.py

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 55 - 0
logger.py

@@ -0,0 +1,55 @@
+import os
+import json
+
+
+class Log(object):
+    def __init__(self, filename):
+        self.filename = filename
+        if os.path.exists(self.filename):
+            with open(self.filename, "r") as f:
+                self.entries = json.load(f)
+        else:
+            self.entries = []
+            self.save()
+
+    @property
+    def last_log_index(self):
+        return len(self.entries) - 1
+
+    @property
+    def last_log_term(self):
+        return self.get_log_term(self.last_log_index)
+
+    def get_log_term(self, log_index):
+        """
+        leader do
+        follower
+        """
+        if log_index >= len(self.entries):
+            return -1
+        elif log_index < 0:
+            return -1
+        else:
+            return self.entries[log_index]["term"]
+
+    def get_entries(self, next_index):
+        """
+        leader do
+        """
+        # print('get_entries')
+        return self.entries[max(0, next_index):]
+
+    def delete_entries(self, prev_log_index):
+        # print('delete_entries')
+
+        self.entries = self.entries[: max(0, prev_log_index)]
+        self.save()
+
+    def append_entries(self, prev_log_index, entries):
+        # print('append_entries')
+        self.entries = self.entries[: max(0, prev_log_index + 1)] + entries
+        self.save()
+
+    def save(self):
+        with open(self.filename, "w") as f:
+            json.dump(self.entries, f, indent=4)

+ 42 - 2
main.py

@@ -1,5 +1,45 @@
+from node import Node
+from network_api import NetworkAPI
+from multiprocessing import Process
+import random
+import time
 
 
-if __name__ == '__main__':
-    print('hello')
+def create_node(meta):
+    node = Node(meta)
+    node.run()
 
+
+if __name__ == "__main__":
+    num = input("[-]请输入节点数量:")
+    ip = "localhost"
+    base_port = 10000
+    client_port = 9999
+    nodes_meta = []
+    network_api = NetworkAPI((ip, client_port))
+    for i in range(int(num)):
+        node_meta = {
+            "id": str(i),
+            "addr": ("localhost", base_port + i),
+            "peers": {str(j): ("localhost", base_port + j) for j in range(int(num)) if j != i},
+        }
+        nodes_meta.append(node_meta)
+        # create_node(node_meta)
+        p = Process(target=create_node, args=(node_meta,), daemon=True)
+        p.start()
+    address = random.choice(nodes_meta)["addr"]
+    # while True:
+    #     print('233ok')
+    #     time.sleep(30)
+    time.sleep(10)
+    while True:
+        message = input("[-]请输入要记录的日志:")
+        # message = '233ok'
+        data = {
+            "type": "client_append_entries",
+            "timestamp": int(time.time()),
+            "message": message,
+        }
+        print("send: ", data)
+        network_api.send(data, address)
+        # time.sleep(5)

+ 38 - 0
network_api.py

@@ -0,0 +1,38 @@
+import json
+import socket
+
+
+class NetworkAPI(object):
+    def __init__(self, addr=None, timeout=None):
+        self.addr = tuple(addr)
+        self.ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        if addr:
+            self.bind(tuple(addr))
+        if timeout:
+            self.ss.settimeout(timeout)
+
+    def bind(self, addr):
+        self.addr = tuple(addr)
+        self.ss.bind(addr)
+
+    def set_timeout(self, timeout):
+        self.ss.settimeout(timeout)
+
+    def send(self, data, addr):
+        data = json.dumps(data).encode("utf-8")
+        self.ss.sendto(data, tuple(addr))
+
+    def receive(self, addr=None, timeout=None):
+        if addr:
+            self.bind(addr)
+        if not self.addr:
+            raise "please bind to an address"
+
+        if timeout:
+            self.set_timeout(timeout)
+
+        data, addr = self.ss.recvfrom(65535)
+        return json.loads(data), addr
+
+    def close(self):
+        self.ss.close()

+ 312 - 0
node.py

@@ -0,0 +1,312 @@
+import os
+import json
+import time
+import random
+
+from network_api import NetworkAPI
+from logger import Log
+
+
+class Node(object):
+    def __init__(self, meta):
+        self.role = 'follower'
+        self.id = meta['id']
+        self.addr = meta['addr']
+        self.peers = meta['peers']
+        self.path = "data/node/"
+        if not os.path.exists(self.path):
+            os.makedirs(self.path)
+        # 选举状态
+        self.current_term = 0
+        self.voted_for = None
+        # 初始化选举情况
+        self.load()
+        # 初始化Logger
+        log_name = self.path + '_' + self.id + "_log.json"
+        self.log = Log(log_name)
+
+        # volatile state
+        # rule 1, 2
+        self.commit_index = 0
+        self.last_applied = 0
+
+        # volatile state on leaders
+        # rule 1, 2
+        self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers}
+        self.match_index = {_id: -1 for _id in self.peers}
+
+        self.leader_id = None
+        # 其他所有节点对于此节点的投票数
+        self.vote_ids = {_id: 0 for _id in self.peers}
+
+        self.client_addr = None
+
+        self.wait_ms = (10, 20)
+        self.next_leader_election_time = time.time() + random.randint(*self.wait_ms)
+        self.next_heartbeat_time = 0
+
+        # rpc
+        self.network_api = NetworkAPI(self.addr, timeout=2)
+
+    def load(self):
+        """
+        从文件中读取目前选举情况,如果文件不存在,则初始化
+        :return: null
+        """
+        filename = self.path + "_" + self.id + '_persistent.json'
+        if os.path.exists(filename):
+            with open(filename, 'r') as f:
+                data = json.load(f)
+
+            self.current_term = data['current_term']
+            self.voted_for = data['voted_for']
+        else:
+            self.save()
+
+    def save(self):
+        """
+        将选举情况保存到json文件中
+        :return: null
+        """
+        data = {'current_term': self.current_term,
+                'voted_for': self.voted_for,
+                }
+        filename = self.path + "_" + self.id + '_persistent.json'
+        with open(filename, 'w') as f:
+            json.dump(data, f, indent=4)
+
+    def redirect(self, data, addr):
+        """
+        按照data的类别以及此时该node的类别将address重定向到应有路径
+        :param data:
+        :param addr:
+        :return:
+        """
+        if data is None:
+            return None
+
+        if data['type'] == 'client_append_entries':
+            if self.role != 'leader':
+                if self.leader_id:
+                    # print('Hey,leader! Here is a message!')
+                    self.network_api.send(data, self.peers[self.leader_id])
+                return None
+            else:
+                return data
+
+        if data['dst_id'] != self.id:
+            self.network_api.send(data, self.peers[data['dst_id']])
+            return None
+        else:
+            return data
+
+    def append_entries(self, data):
+        response = {'type': 'append_entries_response',
+                    'src_id': self.id,
+                    'dst_id': data['src_id'],
+                    'term': self.current_term,
+                    'success': False
+                    }
+        if data['term'] < self.current_term:
+            response['success'] = False
+            self.network_api.send(response, self.peers[data['src_id']])
+            return
+
+        self.leader_id = data['leader_id']
+
+        # heartbeat
+        if not data['entries']:
+            return
+
+        prev_log_index = data['prev_log_index']
+        prev_log_term = data['prev_log_term']
+
+        tmp_prev_log_term = self.log.get_log_term(prev_log_index)
+        if tmp_prev_log_term != prev_log_term:
+            response['success'] = False
+            self.network_api.send(response, self.peers[data['src_id']])
+            self.log.delete_entries(prev_log_index)
+        else:
+            response['success'] = True
+            self.network_api.send(response, self.peers[data['src_id']])
+            self.log.append_entries(prev_log_index, data['entries'])
+            leader_commit = data['leader_commit']
+            if leader_commit > self.commit_index:
+                commit_index = min(leader_commit, self.log.last_log_index)
+                self.commit_index = commit_index
+        return
+
+    def request_vote(self, data):
+        response = {'type': 'request_vote_response',
+                    'src_id': self.id,
+                    'dst_id': data['src_id'],
+                    'term': self.current_term,
+                    'vote_granted': False
+                    }
+        if data['term'] < self.current_term:
+            response['vote_granted'] = False
+            self.network_api.send(response, self.peers[data['src_id']])
+            return
+
+        candidate_id = data['candidate_id']
+        last_log_index = data['last_log_index']
+        last_log_term = data['last_log_term']
+
+        if self.voted_for is None or self.voted_for == candidate_id:
+            if last_log_index >= self.log.last_log_index and last_log_term >= self.log.last_log_term:
+                self.voted_for = data['src_id']
+                self.save()
+                response['vote_granted'] = True
+                self.network_api.send(response, self.peers[data['src_id']])
+            else:
+                self.voted_for = None
+                self.save()
+                response['vote_granted'] = False
+                self.network_api.send(response, self.peers[data['src_id']])
+        else:
+            response['vote_granted'] = False
+            self.network_api.send(response, self.peers[data['src_id']])
+        return
+
+    def all_do(self, data):
+        t = time.time()
+        if self.commit_index > self.last_applied:
+            self.last_applied = self.commit_index
+        if data is None:
+            return
+
+        if data['type'] == 'client_append_entries':
+            return
+
+        if data['term'] > self.current_term:
+            self.next_leader_election_time = t + random.randint(*self.wait_ms)
+            self.role = 'follower'
+            self.current_term = data['term']
+            self.voted_for = None
+            self.save()
+        return
+
+    def follower_do(self, data):
+        t = time.time()
+        if data is not None:
+            if data['type'] == 'append_entries':
+                if data['term'] == self.current_term:
+                    self.next_leader_election_time = t + random.randint(*self.wait_ms)
+                self.append_entries(data)
+            elif data['type'] == 'request_vote':
+                self.request_vote(data)
+        if t > self.next_leader_election_time:
+            self.next_leader_election_time = t + random.randint(*self.wait_ms)
+            self.role = 'candidate'
+            self.current_term += 1
+            self.voted_for = self.id
+            self.save()
+            self.vote_ids = {_id: 0 for _id in self.peers}
+        return
+
+    def candidate_do(self, data):
+        t = time.time()
+        for dst_id in self.peers:
+            if self.vote_ids[dst_id] == 0:
+                request = {
+                    'type': 'request_vote',
+                    'src_id': self.id,
+                    'dst_id': dst_id,
+                    'term': self.current_term,
+                    'candidate_id': self.id,
+                    'last_log_index': self.log.last_log_index,
+                    'last_log_term': self.log.last_log_term
+                }
+                self.network_api.send(request, self.peers[dst_id])
+        if data is not None and data['term'] == self.current_term:
+            if data['type'] == 'request_vote_response':
+                self.vote_ids[data['src_id']] = data['vote_granted']
+                vote_count = sum(list(self.vote_ids.values()))
+                if vote_count >= len(self.peers) // 2:
+                    self.role = 'leader'
+                    self.voted_for = None
+                    self.save()
+                    self.next_heartbeat_time = 0
+                    self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers}
+                    self.match_index = {_id: 0 for _id in self.peers}
+                    return
+            elif data['type'] == 'append_entries':
+                self.next_leader_election_time = t + random.randint(*self.wait_ms)
+                self.role = 'follower'
+                self.voted_for = None
+                self.save()
+                return
+        if t > self.next_leader_election_time:
+            self.next_leader_election_time = t + random.randint(*self.wait_ms)
+            self.role = 'candidate'
+            self.current_term += 1
+            self.voted_for = self.id
+            self.save()
+            self.vote_ids = {_id: 0 for _id in self.peers}
+            return
+
+    def leader_do(self, data):
+        t = time.time()
+        # print('Here is leader!')
+        if t > self.next_heartbeat_time:
+            self.next_heartbeat_time = t + random.randint(0, 5)
+            for dst_id in self.peers:
+                request = {'type': 'append_entries',
+                           'src_id': self.id,
+                           'dst_id': dst_id,
+                           'term': self.current_term,
+                           'leader_id': self.id,
+                           'prev_log_index': self.next_index[dst_id] - 1,
+                           'prev_log_term': self.log.get_log_term(self.next_index[dst_id] - 1),
+                           'entries': self.log.get_entries(self.next_index[dst_id]),
+                           'leader_commit': self.commit_index
+                           }
+                self.network_api.send(request, self.peers[dst_id])
+        if data is not None and data['type'] == 'client_append_entries':
+            # print('Leader:Here is a message!')
+            data['term'] = self.current_term
+            self.log.append_entries(self.log.last_log_index, [data])
+            return
+        if data is not None and data['term'] == self.current_term:
+            if data['type'] == 'append_entries_response':
+                if not data['success']:
+                    self.next_index[data['src_id']] -= 1
+                else:
+                    self.match_index[data['src_id']] = self.next_index[data['src_id']]
+                    self.next_index[data['src_id']] = self.log.last_log_index + 1
+        flag = False
+        N = self.commit_index + 1
+        count = 0
+        for _id in self.match_index:
+            if self.match_index[_id] >= N:
+                count += 1
+            if count >= len(self.peers) // 2:
+                self.commit_index = N
+                if self.client_addr:
+                    response = {'index': self.commit_index}
+                    self.network_api.send(response, self.client_addr)
+                flag = True
+                break
+
+    def run(self):
+        while True:
+            # try:
+            try:
+                data, addr = self.network_api.receive()
+            except Exception as e:
+                data, addr = None, None
+            data = self.redirect(data, addr)
+            # print(data)
+            self.all_do(data)
+
+            if self.role == 'follower':
+                self.follower_do(data)
+
+            if self.role == 'candidate':
+                self.candidate_do(data)
+
+            if self.role == 'leader':
+                self.leader_do(data)
+
+            # except Exception as e:
+            #     print(e)

+ 14 - 0
test.py

@@ -0,0 +1,14 @@
+from network_api import NetworkAPI
+import time
+
+if __name__ == "__main__":
+    network_api = NetworkAPI(('localhost', 9998))
+    message = '233ok'
+    address = ('localhost', 10000)
+    data = {
+        "type": "client_append_entries",
+        "timestamp": int(time.time()),
+        "message": message,
+    }
+    print("send: ", data)
+    network_api.send(data, address)

+ 19 - 0
test1.py

@@ -0,0 +1,19 @@
+from node import Node
+
+
+def create_node(meta):
+    node = Node(meta)
+    node.run()
+
+
+if __name__ == "__main__":
+    ip = "localhost"
+    node_meta = {
+        "id": '0',
+        "addr": ("localhost", 10000),
+        "peers": {
+            '1': ("localhost", 10001),
+            '2': ("localhost", 10002),
+        },
+    }
+    create_node(node_meta)

+ 19 - 0
test2.py

@@ -0,0 +1,19 @@
+from node import Node
+
+
+def create_node(meta):
+    node = Node(meta)
+    node.run()
+
+
+if __name__ == "__main__":
+    ip = "localhost"
+    node_meta = {
+        "id": '1',
+        "addr": ("localhost", 10001),
+        "peers": {
+            '0': ("localhost", 10000),
+            '2': ("localhost", 10002),
+        },
+    }
+    create_node(node_meta)

+ 19 - 0
test3.py

@@ -0,0 +1,19 @@
+from node import Node
+
+
+def create_node(meta):
+    node = Node(meta)
+    node.run()
+
+
+if __name__ == "__main__":
+    ip = "localhost"
+    node_meta = {
+        "id": '2',
+        "addr": ("localhost", 10002),
+        "peers": {
+            '0': ("localhost", 10000),
+            '1': ("localhost", 10001),
+        },
+    }
+    create_node(node_meta)