node.py 11 KB


  1. import os
  2. import json
  3. import time
  4. import random
  5. from network_api import NetworkAPI
  6. from logger import Log
  7. class Node(object):
  8. def __init__(self, meta):
  9. self.role = 'follower'
  10. self.id = meta['id']
  11. self.addr = meta['addr']
  12. self.peers = meta['peers']
  13. self.path = "data/raft_front_end/public/log/"
  14. if not os.path.exists(self.path):
  15. os.makedirs(self.path)
  16. # 选举状态
  17. self.current_term = 0
  18. self.voted_for = None
  19. # 初始化选举情况
  20. self.load()
  21. # 初始化Logger
  22. log_name = self.path + '_' + self.id + "_log.json"
  23. self.log = Log(log_name)
  24. # volatile state
  25. # rule 1, 2
  26. self.commit_index = 0
  27. self.last_applied = 0
  28. # volatile state on leaders
  29. # rule 1, 2
  30. self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers}
  31. self.match_index = {_id: -1 for _id in self.peers}
  32. self.leader_id = None
  33. # 其他所有节点对于此节点的投票数
  34. self.vote_ids = {_id: 0 for _id in self.peers}
  35. self.client_addr = None
  36. self.wait_ms = (10, 20)
  37. self.next_leader_election_time = time.time() + random.randint(*self.wait_ms)
  38. self.next_heartbeat_time = 0
  39. # rpc
  40. self.network_api = NetworkAPI(self.addr, timeout=2)
  41. def load(self):
  42. """
  43. 从文件中读取目前选举情况,如果文件不存在,则初始化
  44. :return: null
  45. """
  46. filename = self.path + "_" + self.id + '_persistent.json'
  47. if os.path.exists(filename):
  48. with open(filename, 'r') as f:
  49. data = json.load(f)
  50. self.current_term = data['current_term']
  51. self.voted_for = data['voted_for']
  52. else:
  53. self.save()
  54. def save(self):
  55. """
  56. 将选举情况保存到json文件中
  57. :return: null
  58. """
  59. data = {'current_term': self.current_term,
  60. 'voted_for': self.voted_for,
  61. }
  62. filename = self.path + "_" + self.id + '_persistent.json'
  63. with open(filename, 'w') as f:
  64. json.dump(data, f, indent=4)
  65. def redirect(self, data, addr):
  66. """
  67. 按照data的类别以及此时该node的类别将address重定向到应有路径
  68. :param data:
  69. :param addr:
  70. :return:
  71. """
  72. if data is None:
  73. return None
  74. if data['type'] == 'client_append_entries':
  75. if self.role != 'leader':
  76. if self.leader_id:
  77. # print('Hey,leader! Here is a message!')
  78. self.network_api.send(data, self.peers[self.leader_id])
  79. return None
  80. else:
  81. return data
  82. if data['dst_id'] != self.id:
  83. self.network_api.send(data, self.peers[data['dst_id']])
  84. return None
  85. else:
  86. return data
  87. def append_entries(self, data):
  88. response = {'type': 'append_entries_response',
  89. 'src_id': self.id,
  90. 'dst_id': data['src_id'],
  91. 'term': self.current_term,
  92. 'success': False
  93. }
  94. if data['term'] < self.current_term:
  95. response['success'] = False
  96. self.network_api.send(response, self.peers[data['src_id']])
  97. return
  98. self.leader_id = data['leader_id']
  99. # heartbeat
  100. if not data['entries']:
  101. return
  102. prev_log_index = data['prev_log_index']
  103. prev_log_term = data['prev_log_term']
  104. tmp_prev_log_term = self.log.get_log_term(prev_log_index)
  105. if tmp_prev_log_term != prev_log_term:
  106. response['success'] = False
  107. self.network_api.send(response, self.peers[data['src_id']])
  108. self.log.delete_entries(prev_log_index)
  109. else:
  110. response['success'] = True
  111. self.network_api.send(response, self.peers[data['src_id']])
  112. self.log.append_entries(prev_log_index, data['entries'])
  113. leader_commit = data['leader_commit']
  114. if leader_commit > self.commit_index:
  115. commit_index = min(leader_commit, self.log.last_log_index)
  116. self.commit_index = commit_index
  117. return
  118. def request_vote(self, data):
  119. response = {'type': 'request_vote_response',
  120. 'src_id': self.id,
  121. 'dst_id': data['src_id'],
  122. 'term': self.current_term,
  123. 'vote_granted': False
  124. }
  125. if data['term'] < self.current_term:
  126. response['vote_granted'] = False
  127. self.network_api.send(response, self.peers[data['src_id']])
  128. return
  129. candidate_id = data['candidate_id']
  130. last_log_index = data['last_log_index']
  131. last_log_term = data['last_log_term']
  132. if self.voted_for is None or self.voted_for == candidate_id:
  133. if last_log_index >= self.log.last_log_index and last_log_term >= self.log.last_log_term:
  134. self.voted_for = data['src_id']
  135. self.save()
  136. response['vote_granted'] = True
  137. self.network_api.send(response, self.peers[data['src_id']])
  138. else:
  139. self.voted_for = None
  140. self.save()
  141. response['vote_granted'] = False
  142. self.network_api.send(response, self.peers[data['src_id']])
  143. else:
  144. response['vote_granted'] = False
  145. self.network_api.send(response, self.peers[data['src_id']])
  146. return
  147. def all_do(self, data):
  148. t = time.time()
  149. if self.commit_index > self.last_applied:
  150. self.last_applied = self.commit_index
  151. if data is None:
  152. return
  153. if data['type'] == 'client_append_entries':
  154. return
  155. if data['term'] > self.current_term:
  156. self.next_leader_election_time = t + random.randint(*self.wait_ms)
  157. self.role = 'follower'
  158. self.current_term = data['term']
  159. self.voted_for = None
  160. self.save()
  161. return
  162. def follower_do(self, data):
  163. t = time.time()
  164. if data is not None:
  165. if data['type'] == 'append_entries':
  166. if data['term'] == self.current_term:
  167. self.next_leader_election_time = t + random.randint(*self.wait_ms)
  168. self.append_entries(data)
  169. elif data['type'] == 'request_vote':
  170. self.request_vote(data)
  171. if t > self.next_leader_election_time:
  172. self.next_leader_election_time = t + random.randint(*self.wait_ms)
  173. self.role = 'candidate'
  174. self.current_term += 1
  175. self.voted_for = self.id
  176. self.save()
  177. self.vote_ids = {_id: 0 for _id in self.peers}
  178. return
  179. def candidate_do(self, data):
  180. t = time.time()
  181. for dst_id in self.peers:
  182. if self.vote_ids[dst_id] == 0:
  183. request = {
  184. 'type': 'request_vote',
  185. 'src_id': self.id,
  186. 'dst_id': dst_id,
  187. 'term': self.current_term,
  188. 'candidate_id': self.id,
  189. 'last_log_index': self.log.last_log_index,
  190. 'last_log_term': self.log.last_log_term
  191. }
  192. self.network_api.send(request, self.peers[dst_id])
  193. if data is not None and data['term'] == self.current_term:
  194. if data['type'] == 'request_vote_response':
  195. self.vote_ids[data['src_id']] = data['vote_granted']
  196. vote_count = sum(list(self.vote_ids.values()))
  197. if vote_count >= len(self.peers) // 2:
  198. self.role = 'leader'
  199. self.voted_for = None
  200. self.save()
  201. self.next_heartbeat_time = 0
  202. self.next_index = {_id: self.log.last_log_index + 1 for _id in self.peers}
  203. self.match_index = {_id: 0 for _id in self.peers}
  204. return
  205. elif data['type'] == 'append_entries':
  206. self.next_leader_election_time = t + random.randint(*self.wait_ms)
  207. self.role = 'follower'
  208. self.voted_for = None
  209. self.save()
  210. return
  211. if t > self.next_leader_election_time:
  212. self.next_leader_election_time = t + random.randint(*self.wait_ms)
  213. self.role = 'candidate'
  214. self.current_term += 1
  215. self.voted_for = self.id
  216. self.save()
  217. self.vote_ids = {_id: 0 for _id in self.peers}
  218. return
  219. def leader_do(self, data):
  220. t = time.time()
  221. # print('Here is leader!')
  222. if t > self.next_heartbeat_time:
  223. self.next_heartbeat_time = t + random.randint(0, 5)
  224. for dst_id in self.peers:
  225. request = {'type': 'append_entries',
  226. 'src_id': self.id,
  227. 'dst_id': dst_id,
  228. 'term': self.current_term,
  229. 'leader_id': self.id,
  230. 'prev_log_index': self.next_index[dst_id] - 1,
  231. 'prev_log_term': self.log.get_log_term(self.next_index[dst_id] - 1),
  232. 'entries': self.log.get_entries(self.next_index[dst_id]),
  233. 'leader_commit': self.commit_index
  234. }
  235. self.network_api.send(request, self.peers[dst_id])
  236. if data is not None and data['type'] == 'client_append_entries':
  237. # print('Leader:Here is a message!')
  238. data['term'] = self.current_term
  239. self.log.append_entries(self.log.last_log_index, [data])
  240. return
  241. if data is not None and data['term'] == self.current_term:
  242. if data['type'] == 'append_entries_response':
  243. if not data['success']:
  244. self.next_index[data['src_id']] -= 1
  245. else:
  246. self.match_index[data['src_id']] = self.next_index[data['src_id']]
  247. self.next_index[data['src_id']] = self.log.last_log_index + 1
  248. flag = False
  249. N = self.commit_index + 1
  250. count = 0
  251. for _id in self.match_index:
  252. if self.match_index[_id] >= N:
  253. count += 1
  254. if count >= len(self.peers) // 2:
  255. self.commit_index = N
  256. if self.client_addr:
  257. response = {'index': self.commit_index}
  258. self.network_api.send(response, self.client_addr)
  259. flag = True
  260. break
  261. def run(self):
  262. while True:
  263. # try:
  264. try:
  265. data, addr = self.network_api.receive()
  266. except Exception as e:
  267. data, addr = None, None
  268. data = self.redirect(data, addr)
  269. # print(data)
  270. self.all_do(data)
  271. if self.role == 'follower':
  272. self.follower_do(data)
  273. if self.role == 'candidate':
  274. self.candidate_do(data)
  275. if self.role == 'leader':
  276. self.leader_do(data)
  277. # except Exception as e:
  278. # print(e)