对于Spark Streaming的使用,可以借助$SPARK_HOME/examples/src/main/python/streaming/network_wordcount.py
要运行该应用,首先启动nc服务
nc -lk 9999
spark提交应用
spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
程序说明
首先,引入StreamingContext
,创建两个工作线程,批处理间隔是1s
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
创建DSstream
,它将会连接localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
DStream
的每个记录都是一行文本,下面对每行文本做出词的分隔处理
words = lines.flatMap(lambda line: line.split(" "))
计算出每个batch的词的词频
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
上面,Spark Streaming
的计算过程构建完毕,现在可以启动流程
ssc.start()
等待计算完毕
ssc.awaitTermination()