Take $SPARK_HOME/examples/src/main/python/sql/basic.py as an example. Run it with:
spark-submit $SPARK_HOME/examples/src/main/python/sql/basic.py
Creating a SparkSession
The entry point of a Spark SQL application is the SparkSession. Create one as follows:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Creating a DataFrame
With a SparkSession, an application can create DataFrames from an existing RDD or from a Hive table:
df = spark.read.json("examples/src/main/resources/people.json")
Display the DataFrame:
df.show()
DataFrame Operations
Print the DataFrame's schema:
df.printSchema()
Select only the name column:
df.select("name").show()
Select with a column expression:
df.select(df['name'], df['age'] + 1).show()
Filter rows:
df.filter(df['age'] > 21).show()
Group and count:
df.groupBy("age").count().show()
SQL Queries
Register the DataFrame as a SQL temporary view:
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
Global Temporary Views
df.createGlobalTempView("people")
A global temporary view is shared across SparkSessions and is referenced through the global_temp database:
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()