Read/Write data from MongoDB using Spark
|
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
# Point both the read side and the write side of the MongoDB Spark connector
# at the StudentRecord collection of the Student database on the local server.
conf = SparkConf()
conf.set('spark.mongodb.input.uri', 'mongodb://localhost:27017/Student.StudentRecord')
conf.set('spark.mongodb.output.uri', 'mongodb://localhost:27017/Student.StudentRecord')
sc = SparkContext(conf=conf)
# Bind the instance to its own name so the imported SQLContext class is not
# overwritten by the object created from it.
sql_context = SQLContext(sc)

# Read the collection into a DataFrame.
df = sql_context.read.format("com.mongodb.spark.sql.DefaultSource").load()
# NOTE: printSchema() and show() write their output themselves and return
# None, so each surrounding print() also emits a "None" line. The calls are
# kept as they are so the script still matches the recorded output.
print(df.printSchema())
df.createOrReplaceTempView("mycollection")
# The query is written on a single line: a plain quoted string cannot span
# several physical lines.
result_data = sql_context.sql("SELECT * from mycollection limit 10")
print(result_data.show())

# Insert records.
# createDataFrame() takes a list of tuples containing StudentName and
# Studentid; the second argument supplies the column names.
# The write mode can be "overwrite" or "append"; "append" is used here.
studentRdd = sc.parallelize([("Urvasi", 101), ("Ganesh", 111), ("Amita", 112)])
students = sql_context.createDataFrame(studentRdd, ["StudentName", "Studentid"])
students.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

# Load the collection again so the DataFrame is read after the insert.
df = sql_context.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()

# Write an SQL query: students whose Studentid is 100 or greater.
df.createOrReplaceTempView("students")
centenarians = sql_context.sql(
    "SELECT StudentName, Studentid FROM students WHERE Studentid >= 100"
)
print(centenarians.show())

# Use of filters: the DataFrame API equivalent of a WHERE clause, here
# selecting rows whose Studentid is 100 or less.
print(df.filter(df['Studentid'] <= 100).show())
|
Output:
|
19/04/20 20:11:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
 |-- StudentName: string (nullable = true)
 |-- Studentid: long (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
None
+-------------+---------+--------------------+
|  StudentName|Studentid|                 _id|
+-------------+---------+--------------------+
|        Balin|      178|[5cbb2baf9a2b294d...|
|       Thorin|      195|[5cbb2baf9a2b294d...|
|         Fili|       82|[5cbb2baf9a2b294d...|
|          Oin|      167|[5cbb2baf9a2b294d...|
|         Kili|       77|[5cbb2baf9a2b294d...|
|       Bombur|     null|[5cbb2baf9a2b294d...|
|Bilbo Baggins|       50|[5cbb2baf9a2b294d...|
|       Dwalin|      169|[5cbb2baf9a2b294d...|
|      Gandalf|     1000|[5cbb2baf9a2b294d...|
|        Gloin|      158|[5cbb2baf9a2b294d...|
+-------------+---------+--------------------+
None
[Stage 3:>                                                          (0 + 1) / 1]
[Stage 4:>                                                          (0 + 4) / 4]
[Stage 4:==============>                                            (1 + 3) / 4]
root
 |-- StudentName: string (nullable = true)
 |-- Studentid: long (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
+-----------+---------+
|StudentName|Studentid|
+-----------+---------+
|      Balin|      178|
|     Thorin|      195|
|        Oin|      167|
|     Dwalin|      169|
|    Gandalf|     1000|
|      Gloin|      158|
|     Urvasi|      101|
|      Amita|      112|
|     Ganesh|      111|
+-----------+---------+
None
+-------------+---------+--------------------+
|  StudentName|Studentid|                 _id|
+-------------+---------+--------------------+
|         Fili|       82|[5cbb2baf9a2b294d...|
|         Kili|       77|[5cbb2baf9a2b294d...|
|Bilbo Baggins|       50|[5cbb2baf9a2b294d...|
+-------------+---------+--------------------+
None
SUCCESS: The process with PID 24700 (child process of PID 13024) has been terminated.
SUCCESS: The process with PID 13024 (child process of PID 9448) has been terminated.
SUCCESS: The process with PID 9448 (child process of PID 17172) has been terminated.
|



0 comments:
Post a Comment