Java Elastic Job动态添加任务实现过程解析

背景

在使用Elastic-Job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务?

在官方的文档中也有对此作出回答,如下:

动态添加作业这个概念每个人理解不尽相同。

elastic-job-lite为jar包,由开发或运维人员负责启动。启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息。 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动。elastic-job-lite并不会包含ssh免密管理等功能。

elastic-job-cloud为mesos框架,由mesos负责作业启动和分发。 但需要将作业打包上传,并调用elastic-job-cloud提供的REST API写入注册中心。 打包上传属于部署系统的范畴elastic-job-cloud并未涉及。

综上所述,elastic-job已做了基本动态添加功能,但无法做到真正意义的完全自动化添加。

接下来谈谈我对动态任务的理解,我眼中的动态任务分为2种:

一种是全新的任务,包括实现的逻辑也是全新的,也就是当我们的程序打成一个jar包后,线上已经在运行了,这个时候我加了一个新的任务,如何能做到不停服务,将这个任务集成到已有的任务中去,这个实现起来难度比较大,涉及到Java类的热加载等,不过最近阿里又有一开源大作JarsLink,GitHub地址:https://github.com/alibaba/jarslink,可以支持在运行时动态加载到系统中,实现不需要重启和发布系统新增功能。还有一种实现思路我们可以利用Groovy脚本来做这样的事情,一般情况下重启来发布新的任务会比较常见,如果各位一定要实现动态的任务可以自己尝试着去研究下我提供的思路。

另一种就是执行的业务逻辑不变,只是运行的时间发生变化。比如文章的定时发布,可以设置文章在某天的某分钟进行自动发布,实现这个功能有多种方式,你可以不停的扫描任务,一到时间点就自动发布,比较优雅的方式就是为每篇文章的自动发布都设置一个任务,通过Cron表达式来指定执行时间,不同的是每个任务都有自己的参数,业务逻辑都是固定的定时发布。
接下来我给大家介绍下Elastic-Job实现上面讲的第二种动态任务的方式,也就是任务的实现逻辑已经是存在的,只是需要发布成多个不同时间去触发的任务。

实战

实现任务的动态添加比较简单,只需要接收任务的信息,然后初始化一下就可以了,在实现的过程中笔者遇到了一个麻烦的问题?

在多节点分片任务却只有一个节点能执行,问题原因在于当有任务A和任务B,2个节点的时候,我们调用A节点的接口进行任务的动态添加,在A节点中初始化了任务调度器,数据也存储到了注册中心,但是B节点是不知道有新的任务添加,默认的使用方法是每个节点在启动时去初始化任务调度器,而我们的B节点已经启动过了,任务是新添加的。

解决这个问题最简单的方式就是将任务的节点都集中管理起来,无论动态任务在哪个节点上进行注册,都需要将这个请求转发到其他的节点上进行初始化操作,这样就可以保证多节点分片的任务正常执行。

还有一种对使用者更友好的办法是对Zookeeper中的节点进行监听,当有新的节点创建时,就自动获取这个节点的配置信息,在本地进行任务初始化,通过这样的方式就可以不用去转发请求到其他节点了,只要在任何节点有添加操作,都能被监听到,并自己去初始化。

监控代码如下:

/**
 * 开启任务监听,当有任务添加时,监听zk中的数据增加,自动在其他节点也初始化该任务
 */
public void monitorJobRegister() {
  CuratorFramework client = zookeeperRegistryCenter.getClient();
  @SuppressWarnings("resource")
  PathChildrenCache childrenCache = new PathChildrenCache(client, "/", true);
  PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
  public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    ChildData data = event.getData();
    switch (event.getType()) {
        case CHILD_ADDED:
          String config = new String(client.getData().forPath(data.getPath() + "/config"));
          Job job = JsonUtils.toBean(Job.class, config);
          addJob(job);
          break;
        default:
          break;
    }
   }
 };
  childrenCache.getListenable().addListener(childrenCacheListener);
  try {
    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
  } catch (Exception e) {
    e.printStackTrace();
  }
}

为了方便大家使用,我将动态添加任务的功能集成到了我之前的elastic-job-spring-boot-starter(https://github.com/yinjihuan/elastic-job-spring-boot-starter)中集成了动态添加的逻辑,大家引入依赖即可使用。

使用方式比较简单,只需要在启动类上加一个ComponentScan注解,让Spring能够扫描到elastic-job-spring-boot-starter提供的代码即可:

@SpringBootApplication
@EnableElasticJob
//开启动态任务添加API
@ComponentScan(basePackages = {"com.cxytiandi"})
public class JobApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder().sources(JobApplication.class).web(true).run(args);
    try {
      new CountDownLatch(1).await();
    } catch (InterruptedException e) {
    }
  }
}

配置好之后,启动项目就可以通过REST API来动态的注册任务,API列表如下:

/job
添加任务是POST请求,数据格式为JSON体提交,格式如下:
{
"jobName":"DynamicJob13",
"cron":"0 33 16 ?",
"jobType":"SIMPLE",
"jobClass":"com.cxytiandi.job.demo.DynamicJob",
"jobParameter":"2222222",
"shardingTotalCount":1
}

完整字段请参考:

https://github.com/yinjihuan/elastic-job-spring-boot-starter/blob/master/spring-boot-elastic-job-starter/src/main/java/com/cxytiandi/elasticjob/dynamic/bean/Job.java

注意:jobClass必须事先存在于服务中
* /job/remove

删除任务是GET请求,参数只要任务名称即可,比如:/job/remove?jobName=任务名。可以用于任务完成之后清空注册中心的任务信息。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java如何使用elasticsearch进行模糊查询

    这篇文章主要介绍了Java如何使用elasticsearch进行模糊查询,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 使用环境上篇文章本人已书写过,需要maven坐标,ES连接工具类的请看上一篇文章,以下是内容是笔者在真实项目中运用总结而产生,并写的是主要方法和思路,具体实现大家可以看后面文章,若其中有不适,请大家多多包涵 一.ES模糊查询 (一)不含中文模糊查询,适用于数字 SearchResponse searchResponse=nul

  • java使用elasticsearch分组进行聚合查询过程解析

    这篇文章主要介绍了java使用elasticsearch分组进行聚合查询过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java连接elasticsearch 进行聚合查询进行相应操作 一:对单个字段进行分组求和 1.表结构图片: 根据任务id分组,分别统计出每个任务id下有多少个文字标题 1.SQL:select id, count(*) as sum from task group by taskid; java ES连接工具类 p

  • python 使用elasticsearch 实现翻页的三种方式

    使用ES做搜索引擎拉取数据的时候,如果数据量太大,通过传统的from + size的方式并不能获取所有的数据(默认最大记录数10000),因为随着页数的增加,会消耗大量的内存,导致ES集群不稳定.因此延伸出了scroll,search_after等翻页方式. 一.from + size 浅分页 "浅"分页可以理解为简单意义上的分页.它的原理很简单,就是查询前20条数据,然后截断前10条,只返回10-20的数据.这样其实白白浪费了前10条的查询. GET test/_search { &

  • docker启动elasticsearch时内存不足问题及解决方法

    问题 docker安装并启动elasticsearch时内存不足 系统centos8(阿里云ecs服务器) [root@iZ2zeczvvb79boy368xppwZ ~]# cat /etc/redhat-release CentOS Linux release 8.1.1911 (Core) 安装过程 docker pull elasticsearch:6.4.0 修改虚拟机内存(貌似没有效果) sysctl -w vm.max_map_count=262144 使用docker run命令

  • Springboot集成spring data elasticsearch过程详解

    版本对照 各版本的文档说明:https://docs.spring.io/spring-data/elasticsearch/docs/ 1.在application.yml中添加配置 spring: data: elasticsearch: repositories: enabled: true #多实例集群扩展时需要配置以下两个参数 #cluster-name: datab-search #cluster-nodes: 127.0.0.1:9300,127.0.0.1:9301 2.添加 M

  • PHP ElasticSearch做搜索实例讲解

    ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. PHP基于ElasticSearch做搜索 在做搜索的时候想到了 ElasticSearch ,而且其也支持 PHP,所以就做了一个简单的例子做测试,感觉还不错,做下记录.

  • SpringBoot2整合ElasticJob框架过程详解

    一.ElasticJob 简介 1.定时任务 在前面的文章中,说过QuartJob这个定时任务,被广泛应用的定时任务标准.但Quartz核心点在于执行定时任务并不是在于关注的业务模式和场景,缺少高度自定义的功能.Quartz能够基于数据库实现任务的高可用,但是不具备分布式并行调度的功能. -> QuartJob定时任务 2.ElasticJob说明基础简介 Elastic-Job 是一个开源的分布式调度中间件,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-

  • Java基于elasticsearch实现集群管理

    这篇文章主要介绍了java基于elasticsearch实现集群管理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇文章主要是查看集群中的相关信息,具体请看代码和注释 @Test public void test45() throws UnknownHostException{ //1.指定es集群 cluster.name 是固定的key值,my-application是ES集群的名称 Settings settings = Settin

  • Java Elastic Job动态添加任务实现过程解析

    背景 在使用Elastic-Job的过程中,有很多人遇到了这么一个问题,就是如何动态的去添加任务? 在官方的文档中也有对此作出回答,如下: 动态添加作业这个概念每个人理解不尽相同. elastic-job-lite为jar包,由开发或运维人员负责启动.启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息. 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动.elastic-job-lite并不会

  • JAVA Swing实现窗口添加课程信息过程解析

    基本思路: 先创建出一个添加课程信息的框架,随后就设置按钮的鼠标监听事件,确保单机后录入信息的合法性,以及确定合法性之后的后续操作,如保存课程信息,信息有误弹出窗口等操作. 代码 package Test; import javax.swing.JButton; import java.io.*; import javax.swing.JFrame; import javax.swing.JLabel; import javax.swing.JPanel; import javax.swing.

  • Java使用EasyExcel动态添加自增序号列

    目录 前言 实现 思路 其它 总结 前言 本文将介绍如何通过使用EasyExcel自定义拦截器实现在最终的Excel文件中新增一列自增的序号列,最终的效果如下: 此外,本文所使用的完整代码示例已上传到GitHub. 实现 本文主要是通过自定义一个继承AbstractRowWriteHandler的拦截器来实现在最终导出的结果中新增序号列,通过修改源码中保存头部标题的Map内容来给自己添加的序号列留出位置,先展示最终的代码: /** * 自定义 excel 行处理器, 增加序号列 * * @aut

  • Java实现简单双色球摇奖功能过程解析

    这篇文章主要介绍了Java实现简单双色球摇奖功能过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 双色球:从1-33号球中选取6个红球,且红球不重复 从1-16号球中选取一个篮球 话不多说 上代码~~~ package Javaee; import java.util.Arrays; import java.util.Random; public class DoubleChromosphere { public static void

  • 使用java NIO及高速缓冲区写入文件过程解析

    这篇文章主要介绍了使用java NIO及高速缓冲区写入文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码如下 byte[] bytes = Files.readAllBytes(Paths.get("E:\\pdf\\aaa\\html\\text.txt").normalize()); String text = IOUtils.toString(bytes); String xml = text.substring(

  • java根据富文本生成pdf文件过程解析

    这篇文章主要介绍了java根据富文本生成pdf文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 public class PdfUtil { /* * 生成pdf工具类 * wmy 12:40 2019/8/9 * @Param [guideBook, pdfPath] * @return java.lang.Boolean **/ public static Boolean htmlToPdf(GuideBook guideBook

  • java实现上传文件类型检测过程解析

    这篇文章主要介绍了java实现上传文件类型检测过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在进行文件上传时,特别是向普通用户开放文件上传功能时,需要对上传文件的格式进行控制,以防止黑客将病毒脚本上传.单纯的将文件名的类型进行截取的方式非常容易遭到破解,上传者只需要将病毒改换文件名便可以完成上传. 可以读取文件的十六进制的文件头,来判断文件真正的格式. 读取文件的二进制数据并将其转换为十六进制时,同类型文件的文件头数据是相同的,即使改

  • Java静态和非静态成员变量初始化过程解析

    这篇文章主要介绍了Java静态和非静态成员变量初始化过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Java中非静态成员变量.静态成员变量的初始化时机. 非静态变量 我们在这里分析三种结构,着重分析这三种结构的初始化顺序: 成员变量初始化语句: 成员变量初始化块: 构造函数: 示例一: public class MyTest { private String name = "wei.hu"; public MyTest(Str

  • Java 在PDF中添加骑缝章示例解析

    骑缝章是用于往来业务合同,以确保合同真实.有效的印章加盖方法,是一种防范风险的重要方式.在Java程序中,可以通过使用工具来辅助加盖这种骑缝章. 工具:Free Spire.PDF for Java (免费版) 工具获取及jar文件导入: 方式1:通过官网下载jar包,并解压,手动导入lib文件夹下的Spire.Pdf.jar文件. 方式2:通过创建Maven程序,在pom.xml中配置maven仓库路径并指定Free Spire.PDF for Java 的依赖,配置完成后,在IDEA中,点击

随机推荐