Tuesday, September 24, 2013

MongoDB Real-Time Analytics


MongoDB makes it easy to implement real-time analytics. The natural tendency is to use the Map-Reduce feature provided by MongoDB to aggregate data, but here I explore how the upsert feature combined with the $inc operator can perform fairly complex aggregations and groupings in real time.
We can update total counts and averages and find distinct entities for a given period (per second, per minute, per hour) in a single upsert statement. This data is then immediately available to draw real-time charts.

Let's say we want to obtain the following information every second for our application.
1. Total Hits
2. Unique Users
3. Unique IP Addresses
4. Average Response Time for each URL, etc.

We will start by setting up some sample requests. Here, I am hardcoding them; in the real world, we could tail and parse a log file, or the application could send this information directly to our analytics system. A hypothetical parser for the log-file alternative is sketched below.
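For illustration only; the pipe-delimited log format and the parseLogLine helper are my own assumptions, not part of the original setup.

    // Hypothetical helper (assumed log format: time|ip|url|user|processTime).
    // Each tailed log line becomes a Request object.
    public static Request parseLogLine(String line) {
        String[] parts = line.split("\\|");
        // Replace periods in the IP so it is safe as a document key
        // (see the note on underscores below).
        return new Request(parts[0], parts[1].replace('.', '_'),
                parts[2], parts[3], Integer.parseInt(parts[4]));
    }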

    Date d = new Date();
    SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

    Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
    Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
    Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
    Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
    Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
    Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
    Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

Note that the periods in the IP addresses have been replaced with underscores. We will be using the addresses as keys in our JSON documents, and MongoDB does not allow periods in field names because they are interpreted as nested-path separators.
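A tiny helper along those lines (my own addition, not in the original code) keeps the convention in one place; in these MongoDB versions, keys may not contain '.' and may not start with '$'.

    // Assumed helper: make a raw value safe to use as a MongoDB field name.
    public static String toSafeKey(String raw) {
        return raw.replace('.', '_').replace('$', '_');
    }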
Next, we need to decide the aggregation interval. This determines how we choose our document ids.
In this example, we have decided to aggregate every second, so we choose an id that is the request time down to the second (2013_09_23_02_29_28). If we were aggregating at an hourly interval, we could simply use an id that contains only hour-level information, such as "2013_09_23_02".
Based on the chosen id, all requests arriving within a given second update a single document. While there may be hundreds of requests per second, we end up with one document per second, representing the aggregated view for that second.
This has a very nice side effect. MongoDB index sizes are directly proportional to the number of documents. Since we maintain one record per second instead of storing each request as a separate document, we greatly reduce the required index size, and it's much easier to fit the index in memory.
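To make the link between the id format and the aggregation interval concrete, here is a small sketch; the per-minute and per-hour patterns are my extrapolation of the post's convention.

    // The SimpleDateFormat pattern decides the granularity, because its
    // output becomes the document _id.
    SimpleDateFormat perSecond = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss"); // one doc per second
    SimpleDateFormat perMinute = new SimpleDateFormat("yyyy_MM_dd_HH_mm");    // one doc per minute
    SimpleDateFormat perHour   = new SimpleDateFormat("yyyy_MM_dd_HH");       // one doc per hour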
So, let us see how the aggregation and counting happen. The following function creates the upsert request for each record.
1 public static DBObject createUpsert(Request request) {
2
3        DBObject upsert = new BasicDBObject();
4        DBObject inc = new BasicDBObject();
5        // Count total hits
6        inc.put("hits", 1);
7        // Get distinct IP's & Count hits per IP
8        inc.put("ip." + request.getIp() + ".hits", 1);
9        // Get Distinct Users & Count hits per user
10        inc.put("users." + request.getUser() + ".hits", 1);
11
12        // Get Distinct URL's & Count hits per URL
13        inc.put("urls." + request.getUrl() + ".hits", 1);
14        // Total time taken to process 'n' requests.
15        // Divide it by the count obtained above to get the average processing time for each URL.
16        inc.put("urls." + request.getUrl() + ".totalTime",request.getProcessTime());
17        upsert.put("$inc", inc);
18        return upsert;
19    }

Line number 6 simply increments the 'hits' field, giving us the total hits in that second.
In line number 8, we get more creative. For the current request, if the field ip.<ip_address> (e.g. ip.127_0_0_1) already exists, its 'hits' counter is incremented; otherwise, $inc creates the field under 'ip' and initializes it to 1. At the end of the second, we have collected every distinct IP from which the application received requests, along with the number of requests from each.
We do the same for users and URLs. Finally, in line number 16, we accumulate the total processing time of all requests for each URL in the field urls.<url>.totalTime.
By simply dividing the accumulated time by the 'hits' count later, we can get the average response time, as in the sketch below.
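A minimal read-side sketch of that division (my own, using the same legacy driver API as the rest of the post):

    // Fetch the aggregated document for one second and derive the average
    // response time per URL by dividing totalTime by hits.
    DBCollection collection = getCollection();
    DBObject doc = collection.findOne(new BasicDBObject("_id", "2013_09_23_02_29_28"));
    if (doc != null) {
        DBObject urls = (DBObject) doc.get("urls");
        for (String url : urls.keySet()) {
            DBObject stats = (DBObject) urls.get(url);
            double hits = ((Number) stats.get("hits")).doubleValue();
            double totalTime = ((Number) stats.get("totalTime")).doubleValue();
            System.out.println(url + " average = " + (totalTime / hits));
        }
    }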
Here's the function that gets invoked for each request, updating or inserting the record in the database.

    public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // upsert = true (insert if missing), multi = false
        collection.update(query, dbo, true, false);
    }
As each request is received and upserted, our data is aggregated immediately. Below is the final aggregated document at the end of processing the requests for the second represented by '2013_09_23_02_29_28'.
{
    "_id" : "2013_09_23_02_29_28",
    "hits" : 7,
    "ip" : {
        "192_168_1_20" : {
            "hits" : 1
        },
        "127_0_0_1" : {
            "hits" : 6
        }
    },
    "urls" : {
        "/apple/support1" : {
            "hits" : 4,
            "totalTime" : 8
        },
        "/apple/support2" : {
            "hits" : 2,
            "totalTime" : 2
        },
        "/apple/support3" : {
            "hits" : 1,
            "totalTime" : 1
        }
    },
    "users" : {
        "user1" : {
            "hits" : 4
        },
        "user2" : {
            "hits" : 2
        },
        "user3" : {
            "hits" : 1
        }
    }
}

In this approach, each request is a single write operation. With each write we perform multiple aggregations, producing the latest aggregated view with almost no delay. There is no need to run any batch process or an expensive Map-Reduce job to get this information; it is right there, ready to be read from the database and presented to the UI in under a second.
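As a hedged sketch of that read path: because the string ids sort chronologically, a single range query returns all per-second documents in a window, ready to plot (the window bounds below are illustrative).

    // Fetch all per-second documents for one minute, in order, to feed a chart.
    // Requires: import com.mongodb.DBCursor;
    DBCollection collection = getCollection();
    DBObject range = new BasicDBObject("_id",
            new BasicDBObject("$gte", "2013_09_23_02_29_00")
                    .append("$lte", "2013_09_23_02_29_59"));
    DBCursor cursor = collection.find(range).sort(new BasicDBObject("_id", 1));
    while (cursor.hasNext()) {
        DBObject second = cursor.next();
        System.out.println(second.get("_id") + " -> " + second.get("hits") + " hits");
    }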

If you choose to implement sharding, the document ids would have to be chosen differently.
For example, in the current scenario, we might append the host name to the time, effectively storing each host's data in a different shard. At read time, we can do a simple aggregation at the application level to present a unified view, while distributing the write load evenly across multiple database servers.
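A sketch of that id scheme, reusing the variable names from main below; the host-name lookup and the suffix convention are my assumptions.

    // Suffix the host name onto the per-second id so each host's counters
    // live in separate documents (and, with a suitable shard key, separate shards).
    String hostName;
    try {
        hostName = java.net.InetAddress.getLocalHost().getHostName();
    } catch (java.net.UnknownHostException e) {
        hostName = "unknown_host";
    }
    String shardedId = dt1.format(d) + "_" + hostName.replace('.', '_');
    upsert(shardedId, request, collection);
    // At read time, fetch all documents whose _id starts with the second
    // and sum their counters in the application for the unified view.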

The complete code is below.

Aggregator.java

package com.realtime;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
public class Aggregator {

  public static void main(String[] args) {

  Date d = new Date();
  SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");
        
  Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
  Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
  Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
  Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
  Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
  Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
  Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

  try {
  Thread.sleep(1000);
  } catch (InterruptedException e) {
  }


  Date d1 = new Date();
  SimpleDateFormat dt2 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

  Request r8 = new Request(dt2.format(d1), "127_0_0_1", "/apple/support1", "user4", 2);
        
        Request requests[] = { r1, r2, r3, r4, r5, r6, r7, r8 };
        DBCollection collection = getCollection();
        for (Request request : requests) {
            upsert(request.getTime(), request, collection);
        }
    }

  public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // upsert = true (insert if missing), multi = false
        collection.update(query, dbo, true, false);

    }

    public static DBObject createUpsert(Request request) {
        DBObject upsert = new BasicDBObject();
        DBObject inc = new BasicDBObject();
        // Count total hits
        inc.put("hits", 1);
        // Get distinct IP's & Count hits per IP
        inc.put("ip." + request.getIp() + ".hits", 1);
        // Get Distinct Users & Count hits per user
        inc.put("users." + request.getUser() + ".hits", 1);

        // Get Distinct URL's & Count hits per URL
        inc.put("urls." + request.getUrl() + ".hits", 1);
        // Total time taken to process 'n' requests.
        // Divide it by the count obtained above to get the average processing
        // time for each URL.
        inc.put("urls." + request.getUrl() + ".totalTime",
                request.getProcessTime());
        upsert.put("$inc", inc);
        return upsert;
    }

    public static DBCollection getCollection() {

        Mongo mongo;
        try {
            mongo = new Mongo(); // connects to localhost:27017 by default
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        DB db = mongo.getDB("realtime");
        return db.getCollection("requests");
    }
}


Below is the code for Request.java

package com.realtime;

public class Request {
    private String time;
    private String ip;
    private String url;
    private String user;
    private int processTime;

    public Request(String time, String ip, String url, String user,
            int processTime) {
        this.ip = ip;
        this.time = time;
        this.user = user;
        this.url = url;
        this.processTime = processTime;
    }

    public String getTime() {
        return time;
    }

    public String getIp() {
        return ip;
    }

    public String getUrl() {
        return url;
    }

    public String getUser() {
        return user;
    }

    public int getProcessTime() {
        return processTime;
    }

}

