Tuesday, September 16, 2014

Full Text Search Using MongoDB and Pymongo

Starting with version 2.3.2, MongoDB supports text indexes for searching string content in documents. Text indexes can include any field whose value is a string or an array of string elements. In this post I will talk about creating and using text indexes in MongoDB, using pymongo to perform a full text search.
Features of MongoDB Full Text Search:
  • Full text search as an index type when creating new indexes.
  • Advanced queries such as negation and phrase matching (see the sketch after this list).
  • Indexing of multiple fields, with weights to give some fields higher priority.
  • Stop words are ignored.
  • Stemming, to deal with plurals.
  • Multiple language support, initially including Danish, Dutch, English, Finnish, French, German, Hungarian, Italian, Norwegian, Portuguese, Romanian, Russian, Spanish, Swedish and Turkish.
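
As a quick illustration of phrase matching and negation, here is a minimal pymongo sketch. It assumes MongoDB 2.6+, a server on localhost, a placeholder database name, and the question collection and text index that are set up later in this post:

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["mydb"]  # placeholder database name

# Phrase matching: only documents containing the exact phrase "binary tree"
phrase_results = db.question.find({"$text": {"$search": "\"binary tree\""}})

# Negation: documents matching "tree" but not containing "binary"
negated_results = db.question.find({"$text": {"$search": "tree -binary"}})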


Creating MongoDB Text Indexes

The first step in creating a MongoDB text index is to enable text indexing (this is needed on MongoDB 2.4; from version 2.6 onwards text search is enabled by default). You can enable it with the following command in the mongo console.

Enabling Index Support

use <DB_NAME>
db.adminCommand({ setParameter: 1, textSearchEnabled: true });
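
The same parameter can also be set from Python. A minimal pymongo sketch (host and port are placeholders; on MongoDB 2.6+ this step is unnecessary):

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
# setParameter is an admin command, so it is run against the admin database
client.admin.command("setParameter", 1, textSearchEnabled=True)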

After this we can insert data into the database and start using the text index for search. Here I use a question database, create an index on the question text, and then perform a question search. The structure of the "question" collection is as follows:

       

{
    "_id" : ObjectId("53a71fb3421aa9422f49ac8c"),
    "answer" : {
        "a" : {
            "image" : "",
            "text" : "$ sin1^{0} > sin1 $"
        },
        "b" : {
            "image" : "",
            "text" : "$ sin1^{0} < sin1 $"
        },
        "c" : {
            "image" : "",
            "text" : "$ sin1^{0} = sin1 $"
        },
        "correct" : "b",
        "d" : {           
            "image" : "",
            "text" : "$ sin1^{0} = \\frac {\\pi}{180} sin1 $"
        }
    },
    "exam_code" : 201,
    "exam_type" : "ENGINEERING",
    "marks" : 1,
    "question" : {
        "html" : "Which one of the following is correct?",
        "image" : "",
        "text" : "Which one of the following is correct?"
    },
    "question_number" : 1,
    "subject" : "mathematics"
}
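
To populate the collection from Python, a document with this structure can be inserted with pymongo. A minimal sketch assuming pymongo 3.x (which provides insert_one) and a placeholder database name; only a few of the fields are shown:

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["mydb"]  # placeholder database name

doc = {
    "question": {
        "text": "Which one of the following is correct?",
        "html": "Which one of the following is correct?",
        "image": "",
    },
    "subject": "mathematics",
    "exam_type": "ENGINEERING",
    "exam_code": 201,
    "question_number": 1,
    "marks": 1,
}
db.question.insert_one(doc)  # on pymongo 2.x use db.question.insert(doc)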

Creating the Index

We will create an index on the text field of question in the question collection using the following command in the mongo console:

db.question.ensureIndex( { "question.text": "text" } );

The index created above is a single-field index; we can also create a compound text index in the collection, as sketched below.
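
For completeness, the same index can be created from Python. A minimal pymongo sketch, assuming a local server and a placeholder database name; the compound index with weights is purely illustrative (it assumes we also want to search the subject field) and is not part of the console example above:

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["mydb"]  # placeholder database name

# Single-field text index, equivalent to the console command above
db.question.create_index([("question.text", "text")])

# Alternative: a compound text index with weights. A collection may have at
# most one text index, so the single-field index would have to be dropped first.
# db.question.create_index(
#     [("question.text", "text"), ("subject", "text")],
#     weights={"question.text": 10, "subject": 1},
# )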

Using the index 

Now our index is ready to be used. For pymongo with MongoDB version 2.4:

client = MongoClient(<host>, <port>)
db = client['db_name']
fields, limit = {'_id': 0}, 10   # example projection and result limit
result = db.command("text", "question", search='query text', project=fields, limit=limit)

Command execution in the mongo console:
result = db.question.runCommand("text", { search: "query text", project: { _id: 0 }, limit: 10 })

For pymongo with MongoDB version 2.6:

client = MongoClient(<host>, <port>)
db = client[db_name]
result = db.question.find({"$text": {"$search": "query text"}}, {"_id": 0}).limit(10)

Command execution in the mongo console:
result = db.question.find({"$text": {"$search": "query text"}}, {"_id": 0}).limit(10)

Using limit, we can retrieve the top documents that match the search query.
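
Note that find() on its own does not order results by relevance. To get the best matches first, we can project the text score and sort on it. A minimal sketch, assuming MongoDB 2.6+ and the client/db setup shown above:

cursor = db.question.find(
    {"$text": {"$search": "query text"}},
    {"_id": 0, "score": {"$meta": "textScore"}},
).sort([("score", {"$meta": "textScore"})]).limit(10)

for doc in cursor:
    print(doc)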


Monday, April 14, 2014

Map Reduce Framework and its components

Map Reduce is a programming model and an associated implementation for processing and generating large data sets. Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. A basic Map Reduce job consists of the following four components:

 Input
 Mapper
 Reducer
 Output

Input data is usually large, and the computations have to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. A Map Reduce job usually splits the input data set into independent pieces, typically 16 MB to 64 MB per piece.

The Mapper component processes each input piece in a completely parallel manner, turning its key/value pairs into a set of intermediate key/value pairs.

The Reducer component merges all the intermediate values associated with the same intermediate key and writes the results as output files. Typically both the input and the output of the job are stored in a file system.

In a Map Reduce job, a special program called the master assigns map or reduce tasks to the idle workers. Although a basic Map Reduce job consists of the above components, the following are also useful extensions:

 Partitioner – partitions the data into the specified number of reduce tasks/output files
 Combiner – does partial merging of the data before it is sent over the network

Data Flow through Hadoop Map Reduce
Hadoop Map Reduce runs a job by dividing it into tasks. The two types are:

1.      Map tasks
2.      Reduce tasks

There are two types of nodes that control the job execution process: a job tracker (master) and a number of task trackers (workers). The job tracker coordinates all the jobs run on the system by scheduling tasks to run on task trackers.

Hadoop divides the input to a Map Reduce job into fixed-size pieces called input splits, or splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split. Having many splits means the time taken to process each split is small compared to the time to process the whole input, so the job is better load-balanced when the splits are processed in parallel. On the other hand, if splits are too small, the overhead of managing the splits and of map task creation begins to dominate the total job execution time.

Map tasks write their output to local disk, not to HDFS. The reducers are fed by the map outputs, and the output of the reducers is normally stored in HDFS for reliability.



Explanation of Map Reduce paradigm with reference to Word Count Algorithm

The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: Map and Reduce. Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.

The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.

Consider the problem of counting the number of occurrences of each word in a large collection of documents. The user would write code similar to the following pseudo-code:

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
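
As a concrete, runnable illustration (not from the original paper), the same computation can be expressed in plain Python by simulating the map, shuffle/group, and reduce phases in memory; the document names and contents below are made up:

from collections import defaultdict

def map_phase(key, value):
    # key: document name, value: document contents
    for word in value.split():
        yield (word, 1)

def reduce_phase(key, values):
    # key: a word, values: an iterable of counts
    return (key, sum(values))

documents = {"doc1": "the quick brown fox", "doc2": "the lazy dog"}

# Map: emit intermediate (word, 1) pairs and group them by key (the shuffle)
intermediate = defaultdict(list)
for name, contents in documents.items():
    for word, count in map_phase(name, contents):
        intermediate[word].append(count)

# Reduce: merge all values associated with the same key
counts = dict(reduce_phase(word, values) for word, values in intermediate.items())
print(counts)  # {'the': 2, 'quick': 1, 'brown': 1, 'fox': 1, 'lazy': 1, 'dog': 1}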

Straggler Machine

A straggler machine is a machine that takes an unusually long time to complete its map or reduce tasks in the computation. Stragglers usually arise from variation in CPU availability, network traffic, or I/O contention.

Since a job in a Hadoop environment does not finish until all map and reduce tasks are finished, even a small number of stragglers can greatly deteriorate the overall response time of the job. It is therefore essential for Hadoop to find stragglers and run speculative copies to ensure lower response times. The earlier stragglers are detected, the better the overall response time for the job. The detection of stragglers needs to be accurate, since speculative copies put an overhead on the available resources.

Combiner Function


A Combiner is a user-specified function that does partial merging of the data before it is sent over the network. In some cases, there is significant repetition in the intermediate keys produced by each map task. For example, in the word counting example, word frequencies tend to follow a Zipf distribution, and each map task will produce hundreds or thousands of records of the form <the, 1>. All of these counts would be sent over the network to a single reduce task and then added together by the Reduce function to produce one number. To cut down this traffic, the user is allowed to specify an optional Combiner function that does partial merging of the data before it is sent over the network. The Combiner function is executed on each machine that performs a map task.
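
Continuing the in-memory word count sketch above (again illustrative, not Hadoop's actual API), a combiner can be modeled as a local, per-map-task aggregation that collapses repeated <word, 1> records before they are "sent" to the reducers:

from collections import Counter, defaultdict

def combine(pairs):
    # Partial merge on the map side: many (word, 1) records become a single
    # (word, local_count) record per word before anything is "sent".
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return local.items()

documents = {"doc1": "the cat sat on the mat", "doc2": "the dog"}

intermediate = defaultdict(list)
for name, contents in documents.items():
    pairs = ((word, 1) for word in contents.split())
    for word, partial_count in combine(pairs):  # combiner runs once per map task
        intermediate[word].append(partial_count)

# Reduce: sum the already partially merged counts for each word
counts = {word: sum(values) for word, values in intermediate.items()}
print(counts)  # e.g. {'the': 3, 'cat': 1, ...}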

General architecture of Google File System

GFS is organized into clusters of computers. A cluster is simply a network of computers, and each cluster might contain hundreds or even thousands of machines. In each GFS cluster there are three main entities:

1.      Clients
2.      Master servers
3.      Chunk servers.



Clients can be other computers or computer applications that make file requests. Requests can range from retrieving and manipulating existing files to creating new files on the system. Clients can be thought of as customers of the GFS.

The Master Server is the coordinator for the cluster. Its tasks include:

1.       Maintaining an operation log that keeps track of the activities of the cluster. The operation log helps keep service interruptions to a minimum: if the master server crashes, a replacement server that has monitored the operation log can take its place.

2.      The master server also keeps track of metadata, which is the information that describes chunks. The metadata tells the master server to which files the chunks belong and where they fit within the overall file.

Chunk Servers are the workhorses of the GFS. They store 64-MB file chunks. The chunk servers don't send chunks to the master server. Instead, they send requested chunks directly to the client. The GFS copies every chunk multiple times and stores it on different chunk servers. Each copy is called a replica. By default, the GFS makes three replicas per chunk, but users can change the setting and make more or fewer replicas if desired.




Managing the load on the single master in Google File System
Having a single master enables the master to make sophisticated chunk placement and replication decisions using global knowledge. However, the involvement of master in reads and writes must be minimized so that it does not become a bottleneck. Clients never read and write file data through the master. Instead, a client asks the master which chunk servers it should contact. It caches this information for a limited time and interacts with the chunk servers directly for many subsequent operations.
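
To make the caching idea concrete, here is an illustrative Python sketch (not GFS code; all names, paths, and addresses are invented) of a client-side cache that remembers which chunk servers hold a chunk for a limited time, so repeated reads do not go back to the master:

import time

class ChunkLocationCache:
    """Caches (file_name, chunk_index) -> list of chunk server addresses."""

    def __init__(self, ttl_seconds=60):
        self.ttl = ttl_seconds
        self._entries = {}  # key -> (locations, expiry_time)

    def get(self, file_name, chunk_index):
        key = (file_name, chunk_index)
        entry = self._entries.get(key)
        if entry and entry[1] > time.time():
            return entry[0]   # cache hit: no need to contact the master
        return None           # miss or expired: must ask the master again

    def put(self, file_name, chunk_index, locations):
        self._entries[(file_name, chunk_index)] = (locations, time.time() + self.ttl)

# Usage: ask the master only on a cache miss, then talk to chunk servers directly.
cache = ChunkLocationCache(ttl_seconds=60)
locations = cache.get("/logs/web.log", 0)
if locations is None:
    locations = ["chunkserver-a:7000", "chunkserver-b:7000"]  # would come from the master
    cache.put("/logs/web.log", 0, locations)
print(locations)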

General scenario of client request handling by GFS
File requests follow a standard work flow. A read request is simple; the client sends a request to the master server to find out where the client can find a particular file on the system. The server responds with the location for the primary replica of the respective chunk. The primary replica holds a lease from the master server for the chunk in question.

If no replica currently holds a lease, the master server designates one replica as the primary. It does this by comparing the IP address of the client to the addresses of the chunk servers containing the replicas and choosing the chunk server closest to the client. That chunk server's replica becomes the primary. The client then contacts the appropriate chunk server directly, which sends the replica data to the client.

Write requests are a little more complicated. The client still sends a request to the master server, which replies with the location of the primary and secondary replicas. The client stores this information in a memory cache. That way, if the client needs to refer to the same replica later on, it can bypass the master server. If the primary replica becomes unavailable or the replica changes then the client will have to consult the master server again before contacting a chunk server.

The client then sends the write data to all the replicas, starting with the closest replica and ending with the furthest one. It doesn't matter if the closest replica is a primary or secondary. Google compares this data delivery method to a pipeline.

Once the replicas receive the data, the primary replica begins to assign consecutive serial numbers to each change to the file. Changes are called mutations. The serial numbers instruct the replicas on how to order each mutation. The primary then applies the mutations in sequential order to its own data. Then it sends a write request to the secondary replicas, which follow the same application process. If everything works as it should, all the replicas across the cluster incorporate the new data. The secondary replicas report back to the primary once the application process is over.

At that time, the primary replica reports back to the client. If the process was successful, it ends here. If not, the primary replica tells the client what happened. For example, if one secondary replica failed to update with a particular mutation, the primary replica notifies the client and retries the mutation application several more times. If the secondary replica doesn't update correctly, the primary replica tells the secondary replica to start over from the beginning of the write process. If that doesn't work, the master server will identify the affected replica as garbage.




Advantages and disadvantages of large sized chunks in Google File System

Chunk size is one of the key design parameters. In GFS it is 64 MB, which is much larger than typical file system block sizes. Each chunk replica is stored as a plain Linux file on a chunk server and is extended only as needed.

Advantages

1. It reduces clients’ need to interact with the master because reads and writes on the same chunk require only one initial request to the master for chunk location information.

2. Since a client is more likely to perform many operations on a given large chunk, it can reduce network overhead by keeping a persistent TCP connection to the chunk server over an extended period of time.

3. It reduces the size of the metadata stored on the master. This allows us to keep the metadata in memory, which in turn brings other advantages (see the rough calculation below).
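
As a rough back-of-the-envelope illustration of the third point (the less-than-64-bytes-per-chunk figure comes from the GFS paper; the file size and the smaller chunk size are made up for comparison):

file_size = 1 * 1024**4          # a 1 TB file, for illustration
chunk_size_64mb = 64 * 1024**2
chunk_size_4mb = 4 * 1024**2     # a hypothetical smaller chunk size
bytes_per_chunk_metadata = 64    # the GFS paper reports < 64 bytes of metadata per chunk

for chunk_size in (chunk_size_64mb, chunk_size_4mb):
    num_chunks = file_size // chunk_size
    metadata_bytes = num_chunks * bytes_per_chunk_metadata
    print(f"{chunk_size // 1024**2} MB chunks -> {num_chunks} chunks, "
          f"~{metadata_bytes / 1024:.0f} KB of metadata on the master")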

Disadvantages

1. A large chunk size can waste space through internal fragmentation; lazy space allocation, however, avoids most of this waste.


2. Even with lazy space allocation, a small file consists of a small number of chunks, perhaps just one. The chunk servers storing those chunks may become hot spots if many clients are accessing the same file. In practice, hot spots have not been a major issue because the applications mostly read large multi-chunk files sequentially. To mitigate this, such files can be stored with a higher replication factor, and clients can be allowed to read from other clients.

Big Data Introduction


Big data refers to data sets with sizes beyond the ability of commonly used software tools to capture, curate, manage, and process within a tolerable elapsed time. It is the term for a collection of large and complex data sets that are difficult to process using traditional database management tools or traditional data processing applications. Big Data is characterized by the 3Vs: volume, velocity and variety.

Data sets grow in size in part because they are increasingly being gathered from many sources such as information-sensing mobile devices, aerial sensory technologies (remote sensing), software logs, cameras, microphones, radio-frequency identification readers, and wireless sensor networks.

As data collection increases day by day, it becomes difficult to work with the data using most relational database management systems and desktop statistics and visualization packages, requiring "massively parallel software running on tens, hundreds, or even thousands of servers". The challenges include capture, curation, storage, search, sharing, transfer, analysis, and visualization. Such a large gathering of data overwhelms the organization and forces the need for big data management with a distributed approach.

Distributed System in Big Data Technology

A distributed system is a collection of independent computers that appears to its users as a single coherent system. A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages.
Distributed systems play an important role in managing the big data problems that prevail in today's world. In the distributed approach, data are placed on multiple machines and are made available to the user as if they were in a single system. A distributed system makes proper use of hardware and resources across multiple locations and machines.

Example: How Google uses a distributed system to manage data for search engines

Due to the accumulation of large amounts of data on the web every day, it is difficult to manage the documents on a centralized server. To overcome these big data problems, search engine companies like Google use distributed servers. In a distributed search engine there is no central server.


Unlike traditional centralized search engines, work such as crawling, data mining, indexing, and query processing is distributed among several peers in a decentralized manner, with no single point of control. Several distributed servers are set up in different locations, and the information is made accessible to the user from nearby servers. Mirror servers perform different types of caching operations as required.