/**
 * Receive queue management
 */
module birchwood.client.receiver;

import core.thread : Thread, dur;

import std.container.slist : SList;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;

import eventy : EventyEvent = Event;

import birchwood.client;
import birchwood.protocol.messages : Message, decodeMessage;
import std.string : indexOf;
import birchwood.client.events : PongEvent, IRCEvent;
import std.string : cmp;

version(unittest)
{
    import std.stdio : writeln;
}
import birchwood.logging;

/**
 * Manages the receive queue and performs
 * message parsing and event triggering
 * based on said messages
 */
public final class ReceiverThread : Thread
{
    /**
     * The receive queue
     */
    private SList!(ubyte[]) recvQueue;

    /**
     * The receive queue's lock
     *
     * Guards `recvQueue` and `stopRequested`
     */
    private Mutex recvQueueLock;

    /**
     * Condition variable for waking
     * up receive queue reader
     */
    private Condition recvQueueCond;

    /**
     * The associated IRC client
     */
    private Client client;

    /**
     * Set by `end()` to ask the worker to exit.
     *
     * Only read and written while `recvQueueLock`
     * is held.
     */
    private bool stopRequested;

    /**
     * Constructs a new receiver thread with the associated
     * client
     *
     * Params:
     *   client = the Client to associate with
     * Throws:
     *   `SyncError` if the queue's `Mutex` or `Condition`
     *   cannot be constructed
     */
    this(Client client)
    {
        super(&recvHandlerFunc);
        this.client = client;
        this.recvQueueLock = new Mutex();
        this.recvQueueCond = new Condition(this.recvQueueLock);
    }

    /**
     * Enqueues the raw message into the receive queue
     * for eventual processing
     *
     * Params:
     *   encodedMessage = the message to enqueue
     */
    public void rq(ubyte[] encodedMessage)
    {
        /* Lock queue (released on every exit path) */
        recvQueueLock.lock();
        scope(exit) recvQueueLock.unlock();

        /* Add to the back of the queue */
        recvQueue.insertAfter(recvQueue[], encodedMessage);

        /* Wake the sleeping message handler */
        recvQueueCond.notify();
    }

    /**
     * The receive queue worker function
     *
     * This has the job of dequeuing messages
     * in the receive queue, decoding them
     * into Message objects and then emitting
     * an event depending on the type of message
     *
     * Handles PINGs along with normal messages
     */
    private void recvHandlerFunc()
    {
        while(client.isRunning())
        {
            /* Sleep until there is work (or we are told to stop) */
            bool stopping;
            ubyte[][] rawMessages = dequeueAll(stopping);

            /**
             * Decode and dispatch with the queue lock released,
             * so `rq()` never blocks on event handling and an
             * exception here cannot leave the queue locked
             */
            dispatch(rawMessages);

            if(stopping)
            {
                break;
            }
        }
    }

    /**
     * Blocks until the receive queue is non-empty, a stop
     * has been requested via `end()`, or the client is no
     * longer running; then removes and returns everything
     * currently queued (in arrival order).
     *
     * Params:
     *   stopping = set to `true` if `end()` has been called
     * Returns: the drained raw messages (possibly none)
     */
    private ubyte[][] dequeueAll(out bool stopping)
    {
        recvQueueLock.lock();
        scope(exit) recvQueueLock.unlock();

        /**
         * Wait on a predicate rather than unconditionally:
         * a `notify()` issued while nobody was waiting is not
         * remembered, so without this check an already-queued
         * message (or a stop request) would be missed. The loop
         * also re-checks the predicate after any wakeup.
         */
        while(recvQueue.empty() && !stopRequested && client.isRunning())
        {
            recvQueueCond.wait();
        }

        /* Copy out the queued messages, then clear the queue */
        ubyte[][] drained;
        foreach(ubyte[] message; recvQueue[])
        {
            drained ~= message;
        }
        recvQueue.clear();

        stopping = stopRequested;
        return drained;
    }

    /**
     * Decodes and parses every raw message, then pushes a
     * `PongEvent` for each PING followed by an `IRCEvent`
     * for each remaining message.
     *
     * TODO: Plan of action
     *
     * 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
     *    ping message (if any)
     * 2. Then (if there was a PING) trigger said PING handler
     * 3. Normal message handling; `parseReceivedMessage()` on one of the messages
     *    (make the dequeue amount configurable possibly)
     * 4. Trigger generic handler
     * 5. We might need to also have a queue for commands ISSUED and command-replies
     *    RECEIVED and then match those first and do something with them (tasky-esque)
     * 6. We can just make a generic reply queue of these things - we may have to do this
     *    - we can cache or remember stuff when we get 353
     *
     * Params:
     *   rawMessages = the encoded messages to process
     */
    private void dispatch(ubyte[][] rawMessages)
    {
        /* PINGs are split out so they are answered first */
        Message[] pingMessages;
        Message[] normalMessages;

        /**
         * Parse all messages before emitting anything
         */
        foreach(ubyte[] rawMessage; rawMessages)
        {
            /* Decode the message */
            string decodedMessage = decodeMessage(rawMessage);

            /* Parse the message */
            Message parsedMessage = Message.parseReceivedMessage(decodedMessage);

            /* Save it */
            if(cmp(parsedMessage.getCommand(), "PING") == 0)
            {
                pingMessages ~= parsedMessage;
            }
            else
            {
                normalMessages ~= parsedMessage;
            }
        }

        /**
         * Respond to every PING with a PONG (not just the
         * first one found in the batch)
         */
        foreach(Message pingMessage; pingMessages)
        {
            DEBUG("Found a ping: "~pingMessage.toString());

            /* Extract the PING ID */
            string pingID = pingMessage.getParams();

            /* Spawn a PONG event */
            EventyEvent pongEvent = new PongEvent(pingID);
            client.getEngine().push(pongEvent);
        }

        /**
         * Emit a generic IRC event for each remaining message
         */
        foreach(Message curMsg; normalMessages)
        {
            // TODO: Remove the Eventy push and replace with a handler call (on second thought no)
            EventyEvent ircEvent = new IRCEvent(curMsg);
            client.getEngine().push(ircEvent);
        }
    }

    /**
     * Stops the receive queue manager
     *
     * Requests the worker to exit, wakes it if it is
     * sleeping, and waits for it to finish.
     */
    public void end()
    {
        {
            /* Lock the queue */
            recvQueueLock.lock();
            scope(exit) recvQueueLock.unlock();

            /* Record the stop request so it cannot be missed */
            stopRequested = true;

            /* Wake up sleeping thread (so it can exit) */
            recvQueueCond.notify();
        }

        // Wait on the manager thread to end (lock already released)
        join();
    }
}