java多线程处理执行solr创建索引示例

代码如下:

public class SolrIndexer implements Indexer, Searcher, DisposableBean {
 //~ Static fields/initializers =============================================

static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

private static final long SHUTDOWN_TIMEOUT    = 5 * 60 * 1000L; // long enough

private static final int  INPUT_QUEUE_LENGTH  = 16384;

//~ Instance fields ========================================================

private CommonsHttpSolrServer server;

private BlockingQueue<Operation> inputQueue;

private Thread updateThread;
 volatile boolean running = true;
 volatile boolean shuttingDown = false;

//~ Constructors ===========================================================

public SolrIndexer(String url) throws MalformedURLException {
  server = new CommonsHttpSolrServer(url);

inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);

updateThread = new Thread(new UpdateTask());
  updateThread.setName("SolrIndexer");
  updateThread.start();
 }

//~ Methods ================================================================

public void setSoTimeout(int timeout) {
  server.setSoTimeout(timeout);
 }

public void setConnectionTimeout(int timeout) {
  server.setConnectionTimeout(timeout);
 }

public void setAllowCompression(boolean allowCompression) {
  server.setAllowCompression(allowCompression);
 }

public void addIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
 }

public void delIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.DELETE));
 }

private void updateIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

logger.debug("Updating {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

for (Indexable idx : indices) {
   Doc doc = idx.getDoc();

SolrInputDocument solrDoc = new SolrInputDocument();
   solrDoc.setDocumentBoost(doc.getDocumentBoost());
   for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
    Field field = i.next();
    solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
   }

req.add(solrDoc);   
  }

try {
   req.process(server);   
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

private void delIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

logger.debug("Deleting {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
  for (Indexable indexable : indices) {   
   req.deleteById(indexable.getDocId());
  }

try {
   req.process(server);
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

public QueryResult search(Query query) throws IndexingException {
  SolrQuery sq = new SolrQuery();
  sq.setQuery(query.getQuery());
  if (query.getFilter() != null) {
   sq.addFilterQuery(query.getFilter());
  }
  if (query.getOrderField() != null) {
   sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
  }
  sq.setStart(query.getOffset());
  sq.setRows(query.getLimit());

QueryRequest req = new QueryRequest(sq);
  req.setPath("/" + query.getType() + "/select");

try {
   QueryResponse rsp = req.process(server);
   SolrDocumentList docs = rsp.getResults();

QueryResult result = new QueryResult();
   result.setOffset(docs.getStart());
   result.setTotal(docs.getNumFound());
   result.setSize(sq.getRows());

List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
   for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
    SolrDocument solrDocument = i.next();

Doc doc = new Doc();
    for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
     Map.Entry<String, Object> field = iter.next();
     doc.addField(field.getKey(), field.getValue());
    }

resultDocs.add(doc);
   }

result.setDocs(resultDocs);
   return result;

} catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  }
 }

public void destroy() throws Exception {
  shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);  
 }

public boolean shutdown(long timeout, TimeUnit unit) {
  if (shuttingDown) {
   logger.info("Suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingDown = true;
  String baseName = updateThread.getName();
  updateThread.setName(baseName + " - SHUTTING DOWN");
  boolean rv = false;
  try {
   // Conditionally wait
   if (timeout > 0) {
    updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");
    rv = waitForQueue(timeout, unit);
   }
  } finally {
   // But always begin the shutdown sequence
   running = false;
   updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");
  }
  return rv;
 }

/**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitForQueue(long timeout, TimeUnit unit) {
  CountDownLatch latch = new CountDownLatch(1);
  inputQueue.add(new StopOperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (InterruptedException e) {
   throw new RuntimeException("Interrupted waiting for queues", e);
  }
 }

class UpdateTask implements Runnable {
  public void run() {
   while (running) {
    try {
     syncIndices();
    } catch (Throwable e) {
     if (shuttingDown) {
      logger.warn("Exception occurred during shutdown", e);
     } else {
      logger.error("Problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("Shut down SolrIndexer");
  }
 }

void syncIndices() throws InterruptedException {
  Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);

if (op == null) {
   return;
  }

if (op instanceof StopOperation) {
   ((StopOperation) op).stop();
   return;
  }

// wait 1 second
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {

}

List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);
  ops.add(op);
  inputQueue.drainTo(ops);

Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
  Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);

for (Operation o : ops) {
   if (o instanceof StopOperation) {
    ((StopOperation) o).stop();
   } else {
    Indexable indexable = o.indexable;
    if (o.type == OperationType.DELETE) {
     List<Indexable> docs = deleteMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      deleteMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    } else {
     List<Indexable> docs = updateMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      updateMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    }
   }
  }

for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   delIndices(entry.getKey(), entry.getValue());
  }

for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   updateIndices(entry.getKey(), entry.getValue());
  }
 }

enum OperationType { DELETE, UPDATE, SHUTDOWN }

static class Operation {
  OperationType type;
  Indexable indexable;

Operation() {}

Operation(Indexable indexable, OperationType type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

static class StopOperation extends Operation {
  CountDownLatch latch;

StopOperation(CountDownLatch latch) {
   this.latch = latch;
   this.type = OperationType.SHUTDOWN;
  }

public void stop() {
   latch.countDown();
  }
 }

//~ Accessors ===============

}

(0)

相关推荐

  • solr在java中的使用实例代码

    SolrJ是操作Solr的Java客户端,它提供了增加.修改.删除.查询Solr索引的JAVA接口.SolrJ针对 Solr提供了Rest 的HTTP接口进行了封装, SolrJ底层是通过使用httpClient中的方法来完成Solr的操作. jar包的引用(maven pom.xml): <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId

  • Solr通过特殊字符分词实现自定义分词器详解

    前言 我们在对英文句子分词的时候,一般采用采用的分词器是WhiteSpaceTokenizerFactory,有一次因业务要求,需要根据某一个特殊字符(以逗号分词,以竖线分词)分词.感觉这种需求可能与WhiteSpaceTokenizerFactory相像,于是自己根据Solr源码自定义了分词策略. 业务场景 有一次,我拿到的数据都是以竖线"|"分隔,分词的时候,需要以竖线为分词单元.比如下面的这一堆数据: 有可能你拿到的是这样的数据,典型的例子就是来自csv文件的数据,格式和下面这种

  • 详解java整合solr5.0之solrj的使用

    1.首先导入solrj需要的的架包 2.需要注意的是低版本是solr是使用SolrServer进行URL实例的,5.0之后已经使用SolrClient替代这个类了,在添加之后首先我们需要根据schema.xml配置一下我们的分词器 这里的msg_all还需要在schema.xml中配置 它的主要作用是将msg_title,msg_content两个域的值拷贝到msg_all域中,我们在搜索的时候可以只搜索这个msg_all域就可以了, solr默认搜索需要带上域,比如 solr更改默认搜索域的地

  • 详解spring中使用solr的代码实现

    在介绍solr的使用方法之前,我们需要安装solr的服务端集群.基本上就是安装zookeeper,tomcat,jdk,solr,然后按照需要配置三者的配置文件即可.由于本人并没有具体操作过如何进行solr集群的搭建.所以关于如何搭建solr集群,读者可以去网上查看其它资料,有很多可以借鉴.这里只介绍搭建完solr集群之后,我们客户端是如何访问solr集群的. 之前介绍过,spring封装nosql和sql数据库的使用,都是通过xxxTemplate.solr也不例外. 我们需要引入solr的j

  • java多线程处理执行solr创建索引示例

    复制代码 代码如下: public class SolrIndexer implements Indexer, Searcher, DisposableBean { //~ Static fields/initializers ============================================= static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class); private static fi

  • java跟踪执行的sql语句示例分享

    代码: 复制代码 代码如下: package com.lwj.test.proxy; import java.lang.reflect.InvocationHandler;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.sql.Connection;import java.sql.SQLExce

  • Java中EnumMap代替序数索引代码详解

    本文研究的主要是Java中EnumMap代替序数索引的相关内容,具体介绍如下. 学习笔记<Effective Java 中文版 第2版> 经常会碰到使用Enum的ordinal方法来索引枚举类型. public class Herb { public enum Type { ANNUAL, PERENNIAL, BIENNIAL }; private final String name; private final Type type; Herb(String name, Type type)

  • Java多线程执行处理业务时间太久解决方法代码示例

    背景:在政府开发了一个应用系统,主要功能是让企业填写企业资质信息,然后通过给定的公式,统计这一系列的信息,以得分的形式展示给政府领导查看.目前有1300家企业填报.由于得分是实时显示的,所以导致统计功能很慢. 代码运行流程: 1.查出1300企业信息 2.遍历1300企业信息,ji计算每家企业得分信息.每家预计时间为0.3秒.合计390秒.导致页面请求超时 3.导出(用jxl jar) 解决方案: 由于处理业务的,所以需要能有返回值的线程.用:Callable 直接上代码 1.调用线程的代码 L

  • Java 在PPT中创建散点图的实现示例

    目录 创建图表前 创建图表时 其他注意事项 本文将以Java代码示例展示如何在PPT幻灯片中创建散点图表. 创建图表前 需要在Java程序中导入用于操作PPT的jar包 Free Spire.Presentation for Java.可参考如下两种方法导入: 方法1:手动导入jar包.需下载jar包到本地,并解压,找到lib文件夹下的jar文件.然后按照如下步骤执行,导入: 方法2:maven仓库下载导入.需在pom.xml文件中配置maven仓库路径,并指定依赖.配置内容如下: <repos

  • Java利用File类创建文件的示例代码

    只需要调用该类的一个方法createNewFile(),但是在实际操作中需要注意一些事项,如判断文件是否存在,以及如何向新建文件中写入数据等. import java.io.*; public class CreateNewFile{ //该方法用于创建文件,参数分别是文件路径和文件名.文件内容,如:myfile.doc HelloJava! public void createNewFile(String fileDirectoryAndName,String fileContent){ tr

  • MongoDB性能篇之创建索引,组合索引,唯一索引,删除索引和explain执行计划

    一.索引 MongoDB 提供了多样性的索引支持,索引信息被保存在system.indexes 中,且默认总是为_id创建索引,它的索引使用基本和MySQL 等关系型数据库一样.其实可以这样说说,索引是凌驾于数据存储系统之上的另一层系统,所以各种结构迥异的存储都有相同或相似的索引实现及使用接口并不足为 奇. 1.基础索引 在字段age 上创建索引,1(升序);-1(降序): db.users.ensureIndex({age:1}) _id 是创建表的时候自动创建的索引,此索引是不能够删除的.当

  • Java继承Thread类创建线程类示例

    本文实例讲述了Java继承Thread类创建线程类.分享给大家供大家参考,具体如下: 一 点睛 通过继承Thread类创建线程并启动多线程的步骤: 1 定义Thread的子类,并重写该类的run()方法,该run()方法的方法体代表了线程需要完成的任务.因此run()方法称为线程执行体. 2 创建Thread子类的实例,即创建子线程对象. 3 调用线程对象的start()方法来启动该线程. 二 代码 // 通过继承Thread类来创建线程类 public class FirstThread ex

  • macOS安装Solr并索引MySQL

    安装 Java 语言的软件开发工具包 brew cask install java 或者在Oracle官网 中选择 Mac 版本 jdk-8u111-macosx-x64.dmg 下载并安装. 安装 Solr brew install solr 当前(2016-11-10) brew 源的 Solr 版本为 6.1.0,你可以通过brew edit solr来修改你要安装的版本,或者通过官方网站下载源码包来进行编译安装. 启动 Solr solr start 返回以下文字提示,则表示 solr

  • 浅谈Java多线程的优点及代码示例

    尽管面临很多挑战,多线程有一些优点使得它一直被使用.这些优点是: 资源利用率更好 程序设计在某些情况下更简单 程序响应更快 资源利用率更好 想象一下,一个应用程序需要从本地文件系统中读取和处理文件的情景.比方说,从磁盘读取一个文件需要5秒,处理一个文件需要2秒.处理两个文件则需要: 5秒读取文件A 2秒处理文件A 5秒读取文件B 2秒处理文件B --------------------- 总共需要14秒 从磁盘中读取文件的时候,大部分的CPU时间用于等待磁盘去读取数据.在这段时间里,CPU非常的

随机推荐