Categories
程式開發

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台


本文由 dbaplus 社群授權轉載。

之前設計篇講了數據拆分的方式、場景、優缺點以及實施步驟,偏方法與理論。技術篇會介紹分佈式數據服務平台設計與實現,講述如何通過技術手段解決數據拆分帶來的各種問題,以及各中間件的架構與原理。

平台主要包括分佈式數據訪問中間件(SDK、Proxy)、平滑擴容、數據集成、管控平台等四部分。

一、分佈式數據訪問中間件

數據拆分後,分散在多個庫與表中,但應用開發時怎樣才能準確訪問數據庫,換言之,如何才能拿到準確的數據庫連接,拼接出正確的sql(主要是實際表名),然後執行返回結果集呢?

為了盡可能減少業務侵入性,應用少做改造,往往都會抽像出一個數據訪問層負責上述功能。數據訪問層按實現方式不同,可分為應用自定義、數據中間件、分佈式數據庫三種方式,在我們項目中採用的是中間件方式,其技術架構如下:

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 1

分佈式數據訪問層

按照接入方式不同,數據訪問中間件可以分為 SDK、Proxy(雲原生架構下可能還會有sidecar方式)。

一個典型的分庫分錶中間件由JDBC接口實現(SDK模式)、MySQL報文解析(Proxy、Sider模式)、SQL解析器,路由計算、SQL重寫, SQL執行、聚合處理、結果集合併、數據源管理、配置管理等部分構成。

1、JDBC接口實現

JDBC接口實現起來並不太難,數據庫連接池都是基於此實現,本質上就是一種裝飾器模式,主要就是java.sql與javax.sql包下DataSource、Connection、Statement,PreparedStatement,ResultSet、DatabaseMetaData、 ResultSetMetaData等接口。這些接口也並不是都需要實現,不常用的接口可在集成一些框架時根據需要再實現。

2、MySQL報文解析

MySQL報文解析比JDBC接口復雜些,它包含了很多MySQL的命令,需要對照MySQL報文規範分別進行解析,另外由於proxy還要支持常見DBA工具接入,比如MySQL CLI、Navicat、Dbvisualizer、MySQL workbench等,這些工具甚至不同版本使用的MySQL報文都不完全一樣,這塊的兼容性也是一個繁瑣的工作,考驗對Mysql報文的支持的完整度。這部分像Sharding-Proxy、Mycat等都有實現,如果要自行研發或者擴展優化,可參考其實現細節。

MySQL報文規範:https://dev.mysql.com/doc/internals/en/client-server-protocol.html

3、SQL解析

SQL解析是個繁瑣複雜的活兒,對應就是詞法Lexer與語法分析Parser,因為要最大程度兼容各數據庫廠商SQL,這塊是需要不斷的迭代增強的。開源的手寫解析器有阿里開源的druid,也可以使用javacc、antlr等進行實現,相比手寫解析器速度要慢些,但擴展定制化能力更好。這類解析器在使用方式上,多采用vistor設計模式,如果需要可以編寫自己的vistor從而獲取所需AST(Abstract Syntax Tree)中的各類值。

4、路由計算

路由計算是根據SQL解析後AST,提取分庫分錶列值(提取規則是預先配置好的),然後根據應用指定的運算表達式或者函數進行計算,分別得到數據庫與表對應的序號(一般就是一個整型數值,類似一個數組下標)或者是真正的物理表名。讀寫分離模式下,只涉及庫路由,會根據一個負載均衡算法選取一個合適的物理庫,如果寫SQL則會選擇主庫,如果是讀則會按照隨機、輪詢或者權重等算法選擇一個從庫。

5、SQL重寫

SQL重寫主要為表名添加後綴(應用寫SQL時是邏輯表名,實際表名往往是邏輯表名+序號),根據路由計算環節得到的物理表名,替換原SQL中的邏輯表名。另外SQL中有聚合函數、多庫表分頁等操作時,也會涉及到對SQL的改寫,這部分有的開源中間件裡也叫做SQL優化。注意這裡最好不要簡單的用字符串匹配去替換錶名,例如當存在列名與表名一樣的情況下會出現問題。

6、SQL執行

SQL執行負責SQL的真正執行,對應的就是執行連接池或數據庫驅動中Statement的execute、executeQuery、executeUpdate、executeBatch等方法。當然如果是涉及到多庫多表的SQL,例如where條件不包含分庫分錶鍵,這時會涉及到庫表掃描,則需要考慮是連接優先還是內存優先,即採用多少個並發數據庫連接執行,連接數太大則會可能耗盡連接池,給內存以及數據庫帶來很大壓力;但連接數太小則會拉長SQL執行時間,很有可能帶來超時問題,所以一個強大的SQL執行器還會根據SQL類型、數據分佈、連接數等因素生成一個到合適的執行計劃。

7、數據源管理

數據源管理負責維護各數據庫的連接,這塊實現起來比較簡單,一般維護一個數據庫連接池DataSource對象的Map就可以,只要根據數據源下標或者名稱可以拿到對應的數據庫連接即可。

8、聚合處理

聚合處理負責對聚合類函數的處理,因為分庫分錶後,實際執行的SQL都是面向單庫的,而對於max、min、sum、count、avg等聚合操作,需要將各單庫返回的結果進行二次處理才能計算出準確的值,例如max、min、sum、count需要遍歷個各庫結果,然後分別取最大、最小、累加,對於avg操作,還需要將原SQL修改為select sum, count,然後分別累加,最後用累積後的sum除以累加後count才能得到準確值。另外對於多庫表的分頁操作,例如limit 1,10,則將單庫SQL的起始頁都修改為第一頁即limit 0,10,然後再整體排序取出前10個才是正確的數據。

9、結果集合併

結果集合併負責將多個SQL執行單元返回的數據集進行合併,然後返回給調用客戶端。一般當進行庫表遍歷、或者涉及多個庫SQL(例如使用in時)會需要進行合併。當然並不一定需要把數據全部讀到內存再合併,有時基於數據庫驅動實現的ResultSet.next()函數,逐條從數據庫獲取數據即可滿足要求。關於結果集合併,sharding-jdbc對此有一個更豐富的抽象與分類,支持流式歸併、內存歸併、分組歸併等,具體可參見歸併引擎。

歸併引擎:https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/

10、配置管理

配置管理負責分庫分錶的規則以及數據源的定義,這塊是面向應用開發者的,在使用體驗上應當簡單、易用、靈活。其中會涉及到物理數據源(參數跟連接池類似)、邏輯表、路由規則(庫路由、表路由,庫表分佈,支持指定java函數或者groovy表達式),邏輯表->路由規則的映射關係。另外我們在實踐時還包括了一些元數據信息,包括shardID->庫表序號,這樣做有個好處,業務在配置路由規則時只需要關注業務對象->shardID即可。配置管理在具體形式方面,可以支持xml、yaml、也支持在管控平台上在線進行配置,後者會通過將配置同步到配置中心,進而支持數據訪問層進行編排(orchestration),例如在線擴容時需要動態增加數據源、修改路由規則、元數據信息等。

一個完整的分佈式數據訪問中間件,在架構上和數據庫的計算層很像,尤其如果涉及到DB協議報文與SQL的解析,還是一個複雜和工作量較大的工程,因此一般應用團隊建議還是採用開源成熟的方案,基於此做定制優化即可,沒必要重複造輪子。

SDK和Proxy方式各有優缺點,在我們項目中分別用在不同的場景,簡單總結如下:

  • 聯機交易 高頻、高並發,查詢帶拆分鍵,數據量小,sdk方式;
  • 運維 低頻、查詢條件靈活,數據量大,以查詢為主 proxy方式;
  • 批量 不攜帶分庫分錶列,數據量大,查詢、更新、插入、刪除都有,通過API指定庫表方式。

接下來介紹下我們在開源中間件方面的實踐,分為三個階段

第一階段

早些年這類開源中間件還挺多,但其實都沒有一個穩定的社區支持。 2015年時我們基於一個類似TDDL的組件,對其事務、數據連接池、SQL解析等方面進行了優化,修復了數十個開發遇到的bug,實現SDK版本的數據訪問中間件,暫就叫做DAL。

第二階段

2017年,系統上線後發現,開發測試以及運維還需要一個執行分庫分錶SQL的平台,於是我們調研了Mycat,但當時1.6版本只支持單維度拆分(單庫內分錶或者只分庫),因此我們重寫了其後端SQL路由模塊,結合原SDK版本數據組件,利用Mycat的報文解析實現了Proxy的數據訪問層。

Proxy模式的數據訪問層上線後,可以很好的應對帶分庫分錶鍵的SQL操作,但在涉及到庫表遍歷時,由於並發連接太多,經常會導致連接數不夠,但如果串行執行則經常導致執行時間太長,最後超時報錯。針對這個問題,我們做了個新的優化:
在將這類庫表遍歷的查詢在生成執行計劃時,通過union all進行了改寫,類似map-reduce,同一庫上的不同表的sql通過union all合併,然後發到數據庫執行,這樣連接數=物理數據庫總數,同時盡可能的利用了數據庫的計算能力,在損耗較少連接數的前提下,大大提升了這類SQL的執行效率。 (注意order by 和limit需要加在union all的最後,為了不影響主庫,可以將這類查詢在從庫執行)。
例如user表拆分成1024表,分佈在4個庫,SQL拆分與合併示意圖如下:

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 2

通過union all實現庫表遍歷

第三階段

這兩個中間件在運行3年左右後,也暴露出來了很多問題,例如SQL限制太多,兼容性太差,開源社區不活躍,部分核心代碼設計結構不夠清晰等,這給後續更複雜場景的使用帶來了很多桎梏。因此在19年,我們決定對數據訪問層進行升級重構,將底層分庫分錶組件與上層配置、編排進行剝離,改成插拔式設計,增加更加多元的分庫分錶組件。在那時開源社區已經湧現了一些優秀的分庫分錶項目,目前來看做的最好的就是shardingshpere(後面簡稱ss)了,ss的設計與使用手冊其官網都有詳細介紹,這裡主要簡單介紹下我們集成ss的一些實踐。

shardingsphere整體設計架構清晰,內核各個引擎設計職責明確,jdbc 與proxy版本共享內核,接入端支持的多種實現方式。治理、事務、SQL解析器分別單獨抽像出來,都可以hook方式進行集成,通過SPI進行擴展。這種靈活的設計也為我們定制帶來了很大的方便,代碼實現上比較優雅。我們在集成時開始是3.0.0版本,後來升級到4.0.0-RC1版本,目前ss已發布4.0.0的release版本。

1)配置兼容

因為要在上層應用無感知的情況下更換底層分庫分錶引擎,所以改造的第一個問題就是兼容以前的配置。基於此,也就無法直接使用sharding-jdbc的spring或者yaml配置方式,而改用API方式,將原配置都轉換為sharding-jdbc的配置對象。這塊工作量時改造裡最大的,但如果項目之前並沒有分庫分錶配置,則直接在sharding-jdbc提供的方式中選擇一種即可。由於我們項目中需要支持規則鏈、讀權重等ss不支持功能,所以我們是基於ComplexKeysShardingAlgorithm接口進行的實現。

更簡潔的yaml配置形式:

ds:
  master_0:
    blockingTimeoutMillis: 5000
    borrowConnectionTimeout: 30
    connectionProperties: {}
    idleTimeoutMinutes: 30
    jdbcUrl: jdbc:mysql://localhost:3306/shard_0
    logAbandoned: false
    maintenanceInterval: 60
    maxConn: 10
    maxIdleTime: 61
    minConn: 1
    userName: root
    password: 123456
    queryTimeout: 30
    testOnBorrow: false
    testOnReturn: false
    testQuery: null
    testWhileIdle: true
    timeBetweenEvictionRunsMillis: 60000
  master_1:
    jdbcUrl: jdbc:mysql://localhost:3306/shard_1
    parent: master_0
groupRule: null
shardRule:
  bindingTables: 
  - user,name 
  rules:
    userTableRule:
      dbIndexs: master_0,master_1
      dataNodes: master_0.user_${['00','01']},master_1.user_${['02','03']}
      dbRules:
       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndex(#user_id#)
       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndexByName(#name#,#address#)
      tbRules:
       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndex(#user_id#)
       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndexByName(#name#,#address#)
       
  tableRuleMap: {name: nameTableRule, user: userTableRule}

2)事務級別

sharding-jdbc的默認事務是local,即最大努力一階段提交,或者叫鍊式提交,這種方式的好處是對應用透明,性能也還不錯,互聯網中使用較多。但這種方式可能會由於網絡等原因導致部分提交成功,部分失敗。雖然這種概率可能並不高,但一旦出現則會產生事務不一致的問題,這在金融關鍵場景下風險是很高的。所以我們在聯機交易場景下禁止使用這種方式,而是要求必須嚴格單庫事務,我們在先前SDK版本的數據訪問中間件增加了校驗,一旦跨庫就直接拋異常。因此切換到sharding-jdbc,這種事務級別也要繼續支持。實現代碼片段:

/**
 * Single DB Transaction Manager
 *  SPI:  org.apache.shardingsphere.transaction.spi.ShardingTransactionManager
 */
@NoArgsConstructor
public class SingleDBTransactionManager implements ShardingTransactionManager {
  private Map dataSources = new HashMap();
  private ThreadLocal targetDataSourceName = new ThreadLocal() {
    protected String initialValue() {
      return null;
    }
  };
  private ThreadLocal connection = new ThreadLocal() {
    protected Connection initialValue() {
      return null;
    }
  };
  private ThreadLocal autoCommitted = new ThreadLocal() {
    protected Boolean initialValue() {
      return true;
    }
  };
  @Override
  public void close() throws Exception {
    if (connection.get() != null) {
      connection.get().close();
    }
  }
  @Override
  public void init(DatabaseType databaseType, Collection resourceDataSources) {
    for (ResourceDataSource res : resourceDataSources) {
      dataSources.put(res.getOriginalName(), res.getDataSource());
    }
  }
  @Override
  public TransactionType getTransactionType() {
    return TransactionType.SINGLEDB;
  }
  @Override
  public Connection getConnection(String dataSourceName) throws SQLException {
    if (!ConditionChecker.getInstance().isMultiDbTxAllowed() && targetDataSourceName.get() != null
        && !targetDataSourceName.get().equals(dataSourceName)) {
      throw new TransactionException(
          "Don't allow multi-db transaction currently.previous dataSource key="
              + targetDataSourceName.get() + ", new dataSource key=" + dataSourceName);
    }
    targetDataSourceName.set(dataSourceName);
    if (connection.get() == null) {
      connection.set(dataSources.get(dataSourceName).getConnection());
    }
    return connection.get();
  }
…
}

3)讀庫權重

雖然多個從庫(一個主一般都要掛兩個或者三個從,從庫的數量由RPO、多活甚至監管要求等因素決定)可以提供讀功能,但細分的話,這些從庫其實是有“差別”的,這種差異性有可能是由於機器硬件配置,也可能是由於所在機房、網絡原因導致,這種時候就會需要支持讀權限的權重配置,例如我們項目中有單元化的設計,需要根據當前所在單元及權重配置路由到當前機房的從庫。另外也可以通過調整權重,支持在線對數據庫進行維護或者升級等運維操作。實現代碼片段:

/**
 * Weight based slave database load-balance algorithm.
 * SPI: org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm
 */
public final class WeightMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
  public final static String TYPE = "WEIGHT";
    
  protected DataSource dataSource;
  public WeightMasterSlaveLoadBalanceAlgorithm(DataSource ds) {
    this.dataSource = ds;
  }
  
  public  WeightMasterSlaveLoadBalanceAlgorithm(){
    
  }
    @Override
    public String getDataSource(final String name, final String masterDataSourceName, final List slaveDataSourceNames) {
      String selectReadDb = dataSource.getTableRuleContext().getGroupRule(name).selectReadDb();
      return slaveDataSourceNames.contains(selectReadDb) ? selectReadDb : null;
    }
  @Override
  public String getType() {
    return TYPE;
  }

4)SQL開關

SDK模式的數據訪問中間件,主要用在聯機交易中,在這類場景下,是沒有DDL操作需求的,也是不允許的,但shading-jdbc作為一個通用的數據分片中間件。對此並沒有相應的開關配置,因此我們增加開關功能,應用在默認情況下,對DDL、DCL等語句進行了校驗,不允許執行該類SQL,在技術層面杜絕了應用的誤用。實現代碼片段:

//SPI: org.apache.shardingsphere.core.parse.hook.ParsingHook
public class AccessPrevilegeCheckHook implements ParsingHook {
  @Override
  public void start(String sql) {

  }
  @Override
  public void finishSuccess(SQLStatement sqlStatement, ShardingTableMetaData shardingTableMetaData) {
    ConditionChecker.getInstance().checkDdlAndDcl(sqlStatement);
  }
…
}

//SPI:org.apache.shardingsphere.core.rewrite.hook.RewriteHook
@NoArgsConstructor
public class TableScanCheckHook implements RewriteHook {
  
  private List tableUnits = new LinkedList();
  @Override
  public void start(TableUnit tableUnit) {
    if(tableUnits.size() > 0 && !ConditionChecker.getInstance().isTableScanAllowed()){
      throw new RouteException("Don't allow table scan.");
    }
    tableUnits.add(tableUnit);  
  }
…
}

public class ConditionChecker {
  private static ThreadLocal sqlTypeSnapshot = new ThreadLocal();
  private boolean defalutTableScanAllowed = true;
  private boolean defalutMultiDbTxAllowed = true;
  private boolean defalutDdlAndDclAllowed = true;
  private static ConditionChecker checker = new ConditionChecker();
  public static ConditionChecker getInstance() {
    return checker;
  }
  private ConditionChecker() {
  }
  private ThreadLocal tableScanAllowed = new ThreadLocal() {
    protected Boolean initialValue() {
      return defalutTableScanAllowed;
    }
  };
  private ThreadLocal multiDbTxAllowed = new ThreadLocal() {
    protected Boolean initialValue() {
      return defalutMultiDbTxAllowed;
    }
  };
  private ThreadLocal ddlAndDclAllowed = new ThreadLocal() {
    protected Boolean initialValue() {
      return defalutDdlAndDclAllowed;
    }
  };
  public void setDefaultCondtion(boolean tableScanAllowed, boolean multiDbTxAllowed, boolean ddlAndDclAllowed) {
    defalutTableScanAllowed = tableScanAllowed;
    defalutMultiDbTxAllowed = multiDbTxAllowed;
    defalutDdlAndDclAllowed = ddlAndDclAllowed;
  }
  public boolean isTableScanAllowed() {
    return tableScanAllowed.get();
  }
  public void setTableScanAllowed(boolean tableScanAllowed) {
    this.tableScanAllowed.set(tableScanAllowed);
  }
  public boolean isMultiDbTxAllowed() {
    return multiDbTxAllowed.get();
  }
  public void setMultiDbTxAllowed(boolean multiDbTxAllowed) {
    this.multiDbTxAllowed.set(multiDbTxAllowed);
  }
  public boolean isDdlAndDclAllowed() {
    return ddlAndDclAllowed.get();
  }
  public void setDdlAndDclAllowed(boolean ddlAllowed) {
    this.ddlAndDclAllowed.set(ddlAllowed);
  }
  public SQLType getSqlTypeSnapshot() {
    return sqlTypeSnapshot.get();
  }
  public void checkTableScan(boolean isTableScan) {
    if (!isTableScanAllowed())
      throw new ConditionCheckException("Don't allow table scan.");
  }
  public void checkDdlAndDcl(SQLStatement sqlStatement) {
    sqlTypeSnapshot.set(sqlStatement.getType());
    if (!isDdlAndDclAllowed()
        && (sqlStatement.getType().equals(SQLType.DDL) || sqlStatement.getType().equals(SQLType.DCL))) {
      throw new ConditionCheckException("Don't allow DDL or DCL.");
    }
  }
  public void checkMultiDbTx(Map cachedConnections, String newDataSource) {
    if (!isMultiDbTxAllowed() && cachedConnections.size() > 0 && !cachedConnections.containsKey(newDataSource)) {
      throw new ConditionCheckException("Don't allow multi-db transaction currently.old connection key="
          + cachedConnections.keySet() + "new connection key=" + newDataSource);
    }
  }
}

5)路由規則鏈

在我們項目中,對於一張表,在不同場景下可能會使用不同的分庫分錶列,例如有的是賬號、有的是客戶號(這兩列都可路由到同一庫表中),這時候就需要路由模塊可以依次匹配搭配多個規則,例如SQL中有賬號則用account-rule,有客戶號則用customer-rule,因此我們支持了規則鏈配置功能,但sharding-jdbc只支持配置一個路由規則,因此在自定義路由算法函數中,我們增加了對規則鏈的支持。實現代碼片段:

public abstract class ChainedRuleShardingAlgorithm implements ComplexKeysShardingAlgorithm {
  protected final DataSource dataSource;
  public ChainedRuleShardingAlgorithm(DataSource ds) {
    this.dataSource = ds;
  }
  @Override
  public Collection doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {
    List targets = new ArrayList();
    Set actualNames = HintManager.isDatabaseShardingOnly() ? getHintActualName(shardingValue)
        : calculateActualNames(shardingValue);
    for (String each : actualNames) {
      if (availableTargetNames.contains(each)) {
        targets.add(each);
      }
    }
    clear();
    return targets;
  }
  @SuppressWarnings({ "serial", "unchecked" })
  protected Set calculateActualNames(ComplexKeysShardingValue shardingValue) {
    Set target = new HashSet();
    Map shardingMap = new HashMap();
    String logicalTableName = shardingValue.getLogicTableName();
    Map shardingValuesMap = shardingValue.getColumnNameAndShardingValuesMap();
    for (final Entry entry : shardingValuesMap.entrySet()) {
      if (shardingMap.containsKey(logicalTableName)) {
        shardingMap.get(logicalTableName).put(entry.getKey(), entry.getValue());
      } else {
        shardingMap.put(logicalTableName, new HashMap() {
          {
            put(entry.getKey(), entry.getValue());
          }
        });
      }
    }
    // 遍历规则链,查询匹配规则
    for (String tableName : shardingMap.keySet()) {
      RuleChain ruleChain = dataSource.getTableRuleContext().getRuleChain(tableName);
      for (GroovyListRuleEngine engine : getRuleEngine(ruleChain)) {
        Set parameters = engine.getParameters();
        Map columnValues = shardingMap.get(tableName);
        Set eval = eval(columnValues, parameters, engine, ruleChain);
        if (eval.size() > 0) {// 匹配即中止
          target.addAll(eval);
          return target;
        }
      }
    }
    return target;
  }
  @SuppressWarnings("unchecked")
  protected Set eval(final Map columnValues, Set parameters,
      GroovyListRuleEngine engine, RuleChain ruleChain) {
    Set targetNames = new HashSet();
    if (columnValues.keySet().containsAll(parameters)) {// 匹配
      List list = new LinkedList();// 参数集合
      List columns = new LinkedList();// 列名集合
      for (final String requireParam : parameters) {
        list.add(convertToSet(columnValues.get(requireParam)));
        columns.add(requireParam);
      }
      Set cartesianProduct = Sets.cartesianProduct(list);
      for (List values : cartesianProduct) {
        Map arugmentMap = createArugmentMap(values, columns);
        int index = engine.evaluate(arugmentMap);
        targetNames.add(getActualName(ruleChain, index));
      }
    }
    return targetNames;
  }
  private Set convertToSet(final Collection values) {
    return Sets.newLinkedHashSet(values);
  }
  private Map createArugmentMap(List values, List columns) {
    HashMap map = new HashMap();
    for (int i = 0; i < columns.size(); i++) {
      map.put(columns.get(i).toLowerCase(), values.get(i));
    }
    return map;
  }

  protected abstract List getRuleEngine(RuleChain ruleChain);
  protected abstract String getActualName(RuleChain ruleChain, int index);
}

/**
 * 库路由算法
 */

public class ChainedRuleDbShardingAlgorithm extends ChainedRuleShardingAlgorithm {
  
  public ChainedRuleDbShardingAlgorithm(DataSource ds) {
    super(ds);
  }
  @Override
  protected List getRuleEngine(RuleChain ruleChain) {
    return ruleChain.getDbRuleList();
  }
  @Override
  protected String getActualName(RuleChain ruleChain,int index) {
    //add mapping from shard metadata
    String dbIndex = dataSource.getHintSupport().getShardingDb(String.valueOf(index));
    if(StringUtils.isEmpty(dbIndex)){
      return ruleChain.getTableRule().getDbIndexArray()[index];
    }else{
      return ruleChain.getTableRule().getDbIndexArray()[Integer.valueOf(dbIndex)];
    }
    
  }
}

/**
 * 表路由算法
 */
public class ChainedRuleTableShardingAlgorithm extends ChainedRuleShardingAlgorithm {
  public ChainedRuleTableShardingAlgorithm(DataSource ds) {
    super(ds);
  }
  @Override
  protected List getRuleEngine(RuleChain ruleChain) {
    return ruleChain.getTableRuleList();
  }
  @Override
  protected String getActualName(RuleChain ruleChain, int index) {
    //add mapping from shard metadata
    String tbShardIndex = dataSource.getHintSupport().getShardingTable(String.valueOf(index));
    int tbIndex = index;
    if(!StringUtils.isEmpty(tbShardIndex)){
      tbIndex = Integer.valueOf(tbShardIndex);
    }
    SuffixManager suffixManager = ruleChain.getTableRule().getSuffixManager();
    if(suffixManager.isInlineExpression()){
      return ruleChain.getTbIndexs()[tbIndex];
    }else{
      Suffix suffix = suffixManager.getSuffix(0);
      return String.format("%s%s%0"+suffix.getTbSuffixWidth() +"d", ruleChain.getLogicTable(),suffix.getTbSuffixPadding(), suffix.getTbSuffixFrom() + tbIndex);
    }
  }

6)管控平台對接

我們提供了一個管控平台,支持分佈式數據相關組件在線配置,這些通過配置中心統一下發到各應用,而且支持動態變更。不管是SDK模式還是Proxy模式的數據訪問中間件都使用的是同一份分庫分錶配置,只是接入方式不同而已。因此在集成ss的時候,還需要增加從配置中心獲取配置的功能,這塊主要涉及的調用配置中心API獲取配置,這裡就不貼具體代碼了。

數據訪問中間件的發展演進方向,未來其將會是多種形態的混合存在。

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 3

分佈式數據訪問中間件的三種模式

二、平滑擴容

在設計篇中已經介紹了擴容的機制,簡單的說,平滑擴容就是通過異步複製,等數據接近追平後禁寫,修改路由,然後恢復業務。主要目的是自動化、以及盡可能縮短停機窗口,目前一些雲產品比如阿里雲DRDS、騰訊雲TDSQL等的一鍵在線擴容本質上都是基於此機制。

但實踐中這個過程需要多個步驟,數據庫數量越多,操作風險越大,而且需要停機完成。為此我們與數據庫團隊一起設計與開發了平滑擴容功能。

我們將整個擴容環節,分為配置、遷移、校驗、切換、清理五個大的步驟,每個步驟裡又由多個任務構成。擴容任務在管控平台上建立,平滑擴容模塊自動依次觸發各個任務。

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 4

平滑擴容

  • 配置環節,主要是應用系統方定義擴容後的分庫分錶配置;
  • 遷移環節,依次自動完成從復制,同時進行數據校驗;
    • 切換環節,首先進行禁寫,斷開主從只從,然後修改路由規則,最後再解除擴容庫禁寫。整個過程應用無需停機,僅僅會有一段時間禁寫,這個時間一般來說也就十來秒;
  • 清理環節,清理環節是在後台異步處理,即清理數據庫冗餘表。

在分佈式數據服務管控平台定義好擴容前後分庫分錶配置後,即可啟動一鍵在線擴容。在擴容過程中可實時監控擴容進度,同時支持擴容中斷恢復以及回滾。

三、數據集成

微服務架構下,有大量需要數據集成的場景:

  • 業務系統之間,例如下訂單後需要通知庫存、商家,然後還要推送到大數據等下游系統;
  • 分庫分錶後,為了應對其它維度查詢,會需要建立異構索引,這樣就需要數據傳輸到另外一套數據庫中;
  • 系統內應用與中間件之間,例如如果使用redis等緩存,在操作完數據庫還要更新緩存,類似這類數據集成需求,最樸素的解決方式就是雙寫,但雙寫一個問題是增加了應用複雜性,另外當發生不一致的情況是難以處理。

這類問題本質上也屬於分佈式事務場景,一種簡單的方式就是基於MQ可靠消息,即在應用端寫消息表,然後通過MQ消費消息進行數據集成處理。但這導致應用代碼耦合大量雙寫邏輯,給應用開髮帶來很多複雜度。

針對雙寫問題,業界一種更優雅、先進的設計是基於日誌的集成架構,在OLTP場景下,可以通過解析數據庫日誌類似CDC,這種方式的好處是數據集成工作從應用代碼中進行了剝離。

關於雙寫以及基於日誌集成架構可參考Using logs to build a solid data infrastructure (or: why dual writes are a bad idea

雙寫以及基於日誌集成架構參考:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 5

數據集成

這類CDC的開源軟件,java類的有shyiko、canal、debezium等。這類項目的實現原理是主要模擬從庫從主庫異步獲取日誌事件,然後經過ETL發送到MQ或者其它下游系統。

如何模擬一個從庫可參照MySQL複製協議:https://dev.mysql.com/doc/internals/en/replication-protocol.html

考慮功能完整性和社區活躍度,我們選擇了基於canal構建數據集成中間件,具體工作原理這裡就不介紹了,可參見canal github。這裡主要介紹下我們對其做的一些定制和優化。

canal github:https://github.com/alibaba/canal

1、在線配置

canal的配置非常繁雜,很容易配錯,所以最先開始做的就是提供了一個更簡單、易用的在線配置定義功能,用戶是只需進行一些核心關鍵的配置,例如數據庫的IP、用戶、密碼、訂閱表、MQ地址,其它不常修改的配置通過模板形式提供,大大降低了配置複雜度和工作量,當然如果需要也完全支持自定義。

2、性能優化

數據庫binlog是有序的,但如果寫MQ或者目標庫,仍完全保證該順序,那麼則無法進行並發,這樣同步的TPS是肯定上不去的,因此如何在保證一定順序的​​前提下最大程度提高並發性能是一個需要結合業務場景解決的問題。

我們當時用的是canal1.1.3的版本,經過我們性能測試,數據寫入kafka的TPS也就5000+,這對於結息等大量數據變更的場景是不能滿足要求的。

另外canal寫入MQ的並發維度是表的主鍵,但我們項目中表的主鍵都是自增列(這個是我們項目中數據庫開發規範,主要目的是保證MySQL寫性能),如果根據此列進行並發控制,那麼則無法保證MQ寫入時的業務順序性。例如支付流水錶,如根據主鍵(自增列)則無法保證同一賬戶流水的順序性。

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 6

canal自身寫入機制

針對此問題,我們對canal進行了改造,將原來只支持根據主鍵進行並發控制,修改為支持應用指定,例如我們項目採用業務唯一鍵;原來流程是順序從canal server端讀取binlog->寫MQ ->再確認->再讀取下一binlog事件;調整後改為並發讀取binlog,一旦在執行事件集中沒有當前業務唯一鍵,就可直接寫入MQ,後台開啟一個線程,按照batchID依次進行ack,通過並行拉取binlog事件、分階段無阻塞處理,單庫數據同步kafak的TPS可以達到1.2W+,已可以滿足結息等場景。

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 7

canal寫入改造後機制

3、Serverless化Serverless化

如果通過安裝包部署,在用戶配置完數據集成相關參數後,需要手工將canal server以及adapter包上傳至服務器上。考慮到高可用,還得在備機上進行部署,在分庫分錶下,數據庫拆分成多個,需要部署多個實列到多個服務器上(canal支持同個實例部署在統一server節點,但性能會受影響)。因此我們將canal server與adapter進行了容器化改造,然後部署到了統一的k8s集群中,這樣用戶在配置完後,點擊“啟動實列”按鈕,即可在k8s環境中自動部署高可用的canal集群,從而實現了數據集成功能的serverless化。

通過數據集成中間件,可以在應用無侵入下解決分庫分錶後一個很典型的問題:多維度拆分與多庫查詢。例如將分庫分錶的數據再集中到一個匯總庫,然後一些複雜的查詢統計就可以放在匯總庫上;還有一些多維度拆分場景,類似電商裡的賣家庫、商家庫,需要創建“二級索引”,也可以通過數據中間件自動創建;另外也可以方便實現諸如小表廣播等需要保證數據一致性的功能。

四、管控平台

前面提到的各種中間件,涉及到大量配置定義、實例管理、監控等功能,這些功能分散在各組件內部,缺少一個統一的視圖,而且應用開發人員需要重複定義。因此我們設計開發了數據服務管控平台,將分庫分錶配置定義、在線擴容、運維、監控等功能統一集成,最大程度降低開發以及運維人員對數據拆分帶來的複雜度。同時提供開放API,可以對接目前公司已有數據庫以及雲管理系統。

配置信息統一存放在配置中心,各中間件直接從配置中心拉取配置,在管控平台修改配置後,也可以實時通知各應用進行動態加載。管控平台相當於一個數據服務雲管理平台,提供多租戶,各應用無需自行部署,直接接入使用即可。在技​​術架構方面管控平台是個前後端分離架構,前端基於vue.js,後端按照功能模塊拆分成微服務,都部署在k8s集群中。

銀行選型和排坑實戰:用開源軟件自建分佈式數據服務平台 8

管控平台

五、 感受

上面介紹了我們在數據服務平台建設中各技術組件的設計原理和實踐,限於篇幅,更多實現細節就不展開介紹。

在企業軟件這塊,有兩個不同的思路,一個是購買商業產品,一種是基於開源軟件自行構建。在金融領域,早些年以前者為主,近些年後者則變成了趨勢。開源軟件有點是開放,因為有源碼所以有自主掌控的可能與條件,缺點是不像商業產品功能完整,往往需要自行定制、優化、擴展。作為軟件開發人員,我們更喜歡使用白盒而不是黑盒。

當然開源並不代表免費,有可能付出的成本比商業軟件更高。一方面需要投入精力學習開源項目,只有熟悉源代碼後才可能具備修改定制能力;另一方面要積極關注開源界技術的發展,與時俱進,要有開放的心態,吸取開源先進的設計的東西,大膽驗證,謹慎使用。

作者介紹

溫衛斌,就職於中國民生銀行信息科技部,目前負責分佈式技術平台設計與研發,主要關注分佈式數據相關領域。

原文鏈接

https://mp.weixin.qq.com/s?__biz=MzI4NTA1MDEwNg==&mid=2650786356&idx=1&sn=273a84662b935c65aa213446294d7f5c&chksm=f3f97fa1c48ef6b71bda53769ee1e4cfb073860cd11a1f0a49fa6868025ef54d45e85996d5b7&scene=27#wechat_redirect