/**
 * Receive queue management
 */
module birchwood.client.receiver;

import core.thread : Thread, dur;

import std.container.slist : SList;
import core.sync.mutex : Mutex;

import eventy : EventyEvent = Event;

// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;

import birchwood.client;
import birchwood.protocol.messages : Message, decodeMessage;
import std.string : indexOf;
import birchwood.client.events : PongEvent, IRCEvent;
import std.string : cmp;

version(unittest)
{
    import std.stdio : writeln;
}

/**
 * Manages the receive queue and performs
 * message parsing and event triggering
 * based on said messages
 */
public final class ReceiverThread : Thread
{
    /**
     * The receive queue (raw, still-encoded messages)
     */
    private SList!(ubyte[]) recvQueue;

    /**
     * The receive queue's lock
     */
    private Mutex recvQueueLock;

    /**
     * The libsnooze event to await on which
     * when we wake up signals a new message
     * to be processed and received
     */
    private Event receiveEvent;

    /**
     * The associated IRC client
     */
    private Client client;

    /**
     * Constructs a new receiver thread with the associated
     * client
     *
     * Params:
     *   client = the Client to associate with
     * Throws:
     *   `SnoozeError` on failure to construct an
     * `Event` or ensure ourselves
     */
    this(Client client)
    {
        super(&recvHandlerFunc);
        this.client = client;
        this.receiveEvent = new Event();
        this.recvQueueLock = new Mutex();
        this.receiveEvent.ensure(this);
    }

    /**
     * Enqueues the raw message into the receive queue
     * for eventual processing and then wakes up the
     * receiver thread
     *
     * Params:
     *   encodedMessage = the message to enqueue
     */
    public void rq(ubyte[] encodedMessage)
    {
        {
            /* Lock queue; the scope guard releases it even if the insert throws */
            recvQueueLock.lock();
            scope(exit) recvQueueLock.unlock();

            /* Add to (the end of) the queue */
            recvQueue.insertAfter(recvQueue[], encodedMessage);
        }

        /**
         * Wake up all threads waiting on this event
         * (if any, and if so it would only be the receiver)
         *
         * Done after the queue lock has been released
         */
        receiveEvent.notifyAll();
    }

    /**
     * The receive queue worker function
     *
     * This has the job of dequeuing messages
     * in the receive queue, decoding them
     * into Message objects and then emitting
     * an event depending on the type of message
     *
     * Handles PINGs along with normal messages
     */
    private void recvHandlerFunc()
    {
        while(client.running)
        {
            // TODO: We could look at libsnooze wait starvation or mutex racing (future thought)

            try
            {
                receiveEvent.wait();
            }
            catch(InterruptedException e)
            {
                version(unittest)
                {
                    writeln("wait() interrupted");
                }
                continue;
            }
            catch(FatalException e)
            {
                // TODO: This should crash and end
                // NOTE(review): we currently just retry the wait; if the failure
                // is persistent this presumably loops without sleeping - confirm
                version(unittest)
                {
                    writeln("wait() had a FATAL error!!!!!!!!!!!");
                }
                continue;
            }

            /* We were woken up, process whatever is in the queue */
            processReceiveQueue();
        }
    }

    /**
     * Processes everything currently in the receive queue
     *
     * Every queued raw message is decoded and parsed, the first
     * PING found (if any) is answered by pushing a `PongEvent`,
     * every other message is pushed as an `IRCEvent` and finally
     * the receive queue is cleared
     *
     * The receive queue's lock is held for the duration of the
     * call and is always released on the way out, also when
     * decoding, parsing or event pushing throws (the exception
     * itself is still propagated to the caller)
     */
    private void processReceiveQueue()
    {
        /* Lock the receive queue (unlocked on any exit from this function) */
        recvQueueLock.lock();
        scope(exit) recvQueueLock.unlock();

        /* Parsed messages */
        SList!(Message) currentMessageQueue;

        /**
         * Parse all messages and save them
         * into the above list (in arrival order)
         */
        foreach(ubyte[] message; recvQueue[])
        {
            /* Decode the message */
            string decodedMessage = decodeMessage(message);

            /* Parse the message */
            Message parsedMessage = Message.parseReceivedMessage(decodedMessage);

            /* Save it */
            currentMessageQueue.insertAfter(currentMessageQueue[], parsedMessage);
        }

        /**
         * Search for a PING message, then store it if so
         * and remove it so it isn't processed again later
         *
         * Only the first PING is extracted; we stop iterating
         * immediately after mutating the list
         */
        Message pingMessage;
        foreach(Message curMsg; currentMessageQueue[])
        {
            if(cmp(curMsg.getCommand(), "PING") == 0)
            {
                currentMessageQueue.linearRemoveElement(curMsg);
                pingMessage = curMsg;
                break;
            }
        }

        /**
         * If we have a PING then respond with a PONG
         */
        if(pingMessage !is null)
        {
            logger.log("Found a ping: "~pingMessage.toString());

            /* Extract the PING ID */
            string pingID = pingMessage.getParams();

            /* Spawn a PONG event */
            EventyEvent pongEvent = new PongEvent(pingID);
            client.engine.push(pongEvent);
        }

        /**
         * TODO: Plan of action
         *
         * 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
         *    ping message (if any)
         * 2. Then (if there was a PING) trigger said PING handler
         * 3. Normal message handling; `parseReceivedMessage()` on one of the messages
         *    (make the dequeue amount configurable possibly)
         * 4. Trigger generic handler
         * 5. We might need to also have a queue for commands ISSUED and command-replies
         *    RECEIVED and then match those first and do something with them (tasky-esque)
         * 6. We can just make a generic reply queue of these things - we have to maybe do this
         *    - we can cache or remember stuff when we get 353
         */

        /**
         * Emit an event for each remaining message, front to back
         *
         * The parsed queue is local to this call and is discarded
         * afterwards, so there is no need to remove elements from it
         */
        foreach(Message curMsg; currentMessageQueue[])
        {
            // TODO: Remove the Eventy push and replace with a handler call (on second thought no)
            EventyEvent ircEvent = new IRCEvent(curMsg);
            client.engine.push(ircEvent);
        }

        /* Clear the receive queue */
        recvQueue.clear();
    }

    /**
     * Stops the receive queue manager
     *
     * This only wakes up the receiver thread so that it
     * gets to re-check `client.running`
     */
    public void end()
    {
        // TODO: See above notes about libsnooze behaviour due
        // ... to usage in our context
        receiveEvent.notifyAll();
    }
}