Rxjava2_Flowable_Sqlite_Android数据库访问实例

一、使用Rxjava访问数据库的优点:

1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程

2.随时订阅和取消订阅,而不必再使用回调函数

3.对读取的数据用rxjava进行过滤,流式处理

4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架

(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,

同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)

二、接下来之关注实现过程:

本次实现用rxjava2的Flowable,有被压支持(在不需要被压支持的情况建议使用Observable)

实现一个稳健的的可灵活切换其他数据库的结构,当然是先定义数据库访问接口。然后跟具不同的数据库实现接口的方法

定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)

public interface DbSource {
  //String sql = "insert into table_task (tid,startts) values(tid,startts)";
  Flowable<Boolean> insertNewTask(int tid, int startts); 

  //String sql = "select * from table_task";
  Flowable<List<TaskItem>> getAllTask(); 

  //String sql = "select * from table_task where endts = 0";
  Flowable<Optional<TaskItem>> getRunningTask(); 

  //String sql = "update table_task set isuploadend=isuploadend where tid=tid";
  Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend); 

  //String sql = "delete from table_task where tid=tid and endts>0";
  Flowable<Boolean> deleteTask(int tid);
} 

三、用Android原生的Sqlite实现数据库操作

public class SimpleDb implements DbSource { 

 private static SimpleDb sqlite;
 private SqliteHelper sqliteHelper; 

 private SimpleDb(Context context) {
  this.sqliteHelper = new SqliteHelper(context);
 } 

 public static synchronized SimpleDb getInstance(Context context) {
  if (sqlite == null )
   sqlite = new SimpleDb(context);
  return sqlite;
 } 

 Flowable<Boolean> insertNewTask(int tid, int startts) {
  return Flowable.create(new FlowableOnSubscribe<Boolean>() {
   @Override
   public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
    //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
    ContentValues values = new ContentValues();
    values.put(“tid”, 1);
    values.put(“startts”,13233);
    if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1)
     e.onNext(true);
    else
     e.onNext(false);
    e.onComplete();
   }
  }, BackpressureStrategy.BUFFER);
 } 

 Flowable<List<TaskItem>> getAllTask() {
  return Flowable.create(new FlowableOnSubscribe<List<TaskItem>>() {
   @Override
   public void subscribe(FlowableEmitter<List<TaskItem>> e) throws Exception { 

    List<TaskItem> taskList = new ArrayList<>();
    StringBuilder sql = new StringBuilder(100);
    sql.append("select * from ");
    sql.append(SqliteHelper.TABLE_NAME_TASK); 

    SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
    Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
    if (cursor.moveToFirst()) {
     int count = cursor.getCount();
     for (int a = 0; a < count; a ++) {
      TaskItem item = new TaskItem();
      item.setTid(cursor.getInt(1));
      item.setStartts(cursor.getInt(2));
      item.setEndts(cursor.getInt(3));
      taskList.add(item);
      cursor.move(1);
     }
    }
    cursor.close();
    sqLiteDatabase.close(); 

    e.onNext(taskList);
    e.onComplete();
   }
  }, BackpressureStrategy.BUFFER);
 } 

 Flowable<Optional<TaskItem>> getRunningTask() {
  return Flowable.create(new FlowableOnSubscribe<Optional<TaskItem>>() {
   @Override
   public void subscribe(FlowableEmitter<Optional<TaskItem>> e) throws Exception {
    TaskItem item = null;
    StringBuilder sql = new StringBuilder(100);
    sql.append("select * from ");
    sql.append(SqliteHelper.TABLE_NAME_TASK);
    sql.append(" where endts=0 limit 1");
    SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
    Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
    if (cursor.moveToFirst()) {
     int count = cursor.getCount();
     if (count == 1) {
      item = new TaskItem();
      item.setId(cursor.getInt(0));
      item.setTid(cursor.getInt(1));
      item.setStartts(cursor.getInt(2));
      item.setEndts(cursor.getInt(3));
     }
    }
    cursor.close();
    sqLiteDatabase.close(); 

    e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好
    e.onComplete();
   }
  }, BackpressureStrategy.BUFFER);
 } 

 Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend) {
   return Flowable.create(new FlowableOnSubscribe<Boolean>() {
   @Override
   public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
    //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
    //数据库操作代码
    e.onNext(false);//返回结果
    e.onComplete();//返回结束
   }
  }, BackpressureStrategy.BUFFER);
 } 

 Flowable<Boolean> deleteTask(int tid) {
  return Flowable.create(new FlowableOnSubscribe<Boolean>() {
   @Override
   public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
    //这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
    //数据库操作代码
    e.onNext(false);//返回结果
    e.onComplete();//返回结束
   }
  }, BackpressureStrategy.BUFFER);
 }
} 

四、同一个接口使用sqlbrite的实现方式

public class BriteDb implements DbSource {
 @NonNull
 protected final BriteDatabase mDatabaseHelper;
 @NonNull
 private Function<Cursor, TaskItem> mTaskMapperFunction;
 @NonNull
 private Function<Cursor, PoiItem> mPoiMapperFunction;
 @NonNull
 private Function<Cursor, InterestPoiItem> mInterestPoiMapperFunction;
 // Prevent direct instantiation.
 private BriteDb(@NonNull Context context) {
  DbHelper dbHelper = new DbHelper(context);
  SqlBrite sqlBrite = new SqlBrite.Builder().build();
  mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io();
  mTaskMapperFunction = this::getTask;
  mPoiMapperFunction = this::getPoi;
  mInterestPoiMapperFunction = this::getInterestPoi;
 } 

 @Nullable
 private static BriteDb INSTANCE;
 public static BriteDb getInstance(@NonNull Context context) {
  if (INSTANCE == null) {
   INSTANCE = new BriteDb(context);
  }
  return INSTANCE;
 } 

 @NonNull
 private TaskItem getTask(@NonNull Cursor c) {
  TaskItem item = new TaskItem();
  item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID)));
  item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID)));
  item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS)));
  item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS)));
  return item;
 } 

 @Override
 public void insertNewTask(int tid, int startts) {
  ContentValues values = new ContentValues();
  values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid);
  values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS, startts);
  mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE);
 } 

 @Override
 public Flowable<List<TaskItem>> getAllTask() {
  String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串
  return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql)
    .mapToList(mTaskMapperFunction)
    .toFlowable(BackpressureStrategy.BUFFER);
 } 

 @Override
 public Flowable<Optional<TaskItem>> getRunningTask() {
  String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1",
    PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS);
  return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0")
    .mapToOne(cursor -> Optional.fromNullable(mTaskMapperFunction.apply(cursor)))
    .toFlowable(BackpressureStrategy.BUFFER);
 } 

 @Override
 public Flowable<Boolean> markUploadEnd(int tid, boolean isuploadend) {
  return Flowable.create(new FlowableOnSubscribe<Boolean>() {
   @Override
   public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
     ContentValues values = new ContentValues();
     if(isuploadend) {
      values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1);
     } else {
      values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0);
     }
     String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ?";
     //String[] selectionArgs = {String.valueOf(tid)};
     String selectionArgs = String.valueOf(tid);
     int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs);
     if (res > 0) {
      e.onNext(true);//返回结果
     } else {
       e.onNext(false);//返回结果
     }
     e.onComplete();//返回结束
   }
  }, BackpressureStrategy.BUFFER);
 } 

 @Override
 public Flowable<Boolean> deleteTask(int tid) {
  return Flowable.create(new FlowableOnSubscribe<Boolean>() {
   @Override
   public void subscribe(FlowableEmitter<Boolean> e) throws Exception {
     String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ? AND "+
             PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS + " > 0";
     String[] selectionArgs = new String[1];
     selectionArgs[0] = String.valueOf(tid);
     int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs);
     if (res > 0) {
      e.onNext(true);//返回结果
     } else {
       e.onNext(false);//返回结果
     }
     e.onComplete();//返回结束
   }
  }, BackpressureStrategy.BUFFER);
 }
} 

五、数据库调用使用方法

使用了lambda简化了表达式进一步简化代码:

简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)

compileOptions {
 sourceCompatibility JavaVersion.VERSION_1_8
 targetCompatibility JavaVersion.VERSION_1_8
} 

接口调用(获得数据库实例):

//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可
public class Injection {
 public static DbSource getDbSource(Context context) {
  //choose one of them
  //return BriteDb.getInstance(context);
  return SimpleDb.getInstance(context);
 }
} 

DbSource db = Injection.getInstance(mContext); 

disposable1 = db.getAllTask()
       .flatMap(Flowable::fromIterable)
       .filter(task -> {     //自定义过滤
         if (!task.getIsuploadend()) {
          return true;
         } else {
          return false;
         }
       })
       .subscribe(taskItems -> //这里是使用了lambda简化了表达式
        doTaskProcess(taskItems)
       , throwable -> {
        throwable.printStackTrace();
       },// onCompleted
       () -> {
        if (disposable1 != null && !disposable1.isDisposed()) {
         disposable1.dispose();
        }
       }); 

 disposable1 = db.getRunningTask()
    .filter(Optional::isPresent) //判断是否为空,为空的就跳过
    .map(Optional::get)    //获取到真的参数
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(taskItem -> {     //onNext()
       //has running task
       mTid = taskItem.getTid();
    }, throwable -> throwable.printStackTrace() //onError()
    , () -> disposable1.dispose());    //onComplete() 

disposable1 = db.markUploadEnd(tid, isuploadend)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(status -> {     //onNext()
       if (status) {
        //dosomething
       }
    }, throwable -> throwable.printStackTrace() //onError()
    , () -> disposable1.dispose());    //onComplete() 

disposable1 = db.deleteTask(tid)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(status -> {     //onNext()
       if (status) {
        //dosomething
       }
    }, throwable -> throwable.printStackTrace() //onError()
    , () -> disposable1.dispose());    //onComplete() 

以上这篇Rxjava2_Flowable_Sqlite_Android数据库访问实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • RxJava2.x+ReTrofit2.x多线程下载文件的示例代码

    写在前面: 接到公司需求:要做一个apk升级的功能,原理其实很简单,百度也一大堆例子,可大部分都是用框架,要么就是HttpURLConnection,实在是不想这么干.正好看了两天的RxJava2.x+ReTrofit2.x,据说这俩框架是目前最火的异步请求框架了.固本文使用RxJava2.x+ReTrofit2.x实现多线程下载文件的功能. 如果对RxJava2.x+ReTrofit2.x不太了解的请先去看相关的文档. 大神至此请无视. 思路分析: 思路及其简洁明了,主要分为以下四步 1.获取

  • Android 使用 RxJava2 实现倒计时功能的示例代码

    倒计时功能被广泛运用在 App 启动页.短信验证码倒计时等,通常做法是起一个Handler ,在子线程里完成倒计时,如今这一做法有了替代品 -- RxJava ,RxJava是被行内一致认可的第三方开源库,我们可以使用RxJava实现倒计时功能. 示例图: 示例代码: 导入必要的库文件(Android支持库和Reactivex系列支持库) implementation 'com.android.support:appcompat-v7:27.1.0' implementation 'com.an

  • RxJava2.x实现定时器的实例代码

    前言 由于现在网络层已经升级到RxJava2.x相关的了,所以需要做些调整.虽然RxJava1.x和RxJava2.x同属RxJava系列,但由于RxJava2.x部分代码的重写,导致RxJava2.x与RxJava1.x已是两个不同的版本,RxJava2.x在性能上更优,尤其在背压支持上.当然,此篇重点不在Rx版本上的区别,有兴趣的同学可以自行研究.当然,2.x之于1.x的区别之一是2.x中已经没有 Subscription mSubscription, Observable.create()

  • 基于Retrofit2+RxJava2实现Android App自动更新

    本文实例为大家分享了Retrofit2 RxJava2实现Android App自动更新,具体内容如下 功能解析 自动更新可以说已经是App的标配了,很多第三方平台也都支持这个功能,最近手头上的项目需要加入这个App自动更新,考虑到项目里有用到Retrofit2和RxJava2,于是打算使用它俩自己实现这个功能. 分析App自动更新,可以分为以下三个功能点: 1.APK文件的下载 2.下载进度的实时更新显示 3.下载完成后的自动安装 其中比较难的一点是下载进度的实时更新显示,更难的是如何优雅的进

  • 基于RxJava2实现的简单图片爬虫的方法

    今年十月份以来,跟朋友尝试导入一些图片到tensorflow来生成模型,这就需要大量的图片.刚开始我只写了一个简单的HttpClient程序来抓取图片,后来为了通用性索性写一个简单的图片爬虫程序.它可以用于抓取单张图片.多张图片.某个网页下的所有图片.多个网页下的所有图片. github地址:https://github.com/fengzhizi715/PicCrawler 这个爬虫使用了HttpClient.RxJava2以及Java 8的一些特性.它支持一些简单的定制,比如定制User-A

  • 详解RxJava2 Retrofit2 网络框架简洁轻便封装

    前言 RxJava2.Retrofit2火了有一段时间了,前段时间给公司的项目引入了这方面相关的技术,在此记录一下相关封装的思路. 需求 封装之前要先明白需要满足哪些需求. RxJava2衔接Retrofit2 Retrofit2网络框架异常的统一处理 兼容fastjson(可选) RxJava2内存泄漏的处理 异步请求加入Loading Dialog 依赖 implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' implementation

  • RxJava2配置及使用详解

    RxJava2.0是一个非常棒的流式编程,采用的观察者模式思想,事件的产生者产生事间之后发送给绑定的接受者,接受顺序与发送顺序一致. 依赖: compile 'io.reactivex.rxjava2:rxjava:2.0.1' compile 'io.reactivex.rxjava2:rxandroid:2.0.1' 简单使用: //观察者模式,这里产生事件,事件产生后发送给接受者,但是一定要记得将事件的产生者和接收者捆绑在一起,否则会出现错误 Observable.create(new O

  • Android 用RxBinding与RxJava2实现短信验证码倒计时功能

    场景:注册账号页面时,我们点击按钮发送验证码,在等待验证码时,界面会有倒计时提示,这此期间按钮不可点击.当倒计时结束时,按钮恢复. 实现与功能都不难,这次用 RxBinding,RxJava2 的方法去实现.并实现了手动.自动停止倒计时,防止多次点击. 功能动态图 要使用 RxBinding.RxJava2 先添加 Gradle 配置: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxj

  • 谈谈RxJava2中的异常及处理方法

    前言 众所周知,RxJava2 中当链式调用中抛出异常时,如果没有对应的 Consumer 去处理异常,则这个异常会被抛出到虚拟机中去,Android 上的直接表现就是 crash,程序崩溃. 订阅方式 说异常处理前咱们先来看一下 RxJava2 中 Observable 订阅方法 subscribe() 我们常用的几种订阅方式: // 1 subscribe() // 2 Disposable subscribe(Consumer<? super T> onNext) // 3 Dispos

  • RxJava2和Retrofit2封装教程(整洁、简单、实用)

    前言 RxJava2与Retrofit2是老搭档了,之前写了一篇<RxJava和Retrofit2的统一处理单个请求>,是用的Rxjava1.0,本次使用Rxjava2.0与Retrofit2进行封装,一样整洁.简单.实用.Rxjava2相比Rxjava1优化和改动不少了东西,网上有很多大神写的文章,这里就不粘贴复制了.封装的过程有什么问题.疑问,请在下方留言. 下面话不多说了,来一起看看详细的介绍吧 封装教程如下: 核心网络请求: package com.lin.netrequestdemo

随机推荐