Wednesday, April 16, 2014

Restricts the contents of the documents based on information stored in the documents themselves.

MongoDB 2.6 adds the $redact aggregation stage, which restricts the contents of documents based on information stored in the documents themselves.

Evaluates Access at Every Document/Sub-Document Level

The test collection contains documents of the following form, where the tags field lists the access values for that document or subdocument level; for example, a value of [ "PUBLIC", "INTERNAL" ] specifies that a user with either PUBLIC or INTERNAL access can view the data:

Sample JSON document:
{

    "_id" : 1,
    "title" : "Document Formatting",
    "tags" : [
       "PUBLIC",
        "SERVICE_PROVIDER",
        "INTERNAL"
    ],
    "year" : 2014,
    "subsections" : [
        {
            "subtitle" : "Section 1: Overview",
            "tags" : [
                "PUBLIC",
                "SERVICE_PROVIDER"
            ],
            "content" : "Section 1: This is PUBLIC and SERVICE PROVIDER content of section 1."
        },
        {
            "subtitle" : "Section 2: SuperScript",
            "tags" : [
               "SERVICE_PROVIDER"
            ],
            "content" : "Section 2: This is SERVICE PROVIDER content of section 2."
        },
        {
            "subtitle" : "Section 3: SubScript",
            "tags" : [
                "PUBLIC"
            ],
            "content" : {
                "text" : "Section 3: This is INTERNAL content of section3.",
                "tags" : [
                   "INTERNAL"
                ]
            }
        }
    ]
}

A user has access to view information tagged either PUBLIC or INTERNAL. To run a query on all documents with year 2014 for this user, include a $redact stage as in the following:


var userAccess = ["INTERNAL","PUBLIC"];
db.test.aggregate(
   [
     { $match: { year: 2014 } },
     { $redact:
         {
            $cond:
               {
                 if: { $gt: [ { $size: { $setIntersection: [ "$tags", userAccess ] } }, 0 ] },
                 then: "$$DESCEND",
                 else: "$$PRUNE"
               }
         }
     }
   ]
)

The aggregation operation returns the following "redacted" document, which omits Section 2, the subsection tagged only SERVICE_PROVIDER.

{

    "result" : [
        {
            "_id" : 1,
            "title" : "Document Formatting",
            "tags" : [
               "PUBLIC",
                "SERVICE_PROVIDER",
                "INTERNAL"
            ],
            "year" : 2014,
            "subsections" : [
                {
                    "subtitle" : "Section 1: Overview",
                    "tags" : [
                       "PUBLIC",
                        "SERVICE_PROVIDER"
                    ],
                    "content" : "Section 1: This is PUBLIC and SERVICE PROVIDER content of section 1."
                },
                {
                    "subtitle" : "Section 3: SubScript",
                    "tags" : [
                        "PUBLIC"
                    ],
                    "content" : {
                        "text" : "Section 3: This is INTERNAL content of section3.",
                        "tags" : [
                           "INTERNAL"
                        ]
                    }
                }
            ]
        }
    ],
    "ok" : 1
}
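
The descend-or-prune behaviour of the $redact stage above can be mimicked in plain Java to make the recursion easier to follow. This is only a sketch over in-memory maps and lists, not the MongoDB driver API; the class name RedactSketch is hypothetical, and treating a level without a tags array as inaccessible is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RedactSketch {

    /** Returns a redacted copy of doc, or null when this level must be pruned. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> redact(Map<String, Object> doc, Set<String> userAccess) {
        Object tags = doc.get("tags");
        // Assumption: a level without a "tags" array is treated as inaccessible.
        if (!(tags instanceof List) || !intersects((List<Object>) tags, userAccess)) {
            return null; // corresponds to $$PRUNE
        }
        // Corresponds to $$DESCEND: keep this level, then check embedded documents.
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Map) {
                Map<String, Object> child = redact((Map<String, Object>) v, userAccess);
                if (child != null) {
                    out.put(e.getKey(), child);
                }
            } else if (v instanceof List) {
                List<Object> kept = new ArrayList<>();
                for (Object item : (List<Object>) v) {
                    if (item instanceof Map) {
                        Map<String, Object> child = redact((Map<String, Object>) item, userAccess);
                        if (child != null) {
                            kept.add(child);
                        }
                    } else {
                        kept.add(item); // scalar array elements are kept as they are
                    }
                }
                out.put(e.getKey(), kept);
            } else {
                out.put(e.getKey(), v);
            }
        }
        return out;
    }

    /** True when at least one tag of this level is in the user's access set. */
    static boolean intersects(List<Object> tags, Set<String> userAccess) {
        for (Object t : tags) {
            if (userAccess.contains(t)) {
                return true;
            }
        }
        return false;
    }
}
```

Calling redact on the sample document with the access set { INTERNAL, PUBLIC } keeps Sections 1 and 3 and drops Section 2, as in the result shown above.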

Apache Mahout for Document Similarity.

Using Apache Mahout for Document Similarity.

Below are the steps to run on a collection of text files.


  • sh mahout seqdirectory -c UTF-8 -i /Users/xxxx/myfiles/ -o seqfiles
  • sh mahout seq2sparse -i seqfiles/ -o vectors/  -ow -chunk 100  -x 90  -seq  -ml 50  -n 2  -s 5 -md 5  -ng 3  -nv
  • sh mahout rowid -i vectors/tfidf-vectors/part-r-00000 -o matrix
  • sh mahout rowsimilarity -i matrix/matrix -o similarity  --similarityClassname SIMILARITY_COSINE -m 10 -ess
  • sh mahout seqdumper -i similarity > similarity.txt
  • sh mahout seqdumper -i matrix/docIndex > docIndex.txt

Apache Mahout performs the following steps:

  • Tokenize and Transform
  • Generate word vectors and weights
  • Find document similarity based on TF-IDF (Term Frequency - Inverse Document Frequency) using SIMILARITY_COSINE

Tokenization

The first step is to convert each text document into a sequence of tokens. Content is tokenized into single words, and all tokens are then lowercased so that the content is standardized.
Create n-grams: an n-gram is a series of n consecutive tokens. The n-grams of a document are all such runs of n consecutive tokens.
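
The tokenization and n-gram steps can be sketched in a few lines of Java. This is an illustration only: Mahout relies on its own analyzer for tokenizing, while this sketch simply splits on non-word characters, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class NGramSketch {

    /** Lowercases the text and splits it into single-word tokens. */
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        // Assumption: any run of non-word characters separates two tokens.
        for (String t : text.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!t.isEmpty()) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    /** Returns every run of n consecutive tokens, joined with a space. */
    static List<String> nGrams(List<String> tokens, int n) {
        List<String> grams = new ArrayList<>();
        for (int i = 0; i + n <= tokens.size(); i++) {
            grams.add(String.join(" ", tokens.subList(i, i + n)));
        }
        return grams;
    }
}
```

For the text "The quick brown Fox" and n = 2, this yields "the quick", "quick brown", and "brown fox".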

Generate Vectors and Weights

Each document is converted to a word vector of n-grams, with a weight assigned to each n-gram. The weight is based on TF-IDF (Term Frequency - Inverse Document Frequency), a measure of the importance of a term in a document. It grows with the frequency of the term in the document but is offset by how common the term is across the whole document set. This balances the weighting, since terms that occur in many documents are less significant when computing similarity.
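
As a rough illustration of the weighting idea, the sketch below uses the textbook formula weight = tf * ln(N / df), where tf is the term count in the document, N is the number of documents, and df is the number of documents containing the term. Mahout's own TF-IDF implementation may scale these values differently, so treat the formula and the class name as assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TfIdfSketch {

    /**
     * Computes tf * ln(N / df) for every term of doc.
     * Assumption: doc is itself one of the documents in corpus, so df is never 0.
     */
    static Map<String, Double> weights(List<String> doc, List<List<String>> corpus) {
        // Term frequency: how often each term occurs in this document.
        Map<String, Integer> tf = new HashMap<>();
        for (String term : doc) {
            tf.merge(term, 1, Integer::sum);
        }

        Map<String, Double> out = new HashMap<>();
        for (Map.Entry<String, Integer> e : tf.entrySet()) {
            // Document frequency: how many documents contain the term.
            int df = 0;
            for (List<String> other : corpus) {
                if (other.contains(e.getKey())) {
                    df++;
                }
            }
            double idf = Math.log((double) corpus.size() / df);
            out.put(e.getKey(), e.getValue() * idf);
        }
        return out;
    }
}
```

A term that appears in every document gets a weight of 0 under this formula, which is the balancing effect described above.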

Document Similarity

Each document's word vector is compared with every other document's vector. Document similarity is based on the cosine similarity between the word vectors: two vectors with the same orientation have a cosine similarity of 1, and two vectors at 90 degrees have a similarity of 0. The result is a similarity matrix that scores each document against the others.
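
The cosine measure itself is short enough to sketch directly. The version below works on sparse term-to-weight maps; the class name is hypothetical, and returning 0 for an empty vector is a convention chosen for the sketch.

```java
import java.util.Map;

final class CosineSketch {

    /** Cosine similarity: dot(a, b) / (|a| * |b|) over sparse term-weight maps. */
    static double cosine(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0;
        double normA = 0;
        double normB = 0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            normA += e.getValue() * e.getValue();
            Double other = b.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other; // only shared terms contribute
            }
        }
        for (double v : b.values()) {
            normB += v * v;
        }
        if (normA == 0 || normB == 0) {
            return 0; // assumption: an empty vector is similar to nothing
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }
}
```

Because the computation uses floating-point arithmetic, a document compared with itself can come out marginally off 1, which is the likely reason the sample output contains values such as 1.0000000000000002 and 0.9999999999999999.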

Sample docIndex.txt Below

Key: 0: Value: /File558
Key: 1: Value: /File4340
Key: 2: Value: /File4208
Key: 3: Value: /File471

Sample similarity.txt Below

Key: 0: Value: {0:1.0000000000000002,3865:0.15434639725421775,318:0.16924612516623572,558:0.24384373237418783,471:0.16826200999114294,4340:0.16651713654958472,7:0.164357811310303,1841:0.15904628827648598,4208:0.16296041411613846,14:0.14043468960009342}

Key: 1: Value: {1615:0.3312716794782159,2181:0.2840451034186393,2126:0.32415666313248037,3188:0.1628119850871482,1496:0.24775558568026784,1:1.0,1575:0.13525396149776772,1269:0.13286526354605824,28:0.45703740702783774,1866:0.3262754564949865}

Key: 2: Value: {2:1.0,4350:0.13571272853930183,348:0.12600225826696973,3210:0.13949921190207168,560:0.15234464042319912,3294:0.2889578044491356,802:0.17942407070282945,1633:0.1964965769704117,3355:0.1298340236494648,495:0.12627029072308343}

Key: 3: Value: {1990:0.17193706865160252,3700:0.1302978723523794,2082:0.16196813164388732,2227:0.12561545966019144,665:0.15584753719122243,1163:0.19345501767136697,6:0.22582692114456704,3:1.0,1555:0.1742692199362734,4:0.1818170186646791}

Key: 4: Value: {1990:0.13213250264185705,3:0.1818170186646791,2082:0.1349661297563062,1163:0.15679941310702006,1555:0.18261523201994426,6:0.1981917938129962,738:0.29646182670818444,1684:0.21050749439902763,4:0.9999999999999999,3150:0.1397676584929176}


The output above is the similarity matrix with scores,
e.g. [ 558:0.24384373237418783, 471:0.16826200999114294 ]

Each key in similarity.txt is a docIndex key: docIndex key 0 corresponds to the entry 0:1.0000000000000002 in the similarity.txt output.
Similarly, docIndex key 558 corresponds to the entry 558:0.24384373237418783 in the similarity.txt output.
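
To turn the numeric keys back into file names, the two dumps can be parsed and joined. The sketch below assumes the dumped lines keep exactly the layout shown in the samples above ("Key: N: Value: ..."); the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SeqDumpSketch {

    /** Parses a docIndex line such as "Key: 0: Value: /File558" into the index map. */
    static void addDocIndexLine(String line, Map<Integer, String> index) {
        String[] parts = line.split(": Value: ", 2);
        int key = Integer.parseInt(parts[0].substring("Key: ".length()).trim());
        index.put(key, parts[1].trim());
    }

    /** Parses the "{id:score,id:score,...}" part of a similarity line. */
    static Map<Integer, Double> parseSimilarityRow(String line) {
        String body = line.substring(line.indexOf('{') + 1, line.lastIndexOf('}'));
        Map<Integer, Double> row = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":");
            row.put(Integer.parseInt(kv[0].trim()), Double.parseDouble(kv[1].trim()));
        }
        return row;
    }
}
```

With both structures in memory, each id in a similarity row can be looked up in the index map to report the matching file name next to its score.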


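--- Sample: dumping the generated vectors (separate output files, so the second dump does not overwrite the first)

sh mahout seqdumper -i vectors/tfidf-vectors > tfidf-vectors.txt
sh mahout seqdumper -i vectors/tf-vectors > tf-vectors.txt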

Friday, October 11, 2013

MongoDB Real-Time Analytics Part-2


Here is another sample schema for storing daily, hourly, and per-minute data:

{
    "_id" : "daily", ---> should be unique date for the day
    "article" : {
        "HT97" : 100,
        "HT98" : 100,
        "HT99" : 100
    },
    "daily" : 300,
    "hourly" : {
        "15" : 200,
        "16" : 100
    },
    "minutely" : {
        "957" : 100,
        "958" : 100,
        "960" : 100
    }
}



package com.realtime;

import java.text.SimpleDateFormat;
import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class test {

    public static void main(String[] args) {
        DBCollection collection = getCollection();
        for (int i = 0; i < 100; i++) {
            Date now = new Date();
            String format = new SimpleDateFormat("yyyy-MM-dd-HH-mm").format(now);
            Request1 r = new Request1(format, "HT" + i);
            upsert(r.getArticleId(), r, collection);
            System.out.println("upsert request: HT" + i);
        }
    }

    public static void upsert(String id, Request1 request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        // Fixed placeholder id; as noted in the schema above, this should
        // really be a unique date string for the day.
        query.put("_id", "monthly");
        DBObject dbo = createUpsert(request);
        // Third argument: upsert = true; fourth argument: multi = false
        collection.update(query, dbo, true, false);
    }

    /**
     * increment the stats
     * @param request
     * @return
     */
    public static DBObject createUpsert(Request1 request) {
        DBObject upsert = new BasicDBObject();
        DBObject inc = new BasicDBObject();
        // Count total hits
        inc.put("daily", 1);
        inc.put("minutely." + getMinutes(request), 1);
        inc.put("hourly." + getHour(request), 1);
        inc.put("article." + request.getArticleId(), 1);

        upsert.put("$inc", inc);
        return upsert;
    }

    /**
     * get the current minute in day
     * @param r
     * @return
     */
    public static int getMinutes(Request1 r) {
        // Time has the form yyyy-MM-dd-HH-mm; the minutes follow the last dash
        String format = r.getTime();
        String sMinutes = format.substring(format.lastIndexOf("-") + 1);
        int minutes = Integer.parseInt(sMinutes);

        String sHour = getHour(r);
        int hour = Integer.parseInt(sHour);

        minutes = hour * 60 + minutes;
        return minutes;
    }

    /**
     * get current hour in a day
     * @param r
     * @return
     */
    public static String getHour(Request1 r) {
        // The two characters before the last dash are the hour (HH)
        String format = r.getTime();
        format = format.substring(format.lastIndexOf("-") - 2, format.length() - 3);

        return format;
    }

    public static DBCollection getCollection() {

        Mongo mongo;
        try {
            mongo = new Mongo();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        DB db = mongo.getDB("analytics");
        return db.getCollection("metrics");
    }
}
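
The Request1 class used above is not shown in the post. A minimal version consistent with the calls made on it (a two-argument constructor, getTime() and getArticleId()) might look like the sketch below; the field names are guesses, and the class would need to live in the com.realtime package alongside the code above.

```java
/** Hypothetical reconstruction of the request holder used by the example above. */
class Request1 {
    private final String time;      // formatted as yyyy-MM-dd-HH-mm
    private final String articleId; // e.g. "HT97"

    public Request1(String time, String articleId) {
        this.time = time;
        this.articleId = articleId;
    }

    public String getTime() {
        return time;
    }

    public String getArticleId() {
        return articleId;
    }
}
```

For a request with time "2013-10-11-15-57", getHour returns "15" and getMinutes returns 15 * 60 + 57 = 957, which matches the "15" and "957" keys in the sample schema.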

Tuesday, September 24, 2013

MongoDB Real Time Analytics.


MongoDB makes it easy to implement real-time analytics. While the natural tendency is to use the Map-Reduce feature provided by MongoDB to aggregate data, I thought of using the upsert feature and the $inc operator to see how complex aggregations and groupings can be performed in real time.
We can update total counts, accumulate the totals needed for averages, and find distinct entities for a given period (per second, per minute, per hour) in a single upsert statement. This data is then immediately available to draw real-time charts.

Let's say we want to obtain the following information every second for our application.
1. Total Hits
2. Unique Users
3. Unique IP addresses
4. Average response time for each URL, etc.

We will start by setting up some sample requests. Here, I am hardcoding them. In the real world, we could tail and parse a log file. Alternatively, this information could be sent directly from the application to our analytics system.

  Date d = new Date();
  // yyyy = year, MM = month, HH = hour on a 24-hour clock
  SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

  Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
  Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
  Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
  Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
  Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
  Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
  Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

Note that the periods in the IP addresses have been replaced with underscores. This is done because we will be using them as keys in our JSON documents, and MongoDB treats a period in a field path as a separator between nested fields.
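
A small helper can do the replacement before a value is used as part of a field path. The class and method names here are hypothetical; the same treatment would presumably be needed for any other value used as a key that can contain a period, such as a URL ending in a file extension.

```java
final class FieldKeySketch {

    /** Replaces periods so the value can be used as a single key in a field path. */
    static String toFieldKey(String raw) {
        return raw.replace('.', '_');
    }

    /** Builds the $inc path for the per-IP hit counter, e.g. "ip.127_0_0_1.hits". */
    static String ipHitsPath(String ip) {
        return "ip." + toFieldKey(ip) + ".hits";
    }
}
```

Without the replacement, the path ip.127.0.0.1.hits would be read as a chain of nested fields (ip, 127, 0, 0, 1, hits) rather than one counter per address.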
Next, we need to decide the aggregation interval. This determines how we choose our document ids.
In this example, we have decided to aggregate every second, so we can choose our id as the request time that includes second-level information (2013_09_23_02_29_28). If we were aggregating at an hourly interval, we could simply have used an id that contains only hour-level information, such as "2013_09_23_02".
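
The choice of interval then comes down to the date pattern used to build the id. The sketch below shows this with a hypothetical bucketId helper; fixing the time zone to UTC is an assumption made so that all application servers produce the same id for the same instant.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

final class BucketIdSketch {

    /** Formats a timestamp into a document id at the granularity given by the pattern. */
    static String bucketId(Date when, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: ids are built in UTC
        return fmt.format(when);
    }

    public static void main(String[] args) {
        Date now = new Date();
        // One document per second
        System.out.println(bucketId(now, "yyyy_MM_dd_HH_mm_ss"));
        // One document per hour
        System.out.println(bucketId(now, "yyyy_MM_dd_HH"));
    }
}
```

All requests that format to the same id update the same document, so a shorter pattern means coarser aggregation.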
Based on the chosen id, all requests arriving within the same second will update a single document. While there may be hundreds of requests per second, we will end up with one document per second that represents the aggregated view for that second.
This has a very nice side effect. MongoDB index sizes grow with the number of documents. Since we maintain one record per second instead of storing each request as a separate document, we greatly reduce the required index size, and it is much easier to fit the index in memory.
So, let us see how the aggregation and counting happen. The following function creates the upsert request for each record.
1 public static DBObject createUpsert(Request request) {
2
3        DBObject upsert = new BasicDBObject();
4        DBObject inc = new BasicDBObject();
5        // Count total hits
6        inc.put("hits", 1);
7        // Get distinct IP's & Count hits per IP
8        inc.put("ip." + request.getIp() + ".hits", 1);
9        // Get Distinct Users & Count hits per user
10        inc.put("users." + request.getUser() + ".hits", 1);
11
12        // Get Distinct URL's & Count hits per URL
13        inc.put("urls." + request.getUrl() + ".hits", 1);
14        // Total time taken to process 'n' requests.
15        // Divide it by the count obtained above to get the average processing time for each URL.
16        inc.put("urls." + request.getUrl() + ".totalTime",request.getProcessTime());
17        upsert.put("$inc", inc);
18        return upsert;
19    }

Line 6 simply increments the 'hits' field, giving us the total hits in a second.
In line 8, we get more creative. For the current request, if the field ip.'ip_address' (e.g. ip.127_0_0_1) exists, then its 'hits' field is incremented. Otherwise, $inc creates the field ip.'ip_address' under the 'ip' field with 'hits' set to 1. At the end of the second, we have collected all distinct IPs from which the application received requests, as well as the number of requests received from each IP.
We do the same for users and URLs. Finally, in line 16, we accumulate the total processing time for all requests for each URL in the field urls.'url'.totalTime.
By simply dividing the accumulated time by the 'hits' count later, we can get the average response time.
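
The division is done by whoever reads the document, not by the database. A minimal sketch, with a hypothetical class name and with "no hits" mapped to 0.0 as an assumption:

```java
final class AverageSketch {

    /** Average response time for one URL, from its accumulated totalTime and hits. */
    static double averageResponseTime(long totalTime, long hits) {
        if (hits == 0) {
            return 0.0; // assumption: report 0 when there were no requests
        }
        return (double) totalTime / hits;
    }
}
```

For the sample requests above, /apple/support1 accumulates a totalTime of 1 + 2 + 1 + 4 = 8 over 4 hits, giving an average of 2.0.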
Here's the function that gets invoked for each request, updating or inserting the record in the database.
  public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // Third argument: upsert = true; fourth argument: multi = false
        collection.update(query, dbo, true, false);

    }
As each request is received and upserted, our data is aggregated immediately. Below is the final aggregated document at the end of processing requests for the time period represented by '2013_09_23_02_29_28'.
{
    "_id" : "2013_09_23_02_29_28",
    "hits" : 7,
    "ip" : {
        "192_168_1_20" : {
            "hits" : 1
        },
        "127_0_0_1" : {
            "hits" : 6
        }
    },
    "urls" : {
        "/apple/support1" : {
            "hits" : 4,
            "totalTime" : 8
        },
        "/apple/support2" : {
            "hits" : 2,
            "totalTime" : 2
        },
        "/apple/support3" : {
            "hits" : 1,
            "totalTime" : 1
        }
    },
    "users" : {
        "user1" : {
            "hits" : 4
        },
        "user2" : {
            "hits" : 2
        },
        "user3" : {
            "hits" : 1
        }
    }
}
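
How repeated $inc upserts build up such a document can be mimicked in memory. The sketch below is not MongoDB code: it imitates, with nested maps, the way a dotted $inc path creates missing levels and adds to existing counters. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IncSimulation {

    /** Imitates one {$inc: {path: amount}} entry on an in-memory document. */
    @SuppressWarnings("unchecked")
    static void inc(Map<String, Object> doc, String path, int amount) {
        String[] keys = path.split("\\.");
        Map<String, Object> level = doc;
        // Walk down the dotted path, creating missing sub-documents on the way.
        for (int i = 0; i < keys.length - 1; i++) {
            level = (Map<String, Object>) level.computeIfAbsent(
                    keys[i], k -> new LinkedHashMap<String, Object>());
        }
        // Create the counter with the given amount, or add to the existing value.
        level.merge(keys[keys.length - 1], amount,
                (a, b) -> Integer.valueOf((Integer) a + (Integer) b));
    }

    /** Applies the same five increments as createUpsert does for one request. */
    static void record(Map<String, Object> doc, String ip, String url,
            String user, int processTime) {
        inc(doc, "hits", 1);
        inc(doc, "ip." + ip + ".hits", 1);
        inc(doc, "users." + user + ".hits", 1);
        inc(doc, "urls." + url + ".hits", 1);
        inc(doc, "urls." + url + ".totalTime", processTime);
    }
}
```

Replaying the seven sample requests through record on an empty map produces the same counts as the document above: 7 hits in total, 6 from 127_0_0_1, and a totalTime of 8 for /apple/support1.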

In this approach, each request is a single write operation. With each write we do multiple aggregations and generate the latest aggregated view with almost no delay. There is no need to run any batch processes or expensive Map-Reduce jobs to get this information. It is right there and can be read from the database and presented to the UI in less than a second.

If you choose to implement sharding, then the document ids would have to be chosen differently.
For example, in the current scenario, we might append the host name to the time, effectively storing data for each host in a different shard. At runtime, we can do a simple aggregation at the application level to present a unified view, while distributing the load evenly across multiple database servers.
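
The application-level aggregation could look roughly like the sketch below. It assumes a hypothetical id layout of time bucket plus "_" plus host name, and it only merges flat counters such as "hits"; nested counters would need the same summing applied per level.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardMergeSketch {

    /** Builds a per-host id such as "2013_09_23_02_29_28_web01" (layout is an assumption). */
    static String shardedId(String timeBucket, String host) {
        return timeBucket + "_" + host;
    }

    /** Sums flat counters from the per-host documents of one time bucket. */
    static Map<String, Long> merge(List<Map<String, Long>> perHostCounters) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> counters : perHostCounters) {
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                total.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return total;
    }
}
```

The reader would fetch the documents for every host of a given time bucket and pass their counters to merge to get the unified view.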

The complete code is below.

Aggregator.java

package com.realtime;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
public class Aggregator {

  public static void main(String[] args) {

        Date d = new Date();
        // HH (24-hour clock) keeps morning and afternoon requests in separate documents
        SimpleDateFormat dt1 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

        Request r1 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
        Request r2 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user2", 2);
        Request r3 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user1", 1);
        Request r4 = new Request(dt1.format(d), "127_0_0_1", "/apple/support2", "user2", 1);
        Request r5 = new Request(dt1.format(d), "127_0_0_1", "/apple/support1", "user1", 1);
        Request r6 = new Request(dt1.format(d), "192_168_1_20", "/apple/support1", "user3", 4);
        Request r7 = new Request(dt1.format(d), "127_0_0_1", "/apple/support3", "user1", 1);

        // Wait a second so that r8 falls into the next one-second document
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }

        Date d1 = new Date();
        SimpleDateFormat dt2 = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");

        Request r8 = new Request(dt2.format(d1), "127_0_0_1", "/apple/support1", "user4", 2);
        
        Request requests[] = { r1, r2, r3, r4, r5, r6, r7, r8 };
        DBCollection collection = getCollection();
        for (Request request : requests) {
            upsert(request.getTime(), request, collection);
        }
    }

  public static void upsert(String id, Request request,
            DBCollection collection) {
        BasicDBObject query = new BasicDBObject();
        query.put("_id", id);
        DBObject dbo = createUpsert(request);
        // Third argument: upsert = true; fourth argument: multi = false
        collection.update(query, dbo, true, false);

    }

    public static DBObject createUpsert(Request request) {
        DBObject upsert = new BasicDBObject();
        DBObject inc = new BasicDBObject();
        // Count total hits
        inc.put("hits", 1);
        // Get distinct IP's & Count hits per IP
        inc.put("ip." + request.getIp() + ".hits", 1);
        // Get Distinct Users & Count hits per user
        inc.put("users." + request.getUser() + ".hits", 1);

        // Get Distinct URL's & Count hits per URL
        inc.put("urls." + request.getUrl() + ".hits", 1);
        // Total time taken to process 'n' requests.
        // Divide it by the count obtained above to get the average processing
        // time for each URL.
        inc.put("urls." + request.getUrl() + ".totalTime",
                request.getProcessTime());
        upsert.put("$inc", inc);
        return upsert;
    }

    public static DBCollection getCollection() {

        Mongo mongo;
        try {
            mongo = new Mongo();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        DB db = mongo.getDB("realtime");
        return db.getCollection("requests");
    }
}


Below is the code for Request.java

package com.realtime;

public class Request {
    private String time;
    private String ip;
    private String url;
    private String user;
    private int processTime;

    public Request(String time, String ip, String url, String user,
            int processTime) {
        this.ip = ip;
        this.time = time;
        this.user = user;
        this.url = url;
        this.processTime = processTime;
    }

    public String getTime() {
        return time;
    }

    public String getIp() {
        return ip;
    }

    public String getUrl() {
        return url;
    }

    public String getUser() {
        return user;
    }

    public int getProcessTime() {
        return processTime;
    }

}


Create ElasticSearch cluster on single machine

I wanted to figure out how to create a multi-node ElasticSearch cluster on a single machine, so I followed these instructions. First I did...