metric.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. import json
  2. import logging
  3. import random
  4. import string
  5. import time
  6. import numpy as np
  7. from easyfl.pb import common_pb2 as common_pb
  8. from easyfl.utils.float import rounding
  9. PREFIX_TASK_ID = "task"
  10. CONFIGURATION = "configuration"
  11. # clients
  12. SELECTED_CLIENTS = 'selected_clients'
  13. GROUPED_CLIENTS = 'grouped_clients'
  14. # communication cost
  15. DOWNLOAD_SIZE = 'download_size'
  16. TRAIN_DOWNLOAD_SIZE = 'train_download_size'
  17. TRAIN_UPLOAD_SIZE = 'train_upload_size'
  18. TEST_DOWNLOAD_SIZE = 'test_download_size'
  19. TEST_UPLOAD_SIZE = 'test_upload_size'
  20. # distribute time
  21. UPLOAD_TIME = "upload_time"
  22. TRAIN_UPLOAD_TIME = "train_upload_time"
  23. TEST_UPLOAD_TIME = "test_upload_time"
  24. TRAIN_DISTRIBUTE_TIME = "train_distribute_time"
  25. TEST_DISTRIBUTE_TIME = "test_distribute_time"
  26. # time
  27. ROUND_TIME = "round_time"
  28. TRAIN_TIME = 'train_time'
  29. TEST_TIME = 'test_time'
  30. TRAIN_EPOCH_TIME = 'train_epoch_time'
  31. # performance
  32. TRAIN_ACCURACY = 'train_accuracy'
  33. TRAIN_LOSS = 'train_loss'
  34. AVG_TRAIN_LOSS = 'avg_train_loss'
  35. TEST_ACCURACY = 'test_accuracy'
  36. TEST_LOSS = 'test_loss'
  37. TEST_LOCAL_ACCURACY = 'test_local_accuracy'
  38. TEST_LOCAL_LOSS = 'test_local_loss'
  39. # general
  40. EXTRA = "extra" # for not specifically defined metrics
  41. DEFAULT_FOLDER = "metrics"
  42. PREFIX_METRIC_ID = "metric"
  43. logger = logging.getLogger(__name__)
  44. class Metric(object):
  45. def __init__(self):
  46. self.metrics = {
  47. EXTRA: {}
  48. }
  49. def add(self, metric_name, metric_value, convert=True):
  50. """Add metrics. Add to "extra" if the metric is not predefined.
  51. """
  52. if self.predefined_metrics() and metric_name in self.predefined_metrics():
  53. if convert:
  54. metric_value = self._value_conversion(metric_value)
  55. self.metrics[metric_name] = metric_value
  56. elif metric_name == EXTRA:
  57. self.metrics[EXTRA].update(metric_value)
  58. else:
  59. self.metrics[EXTRA][metric_name] = metric_value
  60. def get(self, metric_name, default=0):
  61. if metric_name in self.metrics:
  62. return self.metrics[metric_name]
  63. return default
  64. @classmethod
  65. def predefined_metrics(cls):
  66. return []
  67. @property
  68. def extra(self):
  69. """Retrieve extra information, not specifically defined metric, stored in the metrics.
  70. :return dictionary of metrics, return {} if extra stored.
  71. """
  72. return self.metrics[EXTRA]
  73. @staticmethod
  74. def _value_conversion(value):
  75. """Convert float to keep only 4 decimal points
  76. """
  77. if isinstance(value, float):
  78. value = np.around(value, 4)
  79. elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], float):
  80. value = rounding(value, 4)
  81. return value
  82. class TaskMetric(object):
  83. def __init__(self, task_id, conf=None):
  84. self._task_id = task_id
  85. self._conf = conf
  86. def add(self, name, value):
  87. if name == CONFIGURATION:
  88. self.add_configuration(value)
  89. def add_configuration(self, conf):
  90. self._conf = conf
  91. @classmethod
  92. def from_sql(cls, sql_result):
  93. task_id, conf = sql_result
  94. conf = {} if conf == "" else json.loads(conf)
  95. return cls(task_id, conf)
  96. def to_sql_param(self):
  97. conf = json.dumps(self.configuration) if self.configuration is not None else ""
  98. return self.task_id, conf
  99. @property
  100. def task_id(self):
  101. return self._task_id
  102. @property
  103. def configuration(self):
  104. return self._conf
  105. def to_proto(self):
  106. return common_pb.TaskMetric(
  107. task_id=self.task_id,
  108. configuration=json.dumps(self.configuration)
  109. )
  110. @classmethod
  111. def from_proto(cls, proto):
  112. return cls(proto.task_id, json.loads(proto.configuration))
  113. class RoundMetric(Metric):
  114. """Metrics of a training round
  115. Note: testing related metrics may not be available in every round.
  116. """
  117. def __init__(self, task_id, round_id):
  118. super().__init__()
  119. self.task_id = task_id
  120. self.round_id = round_id
  121. @property
  122. def test_accuracy(self):
  123. return self.get(TEST_ACCURACY)
  124. @property
  125. def test_loss(self):
  126. return self.get(TEST_LOSS)
  127. @property
  128. def train_time(self):
  129. return self.get(TRAIN_TIME)
  130. @property
  131. def test_time(self):
  132. return self.get(TEST_TIME)
  133. @property
  134. def round_time(self):
  135. return self.get(ROUND_TIME)
  136. @property
  137. def train_distribute_time(self):
  138. return self.get(TRAIN_DISTRIBUTE_TIME, 0)
  139. @property
  140. def test_distribute_time(self):
  141. return self.get(TEST_DISTRIBUTE_TIME, 0)
  142. @property
  143. def train_upload_size(self):
  144. """Communication cost of uploading content from client to server
  145. """
  146. return self.get(TRAIN_UPLOAD_SIZE)
  147. @property
  148. def train_download_size(self):
  149. """Communication cost of distributing content from server to client
  150. """
  151. return self.get(TRAIN_DOWNLOAD_SIZE)
  152. @property
  153. def test_upload_size(self):
  154. """Communication cost of uploading content from client to server
  155. """
  156. return self.get(TEST_UPLOAD_SIZE)
  157. @property
  158. def test_download_size(self):
  159. """Communication cost of distributing content from server to client
  160. """
  161. return self.get(TEST_DOWNLOAD_SIZE)
  162. @property
  163. def communication_cost(self):
  164. return self.train_upload_size + self.train_download_size + self.test_upload_size + self.test_download_size
  165. @classmethod
  166. def predefined_metrics(cls):
  167. return [TEST_ACCURACY,
  168. TEST_LOSS,
  169. ROUND_TIME,
  170. TRAIN_TIME,
  171. TEST_TIME,
  172. TRAIN_DISTRIBUTE_TIME,
  173. TEST_DISTRIBUTE_TIME,
  174. TRAIN_UPLOAD_SIZE,
  175. TRAIN_DOWNLOAD_SIZE,
  176. TEST_UPLOAD_SIZE,
  177. TEST_DOWNLOAD_SIZE]
  178. @classmethod
  179. def from_sql(cls, sql_result):
  180. task_id = sql_result[0]
  181. round_id = sql_result[1]
  182. m = cls(task_id, round_id)
  183. metrics = cls.predefined_metrics()
  184. for name, value in zip(metrics, sql_result[2:-1]):
  185. m.add(name, value)
  186. m.add(EXTRA, json.loads(sql_result[-1]))
  187. return m
  188. def to_sql_param(self):
  189. return (self.task_id,
  190. self.round_id,
  191. self.test_accuracy,
  192. self.test_loss,
  193. self.round_time,
  194. self.train_time,
  195. self.test_time,
  196. self.train_distribute_time,
  197. self.test_distribute_time,
  198. self.train_upload_size,
  199. self.train_download_size,
  200. self.test_upload_size,
  201. self.test_download_size,
  202. json.dumps(self.extra))
  203. def to_proto(self):
  204. return common_pb.RoundMetric(
  205. task_id=self.task_id,
  206. round_id=self.round_id,
  207. test_accuracy=self.test_accuracy,
  208. test_loss=self.test_loss,
  209. round_time=self.round_time,
  210. train_time=self.train_time,
  211. test_time=self.test_time,
  212. train_distribute_time=self.train_distribute_time,
  213. test_distribute_time=self.test_distribute_time,
  214. train_upload_size=self.train_upload_size,
  215. train_download_size=self.train_download_size,
  216. test_upload_size=self.test_upload_size,
  217. test_download_size=self.test_download_size,
  218. extra=json.dumps(self.extra)
  219. )
  220. @classmethod
  221. def from_proto(cls, proto):
  222. m = cls(proto.task_id, proto.round_id)
  223. metrics = cls.predefined_metrics()
  224. values = [proto.test_accuracy,
  225. proto.test_loss,
  226. proto.round_time,
  227. proto.train_time,
  228. proto.test_time,
  229. proto.train_distribute_time,
  230. proto.test_distribute_time,
  231. proto.train_upload_size,
  232. proto.train_download_size,
  233. proto.test_upload_size,
  234. proto.test_download_size]
  235. for name, value in zip(metrics, values):
  236. m.add(name, value)
  237. if proto.extra:
  238. m.add(EXTRA, json.loads(proto.extra))
  239. return m
  240. class ClientMetric(Metric):
  241. """Metrics for a client in a round of training.
  242. """
  243. def __init__(self, task_id, round_id, client_id):
  244. super().__init__()
  245. self.task_id = task_id
  246. self.round_id = round_id
  247. self.client_id = client_id
  248. @property
  249. def train_accuracy(self):
  250. return self.get(TRAIN_ACCURACY)
  251. @property
  252. def test_accuracy(self):
  253. return self.get(TEST_ACCURACY)
  254. @property
  255. def train_loss(self):
  256. return self.get(TRAIN_LOSS)
  257. @property
  258. def test_loss(self):
  259. return self.get(TEST_LOSS)
  260. @property
  261. def train_time(self):
  262. return self.get(TRAIN_TIME)
  263. @property
  264. def test_time(self):
  265. return self.get(TEST_TIME)
  266. @property
  267. def train_upload_time(self):
  268. return self.get(TRAIN_UPLOAD_TIME)
  269. @property
  270. def test_upload_time(self):
  271. return self.get(TEST_UPLOAD_TIME)
  272. @property
  273. def train_upload_size(self):
  274. return self.get(TRAIN_UPLOAD_SIZE)
  275. @property
  276. def train_download_size(self):
  277. return self.get(TRAIN_DOWNLOAD_SIZE)
  278. @property
  279. def test_upload_size(self):
  280. return self.get(TEST_UPLOAD_SIZE)
  281. @property
  282. def test_download_size(self):
  283. return self.get(TEST_DOWNLOAD_SIZE)
  284. @property
  285. def communication_cost(self):
  286. return self.train_upload_size + self.train_download_size + self.test_upload_size + self.test_download_size
  287. @classmethod
  288. def predefined_metrics(cls):
  289. return [TRAIN_ACCURACY,
  290. TRAIN_LOSS,
  291. TEST_ACCURACY,
  292. TEST_LOSS,
  293. TRAIN_TIME,
  294. TEST_TIME,
  295. TRAIN_UPLOAD_TIME,
  296. TEST_UPLOAD_TIME,
  297. TRAIN_UPLOAD_SIZE,
  298. TRAIN_DOWNLOAD_SIZE,
  299. TEST_UPLOAD_SIZE,
  300. TEST_DOWNLOAD_SIZE]
  301. @classmethod
  302. def from_sql(cls, sql_result):
  303. task_id = sql_result[0]
  304. round_id = sql_result[1]
  305. client_id = sql_result[2]
  306. m = cls(task_id, round_id, client_id)
  307. metrics = cls.predefined_metrics() + [EXTRA]
  308. for name, value in zip(metrics, sql_result[3:]):
  309. if name in [TRAIN_ACCURACY, TRAIN_LOSS, EXTRA]:
  310. value = json.loads(value)
  311. m.add(name, value)
  312. return m
  313. def to_sql_param(self):
  314. return (self.task_id,
  315. self.round_id,
  316. self.client_id,
  317. json.dumps(self.train_accuracy),
  318. json.dumps(self.train_loss),
  319. self.test_accuracy,
  320. self.test_loss,
  321. self.train_time,
  322. self.test_time,
  323. self.train_upload_time,
  324. self.test_upload_time,
  325. self.train_upload_size,
  326. self.train_download_size,
  327. self.test_upload_size,
  328. self.test_download_size,
  329. json.dumps(self.extra))
  330. def to_proto(self):
  331. return common_pb.ClientMetric(
  332. task_id=self.task_id,
  333. round_id=self.round_id,
  334. client_id=self.client_id,
  335. train_accuracy=self.train_accuracy,
  336. train_loss=self.train_loss,
  337. test_accuracy=self.test_accuracy,
  338. test_loss=self.test_loss,
  339. train_time=self.train_time,
  340. test_time=self.test_time,
  341. train_upload_time=self.train_upload_time,
  342. test_upload_time=self.test_upload_time,
  343. train_upload_size=self.train_upload_size,
  344. train_download_size=self.train_download_size,
  345. test_upload_size=self.test_upload_size,
  346. test_download_size=self.test_download_size,
  347. extra=json.dumps(self.extra)
  348. )
  349. @classmethod
  350. def from_proto(cls, proto):
  351. m = cls(proto.task_id, proto.round_id, proto.client_id)
  352. train_accuracy = [x for x in proto.train_accuracy]
  353. train_loss = [x for x in proto.train_loss]
  354. metrics = cls.predefined_metrics()
  355. values = [train_accuracy,
  356. train_loss,
  357. proto.test_accuracy,
  358. proto.test_loss,
  359. proto.train_time,
  360. proto.test_time,
  361. proto.train_upload_time,
  362. proto.test_upload_time,
  363. proto.train_upload_size,
  364. proto.train_download_size,
  365. proto.test_upload_size,
  366. proto.test_download_size]
  367. for name, value in zip(metrics, values):
  368. m.add(name, value)
  369. if proto.extra:
  370. m.add(EXTRA, json.loads(proto.extra))
  371. return m
  372. def set_train_metrics(self, m):
  373. if self.is_same_metric(m):
  374. self.metrics[TRAIN_ACCURACY] = m.train_accuracy
  375. self.metrics[TRAIN_LOSS] = m.train_loss
  376. self.metrics[TRAIN_TIME] = m.train_time
  377. self.metrics[TRAIN_UPLOAD_TIME] = m.train_upload_time
  378. self.metrics[TRAIN_UPLOAD_SIZE] = m.train_upload_size
  379. self.metrics[TRAIN_DOWNLOAD_SIZE] = m.train_download_size
  380. def set_test_metrics(self, m):
  381. if self.is_same_metric(m):
  382. self.metrics[TEST_ACCURACY] = m.test_accuracy
  383. self.metrics[TEST_LOSS] = m.test_loss
  384. self.metrics[TEST_TIME] = m.test_time
  385. self.metrics[TEST_UPLOAD_TIME] = m.test_upload_time
  386. self.metrics[TEST_UPLOAD_SIZE] = m.test_upload_size
  387. self.metrics[TEST_DOWNLOAD_SIZE] = m.test_download_size
  388. def is_same_metric(self, m):
  389. return self.task_id == m.task_id and self.round_id == m.round_id and self.client_id == m.client_id
  390. @classmethod
  391. def merge_train_to_test_metrics(cls, train_metrics, test_metrics):
  392. """Merge train metrics to test_metrics
  393. """
  394. train_metrics_ = {m.client_id: m for m in train_metrics}
  395. for test_metric in test_metrics:
  396. client_id = test_metric.client_id
  397. if client_id in train_metrics_:
  398. test_metric.set_train_metrics(train_metrics_[client_id])
  399. return test_metrics
  400. def generate_tid():
  401. length = 6
  402. letters = string.ascii_lowercase
  403. random.seed(time.time())
  404. result = "".join(random.choice(letters) for i in range(length))
  405. return "{}_{}".format(PREFIX_TASK_ID, result)