前言

部分内容摘自尚硅谷、黑马等培训资料

1. Hadoop Archive归档

  HDFS 并不擅长存储小文件,因为每个文件最少一个 block,每个 block 的元数据都会在 NameNode 占用内存,如果存在大量的小文件,它们会吃掉 NameNode 节点的大量内存。如下所示,模拟小文件场景:

1
2
3
4
5
[hadoop@hadoop1 input]$ hadoop fs -mkdir /smallfile
[hadoop@hadoop1 input]$ echo 1 > 1.txt
[hadoop@hadoop1 input]$ echo 2 > 2.txt
[hadoop@hadoop1 input]$ echo 3 > 3.txt
[hadoop@hadoop1 input]$ hadoop fs -put 1.txt 2.txt 3.txt /smallfile


  Hadoop Archives可以有效的处理以上问题,它可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件。

1.1 创建Archive

  Usage: hadoop archive -archiveName name -p <parent> <src>* <dest>
  其中-archiveName是指要创建的存档的名称。比如test.har,archive 的名字的扩展名应该是*.har-p参数指定文件存档文件(src)的相对路径。
  举个例子:-p /foo/bar a/b/c e/f/g,这里的/foo/bara/b/ce/f/g的父路径,所以完整路径为/foo/bar/a/b/c/foo/bar/e/f/g
  例如:如果你只想存档一个目录/smallfile下的所有文件:
  hadoop archive -archiveName test.har -p /smallfile /outputdir
  这样就会在/outputdir目录下创建一个名为test.har的存档文件。
  注意:Archive 归档是通过 MapReduce 程序完成的,需要启动 YARN 集群。

1.2 查看Archive

1.2.1 查看归档之后的样子

  首先我们来看下创建好的 har 文件。使用如下的命令:
  hadoop fs -ls /outputdir/test.har

  这里可以看到 har 文件包括:两个索引文件,多个 part 文件(本例只有一个)以及一个标识成功与否的文件。part文件是多个原文件的集合, 通过 index 文件可以去找到原文件。
  例如上述的三个小文件 1.txt 2.txt 3.txt 内容分别为 1,2,3。进行 archive 操作之后,三个小文件就归档到 test.har 里的 part-0 一个文件里。

1.2.2 查看归档之前的样子

  在查看 har 文件的时候,如果没有指定访问协议,默认使用的就是 hdfs://,此时所能看到的就是归档之后的样子。
  此外,Archive 还提供了自己的 har uri 访问协议。如果用har uri去访问的话,索引、标识等文件就会隐藏起来,只显示创建档案之前的原文件
  Hadoop Archives 的 URI 是:
  har://scheme-hostname:port/archivepath/fileinarchive
  scheme-hostname 格式为hdfs-域名:端口

1
2
3
hadoop fs -ls har://hdfs-node1:8020/outputdir/test.har/
hadoop fs -ls har:///outputdir/test.har
hadoop fs -cat har:///outputdir/test.har/1.txt

1.3 提取Archive

  按顺序解压存档(串行):
  hadoop fs -cp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir

1
2
3
hadoop fs -mkdir /smallfile1
hadoop fs -cp har:///outputdir/test.har/* /smallfile1
hadoop fs -ls /smallfile1


  要并行解压存档,请使用 DistCp,对应大的归档文件可以提高效率:
  hadoop distcp har:///user/zoo/foo.har/dir1 hdfs:/user/zoo/newdir

1
hadoop distcp har:///outputdir/test.har/* /smallfile2

1.4 Archive使用注意事项

  1. Hadoop archives 是特殊的档案格式。一个 Hadoop archive 对应一个文件系统目录。Hadoop archive 的扩展名是*.har
  2. 创建 archives 本质是运行一个 Map/Reduce 任务,所以应该在 Hadoop 集群上运行创建档案的命令;
  3. 创建 archive 文件要消耗和原文件一样多的硬盘空间;
  4. archive 文件不支持压缩,尽管 archive 文件看起来像已经被压缩过;
  5. archive 文件一旦创建就无法改变,要修改的话,需要创建新的 archive 文件。事实上,一般不会再对存档后的文件进行修改,因为它们是定期存档的,比如每周或每日
  6. 当创建 archive 时,源文件不会被更改或删除;

2. Sequence File

2.1 Sequence File介绍

  Sequence File是 Hadoop API 提供的一种二进制文件支持。这种二进制文件直接将<key, value>键值对序列化到文件中。

2.2 Sequence File优缺点

  • 优点
    • 二级制格式存储,比文本文件更紧凑。
    • 支持不同级别压缩(基于 Record 或 Block 压缩)。
    • 文件可以拆分和并行处理,适用于 MapReduce。
  • 缺点
    • 二进制格式文件不方便查看。
    • 特定于 hadoop,只有 Java API 可用于与之件进行交互。尚未提供多语言支持。

2.3 Sequence File格式

  Hadoop Sequence File 是一个由二进制键/值对组成的。根据压缩类型,有 3 种不同的 Sequence File 格式:未压缩格式record压缩格式block压缩格式
  Sequence File 由一个header和一个或多个record组成。以上三种格式均使用相同的 header 结构,如下所示:

  前 3 个字节为 SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如 SEQ4 或 SEQ6)。Header 中其他也包括 key、value class 名字、 压缩细节、metadata、Sync marker。Sync Marker 同步标记,用于可以读取任意位置的数据。

2.3.1 未压缩格式


  未压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了 4 个部分:record length(记录长度)、key length(键长)、key、value。
  每隔几个 record(100字节左右)就有一个同步标记。

2.3.2 基于record压缩格式


  基于 record 压缩的 Sequence File 文件由 header、record、sync 三个部分组成。其中 record 包含了4个部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)
  每隔几个 record(100字节左右)就有一个同步标记。

2.3.3 基于block压缩格式


  基于 block 压缩的 Sequence File 文件由 header、block、sync 三个部分组成。
  block指的是record block,可以理解为多个record记录组成的块。注意,这个 block 和 HDFS 中分块存储的 block(128M)是不同的概念。
  Block 中包括:record 条数、压缩的 key 长度、压缩的 keys、压缩的 value 长度、压缩的 values。每隔一个 block 就有一个同步标记。
  block 压缩比 record 压缩提供更好的压缩率。使用 Sequence File 时,通常首选块压缩。

2.4 Sequence File文件读写

2.4.1 开发环境构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

2.4.2 SequenceFileWrite

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;


public class SequenceFileWrite {

private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};

public static void main(String[] args) throws Exception {
//设置客户端运行身份 以root去操作访问HDFS
System.setProperty("HADOOP_USER_NAME","hadoop");
//Configuration 用于指定相关参数属性
Configuration conf = new Configuration();
//sequence file key、value
IntWritable key = new IntWritable();
Text value = new Text();
//构造Writer参数属性
SequenceFile.Writer writer = null;
CompressionCodec Codec = new GzipCodec();
SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(new Path("hdfs://192.168.68.101:8020/seq.out"));
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD,Codec);

try {
writer = SequenceFile.createWriter( conf, optPath, optKey, optVal, optCom);

for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}

  运行结果:

  最终输出的文件如下:

2.4.3 SequenceFileRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public class SequenceFileRead {
public static void main(String[] args) throws IOException {
//设置客户端运行身份 以root去操作访问HDFS
System.setProperty("HADOOP_USER_NAME","hadoop");
//Configuration 用于指定相关参数属性
Configuration conf = new Configuration();

SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(new Path("hdfs://192.168.68.101:8020/seq.out"));
SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(174);//这个参数表示读取的长度
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf,option1,option2);
Writable key = (Writable) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(
reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";//是否返回了Sync Mark同步标记
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}

  运行结果:

2.5 案例:使用Sequence File合并小文件

2.5.1 理论依据

  可以使用 Sequence File 对小文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。例如,假设有 10,000 个 100KB 文件,那么我们可以编写一个程序将它们放入单个 Sequence File 中,如下所示,可以在其中使用 filename 作为键,并使用 content 作为值。

2.5.2 具体值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MergeSmallFilesToSequenceFile {

private Configuration configuration = new Configuration();
private List<String> smallFilePaths = new ArrayList<String>();


//定义方法用来添加小文件的路径
public void addInputPath(String inputPath) throws Exception{
File file = new File(inputPath);
//给定路径是文件夹,则遍历文件夹,将子文件夹中的文件都放入smallFilePaths
//给定路径是文件,则把文件的路径放入smallFilePaths
if(file.isDirectory()){
File[] files = FileUtil.listFiles(file);
for(File sFile:files){
smallFilePaths.add(sFile.getPath());
System.out.println("添加小文件路径:" + sFile.getPath());
}
}else{
smallFilePaths.add(file.getPath());
System.out.println("添加小文件路径:" + file.getPath());
}
}
//把smallFilePaths的小文件遍历读取,然后放入合并的sequencefile容器中
public void mergeFile() throws Exception{
Writer.Option bigFile = Writer.file(new Path("D:\\datasets\\bigfile"));
Writer.Option keyClass = Writer.keyClass(Text.class);
Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
//构造writer
Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
//遍历读取小文件,逐个写入sequencefile
Text key = new Text();
for(String path:smallFilePaths){
File file = new File(path);
long fileSize = file.length();//获取文件的字节数大小
byte[] fileContent = new byte[(int)fileSize];
FileInputStream inputStream = new FileInputStream(file);
inputStream.read(fileContent, 0, (int)fileSize);//把文件的二进制流加载到fileContent字节数组中去
String md5Str = DigestUtils.md5Hex(fileContent);
System.out.println("merge小文件:"+path+",md5:"+md5Str);
key.set(path);
//把文件路径作为key,文件内容做为value,放入到sequencefile中
writer.append(key, new BytesWritable(fileContent));
}
writer.hflush();
writer.close();
}
//读取大文件中的小文件
public void readMergedFile() throws Exception{
Reader.Option file = Reader.file(new Path("D:\\bigfile.seq"));
Reader reader = new Reader(configuration, file);
Text key = new Text();
BytesWritable value = new BytesWritable();
while(reader.next(key, value)){
byte[] bytes = value.copyBytes();
String md5 = DigestUtils.md5Hex(bytes);
String content = new String(bytes, Charset.forName("GBK"));
System.out.println("读取到文件:"+key+",md5:"+md5+",content:"+content);
}
}

public static void main(String[] args) throws Exception {
MergeSmallFilesToSequenceFile msf = new MergeSmallFilesToSequenceFile();
//合并小文件
msf.addInputPath("D:\\datasets\\smallfile");
msf.mergeFile();
//读取大文件
// msf.readMergedFile();
}
}