How to efficiently read data from MongoDB and convert it into Spark's DataFrame?

I have already done a lot of research but could not find a solution. The closest question I could find here is "Why my SPARK works very slowly with mongoDB".

I am trying to load a MongoDB collection into a Spark DataFrame using the mongo-hadoop connector. Here is a snippet of the relevant code:

connection_string = 'mongodb://%s:%s/randdb.%s' % (dbhost, dbport, collection_name)
trainrdd = sc.mongoRDD(connection_string, config=config)
#     traindf = sqlcontext.createDataFrame(trainrdd)
#     traindf = sqlcontext.read.json(trainrdd)
traindf = sqlcontext.jsonRDD(trainrdd)

Here, 'sc' is the SparkContext object. I have also tried the variants that are commented out in the code, but all of them are equally slow. For a collection of size 2 GB (100,000 rows and 1,000 columns), it takes around 6 hours (holy moly :/) on a cluster of 3 machines, each with 12 cores and 72 GB RAM (using all the cores in this Spark cluster). The MongoDB server is also running on one of these machines.

I am not sure if I am doing it correctly. Any pointers on how to optimize this code would be really helpful.



Solution 1:[1]

An efficient way to read data from MongoDB with PySpark is to use the MongoDB Spark connector:

from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext()
spark = SparkSession(sc)

data = (spark.read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri",
                "mongodb://username:password@server_details:27017/db_name.collection_name?authSource=admin")
        .load())

This already gives you a Spark DataFrame, so there is no need to convert anything. You just need to configure the MongoDB Spark connector.
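Since the original collection is very wide (1,000 columns), it can also help to select only the fields you actually need and filter rows early; the connector can push such projections and simple filters down to MongoDB, so the whole collection does not have to be shipped into Spark. A minimal sketch (the column names here are made-up placeholders):

from pyspark.sql.functions import col

# Project and filter up front; the connector can turn this into a MongoDB
# query/projection instead of reading every field of every document.
# "firstname", "lastname" and "label" are placeholder column names.
subset = (data
          .select("firstname", "lastname", "label")
          .filter(col("label") == 1))
subset.show(5)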

If you are using a notebook, write this at the top:

%%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}

If you are using the spark-submit command:

spark-submit --conf spark.pyspark.python=/usr/bin/anaconda/envs/py35/bin/python3.5 --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1 file_name.py
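If you create the session yourself in a plain Python script, you can also try passing the same package and URI through the session builder before any SparkContext exists. This is only a sketch under that assumption (the app name is arbitrary, and the package version simply mirrors the one above):

from pyspark.sql import SparkSession

# Sketch: configure the connector while building the session.
# spark.jars.packages is only picked up if no SparkContext is running yet.
spark = (SparkSession.builder
         .appName("mongo-read")
         .config("spark.jars.packages",
                 "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")
         .config("spark.mongodb.input.uri",
                 "mongodb://username:password@server_details:27017/db_name.collection_name?authSource=admin")
         .getOrCreate())

data = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()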

If you want to write it back to MongoDB, try:

(data.write
     .format("com.mongodb.spark.sql.DefaultSource")
     .mode("append")
     .option("spark.mongodb.output.uri",
             "mongodb://username:password@server_details:27017/db_name.collection_name?authSource=admin")
     .save())
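Note that mode("append") adds the DataFrame's documents to the existing collection; with mode("overwrite") the connector should drop the target collection before writing, so use that mode with care.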

Solution 2:[2]

By default, pyspark.sql.SQLContext.jsonRDD dynamically infers the schema of the given JSON dataset. Columns are added as new JSON fields are found. This can be slow, since every JSON attribute is inspected, especially if you have 1,000 columns.

What you could do instead is define the schema explicitly, provided the structure of the data is known or only a specific set of fields is needed.

In addition, due to the ObjectId issue described in HADOOP-277, you need to either remove fields containing such incompatible types or convert them to other types, e.g. str(ObjectId(...)).

For example:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark 
pymongo_spark.activate()
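# sc is the SparkContext (created automatically in the pyspark shell)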
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection")
sqlcontext = SQLContext(sc)

# Define your schema explicitly
schema = StructType([StructField("firstname", StringType()),
                     StructField("lastname", StringType()),
                     StructField("description", StringType())])

# Create a mapper function to return only the fields wanted, or to convert. 
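# Dropping the ObjectId-typed _id field here also avoids the HADOOP-277 issue above.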
def project(doc):
    return {"firstname": str(doc["firstname"]), 
            "lastname": str(doc["lastname"]), 
            "description": str(doc["description"])}

projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()

The above snippet was tested with Spark v1.6.1 and mongo-hadoop spark v1.5.2.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2: Wan Bachtiar