import os import json import time import random from network_api import NetworkAPI from logger import Log class Node(object): def __init__(self, meta): self.role = 'follower' self.id = meta['id'] self.addr = meta['addr'] self.peers = meta['peers'] self.path = "data/raft_front_end/public/log/" if not os.path.exists(self.path): os.makedirs(self.path) # 选举状态 self.current_term = 0 self.voted_for = None # 初始化选举情况 self.load() # 初始化Logger log_name = self.path + '_' + self.id + "_log.json" self.log = Log(log_name) # volatile state # rule 1, 2 self.commit_index = 0 self.last_applied = 0 # volatile state on leaders # rule 1, 2 self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers} self.match_index = {_id: -1 for _id in self.peers} self.leader_id = None # 其他所有节点对于此节点的投票数 self.vote_ids = {_id: 0 for _id in self.peers} self.client_addr = None self.wait_ms = (10, 20) self.next_leader_election_time = time.time() + random.randint(*self.wait_ms) self.next_heartbeat_time = 0 # rpc self.network_api = NetworkAPI(self.addr, timeout=2) def load(self): """ 从文件中读取目前选举情况,如果文件不存在,则初始化 :return: null """ filename = self.path + "_" + self.id + '_persistent.json' if os.path.exists(filename): with open(filename, 'r') as f: data = json.load(f) self.current_term = data['current_term'] self.voted_for = data['voted_for'] else: self.save() def save(self): """ 将选举情况保存到json文件中 :return: null """ data = {'current_term': self.current_term, 'voted_for': self.voted_for, } filename = self.path + "_" + self.id + '_persistent.json' with open(filename, 'w') as f: json.dump(data, f, indent=4) def redirect(self, data, addr): """ 按照data的类别以及此时该node的类别将address重定向到应有路径 :param data: :param addr: :return: """ if data is None: return None if data['type'] == 'client_append_entries': if self.role != 'leader': if self.leader_id: # print('Hey,leader! Here is a message!') self.network_api.send(data, self.peers[self.leader_id]) return None else: return data if data['dst_id'] != self.id: self.network_api.send(data, self.peers[data['dst_id']]) return None else: return data def append_entries(self, data): response = {'type': 'append_entries_response', 'src_id': self.id, 'dst_id': data['src_id'], 'term': self.current_term, 'success': False } if data['term'] < self.current_term: response['success'] = False self.network_api.send(response, self.peers[data['src_id']]) return self.leader_id = data['leader_id'] # heartbeat if not data['entries']: return prev_log_index = data['prev_log_index'] prev_log_term = data['prev_log_term'] tmp_prev_log_term = self.log.get_log_term(prev_log_index) if tmp_prev_log_term != prev_log_term: response['success'] = False self.network_api.send(response, self.peers[data['src_id']]) self.log.delete_entries(prev_log_index) else: response['success'] = True self.network_api.send(response, self.peers[data['src_id']]) self.log.append_entries(prev_log_index, data['entries']) leader_commit = data['leader_commit'] if leader_commit > self.commit_index: commit_index = min(leader_commit, self.log.last_log_index) self.commit_index = commit_index return def request_vote(self, data): response = {'type': 'request_vote_response', 'src_id': self.id, 'dst_id': data['src_id'], 'term': self.current_term, 'vote_granted': False } if data['term'] < self.current_term: response['vote_granted'] = False self.network_api.send(response, self.peers[data['src_id']]) return candidate_id = data['candidate_id'] last_log_index = data['last_log_index'] last_log_term = data['last_log_term'] if self.voted_for is None or self.voted_for == candidate_id: if last_log_index >= self.log.last_log_index and last_log_term >= self.log.last_log_term: self.voted_for = data['src_id'] self.save() response['vote_granted'] = True self.network_api.send(response, self.peers[data['src_id']]) else: self.voted_for = None self.save() response['vote_granted'] = False self.network_api.send(response, self.peers[data['src_id']]) else: response['vote_granted'] = False self.network_api.send(response, self.peers[data['src_id']]) return def all_do(self, data): t = time.time() if self.commit_index > self.last_applied: self.last_applied = self.commit_index if data is None: return if data['type'] == 'client_append_entries': return if data['term'] > self.current_term: self.next_leader_election_time = t + random.randint(*self.wait_ms) self.role = 'follower' self.current_term = data['term'] self.voted_for = None self.save() return def follower_do(self, data): t = time.time() if data is not None: if data['type'] == 'append_entries': if data['term'] == self.current_term: self.next_leader_election_time = t + random.randint(*self.wait_ms) self.append_entries(data) elif data['type'] == 'request_vote': self.request_vote(data) if t > self.next_leader_election_time: self.next_leader_election_time = t + random.randint(*self.wait_ms) self.role = 'candidate' self.current_term += 1 self.voted_for = self.id self.save() self.vote_ids = {_id: 0 for _id in self.peers} return def candidate_do(self, data): t = time.time() for dst_id in self.peers: if self.vote_ids[dst_id] == 0: request = { 'type': 'request_vote', 'src_id': self.id, 'dst_id': dst_id, 'term': self.current_term, 'candidate_id': self.id, 'last_log_index': self.log.last_log_index, 'last_log_term': self.log.last_log_term } self.network_api.send(request, self.peers[dst_id]) if data is not None and data['term'] == self.current_term: if data['type'] == 'request_vote_response': self.vote_ids[data['src_id']] = data['vote_granted'] vote_count = sum(list(self.vote_ids.values())) if vote_count >= len(self.peers) // 2: self.role = 'leader' self.voted_for = None self.save() self.next_heartbeat_time = 0 self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers} self.match_index = {_id: 0 for _id in self.peers} return elif data['type'] == 'append_entries': self.next_leader_election_time = t + random.randint(*self.wait_ms) self.role = 'follower' self.voted_for = None self.save() return if t > self.next_leader_election_time: self.next_leader_election_time = t + random.randint(*self.wait_ms) self.role = 'candidate' self.current_term += 1 self.voted_for = self.id self.save() self.vote_ids = {_id: 0 for _id in self.peers} return def leader_do(self, data): t = time.time() # print('Here is leader!') if t > self.next_heartbeat_time: self.next_heartbeat_time = t + random.randint(0, 5) for dst_id in self.peers: request = {'type': 'append_entries', 'src_id': self.id, 'dst_id': dst_id, 'term': self.current_term, 'leader_id': self.id, 'prev_log_index': self.next_index[dst_id] - 1, 'prev_log_term': self.log.get_log_term(self.next_index[dst_id] - 1), 'entries': self.log.get_entries(self.next_index[dst_id]), 'leader_commit': self.commit_index } self.network_api.send(request, self.peers[dst_id]) if data is not None and data['type'] == 'client_append_entries': # print('Leader:Here is a message!') data['term'] = self.current_term self.log.append_entries(self.log.last_log_index, [data]) return if data is not None and data['term'] == self.current_term: if data['type'] == 'append_entries_response': if not data['success']: self.next_index[data['src_id']] -= 1 else: self.match_index[data['src_id']] = self.next_index[data['src_id']] self.next_index[data['src_id']] = self.log.last_log_index + 1 flag = False N = self.commit_index + 1 count = 0 for _id in self.match_index: if self.match_index[_id] >= N: count += 1 if count >= len(self.peers) // 2: self.commit_index = N if self.client_addr: response = {'index': self.commit_index} self.network_api.send(response, self.client_addr) flag = True break def run(self): while True: # try: try: data, addr = self.network_api.receive() except Exception as e: data, addr = None, None data = self.redirect(data, addr) # print(data) self.all_do(data) if self.role == 'follower': self.follower_do(data) if self.role == 'candidate': self.candidate_do(data) if self.role == 'leader': self.leader_do(data) # except Exception as e: # print(e)