java开源调度如何给xxljob加k8s执行器

目录
  • 前言
  • 执行器接口
  • K8S执行器设计
    • 1、在XXL-JOB-ADMIN模块新增执行器
    • 2、引入K8S-CLIENT-JAVA,使用SERVICEACCOUNT机制与K8S交互
    • 3、编写代理执行器调度代码
  • 结语

前言

xxljob 是采用 java 开发的开源的任务调度系统,架构上分为调度管理器、执行器,目前除了官方提供的 java 执行器外,也有 go 开发者提供了 go 语言的执行器(看了 go 执行器的代码,除了任务日志没有实现,其他功能实现都比较完整)。 xxljob 在设计上,抽象出了执行器的接口,所以实现一个语言的执行器并不复杂,这里主要探索下,如何利用 k8s 的 pod 的能力,使用 xxljob 调度 pod 运行,实现一个通用的和语言无关的执行器

执行器接口

实现一个 xxljob 的执行器,如果不考虑执行器节点自动注册,只需要实现如下五个接口即可:

  • /beat :执行器心跳
  • /idleBeat :执行器的某个 job 是否空闲
  • /run :触发 job 执行
  • /kill :终止正在执行的 job
  • /log :查看本节点执行器的 job 执行日志

不过一些调度策略则需要每个执行器自行实现了,比如【阻塞处理策略】,当同一个job 的任务还在执行,突然又收到了一个新的,是串行执行,还是停止之前的任务,或者丢弃当前的任务,这些实现都需要执行器考虑。

K8S 执行器设计

上面已经了解了实现一个执行器的要素。但是让 k8s 实现这些接口,难度有点高。然后又希望不破坏现有的 xxljob 的设计,怎么办?代理解决。可以直接采用现有的 java 执行器,创建一个 job 任务,这个 job 任务专门发起 k8s 的调度,具体的调度 pod 信息通过调度参数传递,下面来实现下,以及看下需要注意的问题。

1、在 XXL-JOB-ADMIN 模块新增执行器

为了尽量减少系统维护的复杂度,我们可以将代理调度 k8s 的执行器,直接集成到 admin 模块,启动 admin 的时候,自动注册 k8s 执行器。

2、引入 K8S-CLIENT-JAVA ,使用 SERVICE ACCOUNT 机制与 K8S 交互

<dependency>
			<groupId>io.fabric8</groupId>
			<artifactId>kubernetes-client</artifactId>
			<version>5.4.0</version>
		</dependency>

这个客户端提供了完整的和 k8s-api-server 交互能力,使用这个客户端,基于 k8s 的 service account 认证,可以轻松在 xxljob 所在 namespace 内完成 pod 的生命周期管理。引入依赖后,首先创建 client 实例:

/**
 * @author kl (http://kailing.pub)
 * @since 2021/6/4
 */
@Configuration
public class KubernetesClientConfig {
    @Bean
    public KubernetesClient kubernetesClient(){
        return new DefaultKubernetesClient(Config.autoConfigure(null));
    }

}

这里初始化客户端时,采用了自动发现配置的模式,如果是本机开发时,就会自动寻找你本机的 kubectl 配置,当 xxljob 部署到 k8s 内时,如果找不到本地的就会尝试寻找 service account 创建出来的配置,然后从环境变量中自发现 k8s 集群的链接地址。所以无论是开发环境还是线上环境,都不用配置k8s 的链接认证信息。但是,部署到 k8s 时,因为需要借助 k8s 的 service account 机制与 k8s 交互,需要多定义一个 service account 的权限声明,可参考如下:

# In GKE need to get RBAC permissions first with
# kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin [--user=|--group=]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: xxljob
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: xxljob
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["create","delete","get","list","patch","update","watch"]
- apiGroups: [""]
  resources: ["pods/exec"]
  verbs: ["create","delete","get","list","patch","update","watch"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get","list","watch"]
- apiGroups: [""]
  resources: ["events"]
  verbs: ["watch"]
- apiGroups: [""]
  resources: ["secrets"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: xxljob
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: xxljob
subjects:
- kind: ServiceAccount
  name: xxljob

3、编写代理执行器调度代码

/**
 * @author kl (http://kailing.pub)
 * @since 2021/5/28
 */
@Component
public class KubernetesExecutorHandler {
    private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutorHandler.class);
    private static final String NAMESPACE = "xxl-job";
    private final KubernetesClient client;
    public KubernetesExecutorHandler(KubernetesClient client) {
        this.client = client;
    }
    @XxlJob(value = "callK8s")
    public void callK8s() throws InterruptedException {
        String podResource = XxlJobHelper.getJobParam();
        Pod pod = Serialization.unmarshal(podResource, Pod.class);
        pod.getSpec().setRestartPolicy("Never");//这里强制设置重启策略为不重启
        pod = client.pods().inNamespace(NAMESPACE).create(pod);
        client.resource(pod).waitUntilCondition(pod1 -> pod1.getStatus().getPhase().equals("Succeeded") || pod1.getStatus().getPhase().equals("Failed"), 2, TimeUnit.MINUTES);
        String log = client.pods().inNamespace(NAMESPACE).withName(pod.getMetadata().getName()).getLog();
        XxlJobHelper.log(log); //记录 pod 日志到 xxl-job
        logger.info(log);
        client.resource(pod).delete();
    }

}

如上,一个简版的 k8s 执行器便完成了,使用时,通过定义bean模式的 job ,然后选择 k8s 执行器,jobHandler 名称和填上 callk8s,通过job 参数传递 pod 调度信息,如:

这里定义了一个打印 当前时间和当前环境变量的 pod 任务,执行完成后,就可以从 job 的日志里看到执行结果了,如:

结语

目前的实现方式,单纯从兼容 xxljob 现有的架构模式,以及现有的实现出发的,所以采用了 java 代理执行器代为调度 pod 的方案,基本继承了所有 java 执行器的功能,比如 job 执行日志记录,并发执行策略等。需要注意的是,因为是单 handler 实现,每个job 都会用同一个 handler 去运行,所以创建任务的时候并发策略这块只能选择单机串行执行,否则非常容易丢任务。另一个需要考虑的问题,如果代理执行器非正常关闭,pod 没来的及删除就挂了,这个时候需要启动一个巡检的线程,检测已经完成或者已经出错的 pod ,然后清理掉。

以上就是java开源调度如何给xxljob加k8s执行器的详细内容,更多关于java开源xxljob加k8s执行器的资料请关注我们其它相关文章!

(0)

相关推荐

  • xxl-job如何滥用netty导致的问题及解决方案

    netty作为一种高性能的网络编程框架,在很多开源项目中大放异彩,十分亮眼,但是在有些项目中却被滥用,导致使用者使用起来非常的难受. 笔者使用的是2.3.0版本的xxl-job,也是当前的最新版本:下面所有的代码修改全部基于2.3.0版本的xxl-job源代码 https://github.com/xuxueli/xxl-job/tree/2.3.0 其中,xxl-job-admin对应着项目:https://github.com/xuxueli/xxl-job/tree/2.3.0/xxl-j

  • K8S中五种控制器的介绍以及使用

    目录 k8s的控制器类型 pod与控制器之间的关系 Deployment(无状态化应用) 状态与无状态化对特点 Deployment的更新 Deployment的回滚 CronJob控制器 总结 k8s的控制器类型 Kubernetes中内建了很多controller(控制器),这些相当于一个状态机,用来控制Pod的具体状态和行为 Deployment:适合无状态的服务部署 StatefullSet:适合有状态的服务部署 DaemonSet:一次部署,所有的node节点都会部署,例如一些典型的应

  • SpringBoot整合Xxl-Job的完整步骤记录

    一.下载Xxl-Job源代码并导入本地并运行 Github地址: https://github.com/xuxueli/xxl-job 中文文档地址: https://www.xuxueli.com/xxl-job/ 1.使用Idea或Eclipse导入 2.执行sql脚本(红色标记处) 3.运行xxl-job-admin(xxl-job后台管理,主要方便管理各种各样的任务) 注意:在运行之前,需要把2的sql脚本执行完毕,并修改数据库连接池. 正常启动,访问地址为:http://localho

  • SpringBoot项目集成xxljob实现全纪录

    目录 xxljob介绍 代码配置过程 1.引入xxl-job的依赖 2.编写配置文件 3. 编写配置类 4.新建Job文件夹,将自己写的类放到此文件夹下 5. 编写业务代码 登录xxl-Job并配置 1.执行器管理--新增执行器 2.任务管理--新增任务 测试: 断点调试 查看调度日志: xxljob介绍 XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速.学习简单.轻量级.易扩展.现已开放源代码并接入多家公司线上产品线,开箱即用. 被称为任务调度中心,可做定时任务. 优点特性如下

  • java开源调度如何给xxljob加k8s执行器

    目录 前言 执行器接口 K8S执行器设计 1.在XXL-JOB-ADMIN模块新增执行器 2.引入K8S-CLIENT-JAVA,使用SERVICEACCOUNT机制与K8S交互 3.编写代理执行器调度代码 结语 前言 xxljob 是采用 java 开发的开源的任务调度系统,架构上分为调度管理器.执行器,目前除了官方提供的 java 执行器外,也有 go 开发者提供了 go 语言的执行器(看了 go 执行器的代码,除了任务日志没有实现,其他功能实现都比较完整). xxljob 在设计上,抽象出

  • 5个主流的Java开源IDE工具详解

    Java IDE工具提供了多种用户独特需求和个人偏好来创建编程环境的方法. Java框架能够简化程序员的工作.这些框架被设计和开发用于在任何服务器环境上运行任意应用程序;包括解析注释.扫描描述符.加载配置和在Java虚拟机(JVM)上启动实际服务方面的动态行为.控制这么大的范围需要更多的代码,使得减少内存占用或加速新应用程序的启动时间变得困难.无论如何,Java在当今使用的编程语言中始终排在前三名,在TIOBE索引中涉及700万到1000万的程序员和开发者. 因为Java如此的普及,意味着集成开

  • java开源区块链初始化创世区块jdchain服务搭建

    目录 初始化创世区块 第一步.生成公私钥 第二步.准备配置 第三步.执行初始化脚本 创世区块创建过程 结语 初始化创世区块 搭建区块链服务第一步就是初始化创世区块,创建账本.生成dchain初始化创世区块有两种方式,一种是通过官方提供的区块链部署工具,在页面上操作初始化,一种是通过初始化脚本创建.目前,部署工具初始化账本功能有限,只支持btfsmart共识算法的节点初始化,如果要支持mq的共识,只能使用初始化账本的脚本创建 第一步.生成公私钥 使用部署工具生成公私钥,虽然部署工具不支持mq共识的

  • java开源好用的简繁转换类库推荐

    目录 引言 1. 实例体验 maven依赖 gradle依赖 2. 进阶使用 2.1 预热 2.2 卸载 2.3 通用方法 3. 小结 引言 今天给大家介绍一个非常有意思类库,基于java实现的简繁体转换,适用于后端.android等开发领域 源码地址:* liuyueyi/quick-chinese-transfer: 简繁转换,支持香港繁体,台湾繁体与简体互转 1. 实例体验 以maven做项目管理,可以直接从中央仓库下载,再pom文件中添加以下依赖即可 <!-- https://mvnre

  • JAVA实现二维码生成加背景图代码实例

    这篇文章主要介绍了JAVA实现二维码生成加背景图代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 pom.xml依赖 <!-- 二维码生成 --> <!-- https://mvnrepository.com/artifact/com.google.zxing/core --> <dependency> <groupId>com.google.zxing</groupId> <art

  • Java 找不到或无法加载主类的修复方法

    有时,当我们运行Java程序时,我们可能会看到"找不到或无法加载主类".原因很容易猜测:JVM找不到主类并给出了这个错误.但是为什么不能呢? 在本文中,我们将讨论找不到主类的可能原因.另外,我们将看看如何修复它们. 示例程序 我们将从HelloWorld程序开始: public class HelloWorld { public static void main(String[] args) { System.out.println("Hello world..!!!&quo

  • java实现动态编译并动态加载

    在D盘test目录下有个java文件:AlTest.java public class AlTest { public String sayHello(){ System.out.println("AlTest类 sayHello()方法正在执行...."); return "hello word"; } } 现需要实现在工程已经运行过程中,进行java文件到class文件的编译操作,并运行AlTest类的方法 package com.piao.job; impor

  • java开源区块链jdchain入门

    目录 前言 部署组件 傻瓜式部署 获取部署包 效果预览 部署遇到的问题: 网关里的异常 结语 前言 jdchain是京东数科开源的区块链平台,目标是实现一个面向企业应用场景的通用区块链框架系统,能够作为企业级基础设施,为业务创新提供高效.灵活和安全的解决方案. 之所以选择jdchain研究是因为jdchain是为数不多的底层也是采用java实现的一个区块链平台 项目地址:https://github.com/blockchain... 文档地址:https://gitee.com/jdchain

  • Java开源诊断工具Arthas使用方法详解

    一.前言 1.热更新代码的场景 (1)当线上服务器出现问题时,有些时候现有的手段不足以发现问题所在,可能需要追加打印日志或者增加一些调试代码,如果我们去改代码重新部署,会破坏问题现场,可以通过热部署的手段来增加调试代码 (2)线上出现紧急bug,通过Review代码找到问题,修改好后打包部署的流程可能比较久,可以通过热部署代码及时解决问题 二.使用阿里巴巴开源的Java诊断工具 ---Arthas,他可以附着在我们的Java服务器进程上面,查看服务器状态,jvm状态等各种参数指标,还可以进行热更

  • 解析Java虚拟机中类的初始化及加载器的父委托机制

    类的初始化 在初始化阶段,Java虚拟机执行类的初始化语句,为类的静态变量赋予初始值. 在程序中,静态变量的初始化有两种途径: 1.在静态变量的声明处进行初始化: 2.在静态代码块中进行初始化. 没有经过显式初始化的静态变量将原有的值. 一个比较奇怪的例子: package com.mengdd.classloader; class Singleton { // private static Singleton mInstance = new Singleton();// 位置1 // 位置1输

随机推荐