• A previous article covered the differences and relationship between Hadoop's data types and Java's primitive data types. In practice, the data types Hadoop provides are often not enough, so this article focuses on solving that problem: defining custom Hadoop serialization types. Some MapReduce background is assumed.

I. Serialization

  • What is serialization?

    • The process of converting structured objects into a byte stream so they can be transmitted over the network or written to persistent storage.
    • In-memory object >> byte sequence
  • What is deserialization?

    • The process of converting a byte stream back into structured in-memory objects.
    • Persisted data on disk >> in-memory object

II. Why Serialize

Serialization lets us store "live" objects and send "live" objects to remote machines.

III. Hadoop Serialization

Comparison of Java primitive data types and Hadoop data types:
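The commonly used correspondences (Java type → Hadoop Writable type) are:

  • boolean → BooleanWritable
  • byte → ByteWritable
  • int → IntWritable
  • long → LongWritable
  • float → FloatWritable
  • double → DoubleWritable
  • String → Text
  • map → MapWritable
  • array → ArrayWritable
  • null → NullWritable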

  • Why doesn't Hadoop simply reuse Java serialization?
    • Java serialization is a heavyweight framework (Serializable); a serialized object carries a lot of extra information, which makes efficient transmission over the network difficult
  • Characteristics of Hadoop serialization
    • Compact: uses storage space efficiently
    • Fast: little extra overhead when reading and writing data
    • Extensible: can evolve as the communication protocol is upgraded
    • Interoperable: supports interaction across multiple languages

IV. How to Implement a Custom Hadoop Serialization Type

Below is the source code of Hadoop's LongWritable:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class LongWritable implements WritableComparable<LongWritable> {
    private long value;

    public LongWritable() {
    }

    public LongWritable(long value) {
        this.set(value);
    }

    public void set(long value) {
        this.value = value;
    }

    public long get() {
        return this.value;
    }

    public void readFields(DataInput in) throws IOException {
        this.value = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(this.value);
    }

    public boolean equals(Object o) {
        if (!(o instanceof LongWritable)) {
            return false;
        } else {
            LongWritable other = (LongWritable)o;
            return this.value == other.value;
        }
    }

    public int hashCode() {
        return (int)this.value;
    }

    public int compareTo(LongWritable o) {
        long thisValue = this.value;
        long thatValue = o.value;
        return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);
    }

    public String toString() {
        return Long.toString(this.value);
    }

    static {
        WritableComparator.define(LongWritable.class, new LongWritable.Comparator());
    }

    public static class DecreasingComparator extends LongWritable.Comparator {
        public DecreasingComparator() {
        }

        public int compare(WritableComparable a, WritableComparable b) {
            return super.compare(b, a);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return super.compare(b2, s2, l2, b1, s1, l1);
        }
    }

    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(LongWritable.class);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            long thisValue = readLong(b1, s1);
            long thatValue = readLong(b2, s2);
            return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);
        }
    }
}

The source code of the WritableComparable interface is as follows:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.io;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

From the source above, a custom serialization type needs the following steps (a minimal sketch follows this list):

  • It must implement the Writable interface
    • A type such as LongWritable may be used as a key, and keys are sorted during the Shuffle phase of the MapReduce framework, so it also needs to implement Comparable; its source shows that WritableComparable simply extends both the Writable and Comparable interfaces
  • Deserialization creates the object via reflection and calls the no-argument constructor, so the class must provide one
  • Override the serialization method (write)
  • Override the deserialization method (readFields)
    • Note: the field order during serialization and deserialization must be exactly the same
  • To display the result in a file, override toString() so the output takes the desired format
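As a minimal sketch (not from the original article; the class name IdKeyWritable and its fields are invented for illustration), a custom type that can also be sorted as a key might look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: sorts by a numeric id and carries a name
public class IdKeyWritable implements WritableComparable<IdKeyWritable> {
    private long id;
    private String name;

    // No-arg constructor, required for reflection during deserialization
    public IdKeyWritable() {
    }

    public void set(long id, String name) {
        this.id = id;
        this.name = name;
    }

    // Serialization: write the fields in a fixed order
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(name);
    }

    // Deserialization: read the fields in exactly the same order
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = in.readUTF();
    }

    // Needed when the type is used as a key, because Shuffle sorts by key
    public int compareTo(IdKeyWritable o) {
        return Long.compare(this.id, o.id);
    }

    @Override
    public String toString() {
        return id + "\t" + name;
    }
}

The practical example below only uses the custom type as a value, so implementing Writable alone is enough there.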

V. A Practical Example

Here is some made-up data with the minimum and maximum salaries of different occupations:

程序员 6000 20000
教师 3000 8000
搬砖 10000 20000
教师 4000 8000

The requirement is to compute, for each occupation, the average minimum salary, the average maximum salary, and the salary gap within that occupation. The ideal output would look something like:

程序员 6000 20000 14000

Analysis shows that Hadoop's built-in serialization types cannot satisfy this requirement, so we need to define a custom Bean object.

1. Implement the Writable interface and a no-arg constructor

The Bean object needs at least three fields: the minimum salary, the maximum salary, and the salary gap.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Bean implements Writable {
    private int maxSalary;
    private int minSalary;
    private int spacing;
    // No-arg constructor, needed because deserialization creates the object via reflection
    Bean(){
        super();
    }
    public int getMaxSalary(){
        return maxSalary;
    }

    public int getMinSalary(){
        return minSalary;
    }

    public void setMaxSalary(int maxSalary){
        this.maxSalary = maxSalary;
    }

    public void setMinSalary(int minSalary){
        this.minSalary = minSalary;
    }

    public void setSpacing(){
        this.spacing = maxSalary - minSalary;
    }

    public void write(DataOutput dataOutput) throws IOException {
        // filled in in step 2 below
    }

    public void readFields(DataInput dataInput) throws IOException {
        // filled in in step 3 below
    }
}

2. Override the serialization method

This is a fixed pattern:

// Override the serialization method
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(maxSalary);
        dataOutput.writeInt(minSalary);
        dataOutput.writeInt(spacing);
    }

3. Override the deserialization method

The deserialization method also follows a fixed pattern, but note that the read order must match the write order of the serialization method exactly (like a queue: first in, first out).

// Override the deserialization method
    public void readFields(DataInput dataInput) throws IOException {
        maxSalary = dataInput.readInt();
        minSalary = dataInput.readInt();
        spacing = dataInput.readInt();
    }

4. Override the toString() method

Override toString() according to the desired output format:

// Override toString()
    @Override
    public String toString() {
        return maxSalary + "\t" + minSalary + "\t" + spacing;
    }
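As a quick local sanity check (not part of the original article or the MapReduce job; the class name BeanRoundTripDemo is invented for illustration), the write/readFields pair can be exercised with plain java.io streams:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BeanRoundTripDemo {
    public static void main(String[] args) throws IOException {
        Bean original = new Bean();
        original.setMinSalary(6000);
        original.setMaxSalary(20000);
        original.setSpacing();

        // Serialization: in-memory object -> byte sequence
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialization: byte sequence -> in-memory object, same field order as write()
        Bean copy = new Bean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected: 20000	6000	14000
    }
}

If readFields() read the fields in a different order than write() wrote them, the values would silently land in the wrong fields, which is exactly why step 3 stresses keeping the order identical.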

With that, the custom serialization type is complete. Next we implement the requirement above; the code is almost the same as in the previous article, so it is left uncommented here; see the link at the beginning of this article.

5. Map phase

package MapReduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CountMap extends Mapper<LongWritable, Text, Text, Bean> {
   private Text k = new Text();
   private Bean v = new Bean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] info = line.split(" ");
        k.set(info[0]);
        v.setMinSalary(Integer.parseInt(info[1]));
        v.setMaxSalary(Integer.parseInt(info[2]));
        context.write(k, v);
    }
}

One difference from the earlier code: I moved the new-ed objects outside the map() method. 🤔🤔🤔 Since map() is called once per input line, the old approach created at least two new objects per line; creating them once and reusing them avoids that object-creation overhead, which is in line with the MapReduce style.

6. Reduce phase

package MapReduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CountReduce extends Reducer<Text, Bean, Text, Bean> {
    private Bean v = new Bean();
    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        int sumMaxSalary = 0;
        int sumMinSalary = 0;
        int length = 0;
        for (Bean value : values){
            length += 1;
            sumMaxSalary += value.getMaxSalary();
            sumMinSalary += value.getMinSalary();
        }
        v.setMaxSalary(sumMaxSalary/length);
        v.setMinSalary(sumMinSalary/length);
        v.setSpacing();
        context.write(key, v);
    }
}

7. Driver phase

package MapReduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();

        job.setJarByClass(CountDriver.class);

        job.setMapperClass(CountMap.class);
        job.setReducerClass(CountReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Bean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
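To run the job, the Mapper, Reducer, Driver, and Bean classes are packaged into a jar and submitted to the cluster. Assuming a hypothetical jar name salary.jar and hypothetical HDFS paths, the invocation would look roughly like:

hadoop jar salary.jar MapReduce.CountDriver /input/salary.txt /output/salary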

8. Final run result
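Based on the sample input and the toString() format defined above, the output file (part-r-00000) would be expected to contain roughly:

搬砖	20000	10000	10000
教师	8000	3500	4500
程序员	20000	6000	14000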

  • Lately I have been preparing for an interview for a big data development assistant position at a tech company; hoping everything goes smoothly 😄😄😄

只喜欢学习