    File-based multi-table join: an implementation reference

      Use case: there are N files, each containing a single column of primary keys, and each file represents one attribute. That is, if key PRI1 appears in file A, then PRI1 has attribute A. This setup is typically used for data filtering, e.g. which keys have both attribute A and attribute B? That is exactly this kind of scenario.

      How should we handle this scenario?

     

    1. Solution approach

      If we set aside the file constraint in the title, how would we solve this?

      For example, we could import each file into Redis as a hash: the Redis key is the primary key, the hash field is attribute X, and the hash value is 1 (or absent). To make a decision, we just look up the corresponding key and test whether it carries the attribute in question.
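
      To make the lookup structure concrete, the same idea can be sketched with an in-memory HashMap standing in for Redis (the `AttributeLookup` class and its method names below are illustrative only, not part of the implementation in this article):

```java
import java.util.*;

public class AttributeLookup {
    // primary key -> set of attributes it carries
    // (mirrors redis-key -> hash-fields in the Redis scheme)
    private final Map<String, Set<String>> index = new HashMap<>();

    // load one "file": every id in the list gains the given attribute
    public void load(String attribute, List<String> ids) {
        for (String id : ids) {
            index.computeIfAbsent(id, k -> new HashSet<>()).add(attribute);
        }
    }

    // equivalent of HEXISTS key attribute
    public boolean has(String id, String attribute) {
        return index.getOrDefault(id, Collections.emptySet()).contains(attribute);
    }

    public static void main(String[] args) {
        AttributeLookup lookup = new AttributeLookup();
        lookup.load("A", Arrays.asList("PRI1", "PRI2"));
        lookup.load("B", Arrays.asList("PRI1", "PRI3"));
        System.out.println(lookup.has("PRI1", "A") && lookup.has("PRI1", "B")); // true
        System.out.println(lookup.has("PRI2", "B")); // false
    }
}
```

      Note that, exactly as with Redis, this structure answers "does key X have attribute Y?" quickly, but the reverse query ("all keys with both A and B") requires a full scan.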

      This looks fairly suitable, but it has two drawbacks: 1. Redis is an in-memory database with limited capacity, which may not fit large data volumes; 2. it cannot serve reverse queries, i.e. finding the list of keys that carry both attribute A and attribute B is hard to do.

      Alternatively, we could use a relational database such as MySQL: load each file into its own table, named after the corresponding attribute, and then compute the answer on the fly with SQL. A reference query:

     select COALESCE(ta.id,tb.id) as id, 
         case when ta.id is not null then 1 else 0 end as ta_flag, 
         case when tb.id is not null then 1 else 0 end as tb_flag
       from table_a as ta 
        full join table_b as tb on ta.id=tb.id;

      This is arguably a rather good solution; as long as the computation is modest, this kind of workload is a small affair in database terms. To restate: the database holds one small table per attribute, each with a single primary-key column, and the computation happens at query time. The problem is that as attribute flags accumulate, the small tables multiply and may even exceed database limits. Requiring an exceptionally capable database for a fairly ordinary need is not great either.

      That problem can be solved too. For example, we can pivot rows into columns, merging the small tables into one wide table and dropping the small ones afterwards, which keeps the database requirements ordinary. The merge statement is not complicated either. For reference:

    create table w_xx as 
     select COALESCE(ta.id,tb.id) as id, 
         case when ta.id is not null then 1 else 0 end as ta_flag, 
         case when tb.id is not null then 1 else 0 end as tb_flag
       from table_a as ta 
        full join table_b as tb on ta.id=tb.id;

      With that, the solution is essentially complete.

     

    2. File-based row-to-column data join

      But what if we have no external storage medium at all? As the title says: work directly on the files and merge them against each other. It does not look like a hard job.

      If memory were no concern, we could read each file into a list and turn it into a map, much like the Redis approach above. That may not be realistic, but it is simple enough that we skip the implementation.

      Simplifying further: if the primary keys stored in each file are already sorted, the merge becomes even easier.
      The basic idea is to merge files two at a time: read lines in lockstep, compare them for equal values, and write the results to a new file.
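
      The two-at-a-time merge can be sketched over in-memory sorted lists; the `SortedUnion` class below is an illustrative reduction of the idea (the real implementation later in this article streams from files instead):

```java
import java.util.*;

public class SortedUnion {
    // Full-outer-join two sorted, de-duplicated id lists into "id,aFlag,bFlag" rows:
    // a present side writes "1", an absent side writes "" (empty).
    public static List<String> union(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() || j < b.size()) {
            if (j >= b.size() || (i < a.size() && a.get(i).compareTo(b.get(j)) < 0)) {
                out.add(a.get(i++) + ",1,");        // only in a
            } else if (i >= a.size() || a.get(i).compareTo(b.get(j)) > 0) {
                out.add(b.get(j++) + ",,1");        // only in b
            } else {
                out.add(a.get(i++) + ",1,1");       // in both
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(union(Arrays.asList("6001", "6002"),
                                 Arrays.asList("6001", "6011")));
        // rows: 6001,1,1 | 6002,1, | 6011,,1
    }
}
```

      Each pass only ever holds one line per input in memory, which is what makes the file-based variant viable for data that does not fit in RAM.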

      Also, for parallel computation we can use the fork/join framework covered in the previous article; it fits this scenario very well.
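
      As a minimal illustration of that fork/join shape, the sketch below pairwise-merges N sorted, de-duplicated lists in parallel; `MergeAllTask` is a simplified, hypothetical analogue of the `FileJoinFJTask` shown later, working on in-memory lists instead of files:

```java
import java.util.*;
import java.util.concurrent.*;

// Split the list of inputs in half, merge each half in parallel,
// then merge the two partial results.
public class MergeAllTask extends RecursiveTask<List<String>> {
    private final List<List<String>> inputs;

    public MergeAllTask(List<List<String>> inputs) { this.inputs = inputs; }

    @Override
    protected List<String> compute() {
        if (inputs.isEmpty()) return new ArrayList<>();
        if (inputs.size() == 1) return inputs.get(0);
        int mid = inputs.size() / 2;
        MergeAllTask left = new MergeAllTask(inputs.subList(0, mid));
        left.fork();                        // run the left half asynchronously
        MergeAllTask right = new MergeAllTask(inputs.subList(mid, inputs.size()));
        List<String> r = right.compute();   // compute the right half in this thread
        List<String> l = left.join();
        return mergeSorted(l, r);
    }

    private static List<String> mergeSorted(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            int c = a.get(i).compareTo(b.get(j));
            if (c < 0) out.add(a.get(i++));
            else if (c > 0) out.add(b.get(j++));
            else { out.add(a.get(i++)); j++; }  // de-duplicate equal ids
        }
        out.addAll(a.subList(i, a.size()));
        out.addAll(b.subList(j, b.size()));
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
                Arrays.asList("1", "3"), Arrays.asList("2", "3"), Arrays.asList("4"));
        System.out.println(new ForkJoinPool().invoke(new MergeAllTask(files)));
        // [1, 2, 3, 4]
    }
}
```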

     

    2.1. Main framework for the file row-to-column merge

      The core algorithm walks the files in turn, compares the data, and writes the target file. The implementation follows:

    /**
     * File join utility.
     */
    @Slf4j
    public class FileJoiner {
    
        /**
         * Column separator for the CSV result files
         */
        private static final String CSV_RESULT_FILE_SEPARATOR = ",";
    
        /**
         * File-merge semantics; equivalent SQL:
         * select coalesce(a.id, b.id, c.id...) id,
         *      case when a.id is not null then '1' else '' end f_a,
         *      case when b.id is not null then '1' else '' end f_b,
         *      ...
         *  from a
         *      full join b on a.id = b.id
         *      full join c on a.id = c.id
         *      ...
         *   ;
         */
        public JoinFileDescriptor joinById(JoinFileDescriptor a,
                                           JoinFileDescriptor b) throws IOException {
            JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
            if(a.getLineCnt() <= 0 && b.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // a first, then b
                fieldDesc.addAll(a.getFieldInfo());
                fieldDesc.addAll(b.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            if(a.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // b first, then a
                fieldDesc.addAll(b.getFieldInfo());
                fieldDesc.addAll(a.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            if(b.getLineCnt() <= 0) {
                List<FileFieldDesc> fieldDesc = new ArrayList<>();
                // a first, then b
                fieldDesc.addAll(a.getFieldInfo());
                fieldDesc.addAll(b.getFieldInfo());
                mergedDesc.setFieldInfo(fieldDesc);
                return mergedDesc;
            }
            // now actually merge tables a and b
            String mergedPath = a.getPath() + ".m" + a.getDeep();
            long cnt = -1;
            try(BufferedReader aReader = new BufferedReader(new FileReader(a.getPath()))) {
                try(BufferedReader bReader = new BufferedReader(new FileReader(b.getPath()))) {
                    a.setReader(aReader);
                    b.setReader(bReader);
                    try(OutputStream outputStream = FileUtils.openOutputStream(new File(mergedPath))) {
                        cnt = unionTwoBufferStream(a, b, outputStream);
                    }
                }
            }
            mergedDesc.setPath(mergedPath);
            mergedDesc.setLineCnt(cnt);
            mergedDesc.incrDeep();
            // a first, then b
            List<FileFieldDesc> fieldDesc = new ArrayList<>();
            a.getFieldInfo().forEach(FileFieldDesc::writeOk);
            b.getFieldInfo().forEach(FileFieldDesc::writeOk);
            fieldDesc.addAll(a.getFieldInfo());
            fieldDesc.addAll(b.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            return mergedDesc;
        }
    
        /**
         * Merge multiple files; the result is unordered, but each field's position stays addressable.
         *
         * @param fileList files to merge
         * @param orderedFieldList required field order for the result
         * @return merged file info and its field list
         * @throws Exception on merge failure
         */
        public JoinFileDescriptor joinMultiFile(List<JoinFileDescriptor> fileList,
                                                List<String> orderedFieldList) throws Exception {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            FileJoinFJTask fjTask = new FileJoinFJTask(fileList);
            ForkJoinTask<JoinFileDescriptor> future = forkJoinPool.submit(fjTask);
            JoinFileDescriptor mergedFile = future.get();
            return rewriteFileBySelectField(mergedFile, orderedFieldList);
        }
    
        /**
         * Rewrite the file content in the requested field order.
         *
         * @param originFile current file descriptor
         * @param orderedFields target field order
         * @return descriptor of the rewritten file
         * @throws IOException on write failure
         */
        public JoinFileDescriptor rewriteFileBySelectField(JoinFileDescriptor originFile,
                                                           List<String> orderedFields) throws IOException {
            List<FileFieldDesc> fieldDescList = originFile.getFieldInfo();
            if(checkIfCurrentFileInOrder(fieldDescList, orderedFields)) {
                log.info("file is already in the required field order, no rewrite needed: {}", orderedFields);
                return originFile;
            }
            Map<String, FieldOrderIndicator> indicatorMap = composeFieldOrderIndicator(fieldDescList, orderedFields);
            AtomicLong lineCounter = new AtomicLong(0);
            String targetFilePath = originFile.getPath() + ".of";
            try(BufferedReader aReader = new BufferedReader(new FileReader(originFile.getPath()))) {
                try(OutputStream outputStream = FileUtils.openOutputStream(new File(targetFilePath))) {
                    String lineData;
                    while ((lineData = aReader.readLine()) != null) {
                        String[] cols = StringUtils.splitPreserveAllTokens(
                                            lineData, CSV_RESULT_FILE_SEPARATOR);
                        // empty line
                        if(cols.length == 0) {
                            continue;
                        }
                        // id,1,...
                        StringBuilder sb = new StringBuilder(cols[0]);
                        for (String f1 : orderedFields) {
                            sb.append(CSV_RESULT_FILE_SEPARATOR);
                            FieldOrderIndicator fieldDescIndicator = indicatorMap.get(f1);
                            if(fieldDescIndicator == null
                                    || (fieldDescIndicator.fieldIndex >= cols.length
                                        && fieldDescIndicator.fieldDesc.getWriteFlag() == 1)) {
                                continue;
                            }
                            sb.append(cols[fieldDescIndicator.fieldIndex]);
                        }
                        writeLine(outputStream, sb.toString(), lineCounter);
                    }
                }
            }
            JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
            mergedDesc.setPath(targetFilePath);
            mergedDesc.setLineCnt(lineCounter.get());
            mergedDesc.setFieldInfo(
                    orderedFields.stream()
                            .map(r -> FileFieldDesc.newField(r, 1))
                            .collect(Collectors.toList()));
            return mergedDesc;
        }
    
        /**
         * Build the field position indicator map.
         *
         * @param currentFieldDescList current field layout
         * @param orderedFields target field order
         * @return {"a":{"fieldIndex":1, "fieldDesc":{"name":"aaa", "writeFlag":1}}}
         */
        private Map<String, FieldOrderIndicator> composeFieldOrderIndicator(List<FileFieldDesc> currentFieldDescList,
                                                                            List<String> orderedFields) {
            Map<String, FieldOrderIndicator> indicatorMap = new HashMap<>(orderedFields.size());
            outer:
            for (String f1 : orderedFields) {
                for (int i = 0; i < currentFieldDescList.size(); i++) {
                    FileFieldDesc originField1 = currentFieldDescList.get(i);
                    if (f1.equals(originField1.getFieldName())) {
                        indicatorMap.put(f1, new FieldOrderIndicator(i + 1, originField1));
                        continue outer;
                    }
                }
                indicatorMap.put(f1, null);
            }
            return indicatorMap;
        }
    
        /**
         * Check whether the file's fields are already in the required order.
         *
         * @param currentFieldDescList current field layout of the file
         * @param orderedFields expected field order
         * @return true: already ordered, no rewrite needed; false: not in the required order
         */
        private boolean checkIfCurrentFileInOrder(List<FileFieldDesc> currentFieldDescList,
                                                  List<String> orderedFields) {
            // differing field counts can never satisfy the required order
            if(orderedFields.size() != currentFieldDescList.size()) {
                return false;
            }
            for (int j = 0; j < orderedFields.size(); j++) {
                String targetFieldName = orderedFields.get(j);
                FileFieldDesc possibleFieldDesc = currentFieldDescList.get(j);
                if(possibleFieldDesc != null
                        && targetFieldName.equals(possibleFieldDesc.getFieldName())
                        && possibleFieldDesc.getWriteFlag() == 1) {
                    continue;
                }
                return false;
            }
            return true;
        }
    
        /**
         * Compute the union of two data streams (A ∪ B),
         * writing the A/B flag columns after the key: 1 means present, empty means absent.
         *    If a key is in both A and B, the output row is: A,1,1
         *    If it is in A but not B, the output row is:     A,1,
         *    If it is in B but not A, the output row is:     B,,1
         *
         *  When A or B has multiple columns, the first column is the join key.
         *    e.g. A: 111
         *         B: 111,,1,1
         *    merged result: 111,1,,1,1
         *
         * @return number of lines written to the target file
         */
        private long unionTwoBufferStream(JoinFileDescriptor a,
                                          JoinFileDescriptor b,
                                          OutputStream targetOutputStream) throws IOException {
            String lineDataLeft;
            String lineDataRight;
            AtomicLong lineNumCounter = new AtomicLong(0);
            BufferedReader leftBuffer = a.getReader();
            BufferedReader rightBuffer = b.getReader();
            lineDataRight = rightBuffer.readLine();
            // the primary key is fixed in the first column
            int idIndex = 1;
            String leftId = null;
            String rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
            String lastId = null;
            int cmpV;
            while ((lineDataLeft = leftBuffer.readLine()) != null) {
                // the left table drives the iteration, so drain the right table first
                leftId = getIdColumnValueFromLineData(lineDataLeft, idIndex);
                if(lineDataRight != null
                        && (cmpV = leftId.compareTo(rightId)) >= 0) {
                    do {
                        if(rightId.equals(lastId)) {
                            lineDataRight = rightBuffer.readLine();
                            rightId = getIdColumnValueFromLineData(
                                    lineDataRight, idIndex);
                            // duplicate right id already written; advance to the next right line
                            continue;
                        }
                        writeLine(targetOutputStream,
                                joinLineData(cmpV == 0 ? lineDataLeft : null,
                                        lineDataRight, a.getFieldInfo(),
                                        b.getFieldInfo()),
                                lineNumCounter);
                        lastId = rightId;
                        lineDataRight = rightBuffer.readLine();
                        rightId = getIdColumnValueFromLineData(
                                lineDataRight, idIndex);
                    } while (lineDataRight != null
                                && (cmpV = leftId.compareTo(rightId)) >= 0);
                }
                // when left equals right, the row was already written from the right side; skip
                if(leftId.equals(lastId)) {
                    continue;
                }
                writeLine(targetOutputStream,
                        joinLineData(lineDataLeft, null,
                                a.getFieldInfo(), b.getFieldInfo()),
                        lineNumCounter);
                lastId = leftId;
            }
            // handle any remaining right-table rows
            while (lineDataRight != null) {
                rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
                if(rightId.equals(lastId)) {
                    lineDataRight = rightBuffer.readLine();
                    continue;
                }
                writeLine(targetOutputStream,
                        joinLineData(null, lineDataRight,
                                a.getFieldInfo(), b.getFieldInfo()),
                        lineNumCounter);
                lastId = rightId;
                lineDataRight = rightBuffer.readLine();
            }
            return lineNumCounter.get();
        }
    
        /**
         * Merge two rows field by field, left row first;
         * the last field is the one being appended in this pass.
         *
         * @param leftLineData left row data
         * @param rightLineData right row data
         * @param leftFields left field info (may not yet be written into the left row)
         * @param rightFields right field info (may not yet be written into the right row)
         * @return the merged row
         */
        private String joinLineData(String leftLineData, String rightLineData,
                                    List<FileFieldDesc> leftFields,
                                    List<FileFieldDesc> rightFields) {
            if(StringUtils.isBlank(leftLineData)
                    && StringUtils.isBlank(rightLineData)) {
                return "";
            }
            int leftEmptyFieldIndex = getFieldEmptyPlaceholderIndex(leftFields);
            int rightEmptyFieldIndex = getFieldEmptyPlaceholderIndex(rightFields);
            // 1. only the right value exists: its first field moves to the row head, the rest to the tail
            if(StringUtils.isBlank(leftLineData)) {
                return joinFieldByRight(rightLineData, leftFields,
                                        rightFields, rightEmptyFieldIndex);
            }
            // 2. only the left value exists
            if(StringUtils.isBlank(rightLineData)) {
                return joinFieldByLeft(leftLineData, leftFields,
                                        rightFields, leftEmptyFieldIndex);
            }
            // 3. both sides carry values
            return joinFieldByLeftRight(leftLineData, rightLineData,
                                        leftFields, rightFields,
                                        leftEmptyFieldIndex, rightEmptyFieldIndex);
        }
    
        /**
         * Join a row that has only a right value.
         *
         * @param rightLineData right data row (may contain unfilled empty placeholders)
         * @param leftFields left field list
         * @param rightFields right field list
         * @param emptyFieldIndex index of the first empty placeholder field
         * @return the merged row, with all fields filled
         */
        private String joinFieldByRight(String rightLineData,
                                        List<FileFieldDesc> leftFields,
                                        List<FileFieldDesc> rightFields,
                                        int emptyFieldIndex) {
            String[] rightCols = StringUtils.splitPreserveAllTokens(
                                    rightLineData, CSV_RESULT_FILE_SEPARATOR);
            if(emptyFieldIndex != -1
                    && rightCols.length != emptyFieldIndex + 1) {
                throw new RuntimeException("field count mismatch: " + rightCols.length
                        + ", first unwritten slot: " + (emptyFieldIndex + 1));
            }
            // s1. fill the first column (the key)
            StringBuilder lineResultBuilder = new StringBuilder(rightCols[0]);
            // s2. fill the left columns with empty values
            for (int i = 0; i < leftFields.size(); i++) {
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
            }
            // s3. fill the right columns that carry values
            for (int i = 1; i < rightCols.length; i++) {
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR)
                        .append(rightCols[i]);
            }
            // s4. fill the empty right columns, keeping the last slot for the current field
            if(rightCols.length < rightFields.size() + 1) {
                if(emptyFieldIndex != -1) {
                    for (int i = emptyFieldIndex; i < rightFields.size() - 1; i++) {
                        lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                    }
                }
                // the right side is present: write 1 in its flag column
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
            }
            }
            return lineResultBuilder.toString();
        }
    
        /**
         * Join a row that has only a left value.
         *
         * @param leftLineData left data row (may contain unfilled empty placeholders)
         * @param leftFields left field list
         * @param rightFields right field list
         * @param emptyFieldIndex index of the first empty placeholder field
         * @return the merged row, with all fields filled
         */
        private String joinFieldByLeft(String leftLineData,
                                       List<FileFieldDesc> leftFields,
                                       List<FileFieldDesc> rightFields,
                                       int emptyFieldIndex) {
            String[] cols = StringUtils.splitPreserveAllTokens(
                                leftLineData, CSV_RESULT_FILE_SEPARATOR);
            if(emptyFieldIndex != -1
                    && cols.length != emptyFieldIndex + 1) {
                throw new RuntimeException("field count mismatch: " + cols.length
                        + ", first unwritten slot: " + (emptyFieldIndex + 1));
            }
            // s1. keep the left row's existing values verbatim
            StringBuilder lineResultBuilder = new StringBuilder(leftLineData);
            // s2. fill the empty left columns, keeping the last slot for the current field
            if(cols.length < leftFields.size() + 1) {
                if(emptyFieldIndex != -1) {
                    for (int i = emptyFieldIndex; i < leftFields.size() - 1; i++) {
                        lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                    }
                }
                // the left side is present: write 1 in its flag column
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
            }
            // s3. fill the right columns with empty values
            for (int i = 0; i < rightFields.size(); i++) {
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
            }
            return lineResultBuilder.toString();
        }
        /**
         * Join a row that has both left and right values.
         *
         * @param leftLineData left data row (may contain unfilled empty placeholders)
         * @param rightLineData right data row (may contain unfilled empty placeholders)
         * @param leftFields left field list
         * @param rightFields right field list
         * @param leftEmptyFieldIndex index of the first empty left placeholder
         * @param rightEmptyFieldIndex index of the first empty right placeholder
         * @return the merged row, with all fields filled
         */
        private String joinFieldByLeftRight(String leftLineData,
                                            String rightLineData,
                                            List<FileFieldDesc> leftFields,
                                            List<FileFieldDesc> rightFields,
                                            int leftEmptyFieldIndex,
                                            int rightEmptyFieldIndex) {
            String[] leftCols = StringUtils.splitPreserveAllTokens(
                                    leftLineData, CSV_RESULT_FILE_SEPARATOR);
            if(leftEmptyFieldIndex != -1
                    && leftCols.length != leftEmptyFieldIndex + 1) {
                throw new RuntimeException("field count mismatch: " + leftCols.length
                        + ", first unwritten slot: " + (leftEmptyFieldIndex + 1));
            }
            String[] rightCols = StringUtils.splitPreserveAllTokens(
                                    rightLineData, CSV_RESULT_FILE_SEPARATOR);
            if(rightEmptyFieldIndex != -1
                    && rightCols.length != rightEmptyFieldIndex + 1) {
                throw new RuntimeException("field count mismatch: " + rightCols.length
                        + ", first unwritten slot: " + (rightEmptyFieldIndex + 1));
            }
            // s1. keep the left row's existing values verbatim
            StringBuilder lineResultBuilder = new StringBuilder(leftLineData);
            // s2. fill the empty left columns, keeping the last slot for the current field
            if(leftCols.length < leftFields.size() + 1) {
                if(leftEmptyFieldIndex != -1) {
                    for (int i = leftEmptyFieldIndex; i < leftFields.size() - 1; i++) {
                        lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                    }
                }
                // the left side is present: write 1 in its flag column
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
            }
            // s3. fill the right columns that carry values, skipping the first (key) column
            for (int i = 1; i < rightCols.length; i++) {
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR)
                        .append(rightCols[i]);
            }
            if(rightCols.length < rightFields.size() + 1) {
                if(rightEmptyFieldIndex != -1) {
                    for (int i = rightEmptyFieldIndex; i < rightFields.size() - 1; i++) {
                        lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                    }
                }
                // the right side is present: write 1 in its flag column
                lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
            }
            return lineResultBuilder.toString();
        }
    
        /**
         * Find the position of the first field that has no value written yet.
         *
         * @param fieldList all fields
         * @return index of the first unwritten field, or -1 if none
         */
        private int getFieldEmptyPlaceholderIndex(List<FileFieldDesc> fieldList) {
            for (int i = 0; i < fieldList.size(); i++) {
                FileFieldDesc f1 = fieldList.get(i);
                if(f1.getWriteFlag() == 0) {
                    return i;
                }
            }
            return -1;
        }
    
        /**
         * Read the id column value from a line of data.
         *
         * @param lineData the line content
         * @param idIndex 1-based index of the id column
         * @return the id value
         */
        private String getIdColumnValueFromLineData(String lineData,
                                                        int idIndex) {
            if(lineData == null) {
                return null;
            }
            if(idIndex <= 0) {
            log.warn("invalid id column index: {}; "
                    + "returning the whole line, please investigate", idIndex);
                return lineData;
            }
        // columns are always separated by ','
            String[] cols = StringUtils.splitPreserveAllTokens(lineData, CSV_RESULT_FILE_SEPARATOR);
        // column index out of range: return empty
            if(idIndex > cols.length) {
            log.warn("id column index out of range, please check: {} -> {}", lineData, idIndex);
                return "";
            }
            return cols[idIndex - 1];
        }
    
        /**
         * Write one line to the output stream (with a line counter).
         */
        private void writeLine(OutputStream outputStream,
                               String lineData,
                               AtomicLong counter) throws IOException {
            if(counter.get() > 0) {
                outputStream.write("\n".getBytes());
            }
            outputStream.write(lineData.getBytes());
            counter.incrementAndGet();
        }
    
        /**
         * Field position indicator.
         */
        private class FieldOrderIndicator {
            int fieldIndex;
            FileFieldDesc fieldDesc;
            FieldOrderIndicator(int fieldIndex, FileFieldDesc fieldDesc) {
                this.fieldIndex = fieldIndex;
                this.fieldDesc = fieldDesc;
            }
        }
    
        /**
         * Fork/join task that recursively splits the file-join work.
         */
        private static class FileJoinFJTask extends RecursiveTask<JoinFileDescriptor> {
    
            private static final FileJoiner joiner = new FileJoiner();
    
            private List<JoinFileDescriptor> fileList;
    
            public FileJoinFJTask(List<JoinFileDescriptor> fileList) {
                this.fileList = fileList;
            }
    
            @Override
            public JoinFileDescriptor compute() {
                int len = fileList.size();
                if(len > 2) {
                    int mid = len / 2;
                    FileJoinFJTask subTask1 = new FileJoinFJTask(fileList.subList(0, mid));
                    subTask1.fork();
                    FileJoinFJTask subTask2 = new FileJoinFJTask(fileList.subList(mid, len));
                    subTask2.fork();
    
                    JoinFileDescriptor m1 = subTask1.join();
                    JoinFileDescriptor m2 = subTask2.join();
                    return joinTwoFile(m1, m2);
                }
                if(len == 2) {
                    return joinTwoFile(fileList.get(0), fileList.get(1));
                }
                // len == 1
                if(len == 1) {
                    return fileList.get(0);
                }
                throw new RuntimeException("zero files to merge? -> " + fileList.size());
            }
    
            /**
             * Merge two sorted files.
             *
             * @param m1 file 1
             * @param m2 file 2
             * @return the merged file
             */
            private JoinFileDescriptor joinTwoFile(JoinFileDescriptor m1, JoinFileDescriptor m2) {
                try {
                    return joiner.joinById(m1, m2);
                } catch (IOException e) {
                    log.error("file merge failed, {}, {}", m1, m2, e);
                    throw new RuntimeException(e);
                }
            }
        }
    }

      That is the overall algorithm framework. Externally, callers can invoke joinById serially and do the merging themselves, or call joinMultiFile directly and let it compute in parallel internally; finally, the column order can be fixed as required. The parallel scheme used here is exactly the fork/join framework from the previous article.

     

    2.2. Helper classes

      The computation above relies on a few auxiliary data structures to express the process clearly. The helper classes follow:

    // 1. JoinFileDescriptor 
    import java.io.BufferedReader;
    import java.util.List;
    
    /**
     * Descriptor for a file participating in the join.
     */
    
    public class JoinFileDescriptor {
    
        /**
         * File path
         */
        private String path;
    
        /**
         * Number of lines in the file
         */
        private long lineCnt;
    
        /**
         * Field names, in the order they are written to the file
         */
        private List<FileFieldDesc> fieldInfo;
    
        /**
         * Merge depth; 0 before any merge
         */
        private int deep;
    
        public JoinFileDescriptor() {
        }
    
        public JoinFileDescriptor(String path, int lineCnt,
                                  List<FileFieldDesc> fieldInfo) {
            this.path = path;
            this.lineCnt = lineCnt;
            this.fieldInfo = fieldInfo;
        }
    
        private transient BufferedReader reader;
    
        public BufferedReader getReader() {
            return reader;
        }
    
        public void setReader(BufferedReader reader) {
            this.reader = reader;
        }
    
        public String getPath() {
            return path;
        }
    
        public void setPath(String path) {
            this.path = path;
        }
    
        public long getLineCnt() {
            return lineCnt;
        }
    
        public void setLineCnt(long lineCnt) {
            this.lineCnt = lineCnt;
        }
    
        public List<FileFieldDesc> getFieldInfo() {
            return fieldInfo;
        }
    
        public void setFieldInfo(List<FileFieldDesc> fieldInfo) {
            this.fieldInfo = fieldInfo;
        }
    
        public int getDeep() {
            return deep;
        }
    
        public void incrDeep() {
            this.deep++;
        }
    
        @Override
        public String toString() {
            return "JoinFileDescriptor{" +
                    "path='" + path + '\'' +
                    ", lineCnt=" + lineCnt +
                    ", fieldInfo=" + fieldInfo +
                    ", deep=" + deep +
                    '}';
        }
    }
    
    // 2. FileFieldDesc
    /**
     * File field descriptor.
     */
    public class FileFieldDesc {
        /**
         * Field name
         */
        private String fieldName;
    
        /**
         * Whether the field has actually been written to the file.
         * <p>
         * 1: written; 0: not written (a field ordered earlier must be written
         * along when a later field is merged)
         */
        private int writeFlag;
    
        private FileFieldDesc(String fieldName) {
            this.fieldName = fieldName;
        }
    
        public static FileFieldDesc newField(String fieldName) {
            return new FileFieldDesc(fieldName);
        }
    
        public static FileFieldDesc newField(String fieldName, int writeFlag) {
            FileFieldDesc f = new FileFieldDesc(fieldName);
            f.setWriteFlag(writeFlag);
            return f;
        }
    
        public String getFieldName() {
            return fieldName;
        }
    
        public void setFieldName(String fieldName) {
            this.fieldName = fieldName;
        }
    
        public int getWriteFlag() {
            return writeFlag;
        }
    
        public void setWriteFlag(int writeFlag) {
            this.writeFlag = writeFlag;
        }
    
        public void writeOk() {
            writeFlag = 1;
        }
    
        @Override
        public String toString() {
            return "FileFieldDesc{" +
                    "fieldName='" + fieldName + '\'' +
                    ", writeFlag=" + writeFlag +
                    '}';
        }
    }

      Quite simple, isn't it?

     

    2.3. Unit tests

      No work is complete without tests. A good test should cover every possible case and result: merging several files, how many lines the merge yields, what specific rows must contain, and so on. Well, that is left for the reader to complete; a simple test follows.

    /**
     * Tests for the file join utility.
     */
    public class FileJoinerTest {
    
        @Before
        public void setup() {
            // avoid log4j configuration parse errors
            System.setProperty("catalina.home", "/tmp");
        }
    
        @Test
        public void testJoinById() throws Exception {
            long startTime = System.currentTimeMillis();
            List<String> resultLines;
            String classpath = this.getClass().getResource("/").getPath();
            JoinFileDescriptor file1 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_a.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_a")));
            JoinFileDescriptor file2 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_b.csv", 5,
                    Collections.singletonList(FileFieldDesc.newField("crowd_b")));
            FileJoiner joiner = new FileJoiner();
            JoinFileDescriptor fileMerged = joiner.joinById(file1, file2);
            resultLines = FileUtils.readLines(new File(fileMerged.getPath()), "utf-8");
            System.out.println("result:" + fileMerged);
            Assert.assertEquals("merged line count is incorrect", 6L, fileMerged.getLineCnt());
            Assert.assertEquals("merged first row is incorrect", "6001,1,1", resultLines.get(0));
            Assert.assertEquals("merged last row is incorrect", "6011,,1", resultLines.get(5));
            JoinFileDescriptor file3 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_c.csv", 5,
                    Collections.singletonList(FileFieldDesc.newField("crowd_c")));
            fileMerged = joiner.joinById(fileMerged, file3);
            System.out.println("result3:" + fileMerged);
    
    
            JoinFileDescriptor file4 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_d.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_d")));
            fileMerged = joiner.joinById(fileMerged, file4);
            System.out.println("result4:" + fileMerged);
    
            JoinFileDescriptor file6 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_f.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_f")));
            fileMerged = joiner.joinById(fileMerged, file6);
            System.out.println("result5:" + fileMerged);
    
            JoinFileDescriptor file5 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_e.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_e")));
            fileMerged = joiner.joinById(fileMerged, file5);
            System.out.println("result6:" + fileMerged);
    
            fileMerged = joiner.rewriteFileBySelectField(fileMerged,
                                Arrays.asList("crowd_a", "crowd_b", "crowd_c",
                                            "crowd_d", "crowd_e", "crowd_f"));
            System.out.println("resultFinal:" + fileMerged);
    
            System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        @Test
        public void testJoinByIdUseForkJoin() throws Exception {
            long startTime = System.currentTimeMillis();
            List<JoinFileDescriptor> sortedFileList = new ArrayList<>();
            String classpath = this.getClass().getResource("/").getPath();
            JoinFileDescriptor file1 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_a.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_a")));
            sortedFileList.add(file1);
    
            JoinFileDescriptor file2 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_b.csv", 5,
                    Collections.singletonList(FileFieldDesc.newField("crowd_b")));
            sortedFileList.add(file2);
    
            JoinFileDescriptor file3 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_c.csv", 5,
                    Collections.singletonList(FileFieldDesc.newField("crowd_c")));
            sortedFileList.add(file3);
    
            JoinFileDescriptor file4 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_d.csv", 4,
                    Collections.singletonList(FileFieldDesc.newField("crowd_d")));
            sortedFileList.add(file4);
    
            JoinFileDescriptor file5 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_e.csv", 10,
                    Collections.singletonList(FileFieldDesc.newField("crowd_e")));
            sortedFileList.add(file5);
    
            JoinFileDescriptor file6 = new JoinFileDescriptor(
                    classpath + "file/t0/crowd_f.csv", 10,
                    Collections.singletonList(FileFieldDesc.newField("crowd_f")));
            sortedFileList.add(file6);
            Collections.shuffle(sortedFileList);
    
            FileJoiner joiner = new FileJoiner();
            JoinFileDescriptor fileMerged = joiner.joinMultiFile(sortedFileList,
                                        Arrays.asList("crowd_a", "crowd_b", "crowd_c",
                                                "crowd_d", "crowd_e", "crowd_f"));
            System.out.println("fileMerged:" + fileMerged);
            System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    
    }

  The parallel test above has no assertions: partly out of laziness, and partly because asserting on a parallel merge is genuinely hard, which echoes how difficult troubleshooting distributed systems can be. Note also that, to verify the code's robustness, the test randomly shuffles the file list, confirming that any input order produces the same final result. In a real application you could instead sort the files by line count, merging small files with small files and large files with large ones, avoiding the wasted reads of many empty rows. That is the advantage of rolling your own implementation: whenever you think of a tweak, you can make it immediately.
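  As a minimal sketch of that line-count ordering idea, the snippet below sorts a file list ascending by line count before it would be handed to the merge. `FileDesc` is a hypothetical stand-in for the `JoinFileDescriptor` used in this article, reduced to just the fields this sketch needs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical minimal stand-in for JoinFileDescriptor:
// only the path and line count matter for the ordering.
class FileDesc {
    final String path;
    final long lineCnt;
    FileDesc(String path, long lineCnt) { this.path = path; this.lineCnt = lineCnt; }
}

public class SortByLineCount {
    public static void main(String[] args) {
        List<FileDesc> files = new ArrayList<>(Arrays.asList(
                new FileDesc("crowd_e.csv", 10),
                new FileDesc("crowd_a.csv", 4),
                new FileDesc("crowd_b.csv", 5)));
        // Sort ascending by line count so small files merge with small files first.
        files.sort(Comparator.comparingLong(f -> f.lineCnt));
        files.forEach(f -> System.out.println(f.path + ":" + f.lineCnt));
    }
}
```

  With this ordering, pairwise merges start from the cheapest pairs, so intermediate merged files stay small for as long as possible.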

    Here are a few sample files:

    // crowd_a.csv
    6001
    6002
    6003
    6009
    // crowd_b.csv
    6001
    6002
    6003
    6006
    6011
    // crowd_c.csv
    6001
    6003
    6006
    6009
    ...
    e,f,g
    ...
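
  Given the sample files above, the merge semantics can be sketched in isolation. This is not the article's `FileJoiner` (which streams the files line by line); it is a minimal in-memory sketch of the same sorted full-join, with the crowd_a and crowd_b IDs hard-coded:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeSketch {
    // Full-join two sorted ID lists, emitting "id,flagA,flagB" rows,
    // mirroring the SQL full-join semantics shown earlier.
    static List<String> fullJoin(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() || j < b.size()) {
            String ka = i < a.size() ? a.get(i) : null;
            String kb = j < b.size() ? b.get(j) : null;
            int cmp = ka == null ? 1 : kb == null ? -1 : ka.compareTo(kb);
            if (cmp == 0)      { out.add(ka + ",1,1"); i++; j++; } // id in both files
            else if (cmp < 0)  { out.add(ka + ",1,");  i++; }      // id only in file A
            else               { out.add(kb + ",,1");  j++; }      // id only in file B
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> crowdA = Arrays.asList("6001", "6002", "6003", "6009");
        List<String> crowdB = Arrays.asList("6001", "6002", "6003", "6006", "6011");
        fullJoin(crowdA, crowdB).forEach(System.out::println);
    }
}
```

  The output has six lines, starting with `6001,1,1` and ending with `6011,,1`, which matches the assertions in `testJoinById` above.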

  The utility class above can be regarded as an equivalent implementation of the SQL semantics shown earlier. It cannot compete with a real database engine, but it has its own use cases, waiting to be discovered. Offered for your reference. (Who knows, maybe MapReduce would be simpler and more efficient for you.)

    posted @ 2021-06-29 15:38  等你歸去來