Practical Introduction to Streaming MapReduce: Code Examples

In this article I'll introduce the concept of streaming MapReduce processing using GridGain and Scala. I chose Scala simply because it allows for very concise notation and GridGain provides a very effective DSL for Scala. Rest assured, you can follow this post in Java or Groovy just as well.

The concept of streaming processing (and streaming MapReduce in particular) can be defined as continuous distributed processing of continuously incoming data streams. The obvious difference from other forms of distributed processing is that the input data cannot be fully sized (or known) before the processing starts, and the incoming data appears "endless" from the point of view of the processing application. Typical examples of streaming processing include incoming web event-level logs, the Twitter firehose, trade-level information in financial systems, Facebook updates, RFID chip updates, etc.

Another interesting observation is that streaming processing is almost always real time. The important point here is that the streaming nature of the input data necessitates the real-time characteristic of the processing: if your processing lags behind the volume of incoming live data, you will inevitably run out of space to buffer that data and the system will crash.

I will provide two code examples to highlight streaming MapReduce processing with GridGain:

  • The first is a very simple, canonical MapReduce application that I'll use to illustrate the basics of GridGain.
  • The second is a bit more involved and demonstrates how you can write a start-to-end streaming MapReduce application (from ingestion to querying).

Example 1

Let's start with GridGain. GridGain is Java-based middleware for in-memory processing of big data in a distributed environment. It is based on a high-performance in-memory data platform that integrates the world's fastest MapReduce implementation with In-Memory Data Grid technology, delivering easy-to-use and easy-to-scale software.

For the first example we'll develop an application that takes a string as an argument and calculates the number of non-space characters in it. It accomplishes this by splitting the argument string into individual words and calculating the number of characters in each word on the remote nodes currently available in the grid. In the end, it aggregates the lengths of all the words into the final result.

This is the standard "HelloWorld" example in the world of distributed programming.

First off, we need a cluster to work with. If you download GridGain and unzip it, all you need to do to start a node is run the node start script, passing it the path to an XML configuration file: 'bin/ggstart.sh examples/config/spring-cache-popularcounts.xml'

Note that you can start as many local nodes as you need – just run this script as many times as you want. Note also that you can start standalone nodes from Visor, the GridGain DevOps Console (a discussion of which is outside the scope of this blog).
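
For example, a two-node local cluster is just the same script run twice, in two separate terminals (each invocation starts one node in the foreground):

bin/ggstart.sh examples/config/spring-cache-popularcounts.xml   # terminal 1
bin/ggstart.sh examples/config/spring-cache-popularcounts.xml   # terminal 2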

Once you have started all the nodes (let's say two), you'll notice that they all discover each other automatically with no drama. There is exactly zero configuration to worry about, and everything works completely out of the box.

Now that we have the cluster running, let's write the code. Open your favorite IDE or text editor and type this:

import org.gridgain.scalar.scalar
import scalar._

object Main extends App {
  scalar("examples/config/spring-cache-popularcounts.xml") {
    println("Non-space chars: " + grid$.spreadReduce(
      for (w <- input.split(" ")) yield () => w.length)(_.sum))
  }
}

Depending on what build system you use (SBT, Ant, IDEA, Eclipse, etc.) you just need to include the libs from GridGain (the main JAR plus the JARs in the '/libs' subfolder) – and compile. With SBT, for instance, you could pull them in as unmanaged JARs, as sketched below.
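
Here is a minimal build.sbt sketch of that approach, assuming the GRIDGAIN_HOME environment variable points at your unzipped distribution (the exact paths are an assumption; adjust them to your layout):

// build.sbt sketch (assumes GRIDGAIN_HOME is set): put GridGain's main JAR
// and the JARs under its libs/ subfolder on the compile classpath.
unmanagedJars in Compile ++= {
  val ggHome = file(sys.env("GRIDGAIN_HOME"))
  ((ggHome * "*.jar") +++ (ggHome / "libs" ** "*.jar")).classpath
}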

If everything compiles – just RUN it, passing it some input string. That's all there is to it:
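
For example, a run might look like this (a hypothetical SBT invocation; the grid's own startup logging is omitted):

sbt "run how many chars"
...
Non-space chars: 12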

Let me quickly explain what's happening here (this applies to the following example as well):

  • First we use the scalar "keyword," passing it a path to the configuration XML file, to start up a node from within our Scala app.
  • grid$ denotes a global projection on all the nodes in the cluster (GridGain employs a functional API at its core). A projection provides a monadic set of operations available on any arbitrary set of GridGain nodes.
  • We use the method spreadReduce(...) on the projection, which takes two curried arguments:
    • a set of closures to spread-execute on the cluster, and
    • a reduction function that will be used to aggregate the remote results.
  • When spreadReduce(...) completes (it's a synchronous call; asynchronous variants are available as well) it returns the count of non-space characters. To see the shape of this computation without a grid, look at the local sketch below.
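
As a plain-Scala stand-in (no GridGain API involved), the same map/reduce split looks like this; on the grid, the closures produced by the yield would be executed on remote nodes rather than locally:

object LocalSketch extends App {
  val input = "how many chars"
  // "Map" step: one closure per word, exactly like the yield above.
  val closures = for (w <- input.split(" ")) yield () => w.length
  // The grid would spread-execute these on remote nodes; locally we just call them.
  val results = closures.map(_())
  // "Reduce" step: the _.sum aggregation of the remote results.
  println("Non-space chars: " + results.sum) // prints 12 (3 + 4 + 5)
}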

Now – let me ask you a question… Did you notice any deployment steps, any Ant or Maven builds, any copying of JARs, or any redeployment after we changed the code?

The answer is no. GridGain provides a pretty unique zero-deployment technology that allows for complete on-demand class deployment throughout the cluster – leaving you, the developer, to simply write the code and run your applications as you would locally. Pretty nifty, isn't it?

Example 2

Ok, now that we've tried something simple and trivial, let's develop a full-featured streaming MapReduce app using what we've learned so far. We'll adapt a canonical example from Hadoop: we'll ingest a number of books into the in-memory data grid and find the 10 most frequent words across those books.

The way we'll do it is via streaming MapReduce: while we are loading the books into memory, we will continuously query the data grid for the 10 most frequent words. As the data gets loaded the results will change, and when all the books are fully loaded we'll get our correct (and final) tally of the 10 most frequent words.

Unlike the Hadoop example:

  • We'll show both programmatic ingestion and querying in one application (no need to pre-copy anything into something like HDFS), and
  • We'll develop this application in true streaming fashion, i.e. we'll start querying concurrently, before all the data has been loaded.

Here’s the full source code:

import org.gridgain.scalar.scalar
import scalar._
import org.gridgain.grid.typedef.X
import java.io.File
import io.Source
import java.util.Timer
import actors.threadpool._

object ScalarPopularWordsRealTimeExample extends App {
  private final val WORDS_CNT = 10
  private final val BOOK_PATH = 
    "examples/java/org/gridgain/examples/realtime/books"

  type JINT = java.lang.Integer

  val dir = new File(X.getSystemOrEnv("GRIDGAIN_HOME"), BOOK_PATH)

  if (!dir.exists)
    println("Input directory does not exist: " + dir.getAbsolutePath)
  else
    scalar("examples/config/spring-cache-popularcounts.xml") {
      val pool = Executors.newFixedThreadPool(dir.list.length)
      val timer = new Timer("words-query-worker")

      try {
        // Query the grid for the top words every 3 seconds while ingestion runs.
        timer.schedule(timerTask(() => query(WORDS_CNT)), 3000, 3000)

        // Populate cache & force one more run to get the final counts.
        ingest(pool, dir)
        query(WORDS_CNT)

        // Clean up after ourselves.
        grid$.projectionForCaches(null).bcastRun(
          () => grid$.cache().clearAll())
      }
      finally {
        timer.cancel()
        pool.shutdownNow()
      }
    }

  def ingest(pool: ExecutorService, dir: File) {
    // Loader for the default (null-named) cache; the numeric arguments are the
    // loader's buffering/parallelism tuning parameters. Using Integer (JINT)
    // values keeps the null check in the update closure below meaningful.
    val ldr = dataLoader$[String, JINT](null, 2048, 8, 128)

    // For every book, allocate a new thread from the pool and start 
    // populating cache with words and their counts.
    try {
      (for (book <- dir.list()) yield
        pool.submit(() => Source.fromFile(new File(dir, book), "ISO-8859-1").
          getLines().foreach(
            line => for (w <- line.split("[^a-zA-Z0-9]") if !w.isEmpty)
              ldr.addData(w, (i: JINT) => if (i == null) 1 else i + 1)
      ))).foreach(_.get)
    }
    finally
      ldr.close(false) // Wait for data loader to complete.
  }

  def query(cnt: Int) {
    // Distributed SQL query: words longer than 3 characters, ordered by count.
    // The "select * from ..." part of the query is omitted (more on that below).
    cache$[String, JINT].get.sql(grid$.projectionForCaches(null),
      "length(_key) > 3 order by _val desc limit " + cnt).
        toIndexedSeq.sortBy[JINT](_._2).reverse.take(cnt).foreach(println _)

    println("------------------")
  }
}

A few notes about the code in general:

  • We use the books that ship with GridGain's examples.
  • We pass a specific XML configuration file to the 'scalar' keyword (it configures TCP discovery and a partitioned cache with one backup).
  • We use a simple timer to run a query every 3 seconds while we are loading the books.
  • After everything is done we clean up after ourselves, so that you can run this app multiple times without leaving garbage in the data grid.

Notes about the ingest(...) and query(...) methods:

  • We use GridGain's data loader in the ingest(...) method; it provides advanced back-pressure management for asynchronous bulk-load distributed operations.
  • We use the method sql(...) on a cache projection (cache projections provide a monadic set of data grid operations) to issue a simple distributed SQL query.
  • In GridGain you can omit the "select * from table" part in most cases and just supply a where clause, as sketched below.
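
To make that last point concrete, here is the shorthand used in query(...) alongside an assumed full form (the table name Integer is my assumption, based on the SQL table being derived from the cache's value type):

// Shorthand actually sent by the example (where clause only):
"length(_key) > 3 order by _val desc limit " + cnt

// Assumed equivalent full form:
"select * from Integer where length(_key) > 3 order by _val desc limit " + cnt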

That's all there is to it. Compile it (as always, no deployment or redeployment is necessary) and run it. You will see a printout of the 10 most frequent words every 3 seconds while the books are being read and put into the data grid.

Final Thoughts

In about 50 lines of code we've put together a streaming MapReduce app with both ingestion and querying. We've run it on a local cluster – and it will run just the same on 3, 10, hundreds, or thousands of nodes deployed anywhere in the world (as long as we have some way to connect to them).

Keep in mind that this is obviously a very simple (almost trivial) example of streaming MapReduce. Yet with a few additional lines of code you can replace the books with, let's say, the Twitter firehose keyed by hashtags, and the printouts with updates to your social dashboard – and you get a pretty useful system tracking the most popular Twitter hashtags in real time in a few hundred lines of code, while automatically scaling to hundreds of terabytes of data processed on thousands of nodes.

Not bad at all!

In-Memory Processing As A Business Use Case

In-memory processing is becoming a business necessity in a similar way that collecting and processing ever-increasing data sets (a.k.a. Big Data) has become a business "must have" rather than just a piece of technology over the last five years. Both of these trends are intertwined in interesting ways. Let me explain…

1. Storing Necessitates Processing
The initial foray into Big Data for many companies was all about storing the data, plus some rudimentary processing that most of the time amounted to trivialized analytics run on log files, purchase history, and similar types of data (that's what 90% of analytics is still doing today, if you ask people on the "inside"). As the amount of stored data kept growing (along with the associated direct and indirect costs), IT departments came under more and more pressure to extract deeper and more actionable, i.e. operational, insights and to provide more meaningful results from the collected data. That meant more, a lot more, processing.

2. World Is Rapidly Becoming Real Time
As we are pressured to do more and more processing we face yet another evolution. It would be an understatement to say that one of the most radical evolutions in IT these days is the torrent-like move into "now" data processing, i.e. processing live data streams and existing working sets in real time. Ask yourself this question: "Do you know any business that would make their IT systems NOT real time if the price of making them batch/ETL or real time were the same?" The answer is no.

In the age of real-time ad serving, hyper-local advertising, instant sentiment analysis, 24/7 financial trading, global arbitrage, operational BI/analytics, instant mobile applications rapidly growing in processing complexity, geo-based merchant platforms, and many, many other systems in place today – what business would deliberately lock itself out of these advances, new business opportunities, or competitive advantages?

Instant, real-time data processing is the reality today and a massive force to reckon with in the coming years. Businesses that lag behind, relying on data processing where customers or systems have to "wait" for their answers, will simply be swept away.

3. In-Memory Processing Is The Only Answer
This sounds rather bullish, but it is a technological reality. There is no other technology in the foreseeable future (that we know of) that would provide enough processing performance to deal with the ever-increasing amounts of data we need to process. Consider this fact: RAM access is up to 10,000,000 times (!) faster than access to disk, the next storage layer down where we can store data (and where we've been storing data for the last 25 years; before that we were using tapes…). There's simply nothing else commercially available today, or on the near horizon, that approaches that performance differential.

We simply have to adjust to what Gartner calls a new tenet of data processing: "RAM is the new disk, and disk is the new tape."

4. RAM Pricing Dropping 30% Every 18 Months
The economics behind in-memory processing are finally hitting the wide-adoption curve:

  • 1GB of RAM costs less than $1 today.
  • A rack with 10 blades, 50 processing cores, and 1TB of total RAM capacity can be purchased today for less than $50,000 – a price point that was almost 10x higher 10 years ago.

For a $500,000 investment a company can have 10TB of RAM (along with the associated CPU power and slower disk storage) for in-memory processing of a working set of data. 10TB is considered a typical working-set size for most of today's large big data installations – and having it in memory enables real-time, sub-second processing of that data set.

5. Software and Hardware Availability
Finally, hardware and software are catching up to enable massively parallel in-memory processing of large data sets. Consider these facts:

  • Typical commodity hardware today has 8-24 physical cores (e.g., DELL's R410 and R610 lines of rack servers, costing in the $2,500-4,000 range with 64GB of RAM). Physical parallelization capability is essential for effective utilization of local RAM.
  • 64-bit CPUs (found in almost any new consumer laptop today) can address up to 16 exabytes of data – enough to address all the data in the world today (with a single CPU).
  • Most operating systems (modern Linux and Windows, for example) provide robust support for advanced parallelization, as well as for the necessary application development ecosystems (Java and .NET).
  • A new type of software middleware, developed specifically for in-memory processing, has been introduced and has matured over the last couple of years. GridGain, SAP HANA, and Oracle Coherence all provide sophisticated capabilities for in-memory processing.