⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 juejin.cn/post/7102343528525037576 「天机术士」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前言

相信很多系统里都有这一种场景:用户上传Excel,后端解析Excel生成相应的数据,校验数据并落库。这就引发了一个问题:如果Excel的行非常多,或者解析非常复杂,那么解析+校验的过程就非常耗时。如果接口是一个同步的接口,则非常容易出现接口超时,进而返回的校验错误信息也无法展示给前端,这就需要从功能上解决这个问题。一般来说都是启动一个子线程去做解析工作,主线程正常返回,由子线程记录上传状态+校验结果到数据库。同时提供一个查询页面用于实时查询上传的状态和校验信息。

多线程处理导入excel

进一步的,如果我们每一个上传的任务都写一次线程池异步+日志记录的代码就显得非常冗余。同时,非业务代码也侵入了业务代码导致代码可读性下降。从通用性的角度上讲,这种业务场景非常适合模板方法的设计模式。即设计一个抽象类,定义上传的抽象方法,同时实现记录日志的方法,例如:

//伪代码,省略了一些步骤
@Slf4j
public abstract class AbstractUploadService<T> {
public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("-upload-pool-%d")
.setPriority(Thread.NORM_PRIORITY).build();
public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());

protected abstract String upload(List<T> data);

protected void execute(String userName, List<T> data) {
// 生成一个唯一编号
String uuid = UUID.randomUUID().toString().replace("-", "");
uploadExecuteService.submit(() -> {
// 记录日志
writeLogToDb(uuid, userName, updateTime, "导入中");
// 一个字符串,用于记录upload的校验信息
String errorLog = "";
//执行上传
try {
errorLog = upload(data);
writeSuccess(uuid, "导入中", updateTime);
} catch (Exception e) {
LOGGER.error("导入错误", e);
//计入导入错误日志
writeFailToDb(uuid, "导入失败", e.getMessage(), updateTime);
}
/**
* 检查一下upload是不是返回了错误日志,如果有,需要注意记录
*
* 因为错误日志可能比较长,
* 可以写入一个文件然后上传到公司的文件服务器,
* 然后在查看结果的时候允许用户下载该文件,
* 这里不展开只做示意
*/
if (StringUtils.isNotEmpty(errorLog)) {
writeFailToDb(uuid, "导入失败", errorLog, updateTime);
}

});
}
}

如上文所示,模板方法的方式虽然能够极大地减少重复代码,但是仍有下面两个问题:

  • upload方法得限定死参数结构,一旦有变化,不是很容易更改参数类型or数量
  • 每个上传的service还是要继承一下这个抽象类,还是不够简便和优雅

为解决上面两个问题,我也经常进行思考,结果在某次自定义事务提交or回滚的方法的时候得到了启发。这个上传的逻辑过程和事务提交的逻辑过程非常像,都是在实际操作前需要做初始化操作,然后在异常或者成功的时候做进一步操作。这种完全可以通过环装切面的方式实现,由此,我写了一个小轮子给团队使用。(当然了,这个小轮子在本人所在的大团队内部使用的很好,但是不一定适合其他人,但是思路一样,大家可以扩展自己的功能)

「多说无益,上代码!」

代码与实现

首先定义一个日志实体

public class FileUploadLog {
private Integer id;
// 唯一编码
private String batchNo;
// 上传到文件服务器的文件key
private String key;
// 错误日志文件名
private String fileName;
//上传状态
private Integer status;
//上传人
private String createName;
//上传类型
private String uploadType;
//结束时间
private Date endTime;
// 开始时间
private Date startTime;
}

然后定义一个上传的类型枚举,用于记录是哪里操作的

public enum UploadType {
未知(1,"未知"),
类型2(2,"类型2"),
类型1(3,"类型1");

private int code;
private String desc;
private static Map<Integer, UploadType> map = new HashMap<>();
static {
for (UploadType value : UploadType.values()) {
map.put(value.code, value);
}
}

UploadType(int code, String desc) {
this.code = code;
this.desc = desc;
}

public int getCode() {
return code;
}

public String getDesc() {
return desc;
}

public static UploadType getByCode(Integer code) {
return map.get(code);
}
}

最后,定义一个注解,用于标识切点

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Upload {
// 记录上传类型
UploadType type() default UploadType.未知;
}

然后,编写切面

@Component
@Aspect
@Slf4j
public class UploadAspect {
public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("upload-pool-%d")
.setPriority(Thread.NORM_PRIORITY).build();
public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(10, 20, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());


@Pointcut("@annotation(com.aaa.bbb.Upload)")
public void uploadPoint() {}

@Around(value = "uploadPoint()")
public Object uploadControl(ProceedingJoinPoint pjp) {
// 获取方法上的注解,进而获取uploadType
MethodSignature signature = (MethodSignature)pjp.getSignature();
Upload annotation = signature.getMethod().getAnnotation(Upload.class);
UploadType type = annotation == null ? UploadType.未知 : annotation.type();
// 获取batchNo
String batchNo = UUID.randomUUID().toString().replace("-", "");
// 初始化一条上传的日志,记录开始时间
writeLogToDB(batchNo, type, new Date)
// 线程池启动异步线程,开始执行上传的逻辑,pjp.proceed()就是你实现的上传功能
uploadExecuteService.submit(() -> {
try {
String errorMessage = pjp.proceed();
// 没有异常直接成功
if (StringUtils.isEmpty(errorMessage)) {
// 成功,写入数据库,具体不展开了
writeSuccessToDB(batchNo);
} else {
// 失败,因为返回了校验信息
fail(errorMessage, batchNo);
}
} catch (Throwable e) {
LOGGER.error("导入失败:", e);
// 失败,抛了异常,需要记录
fail(e.toString(), batchNo);
}
});
return new Object();
}

private void fail(String message, String batchNo) {
// 生成上传错误日志文件的文件key
String s3Key = UUID.randomUUID().toString().replace("-", "");
// 生成文件名称
String fileName = "错误日志_" +
DateUtil.dateToString(new Date(), "yyyy年MM月dd日HH时mm分ss秒") + ExportConstant.txtSuffix;
String filePath = "/home/xxx/xxx/" + fileName;
// 生成一个文件,写入错误数据
File file = new File(filePath);
OutputStream outputStream = null;
try {
outputStream = new FileOutputStream(file);
outputStream.write(message.getBytes());

} catch (Exception e) {
LOGGER.error("写入文件错误", e);
} finally {
try {
if (outputStream != null)
outputStream.close();
} catch (Exception e) {
LOGGER.error("关闭错误", e);
}
}
// 上传错误日志文件到文件服务器,我们用的是s3
upFileToS3(file, s3Key);
// 记录上传失败,同时记录错误日志文件地址到数据库,方便用户查看错误信息
writeFailToDB(batchNo, s3Key, fileName);
// 删除文件,防止硬盘爆炸
deleteFile(file)
}

}

至此整个异步上传功能就完成了,是不是很简单?(笑)

那么怎么使用呢?更简单,只需要在service层加入注解即可,顶多就是把错误信息return出去。

@Upload(type = UploadType.类型1)
public String upload(List<ClassOne> items) {
if (items == null || items.size() == 0) {
return;
}
//校验
String error = uploadCheck(items);
if (StringUtils.isNotEmpty) {
return error;
}
//删除旧的
deleteAll();
//插入新的
batchInsert(items);
}

结语

写了个小轮子提升团队整体开发效率感觉真不错。程序员的最高品质就是解放双手(偷懒?),然后成功的用自己写的代码把自己干毕业。。。。。。

文章目录
  1. 1. 前言
  2. 2. 代码与实现
  3. 3. 结语