 |
Changeset 2467
- Timestamp:
- 07/25/07 09:16:15
(1 year ago)
- Author:
- kris
- Message:
adjusted the queue retrieval mechanism to be lazier than before, and renamed thaw() to be get() instead
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r2465 |
r2467 |
|
| 23 | 23 | // listen for the broadcast (on this channel) |
|---|
| 24 | 24 | alert.createConsumer (delegate void (IEvent event) |
|---|
| 25 | | {alert.cluster.log.info ("Recieved alert on channel " ~ event.channel.name);} |
|---|
| | 25 | {event.log.info ("Recieved alert on channel " ~ event.channel.name);} |
|---|
| 26 | 26 | ); |
|---|
| 27 | 27 | |
|---|
| 28 | 28 | // say what's going on |
|---|
| 29 | | alert.cluster.log.info ("broadcasting alert"); |
|---|
| | 29 | alert.log.info ("broadcasting alert"); |
|---|
| 30 | 30 | |
|---|
| 31 | 31 | // and send everyone an empty alert (on this channel) |
|---|
| r2465 |
r2467 |
|
| 28 | 28 | |
|---|
| 29 | 29 | // stuff something in the local cache |
|---|
| 30 | | dst.getCache.put ("key", dst.EmptyMessage); |
|---|
| | 30 | dst.cache.put ("key", dst.EmptyMessage); |
|---|
| 31 | 31 | |
|---|
| 32 | 32 | // get it removed via a network broadcast |
|---|
| 33 | | cluster.log.info ("invalidating 'key' across the cluster"); |
|---|
| | 33 | src.log.info ("invalidating 'key' across the cluster"); |
|---|
| 34 | 34 | src.invalidate ("key"); |
|---|
| 35 | 35 | |
|---|
| r2442 |
r2467 |
|
| 9 | 9 | /******************************************************************************* |
|---|
| 10 | 10 | |
|---|
| 11 | | Illustrates how to setup and use a Queue in both synchronous and |
|---|
| 12 | | asynchronous modes |
|---|
| | 11 | Illustrates how to setup and use a Queue in asynchronous mode |
|---|
| 13 | 12 | |
|---|
| 14 | 13 | *******************************************************************************/ |
|---|
| … | … | |
| 16 | 15 | void main () |
|---|
| 17 | 16 | { |
|---|
| | 17 | void listen (IEvent event) |
|---|
| | 18 | { |
|---|
| | 19 | while (event.get) |
|---|
| | 20 | event.log.info ("received asynch msg on channel " ~ event.channel.name); |
|---|
| | 21 | } |
|---|
| | 22 | |
|---|
| | 23 | |
|---|
| 18 | 24 | // join the cluster |
|---|
| 19 | 25 | auto cluster = (new Cluster).join; |
|---|
| … | … | |
| 22 | 28 | auto queue = new NetworkQueue (cluster, "my.queue.channel"); |
|---|
| 23 | 29 | |
|---|
| 24 | | |
|---|
| 25 | | /***** synchronous operation ********/ |
|---|
| | 30 | // listen for messages placed in my queue, via a delegate |
|---|
| | 31 | queue.createConsumer (&listen); |
|---|
| 26 | 32 | |
|---|
| 27 | 33 | // stuff something into the queue |
|---|
| 28 | | queue.put (queue.EmptyMessage); |
|---|
| 29 | | |
|---|
| 30 | | // retrieve it synchronously |
|---|
| 31 | | auto msg = queue.get; |
|---|
| 32 | | |
|---|
| 33 | | |
|---|
| 34 | | |
|---|
| 35 | | /***** asynchronous operation ********/ |
|---|
| 36 | | |
|---|
| 37 | | // listen for messages placed in my queue, via a delegate |
|---|
| 38 | | queue.createConsumer ((IEvent event) {queue.cluster.log.info ("received asynch msg on channel " ~ event.channel.name);}); |
|---|
| 39 | | |
|---|
| 40 | | // stuff something into the queue |
|---|
| 41 | | queue.cluster.log.info ("sending three messages to the queue"); |
|---|
| | 34 | queue.log.info ("sending three messages to the queue"); |
|---|
| 42 | 35 | queue.put (queue.EmptyMessage); |
|---|
| 43 | 36 | queue.put (queue.EmptyMessage); |
|---|
| r2465 |
r2467 |
|
| 17 | 17 | auto cluster = (new Cluster).join; |
|---|
| 18 | 18 | auto queue = new NetworkQueue (cluster, "message.channel", |
|---|
| 19 | | (IEvent event){event.channel.log.info ("Received reply");} |
|---|
| | 19 | (IEvent event){event.log.info ("Received reply");} |
|---|
| 20 | 20 | ); |
|---|
| 21 | 21 | |
|---|
| 22 | 22 | void recipient (IEvent event) |
|---|
| 23 | 23 | { |
|---|
| 24 | | auto msg = event.thaw; |
|---|
| | 24 | auto msg = event.get; |
|---|
| 25 | 25 | |
|---|
| 26 | | event.channel.log.info ("Replying to message on channel "~msg.reply); |
|---|
| | 26 | event.log.info ("Replying to message on channel "~msg.reply); |
|---|
| 27 | 27 | event.reply (event.replyChannel(msg), queue.EmptyMessage); |
|---|
| 28 | 28 | } |
|---|
| r2465 |
r2467 |
|
| 12 | 12 | |
|---|
| 13 | 13 | module tango.net.cluster.CacheInvalidatee; |
|---|
| 14 | | |
|---|
| 15 | | private import tango.util.log.Logger; |
|---|
| 16 | 14 | |
|---|
| 17 | 15 | private import tango.net.cluster.model.ICache; |
|---|
| … | … | |
| 36 | 34 | alias ICache!(char[], IMessage) Cache; |
|---|
| 37 | 35 | |
|---|
| 38 | | private Cache cache; |
|---|
| 39 | | private Logger logger; |
|---|
| | 36 | private Cache cache_; |
|---|
| 40 | 37 | private IConsumer consumer; |
|---|
| 41 | 38 | |
|---|
| … | … | |
| 49 | 46 | |
|---|
| 50 | 47 | this (ICluster cluster, char[] name, Cache cache) |
|---|
| 51 | | in { |
|---|
| 52 | | assert (cache); |
|---|
| 53 | | } |
|---|
| 54 | | body |
|---|
| 55 | 48 | { |
|---|
| 56 | 49 | super (cluster, name); |
|---|
| 57 | 50 | |
|---|
| 58 | | this.cache = cache; |
|---|
| 59 | | this.logger = cluster.log; |
|---|
| | 51 | assert (cache); |
|---|
| | 52 | cache_ = cache; |
|---|
| 60 | 53 | |
|---|
| 61 | 54 | // start listening for invalidation requests |
|---|
| … | … | |
| 81 | 74 | ***********************************************************************/ |
|---|
| 82 | 75 | |
|---|
| 83 | | Cache getCache () |
|---|
| | 76 | Cache cache () |
|---|
| 84 | 77 | { |
|---|
| 85 | | return cache; |
|---|
| | 78 | return cache_; |
|---|
| 86 | 79 | } |
|---|
| 87 | 80 | |
|---|
| … | … | |
| 96 | 89 | { |
|---|
| 97 | 90 | scope p = new InvalidatorPayload; |
|---|
| 98 | | event.thaw (p); |
|---|
| | 91 | event.get (p); |
|---|
| 99 | 92 | |
|---|
| 100 | 93 | // remove entry from our cache |
|---|
| 101 | | if (cache.remove (p.key, p.time)) |
|---|
| 102 | | logger.trace ("removed cache entry '"~p.key~ |
|---|
| 103 | | "' on channel '"~event.channel.name~"'"); |
|---|
| | 94 | if (cache_.remove (p.key, p.time)) |
|---|
| | 95 | event.log.trace ("removed cache entry '"~p.key~ |
|---|
| | 96 | "' on channel '"~event.channel.name~"'"); |
|---|
| 104 | 97 | } |
|---|
| 105 | 98 | } |
|---|
| r2465 |
r2467 |
|
| 99 | 99 | |
|---|
| 100 | 100 | /*********************************************************************** |
|---|
| | 101 | |
|---|
| | 102 | Return the Log instance |
|---|
| | 103 | |
|---|
| | 104 | ***********************************************************************/ |
|---|
| | 105 | |
|---|
| | 106 | Logger log () |
|---|
| | 107 | { |
|---|
| | 108 | return cluster_.log; |
|---|
| | 109 | } |
|---|
| | 110 | |
|---|
| | 111 | /*********************************************************************** |
|---|
| 101 | 112 | |
|---|
| 102 | 113 | Create a channel with the specified name. A channel |
|---|
| r2465 |
r2467 |
|
| 31 | 31 | { |
|---|
| 32 | 32 | /*********************************************************************** |
|---|
| | 33 | |
|---|
| | 34 | Return the Logger associated with this cluster |
|---|
| | 35 | |
|---|
| | 36 | ***********************************************************************/ |
|---|
| | 37 | |
|---|
| | 38 | Logger log (); |
|---|
| | 39 | |
|---|
| | 40 | /*********************************************************************** |
|---|
| 33 | 41 | |
|---|
| 34 | 42 | Return the name of this channel. This is the name provided |
|---|
| … | … | |
| 38 | 46 | |
|---|
| 39 | 47 | char[] name (); |
|---|
| 40 | | |
|---|
| 41 | | |
|---|
| 42 | | /*********************************************************************** |
|---|
| 43 | | |
|---|
| 44 | | Return the Logger associated with this cluster |
|---|
| 45 | | |
|---|
| 46 | | ***********************************************************************/ |
|---|
| 47 | | |
|---|
| 48 | | Logger log (); |
|---|
| 49 | 48 | |
|---|
| 50 | 49 | /*********************************************************************** |
|---|
| … | … | |
| 160 | 159 | /*********************************************************************** |
|---|
| 161 | 160 | |
|---|
| 162 | | Return the reader used to access incoming requests |
|---|
| 163 | | |
|---|
| 164 | | ***********************************************************************/ |
|---|
| 165 | | |
|---|
| 166 | | IMessage thaw (IMessage host = null); |
|---|
| | 161 | Return one or more messages associated with this event, or |
|---|
| | 162 | null if there is nothing available |
|---|
| | 163 | |
|---|
| | 164 | ***********************************************************************/ |
|---|
| | 165 | |
|---|
| | 166 | IMessage get (IMessage host = null); |
|---|
| 167 | 167 | |
|---|
| 168 | 168 | /*********************************************************************** |
|---|
| … | … | |
| 183 | 183 | |
|---|
| 184 | 184 | IChannel replyChannel (IMessage message); |
|---|
| | 185 | |
|---|
| | 186 | /*********************************************************************** |
|---|
| | 187 | |
|---|
| | 188 | Return the Logger associated with this cluster |
|---|
| | 189 | |
|---|
| | 190 | ***********************************************************************/ |
|---|
| | 191 | |
|---|
| | 192 | Logger log (); |
|---|
| 185 | 193 | } |
|---|
| 186 | 194 | |
|---|
| r2465 |
r2467 |
|
| 196 | 196 | { |
|---|
| 197 | 197 | scope rollcall = new RollCall; |
|---|
| 198 | | event.thaw (rollcall); |
|---|
| | 198 | event.get (rollcall); |
|---|
| 199 | 199 | |
|---|
| 200 | 200 | switch (rollcall.type) |
|---|
| … | … | |
| 756 | 756 | ***********************************************************************/ |
|---|
| 757 | 757 | |
|---|
| 758 | | IMessage thaw (IMessage host = null) |
|---|
| | 758 | IMessage get (IMessage host = null) |
|---|
| 759 | 759 | { |
|---|
| 760 | 760 | if (hasMore) |
|---|
| … | … | |
| 762 | 762 | |
|---|
| 763 | 763 | throw new ClusterException ("attempting to thaw a non-existant message"); |
|---|
| | 764 | } |
|---|
| | 765 | |
|---|
| | 766 | /*********************************************************************** |
|---|
| | 767 | |
|---|
| | 768 | Return the assigned logger |
|---|
| | 769 | |
|---|
| | 770 | ***********************************************************************/ |
|---|
| | 771 | |
|---|
| | 772 | final Logger log () |
|---|
| | 773 | { |
|---|
| | 774 | return cluster.log; |
|---|
| 764 | 775 | } |
|---|
| 765 | 776 | |
|---|
| … | … | |
| 910 | 921 | /*********************************************************************** |
|---|
| 911 | 922 | |
|---|
| 912 | | override the default processing to sweep the cluster for |
|---|
| | 923 | Overrides the default processing to sweep the cluster for |
|---|
| 913 | 924 | queued entries. Each server node is queried until one is |
|---|
| 914 | 925 | found that contains a message. Note that it is possible |
|---|
| 915 | 926 | to set things up where we are told exactly which node to |
|---|
| 916 | | go to; howerver given that we won't be listening whilst |
|---|
| | 927 | go to; however given that we won't be listening whilst |
|---|
| 917 | 928 | scanning, and that there's likely to be a group of new |
|---|
| 918 | 929 | entries in the cluster, it's just as effective to scan. |
|---|
| … | … | |
| 920 | 931 | should make the strategy pluggable instead. |
|---|
| 921 | 932 | |
|---|
| | 933 | Note also that the content is retrieved via a duplicate |
|---|
| | 934 | channel to avoid potential race-conditions on the original |
|---|
| | 935 | |
|---|
| | 936 | ***********************************************************************/ |
|---|
| | 937 | |
|---|
| | 938 | override IMessage get (IMessage host = null) |
|---|
| | 939 | { |
|---|
| | 940 | if (channel.scanQueue) |
|---|
| | 941 | return channel.thaw (host); |
|---|
| | 942 | return null; |
|---|
| | 943 | } |
|---|
| | 944 | |
|---|
| | 945 | /*********************************************************************** |
|---|
| | 946 | |
|---|
| | 947 | Send a message back to the producer |
|---|
| | 948 | |
|---|
| | 949 | ***********************************************************************/ |
|---|
| | 950 | |
|---|
| | 951 | override void reply (IChannel channel, IMessage message) |
|---|
| | 952 | { |
|---|
| | 953 | assert (channel); |
|---|
| | 954 | assert (message); |
|---|
| | 955 | |
|---|
| | 956 | channel.putQueue (message); |
|---|
| | 957 | } |
|---|
| | 958 | |
|---|
| | 959 | /*********************************************************************** |
|---|
| | 960 | |
|---|
| | 961 | Override the default notification handler in order to |
|---|
| | 962 | disable multicast reciepts while the application does |
|---|
| | 963 | what it needs to |
|---|
| | 964 | |
|---|
| 922 | 965 | ***********************************************************************/ |
|---|
| 923 | 966 | |
|---|
| 924 | 967 | override protected void invoke (IEvent event) |
|---|
| 925 | 968 | { |
|---|
| 926 | | // temporarily pause listening while we sweep cluster ... |
|---|
| 927 | | pauseGroup; |
|---|
| 928 | | scope (exit) |
|---|
| 929 | | resumeGroup; |
|---|
| 930 | | |
|---|
| 931 | | while (channel.scanQueue) |
|---|
| 932 | | listener (event); |
|---|
| 933 | | } |
|---|
| 934 | | |
|---|
| 935 | | /*********************************************************************** |
|---|
| 936 | | |
|---|
| 937 | | Deserialize a received message. Note that the content is |
|---|
| 938 | | lying in the channel input and not our own |
|---|
| 939 | | |
|---|
| 940 | | ***********************************************************************/ |
|---|
| 941 | | |
|---|
| 942 | | override IMessage thaw (IMessage host = null) |
|---|
| 943 | | { |
|---|
| 944 | | return channel.thaw (host); |
|---|
| 945 | | } |
|---|
| 946 | | |
|---|
| 947 | | /*********************************************************************** |
|---|
| 948 | | |
|---|
| 949 | | Send a message back to the producer |
|---|
| 950 | | |
|---|
| 951 | | ***********************************************************************/ |
|---|
| 952 | | |
|---|
| 953 | | override void reply (IChannel channel, IMessage message) |
|---|
| 954 | | { |
|---|
| 955 | | assert (channel); |
|---|
| 956 | | assert (message); |
|---|
| 957 | | |
|---|
| 958 | | channel.putQueue (message); |
|---|
| | 969 | // temporarily pause listening while processing |
|---|
| | 970 | pauseGroup; |
|---|
| | 971 | try { |
|---|
| | 972 | listener (event); |
|---|
| | 973 | } finally resumeGroup; |
|---|
| 959 | 974 | } |
|---|
| 960 | 975 | } |
|---|
| … | … | |
| 1353 | 1368 | /******************************************************************************* |
|---|
| 1354 | 1369 | |
|---|
| 1355 | | Models a generic set of cluster nodes. This is indented to be |
|---|
| | 1370 | Models a generic set of cluster nodes. This is intended to be |
|---|
| 1356 | 1371 | thread-safe, with no locking on a lookup operation |
|---|
| 1357 | 1372 | |
|---|
| … | … | |
| 1387 | 1402 | /*********************************************************************** |
|---|
| 1388 | 1403 | |
|---|
| 1389 | | Add a node to the list of servers, and sort them such that |
|---|
| 1390 | | all clients will have the same ordered set |
|---|
| | 1404 | Add a node to the list of servers |
|---|
| 1391 | 1405 | |
|---|
| 1392 | 1406 | ***********************************************************************/ |
|---|
| … | … | |
| 1606 | 1620 | ***********************************************************************/ |
|---|
| 1607 | 1621 | |
|---|
| 1608 | | final bool scan (bool delegate (Node) dg) |
|---|
| 1609 | | { |
|---|
| 1610 | | //auto list = set.nodes; |
|---|
| 1611 | | auto hosts = set.random; |
|---|
| 1612 | | int index = hosts.length; |
|---|
| 1613 | | |
|---|
| 1614 | | while (index--) |
|---|
| | 1622 | final bool scan (bool delegate(Node) dg) |
|---|
| | 1623 | { |
|---|
| | 1624 | auto hosts = set.random; |
|---|
| | 1625 | auto index = hosts.length; |
|---|
| | 1626 | |
|---|
| | 1627 | while (index) |
|---|
| 1615 | 1628 | { |
|---|
| 1616 | 1629 | // lookup the randomized set of server nodes |
|---|
| 1617 | | auto node = hosts [index]; |
|---|
| | 1630 | auto node = hosts [--index]; |
|---|
| 1618 | 1631 | |
|---|
| 1619 | 1632 | // callback on each enabled node |
|---|
| r2465 |
r2467 |
|
| 180 | 180 | { |
|---|
| 181 | 181 | scope input = new RollCall; |
|---|
| 182 | | event.thaw (input); |
|---|
| | 182 | event.get (input); |
|---|
| 183 | 183 | |
|---|
| 184 | 184 | // if this is a request, reply with our identity |
|---|
Download in other formats:
|
 |
 |
|
 |
Copyright © 2006-2008 Tango. All Rights Reserved. | Page Width:
Static or
Dynamic