使用ES的bulk接口导入批量数据(python进行格式化,curl提交)
发布时间:2020-05-25 00:00:09 所属栏目:Python 来源:互联网
导读:使用ES的bulk接口导入批量数据(python进行格式化,curl提交)
|
下面是脚本之家 jb51.cc 通过网络收集整理的代码片段。脚本之家小编现在分享给大家,也给大家做个参考。
#!/bin/env python
# -*- coding: utf-8 -*-
'''
修改:2015/9/25 ver.2
原因:ver.1 要使用elasticsearch的官方库,不方便,这版使用bulk接口,curl提交
修改:2015/9/30 ver.3
原因:封装成class,方便调用
'''
import sys
import os
from optparse import OptionParser
from datetime import datetime
import subprocess as sub
import json
class loadDataToES:
    '''
    Load delimited text records into Elasticsearch through the bulk API.

    Every input line is split on ``delimeter``, converted according to
    ``field_desc`` and queued as an "index" action. After ``cut_off`` input
    lines the queued actions are written to ``tmp_file`` and posted to the
    ``_bulk`` endpoint with curl.

    Parameters:
        field_desc: ``name|type`` entries separated by ``delimeter``, e.g.
            ``@timestamp|date,process|str,mem|int``. Recognised types are
            ``date``, ``int`` and ``float``; anything else is kept as text.
        data_file: path of the text file to load.
        host, port: address of the Elasticsearch HTTP endpoint.
        index, doctype: values placed in ``_index`` / ``_type`` of every
            bulk action line.
        delimeter: separator used both in ``field_desc`` and in data lines.
        tmp_file: scratch file that holds the request body handed to curl.
        cut_off: number of input lines read per bulk request.
    '''

    def __init__(self, field_desc, data_file, host='127.0.0.1', port='9200',
                 index='test', doctype='others', delimeter=',',
                 tmp_file='/dev/shm/_tmp_data_to_es', cut_off=10000):
        self.host = host
        self.port = port
        self.index = index
        self.doctype = doctype
        self.delimeter = delimeter
        self.tmp_file = tmp_file
        self.field_desc = field_desc
        self.data_file = data_file
        # Action line that precedes every document in the bulk body.
        self.header = '{"index":{"_index":"%s","_type":"%s"}}' % (self.index, self.doctype)
        self.cut_off = cut_off
        self.url = 'http://%s:%s/_bulk' % (self.host, self.port)
        # Working state; initialised here so the helper methods can be
        # called before load_data() without hitting missing attributes.
        self.body_list = []
        self.bulk = ''
        self.bulk_content = ''
        self.line_num = 0       # lines read since the last bulk request
        self.total_lines = 0    # lines read from the file so far
        self.fields_list = []
        self.field_len = 0

    def load_data(self):
        '''
        Read ``data_file`` and post its records in batches.

        Example line from the file:
            2015-09-24 09:17:29,memory_11601,123988
        '''
        self.body_list = []
        self.bulk = ''
        self.line_num = 0
        self.total_lines = 0
        self.pretty_print('INFO: loadding data to es,host: %s,index: %s' % (self.host, self.index))
        self.parse_field()
        with open(self.data_file, 'r') as f_desc:
            for line in f_desc:
                self.total_lines += 1
                self.do_line(line)
                self.line_num += 1
                if self.line_num >= self.cut_off:
                    self._flush()
        # Post whatever is left over from the last, partial batch.
        if self.line_num > 0:
            self._flush()
        self.pretty_print('INFO: all lines parsed.')

    def _build_bulk_content(self, lines):
        '''Join bulk lines with newlines; the bulk body must end with one.'''
        return '\n'.join(lines) + '\n'

    def _flush(self):
        '''Post the queued actions (if any) and reset the batch state.'''
        # A batch can be empty when every line in it was rejected.
        if self.body_list:
            self.bulk_content = self._build_bulk_content(self.body_list)
            self._load_data()
        self.body_list = []
        self.bulk = ''
        self.line_num = 0

    def do_line(self, line):
        '''
        Parse one input line and queue its action and document lines.

        Lines whose column count differs from the field description are
        reported and skipped.
        '''
        fields = line.strip().split(self.delimeter)
        if len(fields) != self.field_len:
            self.pretty_print("ERROR: line %d not match fields" % self.total_lines)
            return
        body = self.get_body(fields, self.fields_list)
        self.body_list.append(self.header)
        # json.dumps escapes quotes and emits valid JSON booleans, which a
        # str(dict) plus quote replacement cannot guarantee.
        self.body_list.append(json.dumps(body))

    def parse_field(self):
        '''
        Split ``field_desc`` into ``[name, type]`` pairs.

        Stores the pairs in ``self.fields_list`` and their count in
        ``self.field_len``. An entry without a ``|type`` part is treated
        as ``str``.
        '''
        fields_list = []
        for item in self.field_desc.strip().split(self.delimeter):
            parts = item.split('|')
            field_type = parts[1] if len(parts) > 1 else 'str'
            fields_list.append([parts[0], field_type])
        self.fields_list = fields_list
        self.field_len = len(fields_list)

    def _load_data(self):
        '''
        Write ``self.bulk_content`` to ``tmp_file`` and post it with curl.

        Prints one INFO or ERROR line describing the outcome.
        '''
        # Close the file before curl reads it so the body is fully flushed.
        with open(self.tmp_file, 'w') as tmp:
            tmp.write(self.bulk_content)
        command = [
            'curl', '-s', '-XPOST', self.url,
            '-H', 'Content-Type: application/x-ndjson',
            '--data-binary', '@' + self.tmp_file,
        ]
        p = sub.Popen(command, stdout=sub.PIPE)
        output, _ = p.communicate()
        if p.returncode != 0 or not output.strip():
            self.pretty_print("ERROR: curl exited with code %s and returned no response." % p.returncode)
            return
        try:
            ret_dict = json.loads(output.decode('utf-8'))
        except ValueError:
            self.pretty_print("ERROR: response is not valid JSON: %r" % (output[:200],))
            return
        if not isinstance(ret_dict, dict) or 'errors' not in ret_dict:
            # Request-level failures carry no per-item results.
            self.pretty_print("ERROR: bulk request rejected: %r" % (output[:200],))
            return
        item_count = len(ret_dict.get('items', []))
        took = ret_dict.get('took', 0)
        if not ret_dict['errors']:
            self.pretty_print("INFO: %6s lines parseed with no errors,total cost %d ms." % (item_count, took))
        else:
            self.pretty_print("ERROR: %6s lines parseed with some errors,total cost %d ms." % (item_count, took))

    def pretty_print(self, str):
        '''Print a message prefixed with the current local time.'''
        # The parameter name shadows the builtin; it is kept so existing
        # keyword callers continue to work.
        print('%s %s' % (datetime.now(), str))

    def get_body(self, fields, fields_list):
        '''
        Build the document dict for one record.

        ``fields`` holds the raw column values and ``fields_list`` the
        matching ``[name, type]`` pairs. Values typed ``date``, ``int`` or
        ``float`` are converted; every other type is stored as given.
        '''
        converters = {
            'date': self.translate_str_to_date,
            'int': self.translate_str_to_int,
            'float': self.translate_str_to_float,
        }
        body = {}
        for position, value in enumerate(fields):
            name = fields_list[position][0]
            field_type = fields_list[position][1]
            convert = converters.get(field_type)
            body[name] = value if convert is None else convert(value)
        return body

    def translate_str_to_date(self, date_str):
        '''
        Convert ``YYYY-mm-dd HH:MM:SS`` text to an ISO 8601 string.

        Returns False (after printing a message) when parsing fails.
        '''
        try:
            date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
        except (ValueError, TypeError):
            self.pretty_print("Unexpected error: %s" % (sys.exc_info()[0]))
            self.pretty_print("Failed to translate '%s' to date." % (date_str))
            return False
        return date.isoformat()

    def translate_str_to_int(self, num_str):
        '''Convert text to int; returns False and prints a message on failure.'''
        try:
            return int(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to int." % (num_str))
            return False

    def translate_str_to_float(self, num_str):
        '''Convert text to float; returns False and prints a message on failure.'''
        try:
            return float(num_str)
        except (ValueError, TypeError):
            self.pretty_print("Failed to translate '%s' to float." % (num_str))
            return False
if __name__ == '__main__':
    # Example run against a three-column data file.
    #
    # Field description: @timestamp|date,process|str,mem|int
    # Matching lines in the data file:
    #   2015-09-24 09:17:29,memory_11601,203532
    #   2015-09-24 09:17:29,memory_11602,223112
    #
    # The field description must list one entry per column; lines with a
    # different column count are rejected by the loader.
    loader = loadDataToES(
        field_desc='@timestamp|date,process|str,mem|int',
        data_file='/root/scripts/in.data',
        host='10.21.102.60',
        index='test'
    )
    loader.load_data()
以上是脚本之家(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。如果觉得脚本之家网站内容还不错,欢迎将脚本之家网站推荐给程序员好友。(编辑:安卓应用网)【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与站长联系删除相关内容!
