Distributed Actors in GridGain

One of the features we’ve added to GridGain 3.0 last year was Distributed Actors. Distributed actors have similar functionality to the actors found in Scala standard library and 3-rd party projects like Akka:

they allow you to replace low-level concurrency and synchronization routines with an asynchronous message passing between actors where logical state is only stored within actors themselves and never globally.

Now, while these ideas have been used for quite some time in Java world (a-al executor service, etc.) – and their implementations in Scala or Akka are essentially a set of queues with a thread pool behind – Scala DSL capabilities made it really nice and cute to use it. Furthermore, actors’ popularization sprung out the interest in more esoteric concurrency models like STM and Erlang’s failover (which is a great benefit in of its own).

One of the deficiencies of current Actor implementations, however, is the fact that they are predominately local (in-VM) constructs. Attempts for remote actors by current implementations can only described as trivialized. It is important to note that, in general, problems of local synchronization are very different from the issues of distributed synchronization – and therefore specific APIs that work for local actors simply won’t work well for distributed ones – and vise versa.

When we started the design for our APIs around distributed actors we’ve attempted to provide natively distributed actors.

Here’s the number of features that are specific to GridGain’s Distributed Actors (I’ll use Java notation although functionality is equality available in Java and Scala):

  • Each GridGain node can have only one mailbox (i.e. essentially have a node-local storage).
  • Exchanging messages between actors means exchanging messaged between nodes.
  • You can register actor on any node from any other nodes – thanks to GridGain’s Zero Deployment.
  • Classes for messages are automatically deployed on-demand on receiving actors if necessary classes are not found.
  • Actor’s exchange of messages utilizes the same user-pluggable communication, discovery and marshaling subsystems. All of them can be 100% customized without affecting the actors’ functionality.
  • All actors (i.e. nodes) are auto-discovered and topology is automatically maintained when new actors join or existing ones leave.
  • The whole API is based on only few methods in GridProjection interface (listen(...) and remoteListenAsync(...) and interface GridListenActor. This nicely ties it into GridGain FP framework. See code example below…
  • GridListenActor interface extends the predicate that makes it native FP citizen in GridGain.

Here’s the full source code of a simple Java program that ubiquitous ping-pong play between two nodes on the cloud using GridGain Distributed Actors (this and the same program in Scala can be found in GridGain’s distribution examples). Note also that this program configures actors on two nodes from the same source code

package org.gridgain.examples.messaging;

import org.gridgain.grid.*;
import org.gridgain.grid.typedef.*;

import java.util.*;
import java.util.concurrent.*;

public class GridMessagingPingPongExample {
    public static void main(String[] args) throws GridException {
        // Game is played over the default grid.
        G.in(args.length == 0 ? null : args[0], new CIX1() {
            @Override public void applyx(Grid g) throws GridException {
                if (g.nodes().size() < 2) {
                    System.err.println("I need a partner to play a ping pong!");

                    return;
                }

                GridRichNode nodeA = g.localNode();                
                GridRichNode nodeB = F.rand(g.remoteNodes()); // Pick random remote node as a partner.

                // Set up remote player.
                nodeB.remoteListenAsync(nodeA, new GridListenActor() {
                    @Override public void receive(UUID nodeId, String msg) throws GridException {
                        System.out.println(msg);

                        if ("PING".equals(msg)) 
                            respond("PONG");
                        else if ("STOP".equals(msg)) 
                            stop();
                    }
                }).get();

                int MAX_PLAYS = 10;

                final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS);

                // Set up local player.
                nodeB.listen(new GridListenActor() {
                    @Override protected void receive(UUID nodeId, String msg) throws GridException {
                        System.out.println(msg);

                        if (cnt.getCount() == 1) 
                            stop("STOP");
                        else if ("PONG".equals(msg)) 
                            respond("PING");

                        cnt.countDown();
                    }
                });

                // Serve!
                nodeB.send("PING");

                // Wait til the game is over.
                try {
                    cnt.await();
                }
                catch (InterruptedException e) {
                    System.err.println("Hm... let us finish the game!\n" + e);
                }
            }
        });
    }
}

Enjoy!

[tweetmeme source=”gridgain” only_single=false]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: