Wednesday, April 17, 2019

Apache Spark SQL



Apache Spark SQL is a component built on top of the Spark Core engine that supports SQL and the Hive Query Language (HQL) without changing their syntax, so you can join SQL tables and HQL tables in the same query. Spark SQL is the Spark module for structured data processing and supports both batch and streaming SQL. Apache Spark itself is a lightning-fast cluster computing framework designed for fast computation.

It offers much tighter integration between relational and procedural processing through declarative DataFrame APIs that integrate with regular Spark code, and it enables additional query optimization. The DataFrame API and the Dataset API are the two ways to interact with Spark SQL.
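For example, a DataFrame can be registered as a temporary view and queried with plain SQL. The snippet below is a minimal sketch using the people.json sample file linked below; the name and email fields are the same ones used in the later examples.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IntroExample").getOrCreate()

# Read the sample JSON file into a DataFrame
df = spark.read.json("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.json", multiLine=True)

# Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name, email FROM people").show()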

You can download the files used in the programs below.
Download (people.json, people.txt, Mall_Customers.csv)
  






Test1.py

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext


# You can configure the SparkContext

conf = SparkConf()
conf.set('spark.local.dir', 'C:/Users/thakudev/PYTHON/Sample/SparkSQL/Config')
conf.set('spark.sql.shuffle.partitions', '2100')
SparkContext.setSystemProperty('spark.executor.memory', '10g')
SparkContext.setSystemProperty('spark.driver.memory', '10g')
sc = SparkContext(appName='mm_exp', conf=conf)
sqlContext = SQLContext(sc)


# Alternatively, create a SparkSession and get the SparkContext from it:
#spark = SparkSession.builder.getOrCreate()
#sc = spark.sparkContext

#Fetch records from a JSON file; the commented line below writes selected columns back out
df = sqlContext.read.load("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.json", format="json")
#df.select("name", "email").write.save("namesAndemail1.json", format="json")


#Display the first record
df1 = sqlContext.read.json("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.json")
print(df1.first())


#Display all records (multiLine=True handles JSON records that span multiple lines)
data = sqlContext.read.json("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.json", multiLine=True)
data.show()
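The commented-out write above can also be run; a minimal sketch continuing from df (mode("overwrite") is added here so re-runs do not fail, and the output path becomes a directory of JSON part files):

# Select two columns and write them out as JSON (creates a directory of part files)
df.select("name", "email").write.mode("overwrite").save("namesAndemail1.json", format="json")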



 
Test2.py


#Initialize the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TEST2") \
    .config("spark.some.config.option", "Config1") \
    .getOrCreate()


#Create a DataFrame from an RDD
from pyspark.sql import Row

#Infer the schema by reflecting on Row objects
sc = spark.sparkContext
lines = sc.textFile("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.txt")
parts = lines.map(lambda a: a.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)
peopledf = peopledf.dropDuplicates()   # dropDuplicates returns a new DataFrame
peopledf.show()
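Once the DataFrame exists it can also be queried with SQL; a minimal sketch continuing from peopledf above:

# Register the inferred-schema DataFrame as a temporary view and query it
peopledf.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()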









Test3.py


#Initialize the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TEST3") \
    .config("spark.some.config.option", "Config1") \
    .getOrCreate()


#Create a DataFrame from an RDD
from pyspark.sql.types import *

#Programmatically specify the schema (both fields as strings)
sc = spark.sparkContext
lines = sc.textFile("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.txt")
parts = lines.map(lambda a: a.split(","))
# Keep both fields as strings so they match the StringType schema below
people = parts.map(lambda p: (p[0], p[1].strip()))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()
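If age should be stored as a number rather than a string, the schema can also be declared with explicit types; a minimal sketch reusing the parts RDD from above:

# Explicitly typed schema: name as a string, age as an integer
typed_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
typed_people = parts.map(lambda p: (p[0], int(p[1])))
spark.createDataFrame(typed_people, typed_schema).printSchema()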




Test4.py

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext()
sqlContext = SQLContext(sc)

data = sqlContext.read.json("C:/Users/thakudev/PYTHON/Sample/SparkSQL/people.json",multiLine=True)
data.select("name").show()
data.select("name","email").show()

#Select a boolean column that is True when city is not the string "null"
data.select((data["city"] != "null").alias("City")).show()

#WHEN
data.select("name",F.when(data["city"]!="null", 1).otherwise(0)).show()

#isin
#print(data[data.name.isin("Keeley","Bosco")].collect()) #Not working
data[data.name.isin("Keeley", "Rubye")].show()

#LIKE
#data.select("name","email",data.name.like("Barton")).write.save("TEST.json", format="json")
data.select("name","email").where("name like '%Barton%'").show()
#df.where(df.location.contains('google.com'))

#Startswith / Endswith
#data.select("name","email",data.name.startswith("Sm")).show()
data.filter(data.name.startswith("Kee")).show()
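Column functions can also be combined with grouping and sorting; a minimal sketch grouping the same DataFrame by its city column:

#GroupBy / OrderBy
# Count people per city and sort by the count, largest first
data.groupBy("city").count().orderBy(F.desc("count")).show()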

 


Test5.py

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

#Read the CSV with the built-in csv data source (header='true' uses the first row as column names)
df = sqlContext.read.format("csv").options(header='true').load("C:/Users/thakudev/PYTHON/Sample/SparkSQL/customer-segmentation-tutorial-in-python/Mall_Customers.csv")

df.printSchema()
df.show(10)
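With inferSchema enabled the numeric columns come back as numbers, which makes aggregations straightforward. The sketch below assumes the standard Kaggle Mall_Customers column names (Gender, Age); check df.printSchema() for the actual names in your copy of the file.

from pyspark.sql import functions as F

#Infer column types and aggregate
df2 = sqlContext.read.format("csv") \
    .options(header='true', inferSchema='true') \
    .load("C:/Users/thakudev/PYTHON/Sample/SparkSQL/customer-segmentation-tutorial-in-python/Mall_Customers.csv")

# Average age per gender (Gender and Age are assumed column names from the Kaggle dataset)
df2.groupBy("Gender").agg(F.avg("Age").alias("avg_age")).show()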



 










 
