當前位置: 妍妍網 > 碼農

使用雙異步後,從 191s 最佳化到 2s

2024-05-11碼農

在開發中,我們經常會遇到這樣的需求,將Excel的數據匯入資料庫中。

一、一般我會這樣做

  • 透過POI讀取需要匯入的Excel;

  • 以檔名為表名、列頭為列名、並將數據拼接成sql;

  • 透過JDBC或mybatis插入資料庫;

  • 操作起來,如果檔比較多,數據量都很大的時候,會非常慢。

    存取之後,感覺沒什麽反應,實際上已經在讀取 + 入庫了,只是比較慢而已。

    讀取一個10萬行的Excel,居然用了191s,我還以為它卡死了呢!

    private void readXls(String filePath, String filename) throws Exception { @SuppressWarnings("resource") XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));// 讀取第一個工作表 XSSFSheet sheet = xssfWorkbook.getSheetAt(0);// 總行數int maxRow = sheet.getLastRowNum(); StringBuilder insertBuilder = new StringBuilder(); insertBuilder.append("insert into ").append(filename).append(" ( UUID,"); XSSFRow row = sheet.getRow(0);for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) { insertBuilder.append(row.getCell(i)).append(","); } insertBuilder.deleteCharAt(insertBuilder.length() - 1); insertBuilder.append(" ) values ( "); StringBuilder stringBuilder = new StringBuilder();for (int i = 1; i <= maxRow; i++) { XSSFRow xssfRow = sheet.getRow(i); String id = ""; String name = "";for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {if (j == 0) { id = xssfRow.getCell(j) + ""; } elseif (j == 1) { name = xssfRow.getCell(j) + ""; } } boolean flag = isExisted(id, name);if (!flag) { stringBuilder.append(insertBuilder); stringBuilder.append('\'').append(uuid()).append('\'').append(",");for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { stringBuilder.append('\'').append(value).append('\'').append(","); } stringBuilder.deleteCharAt(stringBuilder.length() - 1); stringBuilder.append(" )").append("\n"); } } List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());int sum = JdbcUtil.executeDML(collect);}private static boolean isExisted(String id, String name) { String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'"; String num = JdbcUtil.executeSelect(sql, "num");return Integer.valueOf(num) > 0;}private static String uuid() {return UUID.randomUUID().toString().replace("-", "");}

    二、誰寫的?拖出去,斬了!

    最佳化1: 先查詢全部數據,緩存到map中,插入前再進行判斷,速度快了很多。

    最佳化2: 如果單個Excel檔過大,可以采用 異步 + 多執行緒 讀取若幹行,分批入庫。

    最佳化3: 如果檔數量過多,可以采一個Excel一個異步,形成完美的雙異步讀取插入。

    使用雙異步後,從 191s 最佳化到 2s,你敢信?

    下面貼出異步讀取Excel檔、並分批讀取大Excel檔的關鍵程式碼。

    1、readExcelCacheAsync控制類

    @RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)@ResponseBodypublicString readExcelCacheAsync() {String path = "G:\\測試\\data\\";try {// 在讀取Excel之前,緩存所有數據 USER_INFO_SET = getUserInfo(); File file = new File(path);String[] xlsxArr = file.list();for (int i = 0; i < xlsxArr.length; i++) { File fileTemp = new File(path + "\\" + xlsxArr[i]);String filename = fileTemp.getName().replace(".xlsx", ""); readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename); } } catch (Exception e) { logger.error("|#ReadDBCsv|#異常: ", e);return"error"; }return"success";}

    2、分批讀取超大Excel檔

    @Async("async-executor")public void readXls(String filePath, String filename) throws Exception { @SuppressWarnings("resource") XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));// 讀取第一個工作表 XSSFSheet sheet = xssfWorkbook.getSheetAt(0);// 總行數int maxRow = sheet.getLastRowNum(); logger.info(filename + ".xlsx,一共" + maxRow + "行數據!"); StringBuilder insertBuilder = new StringBuilder(); insertBuilder.append("insert into ").append(filename).append(" ( UUID,"); XSSFRow row = sheet.getRow(0);for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) { insertBuilder.append(row.getCell(i)).append(","); } insertBuilder.deleteCharAt(insertBuilder.length() - 1); insertBuilder.append(" ) values ( ");inttimes = maxRow / STEP + 1;//logger.info("將" + maxRow + "行數據分" + times + "次插入資料庫!");for (inttime = 0; time < times; time++) {int start = STEP * time + 1;int end = STEP * time + STEP;if (time == times - 1) { end = maxRow; }if(end + 1 - start > 0){//logger.info("第" + (time + 1) + "次插入資料庫!" + "準備插入" + (end + 1 - start) + "條數據!");//readExcelDataAsyncService.readXlsCacheAsync(sheet, row, start, end, insertBuilder); readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder); } }}

    3、異步批次入庫

    @Async("async-executor")public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) { StringBuilder stringBuilder = new StringBuilder();for (int i = start; i <= end; i++) { XSSFRow xssfRow = sheet.getRow(i); String id = ""; String name = "";for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {if (j == 0) { id = xssfRow.getCell(j) + ""; } elseif (j == 1) { name = xssfRow.getCell(j) + ""; } }// 先在讀取Excel之前,緩存所有數據,再做判斷 boolean flag = isExisted(id, name);if (!flag) { stringBuilder.append(insertBuilder); stringBuilder.append('\'').append(uuid()).append('\'').append(",");for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) { stringBuilder.append('\'').append(value).append('\'').append(","); } stringBuilder.deleteCharAt(stringBuilder.length() - 1); stringBuilder.append(" )").append("\n"); } } List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());if (collect != null && collect.size() > 0) {int sum = JdbcUtil.executeDML(collect); }}private boolean isExisted(String id, String name) {return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);}

    4、異步執行緒池工具類

    @Async的作用就是異步處理任務。

  • 在方法上添加@Async,表示此方法是異步方法;

  • 在類上添加@Async,表示類中的所有方法都是異步方法;

  • 使用此註解的類,必須是Spring管理的類;

  • 需要在啟動類或配置類中加入@EnableAsync註解,@Async才會生效;

  • 在使用@Async時,如果不指定執行緒池的名稱,也就是不自訂執行緒池,@Async是有預設執行緒池的,使用的是Spring預設的執行緒池SimpleAsyncTaskExecutor。

    預設執行緒池的預設配置如下:

  • 預設核心執行緒數:8;

  • 最大執行緒數:Integet.MAX_VALUE;

  • 佇列使用LinkedBlockingQueue;

  • 容量是:Integet.MAX_VALUE;

  • 空閑執行緒保留時間:60s;

  • 執行緒池拒絕策略:AbortPolicy;

  • 從最大執行緒數可以看出,在並行情況下,會無限制的建立執行緒,我勒個嗎啊。

    也可以透過yml重新配置:

    spring:task:execution:pool:max-size: 10core-size: 5keep-alive: 3squeue-capacity: 1000thread-name-prefix: my-executor

    也可以自訂執行緒池,下面透過簡單的程式碼來實作以下@Async自訂執行緒池。

    @EnableAsync// 支持異步操作@Configurationpublic classAsyncTaskConfig{/** * com.google.guava中的執行緒池 * @return */@Bean("my-executor")public Executor firstExecutor(){ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();// 獲取CPU的處理器數量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,200, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), threadFactory); threadPool.allowsCoreThreadTimeOut();return threadPool; }/** * Spring執行緒池 * @return */@Bean("async-executor")public Executor asyncExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();// 核心執行緒數 taskExecutor.setCorePoolSize(24);// 執行緒池維護執行緒的最大數量,只有在緩沖佇列滿了之後才會申請超過核心執行緒數的執行緒 taskExecutor.setMaxPoolSize(200);// 緩存佇列 taskExecutor.setQueueCapacity(50);// 空閑時間,當超過了核心執行緒數之外的執行緒在空閑時間到達之後會被銷毀 taskExecutor.setKeepAliveSeconds(200);// 異步方法內部執行緒名稱 taskExecutor.setThreadNamePrefix("async-executor-");/** * 當執行緒池的任務緩存佇列已滿並且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重復此過程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復呼叫 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize();return taskExecutor; }}

    5、異步失效的原因

  • 註解@Async的方法不是public方法;

  • 註解@Async的返回值只能為void或Future;

  • 註解@Async方法使用static修飾也會失效;

  • 沒加@EnableAsync註解;

  • 呼叫方和@Async不能在一個類中;

  • 在Async方法上標註@Transactional是沒用的,但在Async方法呼叫的方法上標註@Transcational是有效的;

  • 三、執行緒池中的核心執行緒數設定問題

    有一個問題,一直沒時間摸索,執行緒池中的核心執行緒數CorePoolSize、最大執行緒數MaxPoolSize,設定成多少,最合適,效率最高。

    借著這個機會,測試一下。

    1、我記得有這樣一個說法,CPU的處理器數量

    將核心執行緒數CorePoolSize設定成CPU的處理器數量,是不是效率最高的?

    // 獲取CPU的處理器數量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

    Runtime.getRuntime().availableProcessors()獲取的是CPU核心執行緒數,也就是計算資源。

  • CPU密集型,執行緒池大小設定為N,也就是和cpu的執行緒數相同,可以盡可能地避免執行緒間上下文切換,但在實際開發中,一般會設定為N+1,為了防止意外情況出現執行緒阻塞,如果出現阻塞,多出來的執行緒會繼續執行任務,保證CPU的利用效率。

  • IO密集型,執行緒池大小設定為2N,這個數是根據業務壓測出來的,如果不涉及業務就使用推薦。

  • 在實際中,需要對具體的執行緒池大小進行調整,可以透過壓測及機器裝置現狀,進行調整大小。

    如果執行緒池太大,則會造成CPU不斷的切換,對整個系統效能也不會有太大的提升,反而會導致系統緩慢。

    我的電腦的CPU的處理器數量是24。

    那麽一次讀取多少行最合適呢?

    測試的Excel中含有10萬條數據,10萬/24 = 4166,那麽我設定成4200,是不是效率最佳呢?

    測試的過程中發現,好像真的是這樣的。

    2、我記得大家都習慣性的將核心執行緒數CorePoolSize和最大執行緒數MaxPoolSize設定成一樣的,都愛設定成200

    是隨便寫的,還是經驗而為之?

    測試發現,當你將核心執行緒數CorePoolSize和最大執行緒數MaxPoolSize都設定為200的時候,第一次它會同時開啟150個執行緒,來進行工作。

    這個是為什麽?

    3、經過數十次的測試

    發現核心執行緒數好像差別不大

    每次讀取和入庫的數量是關鍵,不能太多,因為每次入庫會變慢;

    也不能太少,如果太少,超過了150個執行緒,就會造成執行緒阻塞,也會變慢;

    四、透過EasyExcel讀取並插入資料庫

    EasyExcel的方式,我就不寫雙異步最佳化了,大家切記陷入低水平勤奮的怪圈。

    1、ReadEasyExcelController

    @RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)@ResponseBodypublicString readEasyExcel() {try {String path = "G:\\測試\\data\\";String[] xlsxArr = new File(path).list();for (int i = 0; i < xlsxArr.length; i++) {String filePath = path + xlsxArr[i]; File fileTemp = new File(path + xlsxArr[i]);String fileName = fileTemp.getName().replace(".xlsx", ""); List<UserInfo> list = new ArrayList<>(); EasyExcel.read(filePath, UserInfo. class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead(); } }catch (Exception e){ logger.error("readEasyExcel 異常:",e);return"error"; }return"suceess";}

    2、ReadEasyExeclAsyncListener

    public ReadEasyExeclService readEasyExeclService;// 表名public String TABLE_NAME;// 批次插入閾值privateint BATCH_COUNT;// 數據集合private List<UserInfo> LIST;publicReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list){this.readEasyExeclService = readEasyExeclService;this.TABLE_NAME = tableName;this.BATCH_COUNT = batchCount;this.LIST = list; } @Overridepublicvoidinvoke(UserInfo data, AnalysisContext analysisContext){ data.setUuid(uuid()); data.setTableName(TABLE_NAME); LIST.add(data);if(LIST.size() >= BATCH_COUNT){// 批次入庫 readEasyExeclService.saveDataBatch(LIST); } } @OverridepublicvoiddoAfterAllAnalysed(AnalysisContext analysisContext){if(LIST.size() > 0){// 最後一批入庫 readEasyExeclService.saveDataBatch(LIST); } }publicstatic String uuid(){return UUID.randomUUID().toString().replace("-", ""); }}

    3、ReadEasyExeclServiceImpl

    @Servicepublic class ReadEasyExeclServiceImpl implements ReadEasyExeclService { @Resource private ReadEasyExeclMapper readEasyExeclMapper; @Override public void saveDataBatch(List<UserInfo> list) {// 透過mybatis入庫 readEasyExeclMapper.saveDataBatch(list);// 透過JDBC入庫// insertByJdbc(list); list.clear(); } private void insertByJdbc(List<UserInfo> list){ List<String> sqlList = new ArrayList<>();for (UserInfo u : list){ StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( "); sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',") .append("'").append(u.getId()).append("',") .append("'").append(u.getName()).append("',") .append("'").append(u.getAge()).append("',") .append("'").append(u.getAddress()).append("',") .append("'").append(u.getPhone()).append("',") .append("sysdate )"); sqlList.add(sqlBuilder.toString()); } JdbcUtil.executeDML(sqlList); }}

    4、UserInfo

    @Datapublic class UserInfo {privateString tableName;privateString uuid;@ExcelProperty(value = "ID")privateString id;@ExcelProperty(value = "NAME")privateString name;@ExcelProperty(value = "AGE")privateString age;@ExcelProperty(value = "ADDRESS")privateString address;@ExcelProperty(value = "PHONE")privateString phone;}

    作者丨哪咤

    來源丨公眾號:哪咤編程(ID:gh_61b183bcf690)

    dbaplus社群歡迎廣大技術人員投稿,投稿信箱: [email protected]