1、什么是MapReduce

1.1 举个例子

果园里有海量苹果,我们假设有三种颜色,红色、黄色、绿色,现在把苹果采摘完成了,并放到了一起,如下图所示


我们现在需要分别统计每种颜色的苹果的个数,怎么实现呢?

我们可以找一个人,来完成这个工作,但是别忘了,我们面对的是海量的苹果,所以我们需要很多人来一起完成这个工作。也就是说,这海量的苹果被分成了若干份,每个人处理一份,如下图所示:


每个人只处理自己的那一份,他们的工作就是把自己分到的那份苹果,按照颜色进行分类,如下图所示:


有很多人都完成了自己那份工作,那还差了一步,就是按照颜色进行汇总,也就是如下图所示:

这样我们就统计了每个颜色的苹果的个数了。我们为什么要讲这个例子呢?其实这个例子就概括了什么是MapReduce。

1.2 什么是MapReduce

Mapreduce是一个分布式、运算程序的编程框架,把海量的数据,切分成若干个小份,每台服务器负责若干个小份,对数据进行运算。那么海量的数据就平均分配到若干台服务器上。MapReduce所处理的数据是放在HDFS之上的。Mapreduce核心功能是将用户编写的业务逻辑代码,并发运行在一个hadoop集群上。

2、MapReduce核心思想

MapReduce实现的分布式的运算,程序往往需要分成至少2个阶段

  • 第一个阶段的map task并发执行,完全并行运行,互不相干
  • 第二个阶段的reduce task并发执行,互不相干,他们的数据依赖于所有maptask的输出

再拿上面例子来看一下:

map task有若干个,每个map task处理一部分数据,对数据进行分类等操作,并得到一个结果数据,然后每个map task处理之后的数据,会被拿到reduce task端进行合并。所以简单来说:

  • map task是用来处理每一小分数据
  • reduce task是用来合并map task成立之后的数据

3、MapReduce编码规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver

1)Mapper阶段

  • 用户自定义的Mapper要继承自己的父类 Map
  • Mapper的输入数据是KV对的形式(KV的类型可自定义
  • Mapper中的业务逻辑写在自定义map()方法中
  • Mapper的输出数据是KV对的形式(KV的类型可自定义
  • map task进程 对每一个<K,V>调用一次map()方法

2)Reducer阶段

  • 用户自定义的Reducer要继承自己的父类Reducer
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  • Reducer的业务逻辑写在自定义reduce()方法中
  • reduce task进程对每一组相同k的<k,v>组调用一次reduce()方法

3)Driver阶段

  • 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

4、MapReduce模拟示例

现在我们来模拟上面的案例,通过如下代码生成一些苹果

import random
f = open('AppleCount.txt','w')
apple = ['红 ','黄 ','蓝 ','绿 ','\n']
for i in range(1000):
    choice = random.randint(0,4)
    if choice == 0:
        f.write(apple[0])
    if choice == 1:
        f.write(apple[1])
    if choice == 2:
        f.write(apple[2])
    if choice == 3:
        f.write(apple[3])
    if choice == 4:
        f.write(apple[4])
f.close()

通过随机数0-4表示红、黄、蓝、绿四种颜色的苹果,并通过\n换行,一行数据表示一个人的任务。

AppleCount.txt

部分数据如下

蓝 蓝 绿 红 蓝 蓝 黄 
绿 红 蓝 红 红 黄 黄 蓝 蓝 红 蓝 
红 绿 
绿 蓝 黄 红 蓝 黄 
红 蓝 绿 
红 蓝 黄 黄 黄 蓝 红 
蓝 黄 黄 黄 黄 红 
黄 绿 绿 
蓝 蓝 红 蓝 绿 黄 红 黄 
绿 红 绿 
黄 绿 红 
绿 红 黄 黄 红 
绿 蓝 蓝 

下面开始开始实现逻辑

1)Mapper阶段逻辑

默认配置好了hadoop的jar包,能上外网的可以添加依赖,我是内网所以配置的Libraries。jar包自取

自定义ApperMapper类继承Mapper类并复写map方法

package AppleCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AppleMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        super.map(key, value, context);
    }
}

[包不要导错了,看清楚是mapreduce的]

Mapper<LongWritable, Text, Text, IntWritable>

Mapper有四个参数分别是传入数据Key、传入数据Value、传出数据Key、传出数据Value,传入数据Key又称偏移值,仅记录每行数据的位置并无其他意思,传入数据Value根据上面所说即为一行数据,后三个数据类型需根据业务逻辑自行改变,注意:Java中的数据类型并不适用Hadoop,但其对应关系如下:

Java类型    Hadoop Writable类型
boolean        BooleanWritable
byte        ByteWritable
int            IntWritable
float        FloatWritable
long        LongWritable
double        DoubleWritable
string        Text
map            MapWritable
array        ArrayWritable

下面我们分析逻辑,首先Map端传入一行数据

绿 红 蓝 红 红 黄 黄 蓝 蓝 红 蓝 

这是分给我的苹果,我需要做的仅仅是统计这一行不同颜色苹果的个数即可,传入的是Text类型,我们需要将其转换为string类型,注意到每个苹果是用空格割开,因此我们可以按空格对字符串进行切割得到没一个苹果

String[] line = value.toString().split(" ");

如何统计个数???

遍历,我们可以遍历line里面的每一个苹果,把它的颜色作为Key,Value记为1表示一个苹果的意思,把这组KV对通过context传给Reduce端

 for (String Line : line){
     context.write(new Text(Line), new IntWritable(1));
 }

我们可以写出Map端传出的KV对

< 绿 , 1 >
< 红 , 1 >
< 蓝 , 1 >
< 红 , 1 >
< 红 , 1 >
< 黄 , 1 >
< 黄 , 1 >
< 蓝 , 1 >
< 蓝 , 1 >
< 红 , 1 >
< 蓝 , 1 >

2)Reducer阶段逻辑

其实在传入Reduce端之前会对其相同的Key进行一次整合,例如上面的会被整合为

< 红 , [1,1,1,1] >
< 黄 , [1,1] >
< 蓝 , [1,1,1,1] >
< 绿 , [1] >

传入Reduce端是上面的KV对形势,那么Reduce类同理,自定义AppleReducer类并继承Reducer,复写reduce方法,同样Reducer也有四个参数,前面为接收Map端数据,那么应该保持一致,后面的两个参数为输出KV对数据类型,根据业务逻辑自定义。

package AppleCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AppleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        super.reduce(key, values, context);
    }
}

下面分析reduce逻辑,在这里面其实已经达到业务要求,只需要统计Value有多少个1即可,因此可以遍历Value实现累加,注意这里的1的数据类型是IntWritable,因此我们可以申明一个count来计数直接输出即可。

int count = 0;
for (IntWritable Values : values){
    count += 1;
}
context.write(key, new IntWritable(count));

3)Driver阶段逻辑

至于Driver端可以说是一个模板,需要修改的地方注释以标出,完整代码如下:

package AppleCount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class AppleDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJarByClass(AppleDriver.class);//指出程序入口

        job.setMapperClass(AppleMapper.class);//指出Mapper类
        job.setMapOutputKeyClass(Text.class);//指出map输出key类型
        job.setMapOutputValueClass(IntWritable.class);//指出map输出value类型

        job.setReducerClass(AppleReducer.class);//指出Reducer类
        job.setOutputKeyClass(Text.class);//指出reduce输出key类型
        job.setOutputValueClass(IntWritable.class);//指出reduce输出value类型

        FileInputFormat.addInputPath(job,new Path("/Data/AppleCount.txt"));//数据存放路径
        FileOutputFormat.setOutputPath(job,new Path("/Data/AppleCount"));//结果存放路径

        job.waitForCompletion(true);    //启动程序
    }
}

注意:数据存放的路径及结果存放的路径是HDFS里的路径,不是本地,不是本地,不是本地

完整Mapper代码如下:

package AppleCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AppleMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        for (String Line : line){
            context.write(new Text(Line), new IntWritable(1));
        }
    }
}

完整Reducer代码如下:

package AppleCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AppleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable Values : values){
            count += 1;
        }
        context.write(key, new IntWritable(count));
    }
}

最后配置一下Artifacts,打一个jar包,并记录程序入口的本地相对路径[右击AppleDrive选择Copy Reference]

AppleCount.AppleDriver

需要删除jar包里META-INF文件夹里的MSFTSIG.RSA和MSFTSIG.SF,最后我们需要将jar包导入Linux系统中,将数据AppleCount上传到程序指定的HDFS路径,注意结果存放路径的文件夹在HDFS一定要不存在,不存在,不存在,程序会自己创建,然后就是上一篇博文的知识了,启动HDFS—上传文件,最后运行jar包,代码如下:

hadoop jar /MapReduce/Blog.jar AppleCount.AppleDriver

/MapReduce/Blog.jar是你jar包在Linux下的位置,当看到绿绿的successfully表示程序运行成功。

来到我们程序设定的结果存放路径我们可以看两个文件_SUCCESS日志文件,说明成功了,part文件存放我们的结果

可以用cat命令查看

hdfs dfs -cat /Data/AppleCount/p*

其最中结果如下[因为数据未清洗,可能会统计一些空格数量]

红      222
绿      194
蓝      190
黄      205

当然,说了这么多其实完全可以用word直接查找^_^,开个玩笑,例子很简单,主要了解原理懂得运用……[上述源码、数据、jar包这里自取(https://www.lanzous.com/i8ygv5e)

  • 初学Hadoop博客仅为存放一些笔记,若能帮到你一丝万分荣幸,如有错误欢迎评论留言

只喜欢学习