BlockStore是存储block的抽象类,子类包括DiskStore、MemoryStore以及ExternalBlockStore等
一 DiskStore 磁盘存储
存储数据块(block)到磁盘。我们可以在DiskStore中配置多个存放block的目录,DiskBlockManager会根据这些配置创建不同的文件夹来存放block
1.1 DiskStore 重要方法
# getSize 获取指定blockId对应的block文件大小
/**
 * Returns the size of the on-disk file that backs the given block.
 *
 * The file is looked up through `diskManager` by the block's name, and the
 * length reported by that file object is returned as-is.
 *
 * @param blockId identifier of the block whose file size is requested
 * @return the length of the file resolved for `blockId.name`
 */
def getSize(blockId: BlockId): Long = {
  diskManager.getFile(blockId.name).length
}
# put 放数据
/**
 * Writes a new block to disk by handing a fresh file output stream to `writeFunc`.
 *
 * @param blockId   identifier of the block to store; must not already be present
 * @param writeFunc callback that writes the block's content to the supplied stream
 * @throws IllegalStateException if `contains(blockId)` is already true
 */
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  // Refuse to overwrite a block that is already stored.
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startedAt = System.currentTimeMillis
  // Resolve the target file for this block and open it for writing.
  val targetFile = diskManager.getFile(blockId)
  val out = new FileOutputStream(targetFile)
  // Flipped to true only once writeFunc has returned normally.
  var writeSucceeded = false
  try {
    writeFunc(out)
    writeSucceeded = true
  } finally {
    try {
      // The second argument is true exactly when the write did not complete.
      Closeables.close(out, !writeSucceeded)
    } finally {
      // A failed write must not leave the block behind.
      if (!writeSucceeded) {
        remove(blockId)
      }
    }
  }
  val finishedAt = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    targetFile.getName,
    Utils.bytesToString(targetFile.length()),
    finishedAt - startedAt))
}
# putBytes 根据指定的byte数据,将其写入block文件
/**
 * Stores the given bytes as the content of `blockId` on disk.
 *
 * Delegates to `put`, writing the buffer through the output stream's channel and
 * closing that channel afterwards via `Utils.tryWithSafeFinally`.
 *
 * @param blockId identifier of the block to write
 * @param bytes   the data to write into the block's file
 */
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  put(blockId) { out =>
    val fileChannel = out.getChannel
    Utils.tryWithSafeFinally {
      bytes.writeFully(fileChannel)
    } {
      fileChannel.close()
    }
  }
}
二 MemoryStore 内存存储
2.1 核心属性
BlockInfoManager blockInfoManager:跟踪单个数据块的元数据
SerializerManager serializerManager:序列化管理器
MemoryManager memoryManager:内存管理器
BlockEvictionHandler blockEvictionHandler:回收block的处理器
LinkedHashMap[BlockId, MemoryEntry[_]] entries:存放在内存的block数据
HashMap[Long, Long] onHeapUnrollMemoryMap:taskAttemptId 到该任务展开block已经使用的内存(堆内存储)的映射
HashMap[Long, Long] offHeapUnrollMemoryMap:taskAttemptId 到该任务展开block已经使用的内存(堆外存储)的映射
Long unrollMemoryThreshold:展开block之前初始化的内存阀值
2.2 重要的类和方法
# Long maxMemory :最大的内存大小
/** Total storage memory limit: the on-heap maximum plus the off-heap maximum. */
private def maxMemory: Long = {
  val onHeapLimit = memoryManager.maxOnHeapStorageMemory
  val offHeapLimit = memoryManager.maxOffHeapStorageMemory
  onHeapLimit + offHeapLimit
}
# memoryUsed 已经使用的内存,包括正展开的block使用的内存
/**
 * Storage memory currently in use, as reported by the memory manager.
 * `blocksMemoryUsed` subtracts the unroll memory from this value.
 */
private def memoryUsed: Long = {
  memoryManager.storageMemoryUsed
}
# blocksMemoryUsed 已经写完的block占用的内存,不包括正在展开的block
/**
 * Memory used by stored blocks: `memoryUsed` minus the current unroll memory.
 * Both values are read while holding the memory manager's monitor.
 */
private def blocksMemoryUsed: Long = memoryManager.synchronized {
  val totalUsed = memoryUsed
  val unrollInFlight = currentUnrollMemory
  totalUsed - unrollInFlight
}
# putBytes 往内存添加数据
往内存添加数据,如果内存足够,则创建ByteBuffer,然后放进MemoryStore,否则不会创建ByteBuffer
/**
 * Tries to store a serialized block in memory.
 *
 * Storage memory is requested first; `_bytes` is only invoked when the request is
 * granted, so no buffer is produced if the block cannot be stored.
 *
 * @param blockId    identifier of the block; must not already be in this store
 * @param size       number of bytes to reserve; must equal the produced buffer's size
 * @param memoryMode memory mode the storage memory is requested in
 * @param _bytes     deferred producer of the block's bytes
 * @return true if the block was stored, false if storage memory was not granted
 */
def putBytes[T: ClassTag](
    blockId: BlockId,
    size: Long,
    memoryMode: MemoryMode,
    _bytes: () => ChunkedByteBuffer): Boolean = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  val granted = memoryManager.acquireStorageMemory(blockId, size, memoryMode)
  if (!granted) {
    false
  } else {
    // Memory is reserved, so it is now safe to materialize the buffer.
    val buffer = _bytes()
    assert(buffer.size == size)
    val serializedEntry =
      new SerializedMemoryEntry[T](buffer, memoryMode, implicitly[ClassTag[T]])
    entries.synchronized {
      entries.put(blockId, serializedEntry)
    }
    logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
      blockId,
      Utils.bytesToString(size),
      Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    true
  }
}
# putIteratorAsValues 尝试将一个迭代器放到block
/**
 * Attempts to store the given iterator as a deserialized block in memory.
 *
 * The iterator is unrolled element by element into a size-tracking vector while
 * on-heap unroll memory is reserved for the current task. The estimated size is
 * checked periodically and more unroll memory is requested as it grows.
 *
 * @param blockId  identifier of the block; must not already be in this store
 * @param values   the elements to unroll
 * @param classTag class tag of the element type
 * @return `Right(size)` with the entry's size if the block was stored; otherwise
 *         `Left` with a `PartiallyUnrolledIterator` over the elements unrolled so
 *         far plus the remaining elements
 */
private[storage] def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory to keep unrolling this block
  var keepUnrolling = true
  // Initial per-task memory to request for unrolling blocks (bytes).
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often (in elements) to check whether more memory must be requested
  val memoryCheckPeriod = 16
  // Memory currently reserved by this task for this block
  var memoryThreshold = initialMemoryThreshold
  // Growth factor applied to the current size when requesting more memory
  val memoryGrowthFactor = 1.5
  // Unroll memory reserved by this particular block / putIterator operation
  var unrollMemoryUsedByThisBlock = 0L
  // Append-only vector that holds the unrolled elements and estimates its own size
  var vector = new SizeTrackingVector[T]()(classTag)
  // Reserve the initial unroll memory for this task
  keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
  // Warn if even the initial reservation could not be made
  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  } else {
    // Otherwise account for the initial reservation
    unrollMemoryUsedByThisBlock += initialMemoryThreshold
  }
  // Unroll the block, periodically checking whether the threshold has been exceeded
  while (values.hasNext && keepUnrolling) {
    // Append the next value
    vector += values.next()
    // Only check the size once every memoryCheckPeriod elements
    if (elementsUnrolled % memoryCheckPeriod == 0) {
      // Estimate the current size of the unrolled data
      val currentSize = vector.estimateSize()
      // Over the threshold: request (currentSize * growthFactor - threshold) more bytes
      if (currentSize >= memoryThreshold) {
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        // Try to reserve the additional unroll memory for this task
        keepUnrolling =
          reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
        // Only count the request when the reservation succeeded
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
        // The threshold is raised whether or not the reservation succeeded
        memoryThreshold += amountToRequest
      }
    }
    // Count the element that was just unrolled
    elementsUnrolled += 1
  }
  // keepUnrolling is still true only if every reservation succeeded
  if (keepUnrolling) {
    // Convert the vector to an array and drop the reference to the vector
    val arrayValues = vector.toArray
    vector = null
    // Build the deserialized entry with an estimated size of the array
    val entry =
      new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
    val size = entry.size
    // Converts `amount` bytes of this task's unroll memory into storage memory
    def transferUnrollToStorage(amount: Long): Unit = {
      // Synchronize so that transfer is atomic
      memoryManager.synchronized {
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
        val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
        assert(success, "transferring unroll memory to storage memory failed")
      }
    }
    // Acquire storage memory if necessary to store this block in memory.
    val enoughStorageMemory = {
      // Unroll memory reserved for this block <= size of the block
      if (unrollMemoryUsedByThisBlock <= size) {
        // Extra storage memory is needed to cover the difference
        val acquiredExtra =
          memoryManager.acquireStorageMemory(
            blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
        if (acquiredExtra) {
          transferUnrollToStorage(unrollMemoryUsedByThisBlock)
        }
        acquiredExtra
      } else { // unrollMemoryUsedByThisBlock > size
        // More was reserved than needed: release the excess, transfer the rest
        val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
        transferUnrollToStorage(size)
        true
      }
    }
    // With enough storage memory, put the entry into the in-memory map
    if (enoughStorageMemory) {
      entries.synchronized {
        entries.put(blockId, entry)
      }
      logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
        blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      Right(size)
    } else {
      // Fully unrolled but not storable: return the unrolled values to the caller
      assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
        "released too much unroll memory")
      Left(new PartiallyUnrolledIterator(
        this,
        MemoryMode.ON_HEAP,
        unrollMemoryUsedByThisBlock,
        unrolled = arrayValues.toIterator,
        rest = Iterator.empty))
    }
  } else {
    // Not enough unroll memory to unroll the whole block
    logUnrollFailureMessage(blockId, vector.estimateSize())
    Left(new PartiallyUnrolledIterator(
      this,
      MemoryMode.ON_HEAP,
      unrollMemoryUsedByThisBlock,
      unrolled = vector.iterator,
      rest = values))
  }
}
# remove 从内存中删除某一个blockId对应的数据
/**
 * Removes the entry stored for `blockId` from memory, if there is one.
 *
 * A serialized entry has its buffer disposed, and the entry's size is returned
 * to the memory manager as released storage memory.
 *
 * @param blockId identifier of the block to remove
 * @return true if an entry was removed, false if none was stored for `blockId`
 */
def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
  val removedEntry = entries.synchronized {
    entries.remove(blockId)
  }
  if (removedEntry == null) {
    false
  } else {
    // Only serialized entries own a buffer that must be disposed.
    removedEntry match {
      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
      case _ =>
    }
    // Return the entry's storage memory to the memory manager.
    memoryManager.releaseStorageMemory(removedEntry.size, removedEntry.memoryMode)
    logDebug(s"Block $blockId of size ${removedEntry.size} dropped " +
      s"from memory (free ${maxMemory - blocksMemoryUsed})")
    true
  }
}
# evictBlocksToFreeSpace 试图回收block以释放内存空间
/**
 * Tries to evict blocks from memory to free up at least `space` bytes.
 *
 * Entries are scanned in the iteration order of `entries`. A block is selected
 * when it uses the requested memory mode, does not belong to the same RDD as
 * `blockId`, and its write lock can be taken without blocking. Blocks are only
 * dropped if the selected ones together free enough space; otherwise their locks
 * are released and nothing is evicted.
 *
 * @param blockId    the block the space is wanted for, if any; blocks of the same
 *                   RDD are not evicted
 * @param space      number of bytes to free; must be positive
 * @param memoryMode memory mode of the entries that may be evicted
 * @return the total size of the selected blocks if eviction went ahead, else 0
 */
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    // Sum of the sizes of the blocks selected so far
    var freedMemory = 0L
    // RDD id of the block the space is wanted for, if it has one
    val rddToAdd = blockId.flatMap(getRddId)
    // Blocks selected for eviction
    val selectedBlocks = new ArrayBuffer[BlockId]
    // A block is evictable if it uses the requested memory mode and does not
    // belong to the same RDD as the block being added
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    entries.synchronized {
      // Walk the entries in the map's iteration order
      val iterator = entries.entrySet().iterator()
      // Keep selecting while the selected total is still below the requested space
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(blockId, entry)) {
          // Select the block only if its write lock is available without blocking
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }
    // Drops a single block through the eviction handler
    def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
      val data = entry match {
        case DeserializedMemoryEntry(values, _, _) => Left(values)
        case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
      }
      // Hand the block to the eviction handler to drop it from memory
      val newEffectiveStorageLevel =
        blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
      if (newEffectiveStorageLevel.isValid) {
        // The block is still present in at least one store, so release the lock
        // but don't delete the block info
        blockInfoManager.unlock(blockId)
      } else {
        // The block isn't present in any store, so delete the block info so that the
        // block can be stored again
        blockInfoManager.removeBlock(blockId)
      }
    }
    // Evict only if the selected blocks free at least the requested space
    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      // Drop every selected block through dropBlock
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          dropBlock(blockId, entry)
        }
      }
      logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
        s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
      freedMemory
    } else {
      // Not enough could be freed: log it, release the locks and evict nothing
      blockId.foreach { id =>
        logInfo(s"Will not store $id")
      }
      selectedBlocks.foreach { id =>
        blockInfoManager.unlock(id)
      }
      0L
    }
  }
}
# reserveUnrollMemoryForThisTask 为任务预定内存用于展开指定的block,因为展开block也需要消费内存
/**
 * Reserves unroll memory for the current task so that it can unroll a block.
 *
 * On success the reserved amount is added to the per-task total in the unroll
 * memory map that matches `memoryMode`.
 *
 * @param blockId    the block being unrolled
 * @param memory     number of bytes to reserve
 * @param memoryMode whether the on-heap or off-heap map and pool are used
 * @return true if the memory manager granted the request
 */
def reserveUnrollMemoryForThisTask(
    blockId: BlockId,
    memory: Long,
    memoryMode: MemoryMode): Boolean = {
  memoryManager.synchronized {
    val granted = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
    if (granted) {
      val taskId = currentTaskAttemptId()
      // Pick the per-task bookkeeping map for the requested memory mode.
      val perTaskUnrollMemory = memoryMode match {
        case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
        case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
      }
      val alreadyReserved = perTaskUnrollMemory.getOrElse(taskId, 0L)
      perTaskUnrollMemory(taskId) = alreadyReserved + memory
    }
    granted
  }
}
/**
 * Releases unroll memory held by the current task.
 *
 * At most the amount recorded for the task in the matching unroll memory map is
 * released. The task's map entry is removed once its recorded amount reaches zero.
 *
 * @param memoryMode whether the on-heap or off-heap map and pool are used
 * @param memory     upper bound on the bytes to release; defaults to
 *                   `Long.MaxValue`, which releases everything recorded for the task
 */
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
  val taskId = currentTaskAttemptId()
  memoryManager.synchronized {
    val perTaskUnrollMemory = memoryMode match {
      case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
      case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
    }
    // Nothing to do when the task has no recorded unroll memory.
    perTaskUnrollMemory.get(taskId).foreach { reserved =>
      val memoryToRelease = math.min(memory, reserved)
      if (memoryToRelease > 0) {
        perTaskUnrollMemory(taskId) = reserved - memoryToRelease
        memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
      }
      // Drop the bookkeeping entry once the task holds no unroll memory.
      if (perTaskUnrollMemory(taskId) == 0) {
        perTaskUnrollMemory.remove(taskId)
      }
    }
  }
}