它会强制管理存储(storage)和执行(execution)之间的内存使用
# 记录用了多少 storage memory 和 execution memory
# 申请 storage、execution 和 unroll memory
# 释放 storage 和 execution memory
execution memory: 是指 shuffles,joins,sorts 和 aggregation 的计算操作
storage memory:是指persist或者cache缓存数据到内存等
unroll memory: 则是指展开block本身就需要耗费内存,好比打开文件,打开文件我们是需要耗费内存的
MemoryManager根据spark.memory.useLegacyMode这个配置项决定你是否使用遗留的MemoryManager策略即StaticMemoryManager。默认是不使用StaticMemoryManager,而是UnifiedMemoryManager。
一 MemoryManager
1.1 核心属性
Int numCores: 核数
Long onHeapStorageMemory:堆内storage 内存大小
Long onHeapExecutionMemory: 堆内execution内存大小
StorageMemoryPool onHeapStorageMemoryPool:创建堆内storage内存池
StorageMemoryPool offHeapStorageMemoryPool:创建堆外storage内存池
ExecutionMemoryPool onHeapExecutionMemoryPool:创建堆内execution内存池
ExecutionMemoryPool offHeapExecutionMemoryPool:创建堆外execution内存池
Long maxOffHeapMemory: 最大的堆外内存大小,可以由spark.memory.offHeap.size配置,但必须先启用spark.memory.offHeap.enabled该配置才会生效
Long maxOnHeapStorageMemory: 最大的堆内storage内存大小
Long maxOffHeapStorageMemory: 最大的堆外storage内存大小
1.2 重要方法
# 释放numBytes字节的执行内存
/**
 * Releases `numBytes` of execution memory on behalf of a task.
 *
 * The request is forwarded to the execution pool selected by `memoryMode`.
 *
 * @param numBytes      number of bytes to hand back
 * @param taskAttemptId id of the task attempt the memory is released for
 * @param memoryMode    chooses between the on-heap and the off-heap execution pool
 */
def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP =>
      onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP =>
      offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
# 释放指定task的所有execution内存
/**
 * Asks both execution pools to release everything held for the given task.
 *
 * @param taskAttemptId id of the task attempt whose execution memory is released
 * @return the sum of the values reported by the on-heap and the off-heap pool
 */
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  val releasedOnHeap = onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
  val releasedOffHeap = offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
  releasedOnHeap + releasedOffHeap
}
# 释放N字节存储内存
/**
 * Releases `numBytes` of storage memory from the pool selected by `memoryMode`.
 *
 * @param numBytes   number of bytes to hand back
 * @param memoryMode chooses between the on-heap and the off-heap storage pool
 */
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  val targetPool = memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool
  }
  targetPool.releaseMemory(numBytes)
}
# 释放所有存储内存
/**
 * Releases all storage memory by calling `releaseAllMemory()` on the on-heap
 * storage pool first and on the off-heap storage pool second.
 */
final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}
二 StaticMemoryManager
Executor的内存界限分明,分别由3部分组成:execution,storage和system。对各部分内存静态划分好后便不可变化
# execution: execution内存大小通过设置spark.shuffle.memoryFraction参数来控制大小,默认为0.2。
为了避免shuffle,join,排序和聚合这些操作直接将数据写入磁盘,所设置的buffer大小,减少了磁盘读写的次数。
#storage: storage内存大小通过设置spark.storage.memoryFraction参数来控制大小,默认为0.6。
用于存储用户显式调用的persist,cache,broadcast等命令存储的数据空间。
#system:程序运行需要的空间,存储一些spark内部的元数据信息,用户的数据结构,避免一些不寻常的大记录带来的OOM。
这种划分方式,在某些时候可能会带来一定的资源浪费,比如我对cache或者persist没啥要求,那么storage的内存就剩余了
由于很多属性都继承了父类MemoryManager,在这里不做赘述。
# maxUnrollMemory
最大的block展开内存空间,默认是占用最大存储内存的20%
/**
 * Upper bound on memory used for unrolling blocks: `maxOnHeapStorageMemory` scaled by
 * `spark.storage.unrollFraction` (0.2 when the key is not set), truncated to a Long.
 */
private val maxUnrollMemory: Long = {
  val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
  (maxOnHeapStorageMemory * unrollFraction).toLong
}
# 申请分配storage内存,注意StaticMemoryManager不支持堆外内存
/**
 * Requests `numBytes` of on-heap storage memory for `blockId`.
 *
 * @param blockId    block the memory is requested for
 * @param numBytes   number of bytes requested
 * @param memoryMode must not be `MemoryMode.OFF_HEAP`; the `require` below rejects it
 * @return `false` when the request is larger than `maxOnHeapStorageMemory`; otherwise the
 *         result of `onHeapStorageMemoryPool.acquireMemory`
 * @throws IllegalArgumentException (via `require`) when `memoryMode` is `OFF_HEAP`
 */
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap storage memory")
  // A request larger than the whole storage limit can never succeed.
  if (numBytes > maxOnHeapStorageMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxOnHeapStorageMemory bytes)")
    false
  } else {
    // Delegate the actual allocation of numBytes to the on-heap storage pool.
    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
  }
}
# acquireUnrollMemory 用于申请展开block的内存
/**
 * Requests `numBytes` of on-heap memory for unrolling `blockId`.
 *
 * The number of bytes the storage pool is allowed to free on behalf of this request is
 * capped so that it never exceeds the unroll budget (`maxUnrollMemory`) left after
 * accounting for current unroll usage and free memory.
 *
 * @param blockId    block being unrolled
 * @param numBytes   number of bytes requested
 * @param memoryMode must not be `MemoryMode.OFF_HEAP`; the `require` below rejects it
 * @return the result of `onHeapStorageMemoryPool.acquireMemory`
 * @throws IllegalArgumentException (via `require`) when `memoryMode` is `OFF_HEAP`
 */
override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap unroll memory")
  // Unroll memory currently in use, as reported by the pool's memory store.
  val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
  // Free memory currently available in the on-heap storage pool.
  val freeMemory = onHeapStorageMemoryPool.memoryFree
  // Remaining unroll budget beyond what is already used or free, clamped at zero.
  val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
  // Shortfall of this request over free memory, kept within 0 <= X <= maxNumBytesToFree.
  val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
  onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
# acquireExecutionMemory申请执行内存
/**
 * Requests `numBytes` of execution memory for a task from the pool chosen by `memoryMode`.
 *
 * @param numBytes      number of bytes requested
 * @param taskAttemptId id of the task attempt making the request
 * @param memoryMode    chooses between the on-heap and the off-heap execution pool
 * @return the value returned by the selected pool's `acquireMemory`
 */
override def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  val chosenPool = memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool
  }
  chosenPool.acquireMemory(numBytes, taskAttemptId)
}
# getMaxStorageMemory 返回有效的最大的storage内存空间
/**
 * Computes the storage memory limit as
 * `systemMaxMemory * memoryFraction * safetyFraction`, truncated to a Long.
 *
 * @param conf configuration the three inputs are read from
 * @return the storage memory limit in bytes
 */
private def getMaxStorageMemory(conf: SparkConf): Long = {
  // System memory: `spark.testing.memory` if set, otherwise the JVM's max memory.
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // Share of system memory assigned to storage (0.6 when the key is not set).
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  // Safety margin applied on top of the storage share (0.9 when the key is not set).
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
# getMaxExecutionMemory 返回最大的execution内存空间
/**
 * Computes the execution memory limit as
 * `systemMaxMemory * memoryFraction * safetyFraction`, truncated to a Long,
 * after validating the configured memory sizes against `MIN_MEMORY_BYTES`.
 *
 * @param conf configuration the inputs are read from
 * @return the execution memory limit in bytes
 * @throws IllegalArgumentException when system memory, or `spark.executor.memory` if set,
 *                                  is below `MIN_MEMORY_BYTES`
 */
private def getMaxExecutionMemory(conf: SparkConf): Long = {
  // System memory: `spark.testing.memory` if set, otherwise the JVM's max memory.
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

  // Reject a heap smaller than the required minimum.
  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }

  // The executor memory is validated only when it has been configured explicitly.
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }

  // Share of system memory assigned to shuffle/execution (0.2 when the key is not set).
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  // Safety margin applied on top of that share (0.8 when the key is not set).
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
三 UnifiedMemoryManager
由于传统的StaticMemoryManager存在资源浪费问题,所以引入了这个MemoryManager。UnifiedMemoryManager管理机制淡化了execution空间和storage空间的边界,让它们之间可以相互借内存。
它们总共可用的内存由spark.memory.fraction决定(默认0.6),即:可使用的堆内存比例 * 可使用的内存。在该空间内部,又对execution和storage进行了进一步的划分,划分比例由spark.memory.storageFraction决定。
# 计算最大的存储内存
最大的存储内存 = 最大堆内内存 - 已使用的堆内执行内存
/**
 * On-heap storage limit: `maxHeapMemory` minus the execution memory currently used
 * in the on-heap execution pool.
 */
override def maxOnHeapStorageMemory: Long = synchronized {
  val usedByExecution = onHeapExecutionMemoryPool.memoryUsed
  maxHeapMemory - usedByExecution
}
# 计算最大的堆外存储内存
最大的堆外存储内存 = 最大堆外内存 - 已使用的堆外执行内存
/**
 * Off-heap storage limit: `maxOffHeapMemory` minus the execution memory currently used
 * in the off-heap execution pool.
 */
override def maxOffHeapStorageMemory: Long = synchronized {
  val usedByExecution = offHeapExecutionMemoryPool.memoryUsed
  maxOffHeapMemory - usedByExecution
}
# acquireExecutionMemory 申请执行内存
/**
 * Requests `numBytes` of execution memory for a task.
 *
 * The execution pool is given two callbacks: one that may grow the execution pool by
 * shrinking the storage pool, and one that computes the largest size the execution pool
 * may reach.
 *
 * @param numBytes      number of bytes requested; must be non-negative
 * @param taskAttemptId id of the task attempt making the request
 * @param memoryMode    selects the on-heap or off-heap pools and limits
 * @return the value returned by `executionPool.acquireMemory`
 */
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pick the pools, the storage region size and the total limit for the requested mode.
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  /**
   * Grows the execution pool by shrinking the storage pool.
   * It is passed to `executionPool.acquireMemory` as a callback, so how often it runs
   * per request is decided there.
   */
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // Storage can give up whichever is larger: its free memory, or the amount by which
      // the storage pool currently exceeds its own region size.
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Only reclaim as much space as is necessary and available.
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  /**
   * Largest size the execution pool may reach: the total limit minus the storage memory
   * in use, where storage usage is counted only up to the storage region size.
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
# acquireStorageMemory 申请分配存储内存
/**
 * Requests `numBytes` of storage memory for `blockId`.
 *
 * When the storage pool does not have enough free memory, free memory is first moved
 * over from the execution pool.
 *
 * @param blockId    block the memory is requested for
 * @param numBytes   number of bytes requested; must be non-negative
 * @param memoryMode selects the on-heap or off-heap pools and limit
 * @return `false` when the request exceeds the limit for the selected mode; otherwise the
 *         result of `storagePool.acquireMemory`
 */
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pick the pools and the limit for the requested mode.
  // NOTE(review): ON_HEAP uses maxOnHeapStorageMemory but OFF_HEAP uses maxOffHeapMemory
  // rather than maxOffHeapStorageMemory - confirm the asymmetry is intended.
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    false
  } else {
    if (numBytes > storagePool.memoryFree) {
      // Not enough free storage memory: shrink the execution pool by part of its free
      // memory and grow the storage pool by the same amount.
      // NOTE(review): this moves up to numBytes even though only
      // numBytes - storagePool.memoryFree is missing - confirm this is intended.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }
}
#getMaxMemory 返回最大的内存
/**
 * Computes the total memory shared by execution and storage as
 * `(systemMemory - reservedMemory) * memoryFraction`, truncated to a Long.
 *
 * @param conf configuration the inputs are read from
 * @return the shared memory size in bytes
 * @throws IllegalArgumentException when system memory, or `spark.executor.memory` if set,
 *                                  is below 1.5 times the reserved memory
 */
private def getMaxMemory(conf: SparkConf): Long = {
  // System memory: `spark.testing.memory` if set, otherwise the JVM's max memory.
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // Reserved memory: `spark.testing.reservedMemory` if set; otherwise 0 when
  // `spark.testing` is present, else RESERVED_SYSTEM_MEMORY_BYTES.
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  // Minimum acceptable system memory: 1.5 times the reserved memory, rounded up.
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }

  // The executor memory is validated only when it has been configured explicitly.
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }

  // Memory left once the reserved part has been subtracted.
  val usableMemory = systemMemory - reservedMemory
  // Share of usable memory given to execution and storage (0.6 when the key is not set).
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}