Barst 2.0
A server that controls lab hardware.
named pipes.cpp
1 
2 
3 #include "named pipes.h"
4 #include "Log buffer.h"
5 
6 
7 
// Global cap on the total number of bytes queued for writing across all pipe
// instances; a negative value means "no limit" (accounting is skipped).
__int64 g_llMaxQueueBytes = -1;

// Current total of queued write bytes; guarded by s_hLimitSafe.
__int64 s_llQueueBytes = 0;
// Protects s_llQueueBytes; created by InitializeQueueLimit().
CRITICAL_SECTION s_hLimitSafe;
12 
// Creates the lock guarding the global queue-byte counter. Must be called
// once at startup, before any IncreaseQueue()/DecreaseQueue() call.
void InitializeQueueLimit()
{
	InitializeCriticalSection(&s_hLimitSafe);
}
17 
18 inline bool IncreaseQueue(__int64 llSize)
19 {
20  bool bRes = 0;
21  if (g_llMaxQueueBytes < 0)
22  return 1;
23  EnterCriticalSection(&s_hLimitSafe);
24  if (s_llQueueBytes + llSize <= g_llMaxQueueBytes)
25  {
26  s_llQueueBytes += llSize;
27  bRes = 1;
28  }
29  LeaveCriticalSection(&s_hLimitSafe);
30  return bRes;
31 }
32 
33 inline void DecreaseQueue(__int64 llSize)
34 {
35  if (g_llMaxQueueBytes < 0)
36  return;
37  EnterCriticalSection(&s_hLimitSafe);
38  s_llQueueBytes -= llSize;
39  LeaveCriticalSection(&s_hLimitSafe);
40 }
41 
42 
44 // Starting point of queue thread
45 DWORD WINAPI PipeProc(LPVOID lpParameter)
46 {
47  // Call ThreadProc function of pipe object
48  return ((CPipeServer*)lpParameter)->ThreadProc();
49 }
50 
51 
52 
// Puts the server in the "closed" state; Init() creates the actual pipe
// resources and the worker thread.
CPipeServer::CPipeServer()
{
	m_hThread= NULL;
	m_hDone= NULL;
	m_pcDevice= NULL;
	m_pcLogBuffer= NULL;
	m_pcMemPool= new CMemPool;	// pool supplying the per-pipe read buffers
	m_sStream.str(_T(""));	// stream used to format log messages
	m_bWorking= false;
	m_llNextId= 0;	// generator for unique per-connection pipe IDs
	InitializeCriticalSection(&m_sPipeSafe);	// guards m_aPipes / m_ahEvents
	InitializeCriticalSection(&m_sInitSafe);	// serializes Init()/Close()/SendData()
}
66 
// Stops the worker thread and frees all pipe resources via Close() before
// tearing down the critical sections Close() itself uses.
CPipeServer::~CPipeServer()
{
	Close();
	DeleteCriticalSection(&m_sPipeSafe);
	DeleteCriticalSection(&m_sInitSafe);
	delete m_pcMemPool;
}
74 
75 int CPipeServer::Init(const TCHAR szPipe[], int nPipes, DWORD dwBuffSizeIn, DWORD dwBuffSizeOut, CDevice *cDevice,
76  CLogBuffer *cLogBuffer)
77 {
78  if (m_hThread || !szPipe || nPipes < 1 || !cDevice) // validate params
79  return BAD_INPUT_PARAMS;
80 
81  EnterCriticalSection(&m_sInitSafe);
82  if (m_bWorking) // cannot init twice
83  {
84  LeaveCriticalSection(&m_sInitSafe);
85  return ALREADY_OPEN;
86  }
87 
88  m_csPipe= szPipe; // save params
89  m_pcDevice= cDevice;
90  m_pcLogBuffer= cLogBuffer;
91  // objects used are: 1 to signal done and then 2 for each pipe handle which limits total number of pipes
92  m_nMaxPipes= nPipes>min((MAXIMUM_WAIT_OBJECTS-1)/2, PIPE_UNLIMITED_INSTANCES)?min((MAXIMUM_WAIT_OBJECTS-1)/2, PIPE_UNLIMITED_INSTANCES):nPipes;
93  m_dwBuffSizeIn= dwBuffSizeIn < MIN_PIPE_BUF_SIZE ? MIN_PIPE_BUF_SIZE : dwBuffSizeIn;
94  m_dwBuffSizeOut= dwBuffSizeOut < MIN_PIPE_BUF_SIZE ? MIN_PIPE_BUF_SIZE : dwBuffSizeOut;
95  m_sStream.str(_T(""));
96 
97  m_hDone= CreateEvent(NULL, TRUE, FALSE, NULL); // indicates closing
98  // first pipe
99  SPipeResource *sPipeResource= new SPipeResource(CreateEvent(NULL, TRUE, FALSE, NULL)); // writing event
100  sPipeResource->hPipe= CreateNamedPipe(m_csPipe.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
101  PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
102  m_dwBuffSizeOut, m_dwBuffSizeIn, 0, NULL);
103  sPipeResource->hEvent= CreateEvent(NULL, TRUE, FALSE, NULL); // the first pipe event starts signaled
104  sPipeResource->oOverlap.hEvent= sPipeResource->hEvent;
105  sPipeResource->pRead= m_pcMemPool->PoolAcquire(m_dwBuffSizeIn); // read into this memory
106 
107  m_aPipes.clear();
108  m_aPipes.push_back(sPipeResource); // first pipe
109  m_ahEvents.clear();
110  m_ahEvents.push_back(m_hDone);
111  m_ahEvents.push_back(sPipeResource->hEvent); // general pipe event
112  m_ahEvents.push_back(sPipeResource->hWriteEvent); // followed by pipe writing event
113  // now verify the resources
114  if (!m_hDone || !sPipeResource->hWriteEvent || sPipeResource->hPipe == INVALID_HANDLE_VALUE || !sPipeResource->hEvent
115  || !sPipeResource->pRead)
116  {
117  Close();
118  if (m_pcLogBuffer)
119  {
120  m_sStream<<_T("3;")<<GetLastError()<<_T(";Couldn't open some pipe resources for the first pipe instance.");
121  m_pcLogBuffer->Add(m_sStream.str().c_str());
122  m_sStream.str(_T(""));
123  }
124  LeaveCriticalSection(&m_sInitSafe);
125  return NO_SYS_RESOURCE;
126  }
127  sPipeResource->llId= m_llNextId++; // assign unique ID for pipe
128  sPipeResource->fPending= TRUE; // assume pending result
129  ConnectNamedPipe(sPipeResource->hPipe, &sPipeResource->oOverlap); // connect so pipe is ready before we return
130  switch (GetLastError())
131  {
132  case ERROR_PIPE_CONNECTED: // success
133  SetEvent(sPipeResource->hEvent); // set it so we'll read next
134  sPipeResource->fPending= FALSE;
135  case ERROR_IO_PENDING:
136  sPipeResource->eStatus= kConnecting; // either way we are still connecting
137  break;
138  default: // error, just delete this pipe instance
139  sPipeResource->llId= m_llNextId++; // so that we won't get data from previous user
140  sPipeResource->ResetResource(); // reset everything
141  break;
142  }
143  m_bWorking= true;
144 
145  // now thread that opens connection
146  m_hThread= CreateThread(NULL, 0, PipeProc, this, 0, NULL);
147  if (!m_hThread)
148  {
149  Close();
150  if (m_pcLogBuffer)
151  {
152  m_sStream<<_T("3;")<<GetLastError()<<_T(";Couldn't create pipe thread.");
153  m_pcLogBuffer->Add(m_sStream.str().c_str());
154  m_sStream.str(_T(""));
155  }
156  LeaveCriticalSection(&m_sInitSafe);
157  return NO_SYS_RESOURCE;
158  }
159  LeaveCriticalSection(&m_sInitSafe);
160  return 0;
161 }
162 
163 DWORD CPipeServer::ThreadProc()
164 {
165  bool bError= false, bNotEmpty;
166  m_nConnected= 0;
167  DWORD dwBytes;
168  while (1)
169  {
170  EnterCriticalSection(&m_sPipeSafe);
171  // if all the open pipes are connected open another one unless we reached max or error occured before
172  if (m_nConnected == m_aPipes.size() && m_aPipes.size() < m_nMaxPipes && !bError)
173  {
174  SPipeResource *sPipeResource= new SPipeResource(CreateEvent(NULL, TRUE, FALSE, NULL)); // writing event
175  sPipeResource->hPipe= CreateNamedPipe(m_csPipe.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
176  PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
177  m_dwBuffSizeOut, m_dwBuffSizeIn, 0, NULL);
178  sPipeResource->hEvent= CreateEvent(NULL, TRUE, TRUE, NULL); // starts signaled
179  sPipeResource->pRead= m_pcMemPool->PoolAcquire(m_dwBuffSizeIn); // read into this memory
180  if (sPipeResource->hPipe == INVALID_HANDLE_VALUE || !sPipeResource->hEvent || !sPipeResource->hWriteEvent
181  || !sPipeResource->pRead)
182  {
183  if (m_pcLogBuffer)
184  {
185  m_sStream<<_T("2;")<<GetLastError()<<_T(";Couldn't open some pipe resources for secondary pipe instance.");
186  m_pcLogBuffer->Add(m_sStream.str().c_str());
187  m_sStream.str(_T(""));
188  }
189  m_pcMemPool->PoolRelease(sPipeResource->pRead);
190  DecreaseQueue(sPipeResource->ClearQueue());
191  delete sPipeResource; // automatically closes everything
192  bError= true; // once error occured, don't open new handles
193  } else // save it
194  {
195  sPipeResource->oOverlap.hEvent= sPipeResource->hEvent;
196  sPipeResource->llId= m_llNextId++;
197  m_aPipes.push_back(sPipeResource);
198  m_ahEvents.push_back(sPipeResource->hEvent);
199  m_ahEvents.push_back(sPipeResource->hWriteEvent);
200  }
201  // now we are ready to connect
202  }
203  LeaveCriticalSection(&m_sPipeSafe);
204 
205  DWORD dwWait= WaitForMultipleObjects((DWORD)m_ahEvents.size(), &m_ahEvents[0], FALSE, INFINITE);
206  if (dwWait == WAIT_OBJECT_0) // first object indicates we need to close.
207  break;
208  EnterCriticalSection(&m_sPipeSafe);
209  if (dwWait-WAIT_OBJECT_0 > m_ahEvents.size()-1 || dwWait-WAIT_OBJECT_0 < 0)
210  {
211  if (m_pcLogBuffer)
212  {
213  m_sStream<<_T("1;")<<GetLastError()<<_T(";WaitForMultipleObjects returned invalid index.");
214  m_pcLogBuffer->Add(m_sStream.str().c_str());
215  m_sStream.str(_T(""));
216  }
217  LeaveCriticalSection(&m_sPipeSafe);
218  continue;
219  }
220 
221  // now we find the pipe that signaled
222  DWORD i= (DWORD)floor((dwWait-WAIT_OBJECT_0-1)/2.0);
223  if (!((dwWait-WAIT_OBJECT_0-1)%2)) // first pipe event
224  {
225  ResetEvent(m_aPipes[i]->hEvent);
226  switch (m_aPipes[i]->eStatus) // what do we need to do on pipe?
227  {
228  case kCreated: // we need to connect since it was just created
229  m_aPipes[i]->fPending= TRUE; // assume pending result
230  ConnectNamedPipe(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap);
231  switch (GetLastError())
232  {
233  case ERROR_PIPE_CONNECTED: // success
234  SetEvent(m_aPipes[i]->hEvent); // set it so we'll read next
235  m_aPipes[i]->fPending= FALSE;
236  case ERROR_IO_PENDING:
237  m_aPipes[i]->eStatus= kConnecting; // either way we are still connecting
238  break;
239  default: // error, just delete this pipe instance
240  if (m_nMaxPipes == 1) // don't delete this pipe b/c it will disconnect server
241  {
242  m_aPipes[i]->llId= m_llNextId++; // so that we won't get data from previous user
243  m_aPipes[i]->ResetResource(); // reset everything
244  } else
245  {
246  m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
247  DecreaseQueue(m_aPipes[i]->ClearQueue());
248  delete m_aPipes[i];
249  m_aPipes.erase(m_aPipes.begin()+i);
250  m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
251  }
252  break;
253  }
254  break;
255  case kConnecting: // finish connecting and read
256  if (m_aPipes[i]->fPending) // was pending
257  {
258  if (!GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap, &dwBytes, FALSE)) // failed
259  {
260  if (m_nMaxPipes == 1) // don't delete this pipe b/c it will disconnect server
261  {
262  m_aPipes[i]->llId= m_llNextId++; // so that we won't get data from previous user
263  m_aPipes[i]->ResetResource(); // reset everything
264  } else
265  {
266  m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
267  DecreaseQueue(m_aPipes[i]->ClearQueue());
268  delete m_aPipes[i];
269  m_aPipes.erase(m_aPipes.begin()+i);
270  m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
271  }
272  LeaveCriticalSection(&m_sPipeSafe);
273  continue;
274  }
275  m_aPipes[i]->fPending= FALSE;
276  }
277  // now we need to read, set so we'll go into reading mode
278  ++m_nConnected;
279  m_aPipes[i]->eStatus= kReading;
280  SetEvent(m_aPipes[i]->hEvent);
281  break;
282  case kReading:
283  if (m_aPipes[i]->fPending) // was pending
284  {
285  if (!GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap, &dwBytes, FALSE)) // failed
286  {
287  if (m_nMaxPipes == 1) // don't delete this pipe b/c it will disconnect server
288  {
289  m_aPipes[i]->llId= m_llNextId++; // so that we won't get data from previous user
290  SData* sData; // clear write queue
291  bool bRes;
292  while (m_aPipes[i]->cWriteQueue.GetSize())
293  {
294  sData= m_aPipes[i]->cWriteQueue.Front(true, bRes);
295  if (sData)
296  DecreaseQueue(sData->dwSize);
297  sData->pDevice->Result(sData->pHead, false);
298  delete sData;
299  }
300  m_aPipes[i]->ResetResource(); // reset everything
301  } else
302  {
303  m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
304  DecreaseQueue(m_aPipes[i]->ClearQueue());
305  delete m_aPipes[i];
306  m_aPipes.erase(m_aPipes.begin()+i);
307  m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
308  --m_nConnected;
309  }
310  LeaveCriticalSection(&m_sPipeSafe);
311  continue;
312  }
313  LeaveCriticalSection(&m_sPipeSafe);
314  m_pcDevice->ProcessData(m_aPipes[i]->pRead, dwBytes, m_aPipes[i]->llId); // finish up
315  EnterCriticalSection(&m_sPipeSafe);
316  m_aPipes[i]->fPending= FALSE;
317  }
318 
319  if (ReadFile(m_aPipes[i]->hPipe, m_aPipes[i]->pRead, m_dwBuffSizeIn, &dwBytes, &m_aPipes[i]->oOverlap))
320  {
321  LeaveCriticalSection(&m_sPipeSafe);
322  m_pcDevice->ProcessData(m_aPipes[i]->pRead, dwBytes, m_aPipes[i]->llId);
323  EnterCriticalSection(&m_sPipeSafe);
324  SetEvent(m_aPipes[i]->hEvent); // read again
325  } else if (GetLastError() == ERROR_IO_PENDING)
326  {
327  m_aPipes[i]->fPending= TRUE; // finish later
328  } else
329  {
330  if (m_nMaxPipes == 1) // don't delete this pipe b/c it will disconnect server
331  {
332  m_aPipes[i]->llId= m_llNextId++; // so that we won't get data from previous user
333  SData* sData; // clear write queue
334  bool bRes;
335  while (m_aPipes[i]->cWriteQueue.GetSize())
336  {
337  sData= m_aPipes[i]->cWriteQueue.Front(true, bRes);
338  if (sData)
339  DecreaseQueue(sData->dwSize);
340  sData->pDevice->Result(sData->pHead, false);
341  delete sData;
342  }
343  m_aPipes[i]->ResetResource(); // reset everything
344  } else
345  {
346  m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
347  DecreaseQueue(m_aPipes[i]->ClearQueue());
348  delete m_aPipes[i];
349  m_aPipes.erase(m_aPipes.begin()+i);
350  m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
351  --m_nConnected;
352  }
353  }
354  break;
355  }
356  } else
357  {
358  ResetEvent(m_aPipes[i]->hWriteEvent);
359  SData* sData= m_aPipes[i]->cWriteQueue.Front(false, bNotEmpty); // next item to write/written
360  if (!bNotEmpty) // nothing to write/written
361  {
362  m_aPipes[i]->fWritePending= FALSE;
363  LeaveCriticalSection(&m_sPipeSafe);
364  continue;
365  }
366  if (m_aPipes[i]->fWritePending) // finish previous write
367  {
368  sData->pDevice->Result(sData->pHead, GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oWriteOverlap, &dwBytes, FALSE)
369  && dwBytes == sData->dwSize);
370  m_aPipes[i]->fWritePending= FALSE;
371  DecreaseQueue(sData->dwSize);
372  delete m_aPipes[i]->cWriteQueue.Front(true, bNotEmpty);
373  if (m_aPipes[i]->cWriteQueue.GetSize()) // write next element
374  SetEvent(m_aPipes[i]->hWriteEvent);
375  } else // write next item
376  {
377  if (WriteFile(m_aPipes[i]->hPipe, sData->pHead, sData->dwSize, &dwBytes, &m_aPipes[i]->oWriteOverlap))
378  { // success
379  sData->pDevice->Result(sData->pHead, true);
380  DecreaseQueue(sData->dwSize);
381  delete m_aPipes[i]->cWriteQueue.Front(true, bNotEmpty);
382  if (m_aPipes[i]->cWriteQueue.GetSize()) // write next element
383  SetEvent(m_aPipes[i]->hWriteEvent);
384  } else if (GetLastError() == ERROR_IO_PENDING)
385  m_aPipes[i]->fWritePending= TRUE;
386  else
387  {
388  sData->pDevice->Result(sData->pHead, false);
389  DecreaseQueue(sData->dwSize);
390  delete m_aPipes[i]->cWriteQueue.Front(true, bNotEmpty);
391  if (m_aPipes[i]->cWriteQueue.GetSize()) // write next element
392  SetEvent(m_aPipes[i]->hWriteEvent);
393  }
394  }
395  }
396  LeaveCriticalSection(&m_sPipeSafe);
397  }
398 
399  return 0;
400 }
401 
{
	// Shuts the server down: signals the worker thread to exit (terminating
	// it if it does not stop within 2 seconds), frees every pipe resource and
	// resets all members so Init() can be called again.
	// close thread handle
	EnterCriticalSection(&m_sInitSafe);
	m_bWorking= false;

	DWORD dwRes= WAIT_OBJECT_0 +1;	// default forces termination if m_hDone was never created
	if (m_hThread)
	{
		if (m_hDone)
			dwRes= SignalObjectAndWait(m_hDone, m_hThread, 2000, FALSE);	// request exit, wait up to 2s
		if (dwRes != WAIT_OBJECT_0)	// thread did not exit in time
		{
			TerminateThread(m_hThread, 0);
			DeleteCriticalSection(&m_sPipeSafe);	// we do this so that we can enter cc after close, in case it was terminated in cc.
			InitializeCriticalSection(&m_sPipeSafe);
		}
	}
	// free every pipe instance: read buffer, queued writes, then the resource itself
	for (size_t i= 0; i<m_aPipes.size();++i)
	{
		m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
		DecreaseQueue(m_aPipes[i]->ClearQueue());
		delete m_aPipes[i];
	}
	m_aPipes.clear();
	if (m_hDone) CloseHandle(m_hDone);
	if (m_hThread) CloseHandle(m_hThread);
	m_hThread= NULL;
	m_hDone= NULL;
	m_pcDevice= NULL;
	m_pcLogBuffer= NULL;	// NOTE: cleared here, so callers must log before calling Close()
	m_sStream.str(_T(""));
	LeaveCriticalSection(&m_sInitSafe);
}
436 
437 int CPipeServer::SendData(const SData *pData, __int64 llId)
438 {
439  if (!pData || !pData->pHead || !pData->pDevice || !pData->dwSize)
440  return TRUE;
441  EnterCriticalSection(&m_sInitSafe); // so that close (terminate) cannot be called on this thread
442  if (!m_bWorking) // haven't activated this comm
443  {
444  LeaveCriticalSection(&m_sInitSafe);
445  return TRUE;
446  }
447 
448  BOOL fRes= TRUE; // assume failure
449  EnterCriticalSection(&m_sPipeSafe); // so that pipe won't close on us suddenly
450  for (DWORD i= 0; i<m_aPipes.size(); ++i)
451  {
452  if (m_aPipes[i]->llId == llId)
453  {
454  if (IncreaseQueue(pData->dwSize))
455  {
456  m_aPipes[i]->cWriteQueue.Push(new SData(*pData));
457  fRes= FALSE;
458  }
459  break;
460  }
461  }
462  LeaveCriticalSection(&m_sPipeSafe);
463  LeaveCriticalSection(&m_sInitSafe);
464  return fRes;
465 }
int SendData(const SData *pData, __int64 llId)
int Add(const TCHAR *szItem)
Definition: Log buffer.h:39
virtual void Result(void *pHead, bool bPass)=0
int Init(const TCHAR szPipe[], int nPipes, DWORD dwBuffSizeIn, DWORD dwBuffSizeOut, CDevice *cDevice, CLogBuffer *cLogBuffer)
Definition: named pipes.cpp:75
virtual void ProcessData(const void *pHead, DWORD dwSize, __int64 llId)=0