Flink managed memory是由flink管理的内存,不受JVM管理。
自主内存管理的优点:
内存更可控,可定制更高效的算法;
减少JVM GC压力;
节省数据内存空间占用;
高效的二进制操作和缓存敏感性;
MemoryManager 负责将 MemorySegments 分配、计算和分发给数据处理操作符,例如 sort 和 join 等操作符。MemorySegment 是 Flink 的内存分配单元,默认大小为 32 KB,支持堆内和堆外内存分配。
MemorySegments 在 TaskManager 启动时分配一次,并在 TaskManager 关闭时销毁。因此,在 TaskManager 的整个生命周期中,MemorySegment 是重用的,而不会被垃圾收集的。在初始化 TaskManager 的所有内部数据结构并且已启动所有核心服务之后,MemoryManager 开始创建 MemorySegments。默认情况下, MemoryManager使用堆外内存,在数据传输时,减少了堆内存向堆外内存复制的过程,传输效率更高。
Flink 有如下几种数据类型的 TypeInformations:
BasicTypeInfo:所有 Java 的基础类型或 java.lang.String
BasicArrayTypeInfo:Java 基本类型构成的数组或 java.lang.String
WritableTypeInfo:Hadoop 的 Writable 接口的任何实现
TupleTypeInfo:任何 Flink tuple(Tuple1 到 Tuple25)。Flink tuples 是具有类型化字段的固定长度元组的 Java 表示
CaseClassTypeInfo:任何 Scala CaseClass(包括 Scala tuples)
PojoTypeInfo:任何 POJO(Java 或 Scala),即所有字段都是 public 的或通过 getter 和 setter 访问的对象,遵循通用命名约定
GenericTypeInfo:不能标识为其他类型的任何数据类型
每个 TypeInformation 都为它所代表的数据类型提供了一个序列化器。例如,BasicTypeInfo 返回一个序列化器,该序列化器写入相应的基本类型;WritableTypeInfo 的序列化器将序列化和反序列化委托给实现 Hadoop 的 Writable 接口的对象的 write() 和 readFields() 方法;GenericTypeInfo 返回一个序列化器,该序列化器将序列化委托给 Kryo。对象将自动通过 Java 中高效的 Unsafe 方法来序列化到 DataOutput中,DataOutput内存由MemorySegments提供。对于可用作键的数据类型,例如哈希值,TypeInformation 提供了 TypeComparators,TypeComparators 比较和哈希对象,并且可以根据具体的数据类型有效的比较二进制并提取固定长度的二进制 key 前缀。
通过提供定制的 TypeInformations、Serializers(序列化器) 和 Comparators(比较器),可以方便地扩展 Flink 的类型系统,从而提高序列化和比较自定义数据类型的性能。
Flink支持二进制数据的操作,比如比较和元素交换的方法,使得其排序算法的性能也有提升。
那么,对二进制数据进行操作对性能意味着什么?我们将运行一个基准测试,对 1000 万个Tuple2<Integer, String>对象进行排序以找出答案。整数字段的值从均匀分布中采样。String 字段值的长度为 12 个字符,并从长尾分布中进行采样。输入数据由返回可变对象的迭代器提供,即返回具有不同字段值的相同 Tuple 对象实例。Flink 在从内存,网络或磁盘读取数据时使用此技术,以避免不必要的对象实例化。基准测试在具有 900 MB 堆大小的 JVM 中运行,在堆上存储和排序 1000 万个 Tuple 对象并且不会导致触发 OutOfMemoryError 大约需要这么大的内存。我们使用三种排序方法在Integer 字段和 String 字段上对 Tuple 对象进行排序:
对象存在堆中:Tuple 对象存储在常用的 java.util.ArrayList 中,初始容量设置为 1000 万,并使用 Java 中常用的集合排序进行排序。
Flink 序列化:使用 Flink 的自定义序列化程序将 Tuple 字段序列化为 600 MB 大小的排序缓冲区,如上所述排序,最后再次反序列化。在 Integer 字段上进行排序时,完整的 Integer 用作排序 key,以便排序完全发生在二进制数据上(不需要对象的反序列化)。对于 String 字段的排序,使用 8 字节前缀 key,如果前缀 key 相等,则对 Tuple 对象进行反序列化。
Kryo 序列化:使用 Kryo 序列化将 Tuple 字段序列化为 600 MB 大小的排序缓冲区,并在没有二进制排序 key 的情况下进行排序。这意味着每次比较需要对两个对象进行反序列化。
所有排序方法都使用单线程实现。结果的时间是十次运行结果的平均值。在每次运行之后,我们调用System.gc()请求垃圾收集运行,该运行不会进入测量的执行时间。下图显示了将输入数据存储在内存中,对其进行排序并将其作为对象读回的时间。
从图中可以看出,Flink序列化显著提升了数据载入时的性能,在排序算法上性能表现也最佳。