Mining the Social Web, 2nd Edition

Chapter 6: Mining Mailboxes: Analyzing Who's Talking To Whom About What, How Often, and More

This IPython Notebook provides an interactive way to follow along with and explore the numbered examples from Mining the Social Web (2nd Edition). The intent behind this notebook is to reinforce the concepts from the sample code in a fun, convenient, and effective way. This notebook assumes that you are reading along with the book and have the context of the discussion as you work through these exercises.

In the somewhat unlikely event that you've somehow stumbled across this notebook outside of its context on GitHub, you can find the full source code repository here.

You are free to use or adapt this notebook for any purpose you'd like. However, please respect the Simplified BSD License that governs its use.

Example 1. Converting a toy mailbox to JSON

In []:
import mailbox
import email
import json

MBOX = 'resources/ch06-mailboxes/data/northpole.mbox'

# A routine that makes a ton of simplifying assumptions
# about converting an mbox message into a Python object
# given the nature of the northpole.mbox file in order
# to demonstrate the basic parsing of an mbox with mail
# utilities

def objectify_message(msg):
    
    # Map in fields from the message
    o_msg = dict([ (k, v) for (k,v) in msg.items() ])
    
    # Assume one part to the message and get its content
    # and its content type
    
    part = [p for p in msg.walk()][0]
    o_msg['contentType'] = part.get_content_type()
    o_msg['content'] = part.get_payload()
    
    return o_msg

# Create an mbox that can be iterated over and transform each of its
# messages to a convenient JSON representation

mbox = mailbox.UnixMailbox(open(MBOX, 'rb'), email.message_from_file)

messages = []

while 1:
    msg = mbox.next()
    
    if msg is None: break
        
    messages.append(objectify_message(msg))
    
print json.dumps(messages, indent=1)
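If you're working with a newer Python, note that `mailbox.UnixMailbox` was removed in Python 3; the `mailbox.mbox` class (available in both Python 2 and 3) covers the same ground. Here's a minimal sketch of the same parse-and-objectify idea using `mailbox.mbox` with a made-up one-message mailbox written to a temporary file:

```python
import json
import mailbox
import tempfile
import os

# A tiny, hypothetical single-message mailbox for illustration
TOY_MBOX = """From santa@northpole.example.org Fri Dec 25 00:06:42 2009
From: St. Nick <santa@northpole.example.org>
To: rudolph@northpole.example.org
Subject: RE: FWD: Tonight
Content-Type: text/plain

Sounds good. See you at the usual location.
"""

# Write the mailbox out so mailbox.mbox can open it by path
tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.mbox', delete=False)
tmp.write(TOY_MBOX)
tmp.close()

def objectify_message(msg):
    # Map the headers into a plain dict, then grab the payload of
    # the first (and, by assumption, only) part
    o_msg = dict(msg.items())
    part = list(msg.walk())[0]
    o_msg['contentType'] = part.get_content_type()
    o_msg['content'] = part.get_payload()
    return o_msg

messages = [objectify_message(msg) for msg in mailbox.mbox(tmp.name)]
print(json.dumps(messages, indent=1))
os.unlink(tmp.name)
```

The structure of the output matches what the `UnixMailbox` version above produces, so either approach feeds the rest of the chapter equally well.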

The following script downloads the Enron corpus and extracts it to the proper working location. It adapts a function from http://stackoverflow.com/a/22776/2292400 to download a file and extract it where it needs to go. It's a lot of code just to download and extract a file, but it's included here to minimize the configuration burden and to demonstrate some of the possibilities of IPython Notebook. The file is about 450MB, so it may take a while to download; one of the interesting things about this script is that it displays an updated download status every 5 seconds.

Alternatively, you could just download the file yourself and place it in the ipynb/resources/ch06-mailboxes/data directory, or use %%bash cell magic as discussed in the Chapter 0 notebook to retrieve the file with a Bash script that would be just a few lines long.

Warning - Unfortunately, downloading and decompressing the file is relatively fast compared to the time it takes Vagrant to synchronize the thousands of decompressed files with the host machine, and at the current time, there isn't a known workaround that will speed this up for all platforms. It may take longer than an hour for Vagrant to finish synchronizing. It is recommended that you let this code run at an opportune time, such as when you step out for lunch or overnight.

Alternative Workaround - The GitHub repository's ipynb/resources/ch06-mailboxes/data directory contains a highly compressed version of the output (enron.mbox.json.bz2) that you'll get from Example 3. You can follow the instructions in the note after Example 3 to import this data directly into MongoDB if downloading the corpus and waiting for Vagrant to synchronize is too much of an inconvenience for you.

In []:
import sys
import urllib2
import time
import os
import envoy # pip install envoy

URL = "http://www.cs.cmu.edu/~enron/enron_mail_20110402.tgz"
DOWNLOAD_DIR = "resources/ch06-mailboxes/data"

# Downloads a file and displays a download status every 5 seconds

def download(url, download_dir):    
    file_name = url.split('/')[-1]
    u = urllib2.urlopen(url)
    f = open(os.path.join(download_dir, file_name), 'wb')
    meta = u.info()
    file_size = int(meta.getheaders("Content-Length")[0])
    print "Downloading: %s Bytes: %s" % (file_name, file_size)

    file_size_dl = 0
    block_sz = 8192
    last_update = time.time()
    while True:
        buffer = u.read(block_sz)
        if not buffer:
            break

        file_size_dl += len(buffer)
        f.write(buffer)
        download_status = r"%10d MB  [%3.2f%%]" % (file_size_dl / 1000000.0, file_size_dl * 100.0 / file_size)
        download_status = download_status + chr(8)*(len(download_status)+1)
        if time.time() - last_update > 5:
            print download_status,
            sys.stdout.flush()
            last_update = time.time()
    f.close()
    return f.name

# Extracts a gzipped tarfile. e.g. "$ tar xzf filename.tgz"

def tar_xzf(f):
    # Call out to the shell for a faster decompression.
    # This will still take a while because Vagrant synchronizes
    # thousands of files that are extracted to the host machine
    r = envoy.run("tar xzf %s -C %s" % (f, DOWNLOAD_DIR))
    print r.std_out
    print r.std_err

f = download(URL, DOWNLOAD_DIR)
print "Download complete: %s" % (f,)
tar_xzf(f)
print "Decompression complete"
print "Data is ready"
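The download loop above reads the response in fixed-size blocks and tracks progress as it goes. The same loop shape can be exercised without any network access by reading from any file-like object; here's a small sketch (the function name and the in-memory "download" are made up for illustration):

```python
import io

def read_with_progress(src, total_size, block_sz=8192):
    """Read a file-like object in blocks, yielding (bytes_read, percent)
    after each block -- the same loop shape as the download above."""
    bytes_read = 0
    while True:
        buf = src.read(block_sz)
        if not buf:
            break
        bytes_read += len(buf)
        yield bytes_read, bytes_read * 100.0 / total_size

# Simulate a 20,000-byte "download" with an in-memory buffer
data = io.BytesIO(b'x' * 20000)
progress = list(read_with_progress(data, 20000))
print(progress[-1])
```

Separating the block-reading loop from the printing makes the progress logic trivial to test, which is harder to do when the loop is interleaved with `urllib2` calls and console output.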

Example 2. Converting the Enron corpus to a standardized mbox format

In []:
import re
import email
from time import asctime
import os
import sys
from dateutil.parser import parse # pip install python_dateutil

# XXX: Download the Enron corpus to resources/ch06-mailboxes/data
# and unarchive it there.

MAILDIR = 'resources/ch06-mailboxes/data/enron_mail_20110402/' + \
          'enron_data/maildir' 

# Where to write the converted mbox
MBOX = 'resources/ch06-mailboxes/data/enron.mbox'

# Create a file handle that we'll be writing into...
mbox = open(MBOX, 'w')

# Walk the directories and process any folder named 'inbox'

for (root, dirs, file_names) in os.walk(MAILDIR):

    if root.split(os.sep)[-1].lower() != 'inbox':
        continue

    # Process each message in 'inbox'

    for file_name in file_names:
        file_path = os.path.join(root, file_name)
        message_text = open(file_path).read()

        # Compute fields for the From_ line in a traditional mbox message

        _from = re.search(r"From: ([^\r]+)", message_text).groups()[0]
        _date = re.search(r"Date: ([^\r]+)", message_text).groups()[0]

        # Convert _date to the asctime representation for the From_ line

        _date = asctime(parse(_date).timetuple())

        msg = email.message_from_string(message_text)
        msg.set_unixfrom('From %s %s' % (_from, _date))

        mbox.write(msg.as_string(unixfrom=True) + "\n\n")
    
mbox.close()
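Example 2 relies on the third-party dateutil package to parse the Date header into a time tuple for `asctime`. If all you need is the asctime representation for the From_ line, the standard library can do the same job; a minimal sketch (the header value is hypothetical, in the RFC 2822 style found throughout the corpus):

```python
import email.utils
from datetime import datetime

# A hypothetical Date header in RFC 2822 format
date_header = 'Mon, 14 May 2001 16:39:00 -0700'

# parsedate() ignores the day name and UTC offset and returns a
# time-tuple; building a datetime from it recomputes the weekday,
# which yields a correct asctime-style string via ctime()
parsed = email.utils.parsedate(date_header)
from_line_date = datetime(*parsed[:6]).ctime()
print(from_line_date)  # Mon May 14 16:39:00 2001
```

One subtlety: `time.asctime()` applied directly to the tuple from `parsedate()` would always print "Mon", because `parsedate()` does not compute the weekday field; going through `datetime` avoids that pitfall.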

Example 3. Converting an mbox to a JSON structure suitable for import into MongoDB

In []:
import sys
import mailbox
import email
import quopri
import json
import time
from BeautifulSoup import BeautifulSoup
from dateutil.parser import parse

MBOX = 'resources/ch06-mailboxes/data/enron.mbox'
OUT_FILE = 'resources/ch06-mailboxes/data/enron.mbox.json'

def cleanContent(msg):

    # Decode message from "quoted printable" format
    msg = quopri.decodestring(msg)
        
    # Strip out HTML tags, if any are present.
    # Bail on unknown encodings if errors happen in BeautifulSoup.
    try:
        soup = BeautifulSoup(msg)
    except:
        return ''
    return ''.join(soup.findAll(text=True))

# There's a lot of data to process, and the Pythonic way to do it is with a 
# generator. See http://wiki.python.org/moin/Generators.
# Using a generator requires a trivial encoder to be passed to json for object 
# serialization.

class Encoder(json.JSONEncoder):
    def default(self, o): return list(o)

# The generator itself...
def gen_json_msgs(mb):
    while 1:
        msg = mb.next()
        if msg is None:
            break
        yield jsonifyMessage(msg)
 
def jsonifyMessage(msg):
    json_msg = {'parts': []}
    for (k, v) in msg.items():
        json_msg[k] = v.decode('utf-8', 'ignore')

    # The To, Cc, and Bcc fields, if present, could have multiple items.
    # Note that not all of these fields are necessarily defined.

    for k in ['To', 'Cc', 'Bcc']:
        if not json_msg.get(k):
            continue
        json_msg[k] = json_msg[k].replace('\n', '').replace('\t', '').replace('\r', '')\
                                 .replace(' ', '').decode('utf-8', 'ignore').split(',')

    for part in msg.walk():
        json_part = {}
        if part.get_content_maintype() == 'multipart':
            continue
            
        json_part['contentType'] = part.get_content_type()
        content = part.get_payload(decode=False).decode('utf-8', 'ignore')
        json_part['content'] = cleanContent(content)
           
        json_msg['parts'].append(json_part)
        
    # Finally, convert date from asctime to milliseconds since epoch using the
    # $date descriptor so it imports "natively" as an ISODate object in MongoDB
    then = parse(json_msg['Date'])
    millis = int(time.mktime(then.timetuple())*1000 + then.microsecond/1000)
    json_msg['Date'] = {'$date' : millis}

    return json_msg

mbox = mailbox.UnixMailbox(open(MBOX, 'rb'), email.message_from_file)

# Write each message out as a JSON object on a separate line
# for easy import into MongoDB via mongoimport

f = open(OUT_FILE, 'w')
for msg in gen_json_msgs(mbox):
    if msg is not None:
        f.write(json.dumps(msg, cls=Encoder) + '\n')
f.close()
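A note on the date conversion at the end of `jsonifyMessage`: `time.mktime` interprets the parsed time tuple in the machine's local timezone, so the resulting milliseconds value depends on where the code runs. A sketch of the same `{"$date": ...}` conversion using only the standard library, which honors the UTC offset embedded in the header itself (the header value is hypothetical):

```python
import email.utils

date_header = 'Mon, 14 May 2001 16:39:00 -0700'

# parsedate_tz() keeps the UTC offset; mktime_tz() converts the tuple
# to seconds since the epoch in UTC, regardless of the local timezone
seconds = email.utils.mktime_tz(email.utils.parsedate_tz(date_header))
millis = int(seconds * 1000)

# The {"$date": ...} wrapper is what mongoimport recognizes and turns
# into a native ISODate object
json_date = {'$date': millis}
print(json_date)
```

For a corpus like Enron's, where every Date header carries an explicit offset, this keeps the stored timestamps consistent no matter which machine performs the conversion.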

Workaround for obtaining output from Example 3.

If you have opted to use the compressed version of the output from Example 3 that is checked-in as part of this GitHub repository, just execute the following cell, and you'll be able to proceed with Example 4 as usual. See the previous note after Example 1 for details on why this workaround may be helpful.

In []:
import envoy

# This data is checked-in to the repository and is a compressed 
# version of the output from Example 3

F = 'resources/ch06-mailboxes/data/enron.mbox.json.bz2'

r = envoy.run("bunzip2 %s" % (F,))
print r.std_out
print r.std_err

Example 4. Getting the options for the mongoimport command from IPython Notebook

In []:
import envoy # pip install envoy

r = envoy.run('mongoimport')
print r.std_out
print r.std_err

Example 5. Using mongoimport to load data into MongoDB from IPython Notebook

In []:
import os
import sys
import envoy

data_file = os.path.join(os.getcwd(), 'resources/ch06-mailboxes/data/enron.mbox.json')

# Run a command just as you would in a terminal on the virtual machine to 
# import the data file into MongoDB.
r = envoy.run('mongoimport --db enron --collection mbox ' + \
              '--file %s' % data_file)

# Print its standard output
print r.std_out
print r.std_out
sys.stderr.write(r.std_err)

Example 6. Simulating a MongoDB shell that you can run from within IPython Notebook

In []:
# We can even simulate a MongoDB shell using envoy to execute commands.
# For example, let's get some stats out of MongoDB just as though we were working 
# in a shell by passing it the command and wrapping it in a printjson function to 
# display it for us.

def mongo(db, cmd):
    r = envoy.run("mongo %s --eval 'printjson(%s)'" % (db, cmd,))
    print r.std_out
    if r.std_err: print r.std_err
    
mongo('enron', 'db.mbox.stats()')

Example 7. Using PyMongo to access MongoDB from Python

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

# Connects to the MongoDB server running on 
# localhost:27017 by default

client = pymongo.MongoClient()

# Get a reference to the enron database

db = client.enron

# Reference the mbox collection in the Enron database

mbox = db.mbox

# The number of messages in the collection

print "Number of messages in mbox:"
print mbox.count()
print

# Pick a message to look at...

msg = mbox.find_one()

# Display the message as pretty-printed JSON. The use of
# the custom serializer supplied by PyMongo is necessary in order
# to handle the date field that is provided as a datetime.datetime 
# tuple.

print "A message:"
print json.dumps(msg, indent=1, default=json_util.default)

Example 8. Querying MongoDB by date/time range

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo
from datetime import datetime as dt

client = pymongo.MongoClient()

db = client.enron

mbox = db.mbox

# Create a small date range here of one day

start_date = dt(2001, 4, 1) # Year, Month, Day
end_date = dt(2001, 4, 2) # Year, Month, Day

# Query the database with the highly versatile "find" command,
# just like in the MongoDB shell.

msgs = [ msg 
         for msg in mbox.find({"Date" : 
                                  {
                                   "$lt" : end_date, 
                                   "$gt" : start_date
                                  }
                              }).sort("Date")]

# Create a convenience function to make pretty-printing JSON a little
# less cumbersome

def pp(o, indent=1):
    print json.dumps(o, indent=indent, default=json_util.default)

print "Messages from a query by date range:"
pp(msgs)

Example 9. Enumerating senders and receivers of messages

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

client = pymongo.MongoClient()
db = client.enron
mbox = db.mbox

senders = [ i for i in mbox.distinct("From") ]

receivers = [ i for i in mbox.distinct("To") ]

cc_receivers = [ i for i in mbox.distinct("Cc") ]

bcc_receivers = [ i for i in mbox.distinct("Bcc") ]

print "Num Senders:", len(senders)
print "Num Receivers:", len(receivers)
print "Num CC Receivers:", len(cc_receivers)
print "Num BCC Receivers:", len(bcc_receivers)

Example 10. Analyzing senders and receivers with set operations

In []:
senders = set(senders)
receivers = set(receivers)
cc_receivers = set(cc_receivers)
bcc_receivers = set(bcc_receivers)

# Find the number of senders who were also direct receivers

senders_intersect_receivers = senders.intersection(receivers)

# Find the senders that didn't receive any messages

senders_diff_receivers = senders.difference(receivers)

# Find the receivers that didn't send any messages

receivers_diff_senders = receivers.difference(senders)

# Find the senders who were any kind of receiver by
# first computing the union of all types of receivers

all_receivers = receivers.union(cc_receivers, bcc_receivers)
senders_all_receivers = senders.intersection(all_receivers)

print "Num senders in common with receivers:", len(senders_intersect_receivers)
print "Num senders who didn't receive:", len(senders_diff_receivers)
print "Num receivers who didn't send:", len(receivers_diff_senders)
print "Num senders in common with *all* receivers:", len(senders_all_receivers)

Example 11. Finding senders and receivers of messages who were Enron employees

In []:
# In a Mongo shell, you could try this query for the same effect:
# db.mbox.find({"To" : {"$regex" : /.*enron.com.*/i} }, 
#              {"To" : 1, "_id" : 0})

senders = [ i 
            for i in mbox.distinct("From") 
                if i.lower().find("@enron.com") > -1 ]

receivers = [ i 
              for i in mbox.distinct("To") 
                  if i.lower().find("@enron.com") > -1 ]

cc_receivers = [ i 
                 for i in mbox.distinct("Cc") 
                     if i.lower().find("@enron.com") > -1 ]

bcc_receivers = [ i 
                  for i in mbox.distinct("Bcc") 
                      if i.lower().find("@enron.com") > -1 ]

print "Num Senders:", len(senders)
print "Num Receivers:", len(receivers)
print "Num CC Receivers:", len(cc_receivers)
print "Num BCC Receivers:", len(bcc_receivers)
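Note that the substring test `i.lower().find("@enron.com") > -1` would also match an address like "spoof@enron.com.example.org". A slightly stricter sketch splits on "@" and compares the domain exactly (the addresses and helper name are made up for illustration):

```python
addresses = ['kenneth.lay@enron.com', 'jeff@ENRON.COM',
             'outsider@example.com', 'spoof@enron.com.example.org']

def is_enron(addr):
    # Compare the domain part exactly, case-insensitively
    parts = addr.lower().rsplit('@', 1)
    return len(parts) == 2 and parts[1] == 'enron.com'

enron_addresses = [a for a in addresses if is_enron(a)]
print(enron_addresses)
```

For the Enron corpus the loose substring test is usually good enough, but the exact-domain comparison is worth keeping in mind when you adapt these queries to messier mailboxes.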

Example 12. Counting sent/received messages for particular email addresses

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

client = pymongo.MongoClient()
db = client.enron
mbox = db.mbox

aliases = ["kenneth.lay@enron.com", "ken_lay@enron.com", "ken.lay@enron.com", 
           "kenneth_lay@enron.net", "klay@enron.com"] # More possibilities?

to_msgs = [ msg 
            for msg in mbox.find({"To" : { "$in" : aliases } })]

from_msgs = [ msg 
              for msg in mbox.find({"From" : { "$in" : aliases } })]

print "Number of messages sent to:", len(to_msgs)
print "Number of messages sent from:", len(from_msgs)

Example 13. Using MongoDB's data aggregation framework

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

# The basis of our query
FROM = "kenneth.lay@enron.com"

client = pymongo.MongoClient()
db = client.enron
mbox = db.mbox

# Get the recipient lists for each message

recipients_per_message = db.mbox.aggregate([
  {"$match" : {"From" : FROM} }, 
  {"$project" : {"From" : 1, "To" : 1} }, 
  {"$group" : {"_id" : "$From", "recipients" : {"$addToSet" : "$To" } } }                    
])['result'][0]['recipients']

# Collapse the lists of recipients into a single list

all_recipients = [recipient
                  for message in recipients_per_message
                      for recipient in message]

# Calculate the number of recipients per sent message and sort

recipients_per_message_totals = \
  sorted([len(recipients) 
   for recipients in recipients_per_message])

# Demonstrate how to use $unwind followed by $group to collapse
# the recipient lists into a single list (with no duplicates
# per the $addToSet operator)
    
unique_recipients = db.mbox.aggregate([
  {"$match" : {"From" : FROM} }, 
  {"$project" : {"From" : 1, "To" : 1} }, 
  {"$unwind" : "$To"}, 
  {"$group" : {"_id" : "$From", "recipients" : {"$addToSet" : "$To"}} }
])['result'][0]['recipients']

print "Num total recipients on all messages:", len(all_recipients)
print "Num recipients for each message:", recipients_per_message_totals
print "Num unique recipients:", len(unique_recipients)
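The nested comprehension that collapses `recipients_per_message` mirrors what the `$unwind`/`$group` pipeline does on the server side. A toy sketch of both the flattening and the deduplication in pure Python (the recipient lists are made up):

```python
# Hypothetical result of the first aggregation: one recipient list
# per sent message
recipients_per_message = [
    ['a@enron.com', 'b@enron.com'],
    ['b@enron.com'],
    ['c@enron.com', 'a@enron.com'],
]

# Flatten the lists of recipients into a single list (like $unwind)
all_recipients = [r for msg in recipients_per_message for r in msg]

# Deduplicate (like $addToSet after $unwind)
unique_recipients = set(all_recipients)

# Number of recipients per sent message, sorted
totals = sorted(len(msg) for msg in recipients_per_message)

print(len(all_recipients), sorted(unique_recipients), totals)
```

Being able to reproduce a pipeline stage in a few lines of client-side Python like this is a handy way to convince yourself that an aggregation query is doing what you think it is.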

Example 14. Creating a text index on MongoDB documents with PyMongo

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

client = pymongo.MongoClient()
db = client.enron
mbox = db.mbox

# Create an index if it doesn't already exist
mbox.ensure_index([("$**", "text")], name="TextIndex")

# Get the collection stats (collstats) on a collection
# named "mbox"
print json.dumps(db.command("collstats", "mbox"), indent=1)

# Use the db.command method to issue a "text" command
# on collection "mbox" with parameters, remembering that
# we need to use json_util to handle serialization of our JSON
print json.dumps(db.command("text", "mbox",  
                            search="raptor", 
                            limit=1), 
                 indent=1, default=json_util.default)

Example 15. Aggregate querying for counts of messages by date/time range

In []:
import json
import pymongo # pip install pymongo
from bson import json_util # Comes with pymongo

client = pymongo.MongoClient()
db = client.enron
mbox = db.mbox

results = mbox.aggregate([
{
  # Create a subdocument called DateBucket with each date component projected
  # so that these fields can be grouped on in the next stage of the pipeline
  "$project" :
  {
     "_id" : 0,
     "DateBucket" : 
     {
       "year" : {"$year" : "$Date"}, 
       "month" : {"$month" : "$Date"}, 
       "day" : {"$dayOfMonth" : "$Date"},
       "hour" : {"$hour" : "$Date"},
     }
  }
},
{
  "$group" : 
  {
    # Group by year and date by using these fields for the key.
    "_id" : {"year" : "$DateBucket.year", "month" : "$DateBucket.month"},
    
    # Increment the sum for each group by 1 for every document that's in it
    "num_msgs" : {"$sum" : 1}
  }
},
{
  "$sort" : {"_id.year" : 1, "_id.month" : 1} 
}
])

print results
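The `$project`/`$group` pipeline buckets messages by date components on the server. The same bucketing is easy to express client-side with `collections.Counter`, which is handy for sanity-checking aggregation results against a small sample (the dates below are made up):

```python
from collections import Counter
from datetime import datetime

dates = [datetime(2001, 4, 10, 9), datetime(2001, 4, 22, 14),
         datetime(2001, 5, 1, 8), datetime(2000, 12, 31, 23)]

# Bucket by (year, month), like the $group stage above
num_msgs = Counter((d.year, d.month) for d in dates)

# Sort by year then month, like the $sort stage
results = sorted(num_msgs.items())
print(results)
```

For the full corpus you'd still want MongoDB to do the grouping, since it avoids shipping every document over the wire just to count them.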

Example 16. Rendering time series results as a nicely displayed table

In []:
from prettytable import PrettyTable


pt = PrettyTable(field_names=['Year', 'Month', 'Num Msgs'])
pt.align['Num Msgs'], pt.align['Month'] = 'r', 'r'
[ pt.add_row([ result['_id']['year'], result['_id']['month'], result['num_msgs'] ]) 
  for result in results['result'] ]

print pt
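If prettytable isn't installed, `str.format` can produce a similar right-aligned table with nothing but the standard library; a minimal sketch over hypothetical month counts:

```python
# Hypothetical (year, month, num_msgs) rows from the aggregation
rows = [(2000, 12, 1), (2001, 1, 3), (2001, 2, 2)]

# Right-align each column with fixed widths via str.format
fmt = '{0:>4} {1:>5} {2:>8}'
lines = [fmt.format('Year', 'Month', 'Num Msgs')]
for year, month, num_msgs in rows:
    lines.append(fmt.format(year, month, num_msgs))

table = '\n'.join(lines)
print(table)
```

PrettyTable adds borders and per-column alignment options on top of this, but for quick inspection in a notebook the plain-format approach is often enough.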

Example 17. Connecting to Gmail with Xoauth

In []:
import sys
import oauth2 as oauth
import oauth2.clients.imap as imaplib

# See http://code.google.com/p/google-mail-xoauth-tools/wiki/
# XoauthDotPyRunThrough for details on obtaining and
# running xoauth.py to get the credentials

OAUTH_TOKEN = ''  # XXX: Obtained with xoauth.py
OAUTH_TOKEN_SECRET = ''  # XXX: Obtained with xoauth.py
GMAIL_ACCOUNT = ''  # XXX: Your Gmail address - example@gmail.com

url = 'https://mail.google.com/mail/b/%s/imap/' % (GMAIL_ACCOUNT, )

# Standard values for Gmail's Xoauth
consumer = oauth.Consumer('anonymous', 'anonymous')  
token = oauth.Token(OAUTH_TOKEN, OAUTH_TOKEN_SECRET)

conn = imaplib.IMAP4_SSL('imap.googlemail.com')
conn.debug = 4  # Set to the desired debug level
conn.authenticate(url, consumer, token)

conn.select('INBOX')

# Access your INBOX data

Example 18. Query your Gmail inbox and store the results as JSON

In []:
import sys
import os
import mailbox
import email
import quopri
import json
import time
from BeautifulSoup import BeautifulSoup
from dateutil.parser import parse

# What you'd like to search for in the subject of your mail.
# See Section 6.4.4 of http://www.faqs.org/rfcs/rfc3501.html
# for more SEARCH options.

Q = "Alaska" # XXX

# Recycle some routines from Example 6-3 so that you arrive at the
# very same data structure you've been using throughout this chapter

def cleanContent(msg):

    # Decode message from "quoted printable" format
    msg = quopri.decodestring(msg)
        
    # Strip out HTML tags, if any are present.
    # Bail on unknown encodings if errors happen in BeautifulSoup.
    try:
        soup = BeautifulSoup(msg)
    except:
        return ''
    return ''.join(soup.findAll(text=True))

def jsonifyMessage(msg):
    json_msg = {'parts': []}
    for (k, v) in msg.items():
        json_msg[k] = v.decode('utf-8', 'ignore')

    # The To, Cc, and Bcc fields, if present, could have multiple items.
    # Note that not all of these fields are necessarily defined.

    for k in ['To', 'Cc', 'Bcc']:
        if not json_msg.get(k):
            continue
        json_msg[k] = json_msg[k].replace('\n', '').replace('\t', '')\
                                 .replace('\r', '').replace(' ', '')\
                                 .decode('utf-8', 'ignore').split(',')

    for part in msg.walk():
        json_part = {}
        if part.get_content_maintype() == 'multipart':
            continue
            
        json_part['contentType'] = part.get_content_type()
        content = part.get_payload(decode=False).decode('utf-8', 'ignore')
        json_part['content'] = cleanContent(content)
           
        json_msg['parts'].append(json_part)
        
    # Finally, convert date from asctime to milliseconds since epoch using the
    # $date descriptor so it imports "natively" as an ISODate object in MongoDB.
    then = parse(json_msg['Date'])
    millis = int(time.mktime(then.timetuple())*1000 + then.microsecond/1000)
    json_msg['Date'] = {'$date' : millis}

    return json_msg

# Consume a query from the user. This example illustrates searching by subject.

(status, data) = conn.search(None, '(SUBJECT "%s")' % (Q, ))
ids = data[0].split()

messages = []
for i in ids:
    try:
        (status, data) = conn.fetch(i, '(RFC822)')
        messages.append(email.message_from_string(data[0][1]))
    except Exception, e:
        print e
        print 'Error fetching message %s. Skipping it.' % (i, )

print len(messages)
jsonified_messages = [jsonifyMessage(m) for m in messages]

# Separate out the text content from each message so that it can be analyzed.

content = [p['content'] for m in jsonified_messages for p in m['parts']]

# Content can still be quite messy and contain line breaks and other quirks.

filename = os.path.join('resources/ch06-mailboxes/data', 
                        GMAIL_ACCOUNT.split("@")[0] + '.gmail.json')
f = open(filename, 'w')
f.write(json.dumps(jsonified_messages))
f.close()

print >> sys.stderr, "Data written out to", f.name
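The `cleanContent` routine used in Examples 3 and 18 leans on quopri plus BeautifulSoup. On Python 3, where the old `from BeautifulSoup import BeautifulSoup` import no longer exists, the tag-stripping step can be approximated with the standard library's `html.parser`; a sketch (the class and function names are made up for illustration):

```python
import quopri
from html.parser import HTMLParser

class TextExtractor(HTMLParser):
    """Collect only the text nodes, discarding tags."""
    def __init__(self):
        HTMLParser.__init__(self)
        self.chunks = []
    def handle_data(self, data):
        self.chunks.append(data)

def clean_content(msg_bytes):
    # Decode from "quoted printable" format, then strip any HTML tags
    decoded = quopri.decodestring(msg_bytes).decode('utf-8', 'ignore')
    extractor = TextExtractor()
    extractor.feed(decoded)
    return ''.join(extractor.chunks)

print(clean_content(b'<p>Hello=2C <b>world</b>!</p>'))
```

BeautifulSoup is considerably more forgiving of malformed markup than `html.parser`, so for truly messy mail bodies the third-party library (bs4 on Python 3) remains the more robust choice.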