def save[D: ClassTag](rdd: RDD[D], writeConfig: WriteConfig): Unit = {
  val mongoConnector = MongoConnector(writeConfig.asOptions)
  // For each non-empty partition, open the target collection and insert the
  // documents in batches of at most writeConfig.maxBatchSize.
  rdd.foreachPartition(iter => if (iter.nonEmpty) {
    mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
      iter.grouped(writeConfig.maxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava))
    })
  })
}
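For reference, a minimal sketch of exercising this write path (the application name and connection URI are assumptions, not from the source):

import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

val sc = new SparkContext(new SparkConf()
  .setAppName("save-example")
  .set("spark.mongodb.output.uri", "mongodb://localhost/test.coll")) // assumed URI
val docs = sc.parallelize(1 to 100).map(i => new Document("i", Int.box(i)))
// Each partition is written with batched insertMany calls, as shown above.
MongoSpark.save(docs, WriteConfig(sc.getConf))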
def load[D: ClassTag](sc: SparkContext, readConfig: ReadConfig)(implicit e: D DefaultsTo Document): MongoRDD[D] =
  builder().sparkContext(sc).readConfig(readConfig).build().toRDD[D]()

def builder(): Builder = new Builder
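The read entry point can be exercised the same way; a minimal sketch, assuming the standard spark.mongodb.input.uri setting:

import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

val sc = new SparkContext(new SparkConf()
  .setAppName("load-example")
  .set("spark.mongodb.input.uri", "mongodb://localhost/test.coll")) // assumed URI
val readConfig = ReadConfig(sc.getConf)
val rdd = MongoSpark.load(sc, readConfig) // MongoRDD[Document] via the DefaultsTo evidence
println(rdd.count())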
The Builder code:
// Creates (or reuses) a SparkSession from the given SparkContext's configuration.
def sparkContext(sparkContext: SparkContext): Builder = {
  this.sparkSession = Option(SparkSession.builder().config(sparkContext.getConf).getOrCreate())
  this
}

def readConfig(readConfig: ReadConfig): Builder = {
  this.readConfig = Option(readConfig)
  this
}
def build(): MongoSpark = {
  require(sparkSession.isDefined, "The SparkSession must be set, either explicitly or via the SparkContext")
  val session = sparkSession.get
  // Prefer an explicitly supplied ReadConfig; otherwise derive one from the SparkConf.
  val readConf = readConfig.isDefined match {
    case true  => ReadConfig(options, readConfig)
    case false => ReadConfig(session.sparkContext.getConf, options)
  }
  val mongoConnector = connector.getOrElse(MongoConnector(readConf))
  val bsonDocumentPipeline = pipeline.map(x => x.toBsonDocument(classOf[Document], mongoConnector.codecRegistry))
  new MongoSpark(session, mongoConnector, readConf, bsonDocumentPipeline)
}
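So load() is just sugar over this builder; the same chain can be spelled out explicitly (a sketch, reusing the sc and readConfig from the earlier example):

val mongoSpark = MongoSpark.builder()
  .sparkContext(sc)
  .readConfig(readConfig)
  .build()
val rdd = mongoSpark.toRDD[org.bson.Document]()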
build() wires up the MongoConnector and the aggregation pipeline from the ReadConfig; a read ultimately goes through MongoSpark.toRDD:
def toRDD[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] = rdd[D]

private def rdd[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] =
  // The connector is broadcast so every executor shares one connection factory.
  new MongoRDD[D](sparkSession, sparkSession.sparkContext.broadcast(connector), readConfig, pipeline)
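Because the pipeline is carried into MongoRDD as BSON, callers can push filtering down to the server. A hedged sketch using the connector's withPipeline (the field name and value are assumptions):

import org.bson.Document
val matched = MongoSpark.load(sc, readConfig)
  .withPipeline(Seq(Document.parse("""{ $match: { status: "A" } }""")))
println(matched.count()) // only documents with status "A" are read from MongoDB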
| Property name | Description | Default value |
| --- | --- | --- |
| partitionKey | The field to partition the collection by. The field should be indexed and contain unique values. | _id |
| partitionSizeMB | The size (in MB) for each partition. | 64 |
| samplesPerPartition | The number of sample documents to take for each partition. | 10 |
From these settings, the sample partitioner derives how many documents to sample:

// Average document size reported by collStats (0 if absent).
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue()
// How many documents fit into one partition of partitionSizeMB.
val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
// Total samples: samplesPerPartition for each expected partition (count / numDocumentsPerPartition).
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt
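Plugging in round numbers makes the formula concrete; all figures below are assumed, not taken from a real collection:

// Assumed: 64 MB partitions, 1 KB average document, 1,000,000 documents,
// samplesPerPartition = 10.
val partitionSizeInBytes = 64L * 1024 * 1024        // 67108864
val avgObjSizeInBytes    = 1024L
val count                = 1000000L
val samplesPerPartition  = 10
val numDocumentsPerPartition = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt // 65536
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt // 152, about 10 per expected partition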