upload_file_to_mysql.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
import argparse
import os

import pymysql
from fate_arch import storage
  4. database_config = {
  5. 'user': 'root',
  6. 'passwd': 'fate_dev',
  7. 'host': '127.0.0.1',
  8. 'port': 3306
  9. }
  10. class MysqldbHelper(object):
  11. def __init__(self, host='', user='', passwd='', port='', database=''):
  12. self.host = host
  13. self.user = user
  14. self.password = passwd
  15. self.database = database
  16. self.port = port
  17. self.con = None
  18. self.cur = None
  19. try:
  20. print(host, user, passwd, port, database)
  21. self.con = pymysql.connect(host=self.host, user=self.user, passwd=self.password, port=self.port, db=self.database)
  22. self.cur = self.con.cursor()
  23. except:
  24. print("DataBase connect error,please check the db config.")
  25. def execute(self, sql):
  26. self.cur.execute(sql)
  27. self.cur.fetchall()
  28. def create_db(namespace):
  29. conn = pymysql.connect(host=database_config.get('host'),
  30. port=database_config.get('port'),
  31. user=database_config.get('user'),
  32. password=database_config.get('passwd'))
  33. cursor = conn.cursor()
  34. cursor.execute("create database if not exists {}".format(namespace))
  35. print('create db {} success'.format(namespace))
  36. cursor.close()
  37. def list_to_str(input_list):
  38. return ','.join(list(map(str, input_list)))
  39. def write_to_db(conf, table_name, file_name, namespace, partitions, head):
  40. db = MysqldbHelper(**conf)
  41. table_meta = storage.StorageTableMeta(name=table_name, namespace=namespace)
  42. create_table = 'create table {}(id varchar(50) NOT NULL, features LONGTEXT, PRIMARY KEY(id))'.format(table_name)
  43. db.execute(create_table.format(table_name))
  44. print('create table {}'.format(table_name))
  45. with open(file_name, 'r') as f:
  46. if head:
  47. data_head = f.readline()
  48. header_source_item = data_head.split(',')
  49. table_meta.update_metas(schema={'header': ','.join(header_source_item[1:]).strip(), 'sid': header_source_item[0]})
  50. n = 0
  51. count = 0
  52. while True:
  53. data = list()
  54. lines = f.readlines(12400)
  55. if lines:
  56. sql = 'REPLACE INTO {}(id, features) VALUES'.format(table_name)
  57. for line in lines:
  58. count += 1
  59. values = line.replace("\n", "").replace("\t", ",").split(",")
  60. data.append((values[0], list_to_str(values[1:])))
  61. sql += '("{}", "{}"),'.format(values[0], list_to_str(values[1:]))
  62. sql = ','.join(sql.split(',')[:-1]) + ';'
  63. if n == 0:
  64. table_meta.update_metas(part_of_data=data, partitions=partitions)
  65. n +=1
  66. db.execute(sql)
  67. db.con.commit()
  68. else:
  69. break
  70. table_meta.update_metas(count=count)
  71. if __name__ == "__main__":
  72. parser = argparse.ArgumentParser()
  73. parser.add_argument('-n', '--namespace', required=True, type=str, help="namespace")
  74. parser.add_argument('-t', '--table_name', required=True, type=str, help="table_name")
  75. parser.add_argument('-f', '--file_name', required=True, type=str, help="file_name")
  76. parser.add_argument('-p', '--partitions', required=True, type=int, help="partitions")
  77. parser.add_argument('-head', '--head', required=True, type=int, help="head")
  78. args = parser.parse_args()
  79. namespace = args.namespace
  80. table_name = args.table_name
  81. file_name = args.file_name
  82. partitions = args.partitions
  83. head = args.head
  84. create_db(namespace)
  85. database_config['database'] = namespace
  86. write_to_db(database_config, table_name, file_name, namespace, partitions=partitions, head=head)