使用ElasticSearch6.0快速实现全文搜索功能的示例代码

本文不涉及ElasticSearch具体原理,只记录如何快速的导入mysql中的数据进行全文检索。

工作中需要实现一个搜索功能,并且导入现有数据库数据,组长推荐用ElasticSearch实现,网上翻一通教程,都是比较古老的文章了,无奈只能自己摸索,参考ES的文档,总算是把服务搭起来了,记录下,希望有同样需求的朋友可以少走弯路,能按照这篇教程快速的搭建一个可用的ElasticSearch服务。

ES的搭建

ES搭建有直接下载zip文件,也有docker容器的方式,相对来说,docker更适合我们跑ES服务。可以方便的搭建集群或建立测试环境。这里使用的也是容器方式,首先我们需要一份Dockerfile:

FROM docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0
# 提交配置 包括新的elasticsearch.yml 和 keystore.jks文件
COPY --chown=elasticsearch:elasticsearch conf/ /usr/share/elasticsearch/config/
# 安装ik
RUN ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip
# 安装readonlyrest
RUN ./bin/elasticsearch-plugin install https://github.com/HYY-yu/BezierCurveDemo/raw/master/readonlyrest-1.16.14_es6.0.0.zip

USER elasticsearch
CMD ./bin/elasticsearch

这里对上面的操作做一下说明:

  1. 首先在Dockerfile下的同级目录中需要建立一个conf文件夹,保存elasticsearch.yml文件(稍后给出)和keystore.jks。(jks是自签名文件,用于https,如何生成请自行搜索)
  2. ik是一款很流行的中文分词库,使用它来支持中文搜索。
  3. readonlyrest是一款开源的ES插件,用于用户管理、安全验证,土豪可以使用ES自带的X-pack包,有更完善的安全功能。

elactic配置 elasticsearch.yml

cluster.name: "docker-cluster"
network.host: 0.0.0.0

# minimum_master_nodes need to be explicitly set when bound on a public IP
# set to 1 to allow single node clusters
# Details: https://github.com/elastic/elasticsearch/pull/17288
discovery.zen.minimum_master_nodes: 1

# 禁止系统对ES交换内存
bootstrap.memory_lock: true

http.type: ssl_netty4

readonlyrest:
 enable: true
 ssl:
  enable: true
  keystore_file: "server.jks"
  keystore_pass: server
  key_pass: server

 access_control_rules:

  - name: "Block 1 - ROOT"
   type: allow
   groups: ["admin"]

  - name: "User read only - paper"
   groups: ["user"]
   indices: ["paper*"]
   actions: ["indices:data/read/*"]

 users:

  - username: root
   auth_key_sha256: cb7c98bae153065db931980a13bd45ee3a77cb8f27a7dfee68f686377acc33f1
   groups: ["admin"]

  - username: xiaoming
   auth_key: xiaoming:xiaoming
   groups: ["user"]

这里bootstrap.memory_lock: true是个坑,禁止交换内存这里文档已经说明了,有的os会在运行时把暂时不用的内存交换到硬盘的一块区域,然而这种行为会让ES的资源占用率飙升,甚至让系统无法响应。

配置文件里已经很明显了,一个root用户属于admin组,而admin有所有权限,xiaoming同学因为在user组,只能访问paper索引,并且只能读取,不能操作。更详细的配置请见:readonlyrest文档

至此,ES的准备工作算是做完了,docker build -t ESImage:tag 一下,docker run -p 9200:9200 ESImage:Tag跑起来。
如果https://127.0.0.1:9200/返回

{
  "name": "VaKwrIR",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "YsYdOWKvRh2swz907s2m_w",
  "version": {
    "number": "6.0.0",
    "build_hash": "8f0685b",
    "build_date": "2017-11-10T18:41:22.859Z",
    "build_snapshot": false,
    "lucene_version": "7.0.1",
    "minimum_wire_compatibility_version": "5.6.0",
    "minimum_index_compatibility_version": "5.0.0"
  },
  "tagline": "You Know, for Search"
}

我们本次教程的主角算是出场了,分享几个常用的API调戏调试ES用:

{{url}}替换成你本地的ES地址。

  1. 查看所有插件:{{url}}/_cat/plugins?v
  2. 查看所有索引:{{url}}/_cat/indices?v
  3. 对ES进行健康检查:{{url}}/_cat/health?v
  4. 查看当前的磁盘占用率:{{url}}/_cat/allocation?v

导入MYSQL数据

这里我使用的是MYSQL数据,其实其它的数据库也是一样,关键在于如何导入,网上教程会推荐Logstash、Beat、ES的mysql插件进行导入,我也都实验过,配置繁琐,文档稀少,要是数据库结构复杂一点,导入是个劳心劳神的活计,所以并不推荐。其实ES在各个语言都有对应的API库,你在语言层面把数据组装成json,通过API库发送到ES即可。流程大致如下:

我使用的是Golang的ES库elastic,其它语言可以去github上自行搜索,操作的方式都是一样的。

接下来使用一个简单的数据库做介绍:

Paper表

id name
1 北京第一小学模拟卷
2 江西北京通用高考真题

Province表

id name
1 北京
2 江西

Paper_Province表

paper_id province_id
1 1
2 1
2 2

如上,Paper和Province是多对多关系,现在把Paper数据打入ES,,可以按Paper名称模糊搜索,也可通过Province进行筛选。json数据格式如下:

{
  "id":1,
  "name": "北京第一小学模拟卷",
  "provinces":[
    {
      "id":1,
      "name":"北京"
    }
  ]
}

首先准备一份mapping.json文件,这是在ES中数据的存储结构定义,

{
  "mappings":{
    "docs":{
  "include_in_all": false,
      "properties":{
        "id":{
          "type":"long"
        },
        "name":{
          "type":"text",
          "analyzer":"ik_max_word" // 使用最大词分词器
        },
        "provinces":{
          "type":"nested",
          "properties":{
            "id":{
              "type":"integer"
            },
            "name":{
              "type":"text",
              "index":"false" // 不索引
            }
          }
        }
      }
    }
  },
  "settings":{
    "number_of_shards":1,
    "number_of_replicas":0
  }
}

需要注意的是取消_all字段,这个默认的_all会收集所有的存储字段,实现无条件限制的搜索,缺点是空间占用大。

shard(分片)数我设置为了1,没有设置replicas(副本),毕竟这不是一个集群,处理的数据也不是很多,如果有大量数据需要处理可以自行设置分片和副本的数量。

首先与ES建立连接,ca.crt与jks自签名有关。当然,在这里我使用InsecureSkipVerify忽略了证书文件的验证。

func InitElasticSearch() {
 pool := x509.NewCertPool()
 crt, err0 := ioutil.ReadFile("conf/ca.crt")
 if err0 != nil {
 cannotOpenES(err0, "read crt file err")
 return
 }

 pool.AppendCertsFromPEM(crt)
 tr := &http.Transport{
 TLSClientConfig: &tls.Config{RootCAs: pool, InsecureSkipVerify: true},
 }
 httpClient := &http.Client{Transport: tr}

 //后台构造elasticClient
 var err error
 elasticClient, err = elastic.NewClient(elastic.SetURL(MyConfig.ElasticUrl),
 elastic.SetErrorLog(GetLogger()),
 elastic.SetGzip(true),
 elastic.SetHttpClient(httpClient),
 elastic.SetSniff(false), // 集群嗅探,单节点记得关闭。
 elastic.SetScheme("https"),
 elastic.SetBasicAuth(MyConfig.ElasticUsername, MyConfig.ElasticPassword))
 if err != nil {
 cannotOpenES(err, "search_client_error")
 return
 }
 //elasticClient构造完成

 //查询是否有paper索引
 exist, err := elasticClient.IndexExists(MyConfig.ElasticIndexName).Do(context.Background())
 if err != nil {
 cannotOpenES(err, "exist_paper_index_check")
 return
 }

 //索引存在且通过完整性检查则不发送任何数据
 if exist {
 if !isIndexIntegrity(elasticClient) {
  //删除当前索引  准备重建
  deleteResponse, err := elasticClient.DeleteIndex(MyConfig.ElasticIndexName).Do(context.Background())
  if err != nil || !deleteResponse.Acknowledged {
  cannotOpenES(err, "delete_index_error")
  return
  }
 } else {
  return
 }
 }

 //后台查询数据库,发送数据到elasticsearch中
 go fetchDBGetAllPaperAndSendToES()
}
type PaperSearch struct {
 PaperId  int64   `gorm:"primary_key;column:F_paper_id;type:BIGINT(20)" json:"id"`
 Name    string  `gorm:"column:F_name;size:80" json:"name"`
 Provinces []Province `gorm:"many2many:t_paper_province;" json:"provinces"`    // 试卷适用的省份
}

func fetchDBGetAllPaperAndSendToES() {
 //fetch paper
 var allPaper []PaperSearch

 GetDb().Table("t_papers").Find(&allPaper)

 //province
 for i := range allPaper {
 var allPro []Province
 GetDb().Table("t_provinces").Joins("INNER JOIN `t_paper_province` ON `t_paper_province`.`province_F_province_id` = `t_provinces`.`F_province_id`").
  Where("t_paper_province.paper_F_paper_id = ?", allPaper[i].PaperId).Find(&allPro)
 allPaper[i].Provinces = allPro
 }

 if len(allPaper) > 0 {
 //send to es - create index
 createService := GetElasticSearch().CreateIndex(MyConfig.ElasticIndexName)
 // 此处的index_default_setting就是上面mapping.json中的内容。
 createService.Body(index_default_setting)
 createResult, err := createService.Do(context.Background())
 if err != nil {
  cannotOpenES(err, "create_paper_index")
  return
 }

 if !createResult.Acknowledged || !createResult.ShardsAcknowledged {
  cannotOpenES(err, "create_paper_index_fail")
 }

 // - send all paper
 bulkRequest := GetElasticSearch().Bulk()

 for i := range allPaper {
  indexReq := elastic.NewBulkIndexRequest().OpType("create").Index(MyConfig.ElasticIndexName).Type("docs").
  Id(helper.Int64ToString(allPaper[i].PaperId)).
  Doc(allPaper[i])

  bulkRequest.Add(indexReq)
 }

 // Do sends the bulk requests to Elasticsearch
 bulkResponse, err := bulkRequest.Do(context.Background())
 if err != nil {
  cannotOpenES(err, "insert_docs_error")
  return
 }

 // Bulk request actions get cleared
 if len(bulkResponse.Created()) != len(allPaper) {
  cannotOpenES(err, "insert_docs_nums_error")
  return
 }
 //send success
 }
}

跑通上面的代码后,使用{{url}}/_cat/indices?v看看ES中是否出现了新创建的索引,使用{{url}}/papers/_search看看命中了多少文档,如果文档数等于你发送过去的数据量,搜索服务就算跑起来了。

搜索

现在就可以通过ProvinceID和q来搜索试卷,默认按照相关度评分排序。

//q 搜索字符串 provinceID 限定省份id limit page 分页参数
func SearchPaper(q string, provinceId uint, limit int, page int) (list []PaperSearch, totalPage int, currentPage int, pageIsEnd int, returnErr error) {
 //不满足条件,使用数据库搜索
 if !CanUseElasticSearch && !MyConfig.UseElasticSearch {
 return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)
 }

 list = make([]PaperSimple, 0)
 totalPage = 0
 currentPage = page
 pageIsEnd = 0
 returnErr = nil

 client := GetElasticSearch()
 if client == nil {
 return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)
 }

 //ElasticSearch有问题,使用数据库搜索
 if !isIndexIntegrity(client) {
 return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)
 }

 if !client.IsRunning() {
 client.Start()
 }
 defer client.Stop()

 q = html.EscapeString(q)
 boolQuery := elastic.NewBoolQuery()
 // Paper.name
 matchQuery := elastic.NewMatchQuery("name", q)

 //省份
 if provinceId > 0 && provinceId != DEFAULT_PROVINCE_ALL {
 proBool := elastic.NewBoolQuery()
 tpro := elastic.NewTermQuery("provinces.id", provinceId)
 proNest := elastic.NewNestedQuery("provinces", proBool.Must(tpro))
 boolQuery.Must(proNest)
 }

 boolQuery.Must(matchQuery)

 for _, e := range termQuerys {
 boolQuery.Must(e)
 }

 highligt := elastic.NewHighlight()
 highligt.Field(ELASTIC_SEARCH_SEARCH_FIELD_NAME)
 highligt.PreTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_START)
 highligt.PostTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_END)
 searchResult, err2 := client.Search(MyConfig.ElasticIndexName).
 Highlight(highligt).
 Query(boolQuery).
 From((page - 1) * limit).
 Size(limit).
 Do(context.Background())

 if err2 != nil {
 // Handle error
 GetLogger().LogErr("搜索时出错 "+err2.Error(), "search_error")
 // Handle error
 returnErr = errors.New("搜索时出错")
 } else {
 if searchResult.Hits.TotalHits > 0 {
  // Iterate through results
  for _, hit := range searchResult.Hits.Hits {
  var p PaperSearch
  err := json.Unmarshal(*hit.Source, &p)
  if err != nil {
   // Deserialization failed
   GetLogger().LogErr("搜索时出错 "+err.Error(), "search_deserialization_error")
   returnErr = errors.New("搜索时出错")
   return
  }

  if len(hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME]) > 0 {
   p.Name = hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME][0]
  }

  list = append(list, p)
  }

  count := searchResult.TotalHits()

  currentPage = page
  if count > 0 {
  totalPage = int(math.Ceil(float64(count) / float64(limit)))
  }
  if currentPage >= totalPage {
  pageIsEnd = 1
  }
 } else {
  // No hits
 }
 }
 return
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解ElasticSearch6.4.0集群搭建

    最近在学习ES相关内容,为了方便自己使用,在本地虚拟机上搭建了一个3节点的ES集群,在搭建过程中,遇到了许多坑,网上的资料也比较分散,所以详细整理一下搭建过程发出来供参考.搭建过程中,由于是两台虚拟机,所以有一种很快速的办法,搭建一台,然后通过克隆虚拟机然后修改可以迅速完成,但是为了更熟悉整个过程,本过程是每台虚拟机都逐一去操作. 一.软件及环境准备 服务器环境 CentOS6.9_64位两台 es-master:192.168.0.11 es-slave:192.168.0.12 注意:内存允

  • 使用java操作elasticsearch的具体方法

    系统环境: vm12 下的centos 7.2 当前安装版本: elasticsearch-2.4.0.tar.gz Java操作es集群步骤1:配置集群对象信息:2:创建客户端:3:查看集群信息 1:集群名称 默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错. 2:嗅探功能 通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以

  • 基于Spring Batch向Elasticsearch批量导入数据示例

    1.介绍 当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率.Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据.由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入. 2.示例 2.1 pom.xml 本文使用spr

  • Docker 简单部署 ElasticSearch的实现方法

    一.ElasticSearch是什么? Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单. 不过,Elasticsearch不仅仅是Lucene和全文搜索,我们还能这样去描述它: 分布式的实时文件存储,每个字段都被索 引并可被搜索分布式的实时分析搜索引擎 可以扩展到上百台服务器,处理PB级结构化或非结构化数据 二.Docker 部署 ElasticS

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • Laravel使用scout集成elasticsearch做全文搜索的实现方法

    本文介绍了Laravel使用scout集成elasticsearch做全文搜索的实现方法,分享给大家,具体如下: 安装需要的组件 composer require tamayo/laravel-scout-elastic composer require laravel/scout 如果composer require laravel/scout 出现报错 Using version ^6.1 for laravel/scout ./composer.json has been updated

  • JAVA使用ElasticSearch查询in和not in的实现方式

    ElasticSearch Elasticsearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 最近用到ES查询,因用的是Java写的,需要实现一个需求:过滤一部分id,查询时不需要查出来. 既然需要不包含,那么首先需要实现包含的方式(精确完

  • java 使用ElasticSearch完成百万级数据查询附近的人功能

    上一篇文章介绍了ElasticSearch使用Repository和ElasticSearchTemplate完成构建复杂查询条件,简单介绍了ElasticSearch使用地理位置的功能. 这一篇我们来看一下使用ElasticSearch完成大数据量查询附近的人功能,搜索N米范围的内的数据. 准备环境 本机测试使用了ElasticSearch最新版5.5.1,SpringBoot1.5.4,spring-data-ElasticSearch2.1.4. 新建Springboot项目,勾选Elas

  • Spring Boot整合ElasticSearch实现多版本兼容的方法详解

    前言 在上一篇学习SpringBoot中,整合了Mybatis.Druid和PageHelper并实现了多数据源的操作.本篇主要是介绍和使用目前最火的搜索引擎ElastiSearch,并和SpringBoot进行结合使用. ElasticSearch介绍 ElasticSearch是一个基于Lucene的搜索服务器,其实就是对Lucene进行封装,提供了 REST API 的操作接口 ElasticSearch作为一个高度可拓展的开源全文搜索和分析引擎,可用于快速地对大数据进行存储,搜索和分析.

  • SpringBoot整合Elasticsearch并实现CRUD操作

     配置准备 在build.gradle文件中添加如下依赖: compile "org.elasticsearch.client:transport:5.5.2" compile "org.elasticsearch:elasticsearch:5.5.2" //es 5.x的内部使用的 apache log4日志 compile "org.apache.logging.log4j:log4j-core:2.7" compile "org

  • 基于Lucene的Java搜索服务器Elasticsearch安装使用教程

    一.安装Elasticsearch Elasticsearch下载地址:http://www.elasticsearch.org/download/ ·下载后直接解压,进入目录下的bin,在cmd下运行elasticsearch.bat 即可启动Elasticsearch ·用浏览器访问: http://localhost:9200/   ,如果出现类似如下结果则说明安装成功: { "name" : "Benedict Kine", "cluster_na

随机推荐