⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/cloud-local-executor/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Cloud 本地运行模式,对应《官方文档 —— 本地运行模式》

有什么用呢?引用官方解答:

在开发 Elastic-Job-Cloud 作业时,开发人员可以脱离 Mesos 环境,在本地运行和调试作业。可以利用本地运行模式充分的调试业务功能以及单元测试,完成之后再部署至 Mesos 集群。
本地运行作业无需安装 Mesos 环境。

😈 是不是很赞 + 1024?!

本文涉及到主体类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. 配置

LocalCloudJobConfiguration,本地云作业配置,在《Elastic-Job-Cloud 源码分析 —— 作业配置》「3.2 本地云作业配置」有详细解析。

创建本地云作业配置示例代码如下(来自官方):

LocalCloudJobConfiguration config = new LocalCloudJobConfiguration(
new SimpleJobConfiguration(
// 配置作业类型和作业基本信息
JobCoreConfiguration.newBuilder("FooJob", "*/2 * * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
.jobParameter("dbName=dangdang").build(), "com.dangdang.foo.FooJob"),
// 配置当前运行的作业是第几个分片
1,
// 配置Spring相关参数。如果不配置,代表不使用 Spring 配置。
"testSimpleJob" , "applicationContext.xml");

3. 运行

LocalTaskExecutor,本地作业执行器。

创建本地作业执行器示例代码如下(来自官方):

new LocalTaskExecutor(localJobConfig).execute();

可以看到,调用 LocalTaskExecutor#execute() 方法,执行作业逻辑,实现代码如下:

// LocalTaskExecutor.java
public void execute() {
AbstractElasticJobExecutor jobExecutor;
CloudJobFacade jobFacade = new CloudJobFacade(getShardingContexts(), getJobConfigurationContext(), new JobEventBus());
// 创建执行器
switch (localCloudJobConfiguration.getTypeConfig().getJobType()) {
case SIMPLE:
jobExecutor = new SimpleJobExecutor(getJobInstance(SimpleJob.class), jobFacade);
break;
case DATAFLOW:
jobExecutor = new DataflowJobExecutor(getJobInstance(DataflowJob.class), jobFacade);
break;
case SCRIPT:
jobExecutor = new ScriptJobExecutor(jobFacade);
break;
default:
throw new UnsupportedOperationException(localCloudJobConfiguration.getTypeConfig().getJobType().name());
}
// 执行作业
jobExecutor.execute();
}

  • 调用 #getShardingContexts() 方法,创建分片上下文集合( ShardingContexts ),实现代码如下:

    private ShardingContexts getShardingContexts() {
    JobCoreConfiguration coreConfig = localCloudJobConfiguration.getTypeConfig().getCoreConfig();
    Map<Integer, String> shardingItemMap = new HashMap<>(1, 1);
    shardingItemMap.put(localCloudJobConfiguration.getShardingItem(),
    new ShardingItemParameters(coreConfig.getShardingItemParameters()).getMap().get(localCloudJobConfiguration.getShardingItem()));
    return new ShardingContexts(
    // taskId 👇
    Joiner.on("@-@").join(localCloudJobConfiguration.getJobName(), localCloudJobConfiguration.getShardingItem(), "READY", "foo_slave_id", "foo_uuid"),
    localCloudJobConfiguration.getJobName(), coreConfig.getShardingTotalCount(), coreConfig.getJobParameter(), shardingItemMap);
    }

  • 调用 #getJobConfigurationContext() 方法,创建内部的作业配置上下文( JobConfigurationContext ),实现代码如下:

    private JobConfigurationContext getJobConfigurationContext() {
    Map<String, String> jobConfigurationMap = new HashMap<>();
    jobConfigurationMap.put("jobClass", localCloudJobConfiguration.getTypeConfig().getJobClass());
    jobConfigurationMap.put("jobType", localCloudJobConfiguration.getTypeConfig().getJobType().name());
    jobConfigurationMap.put("jobName", localCloudJobConfiguration.getJobName());
    jobConfigurationMap.put("beanName", localCloudJobConfiguration.getBeanName());
    jobConfigurationMap.put("applicationContext", localCloudJobConfiguration.getApplicationContext());
    if (JobType.DATAFLOW == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 数据流作业
    jobConfigurationMap.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) localCloudJobConfiguration.getTypeConfig()).isStreamingProcess()));
    } else if (JobType.SCRIPT == localCloudJobConfiguration.getTypeConfig().getJobType()) { // 脚本作业
    jobConfigurationMap.put("scriptCommandLine", ((ScriptJobConfiguration) localCloudJobConfiguration.getTypeConfig()).getScriptCommandLine());
    }
    return new JobConfigurationContext(jobConfigurationMap);
    }

  • 调用 #getJobInstance(...) 方法, 获得分布式作业( ElasticJob )实现实例,实现代码如下:

    private <T extends ElasticJob> T getJobInstance(final Class<T> clazz) {
    Object result;
    if (Strings.isNullOrEmpty(localCloudJobConfiguration.getApplicationContext())) { // 直接创建 ElasticJob
    String jobClass = localCloudJobConfiguration.getTypeConfig().getJobClass();
    try {
    result = Class.forName(jobClass).newInstance();
    } catch (final ReflectiveOperationException ex) {
    throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
    }
    } else { // Spring 环境获得 ElasticJob
    result = new ClassPathXmlApplicationContext(localCloudJobConfiguration.getApplicationContext()).getBean(localCloudJobConfiguration.getBeanName());
    }
    return clazz.cast(result);
    }

  • 调用 AbstractElasticJobExecutor#execute() 方法,执行作业逻辑。 Elastic-Job-Lite 和 Elastic-Job-Cloud 作业执行基本一致,在《Elastic-Job-Lite 源码分析 —— 作业执行》有详细解析。

666. 彩蛋

知识星球

芋道君:可能有点水更,和大家实际开发太相关,想想还是更新下。
旁白君:哎哟哟,哎哟喂。

道友,赶紧上车,分享一波朋友圈!

文章目录
  1. 1. 1. 概述
  2. 2. 2. 配置
  3. 3. 3. 运行
  4. 4. 666. 彩蛋