Download Reference Manual
The Developer's Library for D
About Wiki Forums Source Search Contact

Changeset 2467

Show
Ignore:
Timestamp:
07/25/07 09:16:15 (1 year ago)
Author:
kris
Message:

adjusted the queue retrieval mechanism to be lazier than before, and renamed thaw() to be get() instead

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/example/cluster/alert.d

    r2465 r2467  
    2323        // listen for the broadcast (on this channel) 
    2424        alert.createConsumer (delegate void (IEvent event) 
    25                              {alert.cluster.log.info ("Recieved alert on channel " ~ event.channel.name);} 
     25                             {event.log.info ("Recieved alert on channel " ~ event.channel.name);} 
    2626                             ); 
    2727 
    2828        // say what's going on 
    29         alert.cluster.log.info ("broadcasting alert"); 
     29        alert.log.info ("broadcasting alert"); 
    3030 
    3131        // and send everyone an empty alert (on this channel) 
  • trunk/example/cluster/invalidate.d

    r2465 r2467  
    2828 
    2929        // stuff something in the local cache 
    30         dst.getCache.put ("key", dst.EmptyMessage); 
     30        dst.cache.put ("key", dst.EmptyMessage); 
    3131 
    3232        // get it removed via a network broadcast 
    33         cluster.log.info ("invalidating 'key' across the cluster"); 
     33        src.log.info ("invalidating 'key' across the cluster"); 
    3434        src.invalidate ("key"); 
    3535 
  • trunk/example/cluster/qlisten.d

    r2442 r2467  
    99/******************************************************************************* 
    1010 
    11         Illustrates how to setup and use a Queue in both synchronous and 
    12         asynchronous modes 
     11        Illustrates how to setup and use a Queue in asynchronous mode 
    1312 
    1413*******************************************************************************/ 
     
    1615void main () 
    1716{ 
     17        void listen (IEvent event) 
     18        { 
     19                while (event.get) 
     20                       event.log.info ("received asynch msg on channel " ~ event.channel.name); 
     21        } 
     22                 
     23 
    1824        // join the cluster  
    1925        auto cluster = (new Cluster).join; 
     
    2228        auto queue = new NetworkQueue (cluster, "my.queue.channel"); 
    2329 
    24  
    25         /***** synchronous operation ********/ 
     30        // listen for messages placed in my queue, via a delegate 
     31        queue.createConsumer (&listen); 
    2632 
    2733        // stuff something into the queue 
    28         queue.put (queue.EmptyMessage); 
    29  
    30         // retrieve it synchronously 
    31         auto msg = queue.get; 
    32  
    33  
    34  
    35         /***** asynchronous operation ********/ 
    36  
    37         // listen for messages placed in my queue, via a delegate 
    38         queue.createConsumer ((IEvent event) {queue.cluster.log.info ("received asynch msg on channel " ~ event.channel.name);}); 
    39  
    40         // stuff something into the queue 
    41         queue.cluster.log.info ("sending three messages to the queue"); 
     34        queue.log.info ("sending three messages to the queue"); 
    4235        queue.put (queue.EmptyMessage); 
    4336        queue.put (queue.EmptyMessage); 
  • trunk/example/cluster/reply.d

    r2465 r2467  
    1717        auto cluster = (new Cluster).join; 
    1818        auto queue = new NetworkQueue (cluster, "message.channel",  
    19                                       (IEvent event){event.channel.log.info ("Received reply");} 
     19                                      (IEvent event){event.log.info ("Received reply");} 
    2020                                      ); 
    2121 
    2222        void recipient (IEvent event) 
    2323        { 
    24                 auto msg = event.thaw
     24                auto msg = event.get
    2525         
    26                 event.channel.log.info ("Replying to message on channel "~msg.reply); 
     26                event.log.info ("Replying to message on channel "~msg.reply); 
    2727                event.reply (event.replyChannel(msg), queue.EmptyMessage); 
    2828        } 
  • trunk/tango/net/cluster/CacheInvalidatee.d

    r2465 r2467  
    1212 
    1313module tango.net.cluster.CacheInvalidatee; 
    14  
    15 private import  tango.util.log.Logger; 
    1614 
    1715private import  tango.net.cluster.model.ICache; 
     
    3634        alias ICache!(char[], IMessage) Cache; 
    3735 
    38         private Cache                   cache; 
    39         private Logger                  logger; 
     36        private Cache                   cache_; 
    4037        private IConsumer               consumer; 
    4138 
     
    4946         
    5047        this (ICluster cluster, char[] name, Cache cache) 
    51         in { 
    52            assert (cache); 
    53            } 
    54         body 
    5548        { 
    5649                super (cluster, name); 
    5750 
    58                 this.cache = cache
    59                 this.logger = cluster.log
     51                assert (cache)
     52                cache_ = cache
    6053         
    6154                // start listening for invalidation requests 
     
    8174        ***********************************************************************/ 
    8275         
    83         Cache getCache () 
     76        Cache cache () 
    8477        { 
    85                 return cache
     78                return cache_
    8679        } 
    8780 
     
    9689        { 
    9790                scope p = new InvalidatorPayload; 
    98                 event.thaw (p); 
     91                event.get (p); 
    9992 
    10093                // remove entry from our cache 
    101                 if (cache.remove (p.key, p.time)) 
    102                     logger.trace ("removed cache entry '"~p.key~ 
    103                                   "' on channel '"~event.channel.name~"'"); 
     94                if (cache_.remove (p.key, p.time)) 
     95                    event.log.trace ("removed cache entry '"~p.key~ 
     96                                     "' on channel '"~event.channel.name~"'"); 
    10497        }   
    10598} 
  • trunk/tango/net/cluster/NetworkClient.d

    r2465 r2467  
    9999 
    100100        /*********************************************************************** 
     101 
     102                Return the Log instance 
     103 
     104        ***********************************************************************/ 
     105         
     106        Logger log () 
     107        { 
     108                return cluster_.log; 
     109        } 
     110 
     111        /*********************************************************************** 
    101112         
    102113                Create a channel with the specified name. A channel  
  • trunk/tango/net/cluster/model/IChannel.d

    r2465 r2467  
    3131{ 
    3232        /*********************************************************************** 
     33 
     34                Return the Logger associated with this cluster 
     35 
     36        ***********************************************************************/ 
     37         
     38        Logger log (); 
     39 
     40        /*********************************************************************** 
    3341         
    3442                Return the name of this channel. This is the name provided 
     
    3846 
    3947        char[] name (); 
    40  
    41  
    42         /*********************************************************************** 
    43  
    44                 Return the Logger associated with this cluster 
    45  
    46         ***********************************************************************/ 
    47          
    48         Logger log (); 
    4948 
    5049        /*********************************************************************** 
     
    160159        /*********************************************************************** 
    161160 
    162                 Return the reader used to access incoming requests 
    163  
    164         ***********************************************************************/ 
    165          
    166         IMessage thaw (IMessage host = null); 
     161                Return one or more messages associated with this event, or 
     162                null if there is nothing available 
     163 
     164        ***********************************************************************/ 
     165         
     166        IMessage get (IMessage host = null); 
    167167 
    168168        /*********************************************************************** 
     
    183183         
    184184        IChannel replyChannel (IMessage message); 
     185 
     186        /*********************************************************************** 
     187 
     188                Return the Logger associated with this cluster 
     189 
     190        ***********************************************************************/ 
     191         
     192        Logger log (); 
    185193} 
    186194 
  • trunk/tango/net/cluster/tina/Cluster.d

    r2465 r2467  
    196196        { 
    197197                scope rollcall = new RollCall; 
    198                 event.thaw (rollcall); 
     198                event.get (rollcall); 
    199199 
    200200                switch (rollcall.type) 
     
    756756        ***********************************************************************/ 
    757757 
    758         IMessage thaw (IMessage host = null) 
     758        IMessage get (IMessage host = null) 
    759759        { 
    760760                if (hasMore) 
     
    762762 
    763763                throw new ClusterException ("attempting to thaw a non-existant message"); 
     764        } 
     765 
     766        /*********************************************************************** 
     767         
     768                Return the assigned logger 
     769 
     770        ***********************************************************************/ 
     771 
     772        final Logger log () 
     773        { 
     774                return cluster.log; 
    764775        } 
    765776 
     
    910921        /*********************************************************************** 
    911922 
    912                 override the default processing to sweep the cluster for  
     923                Overrides the default processing to sweep the cluster for  
    913924                queued entries. Each server node is queried until one is 
    914925                found that contains a message. Note that it is possible  
    915926                to set things up where we are told exactly which node to 
    916                 go to; howerver given that we won't be listening whilst 
     927                go to; however given that we won't be listening whilst 
    917928                scanning, and that there's likely to be a group of new 
    918929                entries in the cluster, it's just as effective to scan. 
     
    920931                should make the strategy pluggable instead.                  
    921932 
     933                Note also that the content is retrieved via a duplicate 
     934                channel to avoid potential race-conditions on the original 
     935 
     936        ***********************************************************************/ 
     937 
     938        override IMessage get (IMessage host = null) 
     939        { 
     940                if (channel.scanQueue) 
     941                    return channel.thaw (host); 
     942                return null; 
     943        } 
     944 
     945        /*********************************************************************** 
     946 
     947                Send a message back to the producer        
     948 
     949        ***********************************************************************/ 
     950         
     951        override void reply (IChannel channel, IMessage message) 
     952        { 
     953                assert (channel); 
     954                assert (message); 
     955 
     956                channel.putQueue (message); 
     957        } 
     958 
     959        /*********************************************************************** 
     960 
     961                Override the default notification handler in order to 
     962                disable multicast reciepts while the application does 
     963                what it needs to 
     964                 
    922965        ***********************************************************************/ 
    923966        
    924967        override protected void invoke (IEvent event) 
    925968        {                 
    926                 // temporarily pause listening while we sweep cluster ... 
    927                 pauseGroup;              
    928                 scope (exit) 
    929                        resumeGroup; 
    930  
    931                 while (channel.scanQueue) 
    932                        listener (event); 
    933         } 
    934  
    935         /*********************************************************************** 
    936  
    937                 Deserialize a received message. Note that the content is 
    938                 lying in the channel input and not our own  
    939  
    940         ***********************************************************************/ 
    941  
    942         override IMessage thaw (IMessage host = null) 
    943         { 
    944                 return channel.thaw (host); 
    945         } 
    946  
    947         /*********************************************************************** 
    948  
    949                 Send a message back to the producer        
    950  
    951         ***********************************************************************/ 
    952          
    953         override void reply (IChannel channel, IMessage message) 
    954         { 
    955                 assert (channel); 
    956                 assert (message); 
    957  
    958                 channel.putQueue (message); 
     969                // temporarily pause listening while processing 
     970                pauseGroup; 
     971                try { 
     972                    listener (event); 
     973                    } finally resumeGroup; 
    959974        } 
    960975} 
     
    13531368/******************************************************************************* 
    13541369         
    1355         Models a generic set of cluster nodes. This is indented to be 
     1370        Models a generic set of cluster nodes. This is intended to be 
    13561371        thread-safe, with no locking on a lookup operation 
    13571372 
     
    13871402        /*********************************************************************** 
    13881403 
    1389                 Add a node to the list of servers, and sort them such that 
    1390                 all clients will have the same ordered set 
     1404                Add a node to the list of servers 
    13911405 
    13921406        ***********************************************************************/ 
     
    16061620        ***********************************************************************/ 
    16071621         
    1608         final bool scan (bool delegate (Node) dg) 
    1609         { 
    1610                 //auto  list = set.nodes; 
    1611                 auto    hosts = set.random; 
    1612                 int     index = hosts.length; 
    1613  
    1614                 while (index--) 
     1622        final bool scan (bool delegate(Node) dg) 
     1623        { 
     1624                auto hosts = set.random; 
     1625                auto index = hosts.length; 
     1626                 
     1627                while (index) 
    16151628                      { 
    16161629                      // lookup the randomized set of server nodes 
    1617                       auto node = hosts [index]; 
     1630                      auto node = hosts [--index]; 
    16181631 
    16191632                      // callback on each enabled node 
  • trunk/tango/net/cluster/tina/ClusterServer.d

    r2465 r2467  
    180180        { 
    181181                scope input = new RollCall; 
    182                 event.thaw (input); 
     182                event.get (input); 
    183183 
    184184                // if this is a request, reply with our identity