Tango Clusters

Clusters are groups of computers. For our purposes, the group is a set of computers on a LAN, each of which is participating in distributed yet related behavior. Many types of application can benefit from adopting a distributed nature; in particular, an application designed to be distributed will tend to scale considerably better than a monolithic design.

For example, a cluster is sometimes used for performing intensive calculations: by farming out large chunks of the computation to multiple ‘slaves’ on the network, a compute-intensive application can generally increase throughput, or decrease computation time, quite effectively. More ‘slaves’ can be added to the cluster as requirements demand. Where a machine drops out of the cluster, others should pick up the slack.

Another example would be the distribution of commonly used read-mostly data, across the cluster, retained in such a manner as to provide fast access for a number of clients. The intent of such a design is typically to unload a central point of contention, such as a database, in order to increase overall throughput.

A further example would be where an application simply cannot keep up with peak demands made upon it, and needs some mechanism to decouple a request from the actual processing of that request. In such a scenario, the application could place requests into a persistent storage area and process each at a rate or time more suited to it. This could be handled via a database (or similar), yet something lightweight and decentralized is often more appropriate, particularly where throughput is a concern or where the database itself has become a bottleneck. Workflow systems tend to operate in a decoupled manner.

These are all scenarios where clustering can become useful. Tango supports a clustering model, with various attractive and flexible attributes, in order to assist in the construction of such applications without you having to expend a lot of effort.

To get a taste for usage, here's an illustration of executing an expression task on the cluster:

import tango.io.Stdout;
import tango.net.cluster.tina.ClusterTask;

real add (real x, real y) {return x + y;}

void main (char[][] args)
{
        // wrap the function as a cluster task and invoke it
        scope call = new NetCall!(add);
        Stdout.formatln ("cluster expression of 3.0 + 4.0 = {}", call(3, 4));
}

Applicability

Tango exposes the cluster as a toolkit supporting four types of useful behavior: message queuing, message caching, message execution, and message broadcasting. These are referred to as queue, cache, task, and bulletin respectively.

The idea is that you start one or more instances of cache, queue, and/or task servers, on one or more networked computers, and utilize them via one or more cluster clients. This client portion is linked into your application, enabling it to access cluster facilities – it manages cluster communication aspects on your behalf, and is usually hardened to compensate for various failure situations. In general, multiple clients would be conversing with potentially multiple instances of cache, queue, and/or task servers.

One key factor in doing this successfully can be robustness in the face of transient traffic spikes, varying latency, and ongoing changes in cluster membership as participants come and go (potential machine failures). Other factors might include efficiency, both in the communications medium chosen and in the translation of application entities as they traverse the cluster.

Model and QOS

Tango clustering is represented by a model, composed of D interfaces and utility classes. The expectation is that alternate cluster implementations would expose an identical model, and provide you with a trivial transition between those implementations. Why alternates? Different applications have different needs – one may find that throughput is of utmost importance, whilst another may find data replication is more important, and yet another may require some exotic behavior.

Thus, the model is intended to span a variety of needs and a variety of implementations, each of which enable a different quality of service or QOS. It is entirely feasible for a clustered application to be using multiple QOS implementations, likely for different purposes, at the same time.

In order to join the cluster, an application needs an instance of a QOS client. From that point forward, all activity can be managed via the utility classes exposed by the model. In other words, the QOS exposes a single, fairly simple and mostly opaque component, and the model utility classes provide additional behavior atop that.

The model itself is influenced by a number of existing systems including hints of publish/subscribe, Linda, JavaSpaces, and various others you may be familiar with.

Meet Tina

An embedded QOS implementation is provided as a set of services, with the intent of enabling rapid development of a clustered application. The primary components consist of three servers representing each of the cluster services provided (cache, queue, task), and an associated cluster-client implementation. The former are distinct applications in their own right, whereas the latter is usually linked into your application(s) to make the cluster visible to them. Bulletins are handled entirely by the client instances, in a peer to peer fashion - there is no bulletin server.

Tango exposes these servers as D source modules, with complete (and short) example code to construct relevant servers. The client software is exposed as a single class for your application to instantiate. Together, these allow you to quickly and easily construct application specific clusters.

This embedded QOS is geared towards high efficiency, and makes a point of avoiding heap activity in all but the one case where it becomes a necessity (cache hosting). It leverages both TCP/IP and multicast for transmission, and disk files for queue storage. The embedded QOS is affectionately known as Tina; to use it, an application imports the cluster module:
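
import tango.net.cluster.tina.Cluster;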

Key Notions

As with any toolkit, there are a few key ideas to become familiar with in order to get the best out of it. There are only a few, and we'll address them in this section, beginning with the channel:

Channel

Ever used a publish/subscribe system? If so, the notion of a channel will likely be familiar. It is a named entity through which messages are transported and delivered. Clustered applications subscribe (or listen) to a channel, and publish (or write) on a channel. Without a channel, there is no way in which to communicate with the cluster. You obtain one by asking the QOS to create one on your behalf, and then utilize it from that point forward. In reality, various utility classes exposed by the model will perform channel creation in the background for you. However, you are free to utilize a channel directly if the need arises.

With a channel in hand the application may send and request cluster messages, listen for asynchronous activity, execute tasks on the network, and so on. The various utility classes (exposed by the model) provide a structured framework around these facilities, and add further value where appropriate.

When a message is sent on a channel of a given name, only those listening on the same channel are candidates to receive that message. The channel name must be identical in both places for communication to occur: this provides the basis for segregating different types of messages for different purposes. In practice, we’ve found it highly convenient to use dot-notation for channel names – employee.contact.information for example – and to use channels to differentiate between different data types, or aggregates thereof. In fact, channels are a good way of representing a class in the D programming language – one channel for each distributed class – which nicely segregates differing content from one another and notably simplifies the transmission of aggregate data across the cluster itself. This is a good point at which to segue into message:
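
For illustration, direct channel creation might look like the following minimal sketch (assuming the cluster client exposes the model's createChannel method):

// join the cluster and ask the QOS for a named channel
auto cluster = (new Cluster).join;
auto channel = cluster.createChannel ("employee.contact.information");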

Message

Messages are the basis of all cluster content. When you send something to a queue it is in the form of a message. When retrieving content from a cache, you will receive a message. When executing a task within the cluster, it is represented by a message. When asynchronous multicast bulletins are distributed across the cluster, they are message instances. Everything in the cluster is a message.

When using the D programming language, each cluster message is implemented as a class. Further, all messages either derive from the utility class NetworkMessage or implement the IMessage interface. Message instances are transmitted around the cluster, being converted into and out of a network representation along the way.

To convert a message between representations, a handy template can be applied to generate a snippet of code on your behalf. Alternatively, a simple pair of message methods may be implemented in order to explicitly control the behavior. When using the template, all native data attributes of a message are sent and restored. Messages composed of sub-aggregates may require intervention in terms of how they are converted, via these reserved methods.
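
As a sketch of the explicit approach (the ContactInfo class is illustrative; the read and write signatures follow the user-contributed example at the foot of this page):

class ContactInfo : NetworkMessage
{
        char[] name;
        char[] phone;

        // restore native attributes from the network representation
        void read (IReader input)
        {
                super.read (input);
                input (name) (phone);
        }

        // emit native attributes to the network representation
        void write (IWriter output)
        {
                super.write (output);
                output (name) (phone);
        }
}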

All message variants should be registered by default. That is, in order for a cluster client to correctly reconstruct an incoming message, each type of message should be handed to the NetworkRegistry at application startup. There are specific cases where this is unnecessary - tasks don't require client registration, and if you explicitly provide a host for an incoming message then registration would be redundant. However, it's considered good practice to register each message-type used by your application. One convenient place to do this is in the static ctor of a message. For example:

import  tango.net.cluster.NetworkMessage,
        tango.net.cluster.NetworkRegistry;

class MyMessage : NetworkMessage
{
        static this()
        {
                NetworkRegistry.shared.enroll (new MyMessage);
        }
}

Other than registration requirements, each message is a standard D class and operates in the normal fashion. It just has the additional ability to appear, and optionally behave, upon other machines in the cluster.

Queue

A queue is a stash of messages. Each queue is identified by its channel name, thus each channel being queued will have a distinct queue instance. Messages are placed into the queue(s) via channel activity and retrieved in a similar manner. These latter two operations represent synchronous activity. Alternatively, message consumers (channel subscribers) can listen asynchronously for message activity, and have messages fed to them when queue activity occurs. Both approaches have their utility and the choice is yours to make.

Messages within a queue are not individually addressable; when you ask for one, you do not specify which particular values are pertinent to the request. Queues are segregated by channel name alone, and not by any other meta-data.

Queued messages may be consumed without further action, or they may be replied to. The reply will typically be sent over a distinct channel (a modification of the original channel name), and will usually be queued before the message originator(s) pick it up. The originator(s) should be listening for replies, in a manner consistent with any other queued message subscription.

Queues are persistent; they survive power failures.

Cache

Cache hosts store messages in much the same way as an associative array, or hash table, does. Messages are isolated by channel name, and are addressable by a key value. In Tango, this key is an array of the char type (char[]).

Messages placed into the clustered cache will replace an existing entry with the same key, within a given channel. In addition, there are conditional time-based values to control optional newness tests. The latter can be important and helpful in a distributed environment, where there may be contention over cache updates. Cache instances will automatically purge old entries in order to make room available for new content (as required).
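
A sketch of a conditional update follows; the time-based third argument is an assumption for illustration, so consult NetworkCache for the actual overloads:

// hypothetical newness test: replace the entry only when ours is newer
auto cache = new NetworkCache (cluster, "my.cache.channel");
cache.put ("key", message, message.time);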

Through usage of the model utilities, the clustered cache may be partially replicated locally. This mechanism emulates a classic level1/level2 combination, and can be useful to many applications. The model utilities also facilitate coherence across a set of cache instances.

Cache instances are intended to be temporal only, thus the Tina implementation does not persist them.

Task

A task is an executable message, and executes outside of the invoking process. In general, it will appear on one of the available task servers (in the cluster) and execute there before returning to the caller with results. This is a synchronous execution model. For a decoupled execution model, the task can be sent to a queue and hosted there until a subscriber retrieves and executes it. Replies from the decoupled model would generally be sent back via another queue, in the same manner as generic queue messages are replied to (see Queue above).

Tasks are distinct from other message types in that they are active messages rather than passive ones. Instead of lying dormant in a queue or cache, these messages are intended to do something within the cluster. Thus, task messages should implement the execute() method provided for this purpose. There is a subset of tasks known as expression tasks, each composed of a single function. These can be converted to true task instances via a NetCall template, which accepts the function as an argument and returns a task instance. The example at the top of this page illustrates its usage.

Task messages are also distinct in that they should be registered with the cluster. This means that the task message is an integral part of each task server, such that it can be executed there. In practice, there are two principal options available: statically link the task messages into each task server, or dynamically distribute and link them into each task server. Please note that dynamic linking is not currently available to D on all platforms, so the default Tina implementation takes the former route for now – registering with a task server is a matter of an import and a method call.

Bulletin

A bulletin is a notification message sent to all cluster participants, leveraging the most efficient underlying mechanism available. These messages are limited in size (generally less than 1KB), and are intended to be simple and lightweight. The Tina QOS uses bulletins for cluster discovery, queue-activity notification, and cache coherence, with multicast as the distribution mechanism. When a bulletin is sent, all listeners on the same channel will receive it.

One application-level usage for bulletins might be to notify all listeners that they should shut themselves down. This approach could be used to ensure all server instances are running the same version - the new version would be written to a central location, and a batch job could restart each server after it had stopped (using the new central copy).
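
As a sketch of such a shutdown bulletin, reusing the NetworkAlert layer from the Bulletin Client example below (the channel name and shutdown hook are illustrative):

import tango.net.cluster.NetworkAlert;
import tango.net.cluster.tina.Cluster;

void main ()
{
        auto cluster = (new Cluster).join;
        auto alert = new NetworkAlert (cluster, "app.shutdown");

        // each participant listens for the shutdown bulletin
        alert.createConsumer ((IEvent event)
        {
                event.log.info ("shutdown requested on " ~ event.channel.name);
                // an orderly shutdown would be initiated here
        });

        // ... normal application processing continues here ...
}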

Push versus Pull

Tango clusters support two types of access to content. Requesting content from a cache or queue is considered to be a synchronous request, and is known as a pull model. On the other hand, creating a listener for queue activity or bulletins will enable an asynchronous notification. This latter model is known as push, since the content is pushed to the recipient without that particular message being explicitly requested.

Like synchronous requests, asynchronous push notifications are delivered on a specific channel: you get what you listen for and nothing else. The principal distinction between push and pull is that the former may occur on a separate thread of execution, so one needs to be aware of potential issues related to multi-threading in general (such as synchronized access to shared resources).

When a notification occurs, the arrival context and incoming message are made available via a parameter passed to the listener. In most notification cases, the arriving message is a single entity representing the notification itself. However, a queue notification will result in one or more queued messages being delivered.

Client Usage

In this section we’ll take a look at how to use the cluster features through code examples. The first step is to import an appropriate cluster. For these examples we’ll be using the Tina QOS provided, but for other implementations one would import the relevant package instead. Note that we’ll focus on the client side here, and the server side in a following section.

Cache Client

In this example we show how to use the cluster as a distributed cache. There are a number of operations available, though the general idea is illustrated here. Note that we pass the command-line arguments to the join() method: this configures the client with the full set of valid cache instances available. Unlike the other facilities, cache instances are not self-discovering.

import tango.io.Stdout;
import tango.net.cluster.NetworkCache;
import tango.net.cluster.tina.Cluster;

void main (char[][] args)
{
        if (args.length > 1)
           {
           // hook into the cluster
           auto cluster = (new Cluster).join (args[1..$]);

           // hook into the Cache layer
           auto cache = new NetworkCache (cluster, "my.cache.channel");

           // add a cache entry to the cluster
           cache.put ("key", cache.EmptyMessage);

           // retrieve it from the cluster
           auto msg = cache.get ("key");
           }
        else
           Stdout.formatln ("usage: cache cachehost:port ...");
}

Bulletin Client

How to send and receive notifications across the cluster. These are sent to every listener on the specified broadcast channel. Note that we create a callback function and pass it to the cluster as our bulletin consumer.

private import  tango.core.Thread;

private import  tango.net.cluster.NetworkAlert;
private import  tango.net.cluster.tina.Cluster;

void main()
{
        // hook into the cluster
        auto cluster = (new Cluster).join;

        // hook into the Alert layer
        auto alert = new NetworkAlert (cluster, "my.kind.of.alert");

        // declare a listener
        void listen (IEvent event)
        {
                event.log.info ("Received alert on channel " ~ event.channel.name);
        }

        // listen for the broadcast (on this channel)
        alert.createConsumer (&listen);

        // and send everyone an empty alert (on this channel)
        alert.broadcast;

        // wait for it to arrive ...
        Thread.sleep(1);
}

Queue Pull Client

How to set up and use a queue in synchronous mode. We simply place something into our queue and retrieve it:

private import  tango.net.cluster.NetworkQueue;
private import  tango.net.cluster.tina.Cluster;

void main ()
{
        // join the cluster 
        auto cluster = (new Cluster).join;

        // access a queue of the specified name
        auto queue = new NetworkQueue (cluster, "my.queue.channel");

        // stuff something into the queue
        queue.put (queue.EmptyMessage);

        // retrieve it synchronously
        auto msg = queue.get;
}

Queue Push Client

Illustrates how to set up and use a queue in asynchronous mode. We provide a listener delegate to the cluster, which is invoked when subscribed content arrives in a queue (from anywhere on the cluster).

private import  tango.core.Thread;

private import  tango.net.cluster.NetworkQueue;
private import  tango.net.cluster.tina.Cluster;

void main ()
{
        // join the cluster 
        auto cluster = (new Cluster).join;

        // access a queue of the specified name
        auto queue = new NetworkQueue (cluster, "my.queue.channel");

        // declare a listener
        void listen (IEvent event)
        {
                while (event.get)
                       event.log.info ("received msg on channel " ~ event.channel.name);
        }

        // listen for messages placed in my queue, via a delegate
        queue.createConsumer (&listen);

        // stuff something into the queue
        queue.log.info ("sending three messages to the queue");
        queue.put (queue.EmptyMessage);
        queue.put (queue.EmptyMessage);
        queue.put (queue.EmptyMessage);

        // wait for asynchronous msgs to arrive ...
        Thread.sleep (1);
}

Queue Reply Client

In this variation we queue a message in the cluster, receive it via a listener, reply to that message on a different channel and, finally, receive the reply. There are two listeners in this example:

private import  tango.core.Thread;

private import  tango.net.cluster.tina.Cluster;
private import  tango.net.cluster.NetworkQueue;

void main()
{
        // open the cluster and a queue channel. Note that the queue has
        // been configured with a reply listener ...
        auto cluster = (new Cluster).join;
        auto queue = new NetworkQueue (cluster, "message.channel", 
                                      (IEvent event){event.log.info ("Received reply");}
                                      );

        void recipient (IEvent event)
        {
                auto msg = event.get;
        
                event.log.info ("Replying to message on channel "~msg.reply);
                event.reply (event.replyChannel(msg), queue.EmptyMessage);
        }

        // setup a listener to receive and reply
        queue.createConsumer (&recipient);

        // toss a message out to the cluster
        queue.put (queue.EmptyMessage);

        // wait for completion ...
        Thread.sleep (1);
}

Task Client

Cluster task execution generally comprises three steps. First we create the task itself, generally in a distinct module. In this case we're demonstrating the use of an expression task:

module Add;

real add (real x, real y)
{
        return x + y;
}

Next, we import the task into a client application and use it, wrapping our expression task with the NetCall template to convert it into a cluster message:

import Add;

import tango.io.Stdout;
import tango.net.cluster.tina.ClusterTask;

void main (char[][] args)
{
        scope call = new NetCall!(add);
        Stdout.formatln ("cluster expression of 3.0 + 4.0 = {}", call(3, 4));
}

Third, we add the same task into the task server(s) such that it will be available for execution on the cluster. This is how to create and configure an operational task server:

import Add;

import tango.net.InternetAddress;
import tango.net.cluster.tina.CmdParser,
       tango.net.cluster.tina.ClusterTask,
       tango.net.cluster.tina.TaskServer;

void main (char[][] args)
{
        // handle command-line
        auto arg = new CmdParser ("task.server");
        if (args.length > 1)
            arg.parse (args[1..$]);

        // create ourselves a task server
        auto server = new TaskServer (new InternetAddress(arg.port), arg.log);

        // and configure it with our task(s)
        server.enroll (new NetCall!(add));

        // start the server
        server.start;
}

Tina

Tina is the default QOS implementation, providing three distinct servers handling queue, cache, and task requests respectively. Source code is provided in the form of a toolkit, and one is expected to configure each server to specific needs. However, there are also example programs supplied, through which a working server can be constructed via a simple compilation. These examples are trivial front-ends to the server functionality, so there should be little difficulty getting going. For example, here is qserver.d in full:

import tango.io.Console;
import tango.net.InternetAddress;
import tango.net.cluster.tina.CmdParser,
       tango.net.cluster.tina.QueueServer;

void main (char[][] args)
{
        auto arg = new CmdParser ("queue.server");
        if (args.length > 1)
            arg.parse (args[1..$]);

        if (arg.help)
            Cout ("usage: qserver -port=number -log[=trace, info, warn, error, fatal, none]").newline;
        else
           (new QueueServer(new InternetAddress(arg.port), arg.log)).start;
}

Each of these servers has a set of command-line options for configuring the amount of log data emitted and the server port number. Where an option is unspecified, an appropriate default applies. All cluster examples reside in the tango/example/cluster folder, and the modules therein are referred to by name in the following discussion.

Queue Server

The queue is straightforward to configure: compile the example module qserver.d and start it up. Each queue is written to a (distinct) file in the directory where the server is started from. This means that two queue-server instances cannot be started from the same directory, since the queue files are not shared. To instantiate multiple queue-servers on a single machine, start them from different directories.

Cache Server

The cache is also straightforward to configure: compile the example module cserver.d and start it up. When using Tina, cache clients require a set of server:port combinations in order to identify the full set of valid cache servers. This is needed due to the nature of the distribution algorithm in use, which requires knowledge of all servers: if, for example, not all cache instances were running when a client started, that client would potentially view the cluster-wide cache differently than another. Thus, when each cache server is started, make a note of the port selected, or configure it on a specific port. This list should be provided to the cache clients when they are started.

Task Server

Task servers have a different kind of dependency. Because mainstream D currently has limited support for dynamically loading and introspecting compiled code modules, it is not feasible to dynamically distribute task-code across the cluster at this point.

In the meantime, we must import the code of each task into the server itself, such that it is available for execution when requested. Thus, each task should reside in a package or module distinct from both client and server, so that it may be imported into both. Specifically, the Tina QOS uses the full name of a task class to match a cluster request with its execution - the full name includes the module name, and thus both client and server must import identical task modules for the names to match up.

In general, it is good practice to isolate each task, or group of tasks, into distinct modules - if for no other reason than maintenance and ease of isolation.

Logging

The servers in Tina all use the Tango logging subsystem to report activity. By default the content is logged to the console only, but by adjusting the server configuration one can direct the log to various other targets, including files and so on. Each server is provided with a logger instance by the hosting application, and this is where such configuration should take place (adding an appender, etc). Please see the documentation on logging for further details.
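
As a sketch of such configuration (assuming the Log and AppendFile modules of tango.util.log; appender names have varied across Tango releases):

import tango.util.log.Log,
       tango.util.log.AppendFile;

void configure ()
{
        // send the queue-server log to a file in addition to the console
        auto log = Log.lookup ("queue.server");
        log.add (new AppendFile ("queue-server.log"));
}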

Tech Notes

These are programming concerns which may help you get the most out of the cluster toolkit.

Threads

Cluster listeners are asynchronous by nature, being processed on a separate thread from the main program. When a bulletin notification arrives (push), a delegate provided by the client is invoked with sufficient information to retrieve the incoming message(s).

It is up to the client to ensure appropriate measures are taken so that correct action ensues when a notification arrives, given that the application is inherently multi-threaded at that point. We will likely add a module to convert these asynchronous notifications into events, once the event-subsystem is put in place. In that case, all asynchronous notifications would effectively be converted into synchronous notifications instead.
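
For instance, a push listener that touches shared state might guard it explicitly (a minimal sketch; the shared array is illustrative):

char[][] received;

// invoked on a cluster thread, not the main thread
void listen (IEvent event)
{
        // guard shared state against concurrent notification threads
        synchronized
                     received ~= event.channel.name.dup;
}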

Message Slicing

IO within Tina is multi-threaded. Rather than share a single set of IO buffers, each channel instance has its own set. This sidesteps any issues regarding thread-contention & synchronization, and enables Tina to avoid heap-allocation entirely for all network activity. This significantly reduces the memory footprint of your applications, avoids a common point of thread contention, removes clustering as a potential instigator of garbage collection, and generally limits the load placed upon the host computer.

However, when a cluster message arrives at a client, array-attributes of the message are aliased rather than being copied into the heap. In short, messages arriving at a client are transient.

This may become an issue where a client intends to store the message locally for a period of time, rather than process it immediately. The design trades off a large saving in GC pressure against the potential for some message cloning as and when necessary - the act of copying an incoming message such that it is no longer considered transient. The message class has a clone() method specifically for this purpose, and it should be used accordingly.
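
For example, a listener adapted from the Queue Push example might retain messages like so (the retained array is illustrative, and clone() is assumed to be visible via the message interface):

IMessage[] retained;

void listen (IEvent event)
{
        IMessage msg;

        // drain the notification, cloning each transient message
        while ((msg = event.get) !is null)
                retained ~= msg.clone;
}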

Message Constraints

In order to be sent successfully, a message should generally be self-contained. That is, wherever a message is re-instantiated, its representation should not require the influence of any third party - it should support what's known as a default constructor.

For example, if we send a message which embeds a database connection, an equivalent connection must be made available (and assigned) wherever that message is brought back into existence. In practice, this type of behavior is only rarely needed since large subsets of message types are usually quite simplistic in nature. In addition, implementations of cache and queue would rarely need (if ever) to instantiate a message since they would tend to store the message in raw network format instead. However, task instances are executed remotely within the cluster, and this is where consideration must be applied.

To satisfy these requirements, the Tina QOS uses a form of directed message-cloning. First, task messages should be enrolled with each task server; second, the registered message should implement the read() method in order to establish the cluster instance with the environment it needs. In other words, the registered instance of each message has the option of establishing the equivalent of ctor arguments for each incoming message, potentially based upon its own ctor arguments. In practice, the need for this second step is rare, but the support is there when necessary. The vast majority of tasks require registration only, since the actual executable code itself must be present and available for execution.

Shipping and executing unregistered tasks on the cluster will result in a remote exception, returned to the caller. However we expect to add a facility to install and register tasks dynamically, subject to potential security concerns.

Registration and Hosting

Upon receipt of each incoming message, a cluster client requires a class instance to host the content. In most cases, the host is selected from the message registry where all your application message types were previously enrolled. This is not required for task messages, since the outgoing message instance is used to host the result also. For other message types though, the host is required. Instead of depending upon the registry, an application may manually supply an appropriate host as part of a cluster request. This can be convenient in some advanced uses, especially where the channel name maps directly to a specific message type (a one-to-one mapping between the channel and a message class).
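
A sketch of supplying a host manually follows; note that the host-accepting get() overload shown is an assumption for illustration, not a confirmed signature:

// hypothetical: supply the host instance rather than relying on the registry
auto queue = new NetworkQueue (cluster, "my.queue.channel");
auto msg = queue.get (new MyMessage);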

User Comments

Posted: 05/12/09 19:18:53

A simple example of how to implement a queue client would look like this:

module main;

import  tango.net.cluster.NetworkMessage,
        tango.net.cluster.NetworkRegistry,
        tango.net.cluster.NetworkQueue,
        tango.net.cluster.tina.Cluster;

import  tango.io.Stdout;

class MyMessage : NetworkMessage
{
        static char[] result;

        public static this ()
        {
                NetworkRegistry.instance.enroll (new MyMessage);
        }

        void read (IReader input)
        {
                super.read (input);
                input (result);
        }

        void write (IWriter output)
        {
                char[10] tmp = "1234567890";

                super.write (output);
                output (tmp);
        }
}

void main ()
{
        auto cluster = (new Cluster).join;
        auto queue = new NetworkQueue (cluster, "queue.server");
        auto message = new MyMessage;

        queue.put (message);

        auto msg = queue.get;
        if (msg !is null)
            Stdout.formatln ("= {}", MyMessage.result);
}