java连接ElasticSearch集群操作

我就废话不多说了,大家还是直接看代码吧~

/*
 *es配置类
 *
 */

@Configuration
public class ElasticSearchDataSourceConfigurer {

  private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);
  @Bean
  public TransportClient getESClient() {
    //设置集群名称
    Settings settings = Settings.builder().put("cluster.name", "bigData-cluster").put("client.transport.sniff", true).build();
    //创建client
    TransportClient client = null;
    try {
      client = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300));//集群ip
      LOG.info("ESClient连接建立成功");
    } catch (UnknownHostException e) {
      LOG.info("ESClient连接建立失败");
      e.printStackTrace();
    }
    return client;
  }
}
/**
 * Simple to Introduction
 *
 * @Description: [添加类]
 */
@Repository
public class UserDaoImpl implements userDao {

	private static final String INDEXNAME = "user";//小写
	private static final String TYPENAME = "info";

	@Resource
	TransportClient transportClient;

	@Override
	public int addUser(User[] user) {
		IndexResponse indexResponse = null;
		int successNum = 0;
		for (int i = 0; i < user.length; i++) {
			UUID uuid = UUID.randomUUID();
			String str = uuid.toString();
			String jsonValue = null;
			try {
				jsonValue = JsonUtil.object2JsonString(user[i]);
				if (jsonValue != null) {
					indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue)
							.execute().actionGet();
					successNum++;
				}
			} catch (JsonProcessingException e) {
				e.printStackTrace();
			}

		}
		return successNum;
	}
}

/**
 *批量插入
 */
public static void bathAddUser(TransportClient client, List<User> users) {

		BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
		for (int i = 0; i < users.size(); i++) {
			UUID uuid = UUID.randomUUID();
			String str = uuid.toString();

			String jsonValue = null;
			try {
				jsonValue = JsonUtil.object2JsonString(users.get(i));
			} catch (JsonProcessingException e) {
				e.printStackTrace();
			}
			bulkRequest.add(client.prepareIndex("user", "info", str).setSource(jsonValue));
			// 一万条插入一次
			if (i % 10000 == 0) {
				bulkRequest.execute().actionGet();
			}
			System.out.println("已经插入第" + i + "多少条");
		}

	}

补充知识:使用java创建ES(ElasticSearch)连接池

1.首先要有一个创建连接的工厂类

package com.aly.util;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * EliasticSearch连接池工厂对象
 * @author 00000
 *
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{

	@Override
	public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
		System.out.println("activateObject");

	}

	/**
	 * 销毁对象
	 */
	@Override
	public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
		RestHighLevelClient highLevelClient = pooledObject.getObject();
		highLevelClient.close();
	}

	/**
	 * 生产对象
	 */
//	@SuppressWarnings({ "resource" })
	@Override
	public PooledObject<RestHighLevelClient> makeObject() throws Exception {
//		Settings settings = Settings.builder().put("cluster.name","elasticsearch").build();
		RestHighLevelClient client = null;
		try {
			/*client = new PreBuiltTransportClient(settings)
          .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/
			client = new RestHighLevelClient(RestClient.builder(
					new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"),
					new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"),
					new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http")));

		} catch (Exception e) {
			e.printStackTrace();
		}
		return new DefaultPooledObject<RestHighLevelClient>(client);
	}

	@Override
	public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
		System.out.println("passivateObject");
	}

	@Override
	public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
		return true;
	}
}

2.然后再写我们的连接池工具类

package com.aly.util;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * ElasticSearch 连接池工具类
 *
 * @author 00000
 *
 */
public class ElasticSearchPoolUtil {
	// 对象池配置类,不写也可以,采用默认配置
	private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
	// 采用默认配置maxTotal是8,池中有8个client
	static {
		poolConfig.setMaxTotal(8);
	}
	// 要池化的对象的工厂类,这个是我们要实现的类
	private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();
	// 利用对象工厂类和配置类生成对象池
	private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory,
			poolConfig);

	/**
	 * 获得对象
	 *
	 * @return
	 * @throws Exception
	 */
	public static RestHighLevelClient getClient() throws Exception {
		// 从池中取一个对象
		RestHighLevelClient client = clientPool.borrowObject();
		return client;
	}

	/**
	 * 归还对象
	 *
	 * @param client
	 */
	public static void returnClient(RestHighLevelClient client) {
		// 使用完毕之后,归还对象
		clientPool.returnObject(client);
	}
}

以上这篇java连接ElasticSearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • java操作elasticsearch的案例解析

    这篇文章主要介绍了java操作elasticsearch的案例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 到目前为止,我们一直都是使用RESTful风格的 API操作elasticsearch服务,但是通过我们之前的学习知道,elasticsearch提供了很多语言的客户端用于操作elasticsearch服务,例如:java.python..net.JavaScript.PHP等.而我们此次就学习如何使用java语言来操作elasti

  • 利用Java多线程技术导入数据到Elasticsearch的方法步骤

    前言 近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低.所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高). 关键技术栈 Elasticsearch jdbc ExecutorService\Thread sql 工具说明

  • Java基于elasticsearch实现集群管理

    这篇文章主要介绍了java基于elasticsearch实现集群管理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇文章主要是查看集群中的相关信息,具体请看代码和注释 @Test public void test45() throws UnknownHostException{ //1.指定es集群 cluster.name 是固定的key值,my-application是ES集群的名称 Settings settings = Settin

  • JAVA使用ElasticSearch查询in和not in的实现方式

    ElasticSearch Elasticsearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 最近用到ES查询,因用的是Java写的,需要实现一个需求:过滤一部分id,查询时不需要查出来. 既然需要不包含,那么首先需要实现包含的方式(精确完

  • java使用elasticsearch分组进行聚合查询过程解析

    这篇文章主要介绍了java使用elasticsearch分组进行聚合查询过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java连接elasticsearch 进行聚合查询进行相应操作 一:对单个字段进行分组求和 1.表结构图片: 根据任务id分组,分别统计出每个任务id下有多少个文字标题 1.SQL:select id, count(*) as sum from task group by taskid; java ES连接工具类 p

  • java连接ElasticSearch集群操作

    我就废话不多说了,大家还是直接看代码吧~ /* *es配置类 * */ @Configuration public class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class); @Bean public TransportClient getESClient() { //设置集群名称

  • 详解Java 连接MongoDB集群的几种方式

    先决条件 先运行mongodb肯定是必须的,然后导入以下包: import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.ServerAddress; import com.mongodb.MongoCredential; import com.mongodb.MongoClientOptions; MongoClient MongoClient()实例表示到数据库的连接池; 你将只需

  • 关于Java中配置ElasticSearch集群环境账号密码的问题

    1.修改主站点的elasticsearch.yml添加一下行: xpack.security.enabled: true 2.生成安全秘钥 切到ES安装目录,执行bin/elasticsearch-certutil ca -out config/elastic-certificates.p12 -pass “” 会在/home/elasticsearch-7.9.3/config目录生成elastic-certificates.p12 3.继续修改ES yml文件 添加以下四行: xpack.s

  • 关于Springboot2.x集成lettuce连接redis集群报超时异常Command timed out after 6 second(s)

    背景:最近在对一新开发Springboot系统做压测,发现刚开始压测时,可以正常对redis集群进行数据存取,但是暂停几分钟后,接着继续用jmeter进行压测时,发现redis就开始突然疯狂爆出异常提示:Command timed out after 6 second(s)...... Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 6 second(s) at io.lettuce

  • pycharm利用pyspark远程连接spark集群的实现

    0 背景 由于工作需要,利用spark完成机器学习.因此需要对spark集群进行操作.所以利用pycharm和pyspark远程连接spark集群.这里记录下遇到的问题及方法. 主要是参照下面的文献完成相应的内容,但是具体问题要具体分析. 1 方法 1.1 软件配置 spark2.3.3, hadoop2.6, python3 1.2 spark配置 Spark集群的每个节点的Python版本必须保持一致.在每个节点的$SPARK_HOME/conf/spark-env.sh中添加一行:具体看你

  • elasticsearch集群发现zendiscovery的Ping机制分析

    目录 zenDiscovery实现机制 广播的过程 nodeping处理代码 ping请求的发送策略 总结 zenDiscovery实现机制 ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群.zenDiscovery实现了两种ping机制:广播与单播.本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫. 广播的过程 首先看一下广播(MulticastZenPing),广

  • ZooKeeper集群操作及集群Master选举搭建启动

    目录 ZooKeeper介绍 ZooKeeper特征 分层命名空间 搭建ZK集群 启动zk集群 zk集群master选举 ZooKeeper介绍 ZooKeeper 是一个为 分布式应用 提供的 分布式 .开源的 协调服务 . 它公开了一组简单的 原语 ,分布式应用程序可以根据这些原语来实现用于 同步 .配置维护 以及 命名 的更高级别的服务. 怎么理解协调服务呢?比如我们有很多应用程序,他们之间都需要读写维护一个 id ,那么这些 id 怎么命名呢,程序一多,必然会乱套,ZooKeeper 能

  • Mongodb3.0.5 副本集搭建及spring和java连接副本集配置详细介绍

    Mongodb3.0.5 副本集搭建及spring和java连接副本集配置详细介绍 一.基本环境: mongdb3.0.5数据库 spring-data-MongoDB-1.7.2.jar mongo-Java-driver-3.0.2.jar Linux-redhat6.3 tomcat7 二.搭建mongodb副本集: 1.  分别在三台linux系统机上安装mongodb,(为避免和机器上原有的mongodb端口冲突,这里设为57017): 192.168.0.160 192.168.0.

  • 使用docker搭建kong集群操作

    docker容器下搭建kong的集群很简单,官网介绍的也很简单,初学者也许往往不知道如何去处理,经过本人的呕心沥血的琢磨,终于搭建出来了. 主要思想:不同的kong连接同一个数据库(就这么一句话) 难点:如何在不同的主机上用kong连接同一数据库 要求: 1.两台主机 172.16.100.101 172.16.100.102 步骤: 1.在101上安装数据库(这里就用cassandra) docker run -d --name kong-database \ -p 9042:9042 \ c

  • SpringBoot连接Nacos集群报400问题及完美解决方法

    问题描述 搭建一个Nacos集群,使用 SpringBoot 程序连接 Nacos 集群,在启动的时候报异常,程序启动失败,而后程序假死.详细控制台日志信息如下: failed to req API:/nacos/v1/ns/instance after all servers([192.168.1.169:8848]) tried: ErrCode:400, ErrMsg:<html><body><h1>Whitelabel Error Page</h1>

随机推荐