upload_utils.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import argparse
  17. import uuid
  18. from fate_arch import storage
  19. from fate_arch.session import Session
  20. from fate_arch.storage import StorageEngine, EggRollStoreType, StorageTableOrigin
  21. from fate_flow.utils import data_utils
  22. class UploadFile(object):
  23. @classmethod
  24. def run(cls):
  25. parser = argparse.ArgumentParser()
  26. parser.add_argument('--session_id', required=True, type=str, help="session id")
  27. parser.add_argument('--storage', help="storage engine", type=str)
  28. parser.add_argument('--file', required=True, type=str, help="file path")
  29. parser.add_argument('--namespace', required=True, type=str, help="namespace")
  30. parser.add_argument('--name', required=True, type=str, help="name")
  31. parser.add_argument('--partitions', required=True, type=int, help="partitions")
  32. args = parser.parse_args()
  33. session_id = args.session_id
  34. with Session(session_id=session_id) as sess:
  35. storage_session = sess.storage(
  36. storage_engine=args.storage
  37. )
  38. if args.storage in {StorageEngine.EGGROLL, StorageEngine.STANDALONE}:
  39. upload_address = {
  40. "name": args.name,
  41. "namespace": args.namespace,
  42. "storage_type": EggRollStoreType.ROLLPAIR_LMDB,
  43. }
  44. address = storage.StorageTableMeta.create_address(
  45. storage_engine=args.storage, address_dict=upload_address
  46. )
  47. table = storage_session.create_table(address=address, name=args.name, namespace=args.namespace, partitions=args.partitions, origin=StorageTableOrigin.UPLOAD)
  48. cls.upload(args.file, False, table=table)
  49. @classmethod
  50. def upload(cls, input_file, head, table=None, id_delimiter=",", extend_sid=False):
  51. with open(input_file, "r") as fin:
  52. if head is True:
  53. data_head = fin.readline()
  54. _, meta = table.meta.update_metas(
  55. schema=data_utils.get_header_schema(
  56. header_line=data_head,
  57. id_delimiter=id_delimiter
  58. )
  59. )
  60. table.meta = meta
  61. fate_uuid = uuid.uuid1().hex
  62. get_line = cls.get_line(extend_sid)
  63. line_index = 0
  64. n = 0
  65. while True:
  66. data = list()
  67. lines = fin.readlines(1024 * 1024 * 8 * 500)
  68. if lines:
  69. # self.append_data_line(lines, data, n)
  70. for line in lines:
  71. values = line.rstrip().split(',')
  72. k, v = get_line(
  73. values=values,
  74. line_index=line_index,
  75. extend_sid=extend_sid,
  76. auto_increasing_sid=False,
  77. id_delimiter=id_delimiter,
  78. fate_uuid=fate_uuid
  79. )
  80. data.append((k, v))
  81. line_index += 1
  82. table.put_all(data)
  83. if n == 0:
  84. table.meta.update_metas(part_of_data=data[:100])
  85. n += 1
  86. else:
  87. return line_index
  88. @classmethod
  89. def get_line(self, extend_sid=False):
  90. if extend_sid:
  91. line = data_utils.get_sid_data_line
  92. else:
  93. line = data_utils.get_data_line
  94. return line
  95. if __name__ == '__main__':
  96. UploadFile.run()