
Hadoop Secondary Sort (Collected Notes)

 

1. Principle

At the end of the map phase, the framework partitions the map output using the class set with job.setPartitionerClass; each partition is sent to one reducer. Within each partition, records are then sorted by the key comparator set with job.setSortComparatorClass. This already amounts to a secondary sort. If no comparator is set with job.setSortComparatorClass, the key's own compareTo implementation is used.
In the reduce phase, after a reducer has received all of the map output assigned to it, it again sorts the key/value pairs with the comparator set by job.setSortComparatorClass, and then builds a value iterator for each key. This is where grouping comes in, via the comparator set with job.setGroupingComparatorClass: any two keys that this comparator considers equal belong to the same group, their values are placed in a single value iterator, and the key exposed for that iterator is the first key of the group. Finally, the Reducer's reduce method is called once per group, with the key and its value iterator as input. As always, the input and output types must match those declared in your Reducer.
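For orientation, a minimal sketch of how the three hooks are wired up in the driver (the class names are placeholders; Examples 1 and 2 below provide concrete implementations):

Job job = new Job(conf);
// Decides which reducer each record is sent to; hash only the natural key.
job.setPartitionerClass(FirstPartitioner.class);
// Orders the keys within a partition: natural key first, then the secondary field.
job.setSortComparatorClass(SortComparator.class);
// Decides which keys share one reduce() call: compare only the natural key.
job.setGroupingComparatorClass(GroupingComparator.class);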

2. Steps

(1) Define a custom key

Every custom key should implement the WritableComparable interface, since a key must be serializable and comparable, and override the following methods (Example 2's StringPair below is a complete implementation):

@Override
public void write(DataOutput out) throws IOException {}
@Override
public void readFields(DataInput in) throws IOException {}
@Override
public int hashCode() {}
@Override
public boolean equals(Object right) {}
@Override
public int compareTo(StringPair o) {}

(2) Because the key is custom, the following classes also need to be defined:
(2.1) The partitioner class. This is the first comparison of the key: it determines which reducer each record is sent to, so it hashes only the natural key (the "first" field) so that all records of the same group reach the same reducer.

static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable>
Register it on the job with setPartitionerClass; its body is shown just below.
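For reference, this is essentially the FirstPartitioner used in Example 2 below:

static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable> {
    @Override
    public int getPartition(StringPair key, LongWritable value, int numPartitions) {
        // Hash only the natural key so that every record of a group reaches the same reducer.
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}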

(2.2) The key comparator class. This is the second comparison of the key and determines the sort order within each partition. It is a comparator and needs to extend WritableComparator.

public static class SortComparator extends WritableComparator

It must have a constructor (typically calling super(KeyClass.class, true), so that WritableComparator creates and deserializes key instances) and must override public int compare(WritableComparable w1, WritableComparable w2).

An alternative is to implement the RawComparator interface and compare the serialized bytes directly.
Register it on the job with setSortComparatorClass; a sketch follows.
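A minimal sketch, essentially the SortComparator used in Example 2 below (passing true to the WritableComparator constructor makes it create and deserialize key instances, so this object-level compare() is what actually runs during the sort):

public static class SortComparator extends WritableComparator {
    protected SortComparator() {
        super(StringPair.class, true); // true: create key instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        StringPair p1 = (StringPair) a;
        StringPair p2 = (StringPair) b;
        int cmp = StringPair.compare(p1.getFirst(), p2.getFirst()); // natural key ascending
        if (cmp != 0) {
            return cmp;
        }
        return -StringPair.compare(p1.getSecond(), p2.getSecond()); // secondary field descending
    }
}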
(2.3) The grouping comparator class. In the reduce phase, when the value iterator for a key is constructed, any keys whose "first" field is equal belong to the same group and their values go into the same iterator. This is also a comparator and needs to extend WritableComparator.

public static class GroupingComparator extends WritableComparator
The grouping comparator class must likewise have a constructor and override public int compare(WritableComparable w1, WritableComparable w2).
Here too, the alternative is to implement the RawComparator interface.
Register it on the job with setGroupingComparatorClass; see the sketch below.
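A minimal sketch, essentially the GroupingComparator used in Example 2 below:

public static class GroupingComparator extends WritableComparator {
    protected GroupingComparator() {
        super(StringPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare only the natural key: keys with an equal "first" field share one reduce() call.
        return StringPair.compare(((StringPair) a).getFirst(), ((StringPair) b).getFirst());
    }
}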

One more thing to note: if the reducer's input and output types are not the same, do not reuse the reducer as the Combiner, because the Combiner's output becomes the reducer's input. Define a separate Combiner instead, one whose output types match the map output types (see the sketch below).
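As an illustration, a minimal sketch of such a separate Combiner (hypothetical; it is not part of the original examples), assuming Example 1's map output types of Text and LongWritable:

// Hypothetical combiner: its output key/value types must equal the map output types,
// because the combiner's output is what the reducer eventually consumes.
public static class MaxFansCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = Long.MIN_VALUE;
        for (LongWritable value : values) {
            max = Math.max(max, value.get());
        }
        context.write(key, new LongWritable(max));
    }
}
// It would be registered with job.setCombinerClass(MaxFansCombiner.class).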


Example 1:

package example;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text.Comparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


import util.HbaseServiceUtil;


public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, Text, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;


protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}


@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);


Result[] results = scanner.next(cachsize);
// Check whether the user has any corresponding Weibo records
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
context.write(new Text(uid+"|"+aa), new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}


}


static class TwoSortReducer extends
Reducer<Text, LongWritable, NullWritable, Put> {


@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());


Put put = new Put(Bytes.toBytes(key.toString().split("\\|")[0]));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);

}


}


// At the end of the map phase the map output is partitioned; each partition goes to one reducer
static class FirstPartitioner extends HashPartitioner<Text, LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
return (key.toString().split("\\|")[0].hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// Within each partition, records are sorted by the comparator from job.setSortComparatorClass (or by the key's own compareTo)
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(Text.class, true);
}
// Note: only compare the secondary field when the grouping field is equal.
// Here: compare the user id first, then the fans count; the fans count is sorted in descending order.
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long a1 =Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long a2 = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
long a3 = Long.parseLong(t1.toString().split("\\|")[1]);
long a4 = Long.parseLong(t2.toString().split("\\|")[1]);

int cmp = TwoSortMR.compare(a1, a2);
if (cmp != 0) {
return cmp;
}
return -TwoSortMR.compare(a3, a4); //reverse


}
}

// Any two keys that this comparator considers equal belong to the same group.
// Their values go into one value iterator, whose key is the first key of the group.
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(Text.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// return w1.toString().split("\\|")[0].compareTo(w2.toString().split("\\|")[0]);
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long l = Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long r = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
return TwoSortMR.compare(l, r);
}
}
public static int compare(long left, long right) {
return left > right ? 1 : (left == right ? 0 : -1);
}


public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");


String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes());
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);



Job job = new Job(conf);


job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);


job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(Text.class);


job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);


job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);

// Partitioner
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// Grouping comparator
job.setGroupingComparatorClass(GroupingComparator.class);


job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("Test2Sort");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


}


Example 2:

package temp;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;


public class StringPair implements WritableComparable<StringPair> {


private String first;
private long second;


public StringPair() {
}


public void set(String first, long second) {
this.first = first;
this.second = second;
}


public String toString() {
return first + "|" + second;
}


public String getFirst() {
return first;
}


public long getSecond() {
return second;
}


@Override
public void write(DataOutput out) throws IOException {
byte[] buf = first.getBytes();
// write the string's byte length first (byte length, not character count, to stay correct for multi-byte characters)
out.writeInt(buf.length);
// then the string bytes
out.write(buf, 0, buf.length);
// and finally the long
out.writeLong(second);
}


@Override
public void readFields(DataInput in) throws IOException {
// read the string length first
int length = in.readInt();
byte[] buf = new byte[length];
in.readFully(buf, 0, length);
first = new String(buf);
second = in.readLong();
}


@Override
public int hashCode() {
return first.hashCode();
}


@Override
public boolean equals(Object right) {
return first.equals(((StringPair) right).first);
}


@Override
public int compareTo(StringPair o) {
int c1 = StringPair.compare(getFirst(), o.getFirst());
if (c1 != 0) {
return c1;
} else {
return StringPair.compare(getSecond(), o.getSecond());
}
}


public static int compare(StringPair o1, StringPair o2) {
int c1 = StringPair.compare(o1.getFirst(), o2.getFirst());
if (c1 != 0) {
return c1;
} else {
return -StringPair.compare(o1.getSecond(), o2.getSecond());
}
}


public static int compare(long left, long right) {
return left > right ? 1 : (left == right ? 0 : -1);
}


public static int compare(String left, String right) {
return left.compareTo(right);
}


}


package temp;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


import util.HbaseServiceUtil;




public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, StringPair, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;


protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}


@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);


Result[] results = scanner.next(cachsize);
// Check whether the user has any corresponding Weibo records
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
StringPair strs = new StringPair();
strs.set(uid,aa);
context.write(strs, new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}


}


static class TwoSortReducer extends
Reducer<StringPair, LongWritable, NullWritable, Put> {


@Override
protected void reduce(StringPair key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());


Put put = new Put(Bytes.toBytes(key.getFirst()));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);

}


}


// At the end of the map phase the map output is partitioned; each partition goes to one reducer
static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable> {
@Override
public int getPartition(StringPair key, LongWritable value, int numPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// Within each partition, records are sorted by the comparator from job.setSortComparatorClass (or by the key's own compareTo)
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(StringPair.class, true);
}

@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair t1 = (StringPair) a;
StringPair t2 = (StringPair) b;
return StringPair.compare(t1, t2);


}
}
// Any two keys that this comparator considers equal belong to the same group.
// Their values go into one value iterator, whose key is the first key of the group.
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(StringPair.class, true);
}


// Group only by the user id (the "first" field); the fans count is not considered here.
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair strs1 = (StringPair) a;
StringPair strs2 = (StringPair) b;
return StringPair.compare(strs1.getFirst(), strs2.getFirst());
}
}


public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");


String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes());
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);



Job job = new Job(conf);


job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);


job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(StringPair.class);


job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);


job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);

// Partitioner
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// Grouping comparator
job.setGroupingComparatorClass(GroupingComparator.class);


job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("TestCombine");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


}