// File-scope state for the global write-queue byte budget.
// g_llMaxQueueBytes is the upper bound compared against in IncreaseQueue(); both
// IncreaseQueue() and DecreaseQueue() test it for < 0 before doing any accounting
// (presumably a negative value means "no limit" -- the branch body is not visible, confirm).
3 #include "named pipes.h" 4 #include "Log buffer.h" 8 __int64 g_llMaxQueueBytes = -1;
// Running total of bytes currently accounted for; in the visible code it is only modified
// while s_hLimitSafe is held.
10 __int64 s_llQueueBytes = 0;
// Critical section guarding s_llQueueBytes; initialized by InitializeQueueLimit().
11 CRITICAL_SECTION s_hLimitSafe;
// Prepares the queue-limit accounting by initializing the s_hLimitSafe critical section.
// NOTE(review): IncreaseQueue()/DecreaseQueue() enter that section, so this has to run
// before either of them is reached with a non-negative limit -- confirm call order.
13 void InitializeQueueLimit()
15 InitializeCriticalSection(&s_hLimitSafe);
// Tries to account llSize additional bytes against the global queue budget.
// While holding s_hLimitSafe, s_llQueueBytes grows by llSize only when the new total
// would not exceed g_llMaxQueueBytes.
// NOTE(review): the return statements are not visible in this excerpt; a caller in this
// file enqueues only when the result is true, so presumably true means "accepted".
18 inline bool IncreaseQueue(__int64 llSize)
// Negative limit is handled before taking the lock (presumably "unlimited" -- confirm).
21 if (g_llMaxQueueBytes < 0)
23 EnterCriticalSection(&s_hLimitSafe);
// Admit the request only if it still fits under the limit.
24 if (s_llQueueBytes + llSize <= g_llMaxQueueBytes)
26 s_llQueueBytes += llSize;
29 LeaveCriticalSection(&s_hLimitSafe);
// Gives llSize bytes back to the global queue budget by subtracting them from
// s_llQueueBytes while holding s_hLimitSafe.
// NOTE(review): no lower-bound check is visible, so the total relies on callers passing
// back exactly what was previously added.
33 inline void DecreaseQueue(__int64 llSize)
// Negative limit is handled before taking the lock (presumably nothing to account -- confirm).
35 if (g_llMaxQueueBytes < 0)
37 EnterCriticalSection(&s_hLimitSafe);
38 s_llQueueBytes -= llSize;
39 LeaveCriticalSection(&s_hLimitSafe);
// Thread entry point with the Win32 thread-procedure signature. Elsewhere in this file it
// is handed to CreateThread with the creating object's `this` pointer as lpParameter.
// NOTE(review): the body is not visible in this excerpt.
45 DWORD WINAPI PipeProc(LPVOID lpParameter)
// Constructor: empties m_sStream (the stream used to format log lines in this class) and
// initializes the two critical sections, m_sPipeSafe and m_sInitSafe, that the other
// members enter.
// NOTE(review): other member initialization may exist on lines not visible here.
53 CPipeServer::CPipeServer()
60 m_sStream.str(_T(
""));
63 InitializeCriticalSection(&m_sPipeSafe);
64 InitializeCriticalSection(&m_sInitSafe);
// Destructor: releases the two critical sections created by the constructor.
// NOTE(review): any teardown of the worker thread or pipes before this point is not
// visible in this excerpt.
67 CPipeServer::~CPipeServer()
70 DeleteCriticalSection(&m_sPipeSafe);
71 DeleteCriticalSection(&m_sInitSafe);
// Body of the server start-up routine. Its signature line is not visible in this excerpt;
// the names used below match Init(szPipe, nPipes, dwBuffSizeIn, dwBuffSizeOut, cDevice,
// cLogBuffer). Visible steps: validate arguments, store configuration, create the first
// pipe instance with its events and read buffer, start an overlapped connect on it, and
// spawn the worker thread.
// Reject the call when a thread handle already exists or a required argument is missing.
78 if (m_hThread || !szPipe || nPipes < 1 || !cDevice)
79 return BAD_INPUT_PARAMS;
// Start-up is serialized through m_sInitSafe.
81 EnterCriticalSection(&m_sInitSafe);
// Early release of the lock; the condition guarding this path is not visible here.
84 LeaveCriticalSection(&m_sInitSafe);
90 m_pcLogBuffer= cLogBuffer;
// Clamp the instance count to (MAXIMUM_WAIT_OBJECTS-1)/2 and PIPE_UNLIMITED_INSTANCES:
// the wait array built below holds one shared event plus two events per pipe.
92 m_nMaxPipes= nPipes>min((MAXIMUM_WAIT_OBJECTS-1)/2, PIPE_UNLIMITED_INSTANCES)?min((MAXIMUM_WAIT_OBJECTS-1)/2, PIPE_UNLIMITED_INSTANCES):nPipes;
// Buffer sizes are raised to MIN_PIPE_BUF_SIZE when the caller asks for less.
93 m_dwBuffSizeIn= dwBuffSizeIn < MIN_PIPE_BUF_SIZE ? MIN_PIPE_BUF_SIZE : dwBuffSizeIn;
94 m_dwBuffSizeOut= dwBuffSizeOut < MIN_PIPE_BUF_SIZE ? MIN_PIPE_BUF_SIZE : dwBuffSizeOut;
95 m_sStream.str(_T(
""));
// Manual-reset, initially non-signaled event; the teardown code in this file signals it.
97 m_hDone= CreateEvent(NULL, TRUE, FALSE, NULL);
// First pipe instance: duplex, overlapped, message type and message read mode. The
// out-buffer size is passed before the in-buffer size, as CreateNamedPipe expects.
100 sPipeResource->hPipe= CreateNamedPipe(m_csPipe.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
101 PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
102 m_dwBuffSizeOut, m_dwBuffSizeIn, 0, NULL);
// Manual-reset event attached to oOverlap, the OVERLAPPED used for connect and read.
103 sPipeResource->hEvent= CreateEvent(NULL, TRUE, FALSE, NULL);
104 sPipeResource->oOverlap.hEvent= sPipeResource->hEvent;
// Read buffer of m_dwBuffSizeIn bytes obtained from the memory pool.
105 sPipeResource->pRead= m_pcMemPool->PoolAcquire(m_dwBuffSizeIn);
108 m_aPipes.push_back(sPipeResource);
// Wait-array layout relied on by the worker loop: index 0 is m_hDone, followed by the
// pair (hEvent, hWriteEvent) for each pipe.
110 m_ahEvents.push_back(m_hDone);
111 m_ahEvents.push_back(sPipeResource->hEvent);
112 m_ahEvents.push_back(sPipeResource->hWriteEvent);
// Any missing resource aborts start-up: the error is logged with GetLastError() and
// NO_SYS_RESOURCE is returned. NOTE(review): hWriteEvent is checked here but its creation
// is not visible in this excerpt; cleanup of the partially built state is not visible either.
114 if (!m_hDone || !sPipeResource->hWriteEvent || sPipeResource->hPipe == INVALID_HANDLE_VALUE || !sPipeResource->hEvent
115 || !sPipeResource->pRead)
120 m_sStream<<_T(
"3;")<<GetLastError()<<_T(
";Couldn't open some pipe resources for the first pipe instance.");
121 m_pcLogBuffer->
Add(m_sStream.str().c_str());
122 m_sStream.str(_T(
""));
124 LeaveCriticalSection(&m_sInitSafe);
125 return NO_SYS_RESOURCE;
127 sPipeResource->llId= m_llNextId++;
128 sPipeResource->fPending= TRUE;
// Start an overlapped connect; the outcome is read back through GetLastError().
129 ConnectNamedPipe(sPipeResource->hPipe, &sPipeResource->oOverlap);
130 switch (GetLastError())
// A client is already connected: signal the event by hand and clear the pending flag.
132 case ERROR_PIPE_CONNECTED:
133 SetEvent(sPipeResource->hEvent);
134 sPipeResource->fPending= FALSE;
// NOTE(review): whether a break separates these two cases is not visible here; if there is
// none, the connected case falls through to also set kConnecting -- confirm intent.
135 case ERROR_IO_PENDING:
136 sPipeResource->eStatus= kConnecting;
// Remaining visible switch lines (label not visible, presumably the default/error case):
// assign a fresh id and reset the resource.
139 sPipeResource->llId= m_llNextId++;
140 sPipeResource->ResetResource();
// Launch the worker thread, passing this object as the thread parameter.
146 m_hThread= CreateThread(NULL, 0, PipeProc,
this, 0, NULL);
// Thread-creation failure path: log, release the lock and report NO_SYS_RESOURCE.
152 m_sStream<<_T(
"3;")<<GetLastError()<<_T(
";Couldn't create pipe thread.");
153 m_pcLogBuffer->
Add(m_sStream.str().c_str());
154 m_sStream.str(_T(
""));
156 LeaveCriticalSection(&m_sInitSafe);
157 return NO_SYS_RESOURCE;
// Normal exit: release the start-up lock (the final return value is not visible here).
159 LeaveCriticalSection(&m_sInitSafe);
// Worker loop of the pipe server. Each pass optionally adds a pipe instance, waits on
// m_ahEvents, and then services the pipe whose event fired: connect, read and write are
// all driven as overlapped operations through per-pipe events.
// NOTE(review): loop headers, braces, case labels and return statements are not visible in
// this excerpt, so the groupings described below are inferred from the statements -- confirm
// against the full file.
163 DWORD CPipeServer::ThreadProc()
165 bool bError=
false, bNotEmpty;
170 EnterCriticalSection(&m_sPipeSafe);
// Add another pipe instance only when m_nConnected equals the number of existing instances,
// the cap m_nMaxPipes has not been reached, and bError is false.
172 if (m_nConnected == m_aPipes.size() && m_aPipes.size() < m_nMaxPipes && !bError)
175 sPipeResource->hPipe= CreateNamedPipe(m_csPipe.c_str(), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
176 PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
177 m_dwBuffSizeOut, m_dwBuffSizeIn, 0, NULL);
// Manual-reset event created already signaled (third argument TRUE), so the wait below
// returns for this pipe right away -- presumably to kick off its connect; confirm.
178 sPipeResource->hEvent= CreateEvent(NULL, TRUE, TRUE, NULL);
179 sPipeResource->pRead= m_pcMemPool->PoolAcquire(m_dwBuffSizeIn);
// NOTE(review): hWriteEvent is checked here but its creation is not visible in this excerpt.
180 if (sPipeResource->hPipe == INVALID_HANDLE_VALUE || !sPipeResource->hEvent || !sPipeResource->hWriteEvent
181 || !sPipeResource->pRead)
// Failure: log, return the read buffer to the pool, pass ClearQueue()'s result to
// DecreaseQueue(), and delete the half-built resource.
185 m_sStream<<_T(
"2;")<<GetLastError()<<_T(
";Couldn't open some pipe resources for secondary pipe instance.");
186 m_pcLogBuffer->
Add(m_sStream.str().c_str());
187 m_sStream.str(_T(
""));
189 m_pcMemPool->PoolRelease(sPipeResource->pRead);
190 DecreaseQueue(sPipeResource->ClearQueue());
191 delete sPipeResource;
// Success: wire the event into the OVERLAPPED, assign an id, and register the pipe plus its
// two events (same pair order as used for the first instance).
195 sPipeResource->oOverlap.hEvent= sPipeResource->hEvent;
196 sPipeResource->llId= m_llNextId++;
197 m_aPipes.push_back(sPipeResource);
198 m_ahEvents.push_back(sPipeResource->hEvent);
199 m_ahEvents.push_back(sPipeResource->hWriteEvent);
// The lock is released before blocking.
203 LeaveCriticalSection(&m_sPipeSafe);
// Wait for any one event (bWaitAll FALSE) with no timeout.
205 DWORD dwWait= WaitForMultipleObjects((DWORD)m_ahEvents.size(), &m_ahEvents[0], FALSE, INFINITE);
// Index 0 is m_hDone (pushed first during start-up); the handling of this case is not
// visible here -- presumably the loop exits.
206 if (dwWait == WAIT_OBJECT_0)
208 EnterCriticalSection(&m_sPipeSafe);
// Reject results outside the event array. NOTE(review): dwWait is a DWORD, so the `< 0`
// half can never be true; out-of-range values such as WAIT_FAILED are caught by the first half.
209 if (dwWait-WAIT_OBJECT_0 > m_ahEvents.size()-1 || dwWait-WAIT_OBJECT_0 < 0)
213 m_sStream<<_T(
"1;")<<GetLastError()<<_T(
";WaitForMultipleObjects returned invalid index.");
214 m_pcLogBuffer->
Add(m_sStream.str().c_str());
215 m_sStream.str(_T(
""));
217 LeaveCriticalSection(&m_sPipeSafe);
// Events after m_hDone come in pairs per pipe, so (index-1)/2 is the pipe slot; an even
// offset selects the pipe's hEvent (connect/read), an odd offset its hWriteEvent.
222 DWORD i= (DWORD)floor((dwWait-WAIT_OBJECT_0-1)/2.0);
223 if (!((dwWait-WAIT_OBJECT_0-1)%2))
// hEvent is manual-reset, so it is cleared before the state is examined.
225 ResetEvent(m_aPipes[i]->hEvent);
226 switch (m_aPipes[i]->eStatus)
// Start an overlapped connect on this instance; outcome is read from GetLastError().
229 m_aPipes[i]->fPending= TRUE;
230 ConnectNamedPipe(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap);
231 switch (GetLastError())
// Client already connected: signal the event by hand and clear the pending flag.
233 case ERROR_PIPE_CONNECTED:
234 SetEvent(m_aPipes[i]->hEvent);
235 m_aPipes[i]->fPending= FALSE;
// NOTE(review): whether a break separates these two cases is not visible here.
236 case ERROR_IO_PENDING:
237 m_aPipes[i]->eStatus= kConnecting;
// Failure pattern repeated throughout this function: when only one instance is allowed the
// slot gets a fresh id and is reset; the remaining lines (presumably the else branch)
// release the read buffer, credit the queue budget, and erase the pipe with its two events.
240 if (m_nMaxPipes == 1)
242 m_aPipes[i]->llId= m_llNextId++;
243 m_aPipes[i]->ResetResource();
246 m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
247 DecreaseQueue(m_aPipes[i]->ClearQueue());
249 m_aPipes.erase(m_aPipes.begin()+i);
// Removes exactly the two events belonging to pipe i (range end is exclusive).
250 m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
// Connect completion: if the connect was pending, fetch its result without blocking
// (bWait FALSE); a failed result takes the same recycle-or-remove path as above.
256 if (m_aPipes[i]->fPending)
258 if (!GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap, &dwBytes, FALSE))
260 if (m_nMaxPipes == 1)
262 m_aPipes[i]->llId= m_llNextId++;
263 m_aPipes[i]->ResetResource();
266 m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
267 DecreaseQueue(m_aPipes[i]->ClearQueue());
269 m_aPipes.erase(m_aPipes.begin()+i);
270 m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
272 LeaveCriticalSection(&m_sPipeSafe);
275 m_aPipes[i]->fPending= FALSE;
// Connected: switch to the reading state and signal the event so this pipe is serviced
// again (presumably to issue the first read on the next pass).
279 m_aPipes[i]->eStatus= kReading;
280 SetEvent(m_aPipes[i]->hEvent);
// Read completion: if a read was pending, collect its result without blocking.
283 if (m_aPipes[i]->fPending)
285 if (!GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oOverlap, &dwBytes, FALSE))
287 if (m_nMaxPipes == 1)
289 m_aPipes[i]->llId= m_llNextId++;
// Fail every queued write: take each entry off the queue, credit its size back to the
// global budget, and report failure to the entry's device.
292 while (m_aPipes[i]->cWriteQueue.GetSize())
294 sData= m_aPipes[i]->cWriteQueue.Front(
true, bRes);
296 DecreaseQueue(sData->dwSize);
297 sData->pDevice->
Result(sData->pHead,
false);
300 m_aPipes[i]->ResetResource();
303 m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
304 DecreaseQueue(m_aPipes[i]->ClearQueue());
306 m_aPipes.erase(m_aPipes.begin()+i);
307 m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
310 LeaveCriticalSection(&m_sPipeSafe);
// The received data is handed to the device with m_sPipeSafe released.
// NOTE(review): m_aPipes[i] is still dereferenced for the call arguments while the lock is
// not held -- confirm nothing else can modify m_aPipes at this point.
313 LeaveCriticalSection(&m_sPipeSafe);
314 m_pcDevice->
ProcessData(m_aPipes[i]->pRead, dwBytes, m_aPipes[i]->llId);
315 EnterCriticalSection(&m_sPipeSafe);
316 m_aPipes[i]->fPending= FALSE;
// Issue the next overlapped read into the pipe's read buffer.
319 if (ReadFile(m_aPipes[i]->hPipe, m_aPipes[i]->pRead, m_dwBuffSizeIn, &dwBytes, &m_aPipes[i]->oOverlap))
// Completed immediately: deliver the data (again outside the lock) and re-signal the event
// so this pipe is serviced again.
321 LeaveCriticalSection(&m_sPipeSafe);
322 m_pcDevice->
ProcessData(m_aPipes[i]->pRead, dwBytes, m_aPipes[i]->llId);
323 EnterCriticalSection(&m_sPipeSafe);
324 SetEvent(m_aPipes[i]->hEvent);
325 }
else if (GetLastError() == ERROR_IO_PENDING)
// Read is in flight; completion will signal hEvent.
327 m_aPipes[i]->fPending= TRUE;
// Any other read error: same failure handling as for a failed pending read above.
330 if (m_nMaxPipes == 1)
332 m_aPipes[i]->llId= m_llNextId++;
335 while (m_aPipes[i]->cWriteQueue.GetSize())
337 sData= m_aPipes[i]->cWriteQueue.Front(
true, bRes);
339 DecreaseQueue(sData->dwSize);
340 sData->pDevice->
Result(sData->pHead,
false);
343 m_aPipes[i]->ResetResource();
346 m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
347 DecreaseQueue(m_aPipes[i]->ClearQueue());
349 m_aPipes.erase(m_aPipes.begin()+i);
350 m_ahEvents.erase(m_ahEvents.begin()+2*i+1, m_ahEvents.begin()+2*i+3);
// Write-event handling (presumably the odd-offset branch of the test above -- confirm).
358 ResetEvent(m_aPipes[i]->hWriteEvent);
// Look at the head of the write queue; entries are taken off later with Front(true, ...),
// so the first argument presumably selects remove vs. peek -- confirm.
359 SData* sData= m_aPipes[i]->cWriteQueue.Front(
false, bNotEmpty);
// Presumably the empty-queue early-out; its condition is not visible here.
362 m_aPipes[i]->fWritePending= FALSE;
363 LeaveCriticalSection(&m_sPipeSafe);
// A write was in flight: report its outcome to the device (success requires
// GetOverlappedResult to succeed and the full dwSize to have been written), then credit the
// budget and delete the finished entry.
366 if (m_aPipes[i]->fWritePending)
368 sData->pDevice->
Result(sData->pHead, GetOverlappedResult(m_aPipes[i]->hPipe, &m_aPipes[i]->oWriteOverlap, &dwBytes, FALSE)
369 && dwBytes == sData->dwSize);
370 m_aPipes[i]->fWritePending= FALSE;
371 DecreaseQueue(sData->dwSize);
372 delete m_aPipes[i]->cWriteQueue.Front(
true, bNotEmpty);
// More data queued: re-signal the write event so the next entry gets sent.
373 if (m_aPipes[i]->cWriteQueue.GetSize())
374 SetEvent(m_aPipes[i]->hWriteEvent);
// Start an overlapped write of the head entry.
377 if (WriteFile(m_aPipes[i]->hPipe, sData->pHead, sData->dwSize, &dwBytes, &m_aPipes[i]->oWriteOverlap))
// Completed immediately: report success and retire the entry.
379 sData->pDevice->
Result(sData->pHead,
true);
380 DecreaseQueue(sData->dwSize);
381 delete m_aPipes[i]->cWriteQueue.Front(
true, bNotEmpty);
382 if (m_aPipes[i]->cWriteQueue.GetSize())
383 SetEvent(m_aPipes[i]->hWriteEvent);
384 }
else if (GetLastError() == ERROR_IO_PENDING)
// Write is in flight; its result is collected when hWriteEvent fires.
385 m_aPipes[i]->fWritePending= TRUE;
// Any other write error: report failure for this entry and retire it.
388 sData->pDevice->
Result(sData->pHead,
false);
389 DecreaseQueue(sData->dwSize);
390 delete m_aPipes[i]->cWriteQueue.Front(
true, bNotEmpty);
391 if (m_aPipes[i]->cWriteQueue.GetSize())
392 SetEvent(m_aPipes[i]->hWriteEvent);
396 LeaveCriticalSection(&m_sPipeSafe);
// Teardown sequence of the pipe server (the enclosing function's signature line is not
// visible in this excerpt). It runs under m_sInitSafe, the same lock the start-up and send
// paths take.
405 EnterCriticalSection(&m_sInitSafe);
// Initialized to a value different from WAIT_OBJECT_0, so the check below treats "no wait
// performed" like a failed wait (the condition around the wait is not visible here).
408 DWORD dwRes= WAIT_OBJECT_0 +1;
// Signal m_hDone and wait up to 2000 ms for the worker thread handle to become signaled.
412 dwRes= SignalObjectAndWait(m_hDone, m_hThread, 2000, FALSE);
// The worker did not finish in time (or the wait failed): terminate it, then recreate
// m_sPipeSafe -- presumably because the killed thread might have been holding it; confirm.
413 if (dwRes != WAIT_OBJECT_0)
415 TerminateThread(m_hThread, 0);
416 DeleteCriticalSection(&m_sPipeSafe);
417 InitializeCriticalSection(&m_sPipeSafe);
// For every pipe: return its read buffer to the pool and pass ClearQueue()'s result to
// DecreaseQueue() (presumably the number of bytes that were still queued).
420 for (
size_t i= 0; i<m_aPipes.size();++i)
422 m_pcMemPool->PoolRelease(m_aPipes[i]->pRead);
423 DecreaseQueue(m_aPipes[i]->ClearQueue());
// Close the done event and the thread handle when they exist.
427 if (m_hDone) CloseHandle(m_hDone);
428 if (m_hThread) CloseHandle(m_hThread);
433 m_sStream.str(_T(
""));
434 LeaveCriticalSection(&m_sInitSafe);
// Body of the send routine (signature line not visible in this excerpt; the names match
// SendData(pData, llId)). It queues a heap copy of *pData on the write queue of the pipe
// whose id equals llId, provided the global queue budget accepts pData->dwSize bytes.
// NOTE(review): return statements and any signaling of the pipe's write event are not
// visible here.
// Reject incomplete requests: null pData, null payload, null device, or zero size.
439 if (!pData || !pData->pHead || !pData->pDevice || !pData->dwSize)
441 EnterCriticalSection(&m_sInitSafe);
// Early release of the lock; the condition guarding this path is not visible here.
444 LeaveCriticalSection(&m_sInitSafe);
449 EnterCriticalSection(&m_sPipeSafe);
// Linear search for the target pipe by id.
450 for (DWORD i= 0; i<m_aPipes.size(); ++i)
452 if (m_aPipes[i]->llId == llId)
// Enqueue only when the bytes are accepted by the global budget.
454 if (IncreaseQueue(pData->dwSize))
// The queue receives its own copy of the caller's SData (copy-constructed on the heap).
456 m_aPipes[i]->cWriteQueue.Push(
new SData(*pData));
// Locks are released in the reverse order of acquisition.
462 LeaveCriticalSection(&m_sPipeSafe);
463 LeaveCriticalSection(&m_sInitSafe);
int SendData(const SData *pData, __int64 llId)
int Add(const TCHAR *szItem)
virtual void Result(void *pHead, bool bPass)=0
int Init(const TCHAR szPipe[], int nPipes, DWORD dwBuffSizeIn, DWORD dwBuffSizeOut, CDevice *cDevice, CLogBuffer *cLogBuffer)
virtual void ProcessData(const void *pHead, DWORD dwSize, __int64 llId)=0