client.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. from easyfl.tracking.metric import TaskMetric, RoundMetric, ClientMetric
  2. from easyfl.tracking.storage import get_store
  3. class TrackingClient(object):
  4. """Client for tracking task metrics, round metrics, and client metrics.
  5. Task Tracking:
  6. client.create_task(task_id, conf)
  7. Round Tracking:
  8. client.track_round(name, value)
  9. client.save_round() # auto increment to next round
  10. client.track_round(name, value)
  11. Client Tracking:
  12. client.set_client_context(task_id, round_id, client_id)
  13. client.track_client(name, value)
  14. """
  15. def __init__(self, db_path=None, db_address=None, init_store=True):
  16. """If storage is not initialized, the tracking client can only collect metrics but not save them.
  17. """
  18. self._task_id = None
  19. self._round_id = None
  20. self._client_id = None
  21. self._current_task = None
  22. self._current_round = None
  23. self._current_client = None
  24. self._cached_task_metrics = {}
  25. self._cached_round_metrics = {}
  26. if init_store:
  27. self._storage = get_store(db_path, db_address)
  28. def get_task_metric(self, task_id):
  29. """Get task from storage
  30. """
  31. task_metric = self._storage.get_task_metric(task_id)
  32. if task_metric is None:
  33. return
  34. return TaskMetric.from_sql(task_metric)
  35. def get_round_metric(self, round_id, task_id):
  36. if task_id == self._task_id and round_id == self._round_id:
  37. return self._current_round
  38. return self._storage.get_round_metrics(task_id, [round_id])
  39. def get_client_metric(self, client_id=None, round_id=None, task_id=None):
  40. if (task_id == self._task_id and round_id == self._round_id and client_id == self._client_id) or \
  41. (client_id is None and round_id is None and task_id is None):
  42. return self._current_client
  43. return self._storage.get_client_metrics(task_id, round_id, [client_id])
  44. def get_client_metrics(self, client_ids, round_id, task_id):
  45. """Get list of client metrics.
  46. :param client_ids: list of client ids.
  47. :param round_id: round id.
  48. :param task_id: task id.
  49. """
  50. return self._storage.get_client_metrics(task_id, round_id, client_ids)
  51. def create_task(self, task_id, conf=None, save=True):
  52. if task_id is None:
  53. raise ValueError("task_id cannot be None to create task")
  54. self._task_id = task_id
  55. self._current_task = TaskMetric(task_id, conf)
  56. if save:
  57. self._storage.store_task_metric(self._current_task)
  58. def create_round(self, round_id, task_id=None):
  59. if task_id is None:
  60. task_id = self._task_id
  61. if round_id is None:
  62. raise ValueError("round_id cannot be None to create round")
  63. if round_id != self._round_id:
  64. self._round_id = round_id
  65. self._current_round = RoundMetric(task_id, self._round_id)
  66. def create_client(self, client_id, reset=True):
  67. """Create client under current round of task.
  68. Current implementation requires round and task exist to create client.
  69. """
  70. self._check_context()
  71. if client_id is None:
  72. raise ValueError("client_id cannot be None to create client.")
  73. if reset or not self._current_client or client_id != self._client_id:
  74. self._current_client = ClientMetric(self._task_id, self._round_id, client_id)
  75. self._client_id = client_id
  76. return
  77. self._current_client.task_id = self._task_id
  78. self._current_client.round_id = self._round_id
  79. def track_task(self, name, value, task_id=None):
  80. if self._diff_task_id(task_id):
  81. self._cached_task_metrics[self._task_id] = self._current_task
  82. self._task_id = task_id
  83. self._current_task = TaskMetric(task_id)
  84. self._current_task.add(name, value)
  85. self._storage.store_task_metric(self._current_task)
  86. def track_round(self, name, value, round_id=None, task_id=None):
  87. if self._diff_task_id(task_id):
  88. create_task(task_id)
  89. if self._diff_round_id(round_id):
  90. # self._cached_round_metrics[self.unique_round_id] = self._current_round
  91. self.create_round(round_id)
  92. if self._current_round is None:
  93. self.create_round(0)
  94. self._current_round.add(name, value)
  95. def track_client(self, name, value, client_id=None):
  96. """Track client under current round and task.
  97. Current implementation requires round and task exist to track client.
  98. """
  99. self._check_context()
  100. if self._diff_client_id(client_id) or self._current_client is None:
  101. self.create_client(client_id)
  102. self._current_client.add(name, value)
  103. def save_round(self, increment=True, cache=False):
  104. if self._current_round is None:
  105. raise ValueError("Round metric is not initialized")
  106. self._storage.store_round_metric(self._current_round)
  107. if cache:
  108. self._cached_round_metrics[self.unique_round_id] = self._current_round
  109. if increment:
  110. self.create_round(self._round_id + 1)
  111. def save_client(self):
  112. if self._current_client is None:
  113. raise ValueError("Client metric is not initialized")
  114. self._storage.store_client_metrics([self._current_client])
  115. def save_clients(self, client_metrics):
  116. self._storage.store_client_metrics(client_metrics)
  117. def set_task(self, task_id):
  118. if self._current_task is None:
  119. self.create_task(task_id, save=False)
  120. def set_round(self, round_id):
  121. self.create_round(round_id)
  122. def set_client_context(self, task_id, round_id, client_id, reset_client=True):
  123. """Set the client context for tracking.
  124. :param task_id: task id, indicating current the training task
  125. :param round_id: round id, indicating current round of training/testing
  126. :param client_id: client id
  127. :param reset_client: resets and creates a new client.
  128. """
  129. self.set_task(task_id)
  130. self.set_round(round_id)
  131. self.create_client(client_id, reset=reset_client)
  132. @property
  133. def unique_round_id(self):
  134. return f"{self._task_id}_{self._round_id}"
  135. def _diff_task_id(self, task_id):
  136. return task_id is not None and task_id != self._task_id
  137. def _diff_round_id(self, round_id):
  138. return round_id is not None and round_id != self._round_id
  139. def _diff_client_id(self, client_id):
  140. return client_id is not None and client_id != self._client_id
  141. def _check_context(self):
  142. if self._task_id is None or self._round_id is None:
  143. raise LookupError("task_id or round_id of the client is not set")
  144. _client = TrackingClient(init_store=False)
  145. """easyfl.tracking.TrackingClient: The global tracking client object"""
  146. def init_tracking(path=None, address=None, init_store=True):
  147. """Initialize tracking client. This tracking client is isolated from the global tracking client.
  148. This is useful when an application need to run multiple tasks.
  149. :param path: database path
  150. :param address: remote address of tracking service to connect to
  151. :param init_store: whether initialize storage
  152. """
  153. return TrackingClient(path, address, init_store)
  154. # ------ following methods are not well tested yet ------
  155. def setup_tracking(path=None, address=None):
  156. """Setup tracking with global tracking client.
  157. """
  158. global _client
  159. _client = init_tracking(path, address)
  160. def get_task(task_id):
  161. return _client.get_task_metric(task_id)
  162. def get_round(round_id, task_id):
  163. return _client.get_round_metric(round_id, task_id)
  164. def create_task(task_id, conf=None):
  165. _client.create_task(task_id, conf)
  166. def track_task(name, value, task_id=None):
  167. _client.track_task(name, value, task_id)
  168. def track_round(name, value, round_id=None, task_id=None):
  169. _client.track_round(name, value, round_id, task_id)
  170. def track_client(name, value, client_id=None):
  171. _client.track_client(name, value, client_id)
  172. def set_task(task_id):
  173. _client.set_task(task_id)
  174. def set_round(round_id):
  175. _client.set_round(round_id)
  176. def save_round():
  177. _client.save_round()