diff -Nru pbzip2-1.0.5/AUTHORS pbzip2-1.1.1/AUTHORS --- pbzip2-1.0.5/AUTHORS 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/AUTHORS 2010-04-17 18:37:39.000000000 +0000 @@ -5,6 +5,7 @@ - Using modified producer/consumer threading code from Andrae Muys - Uses libbzip2 by Julian Seward (http://www.bzip.org/) +- Major contributions by Yavor Nikolov Contributions @@ -59,7 +60,129 @@ Reinhard Schiedermeier - support for tar --use-compress-prog=pbzip2 +Elbert Pol - creates and maintains OS/2 packages + +Nico Vrouwe - support for CPU detection on Win32 + +Eduardo Terol - creates and maintains Windows 32bit package + +Nikita Zhuk - creates and maintains Mac OS X Automator action and workflow/service + +Jari Aalto - added long options to man page and -h output, added --loadavg, --read long options + +Scott Emery - ignore fwrite return and pass chown errors in writeFileMetaData if effective uid root + +Steven Chamberlain - code to support throttling compression to prevent memory exhaustion with slow output pipe + +Yavor Nikolov - code to support throttling compression to + prevent memory exhaustion with slow output, cleanup of debug output + - fixed infinite loop when fileWriter fails to create output file + at start + - allDone renamed to producerDone and added mutex synchronized-access + - Changed fileWriter loop exit condition: now protected from + simultaneous access + - Mutex initialization/disposal refactored + - Throttling loops using thread condition wait + - Fatal error handling refactored + - Removed allDone checks used to signal error (now handled by + handle_error function) + - Prevented dangling threads on switch from Multi to Single threaded + - Inline hint added on a few functions + - Some additional error_handlers placed instead of returns (kill any + dangling threads) + - Cleanup and termination changed in attempt to prevent + signal-handling issues in multi-threaded environment (still some + problems are observed on signalling e.g. 
with Ctrl+C) + - Signal-handling in child threads disabled. The goal is to have + single thread only which accepts signals + - Using abort instead of exit on error termination + - Fixed command-line parsing problem (e.g. -m100 -p12 -> 120 CPUs) + (Problem was unterminated strings after strncpy). + - Signal handlers setup refactored to separate function and + switched from signal to sigaction as per POSIX recommendations + - Added mutexes unlocking before error-termination. + - Termination flag introduced (terminateFlag) to indicate abrupt + termination and facilitate thread finishing in error condition. + - fileWriter: error_handler instead of exit on write error. + - percentComplete progress printed only if changed. + - signal handling redesigned: using sigwait in separate thread. + - Makefile: -D_POSIX_PTHREAD_SEMANTICS (used in Solaris). + - CHAR_BIT instead of 8 used in a warning message. + - SIGUSR1 signal handling added and used to terminate signal handling + thread. (Resolved issue with pthread_cancel on Windows-Cygwin) + - Fixed wrongly issued exit code 1 instead of 0. + - Corrected some error messages and added a few new ones at signal and + terminator threads join. + - Added support for thread stack size customization (-S# option) + Needs USE_STACKSIZE_CUSTOMIZATION to be defined to enable that option + - Added define of PTHREAD_STACK_MIN if such is not available in + standard headers. + - OutputBuffer usage redesigned as fixed-size circular buffer. Adding + new elements to it refactored as separate function. + - OutputBuffer resizing removed from producer_decompress since now + buffer should be with fixed size. + - Fixed debug print of OutputBuffer now referencing OutputBuffer in + old-style absolute index (in fileWriter and others). + - memstr function implementation simplified (delegated to standard + library function which is doing the same more efficiently). 
+ - Changed some variables from int to size_t to get rid of compiler + warnings (signed + unsigned expressions). + - Sequential processing of input file/pipe/redirect implemented (encapsulated + as separate class: BZ2StreamScanner) + - Parallel decompression enabled (now possible with the sequential in) + - Refactored declarations moved to separate header file (pbzip2.h) to + make global definitions available to other source modules + - Progress reporting modified since we don't have number of + blocks up-front with sequential input read (now based on bytes). fileSize + moved as InFileSize global variable for that purpose + - Progress computation in fileWriter moved to QuietMode != 0 + (not needed to do it if we won't print it) + - disposeMemory helper function implemented to ease memory disposal + - Processing functions of threads declared as extern "C" since pthread_t + requires plain "C" calling convention instead of the default "C++" + - pthread_mutex_{lock|unlock} replaced with safe_mutex_{lock|unlock} + where appropriate (to prevent from issues like out of sys mutexes) + - Makefile modified to include the new source files for BZ2StreamScanner + - Makefile refined (library flags specified in LDFLAGS variable) + - Makefile.solaris.sunstudio included as example makefile for Solaris + and SunStudio 12 C++ compiler + - bz2HeaderZero in main initialized to value 0x90 > 127 which is in general + out of char type range. Changed to unsigned along with tmpBuff to avoid + some compiler(e.g. c++0x)/runtime warnings/errors. + - Some thread conditions signalling added on termination requested to ease + termination of blocked on conditions threads + - Other pthread_* calls (signal, wait) migrated to safe_* wrappers to + handle error return codes (and simplify code where already handled) + - Timed pthread cond waits refactored to separate function and moved to + debug sections only; non-timed wait used in non-debug mode. 
Signalling + conditions to wake threads waiting on these conditions guaranteed. + - memstr function templatized to allow working with other data types but + not only char (e.g. unsigned char) + - safe_cond_broadcast implemented and additional signalling added at + fileWriter end to prevent consumers blocking at end. + - Signal error when the input file doesn't contain any bzip2 headers. + - Fixed problems with not-handling zero-file length special header on compression + and decompression. + - Signalling error on stdin decompression when file doesn't start with + correct bzip2 magic header. + - Implemented outputBufferInit(size_t size) utility function for output + buffer initialization/resetting. + - Plain C headers moved to extern "C" section. + - Modified file-names handling to avoid issues with file-sizes > 2040 + - Fixed out of array pointer for OutFilename in strncasecmp calls + - A few other minor modifications + - consumer_decompress using low-level API now to improve performance of + long bzip2 streams + - Fixed issue in safe_cond_timed_wait which caused segmentation fault + when compiled in DEBUG mode + - Handle decompression of very long bz2 streams incrementally instead of + loading whole streams in memory at once + - Progress calculation changed: fixed issue when large file support is + disabled and enabled monitoring of segmented long bzip2 streams + - Fixed issue with Sun Studio compiler - required explicit declaration + of static const members in .cpp. 
+ Specials thanks for suggestions and testing ------------------------------------------- -Phillippe Welsh, James Terhune, Dru Lemley, Bryan Stillwell, George Chalissery, Kir Kolyshkin, Madhu Kangara, Mike Furr, Joergen Ramskov, Kurt Fitzner, Peter Cordes, Oliver Falk, Jindrich Novy, Benjamin Reed, Chris Dearman, Richard Russon, Aníbal Monsalve Salazar, Jim Leonard, Paul Pluzhniko, Robert Archard, Coran Fisher, Ken Takusagawa, David Pyke, Matt Turner, Damien Ancelin, Álvaro Reguly, Ivan Voras, John Dalton, Sami Liedes, Rene Georgi, René Rhéaume, Jeroen Roovers, Reinhard Schiedermeier, Kari Pahula, Elbert Pol. +Phillippe Welsh, James Terhune, Dru Lemley, Bryan Stillwell, George Chalissery, Kir Kolyshkin, Madhu Kangara, Mike Furr, Joergen Ramskov, Kurt Fitzner, Peter Cordes, Oliver Falk, Jindrich Novy, Benjamin Reed, Chris Dearman, Richard Russon, Aníbal Monsalve Salazar, Jim Leonard, Paul Pluzhniko, Robert Archard, Coran Fisher, Ken Takusagawa, David Pyke, Matt Turner, Damien Ancelin, Álvaro Reguly, Ivan Voras, John Dalton, Sami Liedes, Rene Georgi, René Rhéaume, Jeroen Roovers, Reinhard Schiedermeier, Kari Pahula, Elbert Pol, Nico Vrouwe, Eduardo Terol, Samuel Thibault, Michael Fuereder, Jari Aalto, Scott Emery, Steven Chamberlain, Yavor Nikolov, Nikita Zhuk, Joao Seabra, Conn Clark, Mark A. Haun, Tim Bielawa, Michal Gorny, Mikolaj Habdank, Christian Kujau, Marc-Christian Petersen, Piero Ottuzzi, Ephraim Ofir. 
diff -Nru pbzip2-1.0.5/BZ2StreamScanner.cpp pbzip2-1.1.1/BZ2StreamScanner.cpp --- pbzip2-1.0.5/BZ2StreamScanner.cpp 1970-01-01 00:00:00.000000000 +0000 +++ pbzip2-1.1.1/BZ2StreamScanner.cpp 2010-04-17 18:37:39.000000000 +0000 @@ -0,0 +1,703 @@ +/* + * File: BZ2StreamScanner.cpp + * Author: Yavor Nikolov + * + * Created on March 6, 2010, 10:07 PM + */ + +#include "pbzip2.h" +#include "BZ2StreamScanner.h" + +#include +#include +#include +#include + +#include +#include + +using namespace std; + +namespace pbzip2 +{ + +const size_t BZ2StreamScanner::DEFAULT_OUT_BUFF_LIMIT; + +BZ2StreamScanner::BZ2StreamScanner( int hInFile, size_t inBuffCapacity ): + _inBuff( NULL ), + _inBuffCapacity( 0 ) +{ + _outBuff.buf = NULL; + _outBuff.bufSize = 0; + + init( hInFile, inBuffCapacity ); +} + +/** + * Initialize - position to beginning of input file and prepare for searching. + * + * @return 0 - on success; -1 on error. + */ +int BZ2StreamScanner::init( int hInFile, size_t inBuffCapacity ) +{ + dispose(); + + CharType bz2header[] = "BZh91AY&SY"; + // zero-terminated string + CharType bz2ZeroHeader[] = + { 'B', 'Z', 'h', '9', 0x17, 0x72, 0x45, 0x38, 0x50, 0x90, 0 }; + + _hInFile = hInFile; + _eof = false; + _bz2Header = bz2header; + _bz2HeaderZero = bz2ZeroHeader; + _bz2HeaderFound = false; + _inBuffCapacity = 0; + _errState = 0; + _searchStatus = false; + _outBuffCapacityHint = 0; + _outBuffCapacityLimit = DEFAULT_OUT_BUFF_LIMIT; + _outSequenceNumber = 0; + _streamNumber = 0; + + // Prevent too small buffer + if ( inBuffCapacity < 2 * _bz2Header.size() ) + { + inBuffCapacity = 2 * _bz2Header.size(); + } + + // allocate memory to read in file + _inBuff = new(std::nothrow) CharType[inBuffCapacity]; + + if ( _inBuff == NULL ) + { + _errState |= ERR_MEM_ALLOC_INBUFF; + _inBuffEnd = NULL; + handle_error( EF_EXIT, -1, + "pbzip2: *ERROR: Could not allocate memory (FileData)! 
Aborting...\n" ); + + return -1; + } + + _inBuffCapacity = inBuffCapacity; + + _inBuffCurrent = _inBuffSearchPtr = _inBuffEnd = _inBuff; + + return 0; +} + +/** + * dispose memory resources + */ +void BZ2StreamScanner::dispose() +{ + disposeMemory( _outBuff.buf ); + _outBuff.bufSize = 0; + + disposeMemory( _inBuff ); + _inBuffCapacity = 0; + + // close( _hInFile ); +} + +BZ2StreamScanner::~BZ2StreamScanner() +{ + dispose(); +} + +/** + * Verify if there is enough space in output buffer. If not - then allocate. + */ +int BZ2StreamScanner::ensureOutBuffCapacity( size_t newSize ) +{ + #ifdef PBZIP_DEBUG + fprintf( stderr, " start ensureOutBuffCapacity/newSize=%u: [", newSize ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + if ( newSize <= _outBuffCapacity ) + { + return 0; // enough capacity already + } + + if ( newSize > _outBuffCapacityHint ) + { + _outBuffCapacityHint = ( 11 * newSize ) / 10; + + if ( ( newSize <= getOutBuffCapacityLimit() ) && + ( _outBuffCapacityHint > getOutBuffCapacityLimit() ) ) + { + _outBuffCapacityHint = getOutBuffCapacityLimit(); + } + } + + char * newBuff = new(std::nothrow) char[_outBuffCapacityHint]; + if ( newBuff == NULL ) + { + handle_error( EF_EXIT, -1, + "pbzip2: *ERROR: Could not allocate memory (ensureOutBuffCapacity/%u)!" + "Aborting...\n", _outBuffCapacityHint ); + + _errState |= ERR_MEM_ALLOC_OUTBUFF; + return -1; + } + + if ( _outBuff.buf != NULL ) + { + memcpy( newBuff, _outBuff.buf, _outBuff.bufSize ); + delete [] _outBuff.buf; + } + + initOutBuff( newBuff, _outBuff.bufSize, _outBuffCapacityHint ); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " end ensureOutBuffCapacity/newSize=%u: [", newSize ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return 0; +} + +/** + * Depending on whether we have already found bz2 header or not - either append + * data to output buffer or discard it. + * On append [current, end) is appended to output buffer. 
Output buffer is + * extended if there is not enough existing space available in it. + * + * @return the number of bytes appended to output buff or skipped. -1 on error. + */ +int BZ2StreamScanner::appendOutBuffData(CharType * end) +{ + int additionSize = end - getInBuffCurrent(); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " start appendOutBuffData/%d: [", additionSize ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + if ( additionSize <= 0 ) + { + return 0; + } + + if ( isBz2HeaderFound() ) + { + size_t newSize = _outBuff.bufSize + additionSize; + + if ( ensureOutBuffCapacity( newSize ) != 0 ) + { + return - 1; // failure encountered + } + + memcpy( getOutBuffEnd(), getInBuffCurrent(), additionSize ); + _outBuff.bufSize += additionSize; + } + + // slide current position + _inBuffCurrent = end; + + #ifdef PBZIP_DEBUG + fprintf( stderr, " end appendOutBuffData/%d: [", additionSize ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return additionSize; +} + +/** + * Append available data from [current, search pos) to output buffer but + * just up to fill current out buffer capacity + */ +int BZ2StreamScanner::appendOutBuffDataUpToLimit() +{ + size_t maxCapacity = std::max( getOutBuffCapacityLimit(), _outBuffCapacity ); + int maxAddition = maxCapacity - _outBuff.bufSize; + if (maxAddition <= 0 ) + { + return 0; + } + + CharType * end1; + if ( eof() ) + { + end1 = getInBuffEnd(); + } + else + { + // subtract header size to keep the tail (since start of next header may be in it) + end1 = std::min( getInBuffSearchPtr(), getInBuffEnd() - ( getHeaderSize() - 1 ) ); + } + CharType * end2 = getInBuffCurrent() + maxAddition; + CharType * end = std::min( end1, end2 ); + + return appendOutBuffData( end ); +} + +/** + * Copy end section of input buffer to beginning just in case the BZIP2 header + * is located between two buffer boundaries. Copy the other remaining + * data into output buffer. 
+ */ +int BZ2StreamScanner::rewindInBuff() +{ + // temporarily mark tail beginning (not real header position) + _inBuffSearchPtr = getInBuffEnd() - ( _bz2Header.size() - 1 ); + int ret = appendOutBuffData(); + if ( failed() || ( ret < 0 ) ) + { + return -1; + } + else if ( ret == 0 ) + { + // search pos <= current + _inBuffSearchPtr = getInBuffCurrent(); + } + + int tailSize = getInBuffEnd() - getInBuffSearchPtr(); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " rewindInBuff: tail len: %d; app ret=%d [", tailSize, ret ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + // copy tail of input buffer to start and cut the rest + std::copy( getInBuffSearchPtr(), getInBuffEnd(), getInBuffBegin() ); + _inBuffEnd = getInBuffBegin() + tailSize; + _inBuffCurrent = getInBuffBegin(); + _inBuffSearchPtr = getInBuffBegin(); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " end rewindInBuff: tail len: %d; app ret=%d [", tailSize, ret ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return 0; +} + +/** + * Load data from file to input buffer. Read until buffer is full or end of + * file is reached or error is encountered. + * + * Enough additional capacity should be ensured otherwise may return 0 before + * eof. + * + * @return Returns number of read bytes on success; 0 - on end of file; < 0 on error + */ +int BZ2StreamScanner::readData() +{ + rewindInBuff(); + if ( failed() ) + { + return -1; + } + + if ( getInBuffSize() >= getInBuffCapacity() ) + { + handle_error( EF_EXIT, -1, + "pbzip2: *ERROR: BZ2StreamScanner::readData not enough buffer free space!" + " inBuffSize=%u, _inBuffCapacity=%u! 
Aborting...\n", + getInBuffSize(), getInBuffCapacity() ); + + _errState |= ERR_IO_INSUFFICIENT_BUFF_CAPACITY; + return -1; + } + + int bytesRead = do_read( _hInFile, getInBuffEnd(), + getInBuffCapacity() - getInBuffSize() ); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " readData: %d bytes read\n", bytesRead ); + #endif + + if ( bytesRead > 0 ) + { + _inBuffEnd += bytesRead; + } + else if ( bytesRead < 0 ) + { + handle_error( EF_EXIT, -1, + "pbzip2: *ERROR: Could not read from input file [err=%d]! Aborting...\n", bytesRead ); + + _errState |= ERR_IO_READ; + return bytesRead; + } + else // ( bytesRead == 0 ) + { + _eof = true; + } + + return bytesRead; +} + +/** + * Locate BZh header prefix in buffer. In case of first search - just check + * the beginning of buffer and signal error if it doesn't match to headers. + * + * @return pointer to BZh header prefix if located. getInBuffEnd() if not. + * failure() and getErrState() will indicate error if such occurred. + */ +BZ2StreamScanner::CharType * BZ2StreamScanner::locateHeaderPrefixInBuff() +{ + size_t prefixLen = 3; + + #ifdef PBZIP_DEBUG + fprintf( stderr, " start locateHeaderPrefixInBuff; " ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + // first search + if ( !isBz2HeaderFound() ) + { + if ( ( getInBuffSearchPtr() != getInBuffBegin() ) || + ( getInBuffSize() < _bz2Header.size() ) ) + { + _errState |= ERR_INVALID_FILE_FORMAT; + _inBuffSearchPtr = getInBuffEnd(); + } + else if ( _bz2Header.compare( 0, prefixLen, getInBuffSearchPtr(), prefixLen ) == 0 ) + { + // header prefix found + } + else + { + _errState |= ERR_INVALID_FILE_FORMAT; + _inBuffSearchPtr = getInBuffEnd(); + } + } + else + { + _inBuffSearchPtr = std::search( getInBuffSearchPtr(), getInBuffEnd(), + _bz2Header.begin(), _bz2Header.begin() + prefixLen ); + } + + #ifdef PBZIP_DEBUG + if ( getInBuffSearchPtr() != getInBuffEnd() ) + { + fprintf( stderr, " end locateHeaderPrefixInBuff - header prefix found; " ); + } + else + { + fprintf( 
stderr, " end locateHeaderPrefixInBuff - header prefix not found; " ); + } + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return getInBuffSearchPtr(); +} + + +/** + * Search next bz2 header just in currently available input buffer. + * (Doesn't read more data from file). + * + * @return pointer to header or getInBuffEnd() if such is not found. + */ +BZ2StreamScanner::CharType * BZ2StreamScanner::searchNextHeaderInBuff() +{ + #ifdef PBZIP_DEBUG + fprintf( stderr, " start searchNextHeaderInBuff; " ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + _searchStatus = false; + size_t prefixLen = 3; + size_t hsp = prefixLen + 1; // header selection position + + locateHeaderPrefixInBuff(); + while ( !failed() && ( getUnsearchedCount() >= getHeaderSize() ) ) + { + // _inBuffSearchPtr += prefixLen; + basic_string<CharType> * pHdr = NULL; + + if ( getInBuffSearchPtr()[hsp] == _bz2Header[hsp] ) + { + pHdr = &_bz2Header; + #ifdef PBZIP_DEBUG + fprintf( stderr, " searchNextHeaderInBuff - kind of NON-ZERO header\n" ); + #endif + } + else if ( getInBuffSearchPtr()[hsp] == _bz2HeaderZero[hsp] ) + { + pHdr = &_bz2HeaderZero; + #ifdef PBZIP_DEBUG + fprintf( stderr, " searchNextHeaderInBuff - kind of ZERO header\n" ); + #endif + } + + if ( pHdr != NULL ) + { + CharType bwtSizeChar = getInBuffSearchPtr()[prefixLen]; + if ( ( bwtSizeChar >= '1' ) && ( bwtSizeChar <= '9' ) ) + { + (*pHdr)[prefixLen] = bwtSizeChar; + + // compare the remaining part of magic header + int cmpres = pHdr->compare( hsp, pHdr->size() - hsp, + getInBuffSearchPtr() + hsp, pHdr->size() - hsp ); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " searchNextHeaderInBuff:cmpres=%d\n", cmpres ); + #endif + if ( cmpres == 0 ) + { + _searchStatus = true; + #ifdef PBZIP_DEBUG + fprintf( stderr, " end searchNextHeaderInBuff - found; " ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + return _inBuffSearchPtr; + } + } + } + + if ( !isBz2HeaderFound() ) + { + // not finding header on first search 
means failure + _errState |= ERR_INVALID_FILE_FORMAT; + break; + } + else + { + _inBuffSearchPtr += prefixLen; + locateHeaderPrefixInBuff(); + } + } + + // no header has been found if we're here + _inBuffSearchPtr = getInBuffEnd(); + + #ifdef PBZIP_DEBUG + fprintf( stderr, " end searchNextHeaderInBuff; " ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return _inBuffSearchPtr; +} + + +void BZ2StreamScanner::printCurrentState() +{ + fprintf( stderr, "current=%d, search pos=%d, end pos=%d; s-c=%d" + "; out buf size=%d; out buf capacity=%d; header found=%d; search status=%d", + getInBuffCurrent() - getInBuffBegin(), + getInBuffSearchPtr() - getInBuffBegin(), + getInBuffEnd() - getInBuffBegin(), + getInBuffSearchPtr() - getInBuffCurrent(), + (int)_outBuff.bufSize, + (int)_outBuffCapacity, + (int)isBz2HeaderFound(), + (int)getSearchStatus() ); +} + +/** + * Search next bz2 header. Read more data from file if needed. + * + * @return pointer to header is returned if found; + * getInBuffEnd() - if not found (or error). + * One should check failure() or _errorState for error details. + */ +BZ2StreamScanner::CharType * BZ2StreamScanner::searchNextHeader() +{ + #ifdef PBZIP_DEBUG + fprintf( stderr, " start searchNextHeader %u/%u... : ", + getInBuffSearchPtr() - getInBuffBegin(), getInBuffSize() ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + if ( getUnsearchedCount() > 0 ) + { + searchNextHeaderInBuff(); + } + + while ( !getSearchStatus() && !eof() && !failed() && !isOutBuffFullEnough() ) + { + readData(); + + if ( failed() ) + { + return getInBuffEnd(); + } + + searchNextHeaderInBuff(); + } + + if ( getSearchStatus() ) + { + _bz2HeaderFound = true; + + #ifdef PBZIP_DEBUG + fprintf( stderr, " header found; " ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + } + + if ( failed() ) + { + return _inBuffSearchPtr = getInBuffEnd(); + } + + #ifdef PBZIP_DEBUG + fprintf( stderr, " end searchNextHeader %u/%u... 
NOT FOUND: ", + getInBuffSearchPtr() - getInBuffBegin(), getInBuffSize() ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + return _inBuffSearchPtr; +} + +/** + * Get next BZ2 stream from the input. + * + * @return output buffer initialized with bz2 stream. failure() should be checked + * after calling this method - true would mean failure(). If failure() is false: + * - outBuff.bufSize == 0 indicates end of file; + */ +outBuff * BZ2StreamScanner::getNextStream() +{ + initOutBuff(); + + #ifdef PBZIP_DEBUG + static OFF_T blockNum = 0; + #endif + + outBuff * res = new(std::nothrow) outBuff; + if ( res == NULL ) + { + handle_error( EF_EXIT, -1, + "pbzip2: *ERROR: Could not allocate memory (getNextStream/%u)!" + "Aborting...\n", (unsigned) sizeof( outBuff ) ); + + _errState |= ERR_MEM_ALLOC_OUTBUFF; + return res; + } + + res->buf = NULL; + res->bufSize = std::numeric_limits::max(); + + // first search + if ( !failed() && !isBz2HeaderFound() ) + { + #ifdef PBZIP_DEBUG + blockNum = 0; + fprintf( stderr, " First search start\n" ); + #endif + + _searchStatus = false; + searchNextHeader(); + } + + if ( failed() ) + { + return res; + } + + if ( ( getInBuffCurrent() == getInBuffEnd() ) && eof() ) + { + // end of file + #ifdef PBZIP_DEBUG + fprintf( stderr, " End of file\n" ); + #endif + + res->bufSize = 0; + return res; + } + + if ( ( getInBuffCurrent() == getInBuffSearchPtr() ) || + ( !getSearchStatus() && !eof() ) ) + { + // search for next header + // Slide a bit to skip current header in order to search for next one. + _inBuffSearchPtr = std::min( getInBuffSearchPtr() + _bz2Header.size(), + getInBuffEnd() ); + _searchStatus = false; + + #ifdef PBZIP_DEBUG + fprintf( stderr, " getNextStream - Searching subsequent header... 
" ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + searchNextHeader(); + } + + if ( failed() ) + { + return res; + } + + appendOutBuffDataUpToLimit(); + + if ( failed() ) + { + return res; + } + + if ( _outSequenceNumber > 0 ) + { + // continuing an unterminated sequence + ++_outSequenceNumber; + } + else if ( getInBuffCurrent() != getInBuffSearchPtr() ) + { + // start of long multi-part stream + _outSequenceNumber = 1; + } + + _outBuff.sequenceNumber = _outSequenceNumber; + _outBuff.inSize = _outBuff.bufSize; + _outBuff.blockNumber = _streamNumber; + + if ( getInBuffCurrent() == getInBuffSearchPtr() ) + { + // we're at end of stream (either single or multi-segment one) + _outBuff.isLastInSequence = true; + _outSequenceNumber = 0; + ++_streamNumber; + } + else + { + _outBuff.isLastInSequence = false; + } + + + #ifdef PBZIP_DEBUG + OFF_T startBlock = blockNum; + blockNum += _outBuff.bufSize; + + fprintf( stderr, " end getNextStream/blockRange=[%"PRIu64", %"PRIu64"), stream no=%d; seq=%d: [", + startBlock, blockNum, _outBuff.blockNumber, _outBuff.sequenceNumber ); + printCurrentState(); + fprintf( stderr, "\n" ); + #endif + + *res = _outBuff; + // clean-up pointers to returned data. 
+ initOutBuff(); + + return res; +} + +void BZ2StreamScanner::initOutBuff( char * buf, size_t bufSize, size_t bufCapacity ) +{ + _outBuff.buf = buf; + _outBuff.bufSize = bufSize; + _outBuffCapacity = bufCapacity; + _outBuff.inSize = 0; +} + +} // namespace pbzip2 + diff -Nru pbzip2-1.0.5/BZ2StreamScanner.h pbzip2-1.1.1/BZ2StreamScanner.h --- pbzip2-1.0.5/BZ2StreamScanner.h 1970-01-01 00:00:00.000000000 +0000 +++ pbzip2-1.1.1/BZ2StreamScanner.h 2010-04-17 18:37:39.000000000 +0000 @@ -0,0 +1,151 @@ +/* + * File: BZ2StreamScanner.h + * Author: Yavor Nikolov + * + * Created on March 6, 2010, 10:07 PM + */ + +#ifndef _BZ2STREAMSCANNER_H +#define _BZ2STREAMSCANNER_H + +#include "pbzip2.h" +#include +#include + +using namespace std; + +namespace pbzip2 +{ + +class BZ2StreamScanner +{ +public: + typedef unsigned char CharType; + + static const size_t DEFAULT_IN_BUFF_CAPACITY = 1024 * 1024; // 1M + static const size_t DEFAULT_OUT_BUFF_LIMIT = 1024 * 1024; + + enum BZ2SScannerErrorFlag + { + ERR_MEM_ALLOC_INBUFF = 1, + ERR_MEM_ALLOC_OUTBUFF = 1 << 1, + ERR_IO_READ = 1 << 2, + ERR_IO_INSUFFICIENT_BUFF_CAPACITY = 1 << 3, + ERR_INVALID_STATE = 1 << 4, + ERR_INVALID_FILE_FORMAT = 1 << 5 + }; + + BZ2StreamScanner( int hInFile, size_t inBuffCapacity = DEFAULT_IN_BUFF_CAPACITY ); + int init( int hInFile, size_t inBuffCapacity = DEFAULT_IN_BUFF_CAPACITY ); + + virtual ~BZ2StreamScanner(); + + outBuff * getNextStream(); + + size_t getInBuffSize() const { return ( _inBuffEnd - _inBuff ); } + size_t getInBuffCapacity() const { return _inBuffCapacity; } + const basic_string<CharType> & getHeader() const { return _bz2Header; } + size_t getHeaderSize() const { return _bz2Header.size(); } + int getErrState() const { return _errState; } + bool failed() { return ( _errState != 0 ); } + + /** true if header has been found since last initialization */ + bool isBz2HeaderFound() const { return _bz2HeaderFound; } + + /** status of last/current search only */ + bool getSearchStatus() const { return 
_searchStatus; } + + // end of file + bool eof() const { return _eof; } + + /** true if out buffer is full enough to produce output block */ + bool isOutBuffFullEnough() const { return _outBuff.bufSize >= getOutBuffCapacityLimit(); } + + /** + * dispose memory resources + */ + virtual void dispose(); + + void printCurrentState(); + +private: + /* disable copy c-tor */ + BZ2StreamScanner( const BZ2StreamScanner& orig ) {} + + void initOutBuff( char * buf = NULL, size_t bufSize = 0, size_t bufCapacity = 0 ); + int appendOutBuffData( CharType * end ); + int appendOutBuffData() { return appendOutBuffData( getInBuffSearchPtr() ); } + int appendOutBuffDataUpToLimit(); + int ensureOutBuffCapacity( size_t newSize ); + int readData(); + + CharType * getInBuffEnd() { return _inBuffEnd; } + CharType * getInBuffBegin() { return _inBuff; } + CharType * getInBuffCurrent() { return _inBuffCurrent; } + CharType * getInBuffSearchPtr() { return _inBuffSearchPtr; } + char * getOutBuffEnd() { return _outBuff.buf + _outBuff.bufSize; } + size_t getUnsearchedCount() const { return _inBuffEnd - _inBuffSearchPtr; } + + /** + * Search next bz2 header. Read more data from file if needed. + * + * @return pointer to header is returned if found; + * getInBuffEnd() - if not found; NULL - on error. + */ + CharType * searchNextHeader(); + + /** + * Search next bz2 header just in currently available input buffer. + * (Doesn't read more data from file). + * + * @return pointer to header or getInBuffEnd() if such is not found. + */ + CharType * searchNextHeaderInBuff(); + + /** + * Prepare for next read from file into input buffer. + * Consumes remaining input data buffer and moves header tail to beginning. + * + */ + int rewindInBuff(); + + /** + * Locate BZh header prefix in buffer. In case of first search - just check + * the beginning of buffer and signal error if it doesn't match to headers. + * + * @return pointer to BZh header prefix if located. getInBuffEnd() if not. 
+ * failure() and getErrState() will indicate error if such occurred. + */ + CharType * locateHeaderPrefixInBuff(); + + size_t getOutBuffCapacityLimit() const { return _outBuffCapacityLimit; } + + int _hInFile; // input file descriptor + bool _eof; + + basic_string<CharType> _bz2Header; + basic_string<CharType> _bz2HeaderZero; + bool _bz2HeaderFound; + bool _searchStatus; + + CharType * _inBuff; + CharType * _inBuffEnd; // end of data read from file + CharType * _inBuffCurrent; + CharType * _inBuffSearchPtr; + + size_t _inBuffCapacity; // allocated memory capacity for in buffer + + outBuff _outBuff; + size_t _outBuffCapacity; + size_t _outBuffCapacityHint; // keep max used capacity + size_t _outBuffCapacityLimit; + + unsigned int _errState; // 0 - ok; otherwise error + int _outSequenceNumber; // output block sequence number in bz2 stream (>0 if segmented) + int _streamNumber; +}; + +} + +#endif /* _BZ2STREAMSCANNER_H */ + diff -Nru pbzip2-1.0.5/ChangeLog pbzip2-1.1.1/ChangeLog --- pbzip2-1.0.5/ChangeLog 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/ChangeLog 2010-04-17 18:37:39.000000000 +0000 @@ -1,3 +1,46 @@ +Changes in 1.1.1 (Apr 17, 2010) +- Modified decompression to use low-level libbz2 API to improve + performance of long bzip2 streams of large single-stream bzip2 + blocks +- This release should now decompress files created with bzip2 + at least as fast as bzip2 or slightly faster +- Handle decompression of long bzip2 streams incrementally + instead of loading whole stream in memory at once +- Fixed issue in safe_cond_timed_wait which caused segmentation + fault when compiled in DEBUG mode +- Fixed issue with Sun Studio compiler - required explicit + declaration of static const members in .cpp + +Changes in 1.1.0 (Mar 13, 2010) +- Added support for multi-threaded decompression using STDIN/pipes +- Added code to support throttling compression to prevent memory + exhaustion with slow output pipe +- Added -m switch to specify max memory usage amount before + throttling 
starts (default 100MB) +- Fixed bug that did not allow command line parameters to be used + when compressing data from STDIN +- Added long options to man page and -h output +- Added --loadavg, --read long options +- Added support for CPU detection on Win32 +- Major improvements to protection of shared variables, error and + signal handling, program termination +- Added -S switch for thread stack size customization + (needs USE_STACKSIZE_CUSTOMIZATION defined when compiling) +- Fixed command line parsing bug for -b, -p, -m switches +- Fixed infinite loop when fileWriter fails to create output + file at start +- Fixed bug that deleted input filename (with .bz2 extension for + compression and without .bz2 extension for decompression) when + a user interrupts process with CTRL-C while outputting to STDOUT +- Fixed bug where 0 byte files were not processed properly when + data input from STDIN +- Ignores fwrite return and passes chown errors in writeFileMetaData + if effective uid root +- OutputBuffer usage redesigned as fixed-size circular buffer +- Lots of minor bugs fixed (see AUTHORS or pbzip2.cpp for full details) +- Special thanks to Yavor Nikolov for providing the majority of + contributions to this release and a significant amount of testing + Changes in 1.0.5 (Jan 08, 2009) - Now properly complains about trying to read or write compressed data to terminal, and exits diff -Nru pbzip2-1.0.5/COPYING pbzip2-1.1.1/COPYING --- pbzip2-1.0.5/COPYING 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/COPYING 2010-04-17 18:37:39.000000000 +0000 @@ -1,4 +1,4 @@ -This program, "pbzip2" is copyright (C) 2003-2009 Jeff Gilchrist. +This program, "pbzip2" is copyright (C) 2003-2010 Jeff Gilchrist. All rights reserved. The library "libbzip2" which pbzip2 uses, is copyright @@ -37,4 +37,4 @@ Jeff Gilchrist, Ottawa, Canada. 
pbzip2@compression.ca -pbzip2 version 1.0.5 of January 8, 2009 +pbzip2 version 1.1.1 of April 17, 2010 diff -Nru pbzip2-1.0.5/debian/changelog pbzip2-1.1.1/debian/changelog --- pbzip2-1.0.5/debian/changelog 2010-10-29 19:36:27.000000000 +0000 +++ pbzip2-1.1.1/debian/changelog 2010-10-29 19:36:27.000000000 +0000 @@ -1,3 +1,9 @@ +pbzip2 (1.1.1-1~ppa1~hardyubuntu1) hardy; urgency=low + + * Upgrade to v1.1.1 + + -- Mark Foster Fri, 29 Oct 2010 12:10:39 -0700 + pbzip2 (1.0.5-1~ppa1~hardy) hardy; urgency=low * Backport to hardy for new features (compress from stdin mostly) diff -Nru pbzip2-1.0.5/Makefile pbzip2-1.1.1/Makefile --- pbzip2-1.0.5/Makefile 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/Makefile 2010-04-17 18:37:39.000000000 +0000 @@ -4,28 +4,48 @@ # Compiler to use CC = g++ CFLAGS = -O2 +#CFLAGS += -g -Wall +#CFLAGS += -ansi +#CFLAGS += -pedantic +#CFLAGS += -std=c++0x # Comment out CFLAGS line below for compatability mode for 32bit file sizes # (less than 2GB) and systems that have compilers that treat int as 64bit # natively (ie: modern AIX) -CFLAGS += -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 +CFLAGS += -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 # Uncomment CFLAGS line below if you want to compile pbzip2 without load # average support for systems that do not support it #CFLAGS += -DPBZIP_NO_LOADAVG +# Uncomment CFLAGS line below to get debug output +#CFLAGS += -DPBZIP_DEBUG + +# Comment out CFLAGS line below to disable pthread semantics in code +CFLAGS += -D_POSIX_PTHREAD_SEMANTICS + +# Comment out CFLAGS line below to disable Thread stack size customization +CFLAGS += -DUSE_STACKSIZE_CUSTOMIZATION + +# On some compilers -pthreads +CFLAGS += -pthread + +# External libraries +LDFLAGS = -lbz2 +LDFLAGS += -lpthread + # Where you want pbzip2 installed when you do 'make install' PREFIX = /usr all: pbzip2 # Standard pbzip2 compile -pbzip2: pbzip2.cpp - $(CC) $(CFLAGS) $^ -o pbzip2 -pthread -lpthread -lbz2 +pbzip2: pbzip2.cpp BZ2StreamScanner.cpp + 
$(CC) $(CFLAGS) $^ -o pbzip2 $(LDFLAGS) # Choose this if you want to compile in a static version of the libbz2 library -pbzip2-static: pbzip2.cpp libbz2.a - $(CC) $(CFLAGS) $^ -o pbzip2 -pthread -lpthread -I. -L. -lbz2 +pbzip2-static: pbzip2.cpp BZ2StreamScanner.cpp libbz2.a + $(CC) $(CFLAGS) $^ -o pbzip2 -I. -L. $(LDFLAGS) # Install the binary pbzip2 program and man page install: pbzip2 diff -Nru pbzip2-1.0.5/Makefile.solaris.sunstudio pbzip2-1.1.1/Makefile.solaris.sunstudio --- pbzip2-1.0.5/Makefile.solaris.sunstudio 1970-01-01 00:00:00.000000000 +0000 +++ pbzip2-1.1.1/Makefile.solaris.sunstudio 2010-04-17 18:37:39.000000000 +0000 @@ -0,0 +1,61 @@ +# Make file for parallel BZIP2 (customized for Solaris 10 64-bit/SPARC IV+) +SHELL = /bin/sh + +# Compiler to use +CC=CC +CFLAGS = -m64 +CFLAGS += -fast -xO5 + +#CFLAGS += -Wall +#CFLAGS += -g +CFLAGS += -mt -lpthread + +# Comment out CFLAGS line below for compatability mode for 32bit file sizes +# (less than 2GB) and systems that have compilers that treat int as 64bit +# natively (ie: modern AIX) +#CFLAGS += -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 + +# Uncomment CFLAGS line below if you want to compile pbzip2 without load +# average support for systems that do not support it +#CFLAGS += -DPBZIP_NO_LOADAVG + +# Uncomment CFLAGS line below to get debug output +#CFLAGS += -DPBZIP_DEBUG + +# Comment out CFLAGS line below to disable pthread semantics in code +CFLAGS += -D_POSIX_PTHREAD_SEMANTICS + +# Comment out CFLAGS line below to disable Thread stack size customization +CFLAGS += -DUSE_STACKSIZE_CUSTOMIZATION + +# Libraries +LDFLAGS = -lbz2 +LDFLAGS += -lpthread + +# Where you want pbzip2 installed when you do 'make install' +PREFIX = /usr + +all: pbzip2 + +# Standard pbzip2 compile +pbzip2: pbzip2.cpp BZ2StreamScanner.cpp + $(CC) $(CFLAGS) $^ -o pbzip2 ${LDFLAGS} + +# Choose this if you want to compile in a static version of the libbz2 library +pbzip2-static: pbzip2.cpp BZ2StreamScanner.cpp libbz2.a + $(CC) 
$(CFLAGS) $^ -o pbzip2 -I. -L. $(LDFLAGS) + +# Install the binary pbzip2 program and man page +install: pbzip2 + if ( test ! -d $(PREFIX)/bin ) ; then mkdir -p $(PREFIX)/bin ; fi + if ( test ! -d $(PREFIX)/man ) ; then mkdir -p $(PREFIX)/man ; fi + if ( test ! -d $(PREFIX)/man/man1 ) ; then mkdir -p $(PREFIX)/man/man1 ; fi + cp -f pbzip2 $(PREFIX)/bin/pbzip2 + chmod a+x $(PREFIX)/bin/pbzip2 + ln -s -f $(PREFIX)/bin/pbzip2 $(PREFIX)/bin/pbunzip2 + ln -s -f $(PREFIX)/bin/pbzip2 $(PREFIX)/bin/pbzcat + cp -f pbzip2.1 $(PREFIX)/man/man1 + chmod a+r $(PREFIX)/man/man1/pbzip2.1 + +clean: + rm -f *.o pbzip2 diff -Nru pbzip2-1.0.5/pbzip2.1 pbzip2-1.1.1/pbzip2.1 --- pbzip2-1.0.5/pbzip2.1 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/pbzip2.1 2010-04-17 18:37:39.000000000 +0000 @@ -1,10 +1,10 @@ .TH pbzip2 1 .SH NAME -pbzip2 \- parallel bzip2 file compressor, v1.0.5 +pbzip2 \- parallel bzip2 file compressor, v1.1.1 .SH SYNOPSIS -.B pbzip2 +.B pbzip2 .RB [ " \-123456789 " ] -.RB [ " \-b#cdfhklp#qrtvVz " ] +.RB [ " \-b#cdfhklm#p#qrS#tvVz " ] [ .I "filenames \&..." ] @@ -12,8 +12,8 @@ .I pbzip2 is a parallel implementation of the bzip2 block-sorting file compressor that uses pthreads and achieves near-linear speedup on SMP -machines. The output of this version is fully compatible with bzip2 -v1.0.2 or newer (ie: anything compressed with +machines. The output of this version is fully compatible with bzip2 +v1.0.2 or newer (ie: anything compressed with .I pbzip2 can be decompressed with bzip2). .PP @@ -25,125 +25,116 @@ The default settings for .I pbzip2 will work well in most cases. The only switch you will likely need to -use is -d to decompress files and -p to set the # of processors for +use is -d to decompress files and -p to set the # of processors for .I pbzip2 to use if autodetect is not supported on your system, or you want to use a specific # of CPUs. 
.SH OPTIONS .TP .B \-b# -Where # is the file block size in 100k (default 9 = 900k) +Where # is block size in 100k steps (default 9 = 900k) .TP -.B \-c +.B \-c, \-\-stdout Output to standard out (stdout) .TP -.B \-d +.B \-d,\-\-decompress Decompress file .TP -.B \-f +.B \-f,\-\-force Force, overwrite existing output file .TP -.B \-h +.B \-h,\-\-help Print this help message .TP -.B \-k +.B \-k,\-\-keep Keep input file, do not delete .TP -.B \-l +.B \-l,\-\-loadavg Load average determines max number processors to use .TP +.B \-m# +Where # is max memory usage in 1MB steps (default 100 = 100MB) +.TP .B \-p# Where # is the number of processors (default: autodetect) .TP -.B \-q +.B \-q,\-\-quiet Quiet mode (default) .TP -.B \-r +.B \-r,\-\-read Read entire input file into RAM and split between processors .TP -.B \-t +.B \-S# +Child thread stack size in 1KB steps (default stack size if unspecified) +.TP +.B \-t,\-\-test Test compressed file integrity .TP -.B \-v +.B \-v,\-\-verbose Verbose mode .TP .B \-V -Display version info for -.I pbzip2 +Display version info for +.I pbzip2 then exit .TP -.B \-z +.B \-z,\-\-compress Compress file (default) .TP -.B \-1..9 -Set BWT block size to 100k .. 900k (default 900k) +.B \-1,\-\-fast ... \-9,\-\-best +Set BWT block size to 100k .. 900k (default 900k). .SH FILE SIZES -You should be able to compress files larger than 4GB with +You should be able to compress files larger than 4GB with .I pbzip2. .PP -Files that are compressed with -.I pbzip2 +Files that are compressed with +.I pbzip2 are broken up into pieces and -each individual piece is compressed. This is how -.I pbzip2 +each individual piece is compressed. This is how +.I pbzip2 runs faster on multiple CPUs since the pieces can be compressed simultaneously. The final .bz2 file may be slightly larger than if it was compressed with the regular bzip2 program due to this file splitting (usually -less than 0.2% larger). 
Files that are compressed with -.I pbzip2 -will also gain considerable speedup when decompressed using +less than 0.2% larger). Files that are compressed with +.I pbzip2 +will also gain considerable speedup when decompressed using .I pbzip2. .PP Files that were compressed using bzip2 will not see speedup since bzip2 packages the data into a single chunk that cannot be split -between processors. If you have a large file that was created with -bzip2 (say 1.5GB for example) you will likely not be able to -decompress the file with -.I pbzip2 -since -.I pbzip2 -will try to allocate -1.5GB of memory to decompress it, and that call might fail depending -on your system resources. If the same 1.5GB file had of been -compressed with -.I pbzip2, -it would decompress fine with -.I pbzip2. -If you are unable to decompress a file with -.I pbzip2 -due to its size, use the regular bzip2 instead. +between processors. .SH EXAMPLES Example 1: pbzip2 myfile.tar .PP This example will compress the file "myfile.tar" into the compressed file "myfile.tar.bz2". It will use the autodetected # of processors (or 2 -processors if autodetect not supported) with the default file block size +processors if autodetect not supported) with the default file block size of 900k and default BWT block size of 900k. .PP Example 2: pbzip2 -b15k myfile.tar .PP -This example will compress the file "myfile.tar" into the compressed file -"myfile.tar.bz2". It will use the autodetected # of processors (or 2 -processors if autodetect not supported) with a file block size of 1500k -and a BWT block size of 900k. The file "myfile.tar" will not be deleted +This example will compress the file "myfile.tar" into the compressed file +"myfile.tar.bz2". It will use the autodetected # of processors (or 2 +processors if autodetect not supported) with a file block size of 1500k +and a BWT block size of 900k. The file "myfile.tar" will not be deleted after compression is finished. 
.PP Example 3: pbzip2 -p4 -r -5 myfile.tar second*.txt .PP -This example will compress the file "myfile.tar" into the compressed file -"myfile.tar.bz2". It will use 4 processors with a BWT block size of 500k. -The file block size will be the size of "myfile.tar" divided by 4 (# of -processors) so that the data will be split evenly among each processor. -This requires you have enough RAM for pbzip2 to read the entire file into -memory for compression. Pbzip2 will then use the same options to compress +This example will compress the file "myfile.tar" into the compressed file +"myfile.tar.bz2". It will use 4 processors with a BWT block size of 500k. +The file block size will be the size of "myfile.tar" divided by 4 (# of +processors) so that the data will be split evenly among each processor. +This requires you have enough RAM for pbzip2 to read the entire file into +memory for compression. Pbzip2 will then use the same options to compress all other files that match the wildcard "second*.txt" in that directory. .PP Example 4: tar cf myfile.tar.bz2 --use-compress-prog=pbzip2 dir_to_compress/ .br Example 4: tar -c directory_to_compress/ | pbzip2 -c > myfile.tar.bz2 .PP -These examples will compress the data being given to pbzip2 via pipe +These examples will compress the data being given to pbzip2 via pipe from TAR into the compressed file "myfile.tar.bz2". It will use the autodetected # of processors (or 2 processors if autodetect not supported) with the default file block size of 900k and default BWT @@ -151,14 +142,19 @@ "directory_to_compress/" directory and passing the data to pbzip2 as it works. .PP -Example 5: pbzip2 -d myfile.tar.bz2 +Example 5: pbzip2 -d -m500 myfile.tar.bz2 .PP -This example will decompress the file "myfile.tar.bz2" into the decompressed -file "myfile.tar". It will use the autodetected # of processors (or 2 -processors if autodetect not supported). The switches -b, -r, and -1..-9 are -not valid for decompression. 
+This example will decompress the file "myfile.tar.bz2" into the decompressed +file "myfile.tar". It will use the autodetected # of processors (or 2 +processors if autodetect not supported). It will use a maximum of 500MB of +memory for decompression. The switches -b, -r, and -1..-9 are not valid for +decompression. .SH "SEE ALSO" bzip2(1) +gzip(1) +lzip(1) +rzip(1) +zip(1) .SH AUTHOR Jeff Gilchrist diff -Nru pbzip2-1.0.5/pbzip2.cpp pbzip2-1.1.1/pbzip2.cpp --- pbzip2-1.0.5/pbzip2.cpp 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/pbzip2.cpp 2010-04-17 18:37:39.000000000 +0000 @@ -7,56 +7,185 @@ * - Modified producer/consumer threading code from * Andrae Muys * - uses libbzip2 by Julian Seward (http://sources.redhat.com/bzip2/) + * - Major contributions by Yavor Nikolov * - * Date : January 8, 2009 + * Date : April 17, 2010 * + * TODO + * Known Issues + * - direct decompress: (bzerr == BZ_DATA_ERROR_MAGIC) - on rewrite mode + * is handled as cat which is counter-intuitive (though similar to bzip2 handling). + * - some functions are too-long -> harder to maintain (e.g. 
main) * * Contributions * ------------- * Bryan Stillwell - code cleanup, RPM spec, prep work - * for inclusion in Fedora Extras + * for inclusion in Fedora Extras * Dru Lemley [http://lemley.net/smp.html] - help with large file support * Kir Kolyshkin - autodetection for # of CPUs * Joergen Ramskov - initial version of man page * Peter Cordes - code cleanup * Kurt Fitzner - port to Windows compilers and - * decompression throttling + * decompression throttling * Oliver Falk - RPM spec update * Jindrich Novy - code cleanup and bug fixes * Benjamin Reed - autodetection for # of CPUs in OSX * Chris Dearman - fixed pthreads race condition * Richard Russon - help fix decompression bug * Paul Pluzhnikov - fixed minor memory leak - * Aníbal Monsalve Salazar - creates and maintains Debian packages + * Aníbal Monsalve Salazar - creates and maintains Debian packages * Steve Christensen - creates and maintains Solaris packages (sunfreeware.com) * Alessio Cervellin - creates and maintains Solaris packages (blastwave.org) * Ying-Chieh Liao - created the FreeBSD port * Andrew Pantyukhin - maintains the FreeBSD ports and willing to - * resolve any FreeBSD-related problems + * resolve any FreeBSD-related problems * Roland Illig - creates and maintains NetBSD packages * Matt Turner - code cleanup - * Álvaro Reguly - RPM spec update to support SUSE Linux + * Ãlvaro Reguly - RPM spec update to support SUSE Linux * Ivan Voras - support for stdin and pipes during compression and - * CPU detect changes + * CPU detect changes * John Dalton - code cleanup and bug fixes for stdin support * Rene Georgi - code and Makefile cleanup, support for direct - * decompress and bzcat - * René Rhéaume & Jeroen Roovers - patch to support uclibc's lack of - * a getloadavg function + * decompress and bzcat + * René Rhéaume & Jeroen Roovers - patch to support uclibc's lack of + * a getloadavg function * Reinhard Schiedermeier - support for tar --use-compress-prog=pbzip2 + * Elbert Pol - creates and 
maintains OS/2 packages + * Nico Vrouwe - support for CPU detection on Win32 + * Eduardo Terol - creates and maintains Windows 32bit package + * Nikita Zhuk - creates and maintains Mac OS X Automator action and + * workflow/service + * Jari Aalto - Add long options to -h output. + * Add --loadavg, --read long options. + * Scott Emery - ignore fwrite return and pass chown errors in + * writeFileMetaData if effective uid root + * Steven Chamberlain - code to support throttling compression to + * prevent memory exhaustion with slow output + * pipe + * Yavor Nikolov - code to support throttling compression to + * prevent memory exhaustion with slow output, cleanup of debug output + * - fixed infinite loop on when fileWriter fails to create output file + * at start + * - allDone renamed to producerDone and added mutex synchronized-access + * - Changed fileWriter loop exit condition: now protected from + * simultaneous access + * - Mutex initialization/disposal refactored + * - Throttling loops using thread condition wait + * - Fatal error handling refactored + * - Removed allDone checks used to signal error (now handled by + * handle_error function) + * - Prevented dangling threads on switch from Multi to Single threaded + * - Inline hint added on a few functions + * - Some additional error_handlers placed instead of returns (kill any + * dangling threads) + * - Cleanup and termination changed in attempt to prevent + * signal-handling issues in mulit-threaded environment (still some + * problems are observed on signalling e.g. with Ctrl+C) + * - Signal-handling in child threads disabled. The goal is to have + * single thread only which accepts signals + * - Using abort instead of exit on error termination + * - Fixed command-line parsing problem (e.g. -m100 -p12 -> 120 CPUs) + * (Problem was unterminated strings afer strncpy). 
+ * - Signal handlers setup refactored to separate function and + * switched from signal to sigaction as per POSIX recommendations + * - Added mutexes unlocking before error-termination. + * - Termination flag introduced (terminateFlag) to indicate abrupt + * termination and facilitate thread finishing in error conditon. + * - fileWriter: error_handler instead of exit on write error. + * - percentComplete progress printed only if changed. + * - signal handling redesigned: using sigwait in separate thread. + * - Makefile: -D_POSIX_PTHREAD_SEMANTICS (used in Solaris). + * - CHAR_BIT instead of 8 used in a warning message. + * - SIGUSR1 signal handling added and used to terminate signal handling + * thread. (Resolved issue with pthread_cancel on Windows-Cygwin) + * - Fixed wrongly issued exit code 1 instead of 0. + * - Corrected some error messages and added a few new ones at signal and + * terminator threads join. + * - Added support for thread stack size customization (-S# option) + * Needs USE_STACKSIZE_CUSTOMIZATION to be defined to enable that option + * - Added define of PTHREAD_STACK_MIN if such is not available in + * standard headers. + * - OutputBuffer usage redesigned as fixed-size circular buffer. Adding + * new elements to it refactored as separate function. + * - OutputBuffer resizing removed from producer_decompress since now + * buffer should be with fixed size. + * - Fixed debug print of OutputBuffer now referencing OutputBuffer in + * old-style absolute index (in fileWriter and others). + * - memstr function implementation simplified (delegated to standard + * library function which is doing the same more efficiently). + * - Changed some variables from int to size_t to get rid of compiler + * warnings (signed + unsigned expressions). 
+ * - Sequential processing of input file/pipe/redirect implemented (capsulated + * as separate class: BZ2StreamScanner) + * - Parallel decompression enabled (now possible with the sequential in) + * - Refactored declarations moved to separate header file (pbzip2.h) to + * make global definitions available to other source modules + * - Progress reporting modified since we don't have number of + * blocks up-front with sequential input read (now based on bytes). fileSize + * moved as InFileSize global variable for that purpose + * - Progress computation in fileWriter moved to QuietMode != 0 + * (not needed to do it if we won't print it) + * - disposeMemory helper function implemented to ease memory disposal + * - Processing functions of threads declared as extern "C" since pthread_t + * requires plain "C" calling convention instead of the default "C++" + * - pthread_mutex_{lock|unlock} replaced with safe_mutex_{lock|unlock} + * where appropriate (to prevent from issues like out of sys mutexes) + * - Makefile modified to include the new source files for BZ2StreamScanner + * - Makefile refined (library flags specified in LDFLAGS variable) + * - Makefile.solaris.sunstudio included as example makefile for Solaris + * and SunStudio 12 C++ compiler + * - bz2HeaderZero in main initialized to value 0x90 > 127 which is in general + * out of char type range. Changed to unsigned along with tmpBuff to avoid + * some compiler(e.g. c++0x)/runtime warnings/errors. + * - Some thread conditions signalling added on termination requested to ease + * termination of blocked on conditions threads + * - Other pthread_* calls (signal, wait) migrated to safe_* wrappers to + * handle error return codes (and simplify code where already handled) + * - Timed pthread cond waits refactored to separate function and moved to + * debug sections only; non-timed wait used in non-debug mode. Signalling + * consitions to wake threads waiting on these conditions guaranteed. 
+ * - memstr function templetized to allow working with other data types but + * not only char * (e.g. unsigned char *) + * - safe_cond_broadcast implemented and additional signalling added at + * fileWriter end to prevent consumers blocking at end. + * - Signal error when the input file doesn't contain any bzip2 headers. + * - Fixed problems with not-handling zero-file length special header on compression + * and decompression. + * - Signalling error on stdin decompression when file doesn't start with + * correct bzip2 magic header. + * - Implemented outputBufferInit(size_t size) utility function for output + * buffer initialization/resetting. + * - Plain C headers moved to extern "C" section. + * - Modified file-names handling to avoid issues with file-sizes > 2040 + * - Fixed out of array pointer for OutFilename in strncasecmp calls + * - A few other minor modifications + * - consumer_decompress using low-level API now to improve performance of + * long bzip2 streams + * - Fixed issue in safe_cond_timed_wait which caused segmentation fault + * when compiled in DEBUG mode + * - Handle decompression of very long bz2 streams incrementally instead of + * loading whole streams in memory at once + * - Progress calculation changed: fixed issue when large file support is + * disabled and enabled monitoring of segmented long bzip2 streams + * - Fixed issue with Sun Studio compiler - required explicit declaration + * of static const members in .cpp. 
* * Specials thanks for suggestions and testing: Phillippe Welsh, * James Terhune, Dru Lemley, Bryan Stillwell, George Chalissery, * Kir Kolyshkin, Madhu Kangara, Mike Furr, Joergen Ramskov, Kurt Fitzner, * Peter Cordes, Oliver Falk, Jindrich Novy, Benjamin Reed, Chris Dearman, - * Richard Russon, Aníbal Monsalve Salazar, Jim Leonard, Paul Pluzhnikov, + * Richard Russon, Aníbal Monsalve Salazar, Jim Leonard, Paul Pluzhnikov, * Coran Fisher, Ken Takusagawa, David Pyke, Matt Turner, Damien Ancelin, - * Álvaro Reguly, Ivan Voras, John Dalton, Sami Liedes, Rene Georgi, - * René Rhéaume, Jeroen Roovers, Reinhard Schiedermeier, Kari Pahula, - * Elbert Pol. + * Ãlvaro Reguly, Ivan Voras, John Dalton, Sami Liedes, Rene Georgi, + * René Rhéaume, Jeroen Roovers, Reinhard Schiedermeier, Kari Pahula, + * Elbert Pol, Nico Vrouwe, Eduardo Terol, Samuel Thibault, Michael Fuereder, + * Jari Aalto, Scott Emery, Steven Chamberlain, Yavor Nikolov, Nikita Zhuk, + * Joao Seabra, Conn Clark, Mark A. Haun, Tim Bielawa, Michal Gorny, + * Mikolaj Habdank, Christian Kujau, Marc-Christian Petersen, Piero Ottuzzi, + * Ephraim Ofir. * * - * This program, "pbzip2" is copyright (C) 2003-2009 Jeff Gilchrist. + * This program, "pbzip2" is copyright (C) 2003-2010 Jeff Gilchrist. * All rights reserved. * * The library "libbzip2" which pbzip2 uses, is copyright @@ -95,116 +224,61 @@ * * Jeff Gilchrist, Ottawa, Canada. 
* pbzip2@compression.ca - * pbzip2 version 1.0.5 of January 8, 2009 + * pbzip2 version 1.1.1 of April 17, 2010 * */ +#include "pbzip2.h" +#include "BZ2StreamScanner.h" + #include +#include +#include +#include + +extern "C" +{ #include #include #include #include #include #include +#include #include #include #include -#include #include -#ifndef WIN32 -#include -#include -#else -#include -#include -#endif -#ifdef __APPLE__ -#include -#endif -#ifdef __sun -#include -#endif -#ifndef __BORLANDC__ -#define __STDC_FORMAT_MACROS -#include -#else -#define PRIu64 "Lu" -#define strncasecmp(x,y,z) strncmpi(x,y,z) -#endif -#ifdef __osf__ -#define PRIu64 "llu" -#endif - -// uncomment for debug output -//#define PBZIP_DEBUG - -// uncomment to disable load average code (may be required for some platforms) -//#define PBZIP_NO_LOADAVG - -// detect systems that are known not to support load average code -#if defined (WIN32) || defined (__CYGWIN32__) || defined (__MINGW32__) || defined (__BORLANDC__) || defined (__hpux) || defined (__osf__) || defined(__UCLIBC__) - #define PBZIP_NO_LOADAVG -#endif - -#ifdef WIN32 -#define PATH_SEP '\\' -#define usleep(x) Sleep(x/1000) -#define LOW_DWORD(x) ((*(LARGE_INTEGER *)&x).LowPart) -#define HIGH_DWORD(x) ((*(LARGE_INTEGER *)&x).HighPart) -#ifndef _TIMEVAL_DEFINED /* also in winsock[2].h */ -#define _TIMEVAL_DEFINED -struct timeval { - long tv_sec; - long tv_usec; -}; -#endif -#else -#define PATH_SEP '/' -#endif - -#ifndef WIN32 -#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) -#define OFF_T off_t -#else -#define FILE_MODE (S_IRUSR | S_IWUSR ) -#define OFF_T __int64 -#endif - -#ifndef O_BINARY -#define O_BINARY 0 -#endif +#include +} -typedef struct -{ - char *buf; - unsigned int bufSize; -} outBuff; - -typedef struct -{ - char **buf; - unsigned int *bufSize; - int *blockNum; - long head, tail; - int full, empty; - pthread_mutex_t *mut; - pthread_cond_t *notFull, *notEmpty; - pthread_t *consumers; -} queue; - -typedef struct -{ - OFF_T 
dataStart; - OFF_T dataSize; -} bz2BlockListing; // // GLOBALS // -static int allDone = 0; +static int producerDone = 0; +static int terminateFlag = 0; // Abnormal premature termination +static int finishedFlag = 0; // Main thread work finished (about to exit) +static int unfinishedWorkCleaned = 0; static int numCPU = 2; static int QUEUESIZE = 2; +static int SIG_HANDLER_QUIT_SIGNAL = SIGUSR1; // signal used to stop SignalHandlerThread +#ifdef USE_STACKSIZE_CUSTOMIZATION +static int ChildThreadStackSize = 0; // -1 - don't modify stacksize; 0 - use minimum; > 0 - use specified +#ifndef PTHREAD_STACK_MIN + #define PTHREAD_STACK_MIN 4096 +#endif +#endif // USE_STACKSIZE_CUSTOMIZATION +static unsigned char Bz2HeaderZero[] = { + 0x42, 0x5A, 0x68, 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, 0x90, 0x00, 0x00, 0x00, 0x00 }; +static OFF_T InFileSize; +static OFF_T InBytesProduced = 0; static int NumBlocks = 0; +static int NumBlocksEstimated = 0; static int NumBufferedBlocks = 0; +static size_t NumBufferedTailBlocks = 0; +static size_t NumBufferedBlocksMax = 0; +static int NextBlockToWrite; +static size_t OutBufferPosToWrite; // = 0; // position in output buffer static int Verbosity = 0; static int QuietMode = 1; static int OutputStdOut = 0; @@ -212,562 +286,924 @@ static int BWTblockSize = 9; static int FileListCount = 0; static std::vector OutputBuffer; +static queue *FifoQueue; // fifo queue (global var used on termination cleanup) static pthread_mutex_t *OutMutex = NULL; -static pthread_mutex_t *MemMutex = NULL; +static pthread_mutex_t *ProducerDoneMutex = NULL; +static pthread_mutex_t ErrorHandlerMutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t TerminateFlagMutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t ProgressIndicatorsMutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t *notTooMuchNumBuffered; +static pthread_cond_t TerminateCond = PTHREAD_COND_INITIALIZER; +static pthread_attr_t ChildThreadAttributes; static struct stat fileMetaData; -static char 
*sigInFilename = NULL; -static char *sigOutFilename = NULL; +static const char *sigInFilename = NULL; +static const char *sigOutFilename = NULL; static char BWTblockSizeChar = '9'; +static sigset_t SignalMask; +static pthread_t SignalHandlerThread; +static pthread_t TerminatorThread; + +inline int syncGetProducerDone(); +inline void syncSetProducerDone(int newValue); +inline int syncGetTerminateFlag(); +inline void syncSetTerminateFlag(int newValue); +inline void syncSetFinishedFlag(int newValue); +void cleanupUnfinishedWork(); +void cleanupAndQuit(int exitCode); +int initSignalMask(); +int setupSignalHandling(); +int setupTerminator(); + +inline void safe_mutex_lock(pthread_mutex_t *mutex); +inline void safe_mutex_unlock(pthread_mutex_t *mutex); +inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); +inline void safe_cond_signal(pthread_cond_t *cond); +int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller = "safe_cond_timed_wait"); - -void mySignalCatcher(int); -char *memstr(char *, int, char *, int); +template +FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize); int producer_decompress(int, OFF_T, queue *); -void *consumer_decompress(void *); -void *fileWriter(void *); -int directcompress(int, OFF_T, int, char *); -int directdecompress(char *, char *); -int producer(int, int, int, queue *); -void *consumer(void *); +int directcompress(int, OFF_T, int, const char *); +int directdecompress(const char *, const char *); +int producer(int hInfile, int blockSize, queue *fifo); +int mutexesInit(); +void mutexesDelete(); queue *queueInit(int); void queueDelete(queue *); -void queueAdd(queue *, char *, unsigned int, int); -char *queueDel(queue *, unsigned int *, int *); -int getFileMetaData(char *); -int writeFileMetaData(char *); +void outputBufferInit(size_t size); +outBuff * outputBufferAdd(const outBuff & element, const char *caller); +outBuff * 
outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement); +int getFileMetaData(const char *); +int writeFileMetaData(const char *); int testBZ2ErrorHandling(int, BZFILE *, int); int testCompressedData(char *); ssize_t bufread(int hf, char *buf, size_t bsize); int detectCPUs(void); +/* + * Pointers to functions used by plain C pthreads API require C calling + * conventions. + */ +extern "C" +{ +void* signalHandlerProc(void* arg); +void* terminatorThreadProc(void* arg); +void *consumer_decompress(void *); +void *fileWriter(void *); +void *consumer(void *); +} + +/* + * Lock mutex or exit application immediately on error. + */ +inline void safe_mutex_lock(pthread_mutex_t *mutex) +{ + int ret = pthread_mutex_lock(mutex); + if (ret != 0) + { + fprintf(stderr, "pthread_mutex_lock error [%d]! Aborting immediately!\n", ret); + cleanupAndQuit(-5); + } +} /* - ********************************************************* + * Unlock mutex or exit application immediately on error. */ -void mySignalCatcher(int n) +inline void safe_mutex_unlock(pthread_mutex_t *mutex) { - struct stat statBuf; - int ret = 0; + int ret = pthread_mutex_unlock(mutex); + if (ret != 0) + { + fprintf(stderr, "pthread_mutex_unlock error [%d]! Aborting immediately!\n", ret); + cleanupAndQuit(-6); + } +} + +/* + * Call pthread_cond_signal - check return code and exit application immediately + * on error. + */ +inline void safe_cond_signal(pthread_cond_t *cond) +{ + int ret = pthread_cond_signal(cond); + if (ret != 0) + { + fprintf(stderr, "pthread_cond_signal error [%d]! Aborting immediately!\n", ret); + cleanupAndQuit(-7); + } +} - fprintf(stderr, "\n *Control-C or similar caught, quitting...\n"); +/* + * Call pthread_cond_signal - check return code and exit application immediately + * on error. + */ +inline void safe_cond_broadcast(pthread_cond_t *cond) +{ + int ret = pthread_cond_broadcast(cond); + if (ret != 0) + { + fprintf(stderr, "pthread_cond_broadcast error [%d]! 
Aborting immediately!\n", ret); + cleanupAndQuit(-7); + } +} + +/* + * Unlock mutex or exit application immediately on error. + */ +inline void safe_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + int ret = pthread_cond_wait(cond, mutex); + if (ret != 0) + { + fprintf(stderr, "pthread_cond_wait error [%d]! Aborting immediately!\n", ret); + pthread_mutex_unlock(mutex); + cleanupAndQuit(-8); + } +} + +/* + * Delegate to pthread_cond_timedwait. Check for errors and abort if + * any encountered. Return 0 on success and non-zero code on error + */ +int safe_cond_timed_wait(pthread_cond_t *cond, pthread_mutex_t *mutex, int seconds, const char *caller) +{ + struct timespec waitTimer; + #ifndef WIN32 + struct timeval tv; + struct timezone tz; + #else + SYSTEMTIME systemtime; + LARGE_INTEGER filetime; + #endif + + #ifndef WIN32 + gettimeofday(&tv, &tz); + waitTimer.tv_sec = tv.tv_sec + seconds; + waitTimer.tv_nsec = tv.tv_usec * 1000; + #else + GetSystemTime(&systemtime); + SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime); + waitTimer.tv_sec = filetime.QuadPart / 10000000; + waitTimer.tv_nsec = filetime.QuadPart - ((LONGLONG)waitTimer.tv_sec * 10000000) * 10; + waitTimer.tv_sec += seconds; + #endif #ifdef PBZIP_DEBUG - fprintf(stderr, " Infile: %s Outfile: %s\n", sigInFilename, sigOutFilename); + fprintf(stderr, "%s: waitTimer.tv_sec: %d waitTimer.tv_nsec: %lld\n", caller, waitTimer.tv_sec, + (long long)waitTimer.tv_nsec); #endif + int pret = pthread_cond_timedwait(cond, mutex, &waitTimer); + // we are not using a compatible pthreads library so abort + if ((pret != 0) && (pret != EINTR) && (pret != EBUSY) && (pret != ETIMEDOUT)) + { + pthread_mutex_unlock(mutex); + handle_error(EF_EXIT, 1, + "pbzip2: *ERROR: %s: pthread_cond_timedwait() call invalid [pret=%d]. This machine\n" + " does not have compatible pthreads library. 
Aborting.\n", caller, pret); - // only cleanup files if we did something with them - if ((sigInFilename == NULL) || (sigOutFilename == NULL)) - exit(1); + cleanupAndQuit(-9); + } + #ifdef PBZIP_DEBUG + else if (pret != 0) + { + fprintf(stderr, "%s: pthread_cond_timedwait returned with non-fatal error [%d]\n", caller, pret); + } + #endif // PBZIP_DEBUG - // check to see if input file still exists - ret = stat(sigInFilename, &statBuf); - if (ret == 0) + return 0; +} + +/* + * Delegate to write but keep writing until count bytes are written or + * error is encountered (on success all count bytes would be written) + */ +ssize_t do_write(int fd, const void *buf, size_t count) +{ + ssize_t bytesRemaining = count; + ssize_t nbytes = 0; + const char *pbuf = (const char *)buf; + while ((bytesRemaining > 0) && ((nbytes = write(fd, pbuf, bytesRemaining)) > 0)) { - // only want to remove output file if input still exists - if (QuietMode != 1) - fprintf(stderr, "Deleting output file: %s, if it exists...\n", sigOutFilename); - ret = remove(sigOutFilename); - if (ret != 0) - fprintf(stderr, "pbzip2: *WARNING: Deletion of output file (apparently) failed.\n"); + bytesRemaining -= nbytes; + pbuf += nbytes; } - else + + if (nbytes < 0) { - fprintf(stderr, "pbzip2: *WARNING: Output file was not deleted since input file no longer exists.\n"); - fprintf(stderr, "pbzip2: *WARNING: Output file: %s, may be incomplete!\n", sigOutFilename); + return nbytes; } - exit(1); + return (count - bytesRemaining); } /* + * Delegate to read but keep reading until count bytes are read or + * error is encountered (on success all count bytes would be read) + */ +ssize_t do_read(int fd, void *buf, size_t count) +{ + ssize_t bytesRemaining = count; + ssize_t nbytes = 0; + char *pbuf = (char *)buf; + while ((bytesRemaining > 0) && (nbytes = read(fd, pbuf, bytesRemaining)) > 0) + { + bytesRemaining -= nbytes; + pbuf += nbytes; + } + + if (nbytes < 0) + { + return nbytes; + } + + return (count -
bytesRemaining); +} + + +/* ********************************************************* - This function will search the array pointed to by - searchBuf[] for the string searchString[] and return - a pointer to the start of the searchString[] if found - otherwise return NULL if not found. + Atomically get producerDone value. */ -char *memstr(char *searchBuf, int searchBufSize, char *searchString, int searchStringSize) +inline int syncGetProducerDone() { - int i; + int ret; + safe_mutex_lock(ProducerDoneMutex); + ret = producerDone; + safe_mutex_unlock(ProducerDoneMutex); + + return ret; +} + +/* + ********************************************************* + Atomically set producerDone value. +*/ +inline void syncSetProducerDone(int newValue) +{ + safe_mutex_lock(ProducerDoneMutex); + producerDone = newValue; + safe_mutex_unlock(ProducerDoneMutex); +} + +/* + * Atomic get terminateFlag + */ +inline int syncGetTerminateFlag() +{ + int ret; + safe_mutex_lock(&TerminateFlagMutex); + ret = terminateFlag; + safe_mutex_unlock(&TerminateFlagMutex); + + return ret; +} + +/* + * Atomically set termination flag and signal the related + * condition. 
+ */ +inline void syncSetTerminateFlag(int newValue) +{ + safe_mutex_lock(&TerminateFlagMutex); - for (i=0; i < searchBufSize; i++) + terminateFlag = newValue; + if (terminateFlag != 0) { - if ((searchBufSize - i) < searchStringSize) - break; + // wake up terminator thread + pthread_cond_signal(&TerminateCond); - if ( searchBuf[i] == searchString[0] && - memcmp(searchBuf+i, searchString, searchStringSize) == 0 ) + // wake up all other threads possibly blocked on condition variables + pthread_cond_broadcast(notTooMuchNumBuffered); + if (FifoQueue != NULL) { - return &searchBuf[i]; - } + pthread_cond_broadcast(FifoQueue->notFull); + pthread_cond_broadcast(FifoQueue->notEmpty); + } } - return NULL; + safe_mutex_unlock(&TerminateFlagMutex); } /* - ********************************************************* - This function works in two passes of the input file. - The first pass will look for BZIP2 headers in the file - and note their location and size of the sections. - The second pass will read in those BZIP2 sections and - pass them off the the selected CPU(s) for decompression. + * Set finishedFlag and signal the related condition.
*/ -int producer_decompress(int hInfile, OFF_T fileSize, queue *fifo) +inline void syncSetFinishedFlag(int newValue) { - std::vector bz2BlockList; - bz2BlockListing TempBlockListing; - char *FileData = NULL; - char bz2Header[] = {"BZh91AY&SY"}; // for 900k BWT block size - OFF_T bytesLeft = 0; - OFF_T inSize = 100000; - int blockNum = 0; - OFF_T ret = 0; - int i; - int bz2NumBlocks = 0; - char *startPointer = NULL; - OFF_T currentByte = 0; - OFF_T startByte = 0; + safe_mutex_lock(&TerminateFlagMutex); - // set search header to value in file - bz2Header[3] = BWTblockSizeChar; + finishedFlag = newValue; + if (finishedFlag != 0) + { + pthread_cond_signal(&TerminateCond); + } - // go to start of file - ret = lseek(hInfile, 0, SEEK_SET); - if (ret != 0) + safe_mutex_unlock(&TerminateFlagMutex); +} + +/* + ********************************************************* + Print error message and optionally exit or abort + depending on exitFlag: + 0 - don't quit; + 1 - exit; + 2 - abort. + On exit - exitCode status is used. +*/ +int handle_error(ExitFlag exitFlag, int exitCode, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + vfprintf(stderr, fmt, args); + fflush(stderr); + va_end(args); + + if (exitFlag == EF_ABORT) { - fprintf(stderr, "pbzip2: *ERROR: Could not seek to beginning of file [%"PRIu64"]! 
Skipping...\n", (unsigned long long)ret); - close(hInfile); - allDone = 1; - return -1; + syncSetTerminateFlag(1); + abort(); + } + if (exitFlag == EF_EXIT) + { + syncSetTerminateFlag(1); } - // scan input file for BZIP2 block markers (BZh91AY&SY) - pthread_mutex_lock(MemMutex); - // allocate memory to read in file - FileData = NULL; - FileData = new char[inSize]; - pthread_mutex_unlock(MemMutex); - // make sure memory was allocated properly - if (FileData == NULL) + return exitCode; +} + +/* + * Initialize and set thread signal mask + */ +int initSignalMask() +{ + int ret = 0; + ret = sigemptyset(&SignalMask); + + ret = sigaddset(&SignalMask, SIGINT) | ret; + ret = sigaddset(&SignalMask, SIGTERM) | ret; + ret = sigaddset(&SignalMask, SIGABRT) | ret; + ret = sigaddset(&SignalMask, SIG_HANDLER_QUIT_SIGNAL) | ret; + #ifndef WIN32 + ret = sigaddset(&SignalMask, SIGHUP) | ret; + #endif + + if (ret == 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (FileData)! Skipping...\n"); - close(hInfile); - allDone = 1; - return -1; + ret = pthread_sigmask(SIG_BLOCK, &SignalMask, NULL); } - // keep going until all the file is scanned for BZIP2 blocks - bytesLeft = fileSize; - while (bytesLeft > 0) + + return ret; +} + +/* + * Initialize attributes for child threads. + * + */ +int initChildThreadAttributes() +{ + int ret = pthread_attr_init(&ChildThreadAttributes); + + if (ret < 0) { - if (currentByte == 0) - { - #ifdef PBZIP_DEBUG - fprintf(stderr, " -> Bytes To Read: %"PRIu64" bytes...\n", inSize); - #endif + fprintf(stderr, "Can't initialize thread atrributes [err=%d]! 
Aborting...\n", ret); + exit(-1); + } - // read file data - ret = read(hInfile, (char *) FileData, inSize); - } - else - { - // copy end section of previous buffer to new just in case the BZIP2 header is - // located between two buffer boundaries - memcpy(FileData, FileData+inSize-(strlen(bz2Header)-1), strlen(bz2Header)-1); - #ifdef PBZIP_DEBUG - fprintf(stderr, " -> Bytes To Read: %"PRIu64" bytes...\n", inSize-(strlen(bz2Header)-1)); - #endif + #ifdef USE_STACKSIZE_CUSTOMIZATION + if (ChildThreadStackSize > 0) + { + ret = pthread_attr_setstacksize(&ChildThreadAttributes, ChildThreadStackSize); - // read file data minus overflow from previous buffer - ret = read(hInfile, (char *) FileData+strlen(bz2Header)-1, inSize-(strlen(bz2Header)-1)); - } - #ifdef PBZIP_DEBUG - fprintf(stderr, " -> Total Bytes Read: %"PRIu64" bytes...\n\n", ret); - #endif - if (ret < 0) + if (ret != 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not read from fibz2NumBlocksle! Skipping...\n"); - close(hInfile); - pthread_mutex_lock(MemMutex); - if (FileData != NULL) - delete [] FileData; - pthread_mutex_unlock(MemMutex); - allDone = 1; - return -1; + fprintf(stderr, "Can't set thread stacksize [err=%d]! 
Continue with default one.\n", ret); } + } + #endif // USE_STACKSIZE_CUSTOMIZATION - // scan buffer for bzip2 start header - if (currentByte == 0) - startPointer = memstr(FileData, ret, bz2Header, strlen(bz2Header)); - else - startPointer = memstr(FileData, ret+(strlen(bz2Header)-1), bz2Header, strlen(bz2Header)); - while (startPointer != NULL) - { - if (currentByte == 0) - startByte = startPointer - FileData + currentByte; - else - startByte = startPointer - FileData + currentByte - (strlen(bz2Header) - 1); - #ifdef PBZIP_DEBUG - fprintf(stderr, " Found substring at: %x\n", startPointer); - fprintf(stderr, " startByte = %"PRIu64"\n", startByte); - fprintf(stderr, " bz2NumBlocks = %d\n", bz2NumBlocks); - #endif + return ret; +} - // add data to end of block list - TempBlockListing.dataStart = startByte; - TempBlockListing.dataSize = 0; - bz2BlockList.push_back(TempBlockListing); - bz2NumBlocks++; +/* + * Setup and start signal handling. + */ +int setupSignalHandling() +{ + int ret = initSignalMask(); - if (currentByte == 0) - { - startPointer = memstr(startPointer+1, ret-(startPointer-FileData)-1, bz2Header, strlen(bz2Header)); - } - else - { - startPointer = memstr(startPointer+1, ret-(startPointer-FileData)-1+(strlen(bz2Header)-1), bz2Header, strlen(bz2Header)); - } - } + if (ret == 0) + { + ret = pthread_create(&SignalHandlerThread, &ChildThreadAttributes, signalHandlerProc, NULL); + } - currentByte += ret; - bytesLeft -= ret; - } // while + return ret; +} + +/* + * Setup and start terminator thread. + */ +int setupTerminator() +{ + return pthread_create(&TerminatorThread, &ChildThreadAttributes, terminatorThreadProc, NULL ); +} - // use direcdecompress() instead to process 1 bzip2 stream - if (bz2NumBlocks <= 1) +/* + ********************************************************* + * Clean unfinished work (after error). + * Deletes output file if such exists and if not using pipes.
+ */ +void cleanupUnfinishedWork() +{ + if (unfinishedWorkCleaned != 0) { - if (QuietMode != 1) - fprintf(stderr, "Switching to no threads mode: only 1 BZIP2 block found.\n"); - allDone = 1; - return -99; + return; } - pthread_mutex_lock(MemMutex); - if (FileData != NULL) - delete [] FileData; - NumBlocks = bz2NumBlocks; - pthread_mutex_lock(OutMutex); - // create output buffer - OutputBuffer.resize(bz2NumBlocks); - // make sure memory was allocated properly - if (OutputBuffer.size() != bz2NumBlocks) + struct stat statBuf; + int ret = 0; + + #ifdef PBZIP_DEBUG + fprintf(stderr, " Infile: %s Outfile: %s\n", sigInFilename, sigOutFilename); + #endif + + // only cleanup files if we did something with them + if ((sigInFilename == NULL) || (sigOutFilename == NULL) || (OutputStdOut == 1)) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutputBuffer)! Aborting...\n"); - allDone = 1; - return 1; + unfinishedWorkCleaned = 1; + return; } - // set empty buffer - for (i=0; i < bz2NumBlocks; i++) + + if (QuietMode != 1) { - OutputBuffer[i].buf = NULL; - OutputBuffer[i].bufSize = 0; + fprintf(stderr, "Cleanup unfinished work [Outfile: %s]...\n", sigOutFilename); } - pthread_mutex_unlock(OutMutex); - pthread_mutex_unlock(MemMutex); - // calculate data sizes for each block - for (i=0; i < bz2NumBlocks; i++) + // check to see if input file still exists + ret = stat(sigInFilename, &statBuf); + if (ret == 0) { - if (i == bz2NumBlocks-1) - { - // special case for last block - bz2BlockList[i].dataSize = fileSize - bz2BlockList[i].dataStart; - } - else if (i == 0) + // only want to remove output file if input still exists + if (QuietMode != 1) + fprintf(stderr, "Deleting output file: %s, if it exists...\n", sigOutFilename); + ret = remove(sigOutFilename); + if (ret != 0) { - // special case for first block - bz2BlockList[i].dataSize = bz2BlockList[i+1].dataStart; + fprintf(stderr, "pbzip2: *WARNING: Deletion of output file (apparently) failed.\n"); } else { - // normal 
case - bz2BlockList[i].dataSize = bz2BlockList[i+1].dataStart - bz2BlockList[i].dataStart; + fprintf(stderr, "pbzip2: *INFO: Deletion of output file succeeded.\n"); + sigOutFilename = NULL; } - #ifdef PBZIP_DEBUG - fprintf(stderr, " bz2BlockList[%d].dataStart = %"PRIu64"\n", i, bz2BlockList[i].dataStart); - fprintf(stderr, " bz2BlockList[%d].dataSize = %"PRIu64"\n", i, bz2BlockList[i].dataSize); - #endif + } + else + { + fprintf(stderr, "pbzip2: *WARNING: Output file was not deleted since input file no longer exists.\n"); + fprintf(stderr, "pbzip2: *WARNING: Output file: %s, may be incomplete!\n", sigOutFilename); } - // keep going until all the blocks are processed - for (i=0; i < bz2NumBlocks; i++) + unfinishedWorkCleaned = 1; +} + +/* + ********************************************************* + */ + +/* + ********************************************************* + * Terminator thread procedure: looking at terminateFlag + * and exit application when it's set. + */ +void* terminatorThreadProc(void* arg) +{ + int ret = pthread_mutex_lock(&TerminateFlagMutex); + + if (ret != 0) { - // go to start of block position in file - #ifndef WIN32 - ret = lseek(hInfile, bz2BlockList[i].dataStart, SEEK_SET); - #else - ret = bz2BlockList[i].dataStart; - LOW_DWORD(ret) = SetFilePointer((HANDLE)_get_osfhandle(hInfile), LOW_DWORD(ret), &HIGH_DWORD(ret), FILE_BEGIN); - #endif - if (ret != bz2BlockList[i].dataStart) - { - fprintf(stderr, "pbzip2: *ERROR: Could not seek to beginning of file [%"PRIu64"]! Skipping...\n", (unsigned long long)ret); - close(hInfile); - allDone = 1; - return -1; - } + fprintf(stderr, "Terminator thread: pthread_mutex_lock error [%d]! 
Aborting...\n", ret); + syncSetTerminateFlag(1); + cleanupAndQuit(1); + } + + while ((finishedFlag == 0) && (terminateFlag == 0)) + { + ret = pthread_cond_wait(&TerminateCond, &TerminateFlagMutex); + } - // set buffer size - inSize = bz2BlockList[i].dataSize; + // Successful end + if (finishedFlag != 0) + { + ret = pthread_mutex_unlock(&TerminateFlagMutex); + return NULL; + } + + // Being here implies (terminateFlag != 0) + ret = pthread_mutex_unlock(&TerminateFlagMutex); + + fprintf(stderr, "Terminator thread: premature exit requested - quitting...\n"); + cleanupAndQuit(1); + + return NULL; // never reachable +} + +/* + ********************************************************* + * Signal handler thread function to hook cleanup on + * certain signals. + */ +void* signalHandlerProc(void* arg) +{ + int signalCaught; + + // wait for a signal specified in the mask + int ret = sigwait(&SignalMask, &signalCaught); + + if (ret != 0) + { + fprintf(stderr, "\n *signalHandlerProc - sigwait error: %d\n", ret); + } + else if (signalCaught == SIG_HANDLER_QUIT_SIGNAL) + { + return NULL; + } + else // ret == 0 + { + fprintf(stderr, "\n *Control-C or similar caught [sig=%d], quitting...\n", signalCaught); + // Delegating cleanup and termination to Terminator Thread + syncSetTerminateFlag(1); + } + + return NULL; +} + +/* + * Cleanup unfinished work (output file) and exit with the given exit code. + * To be used to quit on error with non-zero exitCode. + */ +void cleanupAndQuit(int exitCode) +{ + // syncSetTerminateFlag(1); + + int ret = pthread_mutex_lock(&ErrorHandlerMutex); + if (ret != 0) + { + fprintf(stderr, "Cleanup Handler: Failed to lock ErrorHandlerMutex!
May double cleanup...\n"); + } + cleanupUnfinishedWork(); + pthread_mutex_unlock(&ErrorHandlerMutex); + + exit(exitCode); +} +/* + ********************************************************* + This function will search the array pointed to by + searchBuf[] for the string searchString[] and return + a pointer to the start of the searchString[] if found + otherwise return NULL if not found. +*/ +template +FI1 memstr(FI1 searchBuf, int searchBufSize, FI2 searchString, int searchStringSize) +{ + FI1 searchBufEnd = searchBuf + searchBufSize; + FI1 s = std::search(searchBuf, searchBufEnd, + searchString, searchString + searchStringSize); + + return (s != searchBufEnd) ? s : NULL; +} + +/* + ********************************************************* + Function works in single pass. It's Splitting long + streams into sequences of multiple segments. + */ +int producer_decompress(int hInfile, OFF_T fileSize, queue *fifo) +{ + safe_mutex_lock(&ProgressIndicatorsMutex); + InBytesProduced = 0; + NumBlocks = 0; + safe_mutex_unlock(&ProgressIndicatorsMutex); + + pbzip2::BZ2StreamScanner bz2StreamScanner(hInfile); + + // keep going until all the blocks are processed + outBuff * fileData = bz2StreamScanner.getNextStream(); + while (!bz2StreamScanner.failed() && (fileData->bufSize > 0)) + { #ifdef PBZIP_DEBUG - fprintf(stderr, " -> Bytes To Read: %"PRIu64" bytes...\n", inSize); + fprintf(stderr, " -> Bytes Read: %u bytes...\n", fileData->bufSize); #endif if (QuietMode != 1) { // give warning to user if block is larger than 250 million bytes - if (inSize > 250000000) + if (fileData->bufSize > 250000000) { - fprintf(stderr, "pbzip2: *WARNING: Compressed block size is large [%"PRIu64" bytes].\n", (unsigned long long)inSize); + fprintf(stderr, "pbzip2: *WARNING: Compressed block size is large [%"PRIu64" bytes].\n", + (unsigned long long) fileData->bufSize); fprintf(stderr, " If program aborts, use regular BZIP2 to decompress.\n"); } } - pthread_mutex_lock(MemMutex); - // allocate memory to 
read in file - FileData = NULL; - FileData = new char[inSize]; - pthread_mutex_unlock(MemMutex); - // make sure memory was allocated properly - if (FileData == NULL) - { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (FileData)! Skipping...\n"); - close(hInfile); - allDone = 1; - return -1; - } - - // read file data - ret = read(hInfile, (char *) FileData, inSize); - #ifdef PBZIP_DEBUG - fprintf(stderr, " -> Total Bytes Read: %"PRIu64" bytes...\n\n", ret); - #endif - // check to make sure all the data we expected was read in - if (ret == 0) - { - pthread_mutex_lock(MemMutex); - if (FileData != NULL) - delete [] FileData; - pthread_mutex_unlock(MemMutex); - break; - } - else if (ret < 0) - { - fprintf(stderr, "pbzip2: *ERROR: Could not read from file! Skipping...\n"); - close(hInfile); - pthread_mutex_lock(MemMutex); - if (FileData != NULL) - delete [] FileData; - pthread_mutex_unlock(MemMutex); - allDone = 1; - return -1; - } - else if (ret != inSize) - { - fprintf(stderr, "pbzip2: *ERROR: Could not read enough data from file! 
Skipping...\n"); - close(hInfile); - pthread_mutex_lock(MemMutex); - if (FileData != NULL) - delete [] FileData; - pthread_mutex_unlock(MemMutex); - allDone = 1; - return -1; - } - // add data to the decompression queue - pthread_mutex_lock (fifo->mut); + safe_mutex_lock(fifo->mut); while (fifo->full) { #ifdef PBZIP_DEBUG - printf ("producer: queue FULL.\n"); + fprintf (stderr, "producer: queue FULL.\n"); #endif - pthread_cond_wait (fifo->notFull, fifo->mut); + safe_cond_wait (fifo->notFull, fifo->mut); } #ifdef PBZIP_DEBUG - fprintf(stderr, "producer: Buffer: %x Size: %"PRIu64" Block: %d\n", FileData, inSize, blockNum); + fprintf(stderr, "producer: Buffer: %x Size: %"PRIu64" Block: %d\n", fileData->buf, + (unsigned long long)fileData->bufSize, NumBlocks); #endif - queueAdd(fifo, FileData, inSize, blockNum); - pthread_mutex_unlock (fifo->mut); - pthread_cond_signal (fifo->notEmpty); + fifo->add(fileData); + safe_cond_signal (fifo->notEmpty); - blockNum++; + safe_mutex_lock(&ProgressIndicatorsMutex); + InBytesProduced += fileData->bufSize; + NumBlocks = fileData->blockNumber + 1; + safe_mutex_unlock(&ProgressIndicatorsMutex); - // make sure output buffer is large enough to handle input data - if (blockNum > OutputBuffer.size()) - { - int newsize = OutputBuffer.size()*2; - pthread_mutex_lock(OutMutex); - OutputBuffer.resize(newsize); - pthread_mutex_unlock(OutMutex); - if (OutputBuffer.size() != newsize) - { - allDone = 1; - return -1; - } - } + safe_mutex_unlock(fifo->mut); + + fileData = bz2StreamScanner.getNextStream(); } // for close(hInfile); - allDone = 1; + // last stream is always dummy one (either error or eof) + delete fileData; + + if (bz2StreamScanner.failed() || !bz2StreamScanner.eof()) + { + handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: when reading bzip2 input stream\n"); + return -1; + } + else if (!bz2StreamScanner.isBz2HeaderFound() || !bz2StreamScanner.eof()) + { + handle_error(EF_EXIT, 1, "pbzip2: producer_decompress: *ERROR: 
input file is not a valid bzip2 stream\n"); + return -1; + } + + syncSetProducerDone(1); + safe_cond_broadcast(fifo->notEmpty); // just in case + + #ifdef PBZIP_DEBUG + fprintf(stderr, "producer: Done - exiting. Last Block: %d\n", NumBlocks); + #endif + return 0; } + /* ********************************************************* */ void *consumer_decompress(void *q) { - struct timespec waitTimer; - #ifndef WIN32 - struct timeval tv; - struct timezone tz; - #else - SYSTEMTIME systemtime; - LARGE_INTEGER filetime; - #endif - queue *fifo; - char *FileData = NULL; + queue *fifo = (queue *)q; + + outBuff *fileData = NULL; + outBuff *lastFileData = NULL; char *DecompressedData = NULL; - unsigned int inSize = 0; unsigned int outSize = 0; - int blockNum = -1; - int ret = -1; - int pret = -1; + outBuff * prevOutBlockInSequence = NULL; + int outSequenceNumber = 0; // sequence number in multi-part output blocks + unsigned int processedIn = 0; + + bz_stream strm; + strm.bzalloc = NULL; + strm.bzfree = NULL; + strm.opaque = NULL; + + for (;;) + { + if (syncGetTerminateFlag() != 0) + { + #ifdef PBZIP_DEBUG + fprintf (stderr, "consumer: terminating1 - terminateFlag set\n"); + #endif + return (NULL); + } + + safe_mutex_lock(fifo->mut); + for (;;) + { + if (!fifo->empty && (fifo->remove(fileData) == 1)) + { + // block retreived - break the loop and continue further + break; + } + + #ifdef PBZIP_DEBUG + fprintf (stderr, "consumer: queue EMPTY.\n"); + #endif + + if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0))) + { + // finished - either OK or terminated forcibly + pthread_mutex_unlock(fifo->mut); + // BZ2_bzDecompressEnd( &strm ); + if (lastFileData != NULL) + { + delete lastFileData; + } + #ifdef PBZIP_DEBUG + fprintf (stderr, "consumer: exiting2\n"); + #endif + return (NULL); + } + + #ifdef PBZIP_DEBUG + safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer"); + #else + safe_cond_wait(fifo->notEmpty, fifo->mut); + #endif + } + + #ifdef 
PBZIP_DEBUG + fprintf(stderr, "consumer: Buffer: %x Size: %u Block: %d\n", + fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber); + #endif + + safe_cond_signal(fifo->notFull); + safe_mutex_unlock(fifo->mut); + + if (lastFileData != NULL) + { + delete lastFileData; + } + lastFileData = fileData; + + #ifdef PBZIP_DEBUG + fprintf (stderr, "consumer: recieved %d.\n", fileData->blockNumber); + #endif + + outSize = 900000; + + int bzret = BZ_OK; + + if (fileData->sequenceNumber < 2) + { + // start of new stream from in queue (0 -> single block; 1 - mutli) + bzret = BZ2_bzDecompressInit(&strm, Verbosity, 0); + if (bzret != BZ_OK) + { + handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressInit: %d\n", bzret); + return (NULL); + } + } + + strm.avail_in = fileData->bufSize; + strm.next_in = fileData->buf; + while ((bzret == BZ_OK) && (strm.avail_in != 0)) + { + #ifdef PBZIP_DEBUG + fprintf(stderr, "decompress: block=%d; seq=%d; prev=%llx; avail_in=%u; avail_out=%u\n", + fileData->blockNumber, outSequenceNumber, + (unsigned long long) prevOutBlockInSequence, + strm.avail_in, strm.avail_out); + #endif + + if (DecompressedData == NULL) + { + // allocate memory for decompressed data (start with default 900k block size) + DecompressedData = new(std::nothrow) char[outSize]; + // make sure memory was allocated properly + + if (DecompressedData == NULL) + { + handle_error(EF_EXIT, -1, + " *ERROR: Could not allocate memory (DecompressedData)! 
Aborting...\n"); + return (NULL); + } + + processedIn = 0; + + strm.avail_out = outSize; + strm.next_out = DecompressedData; + } + + unsigned int availIn = strm.avail_in; + bzret = BZ2_bzDecompress(&strm); + processedIn += (availIn - strm.avail_in); + + // issue out block if out buffer is full or stream end is detected + if ( ((bzret == BZ_OK) && strm.avail_out == 0) || (bzret == BZ_STREAM_END) ) + { + outBuff * addret = NULL; + unsigned int len = outSize - strm.avail_out; + bool isLast = (bzret == BZ_STREAM_END); + if (outSequenceNumber>0) + { + ++outSequenceNumber; + + outBuff * nextOutBlock = new(std::nothrow) outBuff( + DecompressedData, len, fileData->blockNumber, + outSequenceNumber, processedIn, isLast, NULL); + + if (nextOutBlock == NULL) + { + BZ2_bzDecompressEnd( &strm ); + handle_error(EF_EXIT, -1, + " *ERROR: Could not allocate memory (nextOutBlock)! Aborting...\n"); + return (NULL); + } + + addret = outputBufferSeqAddNext(prevOutBlockInSequence, nextOutBlock); + #ifdef PBZIP_DEBUG + fprintf(stderr, "decompress: outputBufferSeqAddNext->%llx; block=%d; seq=%d; prev=%llx\n", + (unsigned long long)addret, + fileData->blockNumber, outSequenceNumber, + (unsigned long long) prevOutBlockInSequence); + #endif + } + else // sequenceNumber = 0 + { + if (bzret == BZ_OK) + { + ++outSequenceNumber; + } + addret = outputBufferAdd(outBuff( + DecompressedData, len, + fileData->blockNumber, + outSequenceNumber, processedIn, isLast, NULL), "consumer_decompress"); + + #ifdef PBZIP_DEBUG + fprintf(stderr, "decompress: outputBufferAdd->%llx; block=%d; seq=%d; prev=%llx\n", + (unsigned long long)addret, + fileData->blockNumber, outSequenceNumber, + (unsigned long long) prevOutBlockInSequence); + #endif + } - fifo = (queue *)q; + if (addret == NULL) + { + // error encountered + BZ2_bzDecompressEnd( &strm ); + return (NULL); + } - for (;;) - { - pthread_mutex_lock(fifo->mut); - while (fifo->empty) - { - #ifdef PBZIP_DEBUG - printf ("consumer: queue EMPTY.\n"); - #endif - if 
(allDone == 1) - { - pthread_mutex_unlock(fifo->mut); - #ifdef PBZIP_DEBUG - printf ("consumer: exiting2\n"); - #endif - return (NULL); - } - // only wait for the queue notEmpty signal for 1 second before checking if - // the producer is finished putting work into the queue - #ifndef WIN32 - gettimeofday(&tv, &tz); - waitTimer.tv_sec = tv.tv_sec + 1; - waitTimer.tv_nsec = tv.tv_usec * 1000; - #else - GetSystemTime(&systemtime); - SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime); - waitTimer.tv_sec = filetime.QuadPart / 10000000; - waitTimer.tv_nsec = filetime.QuadPart - ((LONGLONG)waitTimer.tv_sec * 10000000) * 10; - waitTimer.tv_sec++; - #endif - #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: waitTimer.tv_sec: %d waitTimer.tv_nsec: %d\n", waitTimer.tv_sec, waitTimer.tv_nsec); - #endif - pret = pthread_cond_timedwait(fifo->notEmpty, fifo->mut, &waitTimer); - // we are not using a compatible pthreads library so abort - if (pret == EINVAL) - { - fprintf(stderr, "pbzip2: *ERROR: pthread_cond_timedwait() call invalid. This machine\n"); - fprintf(stderr, " does not have compatible pthreads library. Skipping.\n"); - exit(1); + prevOutBlockInSequence = addret; + DecompressedData = NULL; } } - FileData = queueDel(fifo, &inSize, &blockNum); - #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: Buffer: %x Size: %u Block: %d\n", FileData, inSize, blockNum); - #endif - - pthread_mutex_unlock(fifo->mut); - pthread_cond_signal(fifo->notFull); - #ifdef PBZIP_DEBUG - printf ("consumer: recieved %d.\n", blockNum); - #endif - - outSize = 900000; - pthread_mutex_lock(MemMutex); - // allocate memory for decompressed data (start with default 900k block size) - DecompressedData = new char[outSize]; - pthread_mutex_unlock(MemMutex); - // make sure memory was allocated properly - if (DecompressedData == NULL) + + if ((bzret != BZ_STREAM_END) && (bzret != BZ_OK)) { - fprintf(stderr, " *ERROR: Could not allocate memory (DecompressedData)! 
Skipping...\n"); + handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompress: ret=%d; block=%d; seq=%d; avail_in=%d\n", + bzret, fileData->blockNumber, outSequenceNumber, strm.avail_in); return (NULL); } - // decompress the memory buffer (verbose=0) - ret = BZ2_bzBuffToBuffDecompress(DecompressedData, &outSize, FileData, inSize, 0, Verbosity); - while (ret == BZ_OUTBUFF_FULL) + if (strm.avail_in != 0) { - #ifdef PBZIP_DEBUG - fprintf(stderr, "Increasing DecompressedData buffer size: %d -> %d\n", outSize, outSize*4); - #endif + handle_error(EF_EXIT, -1, "pbzip2: *ERROR unconsumed in after BZ2_bzDecompress loop:" + "ret=%d; block=%d; seq=%d; avail_in=%d\n", + bzret, fileData->blockNumber, outSequenceNumber, strm.avail_in); + return (NULL); + } - pthread_mutex_lock(MemMutex); - if (DecompressedData != NULL) - delete [] DecompressedData; - DecompressedData = NULL; - // increase buffer space - outSize = outSize * 4; - // allocate memory for decompressed data (start with default 900k block size) - DecompressedData = new char[outSize]; - pthread_mutex_unlock(MemMutex); - // make sure memory was allocated properly - if (DecompressedData == NULL) + if (bzret == BZ_STREAM_END) + { + if (!(fileData->isLastInSequence)) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (DecompressedData)! 
Skipping...\n"); + handle_error(EF_EXIT, -1, "pbzip2: *ERROR on decompress - """ + "in segments for stream ended but BZ_STREAM_END not reached: ret=%d; block=%d; seq=%d\n", + bzret, fileData->blockNumber, outSequenceNumber); return (NULL); } - // decompress the memory buffer (verbose=0) - ret = BZ2_bzBuffToBuffDecompress(DecompressedData, &outSize, FileData, inSize, 0, Verbosity); - } // while + bzret = BZ2_bzDecompressEnd(&strm); + if (bzret != BZ_OK) + { + handle_error(EF_EXIT, -1, "pbzip2: *ERROR during BZ2_bzDecompressEnd: %d\n", bzret); + return (NULL); + } - if ((ret != BZ_OK) && (ret != BZ_OUTBUFF_FULL)) - fprintf(stderr, "pbzip2: *ERROR during decompression: %d\n", ret); + outSequenceNumber = 0; + prevOutBlockInSequence = NULL; + } #ifdef PBZIP_DEBUG - fprintf(stderr, "\n Compressed Block Size: %u\n", inSize); + fprintf(stderr, "\n Compressed Block Size: %u\n", (unsigned)fileData->bufSize); fprintf(stderr, " Original Block Size: %u\n", outSize); #endif - // store data to be written in output bin - pthread_mutex_lock(OutMutex); - OutputBuffer[blockNum].buf = DecompressedData; - OutputBuffer[blockNum].bufSize = outSize; - NumBufferedBlocks++; - pthread_mutex_unlock(OutMutex); - // throttle back if there is too much backlog in the file writing thread so we don't exceed system memory - while (NumBufferedBlocks > (numCPU * 20)) - { - #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: Throttling from FileWriter backlog: %d\n", NumBufferedBlocks); - #endif - usleep(50000); - } + disposeMemory(fileData->buf); #ifdef PBZIP_DEBUG - fprintf(stderr, " OutputBuffer[%d].buf = %x\n", blockNum, OutputBuffer[blockNum].buf); - fprintf(stderr, " OutputBuffer[%d].bufSize = %u\n", blockNum, OutputBuffer[blockNum].bufSize); + fprintf(stderr, " OutputBuffer[%d].buf = %x\n", fileData->blockNumber, DecompressedData); + fprintf(stderr, " OutputBuffer[%d].bufSize = %u\n", fileData->blockNumber, outSize); + fflush(stderr); #endif - if (FileData != NULL) - { - 
pthread_mutex_lock(MemMutex); - delete [] FileData; - FileData = NULL; - pthread_mutex_unlock(MemMutex); - } } // for + #ifdef PBZIP_DEBUG - printf ("consumer: exiting\n"); + fprintf (stderr, "consumer: exiting\n"); #endif return (NULL); } @@ -782,9 +1218,16 @@ int percentComplete = 0; int hOutfile = 1; // default to stdout int currBlock = 0; + size_t outBufferPos = 0; int ret = -1; + OFF_T bytesProcessed = 0; OutFilename = (char *) outname; + outBuff * prevBlockInSequence = NULL; + + #ifdef PBZIP_DEBUG + fprintf(stderr, "fileWriter function started\n"); + #endif // write to file instead of stdout if (OutputStdOut == 0) @@ -793,88 +1236,226 @@ // check to see if file creation was successful if (hOutfile == -1) { - fprintf(stderr, "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename); + handle_error(EF_EXIT, -1, + "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename); return (NULL); } } - while ((currBlock < NumBlocks) || (allDone == 0)) + while (true) { #ifdef PBZIP_DEBUG - fprintf(stderr, "fileWriter: Block: %d\n", currBlock); + int lastseq = 0; + if (prevBlockInSequence != NULL) + { + lastseq = prevBlockInSequence->sequenceNumber; + } #endif - pthread_mutex_lock(OutMutex); - if ((OutputBuffer.size() == 0) || (OutputBuffer[currBlock].bufSize < 1) || (OutputBuffer[currBlock].buf == NULL)) + // Order is important. We don't need sync on NumBlocks when producer + // is done. + if ((syncGetProducerDone() == 1) && (currBlock >= NumBlocks) && (prevBlockInSequence == NULL)) { - pthread_mutex_unlock(OutMutex); + #ifdef PBZIP_DEBUG + fprintf(stderr, "fileWriter [b:%d:%d]: done - quit loop.\n", currBlock, lastseq); + #endif + // We're done + break; + } + + if (syncGetTerminateFlag() != 0) + { + #ifdef PBZIP_DEBUG + fprintf (stderr, "fileWriter [b:%d]: terminating1 - terminateFlag set\n", currBlock); + #endif + break; + } + + safe_mutex_lock(OutMutex); + #ifdef PBZIP_DEBUG + outBuff * lastnext = (prevBlockInSequence != NULL) ? 
prevBlockInSequence->next : NULL; + fprintf(stderr, "fileWriter: Block: %d Size: %u Next File Block: %d" + ", outBufferPos: %u, NumBlocks: %d, producerDone: %d, lastseq=%d" + ", prev=%llx, next=%llx\n", + currBlock, NumBufferedBlocksMax, NextBlockToWrite, + outBufferPos, NumBlocks, syncGetProducerDone(), lastseq, + (unsigned long long)prevBlockInSequence, + (unsigned long long)lastnext); + #endif + + if ((OutputBuffer[outBufferPos].buf == NULL) && + ((prevBlockInSequence == NULL) || (prevBlockInSequence->next == NULL))) + { + safe_mutex_unlock(OutMutex); // sleep a little so we don't go into a tight loop using up all the CPU usleep(50000); continue; } else - pthread_mutex_unlock(OutMutex); + { + safe_mutex_unlock(OutMutex); + } + outBuff * outBlock; + if (prevBlockInSequence != NULL) + { + outBlock = prevBlockInSequence->next; + } + else + { + outBlock = &OutputBuffer[outBufferPos]; + } #ifdef PBZIP_DEBUG - fprintf(stderr, "fileWriter: Buffer: %x Size: %u Block: %d\n", OutputBuffer[currBlock].buf, OutputBuffer[currBlock].bufSize, currBlock); + fprintf(stderr, "fileWriter: Buffer: %x Size: %u Block: %d, Seq: %d, isLast: %d\n", + OutputBuffer[outBufferPos].buf, OutputBuffer[outBufferPos].bufSize, currBlock, + outBlock->sequenceNumber, (int)outBlock->isLastInSequence); #endif // write data to the output file - ret = write(hOutfile, OutputBuffer[currBlock].buf, OutputBuffer[currBlock].bufSize); + ret = do_write(hOutfile, outBlock->buf, outBlock->bufSize); #ifdef PBZIP_DEBUG - fprintf(stderr, "\n -> Total Bytes Written[%d]: %d bytes...\n", currBlock, ret); + fprintf(stderr, "\n -> Total Bytes Written[%d:%d]: %d bytes...\n", currBlock, outBlock->sequenceNumber, ret); #endif - CompressedSize += ret; - if (ret <= 0) + + if (ret < 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not write to file! 
Skipping...\n"); if (OutputStdOut == 0) close(hOutfile); - pthread_mutex_lock(OutMutex); - pthread_mutex_lock(MemMutex); - if (OutputBuffer[currBlock].buf != NULL) - delete [] OutputBuffer[currBlock].buf; - pthread_mutex_unlock(MemMutex); - pthread_mutex_unlock(OutMutex); + + handle_error(EF_EXIT, -1, + "pbzip2: *ERROR: Could not write %d bytes to file [ret=%d]! Aborting...\n", + outBlock->bufSize, ret); return (NULL); } + CompressedSize += ret; + + bytesProcessed += outBlock->inSize; + delete [] outBlock->buf; + outBlock->buf = NULL; + outBlock->bufSize = 0; - pthread_mutex_lock(OutMutex); - pthread_mutex_lock(MemMutex); - if (OutputBuffer[currBlock].buf != NULL) + if (outBlock->isLastInSequence) { - delete [] OutputBuffer[currBlock].buf; - NumBufferedBlocks--; + if (++outBufferPos == NumBufferedBlocksMax) + { + outBufferPos = 0; + } + ++currBlock; + } + + safe_mutex_lock(OutMutex); + + if (outBlock->isLastInSequence) + { + ++NextBlockToWrite; + OutBufferPosToWrite = outBufferPos; + } + if (outBlock->sequenceNumber > 1) + { + --NumBufferedTailBlocks; + } + // --NumBufferedBlocks; // to be removed + safe_cond_broadcast(notTooMuchNumBuffered); + safe_mutex_unlock(OutMutex); + + if (outBlock->sequenceNumber > 2) + { + delete prevBlockInSequence; + } + + if (outBlock->isLastInSequence) + { + prevBlockInSequence = NULL; + if (outBlock->sequenceNumber > 1) + { + delete outBlock; + } + } + else + { + prevBlockInSequence = outBlock; } - pthread_mutex_unlock(MemMutex); - pthread_mutex_unlock(OutMutex); - currBlock++; - // print current completion status - percentComplete = 100 * currBlock / NumBlocks; if (QuietMode != 1) { - fprintf(stderr, "Completed: %d%% \r", percentComplete); + // print current completion status + int percentCompleteOld = percentComplete; + if (InFileSize > 0) + { + percentComplete = (100.0 * (double)bytesProcessed / (double)InFileSize); + } + + #ifdef PBZIP_DEBUG + fprintf(stderr, "Completed: %d%% NextBlockToWrite: %d/%u \r", percentComplete, 
NextBlockToWrite, NumBufferedBlocksMax); fflush(stderr); + #else + if (percentComplete != percentCompleteOld) + { + fprintf(stderr, "Completed: %d%% \r", percentComplete); + fflush(stderr); + } + #endif } } // while + if (currBlock == 0) + { + // zero-size file needs special handling + ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero) ); + + if (ret < 0) + { + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file! Aborting...\n"); + return (NULL); + } + } + + if (OutputStdOut == 0) close(hOutfile); - if ((QuietMode != 1) && (allDone == 0)) + + if (QuietMode != 1) { fprintf(stderr, " Output Size: %"PRIu64" bytes\n", (unsigned long long)CompressedSize); } + #ifdef PBZIP_DEBUG + fprintf(stderr, "fileWriter exit\n"); + fflush(stderr); + #endif + + // wake up all other possibly blocked on cond threads + if (FifoQueue != NULL) + { + safe_cond_broadcast(FifoQueue->notEmpty); // important + safe_cond_broadcast(FifoQueue->notFull); // not really needed + } + safe_cond_broadcast(notTooMuchNumBuffered); // not really needed + + if (QuietMode != 1) + { + // print current completion status + percentComplete = 100; + + #ifdef PBZIP_DEBUG + fprintf(stderr, "Completed: %d%% NextBlockToWrite: %d/%u \r", percentComplete, NextBlockToWrite, NumBufferedBlocksMax); + fflush(stderr); + #else + + fprintf(stderr, "Completed: %d%% \r", percentComplete); + fflush(stderr); + #endif + } + return (NULL); } /* ********************************************************* */ -int directcompress(int hInfile, OFF_T fileSize, int blockSize, char *OutFilename) +int directcompress(int hInfile, OFF_T fileSize, int blockSize, const char *OutFilename) { char *FileData = NULL; char *CompressedData = NULL; @@ -901,10 +1482,27 @@ return -1; } } + #ifdef WIN32 + else + { + setmode(fileno(stdout), O_BINARY); + } + #endif // keep going until all the file is processed while (bytesLeft > 0) { + if (syncGetTerminateFlag() != 0) + { + close(hInfile); + if (OutputStdOut == 0) + close(hOutfile); 
+ + fprintf (stderr, "directcompress: terminating - terminateFlag set\n"); + + return -1; + } + // // READ DATA // @@ -921,20 +1519,21 @@ // allocate memory to read in file FileData = NULL; - FileData = new char[inSize]; + FileData = new(std::nothrow) char[inSize]; // make sure memory was allocated properly if (FileData == NULL) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (FileData)! Skipping...\n"); close(hInfile); if (OutputStdOut == 0) close(hOutfile); - allDone = 1; + + handle_error(EF_EXIT, -1, + "pbzip2: *ERROR: Could not allocate memory (FileData)! Aborting...\n"); return -1; } // read file data - rret = read(hInfile, (char *) FileData, inSize); + rret = do_read(hInfile, (char *) FileData, inSize); #ifdef PBZIP_DEBUG fprintf(stderr, " -> Total Bytes Read: %d bytes...\n\n", rret); #endif @@ -946,13 +1545,14 @@ } else if (rret < 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not read from file! Skipping...\n"); close(hInfile); if (FileData != NULL) delete [] FileData; if (OutputStdOut == 0) close(hOutfile); - allDone = 1; + + handle_error(EF_EXIT, -1, + "pbzip2: *ERROR: Could not read from file! Aborting...\n"); return -1; } @@ -965,22 +1565,30 @@ outSize = (int) ((inSize*1.01)+600); // allocate memory for compressed data - CompressedData = new char[outSize]; + CompressedData = new(std::nothrow) char[outSize]; // make sure memory was allocated properly if (CompressedData == NULL) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (CompressedData)! Skipping...\n"); close(hInfile); if (FileData != NULL) delete [] FileData; - allDone = 1; + + handle_error(EF_EXIT, -1, + "pbzip2: *ERROR: Could not allocate memory (CompressedData)! 
Aborting...\n"); return -1; } // compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30) ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize, FileData, inSize, BWTblockSize, Verbosity, 30); if (ret != BZ_OK) - fprintf(stderr, "pbzip2: *ERROR during compression: %d\n", ret); + { + close(hInfile); + if (FileData != NULL) + delete [] FileData; + + handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d! Aborting...\n", ret); + return -1; + } #ifdef PBZIP_DEBUG fprintf(stderr, "\n Original Block Size: %u\n", inSize); @@ -992,15 +1600,13 @@ // // write data to the output file - ret = write(hOutfile, CompressedData, outSize); + ret = do_write(hOutfile, CompressedData, outSize); #ifdef PBZIP_DEBUG fprintf(stderr, "\n -> Total Bytes Written[%d]: %d bytes...\n", currBlock, ret); #endif - CompressedSize += ret; if (ret <= 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not write to file! Skipping...\n"); close(hInfile); if (FileData != NULL) delete [] FileData; @@ -1008,17 +1614,24 @@ delete [] CompressedData; if (OutputStdOut == 0) close(hOutfile); - allDone = 1; + + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not write to file! 
Aborting...\n"); return -1; } + CompressedSize += ret; + currBlock++; // print current completion status - percentComplete = 100 * currBlock / NumBlocks; + int percentCompleteOld = percentComplete; + percentComplete = 100 * currBlock / NumBlocksEstimated; if (QuietMode != 1) { - fprintf(stderr, "Completed: %d%% \r", percentComplete); - fflush(stderr); + if (percentComplete != percentCompleteOld) + { + fprintf(stderr, "Completed: %d%% \r", percentComplete); + fflush(stderr); + } } // clean up memory @@ -1047,14 +1660,14 @@ fprintf(stderr, " Output Size: %"PRIu64" bytes\n", (unsigned long long)CompressedSize); } - allDone = 1; + syncSetProducerDone(1); // Not really needed for direct version return 0; } /* ********************************************************* */ -int directdecompress(char *InFilename, char *OutFilename) +int directdecompress(const char *InFilename, const char *OutFilename) { FILE *stream = NULL; FILE *zStream = NULL; @@ -1077,19 +1690,27 @@ zStream = fopen(InFilename, "rb"); if (zStream == NULL) { - fprintf(stderr, "pbzip2: *ERROR: Could not open input file [%s]! Skipping...\n", InFilename); + handle_error(EF_NOQUIT, -1, + "pbzip2: *ERROR: Could not open input file [%s]! Aborting...\n", InFilename); return -1; } } else + { + #ifdef WIN32 + setmode(fileno(stdin), O_BINARY); + #endif zStream = stdin; + } // check file stream for errors if (ferror(zStream)) { - fprintf(stderr, "pbzip2: *ERROR: Problem with input stream of file [%s]! Skipping...\n", InFilename); if (zStream != stdin) fclose(zStream); + + handle_error(EF_NOQUIT, -1, + "pbzip2: *ERROR: Problem with input stream of file [%s]! Aborting...\n", InFilename); return -1; } @@ -1099,20 +1720,37 @@ stream = fopen(OutFilename, "wb"); } else + { + #ifdef WIN32 + setmode(fileno(stdout), O_BINARY); + #endif stream = stdout; + } // check file stream for errors if (ferror(stream)) { - fprintf(stderr, "pbzip2: *ERROR: Problem with output stream of file [%s]! 
Skipping...\n", InFilename); if (stream != stdout) fclose(stream); + + handle_error(EF_NOQUIT, -1, + "pbzip2: *ERROR: Problem with output stream of file [%s]! Aborting...\n", InFilename); return -1; } // loop until end of file while(true) { + if (syncGetTerminateFlag() != 0) + { + fprintf (stderr, "directdecompress: terminating1 - terminateFlag set\n"); + if (zStream != stdin) + fclose(zStream); + if (stream != stdout) + fclose(stream); + return -1; + } + bzf = BZ2_bzReadOpen(&bzerr, zStream, Verbosity, 0, unused, nUnused); if (bzf == NULL || bzerr != BZ_OK) { @@ -1121,6 +1759,7 @@ fclose(zStream); if (stream != stdout) fclose(stream); + return ret; } @@ -1128,6 +1767,16 @@ while (bzerr == BZ_OK) { + if (syncGetTerminateFlag() != 0) + { + fprintf (stderr, "directdecompress: terminating2 - terminateFlag set\n"); + if (zStream != stdin) + fclose(zStream); + if (stream != stdout) + fclose(stream); + return -1; + } + nread = BZ2_bzRead(&bzerr, bzf, obuf, sizeof(obuf)); if (bzerr == BZ_DATA_ERROR_MAGIC) { @@ -1153,7 +1802,7 @@ return ret; } if (nread > 0) - fwrite (obuf, sizeof(unsigned char), nread, stream); + (void) fwrite (obuf, sizeof(unsigned char), nread, stream); if (ferror(stream)) { ret = testBZ2ErrorHandling(bzerr, bzf, streamNo); @@ -1168,7 +1817,7 @@ } } if ((bzerr == BZ_OK || bzerr == BZ_STREAM_END) && nread > 0) - fwrite(obuf, sizeof(unsigned char), nread, stream ); + (void) fwrite(obuf, sizeof(unsigned char), nread, stream ); if (ferror(stream)) { ret = testBZ2ErrorHandling(bzerr, bzf, streamNo); @@ -1261,6 +1910,7 @@ } } + syncSetProducerDone(1); // Not really needed for direct version. return 0; } @@ -1299,19 +1949,30 @@ int producer(int hInfile, int blockSize, queue *fifo) { char *FileData = NULL; - OFF_T inSize = 0; - int blockNum = 0; + size_t inSize = 0; + // int blockNum = 0; int ret = 0; - int pret = -1; + // int pret = -1; // We will now totally ignore the fileSize and read the data as it // comes in. 
Aside from allowing us to process arbitrary streams, it's // also the *right thing to do* in unix environments where data may // be appended to the file as it's processed (e.g. log files). + safe_mutex_lock(&ProgressIndicatorsMutex); + NumBlocks = 0; + InBytesProduced = 0; + safe_mutex_unlock(&ProgressIndicatorsMutex); + // keep going until all the file is processed while (1) { + if (syncGetTerminateFlag() != 0) + { + close(hInfile); + return -1; + } + // set buffer size inSize = blockSize; @@ -1319,17 +1980,14 @@ fprintf(stderr, " -> Bytes To Read: %"PRIu64" bytes...\n", inSize); #endif - pthread_mutex_lock(MemMutex); // allocate memory to read in file FileData = NULL; - FileData = new char[inSize]; - pthread_mutex_unlock(MemMutex); + FileData = new(std::nothrow) char[inSize]; // make sure memory was allocated properly if (FileData == NULL) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (FileData)! Skipping...\n"); close(hInfile); - allDone = 1; + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (FileData)! Aborting...\n"); return -1; } @@ -1341,75 +1999,77 @@ if (ret == 0) { // finished reading. - pthread_mutex_lock(MemMutex); if (FileData != NULL) delete [] FileData; - pthread_mutex_unlock(MemMutex); - NumBlocks = blockNum; break; } else if (ret < 0) { - fprintf(stderr, "pbzip2: *ERROR: Could not read from file! Skipping...\n"); close(hInfile); - pthread_mutex_lock(MemMutex); if (FileData != NULL) delete [] FileData; - pthread_mutex_unlock(MemMutex); - allDone = 1; + + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not read from file! 
Aborting...\n"); return -1; } // check to make sure all the data we expected was read in - if (ret != inSize) + if ((size_t)ret != inSize) inSize = ret; #ifdef PBZIP_DEBUG - fprintf(stderr, "producer: Going into fifo-mut lock (blockNum: %d)\n", blockNum); + fprintf(stderr, "producer: Going into fifo-mut lock (NumBlocks: %d)\n", NumBlocks); #endif // add data to the compression queue - pthread_mutex_lock(fifo->mut); + safe_mutex_lock(fifo->mut); while (fifo->full) { #ifdef PBZIP_DEBUG - printf ("producer: queue FULL.\n"); + fprintf (stderr, "producer: queue FULL.\n"); #endif - pret = pthread_cond_wait(fifo->notFull, fifo->mut); - if (pret != 0) - fprintf(stderr, "pbzip2: producer: *ERROR: pthread_cond_wait error = %d\n", pret); + safe_cond_wait(fifo->notFull, fifo->mut); + + if (syncGetTerminateFlag() != 0) + { + pthread_mutex_unlock(fifo->mut); + close(hInfile); + return -1; + } } #ifdef PBZIP_DEBUG - fprintf(stderr, "producer: Buffer: %x Size: %"PRIu64" Block: %d\n", FileData, inSize, blockNum); + fprintf(stderr, "producer: Buffer: %x Size: %"PRIu64" Block: %d\n", FileData, inSize, NumBlocks); #endif - queueAdd(fifo, FileData, inSize, blockNum); - pthread_mutex_unlock(fifo->mut); - pthread_cond_signal(fifo->notEmpty); - - blockNum++; - - // make sure output buffer is large enough to handle input data - if (blockNum > OutputBuffer.size()) + outBuff * queueElement = new(std::nothrow) outBuff(FileData, inSize, NumBlocks, 0); + // make sure memory was allocated properly + if (queueElement == NULL) { - int newsize = OutputBuffer.size()*2; - #ifdef PBZIP_DEBUG - fprintf(stderr, "producer: Resizing OutputBuffer to %d\n", newsize); - #endif - pthread_mutex_lock(OutMutex); - OutputBuffer.resize(newsize); - pthread_mutex_unlock(OutMutex); - if (OutputBuffer.size() != newsize) - { - allDone = 1; - return -1; - } + close(hInfile); + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (queueElement)! 
Aborting...\n"); + return -1; } + + fifo->add(queueElement); + safe_cond_signal(fifo->notEmpty); + + safe_mutex_lock(&ProgressIndicatorsMutex); + ++NumBlocks; + InBytesProduced += inSize; + safe_mutex_unlock(&ProgressIndicatorsMutex); + + safe_mutex_unlock(fifo->mut); } // while close(hInfile); - allDone = 1; + syncSetProducerDone(1); + safe_cond_broadcast(fifo->notEmpty); // just in case + + #ifdef PBZIP_DEBUG + fprintf(stderr, "producer: Done - exiting. Num Blocks: %d\n", NumBlocks); + #endif + return 0; } @@ -1418,117 +2078,106 @@ */ void *consumer (void *q) { - struct timespec waitTimer; - #ifndef WIN32 - struct timeval tv; - struct timezone tz; - #else - SYSTEMTIME systemtime; - LARGE_INTEGER filetime; - #endif queue *fifo; - char *FileData = NULL; + // char *FileData = NULL; + outBuff *fileData; char *CompressedData = NULL; - unsigned int inSize = 0; + // unsigned int inSize = 0; unsigned int outSize = 0; - int blockNum = -1; + // int blockNum = -1; int ret = -1; - int pret = -1; fifo = (queue *)q; for (;;) { - pthread_mutex_lock(fifo->mut); - while (fifo->empty) + if (syncGetTerminateFlag() != 0) + { + #ifdef PBZIP_DEBUG + fprintf (stderr, "consumer: terminating1 - terminateFlag set\n"); + #endif + return (NULL); + } + + safe_mutex_lock(fifo->mut); + for (;;) { + if (!fifo->empty && (fifo->remove(fileData) == 1)) + { + // block retreived - break the loop and continue further + break; + } + #ifdef PBZIP_DEBUG - printf ("consumer: queue EMPTY.\n"); + fprintf (stderr, "consumer: queue EMPTY.\n"); #endif - if (allDone == 1) + + if (fifo->empty && ((syncGetProducerDone() == 1) || (syncGetTerminateFlag() != 0))) { - pthread_mutex_unlock(fifo->mut); + safe_mutex_unlock(fifo->mut); #ifdef PBZIP_DEBUG - printf ("consumer: exiting2\n"); + fprintf (stderr, "consumer: exiting2\n"); #endif - return (NULL); - } - // only wait for the queue notEmpty signal for 1 second before checking if - // the producer is finished putting work into the queue - #ifndef WIN32 - 
gettimeofday(&tv, &tz); - waitTimer.tv_sec = tv.tv_sec + 1; - waitTimer.tv_nsec = tv.tv_usec * 1000; - #else - GetSystemTime(&systemtime); - SystemTimeToFileTime(&systemtime, (FILETIME *)&filetime); - waitTimer.tv_sec = filetime.QuadPart / 10000000; - waitTimer.tv_nsec = filetime.QuadPart - ((LONGLONG)waitTimer.tv_sec * 10000000) * 10; - waitTimer.tv_sec++; - #endif + return (NULL); + } + #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: waitTimer.tv_sec: %d waitTimer.tv_nsec: %d\n", waitTimer.tv_sec, waitTimer.tv_nsec); + safe_cond_timed_wait(fifo->notEmpty, fifo->mut, 1, "consumer"); + #else + safe_cond_wait(fifo->notEmpty, fifo->mut); #endif - pret = pthread_cond_timedwait(fifo->notEmpty, fifo->mut, &waitTimer); - // we are not using a compatible pthreads library so abort - if (pret == EINVAL) - { - fprintf(stderr, "pbzip2: *ERROR: pthread_cond_timedwait() call invalid. This machine\n"); - fprintf(stderr, " does not have compatible pthreads library. Skipping.\n"); - exit(-1); - } } - FileData = queueDel(fifo, &inSize, &blockNum); + #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: Buffer: %x Size: %u Block: %d\n", FileData, inSize, blockNum); + fprintf(stderr, "consumer: Buffer: %x Size: %u Block: %d\n", + fileData->buf, (unsigned)fileData->bufSize, fileData->blockNumber); #endif - pthread_mutex_unlock(fifo->mut); - pret = pthread_cond_signal(fifo->notFull); - if (pret != 0) - fprintf(stderr, "pbzip2: consumer: *ERROR: pthread_cond_signal error = %d\n", pret); + safe_cond_signal(fifo->notFull); + safe_mutex_unlock(fifo->mut); #ifdef PBZIP_DEBUG - fprintf(stderr, "consumer: received %d.\n", blockNum); + fprintf(stderr, "consumer: received %d.\n", fileData->blockNumber); #endif - outSize = (int) ((inSize*1.01)+600); - pthread_mutex_lock(MemMutex); + outSize = (unsigned int) (((fileData->bufSize)*1.01)+600); // allocate memory for compressed data - CompressedData = new char[outSize]; - pthread_mutex_unlock(MemMutex); + CompressedData = new(std::nothrow) char[outSize]; 
// make sure memory was allocated properly if (CompressedData == NULL) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (CompressedData)! Skipping...\n"); + handle_error(EF_EXIT, -1, "pbzip2: *ERROR: Could not allocate memory (CompressedData)! Aborting...\n"); return (NULL); } // compress the memory buffer (blocksize=9*100k, verbose=0, worklevel=30) - ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize, FileData, inSize, BWTblockSize, Verbosity, 30); + ret = BZ2_bzBuffToBuffCompress(CompressedData, &outSize, + fileData->buf, fileData->bufSize, BWTblockSize, Verbosity, 30); if (ret != BZ_OK) - fprintf(stderr, "pbzip2: *ERROR during compression: %d\n", ret); + { + handle_error(EF_EXIT, -1, "pbzip2: *ERROR during compression: %d! Aborting...\n", ret); + return (NULL); + } #ifdef PBZIP_DEBUG - fprintf(stderr, "\n Original Block Size: %u\n", inSize); + fprintf(stderr, "\n Original Block Size: %u\n", (unsigned)fileData->bufSize); fprintf(stderr, " Compressed Block Size: %u\n", outSize); #endif - // store data to be written in output bin - pthread_mutex_lock(OutMutex); - OutputBuffer[blockNum].buf = CompressedData; - OutputBuffer[blockNum].bufSize = outSize; - pthread_mutex_unlock(OutMutex); + disposeMemory(fileData->buf); - if (FileData != NULL) + // store data to be written in output bin + outBuff outBlock = outBuff(CompressedData, outSize, fileData->blockNumber, 0, fileData->bufSize); + if (outputBufferAdd(outBlock, "consumer") == NULL) { - pthread_mutex_lock(MemMutex); - delete [] FileData; - FileData = NULL; - pthread_mutex_unlock(MemMutex); + return (NULL); } + + delete fileData; + fileData = NULL; } // for + #ifdef PBZIP_DEBUG - printf ("consumer: exiting\n"); + fprintf (stderr, "consumer: exiting\n"); #endif return (NULL); } @@ -1536,29 +2185,68 @@ /* ********************************************************* */ +int mutexesInit() +{ + // initialize mutexes + OutMutex = new(std::nothrow) pthread_mutex_t; + // make sure memory was allocated 
properly + if (OutMutex == NULL) + { + fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutMutex)! Aborting...\n"); + return 1; + } + pthread_mutex_init(OutMutex, NULL); + + ProducerDoneMutex = new(std::nothrow) pthread_mutex_t; + // make sure memory was allocated properly + if (ProducerDoneMutex == NULL) + { + fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (ProducerDoneMutex)! Aborting...\n"); + return 1; + } + pthread_mutex_init(ProducerDoneMutex, NULL); + + return 0; +} + +/* + ********************************************************* + */ +void mutexesDelete() +{ + if (OutMutex != NULL) + { + pthread_mutex_destroy(OutMutex); + delete OutMutex; + OutMutex = NULL; + } + + if (ProducerDoneMutex != NULL) + { + pthread_mutex_destroy(ProducerDoneMutex); + delete ProducerDoneMutex; + ProducerDoneMutex = NULL; + } +} + +/* + ********************************************************* + */ queue *queueInit(int queueSize) { queue *q; QUEUESIZE = queueSize; - q = new queue; + q = new(std::nothrow) queue; if (q == NULL) return NULL; - q->buf = NULL; - q->buf = new char*[queueSize]; - if (q->buf == NULL) - return NULL; + q->qData = new(std::nothrow) queue::ElementTypePtr[queueSize]; - q->bufSize = NULL; - q->bufSize = new unsigned int[queueSize]; - if (q->bufSize == NULL) + if (q->qData == NULL) return NULL; - q->blockNum = NULL; - q->blockNum = new int[queueSize]; - if (q->blockNum == NULL) - return NULL; + q->size = queueSize; q->empty = 1; q->full = 0; @@ -1566,31 +2254,38 @@ q->tail = 0; q->mut = NULL; - q->mut = new pthread_mutex_t; + q->mut = new(std::nothrow) pthread_mutex_t; if (q->mut == NULL) return NULL; pthread_mutex_init(q->mut, NULL); q->notFull = NULL; - q->notFull = new pthread_cond_t; + q->notFull = new(std::nothrow) pthread_cond_t; if (q->notFull == NULL) return NULL; pthread_cond_init(q->notFull, NULL); q->notEmpty = NULL; - q->notEmpty = new pthread_cond_t; + q->notEmpty = new(std::nothrow) pthread_cond_t; if (q->notEmpty == NULL) 
return NULL; pthread_cond_init(q->notEmpty, NULL); q->consumers = NULL; - q->consumers = new pthread_t[queueSize]; + q->consumers = new(std::nothrow) pthread_t[queueSize]; if (q->consumers == NULL) return NULL; + notTooMuchNumBuffered = NULL; + notTooMuchNumBuffered = new(std::nothrow) pthread_cond_t; + if (notTooMuchNumBuffered == NULL) + return NULL; + pthread_cond_init(notTooMuchNumBuffered, NULL); + return (q); } + /* ********************************************************* */ @@ -1621,52 +2316,133 @@ } delete [] q->consumers; - delete [] q->blockNum; - delete [] q->bufSize; - delete [] q->buf; + delete [] q->qData; + delete q; q = NULL; + if (notTooMuchNumBuffered != NULL) + { + pthread_cond_destroy(notTooMuchNumBuffered); + delete notTooMuchNumBuffered; + notTooMuchNumBuffered = NULL; + } + return; } -/* - ********************************************************* + +/** + * Initialize output buffer contents with empty (NULL, 0) blocks + * + * @param size new size of buffer + * */ -void queueAdd (queue *q, char *in, unsigned int bufSize, int blockNum) +void outputBufferInit(size_t size) { - q->buf[q->tail] = in; - q->bufSize[q->tail] = bufSize; - q->blockNum[q->tail] = blockNum; - q->tail++; - if (q->tail == QUEUESIZE) - q->tail = 0; - if (q->tail == q->head) - q->full = 1; - q->empty = 0; + safe_mutex_lock(OutMutex); - return; + NextBlockToWrite = 0; + OutBufferPosToWrite = 0; + NumBufferedBlocks = 0; + NumBufferedTailBlocks = 0; + + outBuff emptyElement; + emptyElement.buf = NULL; + emptyElement.bufSize = 0; + + // Resize and fill-in with empty elements + OutputBuffer.assign(size, emptyElement); + + // unlikely to get here since more likely exception will be thrown + if (OutputBuffer.size() != size) + { + fprintf(stderr, "pbzip2: *ERROR: Could not initialize (OutputBuffer); size=%u! 
Aborting...\n", size); + safe_mutex_unlock(OutMutex); + exit(1); + } + + safe_mutex_unlock(OutMutex); } -/* - ********************************************************* +outBuff * outputBufferSeqAddNext(outBuff * preveElement, outBuff * newElement) +{ + safe_mutex_lock(OutMutex); + + while (NumBufferedTailBlocks >= NumBufferedBlocksMax) + { + if (syncGetTerminateFlag() != 0) + { + #ifdef PBZIP_DEBUG + fprintf (stderr, "%s: terminating2 - terminateFlag set\n", "consumer"); + #endif + pthread_mutex_unlock(OutMutex); + return NULL; + } + + #ifdef PBZIP_DEBUG + fprintf (stderr, "%s/outputBufferSeqAddNext: Throttling from FileWriter backlog: %d\n", "consumer", NumBufferedBlocks); + #endif + safe_cond_wait(notTooMuchNumBuffered, OutMutex); + } + + preveElement->next = newElement; + + ++NumBufferedTailBlocks; + + safe_mutex_unlock(OutMutex); + + return newElement; +} + +/** + * Store an item in OutputBuffer out bin. Synchronization is embedded to protect + * from simultaneous access. + * + * @param in - item buffer + * @param bufSize - item buffer size + * @param blockNum - block number in the whole stream (not the position in buffer) + * @param caller - calling function (used for logging and debug purposes) + * + * @return pointer to added element on success; NULL - on error */ -char *queueDel (queue *q, unsigned int *bufSize, int *blockNum) +outBuff * outputBufferAdd(const outBuff & element, const char *caller) { - char *out; + safe_mutex_lock(OutMutex); - out = q->buf[q->head]; - *bufSize = q->bufSize[q->head]; - *blockNum = q->blockNum[q->head]; - - q->head++; - if (q->head == QUEUESIZE) - q->head = 0; - if (q->head == q->tail) - q->empty = 1; - q->full = 0; + // wait while blockNum is out of range + // [NextBlockToWrite, NextBlockToWrite + NumBufferedBlocksMax) + int dist = element.blockNumber - NumBufferedBlocksMax; + while (dist >= NextBlockToWrite) + { + if (syncGetTerminateFlag() != 0) + { + #ifdef PBZIP_DEBUG + fprintf (stderr, "%s/outputBufferAdd: terminating2 - 
terminateFlag set\n", caller); + #endif + pthread_mutex_unlock(OutMutex); + return NULL; + } + + #ifdef PBZIP_DEBUG + fprintf (stderr, "%s: Throttling from FileWriter backlog: %d\n", caller, NumBufferedBlocks); + #endif + safe_cond_wait(notTooMuchNumBuffered, OutMutex); + } - return out; + // calculate output buffer position (used in circular mode) + size_t outBuffPos = OutBufferPosToWrite + element.blockNumber - NextBlockToWrite; + if (outBuffPos >= NumBufferedBlocksMax) + { + outBuffPos -= NumBufferedBlocksMax; + } + + OutputBuffer[outBuffPos] = element; + ++NumBufferedBlocks; + + safe_mutex_unlock(OutMutex); + + return &(OutputBuffer[outBuffPos]); } /* @@ -1847,7 +2623,7 @@ /* ********************************************************* */ -int getFileMetaData(char *fileName) +int getFileMetaData(const char *fileName) { // get the file meta data and store it in the global structure return stat(fileName, &fileMetaData); @@ -1856,10 +2632,14 @@ /* ********************************************************* */ -int writeFileMetaData(char *fileName) +int writeFileMetaData(const char *fileName) { int ret = 0; + #ifndef WIN32 struct utimbuf uTimBuf; + #else + _utimbuf uTimBuf; + #endif // store file times in structure uTimBuf.actime = fileMetaData.st_atime; @@ -1877,7 +2657,11 @@ // update file with stored file ownership (if access allows) #ifndef WIN32 - chown(fileName, fileMetaData.st_uid, fileMetaData.st_gid); + ret = chown(fileName, fileMetaData.st_uid, fileMetaData.st_gid); + // following may happen on some Linux filesystems (i.e. 
NTFS) + // extra error messages do no harm + if ((geteuid() == 0) && (ret != 0)) + return ret; #endif return 0; @@ -1903,6 +2687,10 @@ ncpu = 1; #elif defined(_SC_NPROCESSORS_ONLN) ncpu = sysconf(_SC_NPROCESSORS_ONLN); + #elif defined(WIN32) + SYSTEM_INFO si; + GetSystemInfo(&si); + ncpu = si.dwNumberOfProcessors; #endif // Ensure we have at least one processor to use @@ -1918,8 +2706,9 @@ */ void banner() { - fprintf(stderr, "Parallel BZIP2 v1.0.5 - by: Jeff Gilchrist [http://compression.ca]\n"); - fprintf(stderr, "[Jan. 08, 2009] (uses libbzip2 by Julian Seward)\n"); + fprintf(stderr, "Parallel BZIP2 v1.1.1 - by: Jeff Gilchrist [http://compression.ca]\n"); + fprintf(stderr, "[Apr. 17, 2010] (uses libbzip2 by Julian Seward)\n"); + fprintf(stderr, "Major contributions: Yavor Nikolov \n"); return; } @@ -1937,36 +2726,40 @@ fprintf(stderr, "\nInvalid command line: %s. Aborting...\n\n", reason); #ifndef PBZIP_NO_LOADAVG - fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhklp#qrtVz] \n", progname); + fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhklm#p#qrS#tVz] \n", progname); #else - fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhkp#qrtVz] \n", progname); -#endif - fprintf(stderr, " -b# : where # is the file block size in 100k (default 9 = 900k)\n"); - fprintf(stderr, " -c : output to standard out (stdout)\n"); - fprintf(stderr, " -d : decompress file\n"); - fprintf(stderr, " -f : force, overwrite existing output file\n"); - fprintf(stderr, " -h : print this help message\n"); - fprintf(stderr, " -k : keep input file, don't delete\n"); + fprintf(stderr, "Usage: %s [-1 .. -9] [-b#cdfhkm#p#qrS#tVz] \n", progname); +#endif // PBZIP_NO_LOADAVG + fprintf(stderr, " -1 .. -9 set BWT block size to 100k .. 
900k (default 900k)\n"); + fprintf(stderr, " -b# Block size in 100k steps (default 9 = 900k)\n"); + fprintf(stderr, " -c,--stdout Output to standard out (stdout)\n"); + fprintf(stderr, " -d,--decompress Decompress file\n"); + fprintf(stderr, " -f,--force Overwrite existing output file\n"); + fprintf(stderr, " -h,--help Print this help message\n"); + fprintf(stderr, " -k,--keep Keep input file, don't delete\n"); #ifndef PBZIP_NO_LOADAVG - fprintf(stderr, " -l : load average determines max number processors to use\n"); -#endif - fprintf(stderr, " -p# : where # is the number of processors (default"); + fprintf(stderr, " -l,--loadavg Load average determines max number processors to use\n"); +#endif // PBZIP_NO_LOADAVG + fprintf(stderr, " -m# Maximum memory usage in 1MB steps (default 100 = 100MB)\n"); + fprintf(stderr, " -p# Number of processors to use (default"); #if defined(_SC_NPROCESSORS_ONLN) || defined(__APPLE__) fprintf(stderr, ": autodetect [%d])\n", detectCPUs()); #else fprintf(stderr, " 2)\n"); -#endif - fprintf(stderr, " -q : quiet mode (default)\n"); - fprintf(stderr, " -r : read entire input file into RAM and split between processors\n"); - fprintf(stderr, " -t : test compressed file integrity\n"); - fprintf(stderr, " -v : verbose mode\n"); - fprintf(stderr, " -V : display version info for pbzip2 then exit\n"); - fprintf(stderr, " -z : compress file (default)\n"); - fprintf(stderr, " -1 .. -9 : set BWT block size to 100k .. 
900k (default 900k)\n\n"); +#endif // _SC_NPROCESSORS_ONLN || __APPLE__ + fprintf(stderr, " -q,--quiet Quiet mode (default)\n"); + fprintf(stderr, " -r,--read Read entire input file into RAM and split between processors\n"); +#ifdef USE_STACKSIZE_CUSTOMIZATION + fprintf(stderr, " -S# Child thread stack size in 1KB steps (default stack size if unspecified)\n"); +#endif // USE_STACKSIZE_CUSTOMIZATION + fprintf(stderr, " -t,--test Test compressed file integrity\n"); + fprintf(stderr, " -v,--verbose Verbose mode\n"); + fprintf(stderr, " -V,--version Display version info for pbzip2 then exit\n"); + fprintf(stderr, " -z,--compress Compress file (default)\n\n"); fprintf(stderr, "Example: pbzip2 -b15vk myfile.tar\n"); fprintf(stderr, "Example: pbzip2 -p4 -r -5 myfile.tar second*.txt\n"); fprintf(stderr, "Example: tar cf myfile.tar.bz2 --use-compress-prog=pbzip2 dir_to_compress/\n"); - fprintf(stderr, "Example: pbzip2 -d myfile.tar.bz2\n\n"); + fprintf(stderr, "Example: pbzip2 -d -m500 myfile.tar.bz2\n\n"); exit(-1); } @@ -1982,10 +2775,9 @@ char *progName = NULL; char *progNamePos = NULL; char bz2Header[] = {"BZh91AY&SY"}; // using 900k block size - char bz2HeaderZero[] = { 0x42, 0x5A, 0x68, 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, 0x90, 0x00, 0x00, 0x00, 0x00 }; - char OutFilename[2048]; + std::string outFilename; // [2048]; char cmdLineTemp[2048]; - char tmpBuff[50]; + unsigned char tmpBuff[50]; char stdinFile[2] = {"-"}; struct timeval tvStartTime; struct timeval tvStopTime; @@ -2005,8 +2797,6 @@ double timeCalc = 0.0; double timeStart = 0.0; double timeStop = 0.0; - OFF_T fileSize = 0; - size_t size; int cmdLineTempCount = 0; int readEntireFile = 0; int zeroByteFile = 0; @@ -2014,7 +2804,9 @@ int hOutfile = -1; int numBlocks = 0; int blockSize = 9*100000; + int maxMemory = 100000000; int decompress = 0; + int compress = 0; int testFile = 0; int errLevel = 0; int noThreads = 0; @@ -2022,7 +2814,8 @@ int force = 0; int ret = 0; int fileLoop; - int i, j, k; + size_t i, j, k; + 
bool switchedMtToSt = false; // switched from multi- to single-thread // get current time for benchmark reference #ifndef WIN32 @@ -2058,12 +2851,15 @@ } FileListCount = 0; - FileList = new char *[argc]; + FileList = new(std::nothrow) char *[argc]; if (FileList == NULL) { fprintf(stderr, "pbzip2: *ERROR: Not enough memory! Aborting...\n"); return 1; } + // set default max memory usage to 100MB + maxMemory = 100000000; + NumBufferedBlocksMax = 0; numCPU = detectCPUs(); @@ -2072,7 +2868,7 @@ #endif // parse command line switches - for (i=1; i < argc; i++) + for (i=1; (int)i < argc; i++) { if (argv[i][0] == '-') { @@ -2096,7 +2892,7 @@ } else if (strcmp(argv[i], "--compress") == 0) { - decompress = 0; + compress = 1; } else if (strcmp(argv[i], "--fast") == 0) { @@ -2118,10 +2914,20 @@ { usage(argv[0], "HELP"); } + #ifndef PBZIP_NO_LOADAVG + else if (strcmp(argv[i], "--loadavg") == 0) + { + useLoadAverage = 1; + } + #endif else if (strcmp(argv[i], "--quiet") == 0) { QuietMode = 1; } + else if (strcmp(argv[i], "--read") == 0) + { + readEntireFile = 1; + } else if (strcmp(argv[i], "--stdout") == 0) { OutputStdOut = 1; keep = 1; @@ -2142,7 +2948,7 @@ continue; } #ifdef PBZIP_DEBUG - fprintf(stderr, "argv[%d]: %s Len: %d\n", i, argv[i], strlen(argv[i])); + fprintf(stderr, "argv[%u]: %s Len: %d\n", i, argv[i], strlen(argv[i])); #endif // get command line options with single "-" // check for multiple switches grouped together @@ -2162,6 +2968,7 @@ if (cmdLineTempCount == 0) usage(argv[0], "Cannot parse -p argument"); strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount); + cmdLineTemp[cmdLineTempCount] = '\0'; numCPU = atoi(cmdLineTemp); if (numCPU > 4096) { @@ -2190,6 +2997,7 @@ if (cmdLineTempCount == 0) usage(argv[0], "Cannot parse file block size"); strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount); + cmdLineTemp[cmdLineTempCount] = '\0'; blockSize = atoi(cmdLineTemp)*100000; if ((blockSize < 100000) || (blockSize > 1000000000)) { @@ -2201,6 +3009,62 @@ fprintf(stderr, 
"-b%d\n", blockSize); #endif break; + case 'm': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "1"); maxMemory = 1000000; + while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp)) + { + // no more numbers, finish + if ((argv[i][k] < '0') || (argv[i][k] > '9')) + break; + k++; + cmdLineTempCount++; + } + if (cmdLineTempCount == 0) + usage(argv[0], "Cannot parse -m argument"); + strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount); + cmdLineTemp[cmdLineTempCount] = '\0'; + maxMemory = atoi(cmdLineTemp)*1000000; + if ((maxMemory < 1000000) || (maxMemory > 1000000000)) + { + fprintf(stderr,"pbzip2: *ERROR: Memory usage size Min: 1MB and Max: 1000MB! Aborting...\n"); + return 1; + } + j += cmdLineTempCount; + #ifdef PBZIP_DEBUG + fprintf(stderr, "-m%d\n", maxMemory); + #endif + break; + #ifdef USE_STACKSIZE_CUSTOMIZATION + case 'S': k = j+1; cmdLineTempCount = 0; strcpy(cmdLineTemp, "0"); ChildThreadStackSize = -1; + while (argv[i][k] != '\0' && k < sizeof(cmdLineTemp)) + { + // no more numbers, finish + if ((argv[i][k] < '0') || (argv[i][k] > '9')) + break; + k++; + cmdLineTempCount++; + } + if (cmdLineTempCount == 0) + usage(argv[0], "Cannot parse -S argument"); + strncpy(cmdLineTemp, argv[i]+j+1, cmdLineTempCount); + cmdLineTemp[cmdLineTempCount] = '\0'; + ChildThreadStackSize = atoi(cmdLineTemp)*1024; + if (ChildThreadStackSize < 0) + { + fprintf(stderr,"pbzip2: *ERROR: Parsing -S: invalid stack size specified [%d]! 
Ignoring...\n", + ChildThreadStackSize); + } + else if (ChildThreadStackSize < PTHREAD_STACK_MIN) + { + fprintf(stderr,"pbzip2: *WARNING: Stack size %d bytes less than minumum - adjusting to %d bytes.\n", + ChildThreadStackSize, PTHREAD_STACK_MIN); + ChildThreadStackSize = PTHREAD_STACK_MIN; + } + j += cmdLineTempCount; + #ifdef PBZIP_DEBUG + fprintf(stderr, "-S%d\n", ChildThreadStackSize); + #endif + break; + #endif // USE_STACKSIZE_CUSTOMIZATION case 'd': decompress = 1; break; case 'c': OutputStdOut = 1; keep = 1; break; case 'f': force = 1; ForceOverwrite = 1; break; @@ -2215,7 +3079,7 @@ case 't': testFile = 1; break; case 'v': QuietMode = 0; break; case 'V': banner(); exit(0); break; - case 'z': decompress = 0; break; + case 'z': compress = 1; break; case '1': BWTblockSize = 1; break; case '2': BWTblockSize = 2; break; case '3': BWTblockSize = 3; break; @@ -2236,6 +3100,17 @@ } } /* for */ + Bz2HeaderZero[3] = '0' + BWTblockSize; + bz2Header[3] = Bz2HeaderZero[3]; + + // check to make sure we are not trying to compress and decompress at same time + if ((compress == 1) && (decompress == 1)) + { + fprintf(stderr,"pbzip2: *ERROR: Can't compress and uncompress data at same time. Aborting!\n"); + fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]); + return 1; + } + if (FileListCount == 0) { if (testFile == 1) @@ -2289,7 +3164,27 @@ FileListCount++; } else - usage(argv[0], "Not enough files given"); + { + // probably trying to input data from stdin + if (QuietMode != 1) + fprintf(stderr,"pbzip2: Assuming input data coming from stdin...\n\n"); + + OutputStdOut = 1; + keep = 1; + #ifndef WIN32 + if (isatty(fileno(stdout))) + #else + if (_isatty(_fileno(stdout))) + #endif + { + fprintf(stderr,"pbzip2: *ERROR: Won't write compressed data to terminal. 
Aborting!\n"); + fprintf(stderr,"pbzip2: For help type: %s -h\n", argv[0]); + return 1; + } + // expecting data from stdin + FileList[FileListCount] = stdinFile; + FileListCount++; + } } if (QuietMode != 1) @@ -2299,11 +3194,11 @@ // do sanity check to make sure integers are the size we expect #ifdef PBZIP_DEBUG - fprintf(stderr, "off_t size: %d uint size: %d\n", sizeof(OFF_T), sizeof(unsigned int)); + fprintf(stderr, "off_t size: %u uint size: %u\n", sizeof(OFF_T), sizeof(unsigned int)); #endif if (sizeof(OFF_T) <= 4) { - fprintf(stderr, "\npbzip2: *WARNING: off_t variable size only %d bits!\n", sizeof(OFF_T)*8); + fprintf(stderr, "\npbzip2: *WARNING: off_t variable size only %u bits!\n", sizeof(OFF_T)*CHAR_BIT); if (decompress == 1) fprintf(stderr, " You will only able to uncompress files smaller than 2GB in size.\n\n"); else @@ -2343,14 +3238,26 @@ } #endif - // setup signal handling + // Initialize child threads attributes + initChildThreadAttributes(); + + // setup signal handling (should be before creating any child thread) sigInFilename = NULL; sigOutFilename = NULL; - signal(SIGINT, mySignalCatcher); - signal(SIGTERM, mySignalCatcher); - #ifndef WIN32 - signal(SIGHUP, mySignalCatcher); - #endif + ret = setupSignalHandling(); + if (ret != 0) + { + fprintf(stderr, "pbzip2: *ERROR: Can't setup signal handling [%d]. Aborting!\n", ret); + return 1; + } + + // Create and start terminator thread. + ret = setupTerminator(); + if (ret != 0) + { + fprintf(stderr, "pbzip2: *ERROR: Can't setup terminator thread [%d]. 
Aborting!\n", ret); + return 1; + } if (numCPU < 1) numCPU = 1; @@ -2367,37 +3274,29 @@ #endif if (decompress != 1) { - fprintf(stderr, " BWT Block Size: %d00k\n", BWTblockSize); + fprintf(stderr, " BWT Block Size: %d00 KB\n", BWTblockSize); if (blockSize < 100000) fprintf(stderr, "File Block Size: %d bytes\n", blockSize); else - fprintf(stderr, "File Block Size: %dk\n", blockSize/1000); + fprintf(stderr, "File Block Size: %d KB\n", blockSize/1000); } + fprintf(stderr, " Maximum Memory: %d MB\n", maxMemory/1000000); + #ifdef USE_STACKSIZE_CUSTOMIZATION + if (ChildThreadStackSize > 0) + fprintf(stderr, " Stack Size: %d KB\n", ChildThreadStackSize/1024); + #endif } fprintf(stderr, "-------------------------------------------\n"); } - // initialize mutexes - OutMutex = new pthread_mutex_t; - // make sure memory was allocated properly - if (OutMutex == NULL) - { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutMutex)! Aborting...\n"); - return 1; - } - pthread_mutex_init(OutMutex, NULL); - - MemMutex = new pthread_mutex_t; - // make sure memory was allocated properly - if (MemMutex == NULL) + int mutexesInitRet = mutexesInit(); + if ( mutexesInitRet != 0 ) { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (MemMutex)! Aborting...\n"); - return 1; + return mutexesInitRet; } - pthread_mutex_init(MemMutex, NULL); // create queue - fifo = queueInit(numCPU); + fifo = FifoQueue = queueInit(numCPU); if (fifo == NULL) { fprintf (stderr, "pbzip2: *ERROR: Queue Init failed. 
Aborting...\n"); @@ -2407,8 +3306,9 @@ // process all files for (fileLoop=0; fileLoop < FileListCount; fileLoop++) { - allDone = 0; - fileSize = 0; + producerDone = 0; + InFileSize = 0; + NumBlocks = 0; // set input filename InFilename = FileList[fileLoop]; @@ -2441,7 +3341,7 @@ } // set ouput filename - strncpy(OutFilename, FileList[fileLoop], 2040); + outFilename = std::string(FileList[fileLoop]); if ((decompress == 1) && (strcmp(InFilename, "-") != 0)) { // check if input file is a valid .bz2 compressed file @@ -2455,7 +3355,7 @@ continue; } memset(tmpBuff, 0, sizeof(tmpBuff)); - size = read(hInfile, tmpBuff, strlen(bz2Header)+1); + size_t size = do_read(hInfile, tmpBuff, strlen(bz2Header)+1); close(hInfile); if ((size == (size_t)(-1)) || (size < strlen(bz2Header)+1)) { @@ -2478,7 +3378,7 @@ if (memstr(tmpBuff+4, size-4, bz2Header+4, strlen(bz2Header)-4) == NULL) { // check to see if this is a special 0 byte file - if (memstr(tmpBuff+4, size-4, bz2HeaderZero+4, strlen(bz2Header)-4) == NULL) + if (memstr(tmpBuff+4, size-4, Bz2HeaderZero+4, strlen(bz2Header)-4) == NULL) { fprintf(stderr, "pbzip2: *ERROR: File [%s] is NOT a valid bzip2! 
Skipping...\n", InFilename); fprintf(stderr, "-------------------------------------------\n"); @@ -2502,33 +3402,35 @@ } // check if filename ends with .bz2 - if (strncasecmp(&OutFilename[strlen(OutFilename)-4], ".bz2", 4) == 0) + std::string bz2Tail(".bz2"); + if ( ends_with_icase(outFilename, bz2Tail) ) { // remove .bz2 extension - OutFilename[strlen(OutFilename)-4] = '\0'; + outFilename.resize( outFilename.size() - bz2Tail.size() ); } else { // add .out extension so we don't overwrite original file - strcat(OutFilename, ".out"); + outFilename += ".out"; } } // decompress == 1 else { // check input file to make sure its not already a .bz2 file - if (strncasecmp(&InFilename[strlen(InFilename)-4], ".bz2", 4) == 0) + std::string bz2Tail(".bz2"); + if ( ends_with_icase(std::string(InFilename), bz2Tail) ) { fprintf(stderr, "pbzip2: *ERROR: Input file [%s] already has a .bz2 extension! Skipping...\n", InFilename); fprintf(stderr, "-------------------------------------------\n"); errLevel = 1; continue; } - strcat(OutFilename, ".bz2"); + outFilename += bz2Tail; } // setup signal handling filenames sigInFilename = InFilename; - sigOutFilename = OutFilename; + sigOutFilename = outFilename.c_str(); if (strcmp(InFilename, "-") != 0) { @@ -2564,13 +3466,13 @@ } // get size of file #ifndef WIN32 - fileSize = statbuf.st_size; + InFileSize = statbuf.st_size; #else fileSize_temp.LowPart = GetFileSize((HANDLE)_get_osfhandle(hInfile), (unsigned long *)&fileSize_temp.HighPart); - fileSize = fileSize_temp.QuadPart; + InFileSize = fileSize_temp.QuadPart; #endif // don't process a 0 byte file - if (fileSize == 0) + if (InFileSize == 0) { if (decompress == 1) { @@ -2598,17 +3500,17 @@ else { hInfile = 0; // stdin - fileSize = -1; // fake it + InFileSize = -1; // fake it } // check to see if output file exists if ((force != 1) && (OutputStdOut == 0)) { - hOutfile = open(OutFilename, O_RDONLY | O_BINARY); + hOutfile = open(outFilename.c_str(), O_RDONLY | O_BINARY); // check to see if 
file exists before processing if (hOutfile != -1) { - fprintf(stderr, "pbzip2: *ERROR: Output file [%s] already exists! Use -f to overwrite...\n", OutFilename); + fprintf(stderr, "pbzip2: *ERROR: Output file [%s] already exists! Use -f to overwrite...\n", outFilename.c_str()); fprintf(stderr, "-------------------------------------------\n"); errLevel = 1; close(hOutfile); @@ -2627,7 +3529,7 @@ else { // determine block size to try and spread data equally over # CPUs - blockSize = fileSize / numCPU; + blockSize = InFileSize / numCPU; } } @@ -2638,34 +3540,35 @@ fprintf(stderr, " Input Name: %s\n", hInfile != 0 ? InFilename : ""); if (OutputStdOut == 0) - fprintf(stderr, " Output Name: %s\n\n", OutFilename); + fprintf(stderr, " Output Name: %s\n\n", outFilename.c_str()); else fprintf(stderr, " Output Name: \n\n"); if (decompress == 1) fprintf(stderr, " BWT Block Size: %c00k\n", BWTblockSizeChar); if (strcmp(InFilename, "-") != 0) - fprintf(stderr, " Input Size: %"PRIu64" bytes\n", (unsigned long long)fileSize); + fprintf(stderr, " Input Size: %"PRIu64" bytes\n", (unsigned long long)InFileSize); } if (decompress == 1) { numBlocks = 0; // Do not use threads if we only have 1 CPU or small files - if ((numCPU == 1) || (fileSize < 1000000)) + if ((numCPU == 1) || (InFileSize < 1000000)) noThreads = 1; else noThreads = 0; - // for now use no threads method for uncompressing from stdin - if (strcmp(InFilename, "-") == 0) - noThreads = 1; + + // Enable threads method for uncompressing from stdin + if ((numCPU > 1) && (strcmp(InFilename, "-") == 0)) + noThreads = 0; } else { - if (fileSize > 0) + if (InFileSize > 0) { // calculate the # of blocks of data - numBlocks = (fileSize + blockSize - 1) / blockSize; + numBlocks = (InFileSize + blockSize - 1) / blockSize; // Do not use threads for small files where we only have 1 block to process // or if we only have 1 CPU if ((numBlocks == 1) || (numCPU == 1)) @@ -2686,21 +3589,21 @@ // write to file instead of stdout if 
(OutputStdOut == 0) { - hOutfile = open(OutFilename, O_RDWR | O_CREAT | O_TRUNC | O_BINARY, FILE_MODE); + hOutfile = open(outFilename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, FILE_MODE); // check to see if file creation was successful if (hOutfile == -1) { - fprintf(stderr, "pbzip2: *ERROR: Could not create output file [%s]!\n", OutFilename); + fprintf(stderr, "pbzip2: *ERROR: Could not create output file [%s]!\n", outFilename.c_str()); close(hOutfile); errLevel = 1; continue; } } // write data to the output file - ret = write(hOutfile, bz2HeaderZero, sizeof(bz2HeaderZero)); + ret = do_write(hOutfile, Bz2HeaderZero, sizeof(Bz2HeaderZero)); if (OutputStdOut == 0) close(hOutfile); - if (ret != sizeof(bz2HeaderZero)) + if (ret != sizeof(Bz2HeaderZero)) { fprintf(stderr, "pbzip2: *ERROR: Could not write to file! Skipping...\n"); fprintf(stderr, "-------------------------------------------\n"); @@ -2709,9 +3612,22 @@ } if (QuietMode != 1) { - fprintf(stderr, " Output Size: %"PRIu64" bytes\n", (unsigned long long)sizeof(bz2HeaderZero)); + fprintf(stderr, " Output Size: %"PRIu64" bytes\n", (unsigned long long)sizeof(Bz2HeaderZero)); fprintf(stderr, "-------------------------------------------\n"); } + // remove input file unless requested not to by user + if (keep != 1) + { + struct stat statbuf; + if (OutputStdOut == 0) + { + // only remove input file if output file exists + if (stat(outFilename.c_str(), &statbuf) == 0) + remove(InFilename); + } + else + remove(InFilename); + } continue; } } @@ -2719,22 +3635,21 @@ fprintf(stderr, "# Blocks: %d\n", numBlocks); #endif // set global variable - NumBlocks = numBlocks; + NumBlocksEstimated = numBlocks; + // Calculate maximum number of buffered blocks to use + NumBufferedBlocksMax = maxMemory / blockSize; + // Subtract blocks for number of extra buffers in producer and fileWriter (~ numCPU for each) + if ((int)NumBufferedBlocksMax - (numCPU * 2) < 1) + NumBufferedBlocksMax = 1; + else + NumBufferedBlocksMax = 
NumBufferedBlocksMax - (numCPU * 2); + #ifdef PBZIP_DEBUG + fprintf(stderr, "pbzip2: maxMemory: %d blockSize: %d\n", maxMemory, blockSize); + fprintf(stderr, "pbzip2: NumBufferedBlocksMax: %u\n", NumBufferedBlocksMax); + #endif // create output buffer - OutputBuffer.resize(numBlocks); - // make sure memory was allocated properly - if (OutputBuffer.size() != numBlocks) - { - fprintf(stderr, "pbzip2: *ERROR: Could not allocate memory (OutputBuffer)! Aborting...\n"); - return 1; - } - // set empty buffer - for (i=0; i < numBlocks; i++) - { - OutputBuffer[i].buf = NULL; - OutputBuffer[i].bufSize = 0; - } + outputBufferInit(NumBufferedBlocksMax); if (decompress == 1) { @@ -2742,46 +3657,56 @@ if (noThreads == 0) { // do decompression - NumBufferedBlocks = 0; if (QuietMode != 1) fprintf(stderr, "Decompressing data...\n"); - for (i=0; i < numCPU; i++) + for (i=0; (int)i < numCPU; i++) { - ret = pthread_create(&fifo->consumers[i], NULL, consumer_decompress, fifo); + ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer_decompress, fifo); if (ret != 0) { - fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create consumer thread #%d (code = %d) Aborting...\n", i, ret); + fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d) Aborting...\n", i, ret); return 1; } } - ret = pthread_create(&output, NULL, fileWriter, OutFilename); + ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str()); if (ret != 0) { - fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret); + handle_error(EF_EXIT, 1, + "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret); + + ret = pthread_join(TerminatorThread, NULL); return 1; } // start reading in data for decompression - ret = producer_decompress(hInfile, fileSize, fifo); + ret = producer_decompress(hInfile, InFileSize, fifo); if (ret == -99) { 
// only 1 block detected, use single threaded code to decompress noThreads = 1; + + switchedMtToSt = true; + + // wait for fileWriter thread to exit + if (pthread_join(output, NULL) != 0) + { + errLevel = 1; + } } else if (ret != 0) errLevel = 1; } // use single threaded code - if (noThreads == 1) + if ((noThreads == 1) && (errLevel == 0)) { if (QuietMode != 1) fprintf(stderr, "Decompressing data (no threads)...\n"); if (hInfile > 0) close(hInfile); - ret = directdecompress(InFilename, OutFilename); + ret = directdecompress(InFilename, outFilename.c_str()); if (ret != 0) errLevel = 1; } @@ -2796,21 +3721,24 @@ if (QuietMode != 1) fprintf(stderr, "Compressing data...\n"); - for (i=0; i < numCPU; i++) + for (i=0; (int)i < numCPU; i++) { - ret = pthread_create(&fifo->consumers[i], NULL, consumer, fifo); + ret = pthread_create(&fifo->consumers[i], &ChildThreadAttributes, consumer, fifo); if (ret != 0) { - fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create consumer thread #%d (code = %d) Aborting...\n", i, ret); + fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create consumer thread #%u (code = %d) Aborting...\n", i, ret); return 1; } } - ret = pthread_create(&output, NULL, fileWriter, OutFilename); + ret = pthread_create(&output, &ChildThreadAttributes, fileWriter, (void*)outFilename.c_str()); if (ret != 0) { - fprintf(stderr, "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret); + handle_error(EF_EXIT, 1, + "pbzip2: *ERROR: Not enough resources to create fileWriter thread (code = %d) Aborting...\n", ret); + pthread_join(TerminatorThread, NULL); return 1; + } // start reading in data for compression @@ -2824,7 +3752,7 @@ if (QuietMode != 1) fprintf(stderr, "Compressing data (no threads)...\n"); - ret = directcompress(hInfile, fileSize, blockSize, OutFilename); + ret = directcompress(hInfile, InFileSize, blockSize, outFilename.c_str()); if (ret != 0) errLevel = 1; } @@ -2833,23 +3761,34 @@ if 
(noThreads == 0) { // wait for fileWriter thread to exit - pthread_join(output, NULL); - + ret = pthread_join(output, NULL); + if (ret != 0) + errLevel = 1; + } + + if ((noThreads == 0) || switchedMtToSt ) + { // wait for consumer threads to exit - for (i = 0; i < numCPU; i++) - pthread_join(fifo->consumers[i], NULL); + for (i = 0; (int)i < numCPU; i++) + { + ret = pthread_join(fifo->consumers[i], NULL); + if (ret != 0) + errLevel = 1; + } } if (OutputStdOut == 0) { // write store file meta data to output file - if (writeFileMetaData(OutFilename) != 0) + if (writeFileMetaData(outFilename.c_str()) != 0) fprintf(stderr, "pbzip2: *ERROR: Could not write file meta data to [%s]!\n", InFilename); } - // finished processing file + // finished processing file (mutex since accessed by cleanup procedure) + safe_mutex_lock(&ErrorHandlerMutex); sigInFilename = NULL; sigOutFilename = NULL; + safe_mutex_unlock(&ErrorHandlerMutex); // remove input file unless requested not to by user if (keep != 1) @@ -2858,7 +3797,7 @@ if (OutputStdOut == 0) { // only remove input file if output file exists - if (stat(OutFilename, &statbuf) == 0) + if (stat(outFilename.c_str(), &statbuf) == 0) remove(InFilename); } else @@ -2876,22 +3815,34 @@ fprintf(stderr, "-------------------------------------------\n"); } /* for */ - // reclaim memory - queueDelete(fifo); - fifo = NULL; - if (OutMutex != NULL) + // Terminate signal handler thread sending SIGQUIT signal + ret = pthread_kill(SignalHandlerThread, SIG_HANDLER_QUIT_SIGNAL); + if (ret != 0) { - pthread_mutex_destroy(OutMutex); - delete OutMutex; - OutMutex = NULL; + fprintf(stderr, "Couldn't signal signal QUIT to SignalHandlerThread [%d]. 
Quitting prematurely!\n", ret); + exit(errLevel); + } + else + { + ret = pthread_join(SignalHandlerThread, NULL); + if (ret != 0) + { + fprintf(stderr, "Error on join of SignalHandlerThread [%d]\n", ret); + } } - if (MemMutex != NULL) + + syncSetFinishedFlag(1); + ret = pthread_join(TerminatorThread, NULL); + if (ret != 0) { - pthread_mutex_destroy(MemMutex); - delete MemMutex; - MemMutex = NULL; + fprintf(stderr, "Error on join of TerminatorThread [%d]\n", ret); } + // reclaim memory + queueDelete(fifo); + mutexesDelete(); + disposeMemory(FileList); + // get current time for end of benchmark #ifndef WIN32 gettimeofday(&tvStopTime, &tz); @@ -2914,5 +3865,5 @@ if (QuietMode != 1) fprintf(stderr, "\n Wall Clock: %f seconds\n", timeCalc); - return errLevel; + exit(errLevel); } diff -Nru pbzip2-1.0.5/pbzip2.h pbzip2-1.1.1/pbzip2.h --- pbzip2-1.0.5/pbzip2.h 1970-01-01 00:00:00.000000000 +0000 +++ pbzip2-1.1.1/pbzip2.h 2010-04-17 18:37:39.000000000 +0000 @@ -0,0 +1,259 @@ +/* + * File: pbzip2.h + * Author: Yavor Nikolov + * + * Created on March 6, 2010, 10:18 PM + */ + +#ifndef _PBZIP2_H +#define _PBZIP2_H + +#include +#include + +#ifndef WIN32 +#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) +#define OFF_T off_t +#else +#define FILE_MODE (S_IRUSR | S_IWUSR ) +#define OFF_T __int64 +#endif + +extern "C" +{ +#ifndef WIN32 +#include +#include +#include +#else +#include +#include +#endif +#ifdef __APPLE__ +#include +#endif +#ifdef __sun +#include +#endif +#ifndef __BORLANDC__ +#define __STDC_FORMAT_MACROS +#include +#else +#define PRIu64 "Lu" +#define strncasecmp(x,y,z) strncmpi(x,y,z) +#endif +#ifdef __osf__ +#define PRIu64 "llu" +#endif + +#include +} + +// uncomment for debug output +//#define PBZIP_DEBUG + +// uncomment to disable load average code (may be required for some platforms) +//#define PBZIP_NO_LOADAVG + +// detect systems that are known not to support load average code +#if defined (WIN32) || defined (__CYGWIN32__) || defined (__MINGW32__) || 
defined (__BORLANDC__) || defined (__hpux) || defined (__osf__) || defined(__UCLIBC__) + #define PBZIP_NO_LOADAVG +#endif + +#ifdef WIN32 +#define PATH_SEP '\\' +#define usleep(x) Sleep(x/1000) +#define LOW_DWORD(x) ((*(LARGE_INTEGER *)&x).LowPart) +#define HIGH_DWORD(x) ((*(LARGE_INTEGER *)&x).HighPart) +#ifndef _TIMEVAL_DEFINED /* also in winsock[2].h */ +#define _TIMEVAL_DEFINED +struct timeval { + long tv_sec; + long tv_usec; +}; +#endif +#else +#define PATH_SEP '/' +#endif + +#ifndef O_BINARY +#define O_BINARY 0 +#endif + +typedef struct outBuff +{ + char *buf; + unsigned int bufSize; + int blockNumber; + int sequenceNumber; + unsigned int inSize; // original size before compression/decompressoin + bool isLastInSequence; + outBuff * next; // next in longer sequence of buffers for this block + + outBuff( + char * aBuf = NULL, + unsigned int aBufSize = 0, + int aBlockNumber = 0, + int aSequenceNumber = 0, + unsigned int aInSize = 0, + bool isLast = true, + outBuff * aNext = NULL): + buf(aBuf), + bufSize(aBufSize), + blockNumber(aBlockNumber), + sequenceNumber(aSequenceNumber), + inSize(aInSize), + isLastInSequence(isLast), + next(aNext) + {} +} outBuff; + +typedef enum ExitFlag +{ + EF_NOQUIT = 0, + EF_EXIT = 1, + EF_ABORT = 2 +} ExitFlag; + +typedef struct queue +{ + typedef outBuff ElementType; + typedef ElementType* ElementTypePtr; + + ElementTypePtr *qData; + long size; + long head, tail; + int full, empty; + pthread_mutex_t *mut; + pthread_cond_t *notFull, *notEmpty; + pthread_t *consumers; + + void add(ElementTypePtr element) + { + qData[tail] = element; + ++tail; + + if (tail == size) + tail = 0; + if (tail == head) + full = 1; + + empty = 0; + } + + /** + * Remove the head returning it into element. 
If the given element is + * + * @param element - removed element is copied here + * @return 1 on success; 0 - on denied request; -1 - on error + */ + int remove(ElementTypePtr & element) + { + ElementTypePtr & headElem = qData[head]; + + #ifdef PBZIP_DEBUG + fprintf (stderr, "queue::remove: head=%llx; elem=%llx\n", + (unsigned long long)headElem, + (unsigned long long)element); + + if (headElem != NULL) + { + fprintf (stderr, " queue::remove: head: seq=%d; blk=%d; islast=%d\n", + headElem->sequenceNumber, + headElem->blockNumber, + (int)headElem->isLastInSequence); + } + #endif + + if ((headElem->sequenceNumber > 1) && + ((element == NULL) || (headElem->blockNumber != element->blockNumber))) + { + // 2+ part of long-sequence BZ2 stream. Deny if requestor is not + // aware of that + return 0; + } + + element = headElem; + ++head; + + if (head == size) + head = 0; + if (head == tail) + empty = 1; + + full = 0; + + return 1; + } + +} queue; + +/* + ********************************************************* + Print error message and optionally exit or abort + depending on exitFlag: + 0 - don't quit; + 1 - exit; + 2 - abort. + On exit - exitCode status is used. +*/ +int handle_error(ExitFlag exitFlag, int exitCode, const char *fmt, ...); + +/* + * Delegate to read but keep writing until count bytes are read or + * error is encountered (on success all count bytes would be read) + */ +ssize_t do_read(int fd, void *buf, size_t count); + +/* + * Delegate to write but keep writing until count bytes are written or + * error is encountered (on success all count bytes would be written) + */ +ssize_t do_write(int fd, const void *buf, size_t count); + + +/** + * Dispose the given buffer memory if not NULL and make it NULL. Provided + * buffer should be allocated with new[]. 
+ */ +template <typename C> +inline void disposeMemory(C *& pBuff) +{ + if (pBuff != NULL) + { + delete [] pBuff; + pBuff = NULL; + } +} + + +/** + * Check if a given string ends with a given suffix ignoring case difference. + * + * @return true if str ends with suffix; false - otherwise + */ +template <typename C> +inline bool ends_with_icase( const std::basic_string<C> & str, const std::basic_string<C> & suffix ) +{ + int ti = str.size() - suffix.size(); + + if ( ti < 0 ) + { + return false; + } + + size_t si = 0; + while ( si < suffix.size() ) + { + if ( ::tolower( str[ti] ) != ::tolower( suffix[si] ) ) + { + return false; + } + ++si; + ++ti; + } + + return true; +} + +#endif /* _PBZIP2_H */ diff -Nru pbzip2-1.0.5/pbzip2.spec pbzip2-1.1.1/pbzip2.spec --- pbzip2-1.0.5/pbzip2.spec 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/pbzip2.spec 2010-04-17 18:37:39.000000000 +0000 @@ -1,5 +1,5 @@ Name: pbzip2 -Version: 1.0.5 +Version: 1.1.1 Release: 1%{?dist} Summary: Parallel implementation of bzip2 URL: http://www.compression.ca/pbzip2/ @@ -52,6 +52,12 @@ %changelog +* Sat Apr 17 2010 Jeff Gilchrist - 1.1.1-1 +- Release 1.1.1 + +* Sat Mar 13 2010 Jeff Gilchrist - 1.1.0-1 +- Release 1.1.0 + * Fri Jan 8 2009 Jeff Gilchrist - 1.0.5-1 - Release 1.0.5 diff -Nru pbzip2-1.0.5/README pbzip2-1.1.1/README --- pbzip2-1.0.5/README 2009-01-08 21:08:17.000000000 +0000 +++ pbzip2-1.1.1/README 2010-04-17 18:37:39.000000000 +0000 @@ -1,6 +1,6 @@ -January 8, 2009 +April 17, 2010 -Parallel BZIP2 v1.0.5 - by: Jeff Gilchrist +Parallel BZIP2 v1.1.1 - by: Jeff Gilchrist Available at: http://compression.ca/ This is the README for pbzip2, a parallel implementation of the @@ -37,7 +37,7 @@ 2. HOW TO BUILD -- Windows -On Windows, pbzip2 can be compiled using Cygwin or MinGW. +On Windows, pbzip2 can be compiled using Cygwin. If you do not have libbzip2 installed on your system, you should first go to http://www.bzip.org/ and install it. @@ -47,18 +47,6 @@ files are located and type `make'. 
This builds the pbzip2 program and dynamically links to the libbzip2 library. -MinGW can be found at: http://www.mingw.org/ -You will also need http://sources.redhat.com/pthreads-win32/ -to compile in MinGW. You can take a precompiled binary -libpthreadGC2.a and associated pthreadGC2.dll file from the -dll-latest/lib repository. Copy libpthreadGC2.a to -/lib/libpthread.a of your MinGW install and then copy the -pthreadGC2.dll to the pbzip2 directory or to a directory -in your Windows path (ie: WINDOWS\SYSTEM32). -From a MinGW shell, go to the directory where the pbzip2 source -files are located and type `make'. This builds the pbzip2 -program and links the libbzip2 and libpthread library. - If you would like to build pbzip2 with a statically linked libbzip2 library, download the bzip2 source from the above site, compile it, and copy the libbz2.a file into the pbzip2 source @@ -123,26 +111,31 @@ only switch you will likely need to use is -d to decompress files and -p to set the # of processors for pbzip2 to use if autodetect is not supported on your system, or you want to use a specific # of CPUs. +Note, that if you are using a large number of CPUs you may wish to +lower your default stack size setting (with the -S switch or ulimit) +to reduce the amount of memory each thread uses. -Usage: pbzip2 [-1 .. -9] [-b#cdfhklp#qrtvVz] +Usage: pbzip2 [-1 .. 
-9] [-b#cdfhklm#p#qrS#tvVz] Switches: - -b# : where # is the file block size in 100k (default 9 = 900k) - -c : output to standard out (stdout) - -d : decompress file - -f : force, overwrite existing output file - -h : print this help message - -k : keep input file, don't delete - -l : load average determines max number processors to use - -p# : where # is the number of processors (default: autodetect or 2) - -q : quiet mode (default) - -r : read entire input file into RAM and split between processors - -t : test compressed file integrity - -v : verbose mode - -V : display version info for pbzip2 then exit - -z : compress file (default) - -1 .. -9 : set BWT block size to 100k .. 900k (default 900k) - + -b# Where # is block size in 100k steps (default 9 = 900k) + -c, --stdout Output to standard out (stdout) + -d,--decompress Decompress file + -f,--force Force, overwrite existing output file + -h,--help Print this help message + -k,--keep Keep input file, do not delete + -l,--loadavg Load average determines max number processors to use + -m# Where # is max memory usage in 1MB steps (default 100 = 100MB) + -p# Where # is the number of processors (default: autodetect) + -q,--quiet Quiet mode (default) + -r,--read Read entire input file into RAM and split between processors + -S# Child thread stack size in 1KB steps (default stack size if unspecified) + -t,--test Test compressed file integrity + -v,--verbose Verbose mode + -V Display version info for pbzip2 then exit + -z,--compress Compress file (default) + -1,--fast ... -9,--best Set BWT block size to 100k .. 900k (default 900k). + Example: pbzip2 myfile.tar @@ -198,9 +191,10 @@ it works. -Example: pbzip2 -d myfile.tar.bz2 +Example: pbzip2 -d -m500 myfile.tar.bz2 This example will decompress the file "myfile.tar.bz2" into the decompressed file "myfile.tar". It will use the autodetected # of -processors (or 2 processors if autodetect not supported). +processors (or 2 processors if autodetect not supported). 
It will use +a maximum of 500MB of memory for decompression. The switches -b, -r, -t, and -1..-9 are not valid for decompression.