<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • Hadoop: 單詞計數(Word Count)的MapReduce實現

    1.Map與Reduce過程

    1.1 Map過程

    首先,Hadoop會把輸入數據劃分成等長的輸入分片(input split)分片發送到MapReduce。Hadoop為每個分片創建一個map任務,由它來運行用戶自定義的map函數以分析每個分片中的記錄。在我們的單詞計數例子中,輸入是多個文件,一般一個文件對應一個分片,如果文件太大則會劃分為多個分片。map函數的輸入以<key, value>形式做為輸入,value為文件的每一行,key為該行在文件中的偏移量(一般我們會忽視)。這里map函數起到的作用為將每一行進行分詞為多個word,并在context中寫入<word, 1>以代表該單詞出現一次。

    map過程的示意圖如下:

    mapper代碼編寫如下:

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量
            StringTokenizer iter = new StringTokenizer(value.toString());
            while (iter.hasMoreTokens()) {
                word.set(iter.nextToken());
                // 向context中寫入<word, 1>
                context.write(word, one);
                System.out.println(word);
            }
        }
    }
    

    如果我們能夠并行處理分片(不一定是完全并行),且分片是小塊的數據,那么處理過程將會有一個好的負載平衡。但是如果分片太小,那么管理分片與map任務創建將會耗費太多時間。對于大多數作業,理想分片大小為一個HDFS塊的大小,默認是64MB。

    map任務的執行節點和輸入數據的存儲節點相同時,Hadoop的性能能達到最佳,這就是計算機系統中所謂的data locality optimization(數據局部性優化)。而最佳分片大小與塊大小相同的原因就在于,它能夠保證一個分片存儲在單個節點上,再大就不能了。

    1.2 Reduce過程

    接下來我們看reducer的編寫。reduce任務的多少并不是由輸入大小來決定,而是需要人工單獨指定的(默認為1個)。和上面map不同的是,reduce任務不再具有本地讀取的優勢————一個reduce任務的輸入往往來自于所有mapper的輸出,因此map和reduce之間的數據流被稱為 shuffle(洗牌) 。Hadoop會先按照key-value對進行排序,然后將排序好的map的輸出通過網絡傳輸到reduce任務運行的節點,并在那里進行合并,然后傳遞到用戶定義的reduce函數中。

    reduce 函數示意圖如下:

    reducer代碼編寫如下:

     public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable>{
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    

    2.完整代碼

    2.1 項目架構

    關于VSCode+Java+Maven+Hadoop開發環境搭建,可以參見我的博客《VSCode+Maven+Hadoop開發環境搭建》,此處不再贅述。這里展示我們的項目架構圖:

    Word-Count-Hadoop
    ├─ input
    │  ├─ file1
    │  ├─ file2
    │  └─ file3
    ├─ output
    ├─ pom.xml
    ├─ src
    │  └─ main
    │     └─ java
    │        └─ WordCount.java
    └─ target
    

    WordCount.java代碼如下:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCount{
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //每次處理一行,一個mapper里的value為一行,key為該行在文件中的偏移量
                StringTokenizer iter = new StringTokenizer(value.toString());
                while (iter.hasMoreTokens()) {
                    word.set(iter.nextToken());
                    // 向context中寫入<word, 1>
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable>{
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word_count");
    
            job.setJarByClass(WordCount.class);
    
            job.setMapperClass(TokenizerMapper.class);
            //此處的Combine操作意為即第每個mapper工作完了先局部reduce一下,最后再全局reduce
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //第0個參數是輸入目錄,第1個參數是輸出目錄
            //先判斷output path是否存在,如果存在則刪除
            Path path = new Path(args[1]);// 
            FileSystem fileSystem = path.getFileSystem(conf);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
    
            //設置輸入目錄和輸出目錄
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
    
    

    pom.xml中記得配置Hadoop的依賴環境:

        ...
      <!-- 集中定義版本號 -->
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <hadoop.version>3.3.1</hadoop.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!-- 導入hadoop依賴環境 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-api</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
      </dependencies>
      ...
    </project>
    

    此外,因為我們的程序自帶輸入參數,我們還需要在VSCode的launch.json中配置輸入參數intput(代表輸入目錄)和output(代表輸出目錄):

    ...
    "args": [
        "input",
        "output"
    ],
    ...
    

    編譯運行完畢后,可以查看output文件夾下的part-r-00000文件:

    David	1
    Goodbye	1
    Hello	3
    Tom	1
    World	2
    

    可見我們的程序正確地完成了單詞計數的功能。

    參考

    posted @ 2022-05-24 19:45  orion-orion  閱讀(188)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看