See the example at `$SPARK_HOME/examples/src/main/python/sql/hive.py`.

Spark SQL supports reading and writing data stored in Hive. If the Hive dependencies are on the classpath, Spark loads them automatically.
Import the dependencies:

```python
from os.path import expanduser, join
from pyspark.sql import SparkSession
from pyspark.sql import Row
```
Initialize the SparkSession. `warehouse_location` points to the default storage location for databases and tables:

```python
warehouse_location = 'spark-warehouse'

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
```
Hive SQL operations

Create a table:

```python
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
```

Load data:

```python
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
```

Show the table contents:

```python
spark.sql("SELECT * FROM src").show()
```

Run an aggregation:

```python
spark.sql("SELECT COUNT(*) FROM src").show()
```
DataFrame operations

SQL queries return a DataFrame:

```python
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
```

The result supports the usual DataFrame operations, such as mapping over its underlying RDD:

```python
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
```
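The lambda above is ordinary Python string formatting applied to each row. A minimal stand-alone sketch, using a `namedtuple` as a stand-in for `pyspark.sql.Row` (an assumption for illustration, so it runs without a Spark session), shows what each record becomes:

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row; real rows from sqlDF.rdd behave similarly
Row = namedtuple("Row", ["key", "value"])

rows = [Row(0, "val_0"), Row(4, "val_4")]
strings = ["Key: %d, Value: %s" % (row.key, row.value) for row in rows]
for record in strings:
    print(record)
# prints:
# Key: 0, Value: val_0
# Key: 4, Value: val_4
```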
Create a temporary view:

```python
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
```

Join the DataFrame temporary view with a Hive table:

```python
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
```
Hive Metastore

| Property | Default | Description |
|---|---|---|
| spark.sql.hive.metastore.version | 1.2.1 | Version of the Hive metastore |
| spark.sql.hive.metastore.jars | builtin | Location of the jars used to instantiate the metastore client |
| spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc | Class-name prefixes loaded with the classloader shared between Spark SQL and the Hive metastore client (e.g. JDBC drivers) |
| spark.sql.hive.metastore.barrierPrefixes | (empty) | Class-name prefixes explicitly reloaded for each version of Hive that Spark SQL communicates with |
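A sketch of overriding these properties in `spark-defaults.conf`; the values shown (the `maven` jar source and the trimmed prefix list) are illustrative choices, not required settings:

```
spark.sql.hive.metastore.version         1.2.1
spark.sql.hive.metastore.jars            maven
spark.sql.hive.metastore.sharedPrefixes  com.mysql.jdbc,org.postgresql
```

The same properties can also be passed through `SparkSession.builder.config(...)` before `enableHiveSupport()`, as with `spark.sql.warehouse.dir` above.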