本文共 1988 字,大约阅读时间需要 6 分钟。
Elasticsearch,一款非常优秀的分布式搜索程序,下面是一些简单的应用方法的封装,希望对大家有所帮助
#!/usr/bin/env python
#encoding:utf-8from elasticsearch import Elasticsearchimport osimport configparser as cparserfrom builtins import intfrom framework.logger import Loggerfrom utils.rcpUtils import rcpUtils'''========读取config.ini文件中redis配置========'''
base_dir = str(os.path.dirname(os.path.dirname(file)))file_path = base_dir + "\config\config.ini"
cf = cparser.ConfigParser()cf.read(file_path)host = cf.get("uatesconf", "host")port = cf.get("uatesconf", 'port')logger = Logger(logger="esUtils").getlog()'''===========封装es基本操作============='''
rcp = rcpUtils()
class esUtils:
def __init__(self): '''初始化,获得es的连接''' try: self.es = Elasticsearch([{'host':host,'port':port}]) logger.info('connect to es sevrver %s:%s successfully!'%(host,port)) except BaseException as e: logger.exception("Failed to get the es Connection,please check the configuration:%s" %e)def isClusterUP(self): '''检测服务是否起来''' return self.es.ping()def getServerInfo(self): '''获取服务的一些基本信息,如版本号,集群名称等''' return self.es.info()def isExist_table(self,table): '''判断表(索引)是否在es中存在''' if self.es.indices.exists(index=table) is True: return True else: return Falsedef getESInfo(self,index,_id): '''精确查询es表中的数据,返回的字典,用于风控检查接口发送后,到es的订单表等场景''' logger.info('Search %s contains %s.' %(index,_id)) params = { 'query': { 'match': { '_id': _id } } } _searched = self.es.search(index=index, doc_type=index, body=params) return _searched['hits']['hits'][0]['_source']
if name == 'main':
es = esUtils()index = 't_order'order_id = '8019079610268001'#system_id = '7b6a99f3bce14915863cde5104bdf2c3'system_id = '7ecedd704a124a91a0d46ca13bda87c1'_id = rcp.get_id(system_id, order_id)print(es.getESInfo(index, _id))
转载于:https://blog.51cto.com/11565901/2050276