See the full example at $SPARK_HOME/examples/src/main/python/sql/hive.py.

Spark SQL supports reading and writing data stored in Hive. If the Hive dependencies are on the classpath, Spark loads them automatically.
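Connecting to an existing Hive deployment is configured outside the application code: Spark picks up the metastore and warehouse settings from hive-site.xml (and, for HDFS-backed warehouses, core-site.xml and hdfs-site.xml) placed in Spark's conf/ directory. The paths below are assumptions for a typical installation, not values from this document:

```shell
# Assumed paths; adjust to match your Hive and Spark installations.
cp /etc/hive/conf/hive-site.xml "$SPARK_HOME/conf/"
```

Without a hive-site.xml, Spark falls back to a local embedded metastore, which is what the rest of this example relies on.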

Importing the dependencies

from os.path import expanduser, join

from pyspark.sql import SparkSession
from pyspark.sql import Row

Initializing the SparkSession

warehouse_location sets the default location for databases and tables.

warehouse_location = 'spark-warehouse'

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

Running Hive SQL

Create a table

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

Load data into the table

spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

Show the table contents

spark.sql("SELECT * FROM src").show()

Run an aggregation

spark.sql("SELECT COUNT(*) FROM src").show()

DataFrame operations

Queries issued with spark.sql return DataFrames.

sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

The returned DataFrame supports ordinary operations, for example mapping over its rows via the underlying RDD:

stringsRDD = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsRDD.collect():
    print(record)

Create a temporary view

Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

Join the temporary view with the Hive table

spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

Hive Metastore

| Property | Default | Meaning |
| --- | --- | --- |
| spark.sql.hive.metastore.version | 1.2.1 | Version of the Hive metastore to connect to. |
| spark.sql.hive.metastore.jars | builtin | Location of the jars used to instantiate the Hive metastore client. |
| spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc | Comma-separated class prefixes loaded with the classloader shared between Spark SQL and the Hive client (typically JDBC drivers needed to talk to the metastore database). |
| spark.sql.hive.metastore.barrierPrefixes | (empty) | Comma-separated class prefixes that should be explicitly reloaded for each version of Hive that Spark SQL communicates with. |
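These properties can be set on the builder, in the same way spark.sql.warehouse.dir was set above. The sketch below is illustrative only: the version string and jar setting are the defaults from the table, and you would substitute the values matching the metastore you actually connect to.

```python
from pyspark.sql import SparkSession

# Illustrative values only; use the metastore version and jar
# location appropriate for your deployment.
spark = SparkSession \
    .builder \
    .appName("Hive metastore configuration example") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .config("spark.sql.hive.metastore.version", "1.2.1") \
    .config("spark.sql.hive.metastore.jars", "builtin") \
    .enableHiveSupport() \
    .getOrCreate()
```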
