Categories
程式開發

Apache Arrow和Java:大数据传输快如闪电


本文要点

  • Arrow为分析应用程序提供了零复制数据传输
  • Arrow支持内存中、列式格式和数据处理
  • Arrow是跨平台、跨语言可互操作的数据交换方式
  • Arrow是大数据系统的骨干

大数据天生就太大了,无法装进单独的一台机器里。数据集需要在多台计算机上分区存储。每个分区都分配给一台主机,还可以选择分配给备份机器。这样,每台机器都会有多个分区。多数大数据框架使用随机策略为计算机分配分区。如果每个计算作业都使用一个分区,那么这种策略会将计算负载很好地分散在整个群集上。但是,如果一个作业需要多个分区,则它很有可能需要从其他计算机获取分区。而传输数据的过程总会带来性能损失。

Apache Arrow提出了一种跨语言、跨平台,列式(columnar)内存中(in-memory)的数据格式。由于各个平台和编程语言上的数据都由相同的字节表示,它就不再需要序列化了。这种通用格式支持在大数据系统中进行零复制数据传输,以最大程度地降低数据传输带来的性能影响。

本文的目的是介绍Apache Arrow,并带你熟悉Apache Arrow Java库的基本概念。随本文附带的源代码在这里

通常,一次数据传输操作包括:

  • 以某种格式序列化数据
  • 通过网络连接发送序列化数据
  • 在接收端反序列化数据

以Web应用程序中前端与后端之间的通信为例。人们通常使用JavaScript对象表示(JSON)格式序列化数据。数据量不大时它很好用。序列化和反序列化的开销可以忽略不计,并且JSON可由人类阅读理解,从而简化了调试工作。但当数据量增加时,序列化成本可能成为主要的性能瓶颈。如果没有适当的应对,系统最后可能会花费大部分时间来序列化数据。显然,CPU周期应该用到很多更有价值的事情上。

Apache Arrow和Java:大数据传输快如闪电 1
在此过程中,我们在软件中控制一个要素:(反)序列化。自然,市面上有很多序列化框架。可选项有ProtoBuf、Thrift和MessagePack等等。其中许多框架将尽量降低序列化成本作为主要目标。

尽管它们在努力降低序列化成本,但(反)序列化的步骤依旧是不可避免的。你的代码所作用的对象不是通过网络发送的数据。另一侧的代码处理的对象也不是通过网络接收到的字节。到头来,最快的序列化就是没有序列化

Apache Arrow适合我吗?

从概念上讲,Apache Arrow被设计为大数据系统(例如BallistaDremio)或大数据系统集成的骨干。如果你的用例不在大数据系统领域,可能就无需关心Apache Arrow的开销。对你来说,行业流行的序列化框架(如ProtoBuf、FlatBuffers、Thrift、MessagePack或其他选项)可能更合适。

使用Apache Arrow编程与使用普通的旧Java对象编程的体验有很大区别,因为前者并没有Java对象。代码一路都在缓冲区上操作。现有的实用程序库(如Apache Commons和Guava等)不能再用了。你可能必须重新实现某些算法才能使用字节缓冲区。最后一点也很重要,你必须一直以列而非对象的思维来编程。

在Apache Arrow之上构建系统,需要你读取、写入、呼吸和消耗Arrow缓冲区。如果你要构建一个可以处理数据对象集合(如某种数据库)的系统,想要计算对列友好的内容,并计划在一个群集中运行它,那么Arrow绝对值得投资。

与Parquet的集成(稍后讨论)让持久性更容易实现。跨平台、跨语言方面,Arrow支持多种语言的微服务架构,并能与现有的大数据环境轻松集成。内置的称为Arrow Flight的RPC框架使Arrow可以轻松用标准化且高效的方式共享/提供数据集。

零复制数据传输

首先,为什么我们需要序列化呢?在Java应用程序中,我们通常要使用对象和原始值。这些对象以某种方式映射到计算机内存中的字节上。JDK知道如何将对象映射到计算机上的字节。但是这种映射在另一台计算机上可能会有所不同。字节顺序(又称字节序)就是一个例子。而且,并非所有的编程语言都具有相同的原始类型集,甚至不会以相同的方式存储相似的类型。

序列化将对象使用的内存转换为一种通用格式。这种格式有一个规范,并且为每种编程语言和平台提供了一个库,该库能将对象转换为序列化的格式或转换回来。换句话说,序列化就是用来共享数据的,而且不会破坏每种编程语言和平台的特有行为。序列化可消除不同平台和编程语言中的所有差异,从而让每位程序员都能按自己喜欢的方式工作。这就像翻译员可以消除说不同语言的人们之间的语言障碍一样。

在大多数情况下,序列化是非常好用的。但当我们传输大量数据时,它将成为一个很大的瓶颈。那么在这种情况下,我们可以消除序列化过程吗?这实际上就是零复制序列化框架(如Apache Arrow和Flat Buffers)的目标。你可以将其视为处理序列化数据本身,而非处理对象,以避免序列化步骤。零复制是指你的应用程序所处理的字节可以无需任何修改就通过网络传输。同样,在接收端,应用程序可以按原样处理收到的字节,而无需反序列化步骤。

这里的最大优势在于,数据可以无需任何转换,按原样从一个环境传输到另一环境,因为连接两侧对数据都是按原样理解的。

这里的主要缺陷是失去了编程中的特性(idiosyncrasies)。所有操作都在字节缓冲区上执行。没有整数,有字节序列。没有数组,有字节序列。没有对象,有字节序列的集合。当然,你仍然可以将通用格式的数据转换为整数、数组和对象。但这样以来你就要进行反序列化,零复制就失去了意义。一旦传输到Java对象,则只有Java才能处理数据。

Apache Arrow和Java:大数据传输快如闪电 2
在实践中这是如何工作的呢?我们来简单看一下两个零复制序列化框架:Apache Arrow和来自谷歌的FlatBuffers。尽管两者都是零复制框架,但是它们是针对不同用例的不同设计取向。

FlatBuffers最初是为了支持移动游戏而开发的。它的重点在于以最小的开销将数据从服务器快速传输到客户端。你可以发送单个对象或对象集合。数据存储在(堆上)ByteBuffer中,格式为FlatBuffers通用数据布局。FlatBuffers编译器将根据数据规范生成代码,从而简化你与ByteBuffers的交互。你可以像处理数组、对象或原语一样处理数据。在后台,每个访问器(accessor)方法都获取相应的字节并将字节转换为JVM和你的代码可理解的构造。如果出于某种原因需要访问字节,你也可以这样做。

Arrow与FlatBuffers的不同之处在于它们在内存中布局列表/数组/表的方式。FlatBuffers对表使用一种面向行的格式,而Arrow使用一种列式格式存储表格化数据。这就在对大数据集的分析化(OLAP)查询方面带来了很大的不同。

Arrow针对的是大数据系统,在系统中你通常不传输单个对象,而是传输大量对象。另一方面,FlatBuffers宣传自己是序列化框架(也是这么用的)。换句话说,你的应用程序代码处理的是Java对象和原语,并且只在发送数据时将数据转换为FlatBuffers的内存布局。如果接收侧是只读的,则不必将数据反序列化为Java对象,可以直接从FlatBuffers的ByteBuffers中读取数据。

Apache Arrow和Java:大数据传输快如闪电 3
在大型数据集中,行数通常可以从数千行到数万亿行不等。这样的数据集可能有几列到数千列。

对此类数据集的典型分析查询只会引用少量列。以电子商务交易的数据集为例,一位销售经理想要一个特定区域的销售概览,按项目类别分组。他不想看到具体的交易,平均销售价格就足够了。这样的查询可以通过三个步骤来完成:

  • 遍历区域列中的所有值,并跟踪请求区域中所有销售的行/对象ID
  • 根据项目类别列中的对应值分组过滤后的ID
  • 计算每个组的汇总

基本上,查询处理器在任何给定时间只需在内存中有一列即可。通过以列式格式存储集合,我们可以分别访问单个字段/列的所有值。精心设计的格式可以针对CPU的SIMD指令优化布局,进而完成这一操作。对于此类分析负载,Apache Arrow列式布局比FlatBuffers面向行的布局更合适。

Apache Arrow

Apache Arrow的核心是内存中数据布局格式。除了该格式外,Apache Arrow还提供了一组库(包括C、C++、C#、Go、Java、JavaScript、MATLAB、Python、R、Ruby和Rust),以使用Apache Arrow格式的数据。本文的剩余部分会介绍Arrow的基本概念,以及如何使用Apache Arrow编写Java应用程序。

基本概念

向量Schema根

假设我们正在建模一个连锁店的销售记录。一般来说,你会遇到一个代表某次销售过程的对象。这样的对象将具有各种属性,例如

  • 一个ID
  • 关于这次销售所在商店的信息,例如地区、城市,也许还有商店的类型
  • 一些客户信息
  • 所售商品的编号
  • 所售商品的类别(可能是子类别)
  • 卖了多少商品
  • 等等…

在Java中,销售由Sale类建模。这个类包含单次销售的所有信息。所有销售都通过Sale对象的集合(在内存中)表示。从数据库角度来看,Sale对象的集合等效于面向行的关系型数据库。实际上,往往在这样的应用程序中,对象的集合被映射到数据库中的yi ge 4关系表以实现持久性。

在面向列的数据库中,对象的集合分解为列的集合。所有ID都存储在单个列中。在内存中,所有ID都按顺序存储。同样,有一列用于存储各次销售所在商店的全部所属城市。从概念上讲,这种列式格式可被认为是将一组对象分解为一组等长数组。对象中每个字段对应一个数组。

为了重建某个对象,可在给定索引处选择每个列/数组的值来重组被分解的数组。例如,可以获取id数组的第10个值、商店城市数组的第10个值等来重组第10次销售。

Apache Arrow的工作机制类似于面向列的关系型数据库。Java对象的集合被分解为列的集合,这些列在Arrow中称为向量。向量是Arrow列式格式的基本单位。

所有向量的母亲都是FieldVector。有用于原始类型的向量类型,如Int4Vector和Float8Vector。字符串有一个向量类型:VarCharVector。任意二进制数据都有一个向量类型:VarBinaryVector。有几种类型的向量是用来建模时间的,如TimeStampVector、TimeStampSecVector、TimeStampTZVector和TimeMicroVector。

这里还可以组成更复杂的结构。StructVector用于将一组向量分组为一个字段。例如,考虑上面的销售示例中的商店信息。所有商店信息(地区、城市和类型)都可以分组在一个StructVector中。ListVector允许在一个字段中存储一个可变长度的元素列表。MapVector在一个向量中存储一个键值映射。

继续拿数据库做类比,用表表示对象的集合。为了标识表中的值,表有一个schema:名称到类型的一个映射。在面向行的数据库中,每行将一个名称映射到一个预定义类型的值。在Java中,Schema对应于类定义的成员变量集。面向列的数据库同样具有schema。在表中,schema中的每个名称都映射到一个预定义类型的列。

在Apache Arrow术语中,向量的集合由VectorSchemaRoot(向量Schema根)表示。VectorSchemaRoot还包含一个Schema,将名称(也称为字段)映射到列(也称为向量)。

缓冲区分配器

我们添加到向量中的值存储在哪里?Arrow向量是由缓冲区支持的。一般来说这是一个java.nio.ByteBuffer。缓冲区在缓冲区分配器中池化。你可以要求缓冲区分配器创建一个特定大小的缓冲区,也可以让分配器负责缓冲区的创建和自动扩展以存储新值。分配器跟踪所有已分配的缓冲区。

一个向量由一个分配器管理。我们说支持向量的缓冲区是分配器所有的。向量所有权可以从一个分配器转移到另一个上。

例如,你正在实现一个数据流。这个流由一系列处理阶段组成。每个阶段都会对数据进行一些操作,然后将其传送到下一阶段。每个阶段都有自己的缓冲区分配器,用于管理当前正在处理的缓冲区。处理完成后,数据将进入下一个阶段。

换句话说,支持向量的缓冲区的所有权被转移到下一级的缓冲区分配器。现在,这个缓冲区分配器负责管理内存并在不再需要内存时将其释放。

分配器创建的缓冲区是DirectByteBuffers(直接字节缓冲),因此它们是堆外存储的。这意味着数据使用完后必须释放内存。对Java程序员来说,这种感觉一开始会很奇怪。但这是使用Apache Arrow的一个要点。向量实现了AutoCloseable(可自动关闭)接口,因此建议将向量创建包装在try-with-resources块中,该块将自动关闭向量,也就是释放内存。

示例:写,读和处理

介绍的最后一部分,我们将具体介绍一个使用Apache Arrow的示例应用程序。它的机制是从磁盘上的一个文件中读取一个人员的“数据库”,然后过滤和汇总数据,最后打印出结果。

务必注意,Apache Arrow是内存中格式。在实际应用中,最好使用针对持久存储优化的其他(列式)格式,例如Parquet。Parquet会对写入磁盘的数据做压缩处理并添加中间摘要。因此,从磁盘读取和写入Parquet文件应该比读取和写入Apache Arrow文件更快。在此示例中,Arrow仅用于教学目的。

假设我们有一个Person类和一个Address类(仅显示相关部分):
public Person(String firstName, String lastName, int age, Address address) {
    this.firstName = firstName;
    this.lastName = lastName;
    this.age = age;
    this.address = address;
}
public Address(String street, int streetNumber, String city, int postalCode) {
    this.street = street;
    this.streetNumber = streetNumber;
    this.city = city;
    this.postalCode = postalCode;
}

我们将编写两个应用程序。第一个程序将生成一个随机生成的人员集合,并将其以Arrow格式写入磁盘。接下来我们将编写一个应用程序,其将“人员数据库”以Arrow格式从磁盘读取到内存中。选出所有符合以下特征的人员:

  • 姓氏以“P”开头
  • 年龄在18至35岁之间
  • 住在名称以“way”结尾的街道上

对于选定的人员,我们按每个城市分组并计算各组的平均年龄。看过这个示例后,你应该能对如何使用Apache Arrow实施内存中数据分析有一些概念了。

可以在此Git存储库中找到此示例的代码。

写数据

在我们开始写数据之前。请注意Arrow格式针对的是内存中数据。它尚未针对磁盘数据存储进行优化。在实际的应用程序中,你应该考虑诸如Parquet之类的格式,该格式支持压缩和其他一些技巧以加快磁盘上列式数据的存储,进而持久保存数据。在这里,我们将以Arrow格式写出数据,以简化讨论并强调重点。

给定一个Person对象数组,我们开始将数据写到一个名为people.arrow的文件中。第一步是将Person对象的数组转换为Arrow VectorSchemaRoot。如果你确实想充分利用Arrow,可以让整个应用程序都使用Arrow向量。但是出于教学目的,在此处进行转换更好些。

private void vectorizePerson(int index, Person person, VectorSchemaRoot schemaRoot) {
    // Using setSafe: it increases the buffer capacity if needed
    ((VarCharVector) schemaRoot.getVector("firstName")).setSafe(index, person.getFirstName().getBytes());
    ((VarCharVector) schemaRoot.getVector("lastName")).setSafe(index, person.getLastName().getBytes());
    ((UInt4Vector) schemaRoot.getVector("age")).setSafe(index, person.getAge());
    List childrenFromFields = schemaRoot.getVector("address").getChildrenFromFields();
    Address address = person.getAddress();
    ((VarCharVector) childrenFromFields.get(0)).setSafe(index, address.getStreet().getBytes());
    ((UInt4Vector) childrenFromFields.get(1)).setSafe(index, address.getStreetNumber());
    ((VarCharVector) childrenFromFields.get(2)).setSafe(index, address.getCity().getBytes());
    ((UInt4Vector) childrenFromFields.get(3)).setSafe(index, address.getPostalCode());
}

在vectorizePerson中,Person对象通过person schema映射到schemaRoot中的向量。setSafe方法确保后备缓冲区足够大,以容纳下一个值。如果后备缓冲区不够大,则将扩展缓冲区。
VectorSchemaRoot是schema和向量集合的容器。这样,VectorSchemaRoot类可以被视为一个无模式数据库,因此只有在对象实例化阶段在构造器中传递schema时,才能知道这个schema。因此,所有方法(如getVector)都有非常通用的返回类型,这里是FieldVector。结果,需要基于schema或数据集的知识进行大量casting。

在此示例中,我们可以选择预分配UInt4Vectors和UInt2Vector(因为我们预先知道在一批中有多少人)。然后我们可以使用set方法来避免缓冲区大小检查和重新分配,以扩展缓冲区。

vectorizePerson函数可以传递给一个ChunkedWriter,ChunkedWriter是处理分块并写入Arrow格式化的二进制文件的抽象。

void writeToArrowFile(Person[] people) throws IOException {
   new ChunkedWriter(CHUNK_SIZE, this::vectorizePerson).write(new File("people.arrow"), people);
}
The ChunkedWriter has a write method that looks like this:
public void write(File file, Person[] values) throws IOException {
   DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
   try (RootAllocator allocator = new RootAllocator();
        VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(personSchema(), allocator);
        FileOutputStream fd = new FileOutputStream(file);
        ArrowFileWriter fileWriter = new ArrowFileWriter(schemaRoot, dictProvider, fd.getChannel())) {
       fileWriter.start();
       int index = 0;
       while (index < values.length) {
           schemaRoot.allocateNew();
           int chunkIndex = 0;
           while (chunkIndex < chunkSize && index + chunkIndex < values.length) {
               vectorizer.vectorize(values[index + chunkIndex], chunkIndex, schemaRoot);
               chunkIndex++;
           }
           schemaRoot.setRowCount(chunkIndex);
           fileWriter.writeBatch();
           index += chunkIndex;
           schemaRoot.clear();
       }
       fileWriter.end();
   }
}

分别解释一下。首先,我们创建一个(i)分配器、(ii)schemaRoot和(iii)dictProvider。我们需要它们(i)分配内存缓冲区,(ii)是向量的容器(由缓冲区支持),以及(iii)进行字典压缩(你现在可以忽略它)。
接下来,在(2)中创建一个ArrowFileWriter。它基于一个VectorSchemaRoot处理写入磁盘的操作。用这种方式批量写出数据集非常容易。最后一个要点是,不要忘记启动写入器(writer)。

该方法的其余部分则将Person数组按块向量化到向量schema根中,然后逐批写出。

分批写入有什么好处?在某些时候将从磁盘读取数据。如果将数据一次写入,则必须一次读取所有数据并将其存储在主内存中。通过分批写入,我们允许读取器(reader)以较小的块处理数据,从而控制了内存占用。

一定不要忘记设置向量的值计数或向量schema根的行计数(这会间接设置所有包含的向量的值计数)。如果不设置计数,即使将值存储在向量中,向量也将显示为空。

最后,当所有数据都存储在向量中时,fileWriter.writeBatch()会将它们提交到磁盘。

有关内存管理的说明

请注意第(3)和(4)行上的schemaRoot.clear()和allocator.close()。前者清除VectorSchemaRoot中包含的所有向量中的所有数据,并将行和值计数重置为零。后者关闭分配器。如果你忘记释放所有分配的缓冲区,则此调用将通知你出现了一个内存泄漏。

在这里的设置下关闭有点多余,因为程序在分配器关闭之后不久就会退出。但在真实的,长期运行的应用程序中,内存管理至关重要。

Java程序员并不习惯关注内存管理。但在这种情况下,这是为性能付出的代价。要特别注意分配的缓冲区,并在其生命周期结束时释放它们。

读取数据

从Arrow格式的文件中读取数据类似于写入。你设置了一个分配器、一个向量schema根(没有schema,它是文件的一部分),打开了一个文件,然后让ArrowFileReader负责其余的工作。不要忘记初始化,因为这将从文件中读取Schema。

要读取一个batch,请调用fileReader.loadNextBatch()。下一批(如果仍然可用)会从磁盘读取,并且schemaRoot中的向量缓冲区将用数据填充,准备处理。

以下代码段简要描述了如何读取一个Arrow文件。while循环的每次执行都会将一个批处理加载到VectorSchemaRoot中。批处理的内容由VectorSchemaRoot描述:(i)VectorSchemaRoot的架构,和(ii)值计数,等于条目数。

try (FileInputStream fd = new FileInputStream("people.arrow");
    ArrowFileReader fileReader = new ArrowFileReader(new SeekableReadChannel(fd.getChannel()), allocator)) {
   // Setup file reader
   fileReader.initialize();
   VectorSchemaRoot schemaRoot = fileReader.getVectorSchemaRoot();
   // Aggregate: Using ByteString as it is faster than creating a String from a byte[]
   while (fileReader.loadNextBatch()) {
      // Processing … 
   }
}

处理数据

最后的要点是过滤,分组和汇总步骤,通过它们你应该能大致了解如何在数据分析软件中使用Arrow向量。我并不是说这就是使用Arrow向量的方法,但它应该为探索Apache Arrow的过程提供一个稳固的起点。在这里查看用于实际Arrow代码的Gandiva处理引擎的源代码。使用Apache Arrow处理数据是一个很大的话题。你甚至可以为此写一本书

请注意,示例代码是特别针对Person用例的。在其他情况下,比如构建一个Arrow向量的查询处理器时,我们事先无法知道这些向量的名称和类型,这就会带来更通用,更难理解的代码。

由于Arrow是一种列式格式,因此我们可以只使用一列就独立应用过滤步骤。

private IntArrayList filterOnAge(VectorSchemaRoot schemaRoot) {
    UInt4Vector age = (UInt4Vector) schemaRoot.getVector("age");
    IntArrayList ageSelectedIndexes = new IntArrayList();
    for (int i = 0; i < schemaRoot.getRowCount(); i++) {
        int currentAge = age.get(i);
        if (18 <= currentAge && currentAge <= 35) {
            ageSelectedIndexes.add(i);
        }
    }
    ageSelectedIndexes.trim();
    return ageSelectedIndexes;
}

此方法收集年龄向量的加载块中,全部值在18到35之间的索引。
每个过滤器都会生成一个此类索引的排序列表。在下一步中,我们将这些列表相交/合并到选定索引的单个列表中。该列表包含满足所有条件的行的所有索引。

下一个代码片段显示了如何从向量和选定ID的集合中轻松填充汇总数据结构(将城市映射为一个计数和一个总和)。

VarCharVector cityVector = (VarCharVector) ((StructVector) schemaRoot.getVector("address")).getChild("city");
UInt4Vector ageDataVector = (UInt4Vector) schemaRoot.getVector("age");
for (int selectedIndex : selectedIndexes) {
   String city = new String(cityVector.get(selectedIndex));
   perCityCount.put(city, perCityCount.getOrDefault(city, 0L) + 1);
   perCitySum.put(city, perCitySum.getOrDefault(city, 0L) + ageDataVector.get(selectedIndex));
}

填充汇总数据结构后,很容易打印出每个城市的平均年龄:

for (String city : perCityCount.keySet()) {
    double average = (double) perCitySum.get(city) / perCityCount.get(city);
    LOGGER.info("City = {}; Average = {}", city, average);
}

总结

本文介绍了Apache Arrow,这是一种列式,内存中,跨语言的数据布局格式。它是大数据系统的基础,主要关注群集中机器之间以及不同大数据系统之间的高效数据传输。为了使用Apache Arrow开发Java应用程序,我们看了两个示例应用程序,它们以Arrow格式写入和读取数据。我们还试着使用了Apache Arrow Java库来处理数据。

Apache Arrow是一种列式格式。面向列的布局通常比面向行的布局更适合分析负载。但权衡取舍总是免不了的。对于你的特定负载而言,面向行的格式可能会带来更好的结果。

VectorSchemaRoots,缓冲区和内存管理用起来并不像你惯用的那些Java代码。如果你可以从其他框架(例如FlatBuffers)中获得所需的全部性能,那么在你决定是否在应用程序中采用Apache Arrow时,这种不常用的工作方式可能就要考虑进来了。

作者介绍:

Joris Gillis是TrendMiner的研究开发人员。TrendMiner为IIoT时间序列数据创建自助分析软件。作为一名研究开发人员,他主要研究可伸缩的分析算法,时间序列数据库以及与外部时间序列数据源的连接。

原文链接

Apache Arrow and Java: Lightning Speed Big Data Transfer