本文共 3324 字,大约阅读时间需要 11 分钟。
org.apache.spark.scheduler.ShuffleMapTask.runTask

runTask 对应的代码为：

val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
org.apache.spark.shuffle.sort.SortShuffleWriter
sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  new ExternalSorter[K, V, C](
    dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true)}
"temp_shuffle_" + id
val (blockId, file) = diskBlockManager.createTempShuffleBlock() def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }
private val spills = new ArrayBuffer[SpilledFile]
(p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
val partitionLengths = sorter.writePartitionedFile( blockId, context, outputFile)
for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length }}
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"
shuffleBlockResolver.writeIndexFile( dep.shuffleId, mapId, partitionLengths)
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"
每个 map task 最终只输出一个 data 文件，因此 Shuffle Write 最终生成的数据文件数为：MapNum（注：不包含 index 文件）
CoreNum
转载地址:http://cissx.baihongyu.com/