(starters)springboot-starter整合阿里云datahub方式

目录
  • 1. 功能介绍
  • 2.快速开始
    • 2.1 启动客户端
    • 2.2 获取DataHub客户端
    • 2.3 写数据
    • 2.4 读数据
  • 3. 核心代码

DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。

DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云

上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。

datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。

本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发

1. 功能介绍

1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,

2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费

3.支持游标的上次记忆功能

<dependency>
      <artifactId>cry-starters-projects</artifactId>
      <groupId>cn.com.cry.starters</groupId>
      <version>2022-1.0.0</version>
</dependency>

2.快速开始

2.1 启动客户端

配置阿里云DataHub的endpoint以及AK信息

aliyun:
  datahub:
  	# 开启功能
  	havingValue: true
    #是否为私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    #连接DataHub客户端超时时间
    conn-timeout: 10000

启动SpringBoot,你会发现datahub客户端已经启动完毕

2.2 获取DataHub客户端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

2.3 写数据

public int write(@RequestParam("id") Integer shardId) {
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}

以上示例代码表示往 projectName为my_test, topicName为student, shardId 为N的hub里写数据,并且返回插入成功的条数

2.4 读数据

读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用

@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}

以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。

这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等

3. 核心代码

首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。

public class DataHubClientWrapper implements InitializingBean, DisposableBean {    @Autowired
    private AliyunAccountProperties properties;    @Autowired
    private ApplicationContext context;    private DatahubClient datahubClient;
    public DataHubClientWrapper() {    }    /**
     * 执行销毁方法
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }    @Override
    public void afterPropertiesSet() throws Exception {        /**
         * 创建DataHubClient
         */
        this.datahubClient = DataHubClientFactory.create(properties);        /**
         * 打印Banner
         */
        BannerUtil.printBanner();        /**
         * 赋值Template的静态对象dataHubClient
         */
        DataHubTemplate.setDataHubClient(datahubClient);        /**
         * 初始化Worker线程
         */
        WorkerResourceExecutor.initWorkerResource(context);
        /**
         * 启动Worker线程
         */
        WorkerResourceExecutor.start();
    }
}

写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用

public class DataHubTemplate {    private static DatahubClient dataHubClient;    private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);    /**
     * 默认不开启重试机制
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @return
     */
    public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
        return write(projectName, topicName, datas, shardId, false);
    }    /**
     * 往指定的projectName以及topic和shard下面写数据
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @param retry
     * @return
     */
    private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
        RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (Object o : datas) {
            RecordEntry entry = new RecordEntry();
            Map<String, Object> data = BeanUtil.beanToMap(o);
            TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
            for (String key : data.keySet()) {
                tupleRecordData.setField(key, data.get(key));
            }
            entry.setRecordData(tupleRecordData);
            entry.setShardId(String.valueOf(shardId));
            recordEntries.add(entry);
        }
        PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
        int failedRecordCount = result.getFailedRecordCount();
        if (failedRecordCount > 0 && retry) {
            retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
        }
        return datas.size() - failedRecordCount;
    }    /**
     * @param client
     * @param records
     * @param retryTimes
     * @param project
     * @param topic
     */
    private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        List<RecordEntry> failedRecords = records;
        while (retryTimes != 0) {
            logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
            retryTimes = retryTimes - 1;
            PutRecordsResult result = client.putRecords(project, topic, failedRecords);
            int failedNum = result.getFailedRecordCount();
            if (failedNum > 0) {
                failedRecords = result.getFailedRecords();
                continue;
            }
            suc = true;
            break;
        }
        if (!suc) {
            logger.error("DataHub send message retry failure");
        }
    }    public static DatahubClient getDataHubClient() {
        return dataHubClient;
    }    public static void setDataHubClient(DatahubClient dataHubClient) {
        DataHubTemplate.dataHubClient = dataHubClient;
    }
}

读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。

public class DataListenerWorkerThread extends Thread {
    private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
    private volatile boolean init = false;
    private DatahubConfig config;
    private String workerKey;
    private int recordLimits;
    private int sleep;
    private RecordSchema recordSchema;
    private RecordHandler recordHandler;
    private CursorHandler cursorHandler;    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        this.setDaemon(true);
    }    @Override
    public void run() {
        initRecordSchema();
        String cursor = cursorHandler.positioningCursor(config);
        for (; ; ) {
            try {
                GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                    Thread.sleep(sleep);
                    continue;
                }
                List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
                logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                // 拿到下一个游标
                cursor = cursorHandler.nextCursor(result);
                //执行方法
                WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
            } catch (InvalidParameterException ex) {
                //非法游标或游标已过期,建议重新定位后开始消费
                cursor = cursorHandler.resetCursor(config);
                logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
            } catch (DatahubClientException e) {
                logger.error("DataHubException:{}", e.getErrorMessage());
                this.interrupt();
            } catch (InterruptedException e) {
                logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
            } catch (Exception e) {
                this.interrupt();
                logger.error("receive DataHub records cry.exception:{}", e, e);
            }
        }
    }    /**
     * 终止
     */
    public void shutdown() {
        if (!interrupted()) {
            interrupt();
        }
    }    /**
     * 初始化topic字段以及recordSchema
     */
    private void initRecordSchema() {
        try {
            if (!init) {
                recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                List<Field> fields = recordSchema.getFields();
                this.recordHandler = new RecordHandler(fields);
                init = true;
            }
        } catch (Exception e) {
            logger.error("initRecordSchema error:{}", e, e);
        }
    }
}

read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
    /**
     * 话题名称
     *
     * @return
     */
    String topicName();    /**
     * shardId
     *
     * @return
     */
    int shardId();    /**
     * 最大数据量限制
     *
     * @return
     */
    int recordLimit() default 1000;    /**
     * 游标类型
     *
     * @return
     */
    CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;    /**
     * 若未监听到数据添加,休眠时间 ms
     *
     * @return
     */
    int sleep() default 10000;    /**
     * 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
     *
     * @return
     */
    String startTime() default "";    /**
     * 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
     *
     * @return
     */
    int sequenceOffset() default 0;
}

DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
    String projectName();
}

最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。

启动类:

@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
    /**
     * 初始化dataHub装饰bean
     *
     * @return
     */
    @Bean
    public DataHubClientWrapper dataHubWrapper() {
        return new DataHubClientWrapper();
    }
}

属性配置类

@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{    /**
     * http://xxx.aliyuncs.com
     */
    private String endpoint;    /**
     * account
     */
    private String accessId;    /**
     * password
     */
    private String accessKey;    /**
     * private cloud || public cloud
     */
    private boolean isPrivate;    /**
     * unit: ms
     */
    private Integer connTimeout = 10000;
}

最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
  cry.starter.datahub.DataHubClientAutoConfiguration

大体逻辑就是这样了,你学会了吗? hhhhhhhhh~

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot整合阿里云OSS对象存储服务实现文件上传

    1. 准备工作: 一.首先登录阿里云OSS对象存储控制台创建一个Bucket作为你的存储空间. 二.创建Access Keyan按要求创建进行,这里的方法步骤我就不展现出来了,你们可以自行查询阿里云文档,这个获取值本身就不难. 重点:记下你的AccessKey ID.AccessKey Secret以及你刚才创建的Buacket名字BucketName. 2. 配置: 在pom里引入oss要用的依赖 <dependency> <groupId>com.aliyun.oss</

  • 手撸一个 spring-boot-starter的全过程

    我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中.Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小伙伴会觉得这个 Starter 好神奇呀!其实 Starter 也都是 Spring + SpringMVC 中的基础知识点实现的,接下来带大家自己来撸一个 Starter ,慢慢揭开 Starter 的神秘面纱! 核心知识 其实 Starter 的核心就是条件注解 @Conditional ,当

  • spring boot微服务自定义starter原理详解

    这篇文章主要介绍了spring boot微服务自定义starter原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 使用spring boot开发微服务后,工程的数量大大增加(一定要按照领域来切,不要一个中间件客户端包一个),让各个jar从开发和运行时自包含成了一个重要的内容之一.spring boot starter就可以用来解决该问题(没事启动时别依赖于applicationContext.getBean获取bean进行处理,依赖关系

  • SpringBoot整合阿里云OSS对象存储服务的实现

    今天来整合一下SpringBoot和阿里云OSS对象存储服务. 一.配置OSS服务 先在阿里云开通对象存储服务,拿到AccessKeyId.AccessKeySecret. 创建你的bucket(存储空间),相当于一个一个的文件夹目录.按业务需求分类存储你的文件,图片,音频,app包等等.创建bucket是要选择该bucket的权限,私有,公共读,公共读写,按需求选择.创建bucket时对应的endpoint要记住,上传文件需要用到. 可以配置bucket的生命周期,比如说某些文件有过期时间的,

  • springboot+thymeleaf整合阿里云OOS对象存储图片的实现

    目录 1.先引入pom依赖 2.编写前端thymleeaf代码tetsfile.html 3.service层编写 4.controller层编写 今天再进行创建项目时想使用阿里云oos进行存储图片 下面进行实操 1.先引入pom依赖 <dependency> <groupId>com.aliyun.oss</groupId> <artifactId>aliyun-sdk-oss</artifactId> <version>3.9.1

  • SpringBoot整合阿里云视频点播的过程详解

    目录 1.准备工作 2.服务端SDK的使用 2.1 导入依赖 2.2 初始化类 2.3 创建读取公共常量的工具类 2.4 获取视频播放地址 2.5 获取视频播放凭证 2.6 上传视频到阿里云视频点播服务 3.springboot项目中实践 3.1 上传视频到阿里云 3.2 根据视频id删除视频 1.准备工作 首先需要在阿里云开通视频点播服务: 1.首先,进入到阿里云视频点播平台,点击开通服务,选择按使用流量计费即可 2.开通之后点击进入管理控制台即可 视频点播有什么用? 视频点播(ApsaraV

  • springboot整合阿里云oss上传的方法示例

    OSS申请和配置 1. 注册登录 输入网址:https://www.aliyun.com/product/oss 如果没有账号点击免费注册,然后登录. 2.开通以及配置 点击立即开通 进入管理控制台 第一次使用会出现引导,按引导点击"我知道了",然后点击创建Bucket. 如果没有存储包或流量包点击购买. 点击确定,返回主页面,出现该页面,点击我知道了 将EndPoint记录下来,方便后期添加到我们项目的配置文件中 创建 AccessKeyID 和 AccessKeySecret 点击

  • SpringBoot整合阿里云短信服务的方法

    目录 一.新建短信微服务 1.在service模块下创建子模块service-msm 3.配置application.properties 4.创建启动类 二.阿里云短信服务 三.编写发送短信接口 1.在service-msm的pom中引入依赖 2.编写controller,根据手机号发送短信 3.编写service 一.新建短信微服务 1.在service模块下创建子模块service-msm 2.创建controller和service代码 3.配置application.propertie

  • SpringBoot整合阿里云开通短信服务详解

    准备工作 开通短信服务 如果开通不成功,就只能借下别人已经开通好的短信,如果不想重复,可在其下创建一个新的模板管理 这里只是介绍如何使用 导入依赖 com.aliyun aliyun-java-sdk-core 4.5.1 com.aliyun aliyun-java-sdk-dysmsapi 1.1.0 com.alibaba fastjson 1.2.62 发送验证码到手机上,验证码生成工具类(内容较为固定,也可根据需求改) package com.xsha.msmservice.utils

  • yii2.0整合阿里云oss上传单个文件的示例

    上一篇文章已经介绍了如何整合阿里云oss,这一篇主要介绍上传文件到阿里云oss. 主要思路:首先文件要上传到服务器,然后把服务器里边的文件传到阿里云oss,成功以后就把文件信息写入数据库,失败了就删除服务器的文件. 主要步骤: 0 介绍几个oss的概念. accessKeyId     ==>> 可以理解为访问阿里云oss的账号 accessKeySecret ==>> 可以理解为访问阿里云oss的密码 bucket          ==>> 可以理解为文件在保存的根

  • 实战分布式医疗挂号系统登录接口整合阿里云短信详情

    目录 步骤1:搭建service-user用户模块 1.启动类&配置网关 步骤2:整合JWT 步骤3: 搭建service-msm短信模块(整合阿里云短信) 1.启动类&配置网关 4.三层调用 步骤4:登录页面前端 1.封装api请求 2.添加登录组件 3.登录全局事件 附加:用户认证与网关整合 1.在服务网关添加fillter 2.调整前端代码 本篇文章完成的需求: 1,登录采取弹出层的形式. 2,登录方式: (1)手机号码+手机验证码 (2)微信扫描(后文完成) 3,无注册界面,第一次

  • Thinkphp整合阿里云OSS图片上传实例代码

    Thinkphp3.2整合阿里云OSS图片上传实例,图片上传至OSS可减少服务器压力,节省宽带,安全又稳定,阿里云OSS对于做负载均衡非常方便,不用传到各个服务器了 首先引入阿里云OSS类库 <?php namespace Home\Controller; use Think\Controller; use OSS\Core\OssException; vendor('aliyun.autoload'); 上传图片自己写,上传成功到阿里云后,删除临时文件 function aliyun() {

随机推荐