Kaydet (Commit) 7cd3f267 authored tarafından Luboš Luňák's avatar Luboš Luňák

split out thread functionality from ZipOutputEntry

It can be easily separated out, it looked like hacked in. And
I will need to do more refactoring of the class, so this shouldn't
be more complex than necessary.

Change-Id: I302da55409e9195274907ca4939c37fbb2427b18
Reviewed-on: https://gerrit.libreoffice.org/73031
Tested-by: Jenkins
Reviewed-by: 's avatarLuboš Luňák <l.lunak@collabora.com>
üst ee22409a
......@@ -45,12 +45,12 @@ public:
~Deflater();
Deflater(sal_Int32 nSetLevel, bool bNowrap);
void setInputSegment( const css::uno::Sequence< sal_Int8 >& rBuffer );
bool needsInput( );
bool needsInput() const;
void finish( );
bool finished( ) { return bFinished;}
bool finished() const { return bFinished;}
sal_Int32 doDeflateSegment( css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewLength );
sal_Int64 getTotalIn( );
sal_Int64 getTotalOut( );
sal_Int64 getTotalIn() const;
sal_Int64 getTotalOut() const;
void reset( );
void end( );
};
......
......@@ -27,6 +27,7 @@
#include <com/sun/star/xml/crypto/XDigestContext.hpp>
#include <package/Deflater.hxx>
#include <comphelper/threadpool.hxx>
#include "CRC32.hxx"
#include <atomic>
......@@ -36,25 +37,20 @@ class ZipPackageStream;
class ZipOutputEntry
{
// allow only DeflateThreadTask to change m_bFinished using setFinished()
friend class DeflateThreadTask;
protected:
css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
ZipUtils::Deflater m_aDeflater;
css::uno::Reference< css::uno::XComponentContext > m_xContext;
OUString m_aTempURL;
css::uno::Reference< css::io::XOutputStream > m_xOutStream;
css::uno::Reference< css::xml::crypto::XCipherContext > m_xCipherContext;
css::uno::Reference< css::xml::crypto::XDigestContext > m_xDigestContext;
std::exception_ptr m_aParallelDeflateException;
CRC32 m_aCRC;
ZipEntry *m_pCurrentEntry;
sal_Int16 m_nDigested;
ZipPackageStream* m_pCurrentStream;
bool const m_bEncryptCurrentEntry;
std::atomic<bool> m_bFinished;
public:
ZipOutputEntry(
......@@ -62,32 +58,49 @@ public:
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
~ZipOutputEntry();
ZipEntry* getZipEntry() { return m_pCurrentEntry; }
ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
bool isEncrypt() { return m_bEncryptCurrentEntry; }
/* This block of methods is for threaded zipping, where we compress to a temp stream, whose
data is retrieved via getData */
void closeEntry();
void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream);
void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
protected:
ZipOutputEntry(
const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream);
void doDeflate();
};
// Class that runs the compression in a thread.
class ZipOutputEntryInThread : public ZipOutputEntry
{
class Task;
OUString m_aTempURL;
std::exception_ptr m_aParallelDeflateException;
std::atomic<bool> m_bFinished;
public:
ZipOutputEntryInThread(
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
std::unique_ptr<comphelper::ThreadTask> createTask(
const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
const css::uno::Reference< css::io::XInputStream >& xInStream );
/* This block of methods is for threaded zipping, where we compress to a temp stream, whose
data is retrieved via getData */
void createBufferFile();
void setParallelDeflateException(const std::exception_ptr& exception) { m_aParallelDeflateException = exception; }
css::uno::Reference< css::io::XInputStream > getData() const;
const std::exception_ptr& getParallelDeflateException() const { return m_aParallelDeflateException; }
void closeBufferFile();
void deleteBufferFile();
ZipEntry* getZipEntry() { return m_pCurrentEntry; }
ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
bool isEncrypt() { return m_bEncryptCurrentEntry; }
void closeEntry();
void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
bool isFinished() const { return m_bFinished; }
private:
void setFinished() { m_bFinished = true; }
void doDeflate();
};
#endif
......
......@@ -29,6 +29,7 @@
struct ZipEntry;
class ZipOutputEntry;
class ZipOutputEntryInThread;
class ZipPackageStream;
class ZipOutputStream
......@@ -39,7 +40,7 @@ class ZipOutputStream
ByteChucker m_aChucker;
ZipEntry *m_pCurrentEntry;
std::vector< ZipOutputEntry* > m_aEntries;
std::vector< ZipOutputEntryInThread* > m_aEntries;
std::exception_ptr m_aDeflateException;
public:
......@@ -47,7 +48,7 @@ public:
const css::uno::Reference< css::io::XOutputStream > &xOStream );
~ZipOutputStream();
void addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
void addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
/// @throws css::io::IOException
/// @throws css::uno::RuntimeException
......@@ -79,7 +80,7 @@ private:
void writeEXT( const ZipEntry &rEntry );
// ScheduledThread handling helpers
void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate);
void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate);
void consumeFinishedScheduledThreadTaskEntries();
public:
......
......@@ -100,7 +100,7 @@ void Deflater::setInputSegment( const uno::Sequence< sal_Int8 >& rBuffer )
nLength = rBuffer.getLength();
}
bool Deflater::needsInput( )
bool Deflater::needsInput() const
{
return nLength <=0;
}
......@@ -113,11 +113,11 @@ sal_Int32 Deflater::doDeflateSegment( uno::Sequence< sal_Int8 >& rBuffer, sal_In
OSL_ASSERT( !(nNewLength < 0 || nNewLength > rBuffer.getLength()));
return doDeflateBytes(rBuffer, /*nNewOffset*/0, nNewLength);
}
sal_Int64 Deflater::getTotalIn( )
sal_Int64 Deflater::getTotalIn() const
{
return pStream->total_in; // FIXME64: zlib doesn't look 64bit clean here
}
sal_Int64 Deflater::getTotalOut( )
sal_Int64 Deflater::getTotalOut() const
{
return pStream->total_out; // FIXME64: zlib doesn't look 64bit clean here
}
......
......@@ -46,7 +46,8 @@ ZipOutputEntry::ZipOutputEntry(
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt)
bool bEncrypt,
bool checkStream)
: m_aDeflateBuffer(n_ConstBufferSize)
, m_aDeflater(DEFAULT_COMPRESSION, true)
, m_xContext(rxContext)
......@@ -55,10 +56,10 @@ ZipOutputEntry::ZipOutputEntry(
, m_nDigested(0)
, m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
assert(m_xOutStream.is());
(void)checkStream;
assert(!checkStream || m_xOutStream.is());
if (m_bEncryptCurrentEntry)
{
m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
......@@ -67,64 +68,13 @@ ZipOutputEntry::ZipOutputEntry(
}
ZipOutputEntry::ZipOutputEntry(
const css::uno::Reference< css::io::XOutputStream >& rxOutput,
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt)
: m_aDeflateBuffer(n_ConstBufferSize)
, m_aDeflater(DEFAULT_COMPRESSION, true)
, m_xContext(rxContext)
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
, m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
, m_bFinished(false)
: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry)
{
m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( m_xContext, pStream->GetEncryptionData() );
}
}
ZipOutputEntry::~ZipOutputEntry()
{
}
void ZipOutputEntry::createBufferFile()
{
assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
"should only be called in the threaded mode where there is no existing stream yet");
uno::Reference < beans::XPropertySet > xTempFileProps(
io::TempFile::create(m_xContext),
uno::UNO_QUERY_THROW );
xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
aUrl >>= m_aTempURL;
assert(!m_aTempURL.isEmpty());
uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
}
void ZipOutputEntry::closeBufferFile()
{
m_xOutStream->closeOutput();
m_xOutStream.clear();
}
void ZipOutputEntry::deleteBufferFile()
{
assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
uno::Reference < ucb::XSimpleFileAccess3 > xAccess(ucb::SimpleFileAccess::create(m_xContext));
xAccess->kill(m_aTempURL);
}
uno::Reference< io::XInputStream > ZipOutputEntry::getData() const
{
uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
return xTempAccess->openFileRead(m_aTempURL);
}
void ZipOutputEntry::closeEntry()
......@@ -241,4 +191,114 @@ void ZipOutputEntry::doDeflate()
}
}
ZipOutputEntryInThread::ZipOutputEntryInThread(
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt)
: ZipOutputEntry( uno::Reference< css::io::XOutputStream >(), rxContext, rEntry, pStream, bEncrypt, false )
, m_bFinished(false)
{
}
void ZipOutputEntryInThread::createBufferFile()
{
assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
"should only be called in the threaded mode where there is no existing stream yet");
uno::Reference < beans::XPropertySet > xTempFileProps(
io::TempFile::create(m_xContext),
uno::UNO_QUERY_THROW );
xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
aUrl >>= m_aTempURL;
assert(!m_aTempURL.isEmpty());
uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
}
void ZipOutputEntryInThread::closeBufferFile()
{
m_xOutStream->closeOutput();
m_xOutStream.clear();
}
void ZipOutputEntryInThread::deleteBufferFile()
{
assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
uno::Reference < ucb::XSimpleFileAccess3 > xAccess(ucb::SimpleFileAccess::create(m_xContext));
xAccess->kill(m_aTempURL);
}
uno::Reference< io::XInputStream > ZipOutputEntryInThread::getData() const
{
uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
return xTempAccess->openFileRead(m_aTempURL);
}
class ZipOutputEntryInThread::Task : public comphelper::ThreadTask
{
ZipOutputEntryInThread *mpEntry;
uno::Reference< io::XInputStream > mxInStream;
public:
Task( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntryInThread *pEntry,
const uno::Reference< io::XInputStream >& xInStream )
: comphelper::ThreadTask(pTag)
, mpEntry(pEntry)
, mxInStream(xInStream)
{}
private:
virtual void doWork() override
{
try
{
mpEntry->createBufferFile();
mpEntry->writeStream(mxInStream);
mxInStream.clear();
mpEntry->closeBufferFile();
mpEntry->setFinished();
}
catch (...)
{
mpEntry->setParallelDeflateException(std::current_exception());
try
{
if (mpEntry->m_xOutStream.is())
mpEntry->closeBufferFile();
if (!mpEntry->m_aTempURL.isEmpty())
mpEntry->deleteBufferFile();
}
catch (uno::Exception const&)
{
}
mpEntry->setFinished();
}
}
};
std::unique_ptr<comphelper::ThreadTask> ZipOutputEntryInThread::createTask(
const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
const uno::Reference< io::XInputStream >& xInStream )
{
return std::make_unique<Task>(pTag, this, xInStream);
}
void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInStream)
{
sal_Int32 nLength = 0;
uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
do
{
nLength = xInStream->readBytes(aSeq, n_ConstBufferSize);
if (nLength != n_ConstBufferSize)
aSeq.realloc(nLength);
write(aSeq);
}
while (nLength == n_ConstBufferSize);
closeEntry();
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
......@@ -68,7 +68,7 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
}
}
void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
{
comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pTask));
m_aEntries.push_back(pEntry);
......@@ -91,7 +91,7 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr;
}
void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate)
void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate)
{
//Any exceptions thrown in the threads were caught and stored for now
const std::exception_ptr& rCaughtException(pCandidate->getParallelDeflateException());
......@@ -126,13 +126,13 @@ void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputE
void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries()
{
std::vector< ZipOutputEntry* > aNonFinishedEntries;
std::vector< ZipOutputEntryInThread* > aNonFinishedEntries;
for(ZipOutputEntry* pEntry : m_aEntries)
for(ZipOutputEntryInThread* pEntry : m_aEntries)
{
if(pEntry->isFinished())
{
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pEntry));
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pEntry));
}
else
{
......@@ -167,9 +167,9 @@ void ZipOutputStream::finish()
// consume all processed entries
while(!m_aEntries.empty())
{
ZipOutputEntry* pCandidate = m_aEntries.back();
ZipOutputEntryInThread* pCandidate = m_aEntries.back();
m_aEntries.pop_back();
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pCandidate));
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pCandidate));
}
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
......
......@@ -17,7 +17,6 @@
* the License at http://www.apache.org/licenses/LICENSE-2.0 .
*/
#include <memory>
#include <ZipPackageStream.hxx>
#include <com/sun/star/beans/PropertyValue.hpp>
......@@ -431,65 +430,6 @@ bool ZipPackageStream::ParsePackageRawStream()
return true;
}
static void deflateZipEntry(ZipOutputEntry *pZipEntry,
const uno::Reference< io::XInputStream >& xInStream)
{
sal_Int32 nLength = 0;
uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
do
{
nLength = xInStream->readBytes(aSeq, n_ConstBufferSize);
if (nLength != n_ConstBufferSize)
aSeq.realloc(nLength);
pZipEntry->write(aSeq);
}
while (nLength == n_ConstBufferSize);
pZipEntry->closeEntry();
}
class DeflateThreadTask: public comphelper::ThreadTask
{
ZipOutputEntry *mpEntry;
uno::Reference< io::XInputStream > mxInStream;
public:
DeflateThreadTask( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry,
const uno::Reference< io::XInputStream >& xInStream )
: comphelper::ThreadTask(pTag)
, mpEntry(pEntry)
, mxInStream(xInStream)
{}
private:
virtual void doWork() override
{
try
{
mpEntry->createBufferFile();
deflateZipEntry(mpEntry, mxInStream);
mxInStream.clear();
mpEntry->closeBufferFile();
mpEntry->setFinished();
}
catch (...)
{
mpEntry->setParallelDeflateException(std::current_exception());
try
{
if (mpEntry->m_xOutStream.is())
mpEntry->closeBufferFile();
if (!mpEntry->m_aTempURL.isEmpty())
mpEntry->deleteBufferFile();
}
catch (uno::Exception const&)
{
}
mpEntry->setFinished();
}
}
};
static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> const & rStream )
{
// It's very annoying that we have to do this, but lots of zip packages
......@@ -839,16 +779,16 @@ bool ZipPackageStream::saveChild(
rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks);
// Start a new thread task deflating this zip entry
ZipOutputEntry *pZipEntry = new ZipOutputEntry(
ZipOutputEntryInThread *pZipEntry = new ZipOutputEntryInThread(
m_xContext, *pTempEntry, this, bToBeEncrypted);
rZipOut.addDeflatingThreadTask( pZipEntry,
std::make_unique<DeflateThreadTask>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) );
pZipEntry->createTask( rZipOut.getThreadTaskTag(), xStream) );
}
else
{
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
deflateZipEntry(&aZipEntry, xStream);
aZipEntry.writeStream(xStream);
rZipOut.rawCloseEntry(bToBeEncrypted);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment