java实现cassandra高级操作之分页实例(有项目具体需求)

上篇博客讲到了cassandra的分页,相信大家会有所注意:下一次的查询依赖上一次的查询(上一次查询的最后一条记录的全部主键),不像mysql那样灵活,所以只能实现上一页、下一页这样的功能,不能实现第多少页那样的功能(硬要实现的话性能就太低了)。

我们先看看驱动官方给的分页做法

如果一个查询得到的记录数太大,一次性返回回来,那么效率非常低,并且很有可能造成内存溢出,使得整个应用都奔溃。所以了,驱动对结果集进行了分页,并返回适当的某一页的数据。

一、设置抓取大小(Setting the fetch size)

抓取大小指的是一次从cassandra获取到的记录数,换句话说,就是每一页的记录数;我们能够在创建cluster实例的时候给它的fetch size指定一个默认值,如果没有指定,那么默认是5000

// At initialization:
Cluster cluster = Cluster.builder()
  .addContactPoint("127.0.0.1")
  .withQueryOptions(new QueryOptions().setFetchSize(2000))
  .build();

// Or at runtime:
cluster.getConfiguration().getQueryOptions().setFetchSize(2000);

另外,statement上也能设置fetch size

Statement statement = new SimpleStatement("your query");
statement.setFetchSize(2000);

如果statement上设置了fetch size,那么statement的fetch size将起作用,否则则是cluster上的fetch size起作用。

注意:设置了fetch size并不意味着cassandra总是返回准确的结果集(等于fetch size),它可能返回比fetch size稍微多一点或者少一点的结果集。

二、结果集迭代

fetch size限制了每一页返回的结果集的数量,如果你迭代某一页,驱动会在后台自动的抓取下一页的记录。如下例,fetch size = 20:

默认情况下,后台自动抓取发生在最后一刻,也就是当某一页的记录被迭代完的时候。如果需要更好的控制,ResultSet接口提供了以下方法:

getAvailableWithoutFetching() and isFullyFetched() to check the current state;

fetchMoreResults() to force a page fetch;

以下是如何使用这些方法提前预取下一页,以避免在某一页迭代完后才抓取下一页造成的性能下降:

ResultSet rs = session.execute("your query");
for (Row row : rs) {
  if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
    rs.fetchMoreResults(); // this is asynchronous
  // Process the row ...
  System.out.println(row);
}

三、保存并重新使用分页状态

有时候,将分页状态保存起来,对以后的恢复是非常有用的,想象一下:有一个无状态Web服务,显示结果列表,并显示下一页的链接,当用户点击这个链接的时候,我们需要执行与之前完全相同的查询,除了迭代应该从上一页停止的位置开始;相当于记住了上一页迭代到了哪了,那么下一页从这里开始即可。

为此,驱动程序会暴露一个PagingState对象,该对象表示下一页被提取时我们在结果集中的位置。

ResultSet resultSet = session.execute("your query");
// iterate the result set...
PagingState pagingState = resultSet.getExecutionInfo().getPagingState();

// PagingState对象可以被序列化成字符串或字节数组
String string = pagingState.toString();
byte[] bytes = pagingState.toBytes();

PagingState对象被序列化后的内容可以持久化存储起来,也可用作分页请求的参数,以备后续再次被利用,反序列化成对象即可:

PagingState.fromBytes(byte[] bytes);
PagingState.fromString(String str);

请注意,分页状态只能使用完全相同的语句重复使用(相同的查询,相同的参数)。而且,它是一个不透明的值,只是用来存储一个可以被重新使用的状态值,如果尝试修改其内容或将其使用在不同的语句上,驱动程序会抛出错误。

具体我们来看下代码,下例是模拟页面分页的请求,实现遍历teacher表中的全部记录:

接口:

import java.util.Map;

import com.datastax.driver.core.PagingState;

public interface ICassandraPage
{
  Map<String, Object> page(PagingState pagingState);

}

主体代码:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.huawei.cassandra.dao.ICassandraPage;
import com.huawei.cassandra.factory.SessionRepository;
import com.huawei.cassandra.model.Teacher;

public class CassandraPageDao implements ICassandraPage
{
  private static final Session session = SessionRepository.getSession();

  private static final String CQL_TEACHER_PAGE = "select * from mycas.teacher;";

  @Override
  public Map<String, Object> page(PagingState pagingState)
  {
    final int RESULTS_PER_PAGE = 2;
    Map<String, Object> result = new HashMap<String, Object>(2);
    List<Teacher> teachers = new ArrayList<Teacher>(RESULTS_PER_PAGE);

    Statement st = new SimpleStatement(CQL_TEACHER_PAGE);
    st.setFetchSize(RESULTS_PER_PAGE);

    // 第一页没有分页状态
    if (pagingState != null)
    {
      st.setPagingState(pagingState);
    }

    ResultSet rs = session.execute(st);
    result.put("pagingState", rs.getExecutionInfo().getPagingState());

    //请注意,我们不依赖RESULTS_PER_PAGE,因为fetch size并不意味着cassandra总是返回准确的结果集
    //它可能返回比fetch size稍微多一点或者少一点,另外,我们可能在结果集的结尾
    int remaining = rs.getAvailableWithoutFetching();
    for (Row row : rs)
    {
      Teacher teacher = this.obtainTeacherFromRow(row);
      teachers.add(teacher);

      if (--remaining == 0)
      {
        break;
      }
    }
    result.put("teachers", teachers);
    return result;
  }

  private Teacher obtainTeacherFromRow(Row row)
  {
    Teacher teacher = new Teacher();
    teacher.setAddress(row.getString("address"));
    teacher.setAge(row.getInt("age"));
    teacher.setHeight(row.getInt("height"));
    teacher.setId(row.getInt("id"));
    teacher.setName(row.getString("name"));

    return teacher;
  }

}

测试代码:

import java.util.Map;

import com.datastax.driver.core.PagingState;
import com.huawei.cassandra.dao.ICassandraPage;
import com.huawei.cassandra.dao.impl.CassandraPageDao;

public class PagingTest
{

  public static void main(String[] args)
  {
    ICassandraPage cassPage = new CassandraPageDao();
    Map<String, Object> result = cassPage.page(null);
    PagingState pagingState = (PagingState) result.get("pagingState");
    System.out.println(result.get("teachers"));
    while (pagingState != null)
    {
      // PagingState对象可以被序列化成字符串或字节数组
      System.out.println("==============================================");
      result = cassPage.page(pagingState);
      pagingState = (PagingState) result.get("pagingState");
      System.out.println(result.get("teachers"));
    }
  }

}

我们来看看Statement的setPagingState(pagingState)方法:

四、偏移查询

保存分页状态,能够保证从某一页移动到下一页很好地运行(也可以实现上一页),但是它不满足随机跳跃,比如直接跳到第10页,因为我们不知道第10页的前一页的分页状态。像这样需要偏移查询的特点,并不被cassandra原生支持,理由是偏移查询效率低下(性能与跳过的行数呈线性反比),所以cassandra官方不鼓励使用偏移量。如果非要实现偏移查询,我们可以在客户端模拟实现。但是性能还是呈线性反比,也就说偏移量越大,性能越低,如果性能在我们的接受范围内,那还是可以实现的。例如,每一页显示10行,最多显示20页,这就意味着,当显示第20页的时候,最多需要额外的多抓取190行,但这也不会对性能造成太大的降低,所以数据量不大的话,模拟实现偏移查询还是可以的。

举个例子,假设每页显示10条记录,fetch size 是50,我们请求第12页(也就是第110行到第119行):

1、第一次执行查询,结果集包含0到49行,我们不需要用到它,只需要分页状态;

2、用第一次查询得到的分页状态,执行第二次查询;

3、用第二次查询得到的分页状态,执行第三次查询。结果集包含100到149行;

4、用第三次查询得到的结果集,先过滤掉前10条记录,然后读取10条记录,最后丢弃剩下的记录,读取的10条记录则是第12页需要显示的记录。

我们需要尝试着找到最佳的fetch size来达到最佳平衡:太小就意味着后台更多的查询;太大则意味着返回了更大的信息量以及更多不需要的行。

另外,cassandra本身不支持偏移量查询。在满足性能的前提下,客户端模拟偏移量的实现只是一种妥协。官方建议如下:

1、使用预期的查询模式来测试代码,以确保假设是正确的

2、设置最高页码的硬限制,以防止恶意用户触发跳过大量行的查询

五、总结

Cassandra对分页的支持有限,上一页、下一页比较好实现。不支持偏移量的查询,硬要实现的话,可以采用客户端模拟的方式,但是这种场景最好不要用在cassandra上,因为cassandra一般而言是用来解决大数据问题,而偏移量查询一旦数据量太大,性能就不敢恭维了。

在我的项目中,索引修复用到了cassandra的分页,场景如下:cassandra的表不建二级索引,用elasticsearch实现cassandra表的二级索引,那么就会涉及到索引的一致性修复的问题,这里就用到了cassandra的分页,对cassandra的某张表进行全表遍历,逐条与elasticsearch中的数据进行匹对,若elasticsearch中不存在,则在elasticsearch中新增,若存在而又不一致,则在elasticsearch中修复。具体elasticsearch怎么样实现cassandra的索引功能,在我后续博客中会专门的讲解,这里就不多说了。而在cassandra表进行全表遍历的时候就需要用到分页,因为表中数据量太大,亿级别的数据不可能一次全部加载到内存中。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java语言中cas指令的无锁编程实现实例

    最开始接触到相关的内容应该是从volatile关键字开始的吧,知道它可以保证变量的可见性,而且利用它可以实现读与写的原子操作...但是要实现一些复合的操作volatile就无能为力了...最典型的代表是递增和递减的操作.... 我们知道,在并发的环境下,要实现数据的一致性,最简单的方式就是加锁,保证同一时刻只有一个线程可以对数据进行操作....例如一个计数器,我们可以用如下的方式来实现: public class Counter { private volatile int a = 0; pub

  • 解决cannot be cast to javax.servlet.Filter 报错的问题

    cannot be cast to javax.servlet.Filter 报错, 原因servlet-api.jar冲突 使用maven开发web应用程序, 启动的时候报错: jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class 然后输出错误: 严重: Exception starting filter encodingFilter java.lang

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

  • java中switch case语句需要加入break的原因解析

    java中switch case语句需要加入break的原因解析            java 中使用switch case语句需要加入break 做了具体的实例分析,及编译源码,在源码中分析应该如何使用,大家可以参考下: 假设我们有如下这样一个switch语句: public static void test(int index) { switch (index) { case 1: System.out.println(1); case 2: System.out.println(2);

  • java实现cassandra高级操作之分页实例(有项目具体需求)

    上篇博客讲到了cassandra的分页,相信大家会有所注意:下一次的查询依赖上一次的查询(上一次查询的最后一条记录的全部主键),不像mysql那样灵活,所以只能实现上一页.下一页这样的功能,不能实现第多少页那样的功能(硬要实现的话性能就太低了). 我们先看看驱动官方给的分页做法 如果一个查询得到的记录数太大,一次性返回回来,那么效率非常低,并且很有可能造成内存溢出,使得整个应用都奔溃.所以了,驱动对结果集进行了分页,并返回适当的某一页的数据. 一.设置抓取大小(Setting the fetch

  • Java中使用MyBatis-Plus操作数据库的实例

    目录 MyBatis-Plus 官网 使用 测试数据插入数据库 测试查询所有 测试删除数据 测试修改数据 MyBatis-Plus MyBatis-Plus (opens new window)(简称 MP)是一个 MyBatis (opens new window)的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发.提高效率而生. MyBatis可以直接在xml中通过SQL语句操作数据库,很灵活.但其操作都要通过SQL语句进行,就必须写大量的xml文件,很麻烦.mybatis

  • Java中使用JDBC操作数据库简单实例

    好久没有编写有关数据库应用程序啦,这里回顾一下java JDBC. 1.使用Java JDBC操作数据库一般需要6步: (1)建立JDBC桥接器,加载数据库驱动: (2)连接数据库,获得Connection对象(使用数据库连接地址,用户名,密码): (3)获得数据库Statement对象: (4)执行数据库操作: (5)读取结果: (6)关闭数据库连接: 2.使用Java JDBC操作数据库(mysql)代码: 连接mysql数据库,需要导入mysql数据库jar包,本代码使用mysql-con

  • java 中mongodb的各种操作查询的实例详解

    java 中mongodb的各种操作查询的实例详解 一. 常用查询: 1. 查询一条数据:(多用于保存时判断db中是否已有当前数据,这里 is  精确匹配,模糊匹配 使用regex...) public PageUrl getByUrl(String url) { return findOne(new Query(Criteria.where("url").is(url)),PageUrl.class); } 2. 查询多条数据:linkUrl.id 属于分级查询 public Lis

  • Java实现的文本字符串操作工具类实例【数据替换,加密解密操作】

    本文实例讲述了Java实现的文本字符串操作工具类.分享给大家供大家参考,具体如下: package com.gcloud.common; import org.apache.commons.lang.StringUtils; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.text.BreakIterator; import java.util.Array

  • java序列化与反序列化操作实例分析

    本文实例分析了java序列化与反序列化操作.分享给大家供大家参考,具体如下: 概述: Java序列化是指把Java对象转换为字节序列的过程;而Java反序列化是指把字节序列恢复为Java对象的过程. 示例代码: import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.i

  • Java NIO Path接口和Files类配合操作文件的实例

    Path接口 1.Path表示的是一个目录名序列,其后还可以跟着一个文件名,路径中第一个部件是根部件时就是绝对路径,例如 / 或 C:\ ,而允许访问的根部件取决于文件系统: 2.以根部件开始的路径是绝对路径,否则就是相对路径: 3.静态的Paths.get方法接受一个或多个字符串,字符串之间自动使用默认文件系统的路径分隔符连接起来(Unix是 /,Windows是 \ ),这就解决了跨平台的问题,接着解析连接起来的结果,如果不是合法路径就抛出InvalidPathException异常,否则就

  • Java流操作之数据流实例代码

    实例1: package dataInputStreamAndPrintStreamDemo; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.PrintStream; //示范如何自键

  • Java中join线程操作实例分析

    本文实例讲述了Java中join线程操作.分享给大家供大家参考,具体如下: 一 点睛 Tread提供了让一个线程等待另外一个线程完成的方法--join()方法.当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完后为止. join()方法通常由使用线程的程序调用,以将大问题划分成许多小问题,每个小问题分配一个线程.当所有小问题都得到处理后,再调用主线程来进一步操作. 二 代码 public class JoinThread ext

  • Java线程协调运行操作实例详解

    本文实例讲述了Java线程协调运行操作.分享给大家供大家参考,具体如下: 一 点睛 借助于Object类提供的wait().notify()和notifyAll()三个方法,可实现Java线程协调运行.这三个方法并不属于Thread类,而是属于Object类.但这三个方法必须同步监视器对象调用. 关于这三个方法的解释如下: wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程.该wait()方法有三种形式:无时间参数的wait

随机推荐