Kaydet (Commit) 7e2ea27e authored tarafından Armin Le Grand's avatar Armin Le Grand

tdf#93553 limit parallelism at zip save time to useful amount

At ODT export time writing and zipping comtained data packages is nicely
parallelized, but not limited to an upper bounds of threads to use.
Together with memory consumption this makes ressource usage and runtime
behaviour bad to crashing (mostly on 32bit).
I have now limited the processing dependent on the number of available
cores to get a good processing/ressource ratio. The result uses much less
memory, is faster and runs on 32bit systems.

Change-Id: I8bd516a9a0cefd644f5d7001214bc717f29770ab
Reviewed-on: https://gerrit.libreoffice.org/23305Tested-by: 's avatarJenkins <ci@libreoffice.org>
Reviewed-by: 's avatarNoel Grandin <noelgrandin@gmail.com>
üst 0f0cea28
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <package/Deflater.hxx> #include <package/Deflater.hxx>
#include <CRC32.hxx> #include <CRC32.hxx>
#include <atomic>
struct ZipEntry; struct ZipEntry;
class ZipPackageBuffer; class ZipPackageBuffer;
...@@ -35,6 +36,9 @@ class ZipPackageStream; ...@@ -35,6 +36,9 @@ class ZipPackageStream;
class ZipOutputEntry class ZipOutputEntry
{ {
// allow only DeflateThread to change m_bFinished using setFinished()
friend class DeflateThread;
css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
ZipUtils::Deflater m_aDeflater; ZipUtils::Deflater m_aDeflater;
css::uno::Reference< css::uno::XComponentContext > m_xContext; css::uno::Reference< css::uno::XComponentContext > m_xContext;
...@@ -48,8 +52,9 @@ class ZipOutputEntry ...@@ -48,8 +52,9 @@ class ZipOutputEntry
CRC32 m_aCRC; CRC32 m_aCRC;
ZipEntry *m_pCurrentEntry; ZipEntry *m_pCurrentEntry;
sal_Int16 m_nDigested; sal_Int16 m_nDigested;
bool m_bEncryptCurrentEntry;
ZipPackageStream* m_pCurrentStream; ZipPackageStream* m_pCurrentStream;
bool m_bEncryptCurrentEntry;
std::atomic<bool> m_bFinished;
public: public:
ZipOutputEntry( ZipOutputEntry(
...@@ -78,7 +83,10 @@ public: ...@@ -78,7 +83,10 @@ public:
void closeEntry(); void closeEntry();
void write(const css::uno::Sequence< sal_Int8 >& rBuffer); void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
bool isFinished() const { return m_bFinished; }
private: private:
void setFinished() { m_bFinished = true; }
void doDeflate(); void doDeflate();
}; };
......
...@@ -69,6 +69,16 @@ private: ...@@ -69,6 +69,16 @@ private:
throw(css::io::IOException, css::uno::RuntimeException); throw(css::io::IOException, css::uno::RuntimeException);
void writeEXT( const ZipEntry &rEntry ) void writeEXT( const ZipEntry &rEntry )
throw(css::io::IOException, css::uno::RuntimeException); throw(css::io::IOException, css::uno::RuntimeException);
// ScheduledThread handling helpers
void consumeScheduledThreadEntry(ZipOutputEntry* pCandidate);
void consumeFinishedScheduledThreadEntries();
void consumeAllScheduledThreadEntries();
public:
void reduceScheduledThreadsToGivenNumberOrLess(
sal_Int32 nThreads,
sal_Int32 nWaitTimeInTenthSeconds);
}; };
#endif #endif
......
...@@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry( ...@@ -55,8 +55,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xOutStream(rxOutput) , m_xOutStream(rxOutput)
, m_pCurrentEntry(&rEntry) , m_pCurrentEntry(&rEntry)
, m_nDigested(0) , m_nDigested(0)
, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream) , m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
{ {
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
assert(m_xOutStream.is()); assert(m_xOutStream.is());
...@@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry( ...@@ -77,8 +78,9 @@ ZipOutputEntry::ZipOutputEntry(
, m_xContext(rxContext) , m_xContext(rxContext)
, m_pCurrentEntry(&rEntry) , m_pCurrentEntry(&rEntry)
, m_nDigested(0) , m_nDigested(0)
, m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(pStream) , m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
{ {
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry) if (m_bEncryptCurrentEntry)
......
...@@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt ) ...@@ -93,42 +93,90 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr; m_pCurrentEntry = nullptr;
} }
void ZipOutputStream::finish() void ZipOutputStream::consumeScheduledThreadEntry(ZipOutputEntry* pCandidate)
throw(IOException, RuntimeException)
{ {
assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); //Any exceptions thrown in the threads were caught and stored for now
::css::uno::Any aCaughtException(pCandidate->getParallelDeflateException());
if (aCaughtException.hasValue())
::cppu::throwException(aCaughtException);
// Wait for all threads to finish & write writeLOC(pCandidate->getZipEntry(), pCandidate->isEncrypt());
m_rSharedThreadPool.waitUntilEmpty();
for (size_t i = 0; i < m_aEntries.size(); i++) sal_Int32 nRead;
uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
uno::Reference< io::XInputStream > xInput = pCandidate->getData();
do
{ {
//Any exceptions thrown in the threads were caught and stored for now nRead = xInput->readBytes(aSequence, n_ConstBufferSize);
::css::uno::Any aCaughtException(m_aEntries[i]->getParallelDeflateException()); if (nRead < n_ConstBufferSize)
if (aCaughtException.hasValue()) aSequence.realloc(nRead);
::cppu::throwException(aCaughtException);
writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt()); rawWrite(aSequence);
}
while (nRead == n_ConstBufferSize);
xInput.clear();
sal_Int32 nRead; rawCloseEntry(pCandidate->isEncrypt());
uno::Sequence< sal_Int8 > aSequence(n_ConstBufferSize);
uno::Reference< io::XInputStream > xInput = m_aEntries[i]->getData(); pCandidate->getZipPackageStream()->successfullyWritten(pCandidate->getZipEntry());
do pCandidate->deleteBufferFile();
{ delete pCandidate;
nRead = xInput->readBytes(aSequence, n_ConstBufferSize); }
if (nRead < n_ConstBufferSize)
aSequence.realloc(nRead);
rawWrite(aSequence); void ZipOutputStream::consumeFinishedScheduledThreadEntries()
{
std::vector< ZipOutputEntry* > aNonFinishedEntries;
for(auto aIter = m_aEntries.begin(); aIter != m_aEntries.end(); ++aIter)
{
if((*aIter)->isFinished())
{
consumeScheduledThreadEntry(*aIter);
} }
while (nRead == n_ConstBufferSize); else
xInput.clear(); {
aNonFinishedEntries.push_back(*aIter);
}
}
// always reset to non-consumed entries
m_aEntries = aNonFinishedEntries;
}
rawCloseEntry(m_aEntries[i]->isEncrypt()); void ZipOutputStream::consumeAllScheduledThreadEntries()
{
while(!m_aEntries.empty())
{
ZipOutputEntry* pCandidate = m_aEntries.back();
m_aEntries.pop_back();
consumeScheduledThreadEntry(pCandidate);
}
}
void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads, sal_Int32 nWaitTimeInTenthSeconds)
{
while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
{
consumeFinishedScheduledThreadEntries();
m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry()); if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
m_aEntries[i]->deleteBufferFile(); {
delete m_aEntries[i]; const TimeValue aTimeValue(0, 100000 * nWaitTimeInTenthSeconds);
osl_waitThread(&aTimeValue);
}
} }
}
void ZipOutputStream::finish()
throw(IOException, RuntimeException)
{
assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
// Wait for all threads to finish & write
m_rSharedThreadPool.waitUntilEmpty();
// consume all processed entries
consumeAllScheduledThreadEntries();
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
for (size_t i = 0; i < m_aZipList.size(); i++) for (size_t i = 0; i < m_aZipList.size(); i++)
......
...@@ -54,6 +54,7 @@ ...@@ -54,6 +54,7 @@
#include <rtl/random.h> #include <rtl/random.h>
#include <PackageConstants.hxx> #include <PackageConstants.hxx>
#include <thread>
using namespace com::sun::star::packages::zip::ZipConstants; using namespace com::sun::star::packages::zip::ZipConstants;
using namespace com::sun::star::packages::zip; using namespace com::sun::star::packages::zip;
...@@ -478,6 +479,7 @@ private: ...@@ -478,6 +479,7 @@ private:
deflateZipEntry(mpEntry, mxInStream); deflateZipEntry(mpEntry, mxInStream);
mxInStream.clear(); mxInStream.clear();
mpEntry->closeBufferFile(); mpEntry->closeBufferFile();
mpEntry->setFinished();
} }
catch (const uno::Exception&) catch (const uno::Exception&)
{ {
...@@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild( ...@@ -824,6 +826,13 @@ bool ZipPackageStream::saveChild(
if (bParallelDeflate) if (bParallelDeflate)
{ {
// tdf#93553 limit to a useful amount of threads. Taking number of available
// cores and allow 4-times the amount for having the queue well filled. The
// 2nd pparameter is the time to wait beweeen cleanups in 10th of a second.
// Both values may be added to the configuration settings if needed.
static sal_Int32 nAllowedThreads(std::max(std::thread::hardware_concurrency(), 1U) * 4);
rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads, 1);
// Start a new thread deflating this zip entry // Start a new thread deflating this zip entry
ZipOutputEntry *pZipEntry = new ZipOutputEntry( ZipOutputEntry *pZipEntry = new ZipOutputEntry(
m_xContext, *pTempEntry, this, bToBeEncrypted); m_xContext, *pTempEntry, this, bToBeEncrypted);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment