123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
import json
import os
import random
import time

from logger import Log
from network_api import NetworkAPI
class Node(object):
    """One participant in a Raft-style cluster, driven by a polling loop.

    The node is always in one of three roles -- ``'follower'``,
    ``'candidate'`` or ``'leader'`` -- and ``run()`` repeatedly receives at
    most one message, forwards it if it is meant for someone else, applies
    the rules common to every role (``all_do``) and then the rules of the
    current role.

    Messages are plain dicts carrying at least a ``'type'`` key; every
    non-client message also carries ``'src_id'``, ``'dst_id'`` and
    ``'term'``.

    NOTE(review): the quorum arithmetic assumes ``meta['peers']`` maps the
    ids of the *other* cluster members to their addresses (i.e. it does not
    contain this node) -- confirm against the cluster configuration.
    """

    def __init__(self, meta):
        """Build a follower from ``meta`` and restore its election state.

        :param meta: dict with ``'id'`` (used in file names, so a string),
            ``'addr'`` (passed to ``NetworkAPI``) and ``'peers'``
            (mapping of peer id -> peer address).
        """
        self.role = 'follower'
        self.id = meta['id']
        self.addr = meta['addr']
        self.peers = meta['peers']

        self.path = "data/raft_front_end/public/log/"
        # exist_ok avoids the check-then-create race when several nodes that
        # share this directory start at the same moment.
        os.makedirs(self.path, exist_ok=True)

        # Election state that is persisted to disk.
        self.current_term = 0
        self.voted_for = None
        # Overwrite the defaults above from disk, or create the file.
        self.load()

        # Replicated log.
        log_name = self.path + '_' + self.id + "_log.json"
        self.log = Log(log_name)

        # Volatile state kept by every role.
        self.commit_index = 0
        self.last_applied = 0

        # Volatile state used while leader; rebuilt on every election win.
        self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers}
        self.match_index = {_id: -1 for _id in self.peers}

        self.leader_id = None

        # Vote received from each peer in the current election (0 = none yet).
        self.vote_ids = {_id: 0 for _id in self.peers}

        self.client_addr = None

        # Bounds of the randomised election timeout. Despite the name they
        # are added to time.time(), i.e. they are interpreted as seconds.
        self.wait_ms = (10, 20)
        self.next_leader_election_time = time.time() + random.randint(*self.wait_ms)
        self.next_heartbeat_time = 0

        # Transport used for every send/receive.
        self.network_api = NetworkAPI(self.addr, timeout=2)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _persistent_file(self):
        """Return the path of the JSON file holding the election state."""
        return self.path + "_" + self.id + '_persistent.json'

    def _reply(self, message):
        """Send ``message`` to the peer named in its ``'dst_id'`` field."""
        self.network_api.send(message, self.peers[message['dst_id']])

    def _peer_quorum(self):
        """Return how many peers must agree for a cluster-wide majority.

        This node counts as one implicit member of the majority. Assuming
        ``self.peers`` excludes this node, the cluster has
        ``len(peers) + 1`` members and a majority needs
        ``(len(peers) + 1) // 2`` peers on top of this node.
        """
        return (len(self.peers) + 1) // 2

    def _candidate_log_up_to_date(self, last_log_term, last_log_index):
        """Tell whether a candidate's log is at least as current as ours.

        The log with the later last term wins; when the last terms are equal
        the longer (or equally long) log wins.
        """
        own_last_term = self.log.last_log_term
        if last_log_term != own_last_term:
            return last_log_term > own_last_term
        return last_log_index >= self.log.last_log_index

    def _start_election(self, now):
        """Become a candidate for a new term and vote for ourselves.

        :param now: current ``time.time()`` value used to re-arm the timer.
        """
        self.next_leader_election_time = now + random.randint(*self.wait_ms)
        self.role = 'candidate'
        self.current_term += 1
        self.voted_for = self.id
        self.save()
        self.vote_ids = {_id: 0 for _id in self.peers}

    def _advance_commit_index(self):
        """Move ``commit_index`` forward by one if enough peers caught up.

        The index advances when at least ``_peer_quorum()`` peers report a
        ``match_index`` at or beyond ``commit_index + 1``. If a client
        address is set, the new commit index is sent to it.
        """
        if not self.match_index:
            return
        target = self.commit_index + 1
        acks = sum(1 for idx in self.match_index.values() if idx >= target)
        if acks >= self._peer_quorum():
            self.commit_index = target
            if self.client_addr:
                response = {'index': self.commit_index}
                self.network_api.send(response, self.client_addr)

    # ------------------------------------------------------------------
    # persistence
    # ------------------------------------------------------------------
    def load(self):
        """Restore ``current_term`` / ``voted_for`` from the state file.

        If the file does not exist yet it is created from the current values.

        :return: None
        """
        filename = self._persistent_file()
        if os.path.exists(filename):
            with open(filename, 'r') as f:
                data = json.load(f)
            self.current_term = data['current_term']
            self.voted_for = data['voted_for']
        else:
            self.save()

    def save(self):
        """Write ``current_term`` / ``voted_for`` to the state file as JSON.

        :return: None
        """
        data = {
            'current_term': self.current_term,
            'voted_for': self.voted_for,
        }
        with open(self._persistent_file(), 'w') as f:
            json.dump(data, f, indent=4)

    # ------------------------------------------------------------------
    # message routing
    # ------------------------------------------------------------------
    def redirect(self, data, addr):
        """Forward a message that is not for this node, or hand it back.

        Client append requests are kept only by the leader; any other role
        forwards them to the known leader (or drops them when no leader is
        known). Every other message is forwarded to ``data['dst_id']`` unless
        that is this node.

        NOTE(review): ``addr`` is not used, and nothing in this class ever
        assigns ``client_addr`` after ``__init__`` -- the client notification
        in the commit step only fires if it is set from outside.

        :param data: received message dict, or None
        :param addr: sender address as reported by the transport (unused)
        :return: the message if this node should process it, else None
        """
        if data is None:
            return None

        if data['type'] == 'client_append_entries':
            if self.role == 'leader':
                return data
            if self.leader_id:
                self.network_api.send(data, self.peers[self.leader_id])
            return None

        if data['dst_id'] != self.id:
            self.network_api.send(data, self.peers[data['dst_id']])
            return None
        return data

    # ------------------------------------------------------------------
    # RPC handlers (called while follower)
    # ------------------------------------------------------------------
    def append_entries(self, data):
        """Handle an ``append_entries`` request from a leader.

        A request from an older term is rejected. A request without entries
        is a heartbeat: it only records the leader id and gets no reply.
        Otherwise the entry preceding the new ones must match by term; on a
        mismatch the request is rejected and the local log is truncated from
        ``prev_log_index``, on a match the entries are appended and
        ``commit_index`` follows the leader's, capped at the local log end.

        NOTE(review): because heartbeats return early, a follower only
        learns a newer ``leader_commit`` from requests that carry entries.

        :param data: the request dict
        :return: None
        """
        response = {
            'type': 'append_entries_response',
            'src_id': self.id,
            'dst_id': data['src_id'],
            'term': self.current_term,
            'success': False,
        }

        # Stale leader.
        if data['term'] < self.current_term:
            self._reply(response)
            return

        self.leader_id = data['leader_id']

        # Heartbeat.
        if not data['entries']:
            return

        prev_log_index = data['prev_log_index']
        if self.log.get_log_term(prev_log_index) != data['prev_log_term']:
            # Logs diverge: refuse, then drop the conflicting entries.
            self._reply(response)
            self.log.delete_entries(prev_log_index)
            return

        response['success'] = True
        self._reply(response)
        self.log.append_entries(prev_log_index, data['entries'])

        leader_commit = data['leader_commit']
        if leader_commit > self.commit_index:
            self.commit_index = min(leader_commit, self.log.last_log_index)

    def request_vote(self, data):
        """Handle a ``request_vote`` request from a candidate.

        The vote is granted only when the request is not from an older term,
        this node has not voted for a different candidate in the current
        term, and the candidate's log is at least as up to date as ours.
        A granted vote is persisted before the reply is sent. A refusal never
        changes ``voted_for``: a vote already cast in this term must stand.

        :param data: the request dict
        :return: None
        """
        response = {
            'type': 'request_vote_response',
            'src_id': self.id,
            'dst_id': data['src_id'],
            'term': self.current_term,
            'vote_granted': False,
        }

        if data['term'] >= self.current_term:
            candidate_id = data['candidate_id']
            free_to_vote = self.voted_for is None or self.voted_for == candidate_id
            if free_to_vote and self._candidate_log_up_to_date(
                    data['last_log_term'], data['last_log_index']):
                self.voted_for = candidate_id
                self.save()
                response['vote_granted'] = True

        self._reply(response)

    # ------------------------------------------------------------------
    # per-role rules
    # ------------------------------------------------------------------
    def all_do(self, data):
        """Apply the rules shared by every role.

        Marks committed entries as applied and, when a message carries a
        newer term, adopts that term, clears the vote, persists both and
        falls back to follower with a fresh election timeout.

        :param data: message dict, or None
        :return: None
        """
        t = time.time()

        if self.commit_index > self.last_applied:
            self.last_applied = self.commit_index

        # Client requests carry no term.
        if data is None or data['type'] == 'client_append_entries':
            return

        if data['term'] > self.current_term:
            self.next_leader_election_time = t + random.randint(*self.wait_ms)
            self.role = 'follower'
            self.current_term = data['term']
            self.voted_for = None
            self.save()

    def follower_do(self, data):
        """Apply the follower rules to ``data`` and check the election timer.

        An ``append_entries`` from the current term re-arms the election
        timer; both request kinds are passed to their handlers. If the timer
        has expired the node starts an election.

        :param data: message dict, or None
        :return: None
        """
        t = time.time()

        if data is not None:
            if data['type'] == 'append_entries':
                if data['term'] == self.current_term:
                    self.next_leader_election_time = t + random.randint(*self.wait_ms)
                self.append_entries(data)
            elif data['type'] == 'request_vote':
                self.request_vote(data)

        if t > self.next_leader_election_time:
            self._start_election(t)

    def candidate_do(self, data):
        """Apply the candidate rules: canvass, count votes, maybe step down.

        Every call (re)sends ``request_vote`` to each peer whose vote has not
        been granted yet. A response from the current term is recorded and,
        once ``_peer_quorum()`` peers have granted their vote, the node
        becomes leader. An ``append_entries`` from the current term makes the
        node a follower again. In both cases ``voted_for`` is left untouched
        so the node cannot vote a second time in the same term. If the
        election timer expires a new election is started.

        :param data: message dict, or None
        :return: None
        """
        t = time.time()

        for dst_id in self.peers:
            if self.vote_ids[dst_id] == 0:
                request = {
                    'type': 'request_vote',
                    'src_id': self.id,
                    'dst_id': dst_id,
                    'term': self.current_term,
                    'candidate_id': self.id,
                    'last_log_index': self.log.last_log_index,
                    'last_log_term': self.log.last_log_term,
                }
                self.network_api.send(request, self.peers[dst_id])

        if data is not None and data['term'] == self.current_term:
            if data['type'] == 'request_vote_response':
                self.vote_ids[data['src_id']] = data['vote_granted']
                vote_count = sum(self.vote_ids.values())
                if vote_count >= self._peer_quorum():
                    self.role = 'leader'
                    # Send the first heartbeat on the next leader tick.
                    self.next_heartbeat_time = 0
                    self.next_index = {
                        _id: self.log.last_log_index + 1 for _id in self.peers
                    }
                    self.match_index = {_id: 0 for _id in self.peers}
                    return
            elif data['type'] == 'append_entries':
                # Somebody else already won this term.
                self.next_leader_election_time = t + random.randint(*self.wait_ms)
                self.role = 'follower'
                return

        if t > self.next_leader_election_time:
            self._start_election(t)

    def leader_do(self, data):
        """Apply the leader rules: heartbeat, accept writes, track replies.

        When the heartbeat timer has expired, an ``append_entries`` request
        is sent to every peer, carrying the entries from that peer's
        ``next_index`` onward. A client request is stamped with the current
        term and appended to the local log, after which the call returns.
        An ``append_entries_response`` from the current term moves the
        sender's ``next_index`` back by one on failure, or records the
        progress on success. Finally the commit index is advanced if enough
        peers have caught up.

        :param data: message dict, or None
        :return: None
        """
        t = time.time()

        if t > self.next_heartbeat_time:
            self.next_heartbeat_time = t + random.randint(0, 5)
            for dst_id in self.peers:
                request = {
                    'type': 'append_entries',
                    'src_id': self.id,
                    'dst_id': dst_id,
                    'term': self.current_term,
                    'leader_id': self.id,
                    'prev_log_index': self.next_index[dst_id] - 1,
                    'prev_log_term': self.log.get_log_term(self.next_index[dst_id] - 1),
                    'entries': self.log.get_entries(self.next_index[dst_id]),
                    'leader_commit': self.commit_index,
                }
                self.network_api.send(request, self.peers[dst_id])

        if data is not None and data['type'] == 'client_append_entries':
            data['term'] = self.current_term
            self.log.append_entries(self.log.last_log_index, [data])
            return

        if data is not None and data['term'] == self.current_term:
            if data['type'] == 'append_entries_response':
                follower_id = data['src_id']
                if not data['success']:
                    # Retry with one entry earlier next time.
                    self.next_index[follower_id] -= 1
                else:
                    self.match_index[follower_id] = self.next_index[follower_id]
                    self.next_index[follower_id] = self.log.last_log_index + 1

        self._advance_commit_index()

    def run(self):
        """Run the node forever: receive, route, then apply the role rules.

        Any exception raised by ``receive()`` is treated as "no message this
        round" (presumably the transport raises when its timeout elapses --
        confirm against ``NetworkAPI``). The role checks are sequential
        rather than exclusive, so a role change made by one step is acted on
        by the following step within the same iteration.

        :return: never returns
        """
        while True:
            try:
                data, addr = self.network_api.receive()
            except Exception:
                data, addr = None, None

            data = self.redirect(data, addr)
            self.all_do(data)

            if self.role == 'follower':
                self.follower_do(data)
            if self.role == 'candidate':
                self.candidate_do(data)
            if self.role == 'leader':
                self.leader_do(data)
|