In this lecture, we will discuss the Hadoop Streaming API, and we'll also see how to apply what we have learnt to processing New York City data using the MapReduce paradigm. We'll see an example of actually writing and running some code.

What is the Hadoop Streaming API? Hadoop Streaming is a generic API that allows you to write MapReduce programs in any language. The basic concept is the same as MapReduce: mappers and reducers read their input from stdin and write their output to stdout, which basically means on the terminal, as key-value pairs. Just as in MapReduce, you always deal with key-value pairs; that's the most important thing when you're doing MapReduce, right? Both input and output are represented as text, and the program reads one line at a time as it processes it.

Let's take the same example we saw in the last lecture, word count, and see how we would write this program. This is a very simple program to write and execute on Hadoop. First things first, we log into the cluster: you SSH with your user ID to a hostname. Then you copy the data into HDFS with hdfs dfs -put, giving the path of the data locally and where it should go on the HDFS file system. We are taking the Constitution of the United States as the text input, and we'll count the unique words there and the frequency with which each word occurs.

First, we have to write a mapper. What is the output of a mapper? The output of the mapper is a key and a value. Since this is a very simple example, whenever the mapper sees a word it should say, "Hey, that word occurs, and it occurs one time." That's all the mapper needs to do, and that's what this program does. It is a very simple Python program: it imports the sys library and reads every line from standard input. If you don't fully understand it, that's okay. It strips the whitespace at the ends, removes any brackets, and removes full stops, semicolons, colons, commas, and things like that. After that, it splits the line into words delimited by whitespace; line.split() gives you the words as a Python list. Then all you do is "for word in words" (we already saw this when we were looking at Python), so it goes through every word in the list. What am I printing? I'm printing that word and I'm printing one. That's all it is: whatever word you see, you print that word followed by the number 1. That's all the mapper does.

What is so unique about it? It runs on distributed nodes. Every node will run it over whatever portion of the text is on that node, and for every word it sees it will say, "Hey, I saw this word one time." Even if a word occurs twice in the same text, there will be two records saying that word occurred one time. That's the mapper's task.
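As a rough sketch (the exact punctuation characters stripped here are a guess based on the description, not the actual course code), the streaming mapper could look something like this:

```python
#!/usr/bin/env python
# mapper.py -- word-count mapper for Hadoop Streaming (illustrative sketch)
import sys

for line in sys.stdin:
    # strip surrounding whitespace and some punctuation, as described above
    line = line.strip()
    for ch in '()[]{}.,;:':
        line = line.replace(ch, '')
    # split the cleaned line into words on whitespace
    for word in line.split():
        # emit one "word,1" record per occurrence of the word
        print("%s,1" % word)
```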
The next step is the reducer, which basically has to do a summation based on the key: it takes all the key-value pairs where the key is the same and sums up the values to see how many times each word occurs. Here's the code for how to do that.

What we are creating here is a hash, a Python dictionary, to count the number of words. Then, for every line in standard input (as I said, when you are using the Hadoop Streaming API, input and output go through standard in and standard out), you split the line on a comma. Why are you splitting on the comma? If you go back and look at our mapper, what does it output? It outputs the word, a comma, and a one. That's why we split on the comma. What does that give you? It gives you the word and it gives you the count: before the comma is the word, and after the comma is the count. Since the count is a string, we convert it to an integer. Every time the reducer sees a word, it asks, "Hey, how many times has this word occurred?" That's what the reducer does. If we have already seen the word, we add this count to the value stored in the word-to-count dictionary for that word. If this is the first time we are seeing the word, the lookup raises an exception, because we are trying to access a dictionary key that does not exist yet; the exception is caught, and we say, "Hey, this is the first time I'm seeing this word," and store its count. That's the loop. At the end, for every key in this word-to-count dictionary, we print the key and the value associated with it. This prints out how many times each particular word occurred in the document.

These two steps, mapper and reducer, will do a word count in this distributed setting. How do you run it on Hadoop? You can run these programs individually on a single computer, but that will only give you local results. If your data is actually distributed on Hadoop, you run it with yarn: you call the streaming API, tell it which program is your mapper and which is your reducer, say, "Hey, I need these two files uploaded in order to run it," and give it your input and your output. This creates an output directory, because each of the nodes will run and create its own result, so you merge them back and get the results with hdfs dfs -getmerge into word_count.txt.

So what we saw is a simple problem: take a document and split it into words; the mapper says, "Hey, I saw this word one time," and the reducer says, "If the key is the same, let me add up how many times I've seen it." It adds the value in the current record to the count for that key, so it's basically doing a summation over all the keys.
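A rough sketch of the dictionary-based reducer described above might look like this, assuming the mapper emitted comma-separated "word,1" lines:

```python
#!/usr/bin/env python
# reducer.py -- word-count reducer for Hadoop Streaming (illustrative sketch)
import sys

word2count = {}

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # each input line looks like "word,1"
    word, count = line.split(',')
    count = int(count)
    try:
        # we have seen this word before: add to its running total
        word2count[word] += count
    except KeyError:
        # first time we see this word: start its count
        word2count[word] = count

# print the final tally, one "word,count" line per unique word
for word in word2count:
    print("%s,%d" % (word, word2count[word]))
```

The job itself is then submitted through the Hadoop Streaming jar, passing the mapper, reducer, files, input, and output arguments described above, and the per-node results are merged back locally with hdfs dfs -getmerge.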
Let's look at a different problem. This one is a little more geospatial; until now we looked at a very simple, non-geospatial problem. Say you have New York taxi data and you want to create a density map of where passenger pickups happen most. You have a data set with records that say, "Here was the pickup, here was the drop-off, here was the fare," and so on. Here's what the data looks like: you have a trip ID and all the trip information; what's pertinent to us is that it has the pickup location and, if we need it, the drop-off location. We have the passenger count, that is, how many passengers were in the taxi. We have the date and the timestamp if we want to use them in the mapper or reducer, and that's for every trip, and then we also have the fare. So we have a couple of example records from that data set.

Let's think about how we would write this program. First, let's design the MapReduce program: what should the mapper do, what should the reducer do, and what is the output we want? What we want in the end is a density map, which is essentially a raster, so the output should be a raster image. Your output needs to be a cell, and each cell has a value associated with it; that's our key and value. Since we are creating a density map, you cannot just plot all the latitude-longitude points, you have to aggregate them, so you will convert each latitude-longitude pair into a grid cell. You say, "Hey, the pickup happened in this grid cell, and this many passengers were in the taxi cab." The value is the number of passengers picked up, and the reducer essentially puts together everything that belongs to the same cell.

How do we define this cell? That's our key, and that is the most important design decision; once you have it, writing the map and reduce functions is very simple. Say we have a bounding box for the New York City area we are interested in, from x_min, y_min to x_max, y_max. Then the column ID is the x coordinate minus x_min, divided by the cell size; you can think of the cell size as the raster resolution. You can use a very fine raster, so you have many cells and the cell size is small, or a coarser one where the cell size is large, but this code works for any cell size, because the cell size is an argument. Essentially, every latitude-longitude pair is transformed into one location, with a row ID and a column ID, in this matrix. Makes sense?

Let's see how we would write a mapper for this. What should the mapper do? It should transform the latitude-longitude value into the cell ID, that's the key, and it should also output the number of passengers; that's all the mapper needs to do. Does this seem complicated? No, it is doing exactly that, but since we are dealing with real data, we also have to do some cleanup of the data, and that is happening here as well. Here's our main program on the right-hand side. You take all the parameters which are passed to this mapper: "Hey, I'm interested in trips from this start time to this end time," so that gets parsed out; the starting and ending latitude and the starting and ending longitude, which give you x_min, x_max, y_min, y_max; and then the grid width, which is the cell size. In this case we allow a different cell size for latitude and longitude, so the cells don't need to be square, they can be rectangles, but that's something you can adjust. Then what does the main program do? For every line it reads, it calls this function called makeIndex, and if the result is not empty, it prints it. That's it: for every line, it makes an index. For every record it sees, it asks, "Hey, does this record fit our criteria?", meaning it falls within the start time and end time and within the latitude-longitude bounding box. If that condition is met and a passenger count is specified, it outputs the cell ID, which is our key, and the value.
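To make the grid transformation concrete, here is a minimal sketch of how a coordinate could be turned into a row and column ID; the function and variable names are my own, not from the slides:

```python
# Illustrative sketch: turning a pickup coordinate into a grid-cell ID.
# x_min and y_min are the lower-left corner of the bounding box and
# cell_size is the raster resolution; the names are assumed, not from the slides.
def to_cell(lon, lat, x_min, y_min, cell_size):
    col_id = int((lon - x_min) / cell_size)   # column grows with longitude
    row_id = int((lat - y_min) / cell_size)   # row grows with latitude
    return row_id, col_id

# example usage with a hypothetical 0.01-degree cell size
row_id, col_id = to_cell(-73.98, 40.75, x_min=-74.30, y_min=40.50, cell_size=0.01)
```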
MakeIndex is the function which does that, so let's go and look at it. What does it do? It takes in one record, meaning one line from the taxi data; every line in the taxi data results in one call here. It splits the line by commas, because, if you remember, each line in the taxi data is comma-separated, and then it does some validation checks. It checks that there are enough data columns; if there are not, it drops the record, it says, "Hey, this record was not valid," and ignores it. Then it applies the time filter: does the row have a start time, does it have an end time listed? If not, it returns. Then it similarly applies a latitude filter and a longitude filter, checking whether the point falls within our latitude and longitude range. So this part checks the time, this checks whether it's in the right latitude, this checks whether it's in the right longitude, and this one checks whether a number of passengers is specified, which is column number seven. If you go back and look at the data set, counting zero, one, two, and so on, we check these fields for every record to make sure that everything we need is there.

After it has made sure that all the fields it needs are there, it reads the passenger count, which is in the eighth column, index seven, because Python counts from zero: zero, one, two, three, four, five, six, seven. We already checked that this column is not empty, and we convert it to an integer, because the number of passengers in that car has to be an integer. Once this is done, what do we know? The record is in the right time window, it has a latitude and a longitude that fall within our bounding box, and it has a valid number of passengers. Then we calculate the row index and column index like I explained on the previous slides: latitude minus the start latitude divided by grid_width, and longitude minus the start longitude divided by grid_height. grid_width and grid_height can be different, but if you want square cells, both parameters can have the same value.

After that, what does it output? It outputs a key, and our key is the row index and column index: this one taxi is in this cell of our raster. And it outputs the value: how many passengers are there, and how many taxi records are we talking about? It is one taxi record, and the number of passengers is whatever we got from the passenger count column. So the output is row index, column index, passenger count, one. That's what the mapper does: for every line it takes from the input file, it makes sure the fields are valid, converts the location to a row index and column index, and gets the passenger count. The output key is the row index and column index, and the value is the passenger count and the number of taxis, which for one record is one. That's the mapper. Makes sense?
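Here is a rough sketch of what a makeIndex-style function could look like. The column positions for the pickup longitude and latitude are guesses, the delimiters and parameter names are my own, and the time filter is left out for brevity; only the passenger count at index seven comes from the lecture:

```python
# Illustrative sketch of a makeIndex-style helper for the taxi mapper.
PASSENGER_COL = 7      # mentioned in the lecture
PICKUP_LON_COL = 10    # assumed column position
PICKUP_LAT_COL = 11    # assumed column position

def make_index(record, start_lat, end_lat, start_lon, end_lon,
               grid_width, grid_height):
    fields = record.strip().split(',')          # taxi records are comma-separated
    if len(fields) <= PICKUP_LAT_COL:
        return None                             # not enough columns: drop record
    lat_s = fields[PICKUP_LAT_COL]
    lon_s = fields[PICKUP_LON_COL]
    pax_s = fields[PASSENGER_COL]
    if not lat_s or not lon_s or not pax_s:
        return None                             # missing values: drop record
    lat, lon = float(lat_s), float(lon_s)
    if not (start_lat <= lat <= end_lat and start_lon <= lon <= end_lon):
        return None                             # outside the bounding box: drop
    passengers = int(pax_s)
    # cell ID, following the formulas from the slides
    row_index = int((lat - start_lat) / grid_width)
    col_index = int((lon - start_lon) / grid_height)
    # key = "row,col"; value = passenger count and one taxi record
    return "%d,%d\t%d,1" % (row_index, col_index, passengers)
```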
Let's now go and look at the reducer. How do you combine all these records? I think it's a much simpler task. We want to add up all the passenger counts and add up all the taxis, because we are looking at density, and density is what we care about. We take the passenger count divided by the number of taxis for each cell, which gives you a density; that's the simple logic. Here's the code for how to do that. What is the input to this reducer? The input to the reducer is what the mapper output: row ID, column ID, passenger count, and the number of taxis. We are using some libraries here, and this is the main function.

What are we doing here? The current key is initially none, the passenger count is zero, and the current number of taxis is zero; that's just initialization. Then, for every line in standard input (remember, every line we read is row ID, column ID, passenger count, number of taxis), you get the first part, which is the key, and you get the passenger count and the taxi count. So you're getting the key, which is basically the cell ID, and you're getting the value, which is the passenger count and the taxi count. If the current key is none, the current key becomes this key. If the key is equal to the current key, you just add the number of passengers and the number of taxis: current passengers becomes current passengers plus the passenger count from this taxi, so whatever was there for the cell we just add to it, and we also note that there was one more taxi in this area with that many passengers. That's what you're doing here: you're adding the passengers from the new taxi, and you're saying, "Hey, we saw one more taxi." After that, the result for this key is the number of passengers divided by the number of taxis, which gives you the density for that cell.

So for every cell, what you're doing is counting the passengers: you say, "Hey, there are already so many passengers in this key, which is a cell in the raster, and I found one more taxi in the cell," so you add that taxi and the passengers in it. That's what these two lines do. Later, once a cell is done, you take the key and divide the number of passengers by the number of taxis, which gives you the density: how many passenger pickups there were per taxi at that location, essentially. Once that's done, we output it, saying, "Hey, this is my result," with the cell and its passengers-per-taxi value.

So what does the mapper do? The mapper takes every line, every taxi record, and says, "Hey, this taxi is in this cell, these are the number of passengers, and this is one taxi." What does the reducer do for every cell? If the keys match, it says, "Okay, I'm seeing one more taxi in this cell, let me add its passengers and add one to the taxi count." After that, for every cell, it takes the total number of passengers in the cell divided by the total number of taxis, multiplied by 1.0 so that we do a floating-point division rather than an integer division, and that's your density: how many passengers per taxi there were in that cell. The output of this will be a key and a value, which is the average value for that cell.
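Here is a rough sketch of such a reducer. It assumes the mapper emitted lines of the form "row,col<TAB>passengers,taxis" (the delimiters are my choice, not the course's) and that Hadoop Streaming has already sorted the lines by key, so all records for one cell arrive together:

```python
#!/usr/bin/env python
# Illustrative sketch of the density reducer for Hadoop Streaming.
import sys

current_key = None
current_passengers = 0
current_taxis = 0

def emit(key, passengers, taxis):
    # density = passengers per taxi in this cell; 1.0 forces float division
    print("%s\t%f" % (key, passengers * 1.0 / taxis))

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # key = "row,col", value = "passengers,taxis"
    key, value = line.split('\t')
    pax_str, taxi_str = value.split(',')
    passengers, taxis = int(pax_str), int(taxi_str)
    if current_key is None:
        current_key = key
    if key == current_key:
        # same cell: accumulate passengers and taxis
        current_passengers += passengers
        current_taxis += taxis
    else:
        # new cell: emit the finished cell, then start accumulating the new one
        emit(current_key, current_passengers, current_taxis)
        current_key = key
        current_passengers = passengers
        current_taxis = taxis

# emit the last cell
if current_key is not None:
    emit(current_key, current_passengers, current_taxis)
```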
We can write a simple function or program which changes this into an ASCII grid or another format, and then we can plot it on a map. This is a very simple map created from that density output, and you can see there are some errors in the data as well: there are some pickups right here in the Atlantic Ocean, which of course are not possible. There are errors in the data that we did not catch, which is okay; that's part of dealing with real data. You could add more checks; you could use a stricter boundary of New York City and say, "Hey, the pickup has to lie within the boundary of New York City in order to be considered," and that way you would remove these errors. We did not do that, we only checked whether the point was in the bounding box, and of course these points were within the bounding box, so they showed up.

What have we seen in this lecture? We saw what the Hadoop Streaming API is and how to use it to solve simple problems. We looked at a simple word count example with Hadoop MapReduce and how to write a simple mapper and a simple reducer to count the words in a text corpus. Then we looked at a more geospatial example using New York City taxi data. We saw how to write a mapper that transforms point data into raster data associated with cells, and how to write the reducer that computes a density. Finally, we were able to plot that data on a map and display it, which shows how to apply the simple Hadoop MapReduce paradigm to a geospatial data set. Thank you.