Kaydet (Commit) 07102996 authored tarafından Michael Meeks's avatar Michael Meeks

sc: threaded parsing of the core data inside large XLSX files

Enabled in experimental mode only or via SC_IMPORT_THREADS=<N> this
allows significant parallelisation of sheet reading. I also implement
a simple thread pool to manage that.

Change-Id: I66c72211f2699490230e993a374c26b1892eac12
üst 8bdf8dc0
...@@ -211,6 +211,7 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\ ...@@ -211,6 +211,7 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\
sc/source/filter/oox/tablebuffer \ sc/source/filter/oox/tablebuffer \
sc/source/filter/oox/tablefragment \ sc/source/filter/oox/tablefragment \
sc/source/filter/oox/themebuffer \ sc/source/filter/oox/themebuffer \
sc/source/filter/oox/threadpool \
sc/source/filter/oox/unitconverter \ sc/source/filter/oox/unitconverter \
sc/source/filter/oox/viewsettings \ sc/source/filter/oox/viewsettings \
sc/source/filter/oox/workbookfragment \ sc/source/filter/oox/workbookfragment \
......
...@@ -23,6 +23,9 @@ ...@@ -23,6 +23,9 @@
#include "excelhandlers.hxx" #include "excelhandlers.hxx"
#include "richstring.hxx" #include "richstring.hxx"
#include "sheetdatabuffer.hxx" #include "sheetdatabuffer.hxx"
#include <vcl/svapp.hxx>
#define MULTI_THREAD_SHEET_PARSING 1
namespace oox { namespace oox {
namespace xls { namespace xls {
...@@ -54,8 +57,16 @@ struct SheetDataContextBase ...@@ -54,8 +57,16 @@ struct SheetDataContextBase
*/ */
class SheetDataContext : public WorksheetContextBase, private SheetDataContextBase class SheetDataContext : public WorksheetContextBase, private SheetDataContextBase
{ {
// If we are doing threaded parsing, this SheetDataContext
// forms the inner loop for bulk data parsing, and for the
// duration of this we can drop the solar mutex.
#if MULTI_THREAD_SHEET_PARSING
SolarMutexReleaser aReleaser;
#endif
public: public:
explicit SheetDataContext( WorksheetFragmentBase& rFragment ); explicit SheetDataContext( WorksheetFragmentBase& rFragment );
virtual ~SheetDataContext();
protected: protected:
virtual ::oox::core::ContextHandlerRef onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs ); virtual ::oox::core::ContextHandlerRef onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs );
......
...@@ -90,6 +90,12 @@ SheetDataContext::SheetDataContext( WorksheetFragmentBase& rFragment ) : ...@@ -90,6 +90,12 @@ SheetDataContext::SheetDataContext( WorksheetFragmentBase& rFragment ) :
mnRow( -1 ), mnRow( -1 ),
mnCol( -1 ) mnCol( -1 )
{ {
SAL_INFO( "sc.filter", "start safe sheet data context - unlock\n" );
}
SheetDataContext::~SheetDataContext()
{
SAL_INFO( "sc.filter", "end safe sheet data context - relock\n" );
} }
ContextHandlerRef SheetDataContext::onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs ) ContextHandlerRef SheetDataContext::onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs )
......
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include "threadpool.hxx"
class ThreadPool::ThreadWorker : public salhelper::Thread
{
ThreadPool *mpPool;
osl::Condition maNewWork;
public:
ThreadWorker( ThreadPool *pPool ) :
salhelper::Thread("sheet-import-thread-pool"),
mpPool( pPool ) {}
virtual void execute()
{
ThreadTask *pTask;
while ( ( pTask = waitForWork() ) )
{
pTask->doWork();
delete pTask;
}
}
ThreadTask *waitForWork()
{
ThreadTask *pRet = NULL;
osl::ResettableMutexGuard aGuard( mpPool->maGuard );
pRet = mpPool->popWork();
while( !pRet )
{
maNewWork.reset();
if( mpPool->mbTerminate )
break;
aGuard.clear(); // unlock
maNewWork.wait();
aGuard.reset(); // lock
pRet = mpPool->popWork();
}
return pRet;
}
//
// Why a condition per worker thread - you may ask.
//
// Unfortunately the Windows synchronisation API that we wrap
// is horribly inadequate cf.
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
// The existing osl::Condition API should only ever be used
// between one producer and one consumer thread to avoid the
// lost wakeup problem.
//
void signalNewWork()
{
maNewWork.set();
}
};
ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
mbTerminate( false )
{
for( sal_Int32 i = 0; i < nWorkers; i++ )
maWorkers.push_back( new ThreadWorker( this ) );
maTasksEmpty.reset();
osl::MutexGuard aGuard( maGuard );
for( size_t i = 0; i < maWorkers.size(); i++ )
maWorkers[ i ]->launch();
}
ThreadPool::~ThreadPool()
{
waitUntilWorkersDone();
}
/// wait until all the workers have completed and
/// terminate all threads
void ThreadPool::waitUntilWorkersDone()
{
waitUntilEmpty();
osl::ResettableMutexGuard aGuard( maGuard );
mbTerminate = true;
while( !maWorkers.empty() )
{
rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
maWorkers.pop_back();
assert( maWorkers.find( xWorker ) == maWorkers.end() );
xWorker->signalNewWork();
aGuard.clear();
{ // unlocked
xWorker->join();
xWorker.clear();
}
aGuard.reset();
}
}
void ThreadPool::pushTask( ThreadTask *pTask )
{
osl::MutexGuard aGuard( maGuard );
maTasks.insert( maTasks.begin(), pTask );
// horrible beyond belief:
for( size_t i = 0; i < maWorkers.size(); i++ )
maWorkers[ i ]->signalNewWork();
maTasksEmpty.reset();
}
ThreadTask *ThreadPool::popWork()
{
if( !maTasks.empty() )
{
ThreadTask *pTask = maTasks.back();
maTasks.pop_back();
return pTask;
}
else
maTasksEmpty.set();
return NULL;
}
void ThreadPool::waitUntilEmpty()
{
osl::ResettableMutexGuard aGuard( maGuard );
if( maWorkers.empty() )
{ // no threads at all -> execute the work in-line
ThreadTask *pTask;
while ( ( pTask = popWork() ) )
{
pTask->doWork();
delete pTask;
}
mbTerminate = true;
}
else
{
aGuard.clear();
maTasksEmpty.wait();
aGuard.reset();
}
assert( maTasks.empty() );
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#ifndef SC_THREADPOOL_HXX
#define SC_THREADPOOL_HXX
#include <sal/config.h>
#include <salhelper/thread.hxx>
#include <osl/mutex.hxx>
#include <osl/conditn.hxx>
#include <rtl/ref.hxx>
#include <vector>
class ThreadTask
{
public:
virtual ~ThreadTask() {}
virtual void doWork() = 0;
};
/// A very basic thread pool implementation
class ThreadPool
{
public:
ThreadPool( sal_Int32 nWorkers );
virtual ~ThreadPool();
void pushTask( ThreadTask *pTask /* takes ownership */ );
void waitUntilEmpty();
void waitUntilWorkersDone();
private:
class ThreadWorker;
friend class ThreadWorker;
ThreadTask *waitForWork( osl::Condition &rNewWork );
ThreadTask *popWork();
osl::Mutex maGuard;
osl::Condition maTasksEmpty;
bool mbTerminate;
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
std::vector< ThreadTask * > maTasks;
};
#endif // SC_THREADPOOL_HXX
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
...@@ -42,11 +42,16 @@ ...@@ -42,11 +42,16 @@
#include "workbooksettings.hxx" #include "workbooksettings.hxx"
#include "worksheetbuffer.hxx" #include "worksheetbuffer.hxx"
#include "worksheetfragment.hxx" #include "worksheetfragment.hxx"
#include "sheetdatacontext.hxx"
#include "threadpool.hxx"
#include "officecfg/Office/Common.hxx"
#include "document.hxx" #include "document.hxx"
#include "docsh.hxx" #include "docsh.hxx"
#include "calcconfig.hxx" #include "calcconfig.hxx"
#include <vcl/svapp.hxx>
#include <oox/core/fastparser.hxx> #include <oox/core/fastparser.hxx>
#include <salhelper/thread.hxx> #include <salhelper/thread.hxx>
#include <osl/conditn.hxx> #include <osl/conditn.hxx>
...@@ -54,8 +59,6 @@ ...@@ -54,8 +59,6 @@
#include <queue> #include <queue>
#include <boost/scoped_ptr.hpp> #include <boost/scoped_ptr.hpp>
#define MULTI_THREAD_SHEET_PARSING 0
#include "oox/ole/vbaproject.hxx" #include "oox/ole/vbaproject.hxx"
namespace oox { namespace oox {
...@@ -204,188 +207,77 @@ namespace { ...@@ -204,188 +207,77 @@ namespace {
typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler; typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler;
typedef std::vector<SheetFragmentHandler> SheetFragmentVector; typedef std::vector<SheetFragmentHandler> SheetFragmentVector;
#if MULTI_THREAD_SHEET_PARSING class WorkerThread : public ThreadTask
class WorkerThread;
typedef rtl::Reference<WorkerThread> WorkerThreadRef;
struct WorkerThreadData
{
osl::Mutex maMtx;
std::vector<WorkerThreadRef> maThreads;
};
struct IdleWorkerThreadData
{
osl::Mutex maMtx;
osl::Condition maCondAdded;
std::queue<WorkerThread*> maThreads;
};
struct
{
boost::scoped_ptr<WorkerThreadData> mpWorkerThreads;
boost::scoped_ptr<IdleWorkerThreadData> mpIdleThreads;
} aThreadGlobals;
enum WorkerAction
{
None = 0,
TerminateThread,
Work
};
class WorkerThread : public salhelper::Thread
{ {
WorkbookFragment& mrWorkbookHandler; WorkbookFragment& mrWorkbookHandler;
size_t mnID; rtl::Reference<FragmentHandler> mxHandler;
FragmentHandlerRef mxHandler;
boost::scoped_ptr<oox::core::FastParser> mxParser;
osl::Mutex maMtxAction;
osl::Condition maCondActionChanged;
WorkerAction meAction;
public:
WorkerThread( WorkbookFragment& rWorkbookHandler, size_t nID ) :
salhelper::Thread("sheet-import-worker-thread"),
mrWorkbookHandler(rWorkbookHandler),
mnID(nID),
mxParser(rWorkbookHandler.getOoxFilter().createParser()),
meAction(None) {}
virtual void execute()
{
announceIdle();
// Keep looping until the terminate request is set.
for (maCondActionChanged.wait(); true; maCondActionChanged.wait())
{
osl::MutexGuard aGuard(maMtxAction);
if (!maCondActionChanged.check())
// Wait again.
continue;
maCondActionChanged.reset();
if (meAction == TerminateThread)
// End the thread.
return;
if (meAction != Work)
continue;
#if 0
// TODO : This still deadlocks in the fast parser code.
mrWorkbookHandler.importOoxFragment(mxHandler, *mxParser);
#else
double val = rand() / static_cast<double>(RAND_MAX);
val *= 1000000; // normalize to 1 second.
val *= 1.5; // inflate it a bit.
usleep(val); // pretend to be working while asleep.
#endif
announceIdle();
}
}
void announceIdle()
{
// Set itself idle to receive a new task from the main thread.
osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
aThreadGlobals.mpIdleThreads->maThreads.push(this);
aThreadGlobals.mpIdleThreads->maCondAdded.set();
}
void terminate() public:
WorkerThread( WorkbookFragment& rWorkbookHandler,
const rtl::Reference<FragmentHandler>& xHandler ) :
mrWorkbookHandler( rWorkbookHandler ),
mxHandler( xHandler )
{ {
osl::MutexGuard aGuard(maMtxAction);
meAction = TerminateThread;
maCondActionChanged.set();
} }
void assign( const FragmentHandlerRef& rHandler ) virtual void doWork()
{ {
osl::MutexGuard aGuard(maMtxAction); // We hold the solar mutex in all threads except for
mxHandler = rHandler; // the small safe section of the inner loop in
meAction = Work; // sheetdatacontext.cxx
maCondActionChanged.set(); SAL_INFO( "sc.filter", "start wait on solar\n" );
SolarMutexGuard maGuard;
SAL_INFO( "sc.filter", "got solar\n" );
boost::scoped_ptr<oox::core::FastParser> xParser(
mrWorkbookHandler.getOoxFilter().createParser() );
SAL_INFO( "sc.filter", "start import\n" );
mrWorkbookHandler.importOoxFragment( mxHandler, *xParser );
SAL_INFO( "sc.filter", "end import, release solar\n" );
} }
}; };
#endif
void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets ) void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets )
{ {
#if MULTI_THREAD_SHEET_PARSING // threaded version sal_Int32 nThreads = std::min( rSheets.size(), (size_t) 4 /* FIXME: ncpus/2 */ );
size_t nThreadCount = 3;
if (nThreadCount > rSheets.size())
nThreadCount = rSheets.size();
// Create new thread globals. Reference< XComponentContext > xContext = comphelper::getProcessComponentContext();
aThreadGlobals.mpWorkerThreads.reset(new WorkerThreadData);
aThreadGlobals.mpIdleThreads.reset(new IdleWorkerThreadData);
SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); // Force threading off unless experimental mode or env. var is set.
if( !officecfg::Office::Common::Misc::ExperimentalMode::get( xContext ) )
nThreads = 0;
{ const char *pEnv;
// Initialize worker threads. if( ( pEnv = getenv( "SC_IMPORT_THREADS" ) ) )
osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx); nThreads = rtl_str_toInt32( pEnv, 10 );
for (size_t i = 0; i < nThreadCount; ++i)
{
WorkerThreadRef pThread(new WorkerThread(rWorkbookHandler, i));
aThreadGlobals.mpWorkerThreads->maThreads.push_back(pThread);
pThread->launch();
}
}
for (aThreadGlobals.mpIdleThreads->maCondAdded.wait(); true; aThreadGlobals.mpIdleThreads->maCondAdded.wait()) if( nThreads != 0 )
{ {
osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx); // test sequential read in this mode
if (!aThreadGlobals.mpIdleThreads->maCondAdded.check()) if( nThreads < 0)
// Wait again. nThreads = 0;
continue; ThreadPool aPool( nThreads );
aThreadGlobals.mpIdleThreads->maCondAdded.reset();
// Assign work to all idle threads.
while (!aThreadGlobals.mpIdleThreads->maThreads.empty())
{
if (it == itEnd)
break;
WorkerThread* p = aThreadGlobals.mpIdleThreads->maThreads.front();
aThreadGlobals.mpIdleThreads->maThreads.pop();
p->assign(it->second);
++it;
}
if (it == itEnd) SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
// Finished! Exit the loop. for( ; it != itEnd; ++it )
break; aPool.pushTask( new WorkerThread( rWorkbookHandler, it->second ) )
} ;
{
// Terminate all worker threads.
osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
for (size_t i = 0, n = aThreadGlobals.mpWorkerThreads->maThreads.size(); i < n; ++i)
{ {
WorkerThreadRef pWorker = aThreadGlobals.mpWorkerThreads->maThreads[i]; // Ideally no-one else but our worker threads can re-acquire that.
pWorker->terminate(); // potentially if that causes a problem we might want to extend
if (pWorker.is()) // the SolarMutex functionality to allow passing it around.
pWorker->join(); SolarMutexReleaser aReleaser;
aPool.waitUntilWorkersDone();
} }
} }
else
// Delete all thread globals.
aThreadGlobals.mpWorkerThreads.reset();
aThreadGlobals.mpIdleThreads.reset();
#else // non-threaded version
for( SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); it != itEnd; ++it)
{ {
// import the sheet fragment SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
rWorkbookHandler.importOoxFragment(it->second); for( ; it != itEnd; ++it )
rWorkbookHandler.importOoxFragment( it->second );
} }
#endif
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment