Friday, October 11, 2013

MongoDB Real-Time Analytics Part-2


Here is another schema sample for storing daily, hourly, and per-minute data. The "_id" (shown here as a placeholder) should be a unique date identifying the day. The keys under "hourly" are hours of the day, and the keys under "minutely" are minute-of-day values (hour * 60 + minute), so "957" corresponds to 15:57:

{
    "_id" : "daily", ---> should be unique date for the day
    "article" : {
        "HT97" : 100,
        "HT98" : 100,
        "HT99" : 100
    },
    "daily" : 300,
    "hourly" : {
        "15" : 200,
        "16" : 100
    },
    "minutely" : {
        "957" : 100,
        "958" : 100,
        "960" : 100
    }
}



package com.realtime;

import java.text.SimpleDateFormat;
import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class Test {

    public static void main(String[] args) {

        DBCollection collection = getCollection();
        for (int i = 0; i < 100; i++) {
            Date now = new Date();
            String format = new SimpleDateFormat("yyyy-MM-dd-HH-mm").format(now);
            Request1 r = new Request1(format, "HT" + i);
            // The day portion (yyyy-MM-dd) becomes the document id, so all
            // requests for one day aggregate into a single document.
            String dayId = format.substring(0, 10);
            upsert(dayId, r, collection);
            System.out.println("upsert request: HT" + i);
        }
    }

    public static void upsert(String id, Request1 request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // upsert = true, multi = false
        collection.update(query, dbo, true, false);
    }

    /**
     * Increment the stats.
     * @param request
     * @return
     */
    public static DBObject createUpsert(Request1 request) {
        DBObject upsert = new BasicDBObject();
        DBObject inc = new BasicDBObject();
        // Count total hits for the day
        inc.put("daily", 1);
        inc.put("minutely." + getMinutes(request), 1);
        inc.put("hourly." + getHour(request), 1);
        inc.put("article." + request.getArticleId(), 1);

        upsert.put("$inc", inc);
        return upsert;
    }

    /**
     * Get the current minute of the day (0-1439).
     * @param r
     * @return
     */
    public static int getMinutes(Request1 r) {
        String format = r.getTime();
        // The minute is the last "-"-separated token in yyyy-MM-dd-HH-mm
        String sMinutes = format.substring(format.lastIndexOf("-") + 1);
        int minutes = Integer.parseInt(sMinutes);

        int hour = Integer.parseInt(getHour(r));
        return hour * 60 + minutes;
    }

    /**
     * Get the current hour of the day (00-23).
     * @param r
     * @return
     */
    public static String getHour(Request1 r) {
        String format = r.getTime();
        // The hour is the two characters before the last "-" in yyyy-MM-dd-HH-mm
        return format.substring(format.lastIndexOf("-") - 2, format.length() - 3);
    }

    public static DBCollection getCollection() {

        Mongo mongo;
        try {
            mongo = new Mongo();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        DB db = mongo.getDB("analytics");
        return db.getCollection("metrics");
    }
}
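
Request1.java is not shown in the post; a minimal sketch that satisfies the calls above (the field names are my assumption) would be:

package com.realtime;

// Minimal sketch of Request1 (not included in the original post); the
// constructor order matches the call new Request1(format, "HT" + i).
public class Request1 {
    private String time;      // formatted as yyyy-MM-dd-HH-mm
    private String articleId;

    public Request1(String time, String articleId) {
        this.time = time;
        this.articleId = articleId;
    }

    public String getTime() {
        return time;
    }

    public String getArticleId() {
        return articleId;
    }
}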

Tuesday, September 24, 2013

MongoDB Real Time Analytics.


MongoDB makes it super easy to implement real-time analytics. While the natural tendency is to use the Map-Reduce feature provided by MongoDB to aggregate data, I thought of using the upsert feature and the $inc operator to see how complex aggregations and groupings can be performed in real time.
We can update total counts and averages, and find distinct entities for a given period (per second, per minute, per hour), in a single upsert statement. This data is then immediately available to draw real-time charts.

Let's say we want to obtain the following information every second for our application.
1. Total Hits
2. Unique Users
3. Unique IP Address
4. Average response time for each URL, etc.

We will start by setting up some sample requests. Here, I am hardcoding them; in the real world, we could tail and parse a log file. Alternatively, this information could be sent directly from the application to our analytics system.

    Date d = new Date();
    // HH = 24-hour clock, MM = month, mm = minutes
    SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

    Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
    Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
    Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
    Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
    Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
    Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
    Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

Note that the periods in the IP addresses have been replaced with underscores. This is because we will be using them as keys in our JSON documents, and MongoDB does not allow periods in field names.
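
That sanitization is a one-liner (the literal address here is just an example):

    // Replace dots with underscores so the address is a legal field name
    String ipKey = "127.0.0.1".replace('.', '_'); // -> "127_0_0_1"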
Next, we need to decide the aggregation interval. This determines how we choose our document ids.
In this example, we have decided to aggregate every second, so we choose an id that is the request time including second-level information (2013_09_23_02_29_28). If we were aggregating at an hourly interval, we could simply use an id that contains only hour-level information, such as "2013_09_23_02".
Based on the chosen id, all requests arriving within the same second update a single document. While there may be hundreds of requests per second, we end up with one document per second, which represents the aggregated view for that second.
This has a very nice side effect. MongoDB index sizes are directly proportional to the number of documents. Since we maintain one record per second instead of storing each request as a separate document (at most 86,400 documents per day, no matter how many requests arrive), we greatly reduce the required index size, and it's much easier to fit the index in memory.
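
A quick sketch of the two bucket granularities (the class name is mine, for illustration only):

import java.text.SimpleDateFormat;
import java.util.Date;

public class BucketIds {
    public static void main(String[] args) {
        Date d = new Date();
        // Second-level bucket: one aggregated document per second
        String secondId = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss").format(d);
        // Hour-level bucket: one aggregated document per hour
        String hourId = new SimpleDateFormat("yyyy_MM_dd_HH").format(d);
        System.out.println(secondId + " / " + hourId);
    }
}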
So, let us see how aggregation and counting happen. The following is the function that creates the upsert request for each record.
1 public static DBObject createUpsert(Request request) {
2
3        DBObject upsert = new BasicDBObject();
4        DBObject inc = new BasicDBObject();
5        // Count total hits
6        inc.put("hits", 1);
7        // Get distinct IP's & Count hits per IP
8        inc.put("ip." + request.getIp() + ".hits", 1);
9        // Get Distinct Users & Count hits per user
10        inc.put("users." + request.getUser() + ".hits", 1);
11
12        // Get Distinct URL's & Count hits per URL
13        inc.put("urls." + request.getUrl() + ".hits", 1);
14        // Total time taken to process 'n' requests.
15        // Divide it by the count obtained above to get the average processing time for each URL.
16        inc.put("urls." + request.getUrl() + ".totalTime", request.getProcessTime());
17        upsert.put("$inc", inc);
18        return upsert;
19    }

Line number 6 simply increments the 'hits' field, giving us the total hits in a second.
In line number 8, we get more creative. For the current request, if the field ip.'ip_address' (e.g. ip.127_0_0_1) exists, its 'hits' field is incremented. Otherwise, the field ip.'ip_address' gets added under the 'ip' field. At the end of the second, we have collected all distinct IPs from which the application received requests, as well as the number of requests received from each IP.
We do the same for users and URLs. Finally, in line number 16, we accumulate the total processing time of all requests for each URL in the field urls.'url'.totalTime.
By simply dividing the accumulated time by the 'hits' count later, we get the average response time.
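
For instance, a hypothetical helper (not part of the original post) could read an aggregated document back with the same legacy driver and compute those averages; for "/apple/support1" in the document shown further below, that is 8 / 4 = 2:

package com.realtime;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;

// Hypothetical helper: derives per-URL average response times from one
// aggregated per-second document.
public class AverageReader {

    public static void printAverages(DBCollection collection, String id) {
        DBObject doc = collection.findOne(new BasicDBObject("_id", id));
        if (doc == null) {
            return;
        }
        DBObject urls = (DBObject) doc.get("urls");
        for (String url : urls.keySet()) {
            DBObject stats = (DBObject) urls.get(url);
            double hits = ((Number) stats.get("hits")).doubleValue();
            double totalTime = ((Number) stats.get("totalTime")).doubleValue();
            // e.g. /apple/support1: totalTime 8 / hits 4 = 2.0
            System.out.println(url + " avg response time: " + totalTime / hits);
        }
    }
}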
Here’s the function that gets invoked for each request, updating/inserting the records in database.
  public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // upsert = true, multi = false
        collection.update(query, dbo, true, false);
    }
As each request is received and upserted, our data keeps getting aggregated immediately. Below is the final aggregated document at the end of processing requests for the time period represented by "2013_09_23_02_29_28".
{
    "_id" : "2013_09_23_02_29_28",
    "hits" : 7,
    "ip" : {
        "192_168_1_20" : {
            "hits" : 1
        },
        "127_0_0_1" : {
            "hits" : 6
        }
    },
    "urls" : {
        "/apple/support1" : {
            "hits" : 4,
            "totalTime" : 8
        },
        "/apple/support2" : {
            "hits" : 2,
            "totalTime" : 2
        },
        "/apple/support3" : {
            "hits" : 1,
            "totalTime" : 1
        }
    },
    "users" : {
        "user1" : {
            "hits" : 4
        },
        "user2" : {
            "hits" : 2
        },
        "user3" : {
            "hits" : 1
        }
    }
}

In this approach, each request is a single write operation. With each write, we perform multiple aggregations and generate the latest aggregated view with almost no delay. There is absolutely no need to run any batch process or expensive Map-Reduce job to get this information. It is right there and can be read from the database and presented to the UI in less than a second.

If you choose to implement sharding, the document ids would have to be chosen differently.
For example, in the current scenario, we might append the host name to the time, effectively storing data for each host in a different shard. At runtime, we can do a simple aggregation at the application level to present a unified view, while evenly distributing the load across multiple database servers.
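
As a rough sketch of that idea (the class and method names are hypothetical, and host-name lookup is just one way to tag the writer):

import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical sketch: prefix the per-second id with the host name so each
// host writes to its own documents, which sharding can then distribute.
public class ShardedIdExample {

    public static String shardedId(Date d) throws Exception {
        String host = InetAddress.getLocalHost().getHostName();
        return host + "_" + new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss").format(d);
    }
}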

The complete code is below.

Aggregator.java

package com.realtime;

import java.text.SimpleDateFormat;
import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class Aggregator {

    public static void main(String[] args) {

        Date d = new Date();
        // HH = 24-hour clock, avoiding am/pm collisions in the per-second id
        SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

        Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
        Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
        Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
        Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
        Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
        Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
        Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

        try {
            // Sleep for a second so r8 falls into the next aggregation bucket
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

        Date d1 = new Date();
        SimpleDateFormat dt2 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

        Request r8 = new Request(dt2.format(d1), "127_0_0_1", "/apple/support1", "user4", 2);

        Request requests[] = { r1, r2, r3, r4, r5, r6, r7, r8 };
        DBCollection collection = getCollection();
        for (Request request : requests) {
            upsert(request.getTime(), request, collection);
        }
    }

    public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // upsert = true, multi = false
        collection.update(query, dbo, true, false);
    }

    public static DBObject createUpsert(Request request) {
        DBObject upsert = new BasicDBObject();
        DBObject inc = new BasicDBObject();
        // Count total hits
        inc.put("hits", 1);
        // Get distinct IPs & count hits per IP
        inc.put("ip." + request.getIp() + ".hits", 1);
        // Get distinct users & count hits per user
        inc.put("users." + request.getUser() + ".hits", 1);
        // Get distinct URLs & count hits per URL
        inc.put("urls." + request.getUrl() + ".hits", 1);
        // Total time taken to process 'n' requests.
        // Divide it by the count obtained above to get the average processing
        // time for each URL.
        inc.put("urls." + request.getUrl() + ".totalTime",
                request.getProcessTime());
        upsert.put("$inc", inc);
        return upsert;
    }

    public static DBCollection getCollection() {

        Mongo mongo;
        try {
            mongo = new Mongo();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        DB db = mongo.getDB("realtime");
        return db.getCollection("requests");
    }
}


Below is the code for Request.java

package com.realtime;

public class Request {
    private String time;
    private String ip;
    private String url;
    private String user;
    private int processTime;

    public Request(String time, String ip, String url, String user,
            int processTime) {
        this.ip = ip;
        this.time = time;
        this.user = user;
        this.url = url;
        this.processTime = processTime;
    }

    public String getTime() {
        return time;
    }

    public String getIp() {
        return ip;
    }

    public String getUrl() {
        return url;
    }

    public String getUser() {
        return user;
    }

    public int getProcessTime() {
        return processTime;
    }

}


Friday, August 9, 2013

XmlAdapter - JAXB's Secret Ninja.

The XmlAdapter mechanism in JAXB ensures that there is no such thing as an unmappable class. However, there appears to be some confusion about how to use XmlAdapter; the general concept is:
  1. Identify the unmappable class
  2. Create an equivalent class that is mappable
  3. Create an XmlAdapter to convert between unmappable and mappable objects
  4. Specify the XmlAdapter
Recently I ran into a problem where I needed to store/retrieve data from MongoDB using Apache CXF and Spring Data for MongoDB.

Storing data was easy; I had no complaints when using Maps, as shown below.



import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement(name = "article")
public class Article implements Serializable {

    private static final long serialVersionUID = 1L;
    private Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();

    public Article() {
    }

    public Article(Map<String, Map<String, String>> m) {
        this.map = m;
    }

    public Map<String, Map<String, String>> getMap() {
        return map;
    }

    public void setMap(Map<String, Map<String, String>> map) {
        this.map = map;
    }
}

The above class was good enough for the create operation. However, when I tried to retrieve data, JAXB threw exceptions all over, saying it was unable to convert the Map to XML.

That's when I decided to extend XmlAdapter.

1. Identify the Unmappable Class
  In this example the unmappable class is java.util.Map.

2. Create an Equivalent Class that is Mappable
A Map can be represented by an object that contains a list of objects with two properties: key and value. Below are the classes that I implemented to achieve this.


import java.util.ArrayList;
import java.util.List;

import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlValue;

public class MapType {
    public List<MapEntryType> element = new ArrayList<MapEntryType>();
}

public class MapEntryType {
    @XmlAttribute
    public String name;
    @XmlElement
    public List<LinkCountMapType> attribute = new ArrayList<LinkCountMapType>();
}

public class LinkCountMapType {
    @XmlAttribute
    public String name;
    @XmlValue
    public String count;
}


3. Create an XmlAdapter to Convert Between Unmappable and Mappable Objects
The XmlAdapter class is responsible for converting between instances of the unmappable and mappable classes. People often confuse the value and bound types: the value type is the mappable class, and the bound type is the unmappable class.




import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.xml.bind.annotation.adapters.XmlAdapter;

public final class MapAdapter extends XmlAdapter<MapType, Map<String, Map<String, String>>> {

@Override
public Map<String, Map<String, String>> unmarshal(MapType v) throws Exception {
    Map<String, Map<String, String>> mainMap = new HashMap<String, Map<String, String>>();

    List<MapEntryType> myMapEntryTypes = v.element;
    for (MapEntryType myMapEntryType : myMapEntryTypes) {
        Map<String, String> linkCountMap = new HashMap<String, String>();
        for (LinkCountMapType myLinkCountMapType : myMapEntryType.attribute) {
            linkCountMap.put(myLinkCountMapType.name, myLinkCountMapType.count);
        }
        mainMap.put(myMapEntryType.name, linkCountMap);
    }  
    return mainMap;
}

@Override
public MapType marshal(Map<String, Map<String, String>> v) throws Exception {
    MapType myMapType = new MapType();

    List<MapEntryType> entry = new ArrayList<MapEntryType>();

    for (String name : v.keySet()) {
        MapEntryType myMapEntryType = new MapEntryType();
        Map<String, String> linkCountMap = v.get(name);
        List<LinkCountMapType> linkCountList = new ArrayList<LinkCountMapType>();
        for (String link : linkCountMap.keySet()) {
            LinkCountMapType myLinkCountMapType = new LinkCountMapType();
            String count = linkCountMap.get(link);
            myLinkCountMapType.count = count;
            myLinkCountMapType.name = link;
            linkCountList.add(myLinkCountMapType);
        }
        myMapEntryType.name = name;
        myMapEntryType.attribute = linkCountList;
        entry.add(myMapEntryType);
    }
    myMapType.element = entry;
    return myMapType;
}
}



4. Specify the XmlAdapter
The @XmlJavaTypeAdapter annotation is used to specify the use of the XmlAdapter. Below, it is specified on the map field of the Article class. Now, during marshal/unmarshal operations, the Map instance is treated as an instance of MapType.



import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;

@XmlRootElement(name = "article")
@XmlAccessorType(XmlAccessType.FIELD)
public class Article implements Serializable {

    private static final long serialVersionUID = 1L;

    @XmlJavaTypeAdapter(MapAdapter.class)
    private Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();

    public Article() {
    }

    public Article(Map<String, Map<String, String>> m) {
        this.map = m;
    }

    public Map<String, Map<String, String>> getMap() {
        return map;
    }

    public void setMap(Map<String, Map<String, String>> map) {
        this.map = map;
    }
}
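
To sanity-check the adapter, a small hypothetical driver (the class name and sample data are mine, not from the original post) can marshal an Article to XML:

import java.util.HashMap;
import java.util.Map;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

// Hypothetical demo driver; the sample keys/values are illustrative only.
public class AdapterDemo {
    public static void main(String[] args) throws Exception {
        Map<String, String> counts = new HashMap<String, String>();
        counts.put("HT97", "100");

        Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
        map.put("links", counts);

        JAXBContext context = JAXBContext.newInstance(Article.class);
        Marshaller marshaller = context.createMarshaller();
        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
        // Should print roughly: <article><map><element name="links">
        //   <attribute name="HT97">100</attribute></element></map></article>
        marshaller.marshal(new Article(map), System.out);
    }
}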
