    Spark: A MapReduce Implementation of Word Count (Java/Python)

    1. Introduction

    In the blog post "Hadoop: A MapReduce Implementation of Word Count" we looked at how to implement word count with Hadoop MapReduce; now let's see how to achieve the same thing with Spark.

    2. How MapReduce Works in Spark

    Spark is also a MapReduce-like framework: it uses a "divide, then aggregate" strategy to process data in a distributed, parallel fashion. Compared with Hadoop MapReduce, it has the following two characteristics:

    • It models the input/output and intermediate data of a big-data job with a unified abstraction called the Resilient Distributed Dataset (RDD), and builds a set of general-purpose operations on top of this data structure, so that users can express complex data-processing pipelines simply.

    • It uses in-memory mechanisms such as data aggregation and data caching to speed up execution, which is especially beneficial for iterative and interactive applications.

    The Spark community recommends the higher-level, structured-data-oriented APIs such as Dataset and DataFrame (the Structured API) over the lower-level RDD API, because the Structured API carries richer type information (a schema), supports SQL operations, and is executed by the heavily optimized Spark SQL engine. However, since the RDD API is more fundamental and better suited for demonstrating the basic concepts and principles, the code later in this post uses the RDD API.
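
    For comparison only, word count expressed with the Structured API looks roughly like the following. This is a minimal PySpark sketch, assuming an input directory named input; it is not the implementation used in the rest of this post.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, split
    
    spark = SparkSession.builder.appName("WordCountDF").master("local").getOrCreate()
    
    # Read the text files as a DataFrame with a single "value" column,
    # split each line on spaces, explode into one word per row, then group and count.
    df = spark.read.text("input")
    word_counts = df.select(explode(split(df.value, " ")).alias("word")) \
        .groupBy("word") \
        .count()
    word_counts.show()
    
    spark.stop()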

    A Spark RDD/Dataset is divided into multiple partitions. Each partition of an RDD/Dataset maps to one or more data files, and Spark uses this mapping to read the data into the RDD/Dataset.

    The number of partitions in Spark depends on the following parameters:

    • spark.default.parallelism (defaults to the number of CPU cores)

    • spark.sql.files.maxPartitionBytes (defaults to 128 MB): the maximum number of bytes to pack into a single partition when reading files.

    • spark.sql.files.openCostInBytes (defaults to 4 MB): the estimated cost of opening a file, measured in bytes. Files smaller than this threshold are packed together into the same partition, which prevents a large number of tiny files from each occupying a partition of their own.

    In the walkthrough below we assume that each file corresponds to one partition (in practice, because the files are very small, all three end up in the same partition; you can verify this by calling getNumPartitions() on the RDD object, as shown below).
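
    For example, a quick way to inspect the partitioning (a minimal PySpark sketch; the directory name input is a placeholder):

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("PartitionCheck").master("local").getOrCreate()
    
    # Read the input files into an RDD and print how many partitions were created.
    lines = spark.read.text("input").rdd
    print("number of partitions:", lines.getNumPartitions())
    
    # The related configuration values can also be inspected at runtime.
    print("default parallelism:", spark.sparkContext.defaultParallelism)
    print("max partition bytes:", spark.conf.get("spark.sql.files.maxPartitionBytes"))
    
    spark.stop()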

    Spark's Map stage is illustrated in the following diagram:

    Spark's Reduce stage is illustrated in the following diagram:
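
    In code, the two stages correspond to the following operations (a minimal PySpark sketch on made-up in-memory data, separate from the project code below):

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("MapReduceDemo").master("local").getOrCreate()
    sc = spark.sparkContext
    
    # Map side: split each line into words and emit a (word, 1) pair per word.
    lines = sc.parallelize(["Hello World", "Hello Spark"])
    pairs = lines.flatMap(lambda s: s.split(" ")).map(lambda w: (w, 1))
    
    # Reduce side: pairs with the same key are shuffled to the same partition
    # and their counts are summed (Spark also combines partial sums before the shuffle).
    counts = pairs.reduceByKey(lambda a, b: a + b)
    print(counts.collect())  # e.g. [('Hello', 2), ('World', 1), ('Spark', 1)]
    
    spark.stop()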

    3. Word Count in Java

    The project layout is as follows:

    Word-Count-Spark
    ├─ input
    │  ├─ file1.txt
    │  ├─ file2.txt
    │  └─ file3.txt
    ├─ output
    │  └─ result.txt
    ├─ pom.xml
    ├─ src
    │  ├─ main
    │  │  └─ java
    │  │     └─ WordCount.java
    │  └─ test
    └─ target
    

    WordCount.java is as follows:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.SparkSession;
    
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.io.*;
    import java.nio.file.*;
    
    public class WordCount {
        // Pattern used to split each line into words on single spaces.
        private static final Pattern SPACE = Pattern.compile(" ");
    
    	public static void main(String[] args) throws Exception {
    		if (args.length != 2) {
    			System.err.println("Usage: WordCount <input directory> <output directory>");
    			System.exit(1);
    		}
            String input_path = args[0];
            String output_path = args[1];
    
    		SparkSession spark = SparkSession.builder()
    			.appName("WordCount")
    			.master("local")
    			.getOrCreate();
    
            // Read all text files under the input directory as an RDD of lines.
            JavaRDD<String> lines = spark.read().textFile(input_path).javaRDD();
    
            // Map stage: split each line into words and emit (word, 1) pairs.
            JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
            JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
            // Reduce stage: sum the counts for each word.
            JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
    
    		List<Tuple2<String, Integer>> output = counts.collect();
    
            // Write the collected results to result.txt in the output directory (on the driver).
            String filePath = Paths.get(output_path, "result.txt").toString();
            try (BufferedWriter out = new BufferedWriter(new FileWriter(filePath))) {
                for (Tuple2<?, ?> tuple : output) {
                    out.write(tuple._1() + ": " + tuple._2() + "\n");
                }
            }
            spark.stop();
    	}
    }
    

    The pom.xml configuration is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.WordCount</groupId>
      <artifactId>WordCount</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>WordCount</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <!-- 集中定義版本號 -->
      <properties>
        <scala.version>2.12.10</scala.version>
        <scala.compat.version>2.12</scala.compat.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <project.timezone>UTC</project.timezone>
        <java.version>11</java.version>
        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
        <site.plugin.version>3.7.1</site.plugin.version>
        <scalatest.version>3.1.2</scalatest.version>
        <scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
        <scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
        <maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
        <maven.source.plugin.version>3.2.1</maven.source.plugin.version>
        <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
        <nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
        <maven.help.plugin.version>3.2.0</maven.help.plugin.version>
        <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
        <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spark.version>3.2.1</spark.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!--======SCALA======-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
      </dependencies>
    
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
              <configuration>
                  <source>11</source>
                  <target>11</target>
                  <fork>true</fork>
                  <!-- Adjust this path to point at the javac of your local JDK 11 installation. -->
                  <executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
              </configuration>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
    
    

    Remember to configure the program arguments input and output, which stand for the input directory and the output directory (in VSCode this is done in the launch.json file). After compiling and running, you can inspect result.txt in the output directory:

    Tom: 1
    Hello: 3
    Goodbye: 1
    World: 2
    David: 1
    

    As you can see, the word count completed successfully.

    4. Word Count in Python

    First install PySpark with pip, using a version that matches the Spark version above (3.2.1):

    pip install pyspark==3.2.1
    

    Note that this version of PySpark only supports Java 8/11, so do not use a newer Java release. I am using Java 11 here; run java -version to check which Java version is installed locally.

    (base) orion-orion@MacBook-Pro ~ % java -version
    java version "11.0.15" 2022-04-19 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
    

    The project layout is as follows:

    Word-Count-Spark
    ├─ input
    │  ├─ file1.txt
    │  ├─ file2.txt
    │  └─ file3.txt
    ├─ output
    │  └─ result.txt
    ├─ src
    │  └─ word_count.py
    

    word_count.py is written as follows:

    from pyspark.sql import SparkSession
    import sys
    import os
    from operator import add
    
    if len(sys.argv) != 3:
        print("Usage: WordCount <input directory> <output directory>", file=sys.stderr)
        exit(1)
         
    input_path, output_path = sys.argv[1], sys.argv[2]
    
    spark = SparkSession.builder.appName("WordCount").master("local").getOrCreate()
    
    # Read all text files under the input directory and keep only the line text.
    lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
    
    # Map stage: split lines into words and emit (word, 1); Reduce stage: sum counts per word.
    counts = lines.flatMap(lambda s: s.split(" "))\
        .map(lambda word: (word, 1))\
        .reduceByKey(add)
    
    output = counts.collect()
    
    with open(os.path.join(output_path, "result.txt"), "wt") as f:
        for (word, count) in output:
            f.write(str(word) +": " + str(count) + "\n")
    
    spark.stop()
    
    

    After running python word_count.py input output, you can inspect the corresponding output file result.txt in output:

    Hello: 3
    World: 2
    Goodbye: 1
    David: 1
    Tom: 1
    

    As you can see, the word count completed successfully.
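
    As a side note, instead of collecting the results to the driver and writing the file manually, Spark can also write the output itself. A minimal sketch, continuing from the counts RDD in word_count.py above (the directory name output_rdd is made up, and Spark writes one part-file per partition rather than a single result.txt):

    # Let Spark write one part-xxxxx file per partition under the given directory.
    counts.saveAsTextFile("output_rdd")
    
    # Alternatively, coalesce to a single partition first if one output file is preferred.
    # counts.coalesce(1).saveAsTextFile("output_rdd")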

