Tokenizer
Tokenization是将文本分成单独的短语(通常是词汇)。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
StopWordsRemover
Stop words是需要从input排除的word,通常是因为这些词汇频繁出现但无意义
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
n-gram
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Binarizer
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
PCA
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
PolynomialExpansion
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
Discrete Cosine Transform(DCT)
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
StringIndexer
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
IndexToString
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
One-hot encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
VectorIndexer
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()