Kaydet (Commit) ed89a069 authored tarafından Matúš Kukan's avatar Matúš Kukan

datastreams: read data in another thread

Change-Id: Iedd4075eadce9ca8fc41b279ea03c2679b01ec71
üst 32a62102
...@@ -14,23 +14,30 @@ ...@@ -14,23 +14,30 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp> #include <boost/scoped_ptr.hpp>
#include <vector>
namespace datastreams { class CallerThread; } namespace datastreams {
class CallerThread;
class ReaderThread;
}
class ScDocShell; class ScDocShell;
class ScDocument; class ScDocument;
class ScRange; class ScRange;
class SvStream; class SvStream;
class Window; class Window;
typedef std::vector<OString> LinesList;
class DataStreams : boost::noncopyable class DataStreams : boost::noncopyable
{ {
public: public:
enum MoveEnum { NO_MOVE, RANGE_DOWN, MOVE_DOWN, MOVE_UP }; enum MoveEnum { NO_MOVE, RANGE_DOWN, MOVE_DOWN, MOVE_UP };
DataStreams(ScDocShell *pScDocShell); DataStreams(ScDocShell *pScDocShell);
~DataStreams(); ~DataStreams();
OString ConsumeLine();
bool ImportData(); bool ImportData();
void MoveData(); void MoveData();
void Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine, void Set(SvStream *pStream, bool bValuesInLine,
const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove); const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove);
void ShowDialog(Window *pParent); void ShowDialog(Window *pParent);
void Start(); void Start();
...@@ -43,11 +50,13 @@ private: ...@@ -43,11 +50,13 @@ private:
bool mbRunning; bool mbRunning;
bool mbIsUndoEnabled; bool mbIsUndoEnabled;
bool mbValuesInLine; bool mbValuesInLine;
LinesList *mpLines;
size_t mnLinesCount;
boost::scoped_ptr<ScRange> mpRange; boost::scoped_ptr<ScRange> mpRange;
boost::scoped_ptr<ScRange> mpStartRange; boost::scoped_ptr<ScRange> mpStartRange;
boost::scoped_ptr<ScRange> mpEndRange; boost::scoped_ptr<ScRange> mpEndRange;
boost::scoped_ptr<SvStream> mpStream;
rtl::Reference<datastreams::CallerThread> mxThread; rtl::Reference<datastreams::CallerThread> mxThread;
rtl::Reference<datastreams::ReaderThread> mxReaderThread;
}; };
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ /* vim:set shiftwidth=4 softtabstop=4 expandtab: */
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#include <tabvwsh.hxx> #include <tabvwsh.hxx>
#include <viewdata.hxx> #include <viewdata.hxx>
#include <queue>
namespace datastreams { namespace datastreams {
class CallerThread : public salhelper::Thread class CallerThread : public salhelper::Thread
...@@ -57,6 +59,82 @@ private: ...@@ -57,6 +59,82 @@ private:
} }
}; };
class ReaderThread : public salhelper::Thread
{
SvStream *mpStream;
public:
bool mbTerminateReading;
osl::Condition maProduceResume;
osl::Condition maConsumeResume;
osl::Mutex maLinesProtector;
std::queue<LinesList* > maPendingLines;
std::queue<LinesList* > maUsedLines;
ReaderThread(SvStream *pData):
Thread("ReaderThread")
,mpStream(pData)
,mbTerminateReading(false)
{
}
virtual ~ReaderThread()
{
delete mpStream;
while (!maPendingLines.empty())
{
delete maPendingLines.front();
maPendingLines.pop();
}
while (!maUsedLines.empty())
{
delete maUsedLines.front();
maUsedLines.pop();
}
}
void terminate()
{
mbTerminateReading = true;
maProduceResume.set();
join();
}
private:
virtual void execute() SAL_OVERRIDE
{
while (!mbTerminateReading)
{
LinesList *pLines = 0;
osl::ResettableMutexGuard aGuard(maLinesProtector);
if (!maUsedLines.empty())
{
pLines = maUsedLines.front();
maUsedLines.pop();
aGuard.clear(); // unlock
}
else
{
aGuard.clear(); // unlock
pLines = new LinesList(10);
}
for (size_t i = 0; i < pLines->size(); ++i)
mpStream->ReadLine( pLines->at(i) );
aGuard.reset(); // lock
while (!mbTerminateReading && maPendingLines.size() >= 8)
{ // pause reading for a bit
aGuard.clear(); // unlock
maProduceResume.wait();
maProduceResume.reset();
aGuard.reset(); // lock
}
maPendingLines.push(pLines);
maConsumeResume.set();
if (!mpStream->good())
mbTerminateReading = true;
}
}
};
} }
DataStreams::DataStreams(ScDocShell *pScDocShell): DataStreams::DataStreams(ScDocShell *pScDocShell):
...@@ -64,6 +142,8 @@ DataStreams::DataStreams(ScDocShell *pScDocShell): ...@@ -64,6 +142,8 @@ DataStreams::DataStreams(ScDocShell *pScDocShell):
, mpScDocument(mpScDocShell->GetDocument()) , mpScDocument(mpScDocShell->GetDocument())
, meMove(NO_MOVE) , meMove(NO_MOVE)
, mbRunning(false) , mbRunning(false)
, mpLines(0)
, mnLinesCount(0)
{ {
mxThread = new datastreams::CallerThread( this ); mxThread = new datastreams::CallerThread( this );
mxThread->launch(); mxThread->launch();
...@@ -76,6 +156,31 @@ DataStreams::~DataStreams() ...@@ -76,6 +156,31 @@ DataStreams::~DataStreams()
mxThread->mbTerminate = true; mxThread->mbTerminate = true;
mxThread->maStart.set(); mxThread->maStart.set();
mxThread->join(); mxThread->join();
if (mxReaderThread.is())
mxReaderThread->terminate();
}
OString DataStreams::ConsumeLine()
{
if (!mpLines || mnLinesCount >= mpLines->size())
{
mnLinesCount = 0;
osl::ResettableMutexGuard aGuard(mxReaderThread->maLinesProtector);
if (mpLines)
mxReaderThread->maUsedLines.push(mpLines);
while (mxReaderThread->maPendingLines.empty())
{
aGuard.clear(); // unlock
mxReaderThread->maConsumeResume.wait();
mxReaderThread->maConsumeResume.reset();
aGuard.reset(); // lock
}
mpLines = mxReaderThread->maPendingLines.front();
mxReaderThread->maPendingLines.pop();
if (mxReaderThread->maPendingLines.size() <= 4)
mxReaderThread->maProduceResume.set(); // start producer again
}
return mpLines->at(mnLinesCount++);
} }
void DataStreams::Start() void DataStreams::Start()
...@@ -117,13 +222,13 @@ void DataStreams::Stop() ...@@ -117,13 +222,13 @@ void DataStreams::Stop()
mpScDocument->EnableUndo(mbIsUndoEnabled); mpScDocument->EnableUndo(mbIsUndoEnabled);
} }
void DataStreams::Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine, void DataStreams::Set(SvStream *pStream, bool bValuesInLine,
const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove) const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove)
{ {
if (bIsScript) if (mxReaderThread.is())
mpStream.reset( new SvScriptStream(rUrl) ); mxReaderThread->terminate();
else mxReaderThread = new datastreams::ReaderThread( pStream );
mpStream.reset( new SvFileStream(rUrl, STREAM_READ) ); mxReaderThread->launch();
mpEndRange.reset( NULL ); mpEndRange.reset( NULL );
mpRange.reset ( new ScRange() ); mpRange.reset ( new ScRange() );
...@@ -170,14 +275,6 @@ void DataStreams::MoveData() ...@@ -170,14 +275,6 @@ void DataStreams::MoveData()
bool DataStreams::ImportData() bool DataStreams::ImportData()
{ {
if (!mpStream->good())
{
// if there is a problem with SvStream, stop running
mbRunning = false;
return mbRunning;
}
OString sTmp;
SolarMutexGuard aGuard; SolarMutexGuard aGuard;
MoveData(); MoveData();
if (mbValuesInLine) if (mbValuesInLine)
...@@ -186,8 +283,7 @@ bool DataStreams::ImportData() ...@@ -186,8 +283,7 @@ bool DataStreams::ImportData()
OStringBuffer aBuf; OStringBuffer aBuf;
while (nHeight--) while (nHeight--)
{ {
mpStream->ReadLine(sTmp); aBuf.append(ConsumeLine());
aBuf.append(sTmp);
aBuf.append('\n'); aBuf.append('\n');
} }
SvMemoryStream aMemoryStream((void *)aBuf.getStr(), aBuf.getLength(), STREAM_READ); SvMemoryStream aMemoryStream((void *)aBuf.getStr(), aBuf.getLength(), STREAM_READ);
...@@ -202,8 +298,7 @@ bool DataStreams::ImportData() ...@@ -202,8 +298,7 @@ bool DataStreams::ImportData()
// read more lines at once but not too much // read more lines at once but not too much
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
{ {
mpStream->ReadLine(sTmp); OUString sLine( OStringToOUString(ConsumeLine(), RTL_TEXTENCODING_UTF8) );
OUString sLine(OStringToOUString(sTmp, RTL_TEXTENCODING_UTF8));
if (sLine.indexOf(',') <= 0) if (sLine.indexOf(',') <= 0)
continue; continue;
......
...@@ -74,11 +74,14 @@ DataStreamsDlg::DataStreamsDlg(DataStreams *pDataStreams, Window* pParent) ...@@ -74,11 +74,14 @@ DataStreamsDlg::DataStreamsDlg(DataStreams *pDataStreams, Window* pParent)
void DataStreamsDlg::Start() void DataStreamsDlg::Start()
{ {
bool bIsScript = m_pRBScriptData->IsChecked();
sal_Int32 nLimit = 0; sal_Int32 nLimit = 0;
if (m_pRBMaxLimit->IsChecked()) if (m_pRBMaxLimit->IsChecked())
nLimit = m_pEdLimit->GetText().toInt32(); nLimit = m_pEdLimit->GetText().toInt32();
mpDataStreams->Set( m_pCbUrl->GetText(), bIsScript, m_pRBValuesInLine->IsChecked(), mpDataStreams->Set(
(m_pRBScriptData->IsChecked() ?
dynamic_cast<SvStream*>( new SvScriptStream(m_pCbUrl->GetText()) ) :
dynamic_cast<SvStream*>( new SvFileStream(m_pCbUrl->GetText(), STREAM_READ) )),
m_pRBValuesInLine->IsChecked(),
m_pEdRange->GetText(), nLimit, (m_pRBNoMove->IsChecked() ? DataStreams::NO_MOVE : m_pEdRange->GetText(), nLimit, (m_pRBNoMove->IsChecked() ? DataStreams::NO_MOVE :
m_pRBRangeDown->IsChecked() ? DataStreams::RANGE_DOWN : DataStreams::MOVE_DOWN) ); m_pRBRangeDown->IsChecked() ? DataStreams::RANGE_DOWN : DataStreams::MOVE_DOWN) );
mpDataStreams->Start(); mpDataStreams->Start();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment