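For a first look at Spark Streaming, you can use the example script $SPARK_HOME/examples/src/main/python/streaming/network_wordcount.py, which counts the words in text received over a TCP socket.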

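To run the application, first start a Netcat (nc) server. The -l flag makes nc listen on port 9999, and -k keeps it listening after a client disconnects. Every line you type into it is sent to the connected client: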

nc -lk 9999
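If nc is not available, the minimal sketch below can stand in for it. serve_lines is a hypothetical helper, not part of Spark. Unlike nc -lk, it does not forward what you type: it sends a fixed list of lines to a single client and then closes the connection.

```python
import socket


def serve_lines(server, lines):
    """Hypothetical stand-in for `nc -lk 9999` (not part of Spark).

    Waits for one client (for example Spark's socketTextStream receiver) on an
    already-listening server socket, sends each string in `lines` as a
    newline-terminated UTF-8 record, then closes that connection.
    """
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```

For example, in one terminal run: with socket.create_server(("localhost", 9999)) as server: serve_lines(server, ["hello world", "hello spark"]). Then submit the application.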

Then submit the application with spark-submit. Run it from $SPARK_HOME, because the script path is relative:

spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999

Program walkthrough

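First, import StreamingContext and create a local StreamingContext with two worker threads (local[2]) and a batch interval of 1 second. Local mode needs at least two threads: the receiver that reads from the socket occupies one, and the other processes the received data.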

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
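The batch interval passed to StreamingContext (1 second here) determines how incoming records are cut into micro-batches. The toy function below assigns each timestamped record to the 1-second window that contains its arrival time. It is a hypothetical illustration, not Spark's implementation.

```python
from collections import defaultdict


def group_into_batches(records, batch_interval=1.0):
    """Toy model of micro-batching (illustrative only, not Spark code).

    `records` is an iterable of (arrival_time_seconds, value) pairs. Each
    record goes to batch k, whose window is [k * interval, (k + 1) * interval).
    Returns a dict mapping batch index k to its values in arrival order.
    """
    batches = defaultdict(list)
    for arrival_time, value in records:
        batches[int(arrival_time // batch_interval)].append(value)
    return dict(batches)
```

In this toy, a window with no records simply does not appear in the result. Spark Streaming, by contrast, still schedules a job for every interval, so pprint() prints a header even for an empty batch.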

Create a DStream that connects to localhost:9999. It represents the stream of text lines received from that server:

lines = ssc.socketTextStream("localhost", 9999)
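According to the PySpark documentation, socketTextStream receives bytes over a TCP socket and interprets them as UTF-8 encoded, newline-delimited lines. The sketch below approximates that client-side behavior in plain Python. read_lines is a hypothetical helper, not Spark's receiver code.

```python
import socket


def read_lines(host, port):
    """Toy model of a socket text receiver (hypothetical, not Spark code).

    Connects to host:port as a TCP client and yields each UTF-8,
    newline-delimited line (without the trailing newline) until the
    server closes the connection.
    """
    with socket.create_connection((host, port)) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as stream:
            for line in stream:
                yield line.rstrip("\n")
```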

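Each record in this DStream is one line of text. Next, flatMap splits each line into words, producing a new DStream whose records are the individual words: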

words = lines.flatMap(lambda line: line.split(" "))
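The difference between map and flatMap can be shown on a single batch in plain Python. flat_map below is a hypothetical analogue of DStream.flatMap, not the Spark API. It applies the function to each record and flattens the resulting lists into one sequence of records.

```python
from itertools import chain


def flat_map(func, records):
    """Plain-Python analogue of DStream.flatMap for one batch (illustrative):
    apply `func` to each record and flatten the returned iterables."""
    return list(chain.from_iterable(func(record) for record in records))


batch = ["hello world", "hello  spark"]  # the second line has two spaces
mapped = [line.split(" ") for line in batch]           # map: one list per line
flat = flat_map(lambda line: line.split(" "), batch)   # flatMap: one word per record
```

Here mapped holds one list per line, while flat holds one word per record. Note that split(" ") turns consecutive spaces into empty strings, which the word count would treat as a word. Calling line.split() with no argument splits on runs of whitespace and drops them.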

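Count how often each word occurs in each batch. Map each word to a (word, 1) pair, then sum the counts per word with reduceByKey. Finally, pprint() prints the first ten results of each batch to the console: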

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
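The per-batch effect of map, reduceByKey and pprint() can be approximated in plain Python. reduce_by_key and format_batch are hypothetical helpers, not Spark APIs. The output layout is modeled on what PySpark's pprint() prints (a dashed header with the batch time, at most num records, then "..." if more exist). The dash count and time format here are illustrative. In Spark, the function given to reduceByKey should be associative and commutative, because it is applied partially within each partition before results are merged. The output order is also not guaranteed, which is why the sketch sorts before printing.

```python
def reduce_by_key(func, pairs):
    """Plain-Python analogue of reduceByKey on one batch (illustrative):
    combine all values that share a key with the binary function `func`."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def format_batch(time, records, num=10):
    """Approximate per-batch layout of DStream.pprint() (illustrative):
    dashed header with the batch time, at most `num` records, "..." when
    more records exist, then a blank line."""
    rule = "-" * 43  # header width chosen for illustration
    lines = [rule, f"Time: {time}", rule]
    lines += [str(record) for record in records[:num]]
    if len(records) > num:
        lines.append("...")
    lines.append("")
    return "\n".join(lines)


words = ["hello", "world", "hello"]          # one batch after flatMap
pairs = [(word, 1) for word in words]        # like words.map(lambda word: (word, 1))
word_counts = reduce_by_key(lambda x, y: x + y, pairs)
report = format_batch("2024-01-01 00:00:01", sorted(word_counts.items()))
```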

The code above only defines the Spark Streaming computation; no data is processed yet. To begin processing, start the context:

ssc.start()

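Then wait for the computation to terminate. It keeps running until it is stopped (for example with ssc.stop()) or fails. Without this call, the driver script would exit right after start():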

ssc.awaitTermination()
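The division of labor between start() and awaitTermination() can be modeled with a background thread. ToyStreamingLoop is a hypothetical class, not the PySpark API. Its start() returns at once while batches are processed on another thread, and its await_termination() blocks the caller until processing ends. The toy's input is finite, so it stops on its own. A real socket stream keeps running until it is stopped or fails.

```python
import threading


class ToyStreamingLoop:
    """Hypothetical model of the StreamingContext lifecycle (not PySpark)."""

    def __init__(self, batches, process):
        self._batches = batches          # finite list of batches (toy input)
        self._process = process          # function applied to each batch
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.results = []

    def _run(self):
        # Process batches one after another on the background thread.
        for batch in self._batches:
            self.results.append(self._process(batch))

    def start(self):
        # Returns immediately; work continues in the background.
        self._thread.start()

    def await_termination(self, timeout=None):
        # Block until processing ends (or timeout); True if it has stopped.
        self._thread.join(timeout)
        return not self._thread.is_alive()
```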
