1 /** 
2  * Receive queue management
3  */
4 module birchwood.client.receiver;
5 
6 import core.thread : Thread, dur;
7 
8 import std.container.slist : SList;
9 import core.sync.mutex : Mutex;
10 import core.sync.condition : Condition;
11 
12 import eventy : EventyEvent = Event;
13 
14 import birchwood.client;
15 import birchwood.protocol.messages : Message, decodeMessage;
16 import std.string : indexOf;
17 import birchwood.client.events : PongEvent, IRCEvent;
18 import std.string : cmp;
19 
20 version(unittest)
21 {
22     import std.stdio : writeln;
23 }
24 import birchwood.logging;
25 
26 /** 
27  * Manages the receive queue and performs
28  * message parsing and event triggering
29  * based on said messages
30  */
31 public final class ReceiverThread : Thread
32 {
33     /** 
34      * The receive queue
35      */
36     private SList!(ubyte[]) recvQueue;
37 
38     /** 
39      * The receive queue's lock
40      */
41     private Mutex recvQueueLock;
42 
43     /** 
44      * Condition variable for waking
45      * up receive queue reader
46      */
47     private Condition recvQueueCond;
48 
49     /** 
50      * The associated IRC client
51      */
52     private Client client;
53 
54     /** 
55      * Constructs a new receiver thread with the associated
56      * client
57      *
58      * Params:
59      *   client = the Client to associate with
60      * Throws:
61      *   `SnoozeError` on failure to construct an
62      * `Event` or ensure ourselves
63      */
64     this(Client client)
65     {
66         super(&recvHandlerFunc);
67         this.client = client;
68         this.recvQueueLock = new Mutex();
69         this.recvQueueCond = new Condition(this.recvQueueLock);
70     }
71 
72     /** 
73      * Enqueues the raw message into the receieve queue
74      * for eventual processing
75      *
76      * Params:
77      *   encodedMessage = the message to enqueue
78      */
79     public void rq(ubyte[] encodedMessage)
80     {
81         /* Lock queue */
82         recvQueueLock.lock();
83 
84         /* Add to queue */
85         recvQueue.insertAfter(recvQueue[], encodedMessage);
86 
87         /* Wake the sleeping message handler */
88         recvQueueCond.notify();
89 
90         /* Unlock queue */
91         recvQueueLock.unlock();
92     }
93 
94     /** 
95      * The receive queue worker function
96      *
97      * This has the job of dequeuing messages
98      * in the receive queue, decoding them
99      * into Message objects and then emitting
100      * an event depending on the type of message
101      *
102      * Handles PINGs along with normal messages
103      */
104     private void recvHandlerFunc()
105     {
106         while(client.isRunning())
107         {
108             /* Lock the queue */
109             recvQueueLock.lock();
110 
111             /* Sleep till woken (new message) */
112             recvQueueCond.wait(); // TODO: Check SyncError?
113 
114             /* Parsed messages */
115             SList!(Message) currentMessageQueue;
116 
117             /** 
118              * Parse all messages and save them
119              * into the above array
120              */
121             foreach(ubyte[] message; recvQueue[])
122             {
123                 /* Decode the message */
124                 string decodedMessage = decodeMessage(message);
125 
126                 /* Parse the message */
127                 Message parsedMessage = Message.parseReceivedMessage(decodedMessage);
128 
129                 /* Save it */
130                 currentMessageQueue.insertAfter(currentMessageQueue[], parsedMessage);
131             }
132 
133 
134             /** 
135              * Search for any PING messages, then store it if so
136              * and remove it so it isn't processed again later
137              */
138             Message pingMessage;
139             foreach(Message curMsg; currentMessageQueue[])
140             {
141                 if(cmp(curMsg.getCommand(), "PING") == 0)
142                 {
143                     currentMessageQueue.linearRemoveElement(curMsg);
144                     pingMessage = curMsg;
145                     break;
146                 }
147             }
148 
149             /** 
150              * If we have a PING then respond with a PONG
151              */
152             if(pingMessage !is null)
153             {
154                 DEBUG("Found a ping: "~pingMessage.toString());
155 
156                 /* Extract the PING ID */
157                 string pingID = pingMessage.getParams();
158 
159                 /* Spawn a PONG event */
160                 EventyEvent pongEvent = new PongEvent(pingID);
161                 client.getEngine().push(pongEvent);
162             }
163 
164 
165 
166 
167            
168 
169 
170             /**
171             * TODO: Plan of action
172             *
173             * 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
174             *    ping message (if any)
175             * 2. Then (if there was a PING) trigger said PING handler
176             * 3. Normal message handling; `parseReceivedMessage()` on one of the messages
177             * (make the dequeue amount configurable possibly)
178             * 4. Trigger generic handler
179             * 5. We might need to also have a queue for commands ISSUED and command-replies
180             *    RECEIVED and then match those first and do something with them (tasky-esque)
181             * 6. We can just make a generic reply queue of these things - we have to maybe to this
182             * - we can cache or remember stuff when we get 353
183             */
184 
185             /** 
186              * Process each message remaining in the queue now
187              * till it is empty
188              */
189             while(!currentMessageQueue.empty())
190             {
191                 /* Get the frontmost Message */
192                 Message curMsg = currentMessageQueue.front();
193 
194                 // TODO: Remove the Eventy push and replace with a handler call (on second thought no)
195                 EventyEvent ircEvent = new IRCEvent(curMsg);
196                 client.getEngine.push(ircEvent);
197 
198                 /* Remove the message from the queue */
199                 currentMessageQueue.linearRemoveElement(curMsg);
200             }
201 
202             /* Clear the receive queue */
203             recvQueue.clear();
204         
205             /* Unlock the receive queue */
206             recvQueueLock.unlock();
207         }
208     }
209 
210     /** 
211      * Stops the receive queue manager
212      */
213     public void end()
214     {
215         /* Lock the queue */
216         recvQueueLock.lock();
217 
218         /* Wake up sleeping thread (so it can exit) */
219         recvQueueCond.notify();
220 
221         /* Unlock the queue */
222         recvQueueLock.unlock();
223 
224         // Wait on the manager thread to end
225         join();
226     }
227 }