返回文章列表

Find the Median Word

2017-06-21
1 分鐘
Hadoop · MapReduce · Java

1. 介紹

延續先前的 WordCounting MapReduce 範例,本篇計算「中位數單字」:在所有不重複單字按出現次數排序後,位於中間位置的那個頻次群組。

範例

給定以下詞頻:

Word | Total Appearances
hadoop3
hdfs1
mapreduce5
pig5
zookeeper5
hive4
oozie1
yarn1
avro2
spark3
mahout2
hbase4

按頻次整理後(12 個單字中,頻次 1 有 3 個、頻次 2 有 2 個、頻次 3 有 2 個、頻次 4 有 2 個、頻次 5 有 3 個),第 6、7 位皆落在頻次 3,故中位數群組為頻次 3 的「hadoop, spark」。

2. 程式邏輯與實作

Main Driver Class

start.java
package mediumWord;

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.BasicConfigurator;

public class start extends Configured implements Tool {
  /** Counter incremented by the median job's reducer to track total words seen. */
  public enum MediumCounter {
    SUM
  }

  public static void main(String[] args) throws Exception {
    // Minimal log4j setup so Hadoop's log output is visible when run from an IDE.
    BasicConfigurator.configure();
    int exitCode = ToolRunner.run(new Configuration(), new start(), args);
    System.exit(exitCode);
  }

  /**
   * Chains two MapReduce jobs: job 1 counts words from {@code args[0]} into
   * {@code args[1]/first}; job 2 reads that output and groups words by
   * frequency into {@code args[1]/second}.
   *
   * @param args args[0] = input path, args[1] = output base path
   * @return 0 when both jobs succeed, 1 otherwise
   */
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();

    // BUG FIX: delete stale output through the Hadoop FileSystem API instead of
    // java.io.File/FileUtils, which only touch the local disk and silently do
    // nothing when fs.defaultFS points at HDFS — the jobs would then fail
    // because their output directories already exist. delete() is a no-op
    // (returns false) when the path does not exist, so no existence check needed.
    FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(args[1] + "/second"), true);
    fs.delete(new Path(args[1] + "/first"), true);

    // Job 1: count words. The combiner reuses the reducer, which is safe
    // because summing partial counts is associative and commutative.
    Job job = Job.getInstance(conf, "Tool Job");
    job.setJarByClass(CountKeyword.class);
    job.setMapperClass(CountKeyword.ConutMapper.class);
    job.setCombinerClass(CountKeyword.CountReducer.class);
    job.setReducerClass(CountKeyword.CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1] + "/first"));

    // Job 2: swap (word, count) to (count, word) so the reducer receives
    // words grouped — and sorted — by frequency, from which the median
    // group can be located.
    Job job2 = Job.getInstance(conf, "Tool Job2");
    job2.setJarByClass(Medium.class);
    job2.setMapperClass(Medium.ExchangeKeyValueMapper.class);
    job2.setReducerClass(Medium.WordCountingReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(IntWritable.class);
    job2.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job2, new Path(args[1] + "/first"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/second"));

    // Short-circuit: job2 only starts if job1 completed successfully.
    return job.waitForCompletion(true) && job2.waitForCompletion(true) ? 0 : 1;
  }
}

Word Counting Phase

CountKeyword.java
package mediumWord;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class CountKeyword {

  public static boolean isEnglish(char c) {
    return (c >= 65 && c <= 90) || (c >= 97 && c <= 122);
  }

  public static class ConutMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      char[] charArray = line.toCharArray();
      int lastIndex = -1;
      for (int i = 0; i < charArray.length; i++) {
        char current = charArray[i];
        if (!isEnglish(current)) {
          if ((i - lastIndex) > 1) {
            String candidate = line.substring(lastIndex + 1, i);
            word.set(candidate);
            context.write(word, one);
          }
          lastIndex = i;
        }
      }
    }
  }

  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) { sum += val.get(); }
      result.set(sum);
      context.write(key, result);
    }
  }
}

Median Calculation Phase

Medium.java
package mediumWord;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import mediumWord.start.MediumCounter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Medium {

  /**
   * Parses the "word&lt;separator&gt;count" lines produced by the counting job
   * and emits (count, word), so the shuffle groups and sorts words by frequency.
   */
  public static class ExchangeKeyValueMapper extends Mapper<Object, Text, IntWritable, Text> {
    private final static IntWritable count = new IntWritable();
    private Text word = new Text();
    // BUG FIX: the original literal was "\s", which is not a legal Java string
    // escape before Java 15 (compile error); on Java 15+ it denotes a single
    // literal space, but TextOutputFormat separates key and value with a TAB,
    // so the pattern would never match. "\\s" passes the regex whitespace
    // class through to Pattern and matches the tab separator.
    private static final Pattern linePtn =
        Pattern.compile("(?<word>[A-Za-z]*)\\s(?<count>[0-9]+)");

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      Matcher match = linePtn.matcher(value.toString());
      // Silently skip malformed lines (e.g. blank lines).
      if (match.matches()) {
        count.set(Integer.parseInt(match.group("count")));
        word.set(match.group("word"));
        context.write(count, word);
      }
    }
  }

  /**
   * For each frequency, writes a JSON-like summary: the word list, the group
   * size, and the cumulative number of words seen so far.
   */
  public static class WordCountingReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      StringBuilder sb = new StringBuilder("{wordlist:[");
      int size = 0;
      for (Text val : values) {
        sb.append('"').append(val.toString()).append("\",");
        context.getCounter(MediumCounter.SUM).increment(1);
        size++;
      }
      // Drop the trailing comma; reduce() is only invoked with >= 1 value,
      // so the list is never empty here. (setLength replaces the original
      // sb.replace(len-1, len, "") — same effect, no throwaway Strings.)
      sb.setLength(sb.length() - 1);
      sb.append("],wordlength:").append(size).append(',');
      // NOTE(review): with keys sorted ascending, "sum" is the running total
      // of words up to and including this frequency, which is what locates
      // the median group. This assumes a single reduce task — counters read
      // mid-job via getCounter() are task-local, not cluster-aggregated;
      // confirm the job runs with one reducer.
      sb.append("sum:").append(context.getCounter(MediumCounter.SUM).getValue()).append('}');
      context.write(key, new Text(sb.toString()));
    }
  }
}

本方案採用兩個串接的 MapReduce Job:第一個 Job 統計詞頻,第二個 Job 交換 key 與 value,以頻次為 key 將單字分群,便於找出中位數。

原文發表於 Medium

Command Palette

Search for a command to run...