Redis and execnet can be used together to perform distributed word scoring. In the non-distributed version, a FreqDist and a ConditionalFreqDist are used to calculate the information gain of each word in the movie_reviews corpus. With Redis, the same computation can be performed using a RedisHashFreqDist and a RedisConditionalHashFreqDist, with the resulting scores stored in a RedisOrderedDict. To get better performance out of Redis, execnet is used to distribute the counting. After installing Redis and execnet, an instance of redis-server must be running on localhost.
Steps:
- For each label in the movie_reviews corpus (which has only pos and neg labels), start by getting a list of (label, words) tuples.
- Then, from the dist_featx module, get the word_scores using score_words().
- The total number of words is 39,767, and word_scores is an instance of RedisOrderedDict.
- Then get the top 1,000 words and inspect the top five using the keys() method to see what they are.
- Finally, delete the keys in Redis, since once everything required has been retrieved from word_scores, the data is no longer needed.
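Before looking at the Redis version, the steps above can be sketched as a purely in-memory pipeline. The sketch below is illustrative only: it uses collections.Counter as a stand-in for FreqDist/ConditionalFreqDist, a hand-rolled chi-square in place of BigramAssocMeasures.chi_sq, and a tiny made-up corpus instead of movie_reviews.

```python
# In-memory sketch of the word-scoring pipeline (illustrative only).
from collections import Counter

def chi_sq(n_ii, n_ix_n_xi, n_xx):
    # Chi-square over the 2x2 contingency table of
    # (word vs other words) x (this label vs other labels).
    n_ix, n_xi = n_ix_n_xi
    n_io = n_ix - n_ii                    # word occurrences outside this label
    n_oi = n_xi - n_ii                    # other words inside this label
    n_oo = n_xx - n_ii - n_io - n_oi      # other words outside this label
    num = n_xx * (n_ii * n_oo - n_io * n_oi) ** 2
    den = (n_ii + n_io) * (n_ii + n_oi) * (n_io + n_oo) * (n_oi + n_oo)
    return num / den if den else 0.0

def score_words_in_memory(category_words):
    word_fd = Counter()                   # overall word frequencies
    label_word_fd = {}                    # per-label word frequencies
    for label, words in category_words:
        label_word_fd.setdefault(label, Counter())
        for w in words:
            word_fd[w] += 1
            label_word_fd[label][w] += 1
    n_xx = sum(word_fd.values())
    scores = {}
    for label, fd in label_word_fd.items():
        n_xi = sum(fd.values())
        for word, n_ii in fd.items():
            scores[word] = max(scores.get(word, 0.0),
                               chi_sq(n_ii, (word_fd[word], n_xi), n_xx))
    return scores

category_words = [('pos', ['good', 'fun', 'movie']),
                  ('neg', ['bad', 'boring', 'movie'])]
scores = score_words_in_memory(category_words)
# 'movie' appears equally in both labels, so it scores 0
```

Words that occur with the same relative frequency in every label carry no information about the label, which is why the shared word scores lowest here.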
Code :
Python3
# importing libraries
from dist_featx import score_words
from nltk.corpus import movie_reviews

# finding categories
category = movie_reviews.categories()
print("Categories : ", category)

category_words = [(l, movie_reviews.words(categories=[l]))
                  for l in category]

# scores
word_scores = score_words(category_words)
print("Length : ", len(word_scores))

# top words
topn_words = word_scores.keys(end=1000)
print("Top Words : ", topn_words[0:5])

# Delete the keys in Redis after getting
# all the required data from word_scores
from redis import Redis

r = Redis()
print([r.delete(key) for key in
       ['word_fd', 'label_word_fd:neg',
        'label_word_fd:pos', 'word_scores']])
Output :
Categories :  ['neg', 'pos']
Length :  39767
Top Words :  [b'bad', b', ', b'and', b'?', b'movie']
[1, 1, 1, 1]
score_words() is a function from dist_featx, and it is expected to take some time to complete, so be prepared to wait a while. The overhead of using execnet and Redis means it will take significantly longer than a non-distributed, in-memory version of the function.
How it works?
The dist_featx.py module contains the score_words() function, which does the following :
- Opens gateways and channels.
- Sends initialization data to each channel.
- For counting, it sends each (label, words) tuple over a channel.
- Sends a done message to each channel.
- Waits for a done reply back.
- Closes the channels and gateways.
- Calculates the score of each word based on the counts.
- Stores the scores in a RedisOrderedDict.
Once the counting is finished, all the words are scored and the results are stored. The code is given below:
Code :
Python3
# Importing libraries
import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict

# Scoring the words
def score_words(category_words,
                score_fn=BigramAssocMeasures.chi_sq,
                host='localhost', specs=[('popen', 2)]):
    gateways = []
    channels = []

    # open gateways and channels, send initialization data
    for spec, count in specs:
        for i in range(count):
            gw = execnet.makegateway(spec)
            gateways.append(gw)
            channel = gw.remote_exec(remote_word_count)
            channel.send((host, 'word_fd', 'category_word_fd'))
            channels.append(channel)

    cyc = itertools.cycle(channels)

    # send each (category, words) tuple over a channel for counting
    for category, words in category_words:
        channel = next(cyc)
        channel.send((category, list(words)))

    # signal completion and wait for each channel to finish
    for channel in channels:
        channel.send('done')
        assert 'done' == channel.receive()
        channel.waitclose(5)

    for gateway in gateways:
        gateway.exit()

    r = Redis(host)

    # frequency distributions
    fd = RedisHashFreqDist(r, 'word_fd')
    cfd = RedisConditionalHashFreqDist(r, 'category_word_fd')
    word_scores = RedisOrderedDict(r, 'word_scores')
    n_xx = cfd.N()

    for category in cfd.conditions():
        n_xi = cfd[category].N()

        for word, n_ii in cfd[category].iteritems():
            word = word.decode()
            n_ix = fd[word]

            if n_ii and n_ix and n_xi and n_xx:
                score = score_fn(n_ii, (n_ix, n_xi), n_xx)
                word_scores[word] = score

    # final word scores
    return word_scores
If there are more than two labels, a different scoring method should be used, because this scoring method is only accurate when comparing two labels. How the word scores are stored will depend on your requirements.
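One possible workaround for more than two labels, sketched below with made-up data (this is not part of dist_featx), is to compute a one-vs-rest score per label and sum the results, so a word that separates any single label from the rest still ranks highly. The score_fn here is a deliberately simple toy measure (relative frequency inside a label minus relative frequency outside it), not chi_sq; it only matches the (n_ii, (n_ix, n_xi), n_xx) calling convention used above.

```python
# One-vs-rest scoring summed across labels (illustrative sketch).
from collections import Counter

def relfreq_diff(n_ii, n_ix_n_xi, n_xx):
    # Toy score: how much more frequent the word is inside
    # the label than in the remaining labels.
    n_ix, n_xi = n_ix_n_xi
    inside = n_ii / n_xi
    outside = (n_ix - n_ii) / (n_xx - n_xi)
    return inside - outside

def multi_label_scores(category_words, score_fn=relfreq_diff):
    word_fd, label_fds = Counter(), {}
    for label, words in category_words:
        label_fds.setdefault(label, Counter()).update(words)
        word_fd.update(words)
    n_xx = sum(word_fd.values())
    totals = Counter()
    for label, fd in label_fds.items():
        n_xi = sum(fd.values())
        for word, n_ii in fd.items():
            # sum the one-vs-rest score over every label
            totals[word] += score_fn(n_ii, (word_fd[word], n_xi), n_xx)
    return totals

scores = multi_label_scores([('pos', ['great', 'film']),
                             ('neg', ['awful', 'film']),
                             ('neutral', ['okay', 'film'])])
# 'film' occurs evenly across all three labels and sums to 0
```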
Once the instance is running, there are two kinds of data that can be received over the channel:
- A 2-tuple of (label, words): it is iterated over to increment counts in both the RedisHashFreqDist and the RedisConditionalHashFreqDist.
- A done message: it signals that there is no more data coming in over the channel. The worker replies with another done message, then exits the loop to close the channel.
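The receive loop that remote_word_count is assumed to run on each worker can be sketched as follows. Since that module is not shown in this article, this is a hedged reconstruction: a plain list-backed class stands in for the execnet channel, and Counters stand in for the Redis frequency distributions.

```python
# Sketch of a worker-side counting loop (remote_word_count is assumed).
from collections import Counter

class FakeChannel:
    # Minimal stand-in for an execnet channel.
    def __init__(self, messages):
        self.messages = list(messages)
        self.sent = []
    def receive(self):
        return self.messages.pop(0)
    def send(self, msg):
        self.sent.append(msg)

def counting_loop(channel, word_fd, label_word_fd):
    while True:
        data = channel.receive()
        if data == 'done':
            # no more data: reply 'done' and exit to close the channel
            channel.send('done')
            break
        label, words = data
        # increment counts in both frequency distributions
        label_word_fd.setdefault(label, Counter())
        for word in words:
            word_fd[word] += 1
            label_word_fd[label][word] += 1

word_fd, label_word_fd = Counter(), {}
chan = FakeChannel([('pos', ['good', 'movie']),
                    ('neg', ['bad', 'movie']),
                    'done'])
counting_loop(chan, word_fd, label_word_fd)
# word_fd now holds overall counts; label_word_fd holds per-label counts
```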