Apache Spark 예제: Java의 단어 수 프로그램


아파치 스파크

Apache Spark는 분산 환경에서 빅 데이터에 대한 분석 작업을 수행할 수 있는 오픈 소스 데이터 처리 프레임워크입니다. UC Berkeley의 학술 프로젝트였으며 2009년 UC Berkeley의 AMPLab에서 Matei Zaharia가 처음 시작했습니다. Apache Spark는 Mesos라는 클러스터 관리 도구를 기반으로 만들어졌습니다. 이는 분산 처리가 있는 클러스터 기반 환경에서 작동할 수 있도록 나중에 수정 및 업그레이드되었습니다.

Apache Spark 예제 프로젝트 설정

Maven을 사용하여 데모용 샘플 프로젝트를 만들 것입니다. 프로젝트를 생성하려면 작업 공간으로 사용할 디렉터리에서 다음 명령을 실행합니다.

mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

maven을 처음 실행하는 경우 maven이 생성 작업을 수행하기 위해 필요한 모든 플러그인과 아티팩트를 다운로드해야 하기 때문에 생성 명령을 수행하는 데 몇 초가 걸립니다. 프로젝트를 만든 후에는 즐겨 사용하는 IDE에서 자유롭게 엽니다. 다음 단계는 적절한 Maven 종속성을 프로젝트에 추가하는 것입니다. 다음은 적절한 종속성이 있는 pom.xml 파일입니다.

<dependencies>

    <!-- Import Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.0.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy</id>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

이것은 maven 기반 프로젝트이므로 실제로 머신에 Apache Spark를 설치하고 설정할 필요가 없습니다. 이 프로젝트를 실행하면 Apache Spark의 런타임 인스턴스가 시작되고 프로그램 실행이 완료되면 종료됩니다. 마지막으로, 이 종속성을 추가할 때 프로젝트에 추가된 모든 JAR을 이해하기 위해 일부 종속성을 추가할 때 프로젝트에 대한 전체 종속성 트리를 볼 수 있는 간단한 Maven 명령을 실행할 수 있습니다. 사용할 수 있는 명령은 다음과 같습니다.

mvn dependency:tree

이 명령을 실행하면 다음 종속성 트리가 표시됩니다.

shubham:JD-Spark-WordCount shubham$ mvn dependency:tree

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] |  +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] |  |     +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] |  |     +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |     \- org.objenesis:objenesis:jar:1.2:compile
[INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] |  |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  |  +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] |  |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  |  +- commons-io:commons-io:jar:2.1:compile
[INFO] |  |  |  +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] |  |  |  +- commons-lang:commons-lang:jar:2.5:compile
[INFO] |  |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] |  |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] |  |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] |  |  |  |  +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] |  |  |  |  |  +- com.google.inject:guice:jar:3.0:compile
[INFO] |  |  |  |  |  |  +- javax.inject:javax.inject:jar:1:compile
[INFO] |  |  |  |  |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO] |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] |  |  |  |  |  |  |  +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] |  |  |  |  |  |  |  \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] |  |  |  |  |  |  \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |     \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] |  |  |  |  |  |     |        \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] |  |  |  |  |  +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] |  |  |  |  |  |  |  \- stax:stax-api:jar:1.0.1:compile
[INFO] |  |  |  |  |  |  +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] |  |  |  |  |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |  |  |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] |  |  |  |  |  |  \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] |  |  |  |  |  \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] |  |  |  |  \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] |  |  \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] |  +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] |  |  +- commons-codec:commons-codec:jar:1.3:compile
[INFO] |  |  \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] |  |  +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] |  |  |  \- jline:jline:jar:0.9.94:compile
[INFO] |  |  \- com.google.guava:guava:jar:14.0.1:compile
[INFO] |  +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] |  +- commons-net:commons-net:jar:2.2:compile
[INFO] |  +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] |  |  +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
[INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] |  |  +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] |  +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] |  +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] |  |  \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] |  |     +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] |  |     \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] |  |        \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] |  |           +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] |  |           \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] |  |  \- asm:asm:jar:3.1:compile
[INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] |  +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] |  +- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] |  |  \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] |  +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] |  +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO]    \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------

두 개의 추가된 종속성으로 Spark는 Scala 종속성을 포함하는 프로젝트에서 필요한 모든 종속성을 수집했으며 Apache Spark는 Scala 자체로 작성되었습니다.

입력 파일 생성

Word Counter 프로그램을 생성할 때 프로젝트의 루트 디렉터리에 input.txt라는 이름으로 프로젝트에 대한 샘플 입력 파일을 생성합니다. 그 안에 내용을 넣으면 다음 텍스트를 사용합니다.

Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.

Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.

이 파일의 텍스트를 자유롭게 사용하십시오.

프로젝트 구조

워드카운터 만들기

이제 프로그램 작성을 시작할 준비가 되었습니다. 빅 데이터 프로그램으로 작업을 시작하면 가져오기로 인해 많은 혼란이 생길 수 있습니다. 이를 방지하기 위해 프로젝트에서 사용할 모든 가져오기는 다음과 같습니다.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

다음으로 사용할 클래스의 구조는 다음과 같습니다.

package com.journaldev.sparkdemo;

...imports...

public class WordCounter {

    private static void wordCount(String fileName) {
        ...
    }

    public static void main(String[] args) {
        ...
    }
}

모든 논리는 wordCount 메서드 안에 있습니다. SparkConf 클래스에 대한 개체를 정의하는 것으로 시작하겠습니다. 이 클래스는 다양한 Spark 매개변수를 프로그램의 키-값 쌍으로 설정하는 데 사용되는 개체입니다. 간단한 매개변수만 제공합니다.

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

master는 이 프로그램이 localhost에서 실행 중인 Spark 스레드에 연결해야 함을 의미하는 local을 지정합니다. 앱 이름은 Spark에 애플리케이션 메타데이터를 제공하는 방법일 뿐입니다. 이제 이 구성 개체를 사용하여 Spark 컨텍스트 개체를 생성할 수 있습니다.

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

Spark는 처리할 모든 리소스를 RDD(Resilient Distributed Datasets)로 간주하여 훨씬 더 효율적으로 분석할 수 있는 찾기 데이터 구조로 데이터를 구성하는 데 도움을 줍니다. 이제 입력 파일을 JavaRDD 개체 자체로 변환합니다.

JavaRDD<String> inputFile = sparkContext.textFile(fileName);

이제 Java 8 API를 사용하여 JavaRDD 파일을 처리하고 파일에 포함된 단어를 별도의 단어로 분할합니다.

JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

다시 말하지만 Java 8 mapToPair(...) 메서드를 사용하여 단어를 세고 출력으로 표시할 수 있는 단어, 숫자 쌍을 제공합니다.

JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

이제 출력 파일을 텍스트 파일로 저장할 수 있습니다.

countData.saveAsTextFile("CountData");

마지막으로 main() 메서드를 사용하여 프로그램에 진입점을 제공할 수 있습니다.

public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("No files provided.");
        System.exit(0);
    }
    wordCount(args[0]);
}

전체 파일은 다음과 같습니다.

package com.journaldev.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

    private static void wordCount(String fileName) {

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> inputFile = sparkContext.textFile(fileName);

        JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

        JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

        countData.saveAsTextFile("CountData");
    }

    public static void main(String[] args) {

        if (args.length == 0) {
            System.out.println("No files provided.");
            System.exit(0);
        }

        wordCount(args[0]);
    }
}

이제 Maven 자체를 사용하여 이 프로그램을 실행해 보겠습니다.

애플리케이션 실행

응용 프로그램을 실행하려면 프로그램의 루트 디렉터리로 이동하여 다음 명령을 실행합니다.

mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"

결론

이 레슨에서는 Maven 기반 프로젝트에서 Apache Spark를 사용하여 간단하지만 효과적인 단어 카운터 프로그램을 만드는 방법을 살펴보았습니다. 사용 가능한 빅 데이터 도구 및 처리 프레임워크에 대한 더 깊은 지식을 얻으려면 더 많은 빅 데이터 게시물을 읽으십시오.

소스 코드 다운로드

Spark WordCounter 프로젝트 다운로드: JD-Spark-WordCount