_computing.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. """
  17. distributed computing
  18. """
import abc
import typing
from abc import ABCMeta

try:
    from collections.abc import Iterable
except ImportError:  # Python < 3.3 fallback; the bare ``collections.Iterable`` alias was removed in 3.10
    from collections import Iterable

from fate_arch.abc._address import AddressABC
from fate_arch.abc._path import PathABC
  25. __all__ = ["CTableABC", "CSessionABC"]
  26. # noinspection PyPep8Naming
  27. class CTableABC(metaclass=ABCMeta):
  28. """
  29. a table of pair-like data supports distributed processing
  30. """
  31. @property
  32. @abc.abstractmethod
  33. def engine(self):
  34. """
  35. get the engine name of table
  36. Returns
  37. -------
  38. int
  39. number of partitions
  40. """
  41. ...
  42. @property
  43. @abc.abstractmethod
  44. def partitions(self):
  45. """
  46. get the partitions of table
  47. Returns
  48. -------
  49. int
  50. number of partitions
  51. """
  52. ...
  53. @abc.abstractmethod
  54. def copy(self):
  55. ...
  56. @abc.abstractmethod
  57. def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
  58. """
  59. save table
  60. Parameters
  61. ----------
  62. address: AddressABC
  63. address to save table to
  64. partitions: int
  65. number of partitions to save as
  66. schema: dict
  67. table schema
  68. """
  69. ...
  70. @abc.abstractmethod
  71. def collect(self, **kwargs) -> typing.Generator:
  72. """
  73. collect data from table
  74. Returns
  75. -------
  76. generator
  77. generator of data
  78. Notes
  79. ------
  80. no order guarantee
  81. """
  82. ...
  83. @abc.abstractmethod
  84. def take(self, n=1, **kwargs):
  85. """
  86. take ``n`` data from table
  87. Parameters
  88. ----------
  89. n: int
  90. number of data to take
  91. Returns
  92. -------
  93. list
  94. a list of ``n`` data
  95. Notes
  96. ------
  97. no order guarantee
  98. """
  99. ...
  100. @abc.abstractmethod
  101. def first(self, **kwargs):
  102. """
  103. take one data from table
  104. Returns
  105. -------
  106. object
  107. a data from table
  108. Notes
  109. -------
  110. no order guarantee
  111. """
  112. ...
  113. @abc.abstractmethod
  114. def count(self) -> int:
  115. """
  116. number of data in table
  117. Returns
  118. -------
  119. int
  120. number of data
  121. """
  122. ...
  123. @abc.abstractmethod
  124. def map(self, func) -> 'CTableABC':
  125. """
  126. apply `func` to each data
  127. Parameters
  128. ----------
  129. func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
  130. function map (k1, v1) to (k2, v2)
  131. Returns
  132. -------
  133. CTableABC
  134. A new table
  135. Examples
  136. --------
  137. >>> from fate_arch.session import computing_session
  138. >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
  139. >>> b = a.map(lambda k, v: (k, v**2))
  140. >>> list(b.collect())
  141. [("k1", 1), ("k2", 4), ("k3", 9)]
  142. """
  143. ...
  144. @abc.abstractmethod
  145. def mapValues(self, func):
  146. """
  147. apply `func` to each value of data
  148. Parameters
  149. ----------
  150. func: ``typing.Callable[[object], object]``
  151. map v1 to v2
  152. Returns
  153. -------
  154. CTableABC
  155. A new table
  156. Examples
  157. --------
  158. >>> from fate_arch.session import computing_session
  159. >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
  160. >>> b = a.mapValues(lambda x: len(x))
  161. >>> list(b.collect())
  162. [('a', 3), ('b', 1)]
  163. """
  164. ...
  165. @abc.abstractmethod
  166. def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
  167. """
  168. apply ``func`` to each partition of table
  169. Parameters
  170. ----------
  171. func: ``typing.Callable[[iter], list]``
  172. accept an iterator of pair, return a list of pair
  173. use_previous_behavior: bool
  174. this parameter is provided for compatible reason, if set True, call this func will call ``applyPartitions`` instead
  175. preserves_partitioning: bool
  176. flag indicate whether the `func` will preserve partition
  177. Returns
  178. -------
  179. CTableABC
  180. a new table
  181. Examples
  182. --------
  183. >>> from fate_arch.session import computing_session
  184. >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
  185. >>> def f(iterator):
  186. ... s = 0
  187. ... for k, v in iterator:
  188. ... s += v
  189. ... return [(s, s)]
  190. ...
  191. >>> b = a.mapPartitions(f)
  192. >>> list(b.collect())
  193. [(6, 6), (9, 9)]
  194. """
  195. ...
  196. @abc.abstractmethod
  197. def mapReducePartitions(self, mapper, reducer, **kwargs):
  198. """
  199. apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`
  200. Parameters
  201. ----------
  202. mapper: ``typing.Callable[[iter], list]``
  203. accept an iterator of pair, return a list of pair
  204. reducer: ``typing.Callable[[object, object], object]``
  205. reduce v1, v2 to v3
  206. Returns
  207. -------
  208. CTableABC
  209. a new table
  210. Examples
  211. --------
  212. >>> from fate_arch.session import computing_session
  213. >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=False, partition=2)
  214. >>> def _mapper(it):
  215. ... r = []
  216. ... for k, v in it:
  217. ... r.append((k % 3, v**2))
  218. ... r.append((k % 2, v ** 3))
  219. ... return r
  220. >>> def _reducer(a, b):
  221. ... return a + b
  222. >>> output = table.mapReducePartitions(_mapper, _reducer)
  223. >>> collected = dict(output.collect())
  224. >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
  225. >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
  226. >>> assert collected[2] == 3 ** 2
  227. """
  228. ...
  229. def applyPartitions(self, func):
  230. """
  231. apply ``func`` to each partitions as a single object
  232. Parameters
  233. ----------
  234. func: ``typing.Callable[[iter], object]``
  235. accept a iterator, return a object
  236. Returns
  237. -------
  238. CTableABC
  239. a new table, with each partition contains a single key-value pair
  240. Examples
  241. --------
  242. >>> from fate_arch.session import computing_session
  243. >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
  244. >>> def f(it):
  245. ... r = []
  246. ... for k, v in it:
  247. ... r.append(v, v**2, v**3)
  248. ... return r
  249. >>> output = a.applyPartitions(f)
  250. >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
  251. """
  252. ...
  253. @abc.abstractmethod
  254. def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
  255. ...
  256. @abc.abstractmethod
  257. def flatMap(self, func):
  258. """
  259. apply a flat ``func`` to each data of table
  260. Parameters
  261. ----------
  262. func: ``typing.Callable[[object, object], typing.List[object, object]]``
  263. a flat function accept two parameters return a list of pair
  264. Returns
  265. -------
  266. CTableABC
  267. a new table
  268. Examples
  269. --------
  270. >>> from fate_arch.session import computing_session
  271. >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
  272. >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
  273. >>> c = list(b.collect())
  274. >>> assert len(c) = 4
  275. >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
  276. """
  277. ...
  278. @abc.abstractmethod
  279. def reduce(self, func):
  280. """
  281. reduces all value in pair of table by a binary function `func`
  282. Parameters
  283. ----------
  284. func: typing.Callable[[object, object], object]
  285. binary function reduce two value into one
  286. Returns
  287. -------
  288. object
  289. a single object
  290. Examples
  291. --------
  292. >>> from fate_arch.session import computing_session
  293. >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
  294. >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
  295. Notes
  296. ------
  297. `func` should be associative
  298. """
  299. ...
  300. @abc.abstractmethod
  301. def glom(self):
  302. """
  303. coalesces all data within partition into a list
  304. Returns
  305. -------
  306. list
  307. list containing all coalesced partition and its elements.
  308. First element of each tuple is chosen from key of last element of each partition.
  309. Examples
  310. --------
  311. >>> from fate_arch.session import computing_session
  312. >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
  313. >>> list(a)
  314. [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
  315. """
  316. ...
  317. @abc.abstractmethod
  318. def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
  319. """
  320. return a sampled subset of this Table.
  321. Parameters
  322. ----------
  323. fraction: float
  324. Expected size of the sample as a fraction of this table's size
  325. without replacement: probability that each element is chosen.
  326. Fraction must be [0, 1] with replacement: expected number of times each element is chosen.
  327. num: int
  328. Exact number of the sample from this table's size
  329. seed: int
  330. Seed of the random number generator. Use current timestamp when `None` is passed.
  331. Returns
  332. -------
  333. CTableABC
  334. a new table
  335. Examples
  336. --------
  337. >>> from fate_arch.session import computing_session
  338. >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
  339. >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
  340. True
  341. Notes
  342. -------
  343. use one of ``fraction`` and ``num``, not both
  344. """
  345. ...
  346. @abc.abstractmethod
  347. def filter(self, func):
  348. """
  349. returns a new table containing only those keys which satisfy a predicate passed in via ``func``.
  350. Parameters
  351. ----------
  352. func: typing.Callable[[object, object], bool]
  353. Predicate function returning a boolean.
  354. Returns
  355. -------
  356. CTableABC
  357. A new table containing results.
  358. Examples
  359. --------
  360. >>> from fate_arch.session import computing_session
  361. >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
  362. >>> b = a.filter(lambda k, v : k % 2 == 0)
  363. >>> list(b.collect())
  364. [(0, 0), (2, 2)]
  365. >>> c = a.filter(lambda k, v : v % 2 != 0)
  366. >>> list(c.collect())
  367. [(1, 1)]
  368. """
  369. ...
  370. @abc.abstractmethod
  371. def join(self, other, func):
  372. """
  373. returns intersection of this table and the other table.
  374. function ``func`` will be applied to values of keys that exist in both table.
  375. Parameters
  376. ----------
  377. other: CTableABC
  378. another table to be operated with.
  379. func: ``typing.Callable[[object, object], object]``
  380. the function applying to values whose key exists in both tables.
  381. default using left table's value.
  382. Returns
  383. -------
  384. CTableABC
  385. a new table
  386. Examples
  387. --------
  388. >>> from fate_arch.session import computing_session
  389. >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
  390. >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
  391. >>> c = a.join(b, lambda v1, v2 : v1 + v2)
  392. >>> list(c.collect())
  393. [(1, 3), (2, 5)]
  394. """
  395. ...
  396. @abc.abstractmethod
  397. def union(self, other, func=lambda v1, v2: v1):
  398. """
  399. returns union of this table and the other table.
  400. function ``func`` will be applied to values of keys that exist in both table.
  401. Parameters
  402. ----------
  403. other: CTableABC
  404. another table to be operated with.
  405. func: ``typing.Callable[[object, object], object]``
  406. The function applying to values whose key exists in both tables.
  407. default using left table's value.
  408. Returns
  409. -------
  410. CTableABC
  411. a new table
  412. Examples
  413. --------
  414. >>> from fate_arch.session import computing_session
  415. >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
  416. >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
  417. >>> c = a.union(b, lambda v1, v2 : v1 + v2)
  418. >>> list(c.collect())
  419. [(0, 1), (1, 3), (2, 5), (3, 3)]
  420. """
  421. ...
  422. @abc.abstractmethod
  423. def subtractByKey(self, other):
  424. """
  425. returns a new table containing elements only in this table but not in the other table.
  426. Parameters
  427. ----------
  428. other: CTableABC
  429. Another table to be subtractbykey with.
  430. Returns
  431. -------
  432. CTableABC
  433. A new table
  434. Examples
  435. --------
  436. >>> from fate_arch.session import computing_session
  437. >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
  438. >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
  439. >>> c = a.subtractByKey(b)
  440. >>> list(c.collect())
  441. [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
  442. """
  443. ...
  444. @property
  445. def schema(self):
  446. if not hasattr(self, "_schema"):
  447. setattr(self, "_schema", {})
  448. return getattr(self, "_schema")
  449. @schema.setter
  450. def schema(self, value):
  451. setattr(self, "_schema", value)
  452. class CSessionABC(metaclass=ABCMeta):
  453. """
  454. computing session to load/create/clean tables
  455. """
  456. @abc.abstractmethod
  457. def load(self, address: AddressABC, partitions, schema: dict, **kwargs) -> typing.Union[PathABC, CTableABC]:
  458. """
  459. load a table from given address
  460. Parameters
  461. ----------
  462. address: AddressABC
  463. address to load table from
  464. partitions: int
  465. number of partitions of loaded table
  466. schema: dict
  467. schema associate with this table
  468. Returns
  469. -------
  470. CTableABC
  471. a table in memory
  472. """
  473. ...
  474. @abc.abstractmethod
  475. def parallelize(self, data: Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
  476. """
  477. create table from iterable data
  478. Parameters
  479. ----------
  480. data: Iterable
  481. data to create table from
  482. partition: int
  483. number of partitions of created table
  484. include_key: bool
  485. ``True`` for create table directly from data, ``False`` for create table with generated keys start from 0
  486. Returns
  487. -------
  488. CTableABC
  489. a table create from data
  490. """
  491. pass
  492. @abc.abstractmethod
  493. def cleanup(self, name, namespace):
  494. """
  495. delete table(s)
  496. Parameters
  497. ----------
  498. name: str
  499. table name or wildcard character
  500. namespace: str
  501. namespace
  502. """
  503. @abc.abstractmethod
  504. def destroy(self):
  505. pass
  506. @abc.abstractmethod
  507. def stop(self):
  508. pass
  509. @abc.abstractmethod
  510. def kill(self):
  511. pass
  512. @property
  513. @abc.abstractmethod
  514. def session_id(self) -> str:
  515. """
  516. get computing session id
  517. Returns
  518. -------
  519. str
  520. computing session id
  521. """
  522. ...