A simple word count

Maven dependency

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
</dependency>

WordCountMap

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMap extends Mapper<Object, Text, Text, LongWritable> {
    // Reusable Writable instances, so we don't allocate new objects per record
    private final LongWritable ONE = new LongWritable(1);
    private final Text WORD = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the input line on whitespace and emit (word, 1) for each token
        StringTokenizer st = new StringTokenizer(value.toString());
        while (st.hasMoreTokens()) {
            WORD.set(st.nextToken());
            context.write(WORD, ONE);
        }
    }
}
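
For example, given the input line hello world hello, the mapper emits three intermediate pairs:

(hello, 1)
(world, 1)
(hello, 1)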

WordCountReduce

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Reusable Writable for the summed count
    private final LongWritable RESULT = new LongWritable();

    @Override
    protected void reduce(Text word, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all counts for this word; use long to match LongWritable.get()
        // and avoid silently narrowing to int
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        RESULT.set(sum);
        context.write(word, RESULT);
    }
}
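
Continuing the example, the shuffle groups the mapper's output by key, so the reducer receives (hello, [1, 1]) and (world, [1]) and writes out (hello, 2) and (world, 1).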

Client

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(Client.class);
        job.setMapperClass(WordCountMap.class);
        // The reducer can double as a combiner because summing is associative
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
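
To run the job, package these classes into a jar and submit it with the hadoop command. The jar name and HDFS paths below are placeholders; adjust them to your own build and cluster:

hadoop jar wordcount.jar Client /input/words /output/wordcount

Note that the output directory must not already exist, or the job fails at startup.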

A simple sort of the word count

The output of the previous step is used as the input of this job.
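
With the default TextOutputFormat, each line of that output is a word and its count separated by a tab, which is why SortMap below splits on \t. Continuing the earlier example, the input to this job would look like:

hello	2
world	1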

SortMap

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMap extends Mapper<Object, Text, LongWritable, Text> {
    private final LongWritable KEY = new LongWritable();
    private final Text VALUE = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is "word<TAB>count"; swap the two fields so that the
        // count becomes the key and MapReduce sorts by count during the shuffle
        String[] fields = value.toString().split("\t");
        KEY.set(Long.parseLong(fields[1]));
        VALUE.set(fields[0]);
        context.write(KEY, VALUE);
    }
}

SortReduce

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReduce extends Reducer<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Several words can share the same count, so emit every word for this
        // key; writing only values.iterator().next() would silently drop ties
        for (Text word : values) {
            context.write(key, word);
        }
    }
}

SortClient

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sort word count");
        job.setJarByClass(SortClient.class);
        job.setMapperClass(SortMap.class);
        // No combiner is set: SortReduce merely forwards values, so a combine
        // pass would add nothing
        job.setReducerClass(SortReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
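
By default MapReduce sorts keys in ascending order, so this job lists the rarest words first. To get the most frequent words first instead, a minimal sketch is to register the decreasing comparator that ships with LongWritable before submitting the job:

// Add in SortClient before waitForCompletion: sort counts in descending order
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);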

Feel free to add my WeChat to discuss: twobixiaoxin