返回文章列表

First Handmade MapReduce Example

2017-06-21
2 分鐘
HadoopMapReduceJava

改良 WordCounting 的結果

此篇改良 WordCounting 範例,在 map function 中過濾附著在單字上的標點符號。過去 "Hello"、"Hello!"、"Hello…"、"Hello," 會被視為不同的 key 分別計算;改良後統一視為同一個 key,輸出結果為 [Hello, 4]

開發環境

  • IDE:Eclipse
  • 相依管理:Maven(從 Maven Repository 取得函式庫)

專案步驟

(1) 建立新的 Maven Project

在 Eclipse 中建立新的 Maven 專案。

(2) 設定 pom.xml

pom.xml 加入以下相依:

  • hadoop-mapreduce-client-core
  • hadoop-common (version 2.6.0)

(3) 建立新的 Class 檔案

在 Eclipse 中建立新的 class 檔案。

(4) 實作程式碼

WordCount.java
package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            char[] charArray = line.toCharArray();
            int lastIndex = -1;
            for (int i = 0; i < charArray.length; i++) {
                char current = charArray[i];
                if (!isEnglish(current)) {
                    if ((i - lastIndex) > 1) {
                        String candidate = line.substring(lastIndex + 1, i);
                        word.set(candidate);
                        output.collect(word, one);
                    }
                    lastIndex = i;
                }
            }
        }
    }

    public static boolean isEnglish(char c) {
        return (c >= 65 && c <= 90) || (c >= 97 && c <= 122);
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

程式碼重點:

  • isEnglish() 方法過濾非字母字元
  • map function 以 char array 處理字元,擷取不含標點的單字
  • reduce function 累加相同 key 的次數

(5) 打包成 JAR 檔

用 Maven 將完成的程式碼編譯並打包成可執行的 .jar 檔案。

(6) 部署並執行

.jar 部署到 Hadoop 並以標準 hadoop 指令執行,流程與先前的 MapReduce WordCounting 範例相同。

原文發表於 Medium

Command Palette

Search for a command to run...