Schedule Grid Workload with Cron using GridGain

One of the “unsung” APIs in GridGain 3.0 is cron-based scheduling for tasks, closures, runnables, callables, and pretty much everything else that you may want to run on the grid. It wasn’t too hard to do before in GridGain 2, but in GridGain 3.0 we’ve added comprehensive API support for distributed cron-based scheduling.

Here’s an example of scheduling a closure that broadcasts a message to all nodes every minute, starting after an initial delay of two seconds and running five times in total:

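// Both variables are captured by the anonymous closure below, so they must be final.
final Grid g = G.grid();

final UUID lid = g.localNode().id();

g.scheduleLocal(
    new CAX() {
        @Override public void applyx() throws GridException {
            g.run(BROADCAST, F.println("Hello from: " + lid));
        }
    }, "{2, 5} * * * * *" // 2 seconds delay with 5 executions only.
);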

First of all, notice how nicely you can reuse the variable lid: it is defined in the local context but is used inside the closure that gets executed remotely, thanks to our Zero Deployment technology.
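To see why a captured local like lid can travel along with a closure at all, here is a minimal plain-Java sketch. It is only an illustration: it uses standard Java serialization rather than GridGain's own marshalling and class loading, and the names CapturedVariableSketch, Greeting, greetingFor and roundTrip are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;

// Plain-Java illustration only; GridGain's marshalling and deployment are not shown.
class CapturedVariableSketch {
    // Hypothetical stand-in for a grid closure: something callable and serializable.
    interface Greeting extends Serializable {
        String call();
    }

    static Greeting greetingFor(final UUID lid) {
        // The anonymous class keeps its own copy of 'lid' in a hidden field.
        return new Greeting() {
            @Override public String call() {
                return "Hello from: " + lid;
            }
        };
    }

    // Writes the object to bytes and reads it back, as if it had crossed the wire.
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(obj);
            out.close();

            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        UUID lid = UUID.randomUUID();

        // The copy still knows the ID captured by the original closure.
        System.out.println(roundTrip(greetingFor(lid)).call());
    }
}
```

The captured value is simply part of the closure object's state, so whatever ships the object to another node ships the value with it. Getting the closure's class onto the remote node is the part that Zero Deployment takes care of.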

Secondly, the scheduleLocal() method returns a future object of type GridScheduleFuture that gives you a rich API for managing cron-based execution.
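Going back to the pattern string in the example: judging by its comment, "{2, 5} * * * * *" is a regular five-field cron expression preceded by a {delay, count} prefix. The sketch below splits such a string into its parts. It is a hypothetical helper, not GridGain code, and treating a missing prefix as "no delay, no limit" is an assumption made only for this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of GridGain: splits a pattern such as
// "{2, 5} * * * * *" into its delay, execution count and cron fields.
class SchedulePatternSketch {
    // Optional "{delay, count}" prefix followed by the cron expression.
    private static final Pattern PREFIX =
        Pattern.compile("^\\s*\\{\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\}\\s*(.+)$");

    final int delaySeconds;  // Initial delay before the first run.
    final int maxExecutions; // Number of runs; 0 means "no limit" in this sketch.
    final String cron;       // Remaining cron fields.

    private SchedulePatternSketch(int delaySeconds, int maxExecutions, String cron) {
        this.delaySeconds = delaySeconds;
        this.maxExecutions = maxExecutions;
        this.cron = cron;
    }

    static SchedulePatternSketch parse(String pattern) {
        Matcher m = PREFIX.matcher(pattern);

        if (m.matches())
            return new SchedulePatternSketch(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                m.group(3).trim());

        // No prefix (assumption): treat the whole string as a plain cron expression.
        return new SchedulePatternSketch(0, 0, pattern.trim());
    }

    public static void main(String[] args) {
        SchedulePatternSketch p = parse("{2, 5} * * * * *");

        // Prints "2s delay, 5 runs, cron: * * * * *".
        System.out.println(p.delaySeconds + "s delay, " + p.maxExecutions
            + " runs, cron: " + p.cron);
    }
}
```

The five asterisks that remain are the usual minute, hour, day-of-month, month and day-of-week fields, which is why the closure fires every minute.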

All in all, it is a very nice and powerful addition to GridGain’s arsenal of distributed functionality.

Scala’s PingPong with GridGain using Java and Scala

Anyone who’s looked at Scala’s Actor framework is familiar with the PingPong example. Essentially, two actors exchange request/response messages (PING and PONG), demonstrating the basics of inter-actor communication. In GridGain we have similar functionality, but it is, of course, distributed.

The distributed version of the PingPong example configures two grid nodes and lets one node send a PING message to the other and receive the corresponding PONG response from the remote node. Essentially, we’ll have two nodes engaged in a simple request/response protocol.

Here’s how the full source code looks in Java using GridGain 3.0:

import org.gridgain.grid.*;
import org.gridgain.grid.typedef.*;
import java.util.*;
import java.util.concurrent.*;

public class GridMessagingPingPongExample {
    public static void main(String[] args) throws GridException {
        G.in(args.length == 0 ? null : args[0], new CIX1<Grid>() {
            @Override public void applyx(Grid g) throws GridException {
                if (g.nodes().size() < 2) {
                    System.err.println("I need a partner to play a ping pong!");

                    return;
                }

                // Pick random remote node as a partner.
                GridRichNode nodeA = g.localNode();
                GridRichNode nodeB = F.rand(g.remoteNodes());

                // Set up remote player.
                nodeB.remoteListenAsync(nodeA, new GridListenActor<String>() {
                    @Override public void receive(UUID nodeId, String msg) throws GridException {
                        System.out.println(msg);

                        if ("PING".equals(msg)) {
                            respond("PONG");
                        }
                        else if ("STOP".equals(msg)) {
                            stop();
                        }
                    }
                }).get();

                int MAX_PLAYS = 10;

                final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS);

                // Set up local player.
                nodeB.listen(new GridListenActor<String>() {
                    @Override protected void receive(UUID nodeId, String msg) throws GridException {
                        System.out.println(msg);

                        if (cnt.getCount() == 1) {
                            stop("STOP");
                        }
                        else if ("PONG".equals(msg)) {
                            respond("PING");
                        }

                        cnt.countDown();
                    }
                });

                // Serve!
                nodeB.send("PING");

                // Wait until the game is over.
                try {
                    cnt.await();
                }
                catch (InterruptedException e) {
                    System.err.println("Hm... let us finish the game!\n" + e);
                }
            }
        });
    }
}

If you run this program, it joins the grid on start, picks a random remote node from the existing grid topology, configures both message listeners (local and remote), and exchanges 10 PING/PONG messages.
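If you don't have a grid at hand, the message flow itself can be traced in a single JVM. The following is a rough sketch that replaces the grid with two in-memory queues and a thread; it uses no GridGain API at all, and the class name LocalPingPongSketch and its play() method are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Single-JVM illustration of the message flow; the queues stand in for the grid.
class LocalPingPongSketch {
    // Plays the game and returns every message in the order it was received.
    static List<String> play(int maxPlays) {
        if (maxPlays < 1)
            throw new IllegalArgumentException("Need at least one play.");

        final BlockingQueue<String> toRemote = new LinkedBlockingQueue<String>();
        final BlockingQueue<String> toLocal = new LinkedBlockingQueue<String>();
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());

        // "Remote" player: answers PING with PONG and exits on STOP.
        Thread remote = new Thread(new Runnable() {
            @Override public void run() {
                try {
                    while (true) {
                        String msg = toRemote.take();

                        log.add(msg);

                        if ("STOP".equals(msg))
                            return;

                        if ("PING".equals(msg))
                            toLocal.put("PONG");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        remote.start();

        try {
            // Serve!
            toRemote.put("PING");

            // "Local" player: keeps the rally going, then ends the game.
            for (int left = maxPlays; left > 0; left--) {
                log.add(toLocal.take());

                toRemote.put(left == 1 ? "STOP" : "PING");
            }

            remote.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return log;
    }

    public static void main(String[] args) {
        System.out.println(play(10));
    }
}
```

With 10 plays the log holds 10 PINGs and 10 PONGs in strict alternation, followed by a single STOP. That is the same sequence of messages the two listeners in the GridGain example handle, except that there the two halves of the conversation run on different nodes.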

Sounds innocent, doesn’t it? The most striking part for anyone seeing this code for the first time is that there is only one program, and it configures both the local and the remote node. In fact, take a look at the line marked by the “Set up remote player” comment. You are essentially supplying an actor (i.e. a closure) as a listener that will be executed only on the remote node (nodeB) when it receives a message from this local node (nodeA).

This technique nicely demonstrates our drive for Cloud-as-a-VM virtualization technology, which essentially removes the boundaries of a single VM and allows you to develop, test, and deploy Java code that runs anywhere on the grid or cloud in pretty much the same way as it runs locally.

And if you like Scala, here’s the same application written using our ScalaR, a Scala-based DSL for cloud computing:

import org.gridgain.scalar.scalar
import scalar._
import java.util.UUID
import java.util.concurrent.CountDownLatch
import org.gridgain.grid._

object ScalarPingPongExample {
    def main(args: Array[String]) {
        scalar {
            if (grid.nodes().size < 2) {
                error("I need a partner to play a ping pong!")

                return
            }

            // Pick first remote node as a partner.
            val loc = grid.localNode
            val rmt = grid.remoteNodes().iterator.next

            // Set up remote player.
            rmt.remoteListenAsync(loc, new GridListenActor[String]() {
                def receive(nodeId: UUID, msg: String) {
                    println(msg)

                    msg match {
                        case "PING" => respond("PONG")
                        case "STOP" => stop
                        case _ => // Ignore anything else, as the Java version does.
                    }
                }
            }).get

            val MAX_PLAYS = 10

            val cnt = new CountDownLatch(MAX_PLAYS)

            // Set up local player.
            rmt listen new GridListenActor[String]() {
                def receive(nodeId: UUID, msg: String) {
                    println(msg)

                    if (cnt.getCount() == 1)
                        stop("STOP")
                    else
                        respond("PING")

                    cnt.countDown()
                }
            }

            // Serve!
            rmt !< "PING"

            // Wait until the match is over.
            cnt await
        }
    }
}

As you can see, the difference between the Java and Scala code is insignificant, but the Scala version obviously looks a bit more elegant.

Enjoy!