Berkeley Researchers Highlight Emergence of In-Memory Processing

An excellent paper has been released by researchers at the University of California, Berkeley. They analyzed data from the Hadoop installation at Facebook (one of the largest in the world), looking at various metrics for Hadoop jobs running in a Facebook datacenter that has over 3,000 computers dedicated to Hadoop-based processing.

They have come up with very interesting insights. I advise everyone to read it firsthand, but I will list some of the interesting bits.

The traditional quest for disk locality (a.k.a. affinity between a Hadoop task and the disk that contains the input data for that task) was based on two key assumptions:

  1. Local disk access is significantly faster than network access to a remote disk
  2. Hadoop tasks spend a significant amount of their processing time in disk IO reading input data

Through careful analysis of the Hadoop system at Facebook (their prime testbed), the authors claim that both of these assumptions are rapidly losing hold:

  1. With new full-bisection topologies in modern data centers, local disk access is almost identical in performance to network access, even across racks (the performance difference between the two is less than 10% today).
  2. Greater parallelization and data compression lead to lower disk IO demand on individual tasks; in fact, Hadoop jobs at Facebook deal mostly with text-based data that can be compressed dramatically.

The authors then argue that memory locality (i.e. keeping input data in memory and maintaining affinity between a Hadoop task and its in-memory input data) produces much greater performance advantages because:

  • RAM access is up to three orders of magnitude faster than a local disk access
  • Even though memory size is significantly less than disk capacity it is large enough for most cases (see below)

Consider this: even though 75% of all HDFS blocks are accessed only once, 64% of Hadoop jobs at Facebook achieve full memory locality for all their tasks (!). In the case of Hadoop, full locality means that there is no outlier task that has to access disk and delay the entire job. And this is all achieved with a rather primitive LFU caching policy and basic pre-fetching of input data.

With these facts the authors conclude that disk locality is no longer worth striving for – and in-memory co-location is the way forward for high performance big data processing, as it yields far greater returns.
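To make the LFU (least frequently used) policy mentioned above a bit more concrete, here is a minimal, single-threaded sketch in Scala. It is only an illustration of the eviction rule: the class name LfuCache and its methods are hypothetical and are not taken from the paper or from Hadoop.

```scala
import scala.collection.mutable

// Hypothetical LFU cache: when the cache is full, the entry with the
// smallest access count is evicted to make room for a new key.
final class LfuCache[K, V](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val values = mutable.Map.empty[K, V]
  private val hits = mutable.Map.empty[K, Long]

  // Returns the cached value, if any, and counts the access.
  def get(key: K): Option[V] = {
    val v = values.get(key)
    if (v.isDefined) hits(key) = hits(key) + 1
    v
  }

  def put(key: K, value: V): Unit = {
    if (!values.contains(key) && values.size >= capacity) {
      // Linear scan for the least frequently used key (fine for a sketch).
      val victim = hits.minBy(_._2)._1
      values -= victim
      hits -= victim
    }
    values(key) = value
    if (!hits.contains(key)) hits(key) = 0L
  }

  def contains(key: K): Boolean = values.contains(key)
}
```

A block that is read often stays in memory, while a block that is read once is the first candidate for eviction, which fits a workload where most blocks are accessed only once.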

Facebook’s case is solid proof of this technology, and GridGain’s In-Memory Data Platform is a solid platform for the rest of us.

Scala Report: Summer 2012

As I was returning from Scalathon 2012 I decided to pen down some of my thoughts on the state of Scala land as of mid-2012. My last “report” about 6 months ago received a fair amount of attention, and I think it’s a good occasion to provide another 6-month look back.

As many of you may know, we are long-time Scala users and have had Scala in production for almost 3 years now. I think there may be just a few dozen other products/projects that can claim the same, and maybe just a few that are not some type of internal IT system but rather something that you sell as an ISV – and we are almost alone in the serious distributed middleware category. So… we have a pretty good vantage point.

Things Calmed Down

In my last note about Scala I really complained about some erratic “moves” in the Scala world – whether purely technical (parallel collections) or business related (growing pains with Typesafe, etc).

I think in 2012 Scala’s advance has been a lot steadier and less erratic. The 2.9 transition (at least for us at GridGain) was painless. 2.10 is coming along and many glaring (performance) issues are starting to be addressed. I don’t remember the last time I got a compiler crash (fsc doesn’t count)… The IDEA plugin has matured A LOT in the last 6-9 months to the point where it minimally works in most cases (why people even touch Eclipse for Scala development at this point is beyond me).

StackOverflow is getting more and more useful for Scala folks. More (and better) books have been published. Even the Scalaz fascination has settled down to healthy levels (thanks, in part, to a sobering attitude from Martin Odersky), and things like Lens (and their implementation in Scalaz) are actually usable in real-life projects. Akka’s futures are migrating to the 2.10 standard library – long overdue (we had that in GridGain 5 years ago – our entire implementation is practically Future-based). Java Futures (even those developed by us) make me cringe when I look at their Scala counterparts – Java is so crippled compared to the elegance of Scala’s expressiveness.

And hey, new MacBook Pros helped with compilation speed a lot 🙂

Scala 2.10

Scala 2.10’s new features are mostly welcome (at least IMHO). I have even warmed up to disabling some of the “advanced” features by default and requiring an explicit import to enable them. Macros, or more appropriately compiler plugin support, are an extremely interesting capability and really a differentiating feature among languages in the JVM eco-system. Kotlin – are you listening?

Scala documentation is still in a poor state. But I guess I’m slowly overcoming it by simply remembering a lot and not consulting it as often. Newcomers are still suffering though (and I see it in our team first hand).


I believe Josh Suereth’s (@jsuereth) influence (personal and through the book) has been very positive on the inner circle of the Scala fan base. Whoever made that hiring decision at Typesafe deserves credit. The right individuals have a surprising level of influence on early-stage technologies like Scala.

I believe a new head coach at Typesafe is a clear sign of getting the house in order. Yes, Typesafe is still just a bodyshop today but they apparently invest more and more into maintaining Scala language eco-system – and that’s a very positive sign.

I hope they can maintain a healthy balance between “cheap” consulting/training revenue and “expensive” product development (and not only for Akka!). The more adoption Scala gets – the better it is for GridGain (my skin in the game) as we are practically the only distributed middleware with native Scala support that exists today.

Scala as a hiring tool

This has actually emerged as a clear standalone concept: if you want to filter out the “noise” resumes – put “Scala” as one of the key requirements. You’ll be amazed at the quality delta in the resumes you receive (and at how many fewer of them there are, too).

I firmly believe that for some companies that alone is worth having Scala in their software stack. Just ask Novus about that…

So, what am I unhappy about?

My biggest gripe is the same as last time – the seemingly slow adoption of Scala. Maybe that’s how it is meant to be, maybe not – I don’t know. I stopped trying long ago to parse the reasons why a small percentage of Java folks are utterly in love with Scala (me included) while a much, much larger percentage is either indifferent or downright hostile towards it.

In the end – I think Scala is moving in the right direction. Typesafe is very close to raising their round B – how I wish they’d amass some $50M+ valuation, raise $20M-30M and spend a good chunk of it on a smart marketing strategy for Scala. We would all benefit from it – along with Scala adoption!

Practical Introduction to Streaming MapReduce: Code Examples

In this article I’ll introduce the concept of Streaming MapReduce processing using GridGain and Scala. I chose Scala simply because it allows for very concise notation and GridGain provides a very effective DSL for Scala. Rest assured you can follow this post in Java or Groovy just as well.

The concept of streaming processing (and Streaming MapReduce in particular) can basically be defined as continuous distributed processing of continuously incoming data streams. The obvious difference from other forms of distributed processing is that the input data cannot be fully sized (or known) before the processing starts, and the incoming data appears to be “endless” from the point of view of the processing application. Typical examples of streaming processing would be processing incoming web event-level logs, the Twitter firehose, trade-level information in financial systems, Facebook updates, RFID chip updates, etc.

Another interesting observation is that streaming processing is almost always real time. The important point here is that the streaming nature of the input data necessitates the real time characteristic of processing. If your processing lags behind the volume of incoming live data – you will inevitably run out of space to buffer the incoming data and the system will crash.
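The arithmetic behind that last claim is simple, and a rough sketch makes it tangible. The function name and the numbers below are made up for illustration: if events arrive faster than they are processed, the backlog grows at the difference of the two rates, and a buffer of a given capacity fills after capacity / (arrival rate - processing rate) seconds.

```scala
object Backlog {
  // Seconds until a buffer holding `capacity` events overflows, or None
  // if the processor keeps up (processing rate >= arrival rate).
  def secondsUntilOverflow(capacity: Long,
                           arrivalPerSec: Double,
                           processedPerSec: Double): Option[Double] = {
    val growth = arrivalPerSec - processedPerSec // backlog growth, events/sec
    if (growth <= 0) None else Some(capacity / growth)
  }
}
```

For example, with room for 1,000,000 buffered events, 12,000 events/sec coming in and 10,000 events/sec being processed, Backlog.secondsUntilOverflow(1000000L, 12000, 10000) gives Some(500.0): a bit over eight minutes before the buffer is full, no matter how large it looked at the start.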

I will provide two code examples to highlight streaming MapReduce processing with GridGain:

  • The first is a very simple canonical MapReduce application that I’ll use to illustrate the basics of GridGain.
  • The second is a bit more involved and will demonstrate how you can write a start-to-end streaming MapReduce application (from ingestion to querying).

Example 1

Let’s start with GridGain. GridGain is Java-based middleware for in-memory processing of big data in a distributed environment. It is based on a high performance in-memory data platform that integrates the world’s fastest MapReduce implementation with In-Memory Data Grid technology, delivering easy to use and easy to scale software.

For the first example we’ll develop an application that takes a string as an argument and calculates the number of non-space characters in it. It will do so by splitting the argument string into individual words and calculating the number of characters in each word on the remote nodes that are currently available in the grid. In the end – it will aggregate the lengths of all words into the final result.

This is a standard “HelloWorld” example in the world of distributed programming.

First off, we need to create the cluster to work with. If you download GridGain and unzip it – all you need to do is to run a node start script passing it a path to XML configuration file 'bin/ examples/config/spring-cache-popularcounts.xml' to start a node:

Note that you can start as many local nodes as you need – just run this script as many times. Note also that you can start standalone nodes from Visor – the GridGain DevOps Console (a discussion of that is outside the scope of this post).

Once you have started all the nodes (let’s say 2) you’ll notice that all nodes started and discovered each other automatically with no drama. You have exactly zero configuration to worry about and everything works completely out-of-the-box.

Now that we have the cluster running let’s write the code. Open your favorite IDE or text editor and type this:

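import org.gridgain.scalar.scalar
import scalar._

object Main extends App {
  // Assumption: the input string is taken from the command-line arguments.
  val input = args.mkString(" ")

  // Start a node from within the app using the given configuration file.
  scalar("examples/config/spring-cache-popularcounts.xml") {
    // One closure per word runs on the grid; the lengths are summed at the end.
    println("Non-space chars: " + grid$.spreadReduce(
      for (w <- input.split(" ")) yield () => w.length)(_.sum))
  }
}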

Depending on what build system you use (SBT, Ant, IDEA, Eclipse, etc.) you just need to include the libs from GridGain (main JAR + JARs in '/libs' subfolder) – and compile.

If everything compiles – just RUN it passing it some input string. That’s all there is to it:

Let me quickly explain what’s happening here (it will apply to the following example as well):

  • First we use the scalar “keyword”, passing it a path to the configuration XML file, to start up a node from within our Scala app.
  • grid$ denotes a global projection on all nodes in the cluster (GridGain employs a functional API at its core). A projection provides a monadic set of operations available on any arbitrary set of GridGain nodes.
  • We use the method spreadReduce(...) on the projection, which takes two curried arguments:
    • a set of closures to spread-execute on the cluster, and
    • a reduction function that will be used to aggregate the remote results.
  • When spreadReduce(...) completes (it is a synchronous call; both sync and async options exist) – it returns the count of non-space characters.
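If the two curried argument lists look unfamiliar, the following local stand-in may help. It is a sketch only, not GridGain code: the object LocalSpread and its spreadReduce are hypothetical, nothing is distributed, and the closures simply run on the JVM’s global thread pool. What it mirrors is the shape of the call: closures first, reducer second, result returned synchronously.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LocalSpread {
  // First argument list: the closures to execute.
  // Second argument list: the function that reduces their results.
  def spreadReduce[R, A](closures: Seq[() => R])(reduce: Seq[R] => A): A = {
    val futures = closures.map(c => Future(c())) // run closures concurrently
    reduce(Await.result(Future.sequence(futures), 1.minute)) // block, then reduce
  }

  // Same computation as in the example: one closure per word, then a sum.
  def nonSpaceChars(input: String): Int =
    spreadReduce(input.split(" ").toSeq.map(w => () => w.length))(_.sum)
}
```

With this stand-in, LocalSpread.nonSpaceChars("Hello streaming world") returns 19 (5 + 9 + 5). The GridGain version has the same structure, except that each closure may run on a different node.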

Now – let me ask you a question… Did you notice any deployment steps, any Ant, Maven, any copying of JAR or any redeploying after we’ve changed the code?

The answer is no. GridGain provides pretty unique zero deployment technology that allows for complete on-demand class deployment throughout the cluster – leaving you the developer to simply write the code and run your applications as you would do locally. Pretty nifty, isn’t it?

Example 2

Ok, now that we have tried something very simple, let’s develop a full featured streaming MapReduce app using what we’ve learned so far. We’ll adapt a canonical example from Hadoop: we’ll ingest a number of books into the in-memory data grid and find the 10 most frequent words in those books.

The way we’ll be doing it is via streaming MapReduce:

While we are loading the books into memory we will be continuously querying the data grid for the 10 most frequent words. As data gets loaded the results will change, and when all books are fully loaded we’ll get our correct (and final) tally of the 10 most frequent words.

Unlike the Hadoop example:

  • We’ll show both programmatic ingestion and querying in one application (no need to pre-copy any stuff into anything like HDFS), and
  • We’ll develop this application in true streaming fashion, i.e. we won’t wait until all data is loaded and we’ll start querying concurrently before all data is loaded
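Before looking at the GridGain version, the idea of ingesting and querying at the same time can be sketched in a single JVM. This is a hypothetical, purely local illustration (the object StreamingWordCount and its methods are not GridGain API): one side keeps bumping word counters, the other side can ask for the current top words at any moment.

```scala
import scala.collection.mutable

object StreamingWordCount {
  // Shared state: word -> number of occurrences seen so far.
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Ingestion side: split a line into words and increment each counter.
  def ingest(line: String): Unit = counts.synchronized {
    for (w <- line.split("[^a-zA-Z0-9]") if w.nonEmpty) counts(w) += 1
  }

  // Query side: current top `n` words of at least `minLength` characters.
  // Safe to call while ingestion is still running on other threads.
  def top(n: Int, minLength: Int = 4): Seq[(String, Int)] = counts.synchronized {
    counts.toSeq.filter(_._1.length >= minLength).sortBy(-_._2).take(n)
  }
}
```

Calling top(10) from a timer while loader threads call ingest(...) yields a result that changes as data arrives and becomes final once loading ends. The GridGain application does the same thing, with the counters living in the distributed data grid instead of a local map.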

Here’s the full source code:

import org.gridgain.scalar.scalar
import scalar._
import org.gridgain.grid.typedef.X
import io.Source
import java.io.File
import java.util.Timer
import actors.threadpool._

object ScalarPopularWordsRealTimeExample extends App {
  private final val WORDS_CNT = 10
  // Books directory, relative to GRIDGAIN_HOME.
  private final val BOOK_PATH = 

  type JINT = java.lang.Integer

  val dir = new File(X.getSystemOrEnv("GRIDGAIN_HOME"), BOOK_PATH)

  if (!dir.exists)
    println("Input directory does not exist: " + dir.getAbsolutePath)
  else
    scalar("examples/config/spring-cache-popularcounts.xml") {
      val pool = Executors.newFixedThreadPool(dir.list.length)
      val timer = new Timer("words-query-worker")

      try {
        // Query the top words every 3 seconds while the books are loading.
        timer.schedule(timerTask(() => query(WORDS_CNT)), 3000, 3000)

        // Populate cache & force one more run to get the final counts.
        ingest(pool, dir)
        query(WORDS_CNT)

        // Clean up after ourselves. The call wrapping this closure is cut off
        // in the source; a broadcast to the cache nodes is assumed here.
        grid$.projectionForCaches(null).bcastRun(
          () => grid$.cache().clearAll())
      }
      finally {
        // Assumed cleanup: stop the query timer and the loader threads.
        timer.cancel()
        pool.shutdownNow()
      }
    }

  def ingest(pool: ExecutorService, dir: File) {
    val ldr = dataLoader$[String, Int](null, 2048, 8, 128)

    // For every book, allocate a new thread from the pool and start 
    // populating cache with words and their counts.
    try {
      (for (book <- dir.list()) yield
        pool.submit(() => Source.fromFile(new File(dir, book), "ISO-8859-1").
          getLines().foreach(
            line => for (w <- line.split("[^a-zA-Z0-9]") if !w.isEmpty)
              ldr.addData(w, (i: Int) => if (i == null) 1 else i + 1)
          ))).foreach(_.get) // Wait for all books to be read.
    }
    finally
      ldr.close(false) // Wait for data loader to complete.
  }

  def query(cnt: Int) {
    // Distributed SQL query: words longer than 3 characters, most frequent first.
    cache$[String, JINT].get.sql(grid$.projectionForCaches(null),
      "length(_key) > 3 order by _val desc limit " + cnt).
        toIndexedSeq.sortBy[JINT](_._2).reverse.take(cnt).foreach(println _)
  }
}


A few notes about the code in general:

  • We use the books that are shipped with GridGain’s examples
  • We pass a specific XML configuration file to the 'scalar' keyword (it configures TCP discovery and a partitioned cache with one backup)
  • We use a simple timer to run a query every 3 seconds while we are loading the books
  • After everything is done – we clean up after ourselves (so that you can run this app multiple times without leaving garbage in the data grid)

Notes about the ingest(...) and query(...) methods:

  • In the ingest(...) method we use GridGain’s data loader, which provides advanced back-pressure management for asynchronous distributed bulk load operations
  • We use the method sql(...) on the cache projection (cache projections provide a monadic set of data grid operations) to issue a simple distributed SQL query
  • In GridGain you can omit “select * from table” in most cases, and just supply a where clause

That’s all there is to it. Compile it (as always, no deployment or redeployment is necessary) and run it. You will see a printout of the 10 most frequent words every 3 seconds while the books are being read and put into the data grid:

Final Thoughts

In about 50 lines of code we’ve put together a streaming MapReduce app that does both ingestion and querying. We’ve run it on a local cluster – and it will run just the same way on 3, 10, 100s or 1000s of nodes deployed anywhere in the world (as long as we have some way to connect to them).

Keep in mind that this is obviously a very simple (almost trivialized) example of streaming MapReduce. Yet with a few additional lines of code you can replace the books with, let’s say, the Twitter firehose keyed by hashtags, and the printouts with updates to your social dashboard – and you get a pretty useful system tracking the most popular Twitter hashtags in real time in a few hundred lines of code – while automatically scaling to 100s of terabytes of data being processed on 1000s of nodes.

Not bad at all!

In-Memory Processing As A Business Use Case

In-memory processing is becoming a business necessity in much the same way that collecting and processing ever-increasing data sets (a.k.a. Big Data) has become a business “must have” rather than just a technology over the last five years. Both of these trends are intertwined in interesting ways. Let me explain…

1. Storing Necessitates Processing
The initial foray into BigData for many companies was all about storing the data and then some rudimentary processing that most of the time resulted in some trivialized analytics run on log files, purchase history, and similar type of data (that’s what 90% of analytics are still doing today if you ask people on the “inside”). As the amount of data stored kept growing (as well as associated direct and indirect cost) the IT departments were more and more pressured to get deeper and more actionable, i.e. operational, insights and provide more meaningful results from collected data. That meant more, a lot more, processing.

2. World Is Rapidly Becoming Real Time
As we are pressured for more and more processing we are facing yet another evolution. It would be an understatement to say that one of the most radical evolutions in IT these days is a torrent-like move into “now” data processing, i.e. processing live data streams and existing working sets in real time. Ask yourself this question: “Do you know any business that would make its IT systems NOT real time if the price of making them batch/ETL or real time were the same?” The answer is no.

In the age of real time ad serving, hyper-local advertising, instant sentiment analysis, 24/7 financial trading, global arbitrage, operational BI/analytics, instant mobile applications rapidly growing in processing complexity, geo-based merchant platform, and many, many other systems in place today – what business would specifically lock itself out of these advances, new business opportunities or competitive advantages?

Instant, real time data processing is the reality today and a massive force to reckon with in the coming years. Businesses that lag behind and rely on data processing where customers or systems have to “wait” to get their answers – will simply be swept away.

3. In-Memory Processing Is The Only Answer
This sounds rather bullish but it is a technological reality. There is no other technology in the foreseeable future (that we know of) that would provide enough processing performance to deal with the ever-increasing amount of data we need to process. Consider this fact: RAM access is up to 10,000,000 times (!) faster than access to disk, the next storage layer where we can store the data (and where we’ve been storing data for the last 25 years, and before that we were using tapes…). There’s simply nothing else commercially available today or in the near future that approaches that performance differential.

We simply have to adjust to what Gartner calls a new tenet of the data processing: “RAM is a new disk, and disk is a new tape”.

4. RAM Pricing Dropping 30% Every 18 Months
The economics behind in-memory processing are finally hitting the wide adoption curve:

  • 1GB of RAM costs less than $1 today.
  • A rack with 10 blades, 50 processing cores and a total RAM capacity of 1TB can be purchased today for less than $50,000 – a price point that was almost 10 times higher 10 years ago.

For a $500,000 investment a company can have 10TB of RAM (along with associated CPU power and slower disk storage) for in-memory processing of a working set of data. 10TB is considered to be a typical working set size for most of today’s large big data installations – and having it in memory enables real time, sub-second processing of this data set.

5. Software and Hardware Availability
Finally, the hardware and software are catching up to enable massively parallel in-memory processing of large data sets. Consider these facts:

  • Typical commodity hardware today has 8-24 physical cores (DELL R410, R610 lines of rack servers costing in the $2,500-4,000 range with 64GB of RAM). Having physical parallelization capability is essential for effective utilization of local RAM.
  • 64-bit CPUs (found in almost any new consumer laptop today) can address up to 16 Exabytes of data – enough to address all data in the world today (with just 1 CPU).
  • Most operating systems (like modern Linux and Windows) provide robust support for advanced parallelization as well as support for the necessary application development eco-systems (Java and .NET).
  • A new type of software middleware developed specifically to deal with in-memory processing has been introduced and has matured over the last couple of years. GridGain, SAP HANA, Oracle Coherence – all provide sophisticated capabilities for in-memory processing.
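The 16 Exabytes figure in the list above is simply the size of a 64-bit address space, and it is easy to check. A small sketch (the object name AddressSpace is just for illustration; the unit used is the binary exabyte, 2^60 bytes):

```scala
object AddressSpace {
  // Number of distinct byte addresses a 64-bit pointer can express.
  val bytes: BigInt = BigInt(2).pow(64)

  // One binary exabyte (exbibyte) is 2^60 bytes.
  val exbibyte: BigInt = BigInt(2).pow(60)

  // 2^64 / 2^60 = 2^4 = 16
  val inExbibytes: BigInt = bytes / exbibyte
}
```

This is the theoretical limit of 64-bit addressing, not the amount of RAM any single machine can hold today.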

GridGain is a Platinum Sponsor and Speaker at CloudDevelop 2012, Aug 3rd, Columbus, OH.

GridGain Systems is a platinum sponsor of and a speaker at CloudDevelop 2012, hosted in Columbus, OH on August 3rd, 2012. Jon Webster will present “In-Memory BigData: Everything You Need To Know”, an in-depth discussion of the why, what and when of in-memory processing of large data sets in real time.

Hope to see you there, and do stop by our booth!