Google Phone Numbers in Spark

The Problem

Our CRM team have always relied on having a cleaned-up phone number from our old data warehouse so that they can push SMS messages to customers. The old system was something of a black box in the way it cleaned and validated phone numbers, and various people across the business have attempted to come up with an alternative. We operate across multiple countries, so any SQL code to validate phone numbers ends up being either very naive, very hard to read, or both.

A first attempt at cleaning up these phone numbers included many levels of nested IF statements in SQL, the following being just a tiny segment.

if(substr(c.telephone,1,5) in ('+3505','+3538','+3584','+3585'),
        --then
        regexp_replace(c.telephone,'\\+',''),
        --else
        if(substr(c.telephone,1,4) = '+447',
        --then
        regexp_replace(c.telephone,'\\+',''),
        --else
        if(substr(c.telephone,1,2) = '07' and c.country_code='UK',
            --then
            concat('44',regexp_replace(c.telephone,'^0','')),
            --else
            if(substr(c.telephone,1,3) = '058' and c.country_code='GI',
            --then
            concat('350',regexp_replace(c.telephone,'^0','')),
            --else
            if(substr(c.telephone,1,2) = '08' and c.country_code='IE',
                --then
                concat('353',regexp_replace(c.telephone,'^0','')),
                --else
                if(substr(c.telephone,1,2) in ('05','04') and c.country_code='FI',
                --then
                concat('358',regexp_replace(c.telephone,'^0','')),
                --else
                c.mobile
                )
            )
            )
        )
        )
    )

Surely there must be a better way? Somebody must have solved this problem already?

Thanks, Google! Their phone number library libphonenumber, for parsing, formatting and validating international phone numbers, has been in use in Android since version 4.0. It’s written in Java though, and we wanted to try something out quickly. Fortunately there’s a Python port, phonenumbers, which makes it much easier to use.

Proof of concept

darrell@darrell-hadoop python (dev) $ python
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import phonenumbers
>>> p = phonenumbers.parse('07700900016', 'GB')
>>> p
PhoneNumber(country_code=44,
            national_number=7700900016,
            extension=None,
            italian_leading_zero=None,
            number_of_leading_zeros=None,
            country_code_source=None,
            preferred_domestic_carrier_code=None)
>>> phonenumbers.is_valid_number(p)
True

Process for all customers

After a few minutes we can see that libphonenumber is easy to work with, and correctly identifies the numbers we’ve put into it (side note: Italian numbers are weird). But how can we apply this to all of our customers in a simple and performant manner? Traditionally we’d use a Hive UDF for this kind of functionality, but UDFs are a bit awkward to maintain and we’ve had problems migrating them from Hive to Impala in the past. We also have a more modern framework in the form of PySpark, so let’s get the cluster to do all the work.

Proof of concept - part 2

Let’s load the relevant fields from a small subset of data into a Spark DataFrame using Spark SQL and work on them there.

#
# Cleanup phone numbers
#
#
from pyspark import SparkContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import StructType, StructField, StringType, LongType
import phonenumbers
from phonenumbers import PhoneNumberType

sc = SparkContext(appName="CleanPhoneNumbers")

sqlContext = HiveContext(sc)
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
dfCustomers = sqlContext.sql("""
    SELECT
        cust_id,
        telephone,
        mobile,
        country_code,
        email,
        dw_last_modified_dt
    FROM
        sbgi_customers.customers_detail
""")

dfCustomers = (
    dfCustomers
    .filter(dfCustomers.dw_last_modified_dt >= '2016-03-21 00:00:00')
)

def fixrecord(c):
    # Prefer the mobile field over the telephone field
    number = c.telephone if c.mobile is None else c.mobile

    # Our country code definitions don't match up with Google's so fix the UK ones
    if c.country_code == 'UK':
        country_code = 'GB'
    else:
        country_code = c.country_code

    is_valid_number = "N"
    clean_number = None
    number_type = None

    p = None

    if number is not None:
        # Parse and validate the number
        try:
            p = phonenumbers.parse(number, country_code)

            # Accept the number if it's already valid, or if trimming
            # excess trailing digits turns it into a valid number
            if phonenumbers.is_valid_number(p):
                is_valid_number = "Y"
            elif phonenumbers.truncate_too_long_number(p):
                is_valid_number = "Y"
            else:
                is_valid_number = "N"

            # Rebuild the number as <country code><national number>
            clean_number = "%s%s" % (p.country_code, p.national_number)
        except phonenumbers.NumberParseException:
            # The number couldn't be parsed at all
            p = None

    return Row(
        # These are listed in alphabetical order as this is the way they
        # come out when the data frame is created
        clean_number=clean_number,
        cust_id=c.cust_id,
        is_valid_number=is_valid_number
    )

rddCustomers = dfCustomers.map(lambda c: fixrecord(c))

schema = StructType([
    StructField("clean_number",    StringType(),    True),
    StructField("cust_id",         LongType(),      True),
    StructField("is_valid_number", StringType(),    True)
])

df = sqlContext.createDataFrame(rddCustomers, schema)
df.write.parquet(
    '/user/taylord/test/clean_numbers/', mode='overwrite')

Running this code on the cluster is simple: we just pass the phonenumbers library in alongside the script we’ve just written via spark-submit.

spark-submit --py-files phonenumbers.zip clean_phone_numbers.py

That’s all

Results

So how does this compare to the old way of doing things? The differences are far from insignificant, and our CRM team have the numbers to prove it.

Out of a customer base of millions just for the UK and Ireland, libphonenumber marks hundreds of thousands of records as valid that were previously invalid.

We now need to understand why there’s such a big difference. The previous system had some bugs around Irish mobile numbers, but there’s also a sizeable number of cases where the phone number doesn’t match the customer’s country. In one instance a customer had a Dubai phone number: the old system naively tried to add +44 to it, creating a number it then decided was invalid, whereas libphonenumber correctly identified the country and handled it accordingly.
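
That behaviour is easy to check from the same Python prompt: when a number carries an explicit ‘+’ prefix, the country hint we pass in is only a fallback, so a UAE number stored against a GB customer still parses with the right country code. The UAE number below is made up purely for illustration.

import phonenumbers

# A made-up UAE mobile stored against a customer whose country code is GB.
# The explicit '+' means the 'GB' hint is only a fallback, so the real
# country code is picked up anyway.
p = phonenumbers.parse('+971501234567', 'GB')
print(p.country_code)                          # 971, not 44
print(phonenumbers.region_code_for_number(p))  # should come back as 'AE'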

Phone number type

Libphonenumber can identify phone number types much better than our old system. Of most interest to us is being able to identify Fixed line, Mobile, Personal number, Pager and Premium Rate numbers.
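
Getting the type out of the library is a one-liner with phonenumbers.number_type; a quick check against the test number from the earlier proof of concept:

import phonenumbers
from phonenumbers import PhoneNumberType

# 07700 900016 sits in the UK's reserved drama range for mobiles,
# so it should be reported as a mobile number.
p = phonenumbers.parse('07700900016', 'GB')
print(phonenumbers.number_type(p) == PhoneNumberType.MOBILE)  # True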

We discovered that about 25% of the valid phone numbers were landlines and 74% were mobiles, with a small remainder of pagers, VoIP numbers, toll free numbers and premium rate numbers.

Can receive SMS?

Going back to the original requirement, our CRM team want to use this data to send SMS messages to customers. Let’s help them decide which number types they can use.

is_mobile = (number_type == PhoneNumberType.MOBILE or
             number_type == PhoneNumberType.FIXED_LINE_OR_MOBILE or
             number_type == PhoneNumberType.PAGER)

if is_valid_number == 'Y' and is_mobile:
    can_rx_sms = 'Y'
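
For completeness, number_type in that snippet comes from calling phonenumbers.number_type on the parsed number inside fixrecord. A minimal sketch of how the check might be packaged up; the helper name is our own, shown only for illustration:

def can_receive_sms(p, is_valid_number):
    # Hypothetical helper: p is the PhoneNumber returned by phonenumbers.parse
    number_type = phonenumbers.number_type(p)
    is_mobile = number_type in (PhoneNumberType.MOBILE,
                                PhoneNumberType.FIXED_LINE_OR_MOBILE,
                                PhoneNumberType.PAGER)
    return 'Y' if (is_valid_number == 'Y' and is_mobile) else 'N'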

Manage Spark partitions

This is now coming together nicely, but when we try running it across our entire customer base it takes over an hour. That can’t be right, so what’s happening? The HiveContext we’re using reads the data into an RDD with a single partition, meaning a single YARN container ends up doing all of the work. We should repartition the data to speed this up.

#
# Work out the number of partitions we need based on 150K customers per executor
# seems to be a fair trade off for processing speed.  If this partition stage is
# skipped the process takes over an hour.
#
NumberOfCustomers = dfCustomers.count()
NumberOfPartitions = max(NumberOfCustomers / 150000, 1)
rddCustomers = (
    dfCustomers
    .repartition(NumberOfPartitions)
    .map(lambda c: fixrecord(c))
    )

By dynamically repartitioning the data into partitions of up to 150,000 records each, we got the processing time down to just a few minutes across the entire customer base.

Whilst it’s great that we can run this across our entire customer base in just a few minutes, we don’t really want to be doing that. First of all it’s wasted work, but more importantly we need to know about customer contact details as soon as they change - not a day later. So rather than run this against the customers table, we run it against newly created or modified customer records every thirty minutes.
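
In practice that just means making the dw_last_modified_dt filter relative rather than hard-coded. Something along these lines; the thirty-minute window and column name follow the job above, but the watermark handling is simplified (a real run would track the last processed timestamp rather than trust the wall clock):

from datetime import datetime, timedelta

# Only pick up customers created or modified in the last thirty minutes
cutoff = (datetime.now() - timedelta(minutes=30)).strftime('%Y-%m-%d %H:%M:%S')

dfRecent = dfCustomers.filter(dfCustomers.dw_last_modified_dt >= cutoff)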

Lessons learned

  • Using regular Python modules in PySpark is easy: just submit a .zip alongside the job
  • PoC to production is really quick, particularly now that we’ve extended our Pidl code to support Spark