1 /** 
2  * Receive queue management
3  */
4 module birchwood.client.receiver;
5 
6 import core.thread : Thread, dur;
7 
8 import std.container.slist : SList;
9 import core.sync.mutex : Mutex;
10 
11 import eventy : EventyEvent = Event;
12 
13 // TODO: Examine the below import which seemingly fixes stuff for libsnooze
14 import libsnooze.clib;
15 import libsnooze;
16 
17 import birchwood.client;
18 import birchwood.protocol.messages : Message, decodeMessage;
19 import std.string : indexOf;
20 import birchwood.client.events : PongEvent, IRCEvent;
21 import std.string : cmp;
22 
23 version(unittest)
24 {
25     import std.stdio : writeln;
26 }
27 
28 /** 
29  * Manages the receive queue and performs
30  * message parsing and event triggering
31  * based on said messages
32  */
public final class ReceiverThread : Thread
{
    /** 
     * The receive queue (raw, still-encoded messages
     * in arrival order)
     */
    private SList!(ubyte[]) recvQueue;

    /** 
     * The receive queue's lock
     */
    private Mutex recvQueueLock;

    /** 
     * The libsnooze event to await on which
     * when we wake up signals a new message
     * to be processed and received
     */
    private Event receiveEvent;

    /** 
     * The associated IRC client
     */
    private Client client;

    /** 
     * Constructs a new receiver thread with the associated
     * client
     *
     * Params:
     *   client = the Client to associate with
     * Throws:
     *   `SnoozeError` on failure to construct an
     * `Event` or ensure ourselves
     */
    this(Client client)
    {
        super(&recvHandlerFunc);
        this.client = client;
        this.receiveEvent = new Event();
        this.recvQueueLock = new Mutex();

        /* Register this thread with the event before anybody can notify it */
        this.receiveEvent.ensure(this);
    }

    /** 
     * Enqueues the raw message into the receive queue
     * for eventual processing and then wakes up the
     * receiver thread
     *
     * Params:
     *   encodedMessage = the message to enqueue
     */
    public void rq(ubyte[] encodedMessage)
    {
        {
            /* Lock queue (released on every exit path, including a throw) */
            recvQueueLock.lock();
            scope(exit) recvQueueLock.unlock();

            /**
             * Add to queue. Passing the whole range to `insertAfter`
             * appends at the tail, preserving arrival order
             */
            recvQueue.insertAfter(recvQueue[], encodedMessage);
        }

        /** 
         * Wake up all threads waiting on this event
         * (if any, and if so it would only be the receiver).
         * Done after the lock is released, as before
         */
        receiveEvent.notifyAll();
    }

    /** 
     * The receive queue worker function
     *
     * This has the job of dequeuing messages
     * in the receive queue, decoding them
     * into Message objects and then emitting
     * an event depending on the type of message
     *
     * Handles PINGs along with normal messages
     *
     * Loops for as long as `client.running` holds; each
     * iteration blocks on the libsnooze event and, once
     * woken, processes whatever is currently queued
     */
    private void recvHandlerFunc()
    {
        while(client.running)
        {
            // TODO: We could look at libsnooze wait starvation or mutex racing (future thought)

            try
            {
                receiveEvent.wait();
            }
            catch(InterruptedException e)
            {
                version(unittest)
                {
                    writeln("wait() interrupted");
                }
                continue;
            }
            catch(FatalException e)
            {
                // TODO: This should crash and end
                version(unittest)
                {
                    writeln("wait() had a FATAL error!!!!!!!!!!!");
                }
                continue;
            }

            processPending();
        }
    }

    /** 
     * Decodes, parses and dispatches every message currently
     * in the receive queue, then clears the queue
     *
     * The queue lock is held for the whole call and is released
     * via `scope(exit)`, so an exception thrown while decoding,
     * parsing or pushing an event can no longer leave the mutex
     * locked (which would block every later `rq()` call forever)
     *
     * As before, the receive queue is only cleared when the
     * whole batch was handled without an exception
     */
    private void processPending()
    {
        /* Lock the receive queue */
        recvQueueLock.lock();
        scope(exit) recvQueueLock.unlock();

        /* Parsed messages (same order as the receive queue) */
        SList!(Message) currentMessageQueue;

        /** 
         * Parse all messages and save them
         * into the above list
         */
        foreach(ubyte[] message; recvQueue[])
        {
            /* Decode the message */
            string decodedMessage = decodeMessage(message);

            /* Parse the message */
            Message parsedMessage = Message.parseReceivedMessage(decodedMessage);

            /* Save it (append to the tail) */
            currentMessageQueue.insertAfter(currentMessageQueue[], parsedMessage);
        }

        /** 
         * Search for the first PING message, store it if found
         * and remove it so it isn't processed again below.
         *
         * We `break` straight after the removal, so the range
         * being iterated is never touched after being modified.
         *
         * NOTE(review): only the first PING of a batch is answered;
         * any further PING in the same batch is dispatched as a
         * plain `IRCEvent` - confirm this is intended
         */
        Message pingMessage;
        foreach(Message curMsg; currentMessageQueue[])
        {
            if(curMsg.getCommand() == "PING")
            {
                currentMessageQueue.linearRemoveElement(curMsg);
                pingMessage = curMsg;
                break;
            }
        }

        /** 
         * If we have a PING then respond with a PONG
         * (pushed before any of the normal messages)
         */
        if(pingMessage !is null)
        {
            logger.log("Found a ping: "~pingMessage.toString());

            /* Extract the PING ID */
            string pingID = pingMessage.getParams();

            /* Spawn a PONG event */
            EventyEvent pongEvent = new PongEvent(pingID);
            client.engine.push(pongEvent);
        }

        /**
         * TODO: Plan of action
         *
         * 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
         *    ping message (if any)
         * 2. Then (if there was a PING) trigger said PING handler
         * 3. Normal message handling; `parseReceivedMessage()` on one of the messages
         * (make the dequeue amount configurable possibly)
         * 4. Trigger generic handler
         * 5. We might need to also have a queue for commands ISSUED and command-replies
         *    RECEIVED and then match those first and do something with them (tasky-esque)
         * 6. We can just make a generic reply queue of these things - we have to maybe to this
         * - we can cache or remember stuff when we get 353
         */

        /** 
         * Emit an IRC event for each remaining message, in order.
         * The list is local to this call and discarded afterwards,
         * so there is no need to remove elements one by one
         */
        foreach(Message curMsg; currentMessageQueue[])
        {
            // TODO: Remove the Eventy push and replace with a handler call (on second thought no)
            EventyEvent ircEvent = new IRCEvent(curMsg);
            client.engine.push(ircEvent);
        }

        /* Clear the receive queue now that everything was handled */
        recvQueue.clear();
    }

    /** 
     * Stops the receive queue manager
     *
     * This only wakes the receiver thread up by notifying
     * its event; the worker loop itself terminates once it
     * observes `client.running` to be false
     */
    public void end()
    {
        // TODO: See above notes about libsnooze behaviour due
        // ... to usage in our context
        receiveEvent.notifyAll();
    }
}