⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 jianshu.com/p/549d88222528 「彦帧」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

公司 DBA 一直埋怨 Atlas 的难用,希望从客户端层出一个读写分离的方案。开源市场上在客户端做读写分离的开源软件基本上没有。业务方利用 Spring 自带的路由数据源能实现部分读写分离的功能,但是功能不够完善。部分参考 Sharing-JDBC 源码思想,利用部分业余时间,写了这个 Robustdb,总共只使用了十多个类,两千多行代码左右。

一、背景

随着业务量的增长,所有公司都不是直接选择分库分表设计方案的。很长一段时间内,会采用 库垂直拆分和分区表 来解决库表数据量比较大的问题,采用读写分离来解决访问压力比较大的问题。我们公司也是一样。目前绝大部分业务还是使用读写分离的方案。我相信很多公司和我们公司的架构一样,采用中间代理层做读写分离。结构如下:

第一层是 VIP 曾。通过 VIP 做中间映射层,避免了应用绑定数据库的真实 IP,这样在数据库故障时,可以通过 VIP 飘移来将流量打到另一个库。但是 VIP 无法跨机房,为未来的异地多活设计埋下绕不过去的坎。

VIP 下面一层是读写分离代理,我们公司使用的是 360 的 Atlas。Atlas 通过将 SQL 解析为 DML(Data Modify Language)和 DQL(Data Query Language),DML 的请求全部发到主库,DQL 根据配置比例分发到读库(读库包括主库和从库)。

使用 Atlas 有以下不足:

  • Altas 不再维护更新,现存一些 bug,bug 网上很多描述;
  • Altas 中没有具体应用请求 IP 与具体数据库 IP 之间的映射数据,所以无法准确查到访问DB的请求是来自哪个应用;
  • Altas 控制的粒度是 SQL 语句,只能指定某条查询 SQL 语句走主库,不能根据场景指定;
  • DB 在自动关闭某个与 Altas 之间的连接时,Altas 不会刷新,它仍有可能把这个失效的连接给下次请求的应用使用;
  • 使用 Altas,对后期增加其他功能模会比较麻烦。

基于 Atlas 以上问题,以及我们需要将数据库账号和连接配置集中管控。我们设计了下面这套方案:

通过在客户端做读写分离可以解决 Atlas 上面存在的不足。整个流程如下图所示:

二、Robustdb 原理

1、读写分离设计核心点——路由

支持每条 SQL 按照 DML、DQL 类型的默认路由。

需求描述

目前公司采用读写分离的方案来增强数据库的性能,所有的 DML(insert、updata、delete)操作在主库,通过 MySQL 的 binlog 同步,将数据同步到多个读库。所有的 DQL(select) 操作主库或从库,从而增强数据的读能力。

支持方法级别的指定路由

需求描述

在 Service 中指定方法中所有 DB 操作方法操作同一个数据库(主要是主库),保证方法中的 DB 读写都操作主库,避免数据同步延迟导致读从库数据异常。从而保证整个方法的事务属性。

解决思路

我们将获取真实数据库(主库还是哪个从库)放到需要建立连接时的地方,为此我们创建了 BackendConnection(传统是先连接数据库,然后再创建连接)。

在获取数据库连接时,通过对请求的 SQL 进行解析和类型判别,识别为 DML 和 DQL。如果是 DML,则在线程的单 SQL 线程本地变量上设置为 master,DQL 则设置为 slave,为后续选择数据库提供选择参考。

如果要支持方法级别的事务(也就是整个方法的 SQL 请求都发送到主库),需要借助拦截器,我们采用的是 AspectJ 方式的拦截器。会拦截所有带有类型为 dataSourceType 的 annotation 的方法。在执行方法前,在线程的多 SQL 线程本地变量上设置 dataSourceType 的 name 值(name 值为 master 代表走主库,name 值为 slave 代表走从库)。线程的多 SQL 线程本地变量为后续选择数据库提供选择参考。在方法执行完后,清理本地线程变量。

多 SQL 线程本地变量的优先级高于单 SQL 线程本地变量的优先级。

注意点

本地线程变量要使用阿里包装的 Ttl,防止用户在方法内部启动线程池,导致普通的线程本地变量丢失,从而导致选库异常。

使用 Ttl 之后,需要在公司的 JVM 启动参数中增加

-javaagent:/{Path}/transmittable-thread-local-2.6.0-SNAPSHOT.jar

原理就是在 JVM 启动时,加载 transmittable-thread-local 中的类替换逻辑,将以后的 Runnable、Callable、ExecuteService 等线程池相关类替换成增强后的 TtlRunnable、TtlCallable、TtlExecuteService 等。

下面展示一下时序图中类的核心代码,仅供参考:

DataSoueceAspect

@Aspect
@Component
public class DataSourceAspect{
@Around("execution(* *(..)) && @annotation(dataSourceType)")
public Object aroundMethod(ProceedingJoinPoint pjd, DataSourceType dataSourceType) throws Throwable { DataSourceContextHolder.setMultiSqlDataSourceType(dataSourceType.name());
Object result = pjd.proceed();
DataSourceContextHolder.clearMultiSqlDataSourceType();
return result;
}
}

BackendConnection

public final class BackendConnection extends AbstractConnectionAdapter {

private AbstractRoutingDataSource abstractRoutingDataSource;

//用于缓存一条sql(可能对应多个statement)或者一次事务中的连接
private final Map<String, Connection> connectionMap = new HashMap<String, Connection>();

//构造函数
public BackendConnection(AbstractRoutingDataSource abstractRoutingDataSource) {
this.abstractRoutingDataSource = abstractRoutingDataSource;
}

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql);
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
if(connectionMap == null || connectionMap.isEmpty()){
return abstractRoutingDataSource.getResolvedDefaultDataSource().getConnection().getMetaData();
}
return fetchCachedConnection(connectionMap.keySet().iterator().next().toString()).get().getMetaData();
}

@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql,resultSetType,resultSetConcurrency);
}

@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, autoGeneratedKeys);
}

@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, columnIndexes);
}

@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return getConnectionInternal(sql).prepareStatement(sql, columnNames);
}

@Override
protected Collection<Connection> getConnections() {
return connectionMap.values();
}

/**
* 根据sql获取连接,对连接进行缓存
* @param sql
* @return
* @throws SQLException
*/
private Connection getConnectionInternal(final String sql) throws SQLException {
//设置线程环境遍历
if (ExecutionEventUtil.isDML(sql)) {
DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.MASTER);
} else if (ExecutionEventUtil.isDQL(sql)) {
DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.SLAVE);
}
//根据上面设置的环境变量,选择相应的数据源
Object dataSourceKey = abstractRoutingDataSource.determineCurrentLookupKey();
String dataSourceName = dataSourceKey.toString();

//看缓存中是否已经含有相应数据源的连接
Optional<Connection> connectionOptional = fetchCachedConnection(dataSourceName);
if (connectionOptional.isPresent()) {
return connectionOptional.get();
}
//缓存中没有相应连接,建立相应连接,并放入缓存
Connection connection = abstractRoutingDataSource.getTargetDataSource(dataSourceKey).getConnection();
connection.setAutoCommit(super.getAutoCommit());
connection.setTransactionIsolation(super.getTransactionIsolation());

connectionMap.put(dataSourceKey.toString(), connection);

return connection;
}

/**
* 从缓存中取数据源
* @param dataSourceName
* @return
*/
private Optional<Connection> fetchCachedConnection(final String dataSourceName) {
if (connectionMap.containsKey(dataSourceName)) {
return Optional.of(connectionMap.get(dataSourceName));
}
return Optional.absent();
}

}

AbstractRoutingDataSource

/**
*
* @Type AbstractRoutingDataSource
* @Desc 数据源路由器(spring的AbstractRoutingDataSource将resolvedDataSources的注入放在bean初始化)
* @Version V1.0
*/
public abstract class AbstractRoutingDataSource extends AbstractDataSource {

private boolean lenientFallback = true;

private Map<Object, Object> targetDataSources;

private Object defaultTargetDataSource;

private Map<Object, DataSource> resolvedDataSources = new HashMap<Object, DataSource>();

private DataSource resolvedDefaultDataSource;

private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class);


public BackendConnection getConnection() throws SQLException {
return new BackendConnection(this);
}

public BackendConnection getConnection(String username, String password)
throws SQLException {
return new BackendConnection(this);

}

public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property 'targetDataSources' is required");
}
this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
for (Map.Entry entry : this.targetDataSources.entrySet()) {
Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
this.resolvedDataSources.put(lookupKey, dataSource);
}
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}

public void putNewDataSource(Object key, DataSource dataSource){
if(this.resolvedDataSources == null){
this.resolvedDataSources = new HashMap<Object, DataSource>();
}
if(this.resolvedDataSources.containsKey(key)){
this.resolvedDataSources.remove(key);
logger.info("remove old key:" + key);
}
logger.info("add key:" + key + ", value=" + dataSource);
this.resolvedDataSources.put(key, dataSource);
}

/**
* 数据源选择逻辑
*/
public DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

Object lookupKey = determineCurrentLookupKey();
DataSourceContextHolder.clearSingleSqlDataSourceType();

int index = 0;
for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
index++;
}
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
return dataSource;
}

public DataSource getTargetDataSource(Object lookupKey) {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

if(lookupKey == null){
lookupKey = determineCurrentLookupKey();
}
DataSourceContextHolder.clearSingleSqlDataSourceType();

int index = 0;
for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
index++;
}
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
return dataSource;
}

public abstract Object determineCurrentLookupKey();

public abstract Object getCurrentSlaveKey();

@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
}

@SuppressWarnings("unchecked")
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isInstance(this)){
return (T) this;
}
return determineTargetDataSource().unwrap(iface);
}

protected Object resolveSpecifiedLookupKey(Object lookupKey) {
return lookupKey;
}

protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
if (dataSource instanceof DataSource) {
return (DataSource) dataSource;
}
else {
throw new IllegalArgumentException(
"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
}
}
//get set方法省略
}

DataSourceContextHolder

public class DataSourceContextHolder {

private static final TransmittableThreadLocal<String> singleSqlContextHolder = new TransmittableThreadLocal<String>();

private static final TransmittableThreadLocal<String> multiSqlContextHolder = new TransmittableThreadLocal<String>();

/**
* @Description: 设置单条sql数据源类型
* @param dataSourceType 数据库类型
* @return void
* @throws
*/
public static void setSingleSqlDataSourceType(String dataSourceType) {
singleSqlContextHolder.set(dataSourceType);
}

/**
* @Description: 获取单条sql数据源类型
* @param
* @return String
* @throws
*/
public static String getSingleSqlDataSourceType() {
return singleSqlContextHolder.get();
}

/**
* @Description: 清除单条sql数据源类型
* @param
* @return void
* @throws
*/
public static void clearSingleSqlDataSourceType() {
singleSqlContextHolder.remove();
}

/**
* @Description: 设置多条sql数据源类型
* @param dataSourceType 数据库类型
* @return void
* @throws
*/
public static void setMultiSqlDataSourceType(String dataSourceType) {
multiSqlContextHolder.set(dataSourceType);
}

/**
* @Description: 获取多条sql数据源类型
* @param
* @return String
* @throws
*/
public static String getMultiSqlDataSourceType() {
return multiSqlContextHolder.get();
}

/**
* @Description: 清除多条sql数据源类型
* @param
* @return void
* @throws
*/
public static void clearMultiSqlDataSourceType() {
multiSqlContextHolder.remove();
}

/**
* 判断当前线程是否为使用从库为数据源. 最外层service有slave的aop标签 或者 service没有aop标签且单条sql为DQL
*
* @return
*/
public static boolean isSlave() {
return "slave".equals(multiSqlContextHolder.get()) || (multiSqlContextHolder.get()==null && "slave".equals(singleSqlContextHolder.get())) ;
}

}

DynamicDataSource

public class DynamicDataSource extends AbstractRoutingDataSource implements InitializingBean{  

private static final Logger logger = LoggerFactory.getLogger(DynamicDataSource.class);

private Integer slaveCount = 0;

// 轮询计数,初始为-1,AtomicInteger是线程安全的
private AtomicInteger counter = new AtomicInteger(-1);

// 记录读库的key
private List<Object> slaveDataSources = new ArrayList<Object>(0);

// slave库的权重
private Map<Object,Integer> slaveDataSourcesWeight;

private Object currentSlaveKey;

public DynamicDataSource() {
super();
}

/**
* 构造函数
* @param defaultTargetDataSource
* @param targetDataSources
* @param slaveDataSourcesWeight
*/
public DynamicDataSource(Object defaultTargetDataSource, Map<Object,Object> targetDataSources, Map<Object,Integer> slaveDataSourcesWeight) {
this.setResolvedDataSources(new HashMap<Object, DataSource>(targetDataSources.size()));
for (Map.Entry<Object, Object> entry : targetDataSources.entrySet()) {
DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
this.putNewDataSource(entry.getKey(), dataSource);
}
if (defaultTargetDataSource != null) {
this.setResolvedDefaultDataSource(resolveSpecifiedDataSource(defaultTargetDataSource));
}
this.setSlaveDataSourcesWeight(slaveDataSourcesWeight);
this.afterPropertiesSet();
}

@Override
public Object determineCurrentLookupKey() {
// 使用DataSourceContextHolder保证线程安全,并且得到当前线程中的数据源key
if (DataSourceContextHolder.isSlave()) {
currentSlaveKey = getSlaveKey();
return currentSlaveKey;
}
//TODO
Object key = "master";
return key;
}


@Override
public void afterPropertiesSet() {
try {
super.afterPropertiesSet();
Map<Object, DataSource> resolvedDataSources = this.getResolvedDataSources();
//清空从库节点,重新生成
slaveDataSources.clear();
slaveCount = 0;
for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) {
if(slaveDataSourcesWeight.get(entry.getKey())==null){
continue;
}
for(int i=0; i<slaveDataSourcesWeight.get(entry.getKey());i++){
slaveDataSources.add(entry.getKey());
slaveCount++;
}
}
} catch (Exception e) {
logger.error("afterPropertiesSet error! ", e);
}
}

/**
* 轮询算法实现
*
* @return
*/
public Object getSlaveKey() {
if(slaveCount <= 0 || slaveDataSources == null || slaveDataSources.size() <= 0){
return null;
}
Integer index = counter.incrementAndGet() % slaveCount;
if (counter.get() > 9999) { // 以免超出Integer范围
counter.set(-1); // 还原
}
return slaveDataSources.get(index);
}

public Map<Object, Integer> getSlaveDataSourcesWeight() {
return slaveDataSourcesWeight;
}

public void setSlaveDataSourcesWeight(Map<Object, Integer> slaveDataSourcesWeight) {
this.slaveDataSourcesWeight = slaveDataSourcesWeight;
}

public Object getCurrentSlaveKey() {
return currentSlaveKey;
}
}

2、读库流量分配策略设计

我们所有的数据库连接都是管控起来的,包括每个库的流量配置都是支持动态分配的。

支持读库按不同比例承接读请求。通过配置页面动态调整应用的数据库连接以及比例,支持随机或者顺序的方式将流量分配到相应的读库中去。

这里我们使用的配置管理下发中心是我们公司自己开发的 gconfig,当然替换成开源的 diamond 或者 applo 也是可以的。

当接收到配管中心的调整指令,会动态更新应用数据源连接,然后更新 beanFactory 中的 datasource。核心函数如下:

/**
* 更新beanFactory
* @param properties
*/
public void refreshDataSource(String properties) {
YamlDynamicDataSource dataSource;
try {
dataSource = new YamlDynamicDataSource(properties);
} catch (IOException e) {
throw new RuntimeException("convert datasource config failed!");
}

// 验证必须字段是否存在
if (dataSource == null && dataSource.getResolvedDataSources() == null
|| dataSource.getResolvedDefaultDataSource() == null || dataSource.getSlaveDataSourcesWeight() == null) {
throw new RuntimeException("datasource config error!");
}
ConcurrentHashMap<Object, DataSource> newDataSource = new ConcurrentHashMap<Object, DataSource>(
dataSource.getResolvedDataSources());

//更新数据源的bean
DynamicDataSource dynamicDataSource = (DynamicDataSource) ((DefaultListableBeanFactory) beanFactory)
.getBean(dataSourceName);
dynamicDataSource.setResolvedDefaultDataSource(dataSource.getResolvedDefaultDataSource());
dynamicDataSource.setResolvedDataSources(new HashMap<Object, DataSource>());//将数据源清空,重新添加
for (Entry<Object, DataSource> element : newDataSource.entrySet()) {
dynamicDataSource.putNewDataSource(element.getKey(), element.getValue());
}
dynamicDataSource.setSlaveDataSourcesWeight(dataSource.getSlaveDataSourcesWeight());
dynamicDataSource.afterPropertiesSet();

}

三、性能

我们经过性能测试,发现 Robustdb 的性能在一定层度上比 Atlas 性能更好。压测结果如下:

文章目录
  1. 1. 一、背景
  2. 2. 二、Robustdb 原理
  3. 3. 三、性能