BigData Hadoop Word Frequency Counting (WordCount) Programming

1. Compile the WordCount.java program and run it on the Hadoop cluster.

2. Upload the input file to HDFS.

3. Run WordCount.java.

1. Place the data file in the local directory /home/hadoop/data/wordcount (a sketch for steps 1 and 2 follows this list).
2. Change into that directory.
3. Write the MapReduce program WordCount.java that will be run on the Hadoop cluster (created with gedit below).
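
A minimal sketch of steps 1 and 2, assuming the dataset file YoutubeDataSets.txt has already been downloaded to the home directory (the source path ~/YoutubeDataSets.txt is an assumption):

mkdir -p /home/hadoop/data/wordcount
cp ~/YoutubeDataSets.txt /home/hadoop/data/wordcount/
cd /home/hadoop/data/wordcount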

gedit WordCount.java

4. Create a local directory wordcount_classes to hold the compiled classes:

mkdir wordcount_classes

5. Compile WordCount.java. Enter the following command to compile the WordCount.java program, with the classpath and output directory set correctly.
The -d (directory) option specifies where the compiled .class files are placed:

javac -cp /home/hadoop/hadoop-2.9.0/share/hadoop/* -d wordcount_classes WordCount.java
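
If the wildcard above does not resolve the Hadoop jars on your installation (their exact location varies by version), a common alternative is to let the hadoop command report its own classpath, assuming hadoop is on the PATH:

javac -cp $(hadoop classpath) -d wordcount_classes WordCount.java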

6. Create a .jar file from the compiled wordcount_classes directory, because the .jar must be shipped to the other nodes of the cluster so they can run it in parallel:

jar -cvf WordCount.jar -C wordcount_classes/ .
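
As an optional check (not part of the original steps), the contents of the archive can be listed to confirm the class files were packaged:

jar -tf WordCount.jar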

7. Create the MapReduce input directory /tmp/MR-WordCount on HDFS; the YouTube dataset will be uploaded there as the input to the WordCount program:

hadoop fs -mkdir /tmp/MR-WordCount
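
If the parent directory /tmp does not yet exist on HDFS, the -p flag creates missing parent directories as well (an optional variant):

hadoop fs -mkdir -p /tmp/MR-WordCount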

8. Use Hadoop's put command to copy the YouTube dataset from the local directory /home/hadoop/data/wordcount to the HDFS directory /tmp/MR-WordCount:

hadoop fs -put YoutubeDataSets.txt  /tmp/MR-WordCount/

9. Check that the file was uploaded successfully with the following command:

hadoop fs -ls /tmp/MR-WordCount/

10. Run the MapReduce job:

hadoop jar WordCount.jar cn.hust.book.bigdata.ch04.WordCount \
  /tmp/MR-WordCount/YoutubeDataSets.txt \
  /tmp/MR-WordCount/output
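
When re-running the job, remove the old output directory first, because FileOutputFormat refuses to write into an existing one; after the job completes, the result can be inspected (the part-r-00000 file name assumes a single reducer):

hadoop fs -rm -r /tmp/MR-WordCount/output
hadoop fs -cat /tmp/MR-WordCount/output/part-r-00000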

WordCount.java

package cn.hust.book.bigdata.ch04;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
  public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString(), "\t"); // split the line of text on tab characters
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken()); // emit each token with a count of 1
        context.write(word, one);
      }
    }
  }
 
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) { // accumulate the counts for the same key
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
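
Each line of the job's output is a token followed by a tab and its total count, matching the Text and IntWritable types the reducer emits. Purely as an illustration (hypothetical tokens and counts, not actual results from the dataset), two output lines might look like:

music	42
video	17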