Sunday, June 22, 2014

A way to JOIN in MongoDB using the Aggregation framework

I want to start by saying, as you may have heard before, that if you get your schema design right you normally don't need to do any joins in MongoDB. A lot of the typical uses of joins are handled by the flexibility of the document model, and asking how to design joins out of your schema should always be your first port of call.

I was working on an interesting big data problem last week, which I will document in a future blog post, where I needed to take a value in each document and project a second value looked up from another collection: essentially a lookup table.

In an RDBMS this is a great use of a JOIN; however, for sound technical reasons MongoDB doesn't offer this. Once you are working on a large data set that is distributed across shards, the performance overhead of JOINs is bad: unless the data you are joining is located together, the query results have to be pulled back to a central location and merged, which is neither quick nor very scalable.

I hit upon the idea of pushing the secondary table into the aggregation query itself, as a kind of CASE statement. This is easiest to show with a short example.

Imagine you are a foreign exchange company: your customers have made trades with you, and you hold balances for them in a number of currencies. In its simplest form, a holding looks like this:

custrec = {
   customer: 'john',
   currency: 'USD',
   quantity: 250
}

Now, the exchange rate of USD varies, so if I want to work things out for my customers relative to the current price, or to some hypothetical price change, how do I do it? I'm assuming here that you plan to use the aggregation framework.

For a single currency you can do this:

usdprice = 1.65 

db.holdings.aggregate({$project : { customer:1,currency:1,quantity:1,price : { $cond : [ { $eq : ['$currency','USD'] },usdprice,null ] } }})

If the currency equals USD, a price is added to the document for the next pipeline stage (otherwise price is null). Because this is a $project before any $group or $sort, it runs on the remote shards.
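To make the per-document effect concrete, here is a minimal sketch in plain JavaScript (runnable in Node, no database needed). `evalExpr` is a hypothetical helper written only for this illustration: it mimics the two operators used above, $cond and $eq, on ordinary objects, and is not part of MongoDB. The second holding, for "mary", is made-up sample data.

```javascript
// Hypothetical helper: evaluates the tiny subset of aggregation expressions
// used above ($cond and $eq) against a plain object. Illustration only.
function evalExpr(expr, doc) {
  if (typeof expr === "string" && expr.startsWith("$")) {
    const v = doc[expr.slice(1)]; // "$currency" -> doc.currency
    return v === undefined ? null : v;
  }
  if (expr === null || typeof expr !== "object") return expr; // literal value
  if ("$cond" in expr) {
    const [test, thenExpr, elseExpr] = expr.$cond;
    return evalExpr(test, doc) ? evalExpr(thenExpr, doc) : evalExpr(elseExpr, doc);
  }
  if ("$eq" in expr) {
    const [a, b] = expr.$eq.map((e) => evalExpr(e, doc));
    return a === b;
  }
  throw new Error("unsupported expression: " + JSON.stringify(expr));
}

const usdprice = 1.65;
const priceExpr = { $cond: [{ $eq: ["$currency", "USD"] }, usdprice, null] };

const holdings = [
  { customer: "john", currency: "USD", quantity: 250 },
  { customer: "mary", currency: "GBP", quantity: 100 }, // hypothetical
];

// Equivalent of the $project stage: keep the fields, add the computed price.
const projected = holdings.map((h) => ({ ...h, price: evalExpr(priceExpr, h) }));
console.log(projected);
// john's USD holding gets price 1.65; mary's GBP holding gets price null
```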

The thing is, there are a whole bunch of currencies, so how do you extend this?

One aggregation operator that lets you do this is $add (or $concat for strings), which lets you construct an array of conditionals and sum it. Each non-matching $cond contributes 0 (for $concat, an empty string), so the result is simply the price of the matching currency:

usdprice = 1.65
gbpprice = 2.2
eurprice = 1.89

db.holdings.aggregate({$project : { customer:1,currency:1,quantity:1,
    price :  { $add : [ { $cond : [ { $eq : [ '$currency','USD']},usdprice,0]},
                        { $cond : [ { $eq : [ '$currency','GBP']},gbpprice,0]},
                        { $cond : [ { $eq : [ '$currency','EUR']},eurprice,0]}]
             }
    }})

This was my first attempt at joining. To be clear, you don't want to type queries like this in by hand; rather, use some code to construct the query for you from an array of objects. That's why MongoDB's queries-as-objects model rocks.
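A minimal sketch of that kind of query builder in plain JavaScript: `buildAddLookup` is a hypothetical helper name, and the three prices are the ones used above. It emits one $cond per table entry, using 0 as the no-match value so the $add sum is just the matching price.

```javascript
// Lookup table: currency code -> today's price (values from the example above).
const prices = { USD: 1.65, GBP: 2.2, EUR: 1.89 };

// Hypothetical helper: turn a lookup table into
// { $add: [ { $cond: [ { $eq: [field, key] }, price, 0 ] }, ... ] }
function buildAddLookup(field, table) {
  return {
    $add: Object.entries(table).map(([key, price]) => ({
      $cond: [{ $eq: [field, key] }, price, 0],
    })),
  };
}

const stage = {
  $project: {
    customer: 1,
    currency: 1,
    quantity: 1,
    price: buildAddLookup("$currency", prices),
  },
};

// Print the stage as it would be sent to the server.
console.log(JSON.stringify(stage, null, 2));
```

In the mongo shell the same object can be passed straight to db.holdings.aggregate(stage). The array still holds one $cond per currency, which is exactly the inefficiency described next.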

Before you do that, though: this method is really inefficient, as MongoDB has to evaluate every single conditional for every document, and if you have tens of thousands of conditionals this takes forever.

I then wondered how to improve this. You can make it stop once it has found a match by nesting $cond statements: $cond has an if-then-else structure, so we can rewrite the above as

usdprice = 1.65
gbpprice = 2.2
eurprice = 1.89

db.holdings.aggregate({$project : { customer:1,currency:1,quantity:1,
    price :  { $cond : [ { $eq : [ '$currency','USD']},usdprice,
                 { $cond : [ { $eq : [ '$currency','GBP']},gbpprice,
                     { $cond : [ { $eq : [ '$currency','EUR']},eurprice,null]}]}]}}})
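The nested form can be generated from a table in the same way. In this sketch (plain JavaScript, with a hypothetical `buildChainLookup` helper), `reduceRight` folds the table from the last entry backwards, so the last currency becomes the innermost $cond and its else-branch is null.

```javascript
// Lookup entries in the order they should be tested.
const entries = [["USD", 1.65], ["GBP", 2.2], ["EUR", 1.89]];

// Hypothetical helper: build
// { $cond: [ {$eq: [field, k1]}, v1, { $cond: [ {$eq: [field, k2]}, v2, ... null ] } ] }
function buildChainLookup(field, list) {
  return list.reduceRight(
    (elseBranch, [key, price]) => ({
      $cond: [{ $eq: [field, key] }, price, elseBranch],
    }),
    null // value when nothing matches
  );
}

const chain = buildChainLookup("$currency", entries);
console.log(JSON.stringify(chain));
```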

This isn't great either. On average it halves the number of comparisons, but it is still a linear list lookup: each document still scans half the list on average. It also increases the nesting depth of the query document, and MongoDB limits how deeply a BSON document can nest (100 levels), so this won't stretch to thousands or even hundreds of entries.
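To see how fast the depth grows, the sketch below counts nested objects and arrays in a generated chain. The counting rule and the helper names (`buildChainLookup`, `nestingDepth`) are assumptions for illustration, not MongoDB's own validation code, and the keys are synthetic. Each extra entry adds one $cond object plus its array, so by this count the depth is 2n + 2 for n entries, which passes 100 at 50 entries, before counting the pipeline wrapped around it.

```javascript
// Nested-$cond chain, as in the query above.
function buildChainLookup(field, list) {
  return list.reduceRight(
    (elseBranch, [key, price]) => ({ $cond: [{ $eq: [field, key] }, price, elseBranch] }),
    null
  );
}

// Count levels of nested objects/arrays; scalars (strings, numbers, null) add nothing.
function nestingDepth(value) {
  if (value === null || typeof value !== "object") return 0;
  const children = Array.isArray(value) ? value : Object.values(value);
  return 1 + children.reduce((max, child) => Math.max(max, nestingDepth(child)), 0);
}

for (const n of [3, 10, 50]) {
  // Synthetic keys K0, K1, ... standing in for currency codes.
  const list = Array.from({ length: n }, (_, i) => ["K" + i, i]);
  const depth = nestingDepth(buildChainLookup("$currency", list));
  console.log(`${n} entries -> ${depth} levels`); // 8, 22 and 102 levels
}
```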

At this point I realised that an efficient nested if-then-else lookup would be a binary tree. All I had to do was automatically transform a table into a set of $cond statements with a minimal number of conditionals on the path to any given value; this keeps the depth down and the lookups optimal. I would need to use $eq for leaf nodes and $gt for branching nodes.
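For the three currencies used so far, such a tree could look like the hand-built sketch below. It follows the convention of the addtotree function further down: a $gt test sends keys greater than the split key to the then-branch and everything else to the else-branch, and each leaf confirms the match with $eq, returning null for keys not in the table. Any lookup here costs at most two $gt tests plus one $eq. `evalExpr` is again a hypothetical helper that mimics only $cond, $eq and $gt; for plain ASCII codes like these, JavaScript's string `>` orders them the same way MongoDB's default comparison does (worth checking if your keys are not plain ASCII).

```javascript
// Hypothetical helper: evaluates $cond, $eq and $gt against a plain object.
function evalExpr(expr, doc) {
  if (typeof expr === "string" && expr.startsWith("$")) {
    const v = doc[expr.slice(1)];
    return v === undefined ? null : v;
  }
  if (expr === null || typeof expr !== "object") return expr;
  if ("$cond" in expr) {
    const [test, thenExpr, elseExpr] = expr.$cond;
    return evalExpr(test, doc) ? evalExpr(thenExpr, doc) : evalExpr(elseExpr, doc);
  }
  if ("$eq" in expr) {
    const [a, b] = expr.$eq.map((e) => evalExpr(e, doc));
    return a === b;
  }
  if ("$gt" in expr) {
    const [a, b] = expr.$gt.map((e) => evalExpr(e, doc));
    return a > b;
  }
  throw new Error("unsupported expression: " + JSON.stringify(expr));
}

// Leaf: "if currency == key then price else null".
const leaf = (key, price) => ({ $cond: [{ $eq: ["$currency", key] }, price, null] });

const tree = {
  $cond: [
    { $gt: ["$currency", "GBP"] }, // greater than "GBP"?
    leaf("USD", 1.65),             // yes: only USD is up here
    {
      $cond: [
        { $gt: ["$currency", "EUR"] }, // greater than "EUR" (and not above "GBP")?
        leaf("GBP", 2.2),
        leaf("EUR", 1.89),             // "EUR" or lower
      ],
    },
  ],
};

for (const currency of ["USD", "GBP", "EUR", "JPY"]) {
  console.log(currency, evalExpr(tree, { currency }));
}
// USD 1.65, GBP 2.2, EUR 1.89, JPY null
```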


Here's an example complete with data to try it on.

First, generate our two data sets:


use fx
db.holdings.drop()

numcustomers = 100
numtrades = 100
currencies = ["AFN","ALL","DZD","AOA","XCD","XCD","ARS","AMD","AWG","AUD","AZN","BSD",
"BHD","BDT","BBD","BYR","BZD","XOF","BMD","BTN","INR","BOB","BOV","BAM",
"BWP","NOK","BRL","BND","BGN","XOF","BIF","KHR","XAF","CAD","CVE","KYD",
"XAF","XAF","CLF","CLP","CNY","AUD","AUD","COP","COU","KMF","XAF","CDF",
"NZD","CRC","XOF","HRK","CUC","CUP","ANG","CZK","DKK","DJF","XCD","DOP",
"EGP","SVC","XAF","ERN","ETB","FKP","DKK","FJD","XPF","XAF","GMD","GEL",
"GHS","GIP","DKK","XCD","GTQ","GBP","GNF","XOF","GYD","HTG","AUD","HNL",
"HKD","HUF","ISK","INR","IDR","XDR","IRR","IQD","GBP","ILS","JMD","JPY",
"GBP","JOD","KZT","KES","AUD","KPW","KRW","KWD","KGS","LAK","LBP","LSL",
"ZAR","LRD","LYD","CHF","LTL","MOP","MKD","MGA","MWK","MYR","MVR","XOF",
"MRO","MUR","XUA","MXN","MXV","MDL","MNT","XCD","MAD","MZN","MMK","NAD",
"ZAR","AUD","NPR","XPF","NZD","NIO","XOF","NGN","NZD","AUD","NOK","OMR",
"PKR","PAB","PGK","PYG","PEN","PHP","NZD","PLN","QAR","RON","RUB","RWF",
"SHP","XCD","XCD","XCD","WST","STD","SAR","XOF","RSD","SCR","SLL","SGD",
"ANG","XSU","SBD","SOS","ZAR","SSP","LKR","SDG","SRD","NOK","SZL","SEK",
"CHE","CHF","CHW","SYP","TWD","TJS","TZS","THB","XOF","NZD","TOP","TTD",
"TND","TRY","TMT","AUD","UGX","UAH","AED","GBP","USN","UYI","UYU","UZS",
"VUV","VEF","VND","XPF","MAD","YER","ZMW","ZWL","XBA","XBB","XBC","XBD",
"XTS","XXX","XAU","XPD","XPT","XAG"]

for(t=0;t<numtrades;t++) {
    customerid = Math.floor(Math.random() * numcustomers) 
    currency = currencies[ Math.floor(Math.random() * currencies.length)]
    quantity = Math.floor(Math.random() * 10000)
    //Round to 3 decimal places
    buyprice = Math.floor( ((Math.random() * 3.0) + 0.5) * 1000) / 1000
    
    db.holdings.insert({_id:t,customerid:customerid, symbol: currency, quantity: quantity, buyprice: buyprice})
}

//Make a second table with 'todays' price

db.market.drop()

for(s=0;s<currencies.length;s++) {
    currency = currencies[s]
    price = Math.floor(((Math.random() * 3.0) + 0.5) * 1000) / 1000
    db.market.insert({_id:currency,price:price})

}
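One thing to watch in the data generation: the currencies array lists several codes more than once (XCD, XAF, AUD, NZD and GBP among others), and because db.market uses the code as its _id, the repeated inserts are rejected with duplicate key errors. A minimal way to avoid that, sketched in modern JavaScript on a few entries from the list, is to de-duplicate with a Set first; older mongo shells may not support Set or the spread syntax, in which case a plain loop does the same job.

```javascript
// A few entries from the currencies array above, repeats included.
const currencies = ["AFN", "XCD", "XCD", "AUD", "XAF", "AUD", "GBP", "XAF"];

// A Set keeps only the first occurrence of each code, in original order.
const uniqueCurrencies = [...new Set(currencies)];
console.log(uniqueCurrencies); // [ 'AFN', 'XCD', 'AUD', 'XAF', 'GBP' ]
```

Looping over uniqueCurrencies instead of currencies when filling db.market gives exactly one price document per code.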

Now we create our function, which recursively adds a new value to a MongoDB aggregation expression, building a tree as it goes.


addtotree = function ( tree, keyfield, key, value ) {
    // Empty tree: start with a single leaf, "if keyfield == key then value else null"
    if ( tree == null ) {
        return { $cond : [ { $eq : [keyfield,key]}, value, null]}
    }
    var isleaf = tree['$cond'][0]['$eq']
    if ( isleaf ) {
        // Leaf: split it into a $gt branch node with two leaves beneath it
        var nodekey = tree['$cond'][0]['$eq'][1]
        var nodeval = tree['$cond'][1]
        var left = {$cond:[{$eq:[keyfield,key]},value,null]}
        var right = {$cond:[{$eq:[keyfield,nodekey]},nodeval,null]}
        delete tree['$cond'][0]['$eq']
        if ( key > nodekey ) {
            // The then-branch holds the keys greater than the split key
            tree['$cond'][0]['$gt']=[keyfield,nodekey]
            tree['$cond'][1]=left
            tree['$cond'][2]=right
        } else {
            tree['$cond'][0]['$gt']=[keyfield,key]
            tree['$cond'][1]=right
            tree['$cond'][2]=left
        }
    } else {
        // Branch node: descend into the side the new key belongs on
        var splitkey = tree['$cond'][0]['$gt'][1]
        if ( key > splitkey ) {
            addtotree(tree['$cond'][1],keyfield,key,value)
        } else {
            addtotree(tree['$cond'][2],keyfield,key,value)
        }
    }
    return tree
}
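To check the tree logic without a database, here is a sketch of the same algorithm ported to plain JavaScript (block-scoped variables, otherwise the same leaf-splitting steps), together with a hypothetical `evalExpr` helper that mimics only $cond, $eq and $gt. The prices are five of the market prices from the sample output below; the check confirms each symbol resolves to its own price and an unknown symbol to null.

```javascript
// Port of addtotree: insert key -> value into a nested-$cond binary tree.
// Leaves test { $eq: [keyfield, key] }; branches test { $gt: [keyfield, splitKey] },
// with the then-branch holding keys greater than splitKey.
function addtotree(tree, keyfield, key, value) {
  if (tree == null) {
    return { $cond: [{ $eq: [keyfield, key] }, value, null] };
  }
  const test = tree.$cond[0];
  if (test.$eq) {
    // Leaf: turn it into a branch with the old and new leaves beneath it.
    const nodekey = test.$eq[1];
    const nodeval = tree.$cond[1];
    const newLeaf = { $cond: [{ $eq: [keyfield, key] }, value, null] };
    const oldLeaf = { $cond: [{ $eq: [keyfield, nodekey] }, nodeval, null] };
    delete test.$eq;
    if (key > nodekey) {
      test.$gt = [keyfield, nodekey];
      tree.$cond[1] = newLeaf;
      tree.$cond[2] = oldLeaf;
    } else {
      test.$gt = [keyfield, key];
      tree.$cond[1] = oldLeaf;
      tree.$cond[2] = newLeaf;
    }
  } else {
    // Branch: recurse into the side the key belongs on (mutates in place).
    const splitKey = test.$gt[1];
    addtotree(key > splitKey ? tree.$cond[1] : tree.$cond[2], keyfield, key, value);
  }
  return tree;
}

// Hypothetical helper: evaluates $cond, $eq and $gt against a plain object.
function evalExpr(expr, doc) {
  if (typeof expr === "string" && expr.startsWith("$")) {
    const v = doc[expr.slice(1)];
    return v === undefined ? null : v;
  }
  if (expr === null || typeof expr !== "object") return expr;
  if ("$cond" in expr) {
    const [c, thenExpr, elseExpr] = expr.$cond;
    return evalExpr(c, doc) ? evalExpr(thenExpr, doc) : evalExpr(elseExpr, doc);
  }
  if ("$eq" in expr) {
    const [a, b] = expr.$eq.map((e) => evalExpr(e, doc));
    return a === b;
  }
  if ("$gt" in expr) {
    const [a, b] = expr.$gt.map((e) => evalExpr(e, doc));
    return a > b;
  }
  throw new Error("unsupported expression: " + JSON.stringify(expr));
}

const market = { WST: 3.221, IDR: 1.362, PLN: 2.722, ALL: 1.322, NZD: 2.142 };
let lookup = null;
for (const [symbol, price] of Object.entries(market)) {
  lookup = addtotree(lookup, "$symbol", symbol, price);
}
for (const symbol of [...Object.keys(market), "USD"]) {
  console.log(symbol, evalExpr(lookup, { symbol })); // USD is not in the table -> null
}
```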


Then pull all the current prices and put them into a query tree. Note that you need to make sure they are not inserted in key order: this is a plain binary tree, not a self-balancing B-tree, so sorted input leaves it unbalanced (effectively a list). I sorted by price, as that happens to be random in this data, but normally a little array shuffling would do.

var marketPrices = null
var marketCursor = db.market.find({}).sort({price:1});
while(marketCursor.hasNext()){
    price = marketCursor.next()
    marketPrices = addtotree(marketPrices,'$symbol',price['_id'],price['price'])

}

I also pass in the key field from the primary table, '$symbol', as the query needs to know what it's matching on.
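The "little array shuffling" mentioned above can be a Fisher-Yates shuffle of the price documents before inserting them. To show why insertion order matters, this sketch inserts keys into a plain binary search tree, a simplified stand-in for the $cond tree (which stores keys only at its leaves but degrades the same way), and reports its height: sorted input produces a chain as long as the input, while shuffled input stays shallow. The helper names and the synthetic keys are illustrative.

```javascript
// Fisher-Yates shuffle: uniformly random order, in place.
function shuffle(items) {
  for (let i = items.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [items[i], items[j]] = [items[j], items[i]];
  }
  return items;
}

// Insert keys into an unbalanced binary search tree and return its height
// (number of edges on the longest root-to-node path).
function bstHeight(keys) {
  let root = null;
  let height = 0;
  for (const key of keys) {
    const node = { key, left: null, right: null };
    if (root === null) {
      root = node;
      continue;
    }
    let current = root;
    let depth = 1;
    while (true) {
      const side = key > current.key ? "right" : "left";
      if (current[side] === null) {
        current[side] = node;
        break;
      }
      current = current[side];
      depth++;
    }
    height = Math.max(height, depth);
  }
  return height;
}

// 1000 synthetic keys in sorted order: "0000", "0001", ...
const sortedKeys = Array.from({ length: 1000 }, (_, i) => String(i).padStart(4, "0"));
console.log("sorted:", bstHeight(sortedKeys));                 // 999: every key goes right
console.log("shuffled:", bstHeight(shuffle([...sortedKeys]))); // far smaller; varies per run
```

In the shell that would mean reading db.market.find().toArray(), shuffling the array, and calling addtotree for each element instead of iterating a sorted cursor.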

Finally I can run a nice, simple, fast aggregation to get the join (which can then be pipelined into calculations etc. in a further projection):


projects = { customerid: 1, symbol: 1, quantity: 1, buyprice: 1 }
projects['marketprice'] = marketPrices

db.holdings.aggregate({$project : projects})


> db.holdings.aggregate({$project : projects})
{ "_id" : 0, "customerid" : 312, "symbol" : "WST", "quantity" : 4895, "buyprice" : 0.899, "marketprice" : 3.221 }
{ "_id" : 1, "customerid" : 871, "symbol" : "IDR", "quantity" : 4027, "buyprice" : 2.03, "marketprice" : 1.362 }
{ "_id" : 2, "customerid" : 645, "symbol" : "PLN", "quantity" : 5941, "buyprice" : 2.799, "marketprice" : 2.722 }
{ "_id" : 3, "customerid" : 177, "symbol" : "ALL", "quantity" : 7155, "buyprice" : 1.318, "marketprice" : 1.322 }
{ "_id" : 4, "customerid" : 829, "symbol" : "PAB", "quantity" : 6049, "buyprice" : 1.7, "marketprice" : 2.754 }
{ "_id" : 5, "customerid" : 74, "symbol" : "NZD", "quantity" : 3040, "buyprice" : 1.404, "marketprice" : 2.142 }
{ "_id" : 6, "customerid" : 954, "symbol" : "MOP", "quantity" : 5260, "buyprice" : 3.366, "marketprice" : 0.883 }
{ "_id" : 7, "customerid" : 509, "symbol" : "XBC", "quantity" : 4600, "buyprice" : 2.458, "marketprice" : 1.503 }
{ "_id" : 8, "customerid" : 476, "symbol" : "EGP", "quantity" : 8831, "buyprice" : 0.85, "marketprice" : 2.083 }
{ "_id" : 9, "customerid" : 342, "symbol" : "PLN", "quantity" : 8070, "buyprice" : 1.245, "marketprice" : 2.722 }
{ "_id" : 10, "customerid" : 547, "symbol" : "XDR", "quantity" : 134, "buyprice" : 2.351, "marketprice" : 1.046 }
{ "_id" : 11, "customerid" : 743, "symbol" : "XBC", "quantity" : 9110, "buyprice" : 3.221, "marketprice" : 1.503 }
{ "_id" : 12, "customerid" : 933, "symbol" : "KYD", "quantity" : 6024, "buyprice" : 2.658, "marketprice" : 2.574 }
{ "_id" : 13, "customerid" : 652, "symbol" : "ZAR", "quantity" : 2902, "buyprice" : 2.385, "marketprice" : 1.181 }
{ "_id" : 14, "customerid" : 686, "symbol" : "ERN", "quantity" : 8642, "buyprice" : 1.355, "marketprice" : 2.31 }
{ "_id" : 15, "customerid" : 420, "symbol" : "XAF", "quantity" : 2897, "buyprice" : 1.002, "marketprice" : 2.31 }
{ "_id" : 16, "customerid" : 521, "symbol" : "MGA", "quantity" : 8122, "buyprice" : 2.393, "marketprice" : 2.495 }
{ "_id" : 17, "customerid" : 414, "symbol" : "MWK", "quantity" : 2793, "buyprice" : 0.637, "marketprice" : 2.646 }
{ "_id" : 18, "customerid" : 737, "symbol" : "XPF", "quantity" : 7817, "buyprice" : 0.827, "marketprice" : 3.462 }
{ "_id" : 19, "customerid" : 620, "symbol" : "PYG", "quantity" : 8110, "buyprice" : 2.223, "marketprice" : 1.697 }
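The "further projection" mentioned above could, for example, value each holding at today's price. This is a sketch with made-up field names (marketvalue and pnl are hypothetical): a second $project stage using $multiply and $subtract on the marketprice field produced by the lookup. The last lines repeat the same arithmetic in plain JavaScript for the first row of the output above as a sanity check.

```javascript
// Hypothetical second stage: compute value and profit/loss from the looked-up price.
const valuationStage = {
  $project: {
    customerid: 1,
    symbol: 1,
    quantity: 1,
    buyprice: 1,
    marketprice: 1,
    marketvalue: { $multiply: ["$quantity", "$marketprice"] },
    pnl: { $multiply: ["$quantity", { $subtract: ["$marketprice", "$buyprice"] }] },
  },
};
// In the shell: db.holdings.aggregate([{ $project: projects }, valuationStage])

// Same arithmetic for row _id 0 above (WST, quantity 4895, bought 0.899, now 3.221).
const row = { quantity: 4895, buyprice: 0.899, marketprice: 3.221 };
const pnl = row.quantity * (row.marketprice - row.buyprice);
console.log(pnl.toFixed(2)); // "11366.19"
```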

The nice thing about this method is that it's efficient and it scales to large data sets. Given the 16MB limit on a MongoDB document, you can probably build a pretty large lookup table; depending on your keys and values, tens of thousands of entries should be possible. I'll be pushing those limits in my big data post next week.
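Besides the 16MB size limit, MongoDB caps BSON nesting at 100 levels, so the tree's height matters too. As an alternative to shuffling (a different technique from the insertion approach in this post), the tree can be built balanced directly from keys sorted in ascending order, for example from db.market.find().sort({_id:1}), by splitting at the middle key each time. This sketch builds the same $gt/$eq shape that way for 20,000 synthetic keys and reports the nesting depth, plus the JSON length as a rough proxy for size (BSON size will differ). The helper names are hypothetical.

```javascript
// Build a balanced nested-$cond tree from [key, value] pairs sorted by key.
// Same shape as addtotree: $gt branches (then = greater keys), $eq leaves.
function buildBalanced(field, sorted, lo = 0, hi = sorted.length - 1) {
  if (lo === hi) {
    const [key, value] = sorted[lo];
    return { $cond: [{ $eq: [field, key] }, value, null] };
  }
  const mid = Math.floor((lo + hi) / 2); // keys up to sorted[mid] go to the else-branch
  return {
    $cond: [
      { $gt: [field, sorted[mid][0]] },
      buildBalanced(field, sorted, mid + 1, hi),
      buildBalanced(field, sorted, lo, mid),
    ],
  };
}

// Count levels of nested objects/arrays; scalars add nothing.
function nestingDepth(value) {
  if (value === null || typeof value !== "object") return 0;
  const children = Array.isArray(value) ? value : Object.values(value);
  return 1 + children.reduce((max, child) => Math.max(max, nestingDepth(child)), 0);
}

// 20,000 synthetic keys, already in ascending order, with made-up values.
const n = 20000;
const sorted = Array.from({ length: n }, (_, i) => [String(i).padStart(5, "0"), i / 1000]);
const tree = buildBalanced("$symbol", sorted);
console.log("nesting depth:", nestingDepth(tree)); // 4 + 2 * ceil(log2(n)) = 34
console.log("approx size (JSON chars):", JSON.stringify(tree).length);
```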




2 comments:

ultra said...

This is an interesting approach - thanks for the post.

I am currently doing something similar with MongoDB where I would like to join the results of one aggregation (a list of financial instruments) with a lookup/join to other identifiers for those instruments.

So I guess my choice is whether to do this join in the DB, as I would with an RDBMS, or do it in the middle tier and make two DB calls.

I am fairly new to MongoDB and am wondering how this approach would work in practice (I have a C# middle tier). Would you save the JavaScript on the server, cache the tree and so on - and if so, how?

John Page said...

Interesting.. I was searching for some info on mongoDB and came across your blog and saw your name ...
Deja Vu. :-)