_table.py

#
# Copyright 2019 The FATE Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time

import requests

from fate_arch.storage import StorageEngine, LinkisHiveStoreType
from fate_arch.storage import StorageTableBase
from fate_arch.storage.linkis_hive._settings import (
    Token_Code,
    Token_User,
    STATUS_URI,
    EXECUTE_URI,
)
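

# StorageTable backs FATE's storage-table abstraction with a Hive service
# reached through Linkis: SQL statements are submitted to the Linkis REST
# "entrance" API and polled until they finish, while bulk reads and writes
# go through Spark with Hive support enabled.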
class StorageTable(StorageTableBase):
    def __init__(
        self,
        address=None,
        name: str = None,
        namespace: str = None,
        partitions: int = 1,
        storage_type: LinkisHiveStoreType = LinkisHiveStoreType.DEFAULT,
        options=None,
    ):
        super(StorageTable, self).__init__(
            name=name,
            namespace=namespace,
            address=address,
            partitions=partitions,
            options=options,
            engine=StorageEngine.LINKIS_HIVE,
            store_type=storage_type,
        )
    def _count(self, **kwargs):
        sql = "select count(*) from {}".format(self._address.name)
        # Fall back to 0 if the Linkis request fails for any reason.
        try:
            count = self.execute(sql)
        except BaseException:
            count = 0
        return count
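
    # _collect has two paths: when called from inside a Spark job
    # (is_spark=True) it returns a Spark DataFrame; otherwise it runs a
    # plain SELECT through the Linkis entrance and yields
    # (key, delimiter-joined values) pairs in FATE's k/v table format.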
    def _collect(self, **kwargs):
        if kwargs.get("is_spark"):
            from pyspark.sql import SparkSession

            session = SparkSession.builder.enableHiveSupport().getOrCreate()
            return session.sql(
                f"select * from {self._address.database}.{self._address.name}"
            )
        # The k/v path lives in a helper so that this branch can return the
        # DataFrame itself: a single function containing "yield" would turn
        # the whole method into a generator and swallow the return above.
        return self._collect_kv()

    def _collect_kv(self):
        sql = "select * from {}.{}".format(
            self._address.database, self._address.name
        )
        data = self.execute(sql)
        for i in data:
            yield i[0], self.meta.get_id_delimiter().join(list(i[1:]))
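
    # _put_all assumes it runs inside a Spark session: the key/value data
    # (a pandas DataFrame, judging by the parameter name) is converted to a
    # Spark DataFrame and saved as an ORC table in the configured database.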
    def _put_all(self, kv_pd, **kwargs):
        from pyspark.sql import SparkSession

        session = SparkSession.builder.enableHiveSupport().getOrCreate()
        session.sql("use {}".format(self._address.database))
        spark_df = session.createDataFrame(kv_pd)
        spark_df.write.saveAsTable(self._address.name, format="orc")
    def _destroy(self):
        sql = "drop table {}.{}".format(self._address.database, self._address.name)
        return self.execute(sql)

    def _save_as(self, address, name, namespace, partitions, **kwargs):
        pass
    def execute(self, sql):
        # Submit the statement, poll Linkis once a second until it reports
        # a terminal status, then fetch the result set.
        exec_id = self._execute_entrance(sql)
        while True:
            status = self._status_entrance(exec_id)
            if status:
                break
            time.sleep(1)
        return self._result_entrance()
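
    # The *_entrance helpers wrap Linkis's REST entrance API: submit the
    # SQL and get back an execID, poll that execID for a terminal status,
    # then fetch the result set (the last step is still a stub below).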
    def _execute_entrance(self, sql):
        execute_url = f"http://{self._address.host}:{self._address.port}{EXECUTE_URI}"
        data = {
            "method": EXECUTE_URI,
            "params": self._address.params,
            "executeApplicationName": self._address.execute_application_name,
            "executionCode": sql,
            "runType": self._address.run_type,
            "source": self._address.source,
        }
        # Authenticate with the token headers configured in _settings.
        headers = {
            "Token-Code": Token_Code,
            "Token-User": Token_User,
            "Content-Type": "application/json",
        }
        execute_response = requests.post(url=execute_url, headers=headers, json=data)
        response_json = execute_response.json()
        if response_json.get("status") == 0:
            return response_json["data"]["execID"]
        raise SystemError(
            f"request linkis hive execute entrance failed, status: {response_json.get('status')},"
            f" message: {response_json.get('message')}"
        )
    def _status_entrance(self, exec_id):
        # STATUS_URI contains an "exec_id" placeholder that is substituted
        # with the id returned by _execute_entrance.
        execute_url = (
            f"http://{self._address.host}:{self._address.port}{STATUS_URI}".replace(
                "exec_id", exec_id
            )
        )
        headers = {
            "Token-Code": Token_Code,
            "Token-User": Token_User,
            "Content-Type": "application/json",
        }
        execute_response = requests.get(url=execute_url, headers=headers)
        response_json = execute_response.json()
        if response_json.get("status") == 0:
            execute_status = response_json["data"]["status"]
            if execute_status == "Success":
                return True
            elif execute_status == "Failed":
                raise Exception(
                    f"request linkis hive status entrance failed, status: {execute_status}"
                )
            else:
                return False
        raise SystemError(
            f"request linkis hive status entrance failed, status: {response_json.get('status')},"
            f" message: {response_json.get('message')}"
        )
    def _result_entrance(self):
        # Result retrieval is not implemented yet, so execute() currently
        # returns None.
        pass
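

# Minimal usage sketch (illustrative only; the address object is built
# elsewhere in fate_arch, and the name/namespace values are hypothetical):
#
#     table = StorageTable(
#         address=linkis_hive_address,  # must expose host, port, database,
#                                       # name, params, run_type, source, etc.
#         name="breast_hetero_guest",   # hypothetical table name
#         namespace="experiment",       # hypothetical namespace
#     )
#     print(table._count())             # runs "select count(*)" via Linkis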