Apache Spark >> SparkSession, Create DataFrame
Apache Spark, written in Scala, is a general-purpose distributed computing and data processing engine. It supports the Scala, Java, Python, SQL, and R languages.
In this series, we will use Python as the main language.
Create a Spark Session
We can use the SparkSession builder to create a Spark session.
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("<Your App Name>") \
    .config("spark.some.config.option", "<some-value>") \
    .getOrCreate()
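As a minimal sketch (the app name below is just an illustrative placeholder), creating a session and verifying it works might look like this:
from pyspark.sql import SparkSession

# getOrCreate() returns the existing session if one is already running
spark = SparkSession.builder \
    .appName("dataframe-tutorial") \
    .getOrCreate()

# Quick sanity check: print the Spark version of the running session
print(spark.version)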
Create DataFrames
In Spark, a DataFrame represents an immutable, lazily evaluated plan that specifies what operations to apply to data to generate some output. Creating DataFrames is the first practical step in learning Spark.
Create DataFrame from files
Spark has a generic read method that can load many file formats.
df = spark.read.format("<file type>").load("<path/to/sample.<file type>>")
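As a concrete sketch (the path and options here are illustrative assumptions), reading a CSV file with a header row through the generic reader could look like this:
# header/inferSchema are standard CSV reader options; the path is hypothetical
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/sample.csv")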
Some file types also have dedicated read methods (a quick sanity-check sketch follows this list).
- csv
df = spark.read.csv("<path/to/file>")
Read all CSV files in a given directory.
df = spark.read.csv("<path/to/directory>")
Read multiple CSV files at the same time.
df = spark.read.csv(["<path/to/file1>", "<path/to/file2>", "<path/to/file3>"])
- text
df = spark.read.text("<path/to/file>")
- json
df = spark.read.json("<path/to/file>")
- XML
XML files are not supported by default. We must add a dependency (the spark-xml package) to read XML files.
# spark-xml typically also needs a rowTag option to identify the row element
df = spark.read.format('com.databricks.spark.xml').load('sample.xml')
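Whichever reader you use, it is worth verifying what was loaded. A small sketch, assuming df came from any of the readers above:
df.printSchema()   # column names and inferred types
df.show(5)         # display the first 5 rows
print(df.count())  # total row count (this triggers a Spark job)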
Create DataFrame from RDD
We can use the parallelize method of SparkContext to generate an RDD, and then use the toDF method of the RDD to create a DataFrame.
First, we prepare some test data.
data = [{"class": "A", "name": "Kevin", "score": 80},
{"class": "A", "name": "Mary", "score": 90},
{"class": "B", "name": "Tom", "score": 85},
{"class": "B", "name": "Annie", "score": 83}]
Then use parallelize on the SparkContext to generate an RDD.
rdd = spark.sparkContext.parallelize(data)
Finally, use the toDF method of the RDD to create a DataFrame.
df = rdd.toDF()
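As a side note (a hedged sketch, not part of the walkthrough above), spark.createDataFrame accepts the list of dictionaries directly, so the intermediate RDD can be skipped:
# One-step alternative: build the DataFrame straight from the Python list
df2 = spark.createDataFrame(data)
df2.show()  # displays the class/name/score rows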
Create DataFrame from Database
The prerequisite is that the JDBC driver for your database is installed and available on the Spark classpath.
df = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/<your_db_name>",
    driver="com.mysql.jdbc.Driver",
    dbtable="<your_table_name>",
    user="root",
    password="root").load()
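Equivalently (a sketch; the URL, table name, and credentials mirror the placeholders above), PySpark's reader also exposes a jdbc convenience method:
# Same read expressed through DataFrameReader.jdbc; all connection details are placeholders
df = spark.read.jdbc(
    url="jdbc:mysql://localhost:3306/<your_db_name>",
    table="<your_table_name>",
    properties={"user": "root",
                "password": "root",
                "driver": "com.mysql.jdbc.Driver"})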