Hadoop-SecondarySort如何实现长尾词排序功能?
- 内容介绍
- 文章标签
- 相关推荐
本文共计950个文字,预计阅读时间需要4分钟。
%E2%80%9CMapReduce%E6%A1%86%E6%9E%B6%E7%9A%84%E7%BB%93%E6%9E%9C%E6%8E%92%E5%BA%8F%E9%9C%80%E8%A6%81%E5%9F%BA%E4%BA%8E%E5%80%BC%E7%9A%84%E9%BB%98%E8%AE%A4%E6%8E%92%E5%BA%8F%E3%80%82%E8%BF%99%E7%A7%8D%E6%8E%92%E5%BA%8F%E9%9C%80%E8%A6%81%E4%BD%BF%E7%94%A8Hadoop%E7%9A%84%E4%BA%8C%E6%AC%A1%E6%8E%92%E5%BA%8F%E3%80%8CSecondary+Sort%E3%80%8D%E6%9D%BF%E5%8F%AF%E4%BB%A5%E6%BB%A1%E8%B6%B3%E5%A4%8D%E6%9D%82%E7%9A%84%E9%9C%80%E6%B1%82%E3%80%82%E5%9C%A8Map%E9%98%B6%E6%AE%B5%E4%B8%AD%EF%BC%8C%E8%BD%BD%E5%8F%82%E6%95%B0%E7%9A%84%E8%BE%93%E5%87%BA%E5%9B%BE%E2%80%9Djob.setInputFormat%E2%80%9D%E2%80%9D。
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。
过程
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是
流程
参考:zengzhaozheng.blog.51cto.com/8219051/1379271/
map:
每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,
就调用自定义二次排序函数对其进行排序。
reducer:
就是分组和reduce函数处理都是在shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每处理完一个分组数据就会去调用一次的reduce函对这个分组来进行处理和输出。
此外,说明一下分组函数的返回值问题,当返回值为0时候才会被分到同一个组当中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,
也就是说有进行了n-1次比较。
实例
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 二次排序对象
*/
public class SortAssist {
public static class UnionKey implements WritableComparable<UnionKey> {
public String partitionerKey;
public String resortString;
public String getPartitionerKey() {
return partitionerKey;
}
public void setPartitionerKey(String partitionerKey) {
this.partitionerKey = partitionerKey;
}
public String getResortString() {
return resortString;
}
public void setResortString(String resortString) {
this.resortString = resortString;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(partitionerKey);
out.writeUTF(resortString);
}
@Override
public void readFields(DataInput in) throws IOException {
partitionerKey = in.readUTF();
resortString = in.readUTF();
}
@Override
public int hashCode() {
return partitionerKey.hashCode() * 157 + resortString.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (this == obj)
return true;
if (obj instanceof UnionKey) {
UnionKey r = (UnionKey) obj;
return r.partitionerKey.equals(partitionerKey) && r.resortString.equals(resortString);
} else {
return false;
}
}
//自定义比较器
@Override
public int compareTo(UnionKey o) {
//确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
if (!partitionerKey.equals(o.partitionerKey)) {
return partitionerKey.compareTo(o.partitionerKey);
} else if (resortString != o.resortString) {
//相同区内按照组合键的第二个键的升序排序
return resortString.compareTo(o.resortString);
} else
return 0;
}
}
//自定义分区处理器
public static class SelfPartitioner extends Partitioner<UnionKey, Text> {
@Override
public int getPartition(UnionKey key, Text value, int numPartitions) {
return Math.abs(key.partitionerKey.hashCode() * 127) % numPartitions;
}
}
//自定义分区策略
public static class SelfGroupingComparator extends WritableComparator {
protected SelfGroupingComparator() {
super(UnionKey.class, true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable writableComparable, WritableComparable writableComparable2) {
UnionKey unionKey = (UnionKey) writableComparable;
UnionKey unionKey2 = (UnionKey) writableComparable2;
String key1 = unionKey.getPartitionerKey();
String key2 = unionKey2.getPartitionerKey();
return
job中调用
.setPartitionerClass(SortAssist.SelfPartitioner.class); //自定义分组器
job.setGroupingComparatorClass(SortAssist.SelfGroupingComparator.class); //自定义分区策略
本文共计950个文字,预计阅读时间需要4分钟。
%E2%80%9CMapReduce%E6%A1%86%E6%9E%B6%E7%9A%84%E7%BB%93%E6%9E%9C%E6%8E%92%E5%BA%8F%E9%9C%80%E8%A6%81%E5%9F%BA%E4%BA%8E%E5%80%BC%E7%9A%84%E9%BB%98%E8%AE%A4%E6%8E%92%E5%BA%8F%E3%80%82%E8%BF%99%E7%A7%8D%E6%8E%92%E5%BA%8F%E9%9C%80%E8%A6%81%E4%BD%BF%E7%94%A8Hadoop%E7%9A%84%E4%BA%8C%E6%AC%A1%E6%8E%92%E5%BA%8F%E3%80%8CSecondary+Sort%E3%80%8D%E6%9D%BF%E5%8F%AF%E4%BB%A5%E6%BB%A1%E8%B6%B3%E5%A4%8D%E6%9D%82%E7%9A%84%E9%9C%80%E6%B1%82%E3%80%82%E5%9C%A8Map%E9%98%B6%E6%AE%B5%E4%B8%AD%EF%BC%8C%E8%BD%BD%E5%8F%82%E6%95%B0%E7%9A%84%E8%BE%93%E5%87%BA%E5%9B%BE%E2%80%9Djob.setInputFormat%E2%80%9D%E2%80%9D。
MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求。为了满足复杂的需求需要Hadoop二次排序Secondary Sort。
过程
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是
流程
参考:zengzhaozheng.blog.51cto.com/8219051/1379271/
map:
每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,
就调用自定义二次排序函数对其进行排序。
reducer:
就是分组和reduce函数处理都是在shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每处理完一个分组数据就会去调用一次的reduce函对这个分组来进行处理和输出。
此外,说明一下分组函数的返回值问题,当返回值为0时候才会被分到同一个组当中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,
也就是说有进行了n-1次比较。
实例
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 二次排序对象
*/
public class SortAssist {
public static class UnionKey implements WritableComparable<UnionKey> {
public String partitionerKey;
public String resortString;
public String getPartitionerKey() {
return partitionerKey;
}
public void setPartitionerKey(String partitionerKey) {
this.partitionerKey = partitionerKey;
}
public String getResortString() {
return resortString;
}
public void setResortString(String resortString) {
this.resortString = resortString;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(partitionerKey);
out.writeUTF(resortString);
}
@Override
public void readFields(DataInput in) throws IOException {
partitionerKey = in.readUTF();
resortString = in.readUTF();
}
@Override
public int hashCode() {
return partitionerKey.hashCode() * 157 + resortString.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (this == obj)
return true;
if (obj instanceof UnionKey) {
UnionKey r = (UnionKey) obj;
return r.partitionerKey.equals(partitionerKey) && r.resortString.equals(resortString);
} else {
return false;
}
}
//自定义比较器
@Override
public int compareTo(UnionKey o) {
//确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
if (!partitionerKey.equals(o.partitionerKey)) {
return partitionerKey.compareTo(o.partitionerKey);
} else if (resortString != o.resortString) {
//相同区内按照组合键的第二个键的升序排序
return resortString.compareTo(o.resortString);
} else
return 0;
}
}
//自定义分区处理器
public static class SelfPartitioner extends Partitioner<UnionKey, Text> {
@Override
public int getPartition(UnionKey key, Text value, int numPartitions) {
return Math.abs(key.partitionerKey.hashCode() * 127) % numPartitions;
}
}
//自定义分区策略
public static class SelfGroupingComparator extends WritableComparator {
protected SelfGroupingComparator() {
super(UnionKey.class, true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable writableComparable, WritableComparable writableComparable2) {
UnionKey unionKey = (UnionKey) writableComparable;
UnionKey unionKey2 = (UnionKey) writableComparable2;
String key1 = unionKey.getPartitionerKey();
String key2 = unionKey2.getPartitionerKey();
return
job中调用
.setPartitionerClass(SortAssist.SelfPartitioner.class); //自定义分组器
job.setGroupingComparatorClass(SortAssist.SelfGroupingComparator.class); //自定义分区策略

