/*------------------------------------------------------------------------- allegdb.cpp Implementation of OLE DB layer for Allegiance Owner: Copyright 1986-2000 Microsoft Corporation, All Rights Reserved *-----------------------------------------------------------------------*/ #include "pch.h" #include "allegdb.h" unsigned CALLBACK SQLQueueProc(void * pvSQLQueueThread) { CSQLQueueThread * pSQLQueueThread = (CSQLQueueThread *) pvSQLQueueThread; HANDLE rgeventHandles[1]; rgeventHandles[0] = pSQLQueueThread->GetEventDie(); DWORD cHandles = sizeof(rgeventHandles) / sizeof(rgeventHandles[0]); DWORD dwWait = WAIT_TIMEOUT; do { //Wait until either we get a message or we ran out of time dwWait = MsgWaitForMultipleObjects(cHandles, rgeventHandles, FALSE, INFINITE, QS_ALLINPUT); // Process the message queue, if any messages were received static MSG msg; while (WAIT_OBJECT_0 != dwWait && PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) { // dispatch Windows Messages to allow for the admin tool's COM to work TranslateMessage(&msg); switch (msg.message) { case wm_sql_addquery: { pSQLQueueThread->AddQuery((CSQLQuery*) msg.lParam); break; } case WM_QUIT: dwWait = WAIT_OBJECT_0; // let the thread be shutdown by sending a quit, or signalling pSqlThread->m_hEventExit break; default: ZError("SQLThreadProc: Unexpected thread message.\n"); } } } while (WAIT_OBJECT_0 != dwWait); return 0; } unsigned CALLBACK SQLThreadProc(void * pvsqlThread) { CSQLThread * pSqlThread = (CSQLThread *) pvsqlThread; CSQLCore * psql = pSqlThread->GetSQLCore(); HRESULT hr = E_FAIL; hr = pSqlThread->Open(); if (FAILED(hr)) return hr; HANDLE rgeventHandles[2]; rgeventHandles[0] = pSqlThread->GetEventDie(); rgeventHandles[1] = pSqlThread->GetNotify() ? psql->GetNotifySemaphore() : psql->GetSilentSemaphore(); DWORD cHandles = sizeof(rgeventHandles) / sizeof(rgeventHandles[0]); DWORD dwWait = WAIT_TIMEOUT; do { //Wait until either we get a message or we ran out of time dwWait = WaitForMultipleObjects(cHandles, rgeventHandles, FALSE, INFINITE); switch (dwWait) { case WAIT_OBJECT_0: // pSqlThread->m_hEventExit break; case WAIT_OBJECT_0 + 1: // spemaphore for the queue we're servicing // We may not actually get the query that we were signaled for, but that doesn't matter hr = pSqlThread->ServiceQuery(pSqlThread->GetNotify() ? psql->GetNotifyQuery() : psql->GetSilentQuery()); break; default: ZError("Unexpected object signaled in SQLThreadProc.\n"); } } while (WAIT_OBJECT_0 != dwWait); return 0; } HRESULT CSQLCore::Init(LPCOLESTR strSQLConfig, DWORD nThreadIDNotify, DWORD cSilentThreads, DWORD cNotifyThreads) { HRESULT hr = E_FAIL; if (cSilentThreads + cNotifyThreads < 1 || cSilentThreads + cNotifyThreads > 20) // must have between 1 and 20 threads return E_INVALIDARG; m_nThreadIDNotify = nThreadIDNotify; m_cSilentThreads = cSilentThreads; m_cNotifyThreads = cNotifyThreads; // create event used to signal that ALL sql threads should exit m_hKillSQLEvent = CreateEvent(NULL, TRUE, FALSE, NULL); assert(m_hKillSQLEvent); hr = m_connection.OpenFromInitializationString(strSQLConfig); if (FAILED(hr)) { DumpErrorInfo(m_connection.m_spInit, IID_IDBInitialize, NULL); return hr; } m_hQueryNotify = CreateSemaphore(NULL, 0, MAXLONG, NULL); m_hQuerySilent = CreateSemaphore(NULL, 0, MAXLONG, NULL); m_pthdQueue = new CSQLQueueThread(m_hKillSQLEvent, this); assert (m_pthdQueue); m_pargSilentThreads = (CSQLThread **) new char[sizeof(CSQLThread *) * m_cSilentThreads]; m_pargNotifyThreads = (CSQLThread **) new char[sizeof(CSQLThread *) * m_cNotifyThreads]; DWORD i = 0; for (i = 0; i < m_cNotifyThreads; i++) { m_pargNotifyThreads[i] = new CSQLThread(m_hKillSQLEvent, true, this); } for (i = 0; i < m_cSilentThreads; i++) { m_pargSilentThreads[i] = new CSQLThread(m_hKillSQLEvent, false, this); } return hr; } CSQLCore::~CSQLCore() { DWORD i = 0; for (i = 0; i < m_cNotifyThreads && m_pargNotifyThreads; i++) { // TODO: cleanup m_listQuries delete m_pargNotifyThreads[i]; } delete [] m_pargNotifyThreads; for (i = 0; i < m_cSilentThreads && m_pargSilentThreads; i++) { // TODO: cleanup m_listQuries delete m_pargSilentThreads[i]; } delete [] m_pargSilentThreads; CloseHandle(m_hQueryNotify); CloseHandle(m_hQuerySilent); } void CSQLCore::AddQuery(CSQLQuery * pquery) { HANDLE h = NULL; if (pquery->GetNotify()) { m_listQueriesNotify.last(pquery); h = GetNotifySemaphore(); } else { m_listQueriesSilent.last(pquery); h = GetSilentSemaphore(); } ReleaseSemaphore(h, 1, NULL); // signal some thread to pick up the query } void CSQLCore::PostQuery(CSQLQuery * pquery) { PostThreadMessage(m_pthdQueue->GetThreadID(), wm_sql_addquery, (WPARAM) NULL, (LPARAM) pquery); } HRESULT CSQLThread::ServiceQuery(CSQLQuery * pqueryNew) { CSQLQuery * pqueryCache = NULL; REFGUID guid = pqueryNew->GetGuid(); HRESULT hrQuery = E_FAIL; // it is important that ONLY results from calls to pqueryCache be assigned here // See if this query is already in the per-thread cache for (SLinkQueries * plinkQuery = m_listQueries.first(); plinkQuery && !pqueryCache; plinkQuery = plinkQuery->next()) { CSQLQuery * pqueryT = plinkQuery->data(); if (pqueryT->GetGuid() == guid) { // make sure we didn't accidentally use the same guid twice for different queries assert(!lstrcmp(pqueryNew->GetStrQuery(), pqueryT->GetStrQuery())); pqueryCache = pqueryT; } } if (!pqueryCache) // the first time this thread has seen this query, so let's add it to our reportoire { //pqueryCache = pqueryNew; pqueryCache = pqueryNew->Copy(NULL); m_listQueries.first(pqueryCache); hrQuery = pqueryCache->OnPrepare(m_session); } else { hrQuery = S_OK; pqueryNew->Copy(pqueryCache); } bool fRetry = SUCCEEDED(hrQuery); int cRetries = 0; while (fRetry) { hrQuery = pqueryCache->OnExecute(); if (SUCCEEDED(hrQuery)) { fRetry = false; } else { m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, &fRetry); if (fRetry) { debugf("Query deadlocked or timed-out. Retry #%d\n", ++cRetries); Sleep(50 * cRetries); } } } if (FAILED(hrQuery)) m_psql->DumpErrorInfo(pqueryCache->GetIUnknown(), IID_ICommand, NULL); pqueryCache->Copy(pqueryNew); // We post it whether the client requested notification or not, because at least we need to let the query clean up itself if (pqueryNew->GetCallbackOnMainThread()) { BOOL bPosted = PostThreadMessage(GetSQLCore()->GetNotifyThreadID(), wm_sql_querydone, (WPARAM) NULL, (LPARAM) pqueryNew); if (!bPosted) { // If the PostThreadMessage failed because the thread id is gone, we still need to clean up DWORD dwLastError = ::GetLastError(); if (ERROR_INVALID_THREAD_ID == dwLastError) { pqueryNew->DataReady(); } } } else { pqueryNew->DataReady(); } return hrQuery; }