package
taobao;
import
java.io.DataInput;
import
java.io.DataOutput;
import
java.io.IOException;
import
java.util.*;
import
org.apache.hadoop.fs.FileSystem;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.conf.*;
import
org.apache.hadoop.io.*;
import
org.apache.hadoop.mapred.*;
import
org.apache.hadoop.mapred.lib.InverseMapper;
import
org.apache.hadoop.mapred.lib.MultipleOutputs;
import
org.apache.hadoop.util.*;
public
class
CK {
public
static
class
Key
implements
WritableComparable<Key> {
public
String id =
""
;
public
short
weight;
public
Key() {
}
public
Key(String i,
short
j) {
id = i;
weight = j;
}
@Override
public
void
readFields(DataInput in)
throws
IOException {
int
length = in.readInt();
byte
[] buf =
new
byte
[length];
in.readFully(buf,
0
, length);
id =
new
String(buf);
weight = in.readShort();
}
@Override
public
void
write(DataOutput out)
throws
IOException {
System.out.println(
"in write"
);
String str = id.toString();
int
length = str.length();
byte
[] buf = str.getBytes();
out.writeInt(length);
out.write(buf,
0
, length);
out.writeShort(weight);
}
@Override
public
int
hashCode() {
return
id.hashCode();
}
@Override
public
String toString() {
return
id;
}
@Override
public
boolean
equals(Object right) {
System.out.println(
"in equals"
);
if
(right
instanceof
Key) {
Key r = (Key) right;
return
r.id.equals(id);
}
else
{
return
false
;
}
}
@Override
public
int
compareTo(Key k) {
System.out.println(
"in compareTo, key="
+ k.toString());
int
cmp = id.compareTo(k.id);
if
(cmp !=
0
) {
return
cmp;
}
if
(weight > k.weight)
return
-
1
;
else
if
(weight < k.weight)
return
1
;
else
return
0
;
}
}
public
static
class
Map
extends
MapReduceBase
implements
Mapper<LongWritable, Text, Key, Text> {
public
void
map(LongWritable l, Text value,
OutputCollector<Key, Text> output, Reporter reporter)
throws
IOException {
String line = value.toString();
String[] pair = line.split(
"\t"
);
if
(pair.length ==
3
) {
Key k =
new
Key(pair[
1
], (
short
)
0
);
output.collect(k,
new
Text(pair[
0
]));
}
else
{
Key k =
new
Key(pair[
0
], (
short
)
1
);
output.collect(k,
new
Text(pair[
1
]));
}
}
}
public
static
class
Reduce
extends
MapReduceBase
implements
Reducer<Key, Text, Text, Text> {
public
void
reduce(Key key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws
IOException {
Text second =
new
Text(values.next());
while
(values.hasNext()) {
output.collect(values.next(), second);
}
}
}
public
static
class
FirstGroupingComparator
implements
RawComparator<Key> {
@Override
public
int
compare(
byte
[] b1,
int
s1,
int
l1,
byte
[] b2,
int
s2,
int
l2) {
return
WritableComparator.compareBytes(b1, s1, Integer.SIZE /
8
,
b2, s2, Integer.SIZE /
8
);
}
@Override
public
int
compare(Key o1, Key o2) {
System.out.println(
"in group compare"
);
String l = o1.id.toString();
String r = o2.id.toString();
int
res = l.compareTo(r);
System.out.println(
"res="
+ res);
return
res;
}
}
public
static
class
FirstPartitioner
implements
Partitioner<Key, Text> {
@Override
public
void
configure(JobConf job) {
}
@Override
public
int
getPartition(Key key, Text value,
int
numPartitions) {
System.out.println(
"in FirstPartitioner"
);
return
Math.abs(key.id.hashCode()) % numPartitions;
}
}
public
static
void
main(String[] args)
throws
Exception {
JobConf conf =
new
JobConf(CK.
class
);
conf.setJobName(
"Composite key"
);
conf.setMapOutputKeyClass(Key.
class
);
conf.setMapOutputValueClass(Text.
class
);
conf.setOutputKeyClass(Text.
class
);
conf.setOutputValueClass(Text.
class
);
conf.setMapperClass(Map.
class
);
conf.setReducerClass(Reduce.
class
);
conf.setOutputValueGroupingComparator(FirstGroupingComparator.
class
);
conf.setPartitionerClass(FirstPartitioner.
class
);
conf.setInputFormat(TextInputFormat.
class
);
conf.setOutputFormat(TextOutputFormat.
class
);
FileSystem fstm = FileSystem.get(conf);
Path outDir =
new
Path(args[
1
]);
fstm.delete(outDir,
true
);
FileInputFormat.setInputPaths(conf,
new
Path(args[
0
]));
FileOutputFormat.setOutputPath(conf, outDir);
JobClient.runJob(conf);
}
}
相关推荐
Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻
hadoop分区二次排序示例,对基站数据,按电话号码升序、到达时间降序进行排序
hadoop分区二次排序代码示例,包含基站数据集,对基站数据,按电话号码升序、到达时间降序进行排序,只需打包成jar,即可在hadoop集群中运行
NULL 博文链接:https://xjward.iteye.com/blog/1816821
主要介绍了hadoop二次排序的原理和实现,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
mapreduce二次排序,年份升序,按照年份聚合,气温降序
大数据学习资料全排序二次排序
完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户 安装Hadoop 测试安装 SSH...
join技术点20 实现semi-join4.1.4 为你的数据挑选最优的合并策略4.2 排序4.2.1 二次排序技术点21 二次排序的实现4.2.2 整体并行排序技术点22 通过多个reducer 对key 进行排序4.3 抽样技术点23 蓄水...
目录 第1章二次排序:简介 19 第2章二次排序:详细示例 42 第3章 Top 10 列表 54 第4章左外连接 96 第5章反转排序 127 第6章移动平均 137 第7章购物篮分析 155 第8章共同好友 182 第9章使用MapReduce实现推荐引擎 ...
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 - 12、Mahout Kmeans简介 .................................... - 57 -
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 12、Mahout Kmeans简介 .................................... - 57 -
技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 ...
四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数
本文将介绍一种通过使用地理网格进行数据关联,并利用Shuffle过程的二次排序实现高效的统计各条道路上位置点分布情况的方法。交通领域正产生着海量的车辆位置点数据。将这些车辆位置信息和道路进行关联的统计操作则...
好评 住 过 几次 东莞 酒店 海悦 地理位置 早餐 最棒 听说 朋友 说 请来 厨师 来头 呵呵 冲 这个 去 好评 酒店设施 比较 不错 就是 携程 价格 酒店 前台 一样 没有 竞争力 好评 房间 不算 大 中规中矩 北方 服务 ...
03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...