Matching vocabulary elements to indices from LDA Model using PySpark
I'd like to take a Spark LDA Model's term indices from the .describeTopics()
output and match them to the appropriate term in the count vectorizer's vocabulary. Here is the point of friction:
terms = ['fuzzy', 'wuzzy', 'bear', 'seashells', 'chuck', 'wood', 'big', 'black', 'woodchuck', 'sell', 'hair', 'rug', 'sat', 'seashore', 'much', 'sells', 'many']
+-----+--------------+------------------------------------------------------------------------------------+
|topic|termIndices |termWeights |
+-----+--------------+------------------------------------------------------------------------------------+
|0 |[6, 16, 4, 13]|[0.07759153026889895, 0.07456018590515792, 0.06590443764744822, 0.06529979905841589]|
|1 |[0, 8, 1, 12] |[0.08460924078697102, 0.06935412981755526, 0.06803462316387827, 0.06505150660960128]|
|2 |[8, 14, 5, 3] |[0.07473487407268035, 0.06999332154185754, 0.06923579179113146, 0.06673236538997057]|
|3 |[2, 15, 10, 9]|[0.07990489772171691, 0.07352818255574894, 0.0725564301639141, 0.0705481456715216] |
+-----+--------------+------------------------------------------------------------------------------------+
My desired output would be the above DataFrame with an additional array column, terms, containing the appropriate terms looked up from termIndices. For example, topic 0 with termIndices [6, 16, 4, 13] should get terms ['big', 'many', 'chuck', 'seashore'].
Here is the code to set the problem up:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, RegexTokenizer, NGram
from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA
import numpy as np
text_df = spark.createDataFrame([
    (0, "How much WOOD could a woodchuck chuck if a woodchuck could chuck wood?"),
    (1, "She sells SEASHELLS by the seashore. How many seashells did she sell?"),
    (2, "Fuzzy Wuzzy was a bear. Fuzzy Wuzzy had no hair. Fuzzy Wuzzy wasn't very fuzzy, was he?"),
    (3, "A BIG BLACK bear sat on a big black rug.")
], ['id', 'text'])
# Arguments to be passed to functions
input_data = text_df
text_col = "text"
n_topics = 4
n_gram = 1
min_doc_freq = 0.1
max_doc_freq = 0.9
# Model pipeline; RegexTokenizer allows us to tokenize on the regex, forgoing an explicit symbol removal step
tokenizer = RegexTokenizer(inputCol=text_col, outputCol="tokens", pattern=r"[\s{2,}&;!\.\(\)-<>/,\?]+")
stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
ngrams = NGram(inputCol = "tokens_clean", n=n_gram, outputCol = "ngram")
count_vec = CountVectorizer(inputCol="ngram", outputCol="features", minDF = min_doc_freq, maxDF = max_doc_freq)
lda = LDA(k=n_topics, seed=477)
pipeline = Pipeline(stages=[tokenizer, stopwords, ngrams, count_vec, lda])
# Fitting the pipeline
model = pipeline.fit(input_data)
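(Only the fit step is needed for the index matching below; if per-document topic mixtures are also of interest, the fitted pipeline can be applied to the data, which, with LDA's default output settings, adds a topicDistribution column.)
# Optional: apply the fitted pipeline for per-document topic distributions
transformed = model.transform(input_data)
transformed.select("id", "topicDistribution").show(truncate=False)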
Here is what I have tried:
# This section to experiment with matching vocabulary indices to words in terms
topics = model.stages[-1].describeTopics(4) # This yields the desired topic table above
terms = model.stages[-2].vocabulary # This yields the vocabulary above
# Defining a UDF to try and match indices to terms
@udf
def indices_to_terms(indices):
    terms_subset = [terms[index] for index in indices]
    return np.array(terms_subset)
# Attempting to use UDF to add a terms column
topics = (
    topics
    .withColumn("terms", indices_to_terms(F.col("termIndices")))
)
topics.show()
I don't actually get an error when I run this code, but it doesn't show me anything. Perhaps the UDF isn't the right approach. How might I match the indices in termIndices to the model vocabulary in terms, and get the result as an array column to work with?
Solution 1:[1]
The solution was just defining my UDF more carefully. The following code solved my problem.
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# This section to experiment with matching vocabulary indices to words in terms
topics = model.stages[-1].describeTopics(4)
terms = model.stages[-2].vocabulary

# Plain Python function for matching indices to terms in the vocabulary
def indices_to_terms(indices, terms=terms):
    terms_subset = [terms[index] for index in indices]
    return terms_subset

# Defining a Spark UDF from the above function, with an explicit array-of-strings return type
udf_indices_to_terms = F.udf(indices_to_terms, ArrayType(StringType()))
topics = (
    topics
    .withColumn("terms", udf_indices_to_terms(F.col("termIndices")))
)
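The key differences from the original attempt are that the UDF now declares an explicit ArrayType(StringType()) return type and returns a plain Python list rather than a NumPy array; a bare @udf decorator defaults to a StringType return, which is most likely why the earlier version had nothing useful to show. A quick sanity check (the expected terms here are simply read off the vocabulary listed in the question):
# Sanity check: with the vocabulary above, topic 0's termIndices [6, 16, 4, 13]
# should map to ['big', 'many', 'chuck', 'seashore']
topics.select("topic", "termIndices", "terms").show(truncate=False)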
Solution 2:[2]
A simplified solution for Spark-only operations outside of a pipeline wrapper (no pandas needed):
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as f

# ... your CountVectorizer implementation

# get (tokenized) vocabulary mapping to trace back topics later
terms = cv_model.vocabulary

# ... your LDA model implementation

# get topics
topics_df = model.describeTopics()

# define udf to map term indices to the original tokens from the count vectorizer vocabulary
@f.udf(returnType=ArrayType(StringType()))
def indices_to_terms(indices, terms=terms):
    terms_mapping = [terms[index] for index in indices]
    return terms_mapping

# apply udf to generate the topic index to token mapping
topics_df = topics_df.withColumn("termValues", indices_to_terms(f.col("termIndices")))
print("The topics described by their top-weighted terms:")
topics_df.show(truncate=False)
A simple, elegant and efficient solution, short of anything built into Spark that I wasn't able to find... Thanks to @Chris Aguilar for saving me several hours of head-banging heuristics :)
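If a UDF-free route is preferred, one possible sketch (not from either answer above; it assumes Spark 3.1 or later for pyspark.sql.functions.transform with a Python lambda, reuses the terms and topics_df variables from Solution 2, and inlines the vocabulary as a literal array, so it only makes sense for a reasonably small vocabulary) is:
# UDF-free alternative sketch: embed the vocabulary as a literal array column
# and index into it per element of termIndices (array indexing is 0-based,
# matching the CountVectorizer vocabulary indices)
vocab_col = f.array(*[f.lit(term) for term in terms])
topics_df = topics_df.withColumn(
    "termValues",
    f.transform("termIndices", lambda i: vocab_col[i])
)
topics_df.show(truncate=False)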
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution   | Source        |
|------------|---------------|
| Solution 1 | Chris Aguilar |
| Solution 2 | nasty         |