Skip to content

Commit

Permalink
Merge pull request #10746 from rouault/raster_multi_thread
Browse files Browse the repository at this point in the history
[RFC 101 implementation] Add GDALGetThreadSafeDataset() and GDAL_OF_THREAD_SAFE
  • Loading branch information
rouault committed Sep 19, 2024
2 parents 817899c + 580c610 commit 927be3f
Show file tree
Hide file tree
Showing 27 changed files with 2,824 additions and 125 deletions.
3 changes: 3 additions & 0 deletions apps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ if (BUILD_APPS)
add_executable(gdalasyncread EXCLUDE_FROM_ALL gdalasyncread.cpp)
add_executable(gdalwarpsimple EXCLUDE_FROM_ALL gdalwarpsimple.c)
add_executable(multireadtest EXCLUDE_FROM_ALL multireadtest.cpp)
if(NOT MSVC AND CMAKE_THREAD_LIBS_INIT)
target_link_libraries(multireadtest PRIVATE ${CMAKE_THREAD_LIBS_INIT})
endif()
add_executable(test_ogrsf test_ogrsf.cpp)
add_executable(testreprojmulti EXCLUDE_FROM_ALL testreprojmulti.cpp)

Expand Down
139 changes: 109 additions & 30 deletions apps/multireadtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,32 @@
#include "gdal_alg.h"
#include "cpl_multiproc.h"
#include "cpl_string.h"

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static int nIterations = 1;
static bool bLockOnOpen = false;
static int nOpenIterations = 1;
static volatile int nPendingThreads = 0;
static bool bThreadCanFinish = false;
static std::mutex oMutex;
static std::condition_variable oCond;
static const char *pszFilename = nullptr;
static int nChecksum = 0;
static int nWidth = 0;
static int nHeight = 0;

static CPLMutex *pGlobalMutex = nullptr;

/************************************************************************/
/* Usage() */
/************************************************************************/

static void Usage()
{
printf("multireadtest [-lock_on_open] [-open_in_main] [-t <thread#>]\n"
" [-i <iterations>] [-oi <iterations>]\n"
printf("multireadtest [[-thread_safe] | [[-lock_on_open] [-open_in_main]]\n"
" [-t <thread#>] [-i <iterations>] [-oi <iterations>]\n"
" [-width <val>] [-height <val>]\n"
" filename\n");
exit(1);
Expand All @@ -74,12 +79,12 @@ static void WorkerFunc(void *arg)
else
{
if (bLockOnOpen)
CPLAcquireMutex(pGlobalMutex, 100.0);
oMutex.lock();

hDS = GDALOpen(pszFilename, GA_ReadOnly);

if (bLockOnOpen)
CPLReleaseMutex(pGlobalMutex);
oMutex.unlock();
}

for (int iIter = 0; iIter < nIterations && hDS != nullptr; iIter++)
Expand All @@ -99,20 +104,24 @@ static void WorkerFunc(void *arg)
if (hDS && hDSIn == nullptr)
{
if (bLockOnOpen)
CPLAcquireMutex(pGlobalMutex, 100.0);
oMutex.lock();
GDALClose(hDS);
if (bLockOnOpen)
CPLReleaseMutex(pGlobalMutex);
oMutex.unlock();
}
else if (hDSIn != nullptr)
{
GDALFlushCache(hDSIn);
}
}

CPLAcquireMutex(pGlobalMutex, 100.0);
nPendingThreads--;
CPLReleaseMutex(pGlobalMutex);
{
std::unique_lock oLock(oMutex);
nPendingThreads--;
oCond.notify_all();
while (!bThreadCanFinish)
oCond.wait(oLock);
}
}

/************************************************************************/
Expand All @@ -131,6 +140,10 @@ int main(int argc, char **argv)

int nThreadCount = 4;
bool bOpenInThreads = true;
bool bThreadSafe = false;
bool bJoinAfterClosing = false;
bool bDetach = false;
bool bClose = true;

for (int iArg = 1; iArg < argc; iArg++)
{
Expand All @@ -154,6 +167,10 @@ int main(int argc, char **argv)
{
nHeight = atoi(argv[++iArg]);
}
else if (EQUAL(argv[iArg], "-thread_safe"))
{
bThreadSafe = true;
}
else if (EQUAL(argv[iArg], "-lock_on_open"))
{
bLockOnOpen = true;
Expand All @@ -162,6 +179,18 @@ int main(int argc, char **argv)
{
bOpenInThreads = false;
}
else if (EQUAL(argv[iArg], "-join_after_closing"))
{
bJoinAfterClosing = true;
}
else if (EQUAL(argv[iArg], "-detach"))
{
bDetach = true;
}
else if (EQUAL(argv[iArg], "-do_not_close"))
{
bClose = false;
}
else if (pszFilename == nullptr)
{
pszFilename = argv[iArg];
Expand All @@ -186,12 +215,10 @@ int main(int argc, char **argv)
/* -------------------------------------------------------------------- */
/* Get the checksum of band1. */
/* -------------------------------------------------------------------- */
GDALDatasetH hDS = nullptr;

GDALAllRegister();
for (int i = 0; i < 2; i++)
{
hDS = GDALOpen(pszFilename, GA_ReadOnly);
GDALDatasetH hDS = GDALOpen(pszFilename, GA_ReadOnly);
if (hDS == nullptr)
exit(1);

Expand All @@ -210,45 +237,97 @@ int main(int argc, char **argv)
/* -------------------------------------------------------------------- */
/* Fire off worker threads. */
/* -------------------------------------------------------------------- */
pGlobalMutex = CPLCreateMutex();
CPLReleaseMutex(pGlobalMutex);

nPendingThreads = nThreadCount;

GDALDatasetH hThreadSafeDS = nullptr;
if (bThreadSafe)
{
hThreadSafeDS =
GDALOpenEx(pszFilename, GDAL_OF_RASTER | GDAL_OF_THREAD_SAFE,
nullptr, nullptr, nullptr);
if (!hThreadSafeDS)
exit(1);
}
std::vector<std::thread> aoThreads;
std::vector<GDALDatasetH> aoDS;
for (int iThread = 0; iThread < nThreadCount; iThread++)
{
hDS = nullptr;
if (!bOpenInThreads)
GDALDatasetH hDS = nullptr;
if (bThreadSafe)
{
hDS = GDALOpen(pszFilename, GA_ReadOnly);
if (!hDS)
hDS = hThreadSafeDS;
}
else
{
if (!bOpenInThreads)
{
printf("GDALOpen() failed.\n");
exit(1);
hDS = GDALOpen(pszFilename, GA_ReadOnly);
if (!hDS)
{
printf("GDALOpen() failed.\n");
exit(1);
}
aoDS.push_back(hDS);
}
aoDS.push_back(hDS);
}
if (CPLCreateThread(WorkerFunc, hDS) == -1)
aoThreads.push_back(std::thread([hDS]() { WorkerFunc(hDS); }));
}

{
std::unique_lock oLock(oMutex);
while (nPendingThreads > 0)
{
printf("CPLCreateThread() failed.\n");
exit(1);
// printf("nPendingThreads = %d\n", nPendingThreads);
oCond.wait(oLock);
}
}

while (nPendingThreads > 0)
CPLSleep(0.5);

CPLDestroyMutex(pGlobalMutex);
if (!bJoinAfterClosing && !bDetach)
{
{
std::lock_guard oLock(oMutex);
bThreadCanFinish = true;
oCond.notify_all();
}
for (auto &oThread : aoThreads)
oThread.join();
}

for (size_t i = 0; i < aoDS.size(); ++i)
GDALClose(aoDS[i]);
if (bClose)
GDALClose(hThreadSafeDS);

if (bDetach)
{
for (auto &oThread : aoThreads)
oThread.detach();
}
else if (bJoinAfterClosing)
{
{
std::lock_guard oLock(oMutex);
bThreadCanFinish = true;
oCond.notify_all();
}
for (auto &oThread : aoThreads)
oThread.join();
}

printf("All threads complete.\n");

CSLDestroy(argv);

GDALDestroyDriverManager();

{
std::lock_guard oLock(oMutex);
bThreadCanFinish = true;
oCond.notify_all();
}

printf("End of main.\n");

return 0;
}
Loading

0 comments on commit 927be3f

Please sign in to comment.