Python对ElasticSearch获取数据及操作

使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下

Version

Python :2.7

ElasticSearch:6.3

代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
  @Time  : 2018/7/4
  @Author : LiuXueWen
  @Site  :
  @File  : ElasticSearchOperation.py
  @Software: PyCharm
  @Description: 对elasticsearch数据的操作,包括获取数据,发送数据
"""
import elasticsearch
import json

import Util_Ini_Operation

class elasticsearch_data():
  def __init__(self,hosts,username,password,maxsize,is_ssl):
    # 初始化ini操作脚本,获取配置文件
    try:
      # 判断请求方式是否ssl加密
      if is_ssl == "true":
        # 获取证书地址
        cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
        es_ssl = elasticsearch.Elasticsearch(
          # 地址
          hosts=hosts,
          # 用户名密码
          http_auth=(username,password),
          # 开启ssl
          use_ssl=True,
          # 确认有加密证书
          verify_certs=True,
          # 对应的加密证书地址
          client_cert=cert_pem
        )
        self.es = es_ssl
      elif is_ssl == "false":
        # 创建普通类型的ES客户端
        es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
        self.es = es_ordinary
    except Exception as e:
      print(e)

  def query_data(self,keywords_list,date):
    gte = "now-"+str(date)
    query_data = {
      # 查询语句
      "query": {
        "bool": {
          "must": [
            {
              "query_string": {
                "query": keywords_list,
                "analyze_wildcard": True
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gte,
                  "lte": "now",
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    return query_data

  # 从es获取数据
  def get_datas_by_query(self,index_name,keywords,param,date):
    '''
    :param index_name: 索引名称
    :param keywords: 关键字词,数组
    :param param: 需要数据条件,例如_source
    :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
    :return: all_datas 返回查询到的所有数据(已经过param过滤)
    '''

    all_datas = []
    # 遍历所有的查询条件
    for keywords_list in keywords:
      # DSL语句
      query_data = self.query_data(keywords_list,date)
      res = self.es.search(
        index=index_name,
        body=query_data
      )
      for hit in res['hits']['hits']:
        # 获取指定的内容
        response = hit[param]
        # 添加所有数据到数据集中
        all_datas.append(response)
    # 返回所有数据内容
    return all_datas

  # 当索引不存在创建索引
  def create_index(self,index_name):
    '''
    :param index_name: 索引名称
    :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
    '''
    # 获取索引的映射
    # index_mapping = IndexMapping.index_mapping
    # # 判断索引是否存在
    # if self.es.indices.exists(index=index_name) is not True:
    #   # 创建索引
    #   res = self.es.indices.create(index=index_name,body=index_mapping)
    #   # 返回结果
    #   return res
    # else:
    #   # 返回索引名称
    #   return index_name
    pass

  # 插入指定的单条数据内容
  def insert_single_data(self,index_name,doc_type,data):
    '''
    :param index_name: 索引名称
    :param doc_type: 文档类型
    :param data: 需要插入的数据内容
    :return: 执行结果
    '''
    res = self.es.index(index=index_name,doc_type=doc_type,body=data)
    return res

  # 向ES中新增数据,批量插入
  def insert_datas(self,index_name):
    '''
    :desc 通过读取指定的文件内容获取需要插入的数据集
    :param index_name: 索引名称
    :return: 插入成功的数据条数
    '''
    insert_datas = []
    # 判断插入数据的索引是否存在
    self.createIndex(index_name=index_name)
    # 获取插入数据的文件地址
    data_file_path = self.ini.get_key_value("datafile","datafilepath")
    # 获取需要插入的数据集
    with open(data_file_path,"r+") as data_file:
      # 获取文件所有数据
      data_lines = data_file.readlines()
      for data_line in data_lines:
        # string to json
        data_line = json.loads(data_line)
        insert_datas.append(data_line)
    # 批量处理
    res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
    return res

  # 从ES中在指定的索引中删除指定数据(根据id判断)
  def delete_data_by_id(self,index_name,doc_type,id):
    '''
    :param index_name: 索引名称
    :param index_type: 文档类型
    :param id: 唯一标识id
    :return: 删除结果信息
    '''
    res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
    return res

  # 根据条件删除数据
  def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
    '''
    :param index_name:索引名称,为空查询所有索引
    :param doc_type:文档类型,为空查询所有文档类型
    :param param:过滤条件值
    :param gt_time:时间范围,大于该时间
    :param lt_time:时间范围,小于该时间
    :return:执行条件删除后的结果信息
    '''
    # DSL语句
    query_data = {
      # 查询语句
      "query": {
        "bool": {
          "must": [
            {
              "query_string": {
                "query": param,
                "analyze_wildcard": True
              }
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gt_time,
                  "lte": lt_time,
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
    return res

  # 指定index中删除指定时间段内的全部数据
  def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
    '''
    :param index_name:索引名称,为空查询所有索引
    :param doc_type:文档类型,为空查询所有文档类型
    :param gt_time:时间范围,大于该时间
    :param lt_time:时间范围,小于该时间
    :return:执行条件删除后的结果信息
    '''
    # DSL语句
    query_data = {
      # 查询语句
      "query": {
        "bool": {
          "must": [
            {
              "match_all": {}
            },
            {
              "range": {
                "@timestamp": {
                  "gte": gt_time,
                  "lte": lt_time,
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
    res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
    return res

  # 修改ES中指定的数据
  def update_data_by_id(self,index_name,doc_type,id,data):
    '''
    :param index_name: 索引名称
    :param doc_type: 文档类型,为空表示所有类型
    :param id: 文档唯一标识编号
    :param data: 更新的数据
    :return: 更新结果信息
    '''
    res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
    return res

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Elasticsearch 基础介绍及索引原理分析

    前言 最近在参与一个基于Elasticsearch作为底层数据框架提供大数据量(亿级)的实时统计查询的方案设计工作,花了些时间学习Elasticsearch的基础理论知识,整理了一下,希望能对Elasticsearch感兴趣/想了解的同学有所帮助. 同时也希望有发现内容不正确或者有疑问的地方,望指明,一起探讨,学习,进步. 介绍 Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elastics

  • 使用Python操作Elasticsearch数据索引的教程

    Elasticsearch是一个分布式.Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于: 轻量级:安装启动方便,下载文件之后一条命令就可以启动: Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构: 多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置: 分布式:Solr Cloud的配置比较

  • ElasticSearch的完整安装教程

    ElasticSearch安装 下载ElasticSearch 官网地址: https://www.elastic.co/products/elasticsearch 本地下载:https://www.jb51.net/codes/579429.html 上传到ElasticSearch 可以使用第三方工具filezilla 解压elasticsearch-6.4.0.tar.gztar -zxvf elasticsearch-6.4.0.tar.gz [root@localhost elast

  • 详解spring-boot集成elasticsearch及其简单应用

    介绍 记录将elasticsearch集成到spring boot的过程,以及一些简单的应用和helper类使用. 接入方式 使用spring-boot中的spring-data-elasticsearch,可以使用两种内置客户端接入 1.节点客户端(node client): 配置文件中设置为local:false,节点客户端以无数据节点(node-master或node-client)身份加入集群,换言之,它自己不存储任何数据,但是它知道数据在集群中的具体位置,并且能够直接转发请求到对应的节

  • Spring Boot集成ElasticSearch实现搜索引擎的示例

    Elastic Search是一个开源的,分布式,实时搜索和分析引擎.Spring Boot为Elasticsearch及Spring Data Elasticsearch提供的基于它的抽象提供了基本的配置.Spring Boot提供了一个用于聚集依赖的spring-boot-starter-data-elasticsearch 'StarterPOM'. ElasticSearch作为搜索引擎,我们需要解决2大问题: 1,  如何将被搜索的数据在ES上创建反向索引 2,  Java代码如何与E

  • Python-ElasticSearch搜索查询的讲解

    Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上. Lucene 可能是目前存在的,不论开源还是私有的,拥有最先进,高性能和全功能搜索引擎功能的库.但是 Lucene 仅仅只是一个库.为了利用它,你需要编写 Java 程序,并在你的 java 程序里面直接集成 Lucene 包. 更坏的情况是,你需要对信息检索有一定程度的理解才能明白 Lucene 是怎么工作的.Lucene 是 很 复杂的. 在上一篇文章中介绍了ElasticS

  • JAVA使用ElasticSearch查询in和not in的实现方式

    ElasticSearch Elasticsearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 最近用到ES查询,因用的是Java写的,需要实现一个需求:过滤一部分id,查询时不需要查出来. 既然需要不包含,那么首先需要实现包含的方式(精确完

  • Python对ElasticSearch获取数据及操作

    使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下 Version Python :2.7 ElasticSearch:6.3 代码: #!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/7/4 @Author : LiuXueWen @Site : @File : ElasticSearchOperation.py @Software: PyCharm @Descri

  • 使用python实现多维数据降维操作

    一,首先介绍下多维列表的降维 def flatten(a): for each in a: if not isinstance(each,list): yield each else: yield from flatten(each) if __name__ == "__main__": a = [[1,2],[3,[4,5]],6] print(list(flatten(a))) 二.这种降维方法同样适用于多维迭代器的降维 from collections import Iterab

  • python爬虫分布式获取数据的实例方法

    在我们进行卫生大扫除的时候,因为工作任务较多,所以我们会进行分工,每个人负责不同的打扫项目.同样分工合作的理念,在python分布式爬虫中也得到了应用.我们需要给不同的爬虫分配指令,让它们去分头行动获取同一个网站的数据.那么这些爬虫是怎么分工搜集数据的呢?感兴趣的小伙伴,我们可以通过下面的示例进行解惑. 假设我有三台爬虫服务器A.B和C.我想让我所有的账号登录任务分散到三台服务器.让用户抓取在A和B上执行,让粉丝和关注抓取在C上执行,那么启动A.B.C三个服务器的celery worker的命令

  • python实现对excel进行数据剔除操作实例

    前言 学习Python的过程中,我们会遇到Excel的各种问题.下面这篇文章主要给大家介绍了关于python对excel进行数据剔除操作的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. Python解析Excel时需要安装两个包,分别是xlrd(读excel)和xlwt(写excel),安装方法如下: pip install xlrd pip install xlwt 需求分析: 判断excel2表中的某个唯一字段是否满足条件,如果满足条件,就在excel1中进行查询

  • python数据库操作常用功能使用详解(创建表/插入数据/获取数据)

    实例1.取得MYSQL版本 复制代码 代码如下: # -*- coding: UTF-8 -*-#安装MYSQL DB for pythonimport MySQLdb as mdbcon = Nonetry:    #连接mysql的方法:connect('ip','user','password','dbname')    con = mdb.connect('localhost', 'root',        'root', 'test');    #所有的查询,都在连接con的一个模块

  • Python中elasticsearch插入和更新数据的实现方法

    首先,我的索引结构是酱紫的. 存储以name_id为主键的索引,待插入或更新数据为: 一般会有有两种操作: 以下图片为个人见解,我没试过能不能直接运行,但形式上没错. 数据不存在,我需要插入地址为空字符串. 单条插入: 批量插入: 该数据存在,我需要更新地址字段为空字符串. 单条更新: 批量更新: 总结 以上所述是小编给大家介绍的Python中elasticsearch插入和更新数据的实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的! 您可能感兴趣的文章: 使用

  • 用python简单实现mysql数据同步到ElasticSearch的教程

    之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个 思路: 网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是

  • Python爬虫之获取心知天气API实时天气数据并弹窗提醒

    一.心知天气API密钥获取 首先,访问https://www.seniverse.com,进行登录或者注册操作,然后在控制台上创建一个免费版的项目,创建后,项目信息为: 复制你的私钥信息,来到官方文档,发现API地址为: 在文档中,可以看到查询的API网址,接下来,把url中的参数key改为你的密钥,访问,返回的json数据: 二.编写代码 代码如下: import requests from plyer import notification """ plyer是用来显示弹

  • Python获取时间的操作示例详解

    目录 获得当前时间时间戳 获取当前时间 获取昨天日期 生成日历 计算每个月天数 计算3天前并转换为指定格式 获取时间戳的旧时间 获取时间并指定格式 pandas 每日一练 21读取本地EXCEL数据 22查看df数据前5行 23将popularity列数据转换为最大值与最小值的平均值 24将数据根据project进行分组并计算平均分 25将test_time列具体时间拆分为两部分(一半日期,一半时间) 获得当前时间时间戳 # 注意时区的设置 import time # 获得当前时间时间戳 now

  • Python常见数据类型转换操作示例

    本文实例讲述了Python常见数据类型转换操作.分享给大家供大家参考,具体如下: 类型转换 主要针对几种存储工具:list.tuple.dict.set 特殊之处:dict是用来存储键值对的. 1.list 转换为set l1 = [1, 2, 4, 5] s1 = set(l1) print(type(s1)) print(s1) 输出: <class 'set'> {1, 2, 4, 5} 2.set转换为list s1 = set([1, 2, 3, 4]) l1 = list(s1)

随机推荐