1 // StreamBinder.cpp 2 3 #include "StdAfx.h" 4 5 #include "../../Common/MyCom.h" 6 7 #include "StreamBinder.h" 8 9 Z7_CLASS_IMP_COM_1( 10 CBinderInStream 11 , ISequentialInStream 12 ) 13 CStreamBinder *_binder; 14 public: 15 ~CBinderInStream() { _binder->CloseRead_CallOnce(); } 16 CBinderInStream(CStreamBinder *binder): _binder(binder) {} 17 }; 18 19 Z7_COM7F_IMF(CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)) 20 { return _binder->Read(data, size, processedSize); } 21 22 23 Z7_CLASS_IMP_COM_1( 24 CBinderOutStream 25 , ISequentialOutStream 26 ) 27 CStreamBinder *_binder; 28 public: 29 ~CBinderOutStream() { _binder->CloseWrite(); } 30 CBinderOutStream(CStreamBinder *binder): _binder(binder) {} 31 }; 32 33 Z7_COM7F_IMF(CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)) 34 { return _binder->Write(data, size, processedSize); } 35 36 37 static HRESULT Event_Create_or_Reset(NWindows::NSynchronization::CAutoResetEvent &event) 38 { 39 const WRes wres = event.CreateIfNotCreated_Reset(); 40 return HRESULT_FROM_WIN32(wres); 41 } 42 43 HRESULT CStreamBinder::Create_ReInit() 44 { 45 RINOK(Event_Create_or_Reset(_canRead_Event)) 46 // RINOK(Event_Create_or_Reset(_canWrite_Event)) 47 48 // _canWrite_Semaphore.Close(); 49 // we need at least 3 items of maxCount: 1 for normal unlock in Read(), 2 items for unlock in CloseRead_CallOnce() 50 _canWrite_Semaphore.OptCreateInit(0, 3); 51 52 // _readingWasClosed = false; 53 _readingWasClosed2 = false; 54 55 _waitWrite = true; 56 _bufSize = 0; 57 _buf = NULL; 58 ProcessedSize = 0; 59 // WritingWasCut = false; 60 return S_OK; 61 } 62 63 64 void CStreamBinder::CreateStreams2(CMyComPtr<ISequentialInStream> &inStream, CMyComPtr<ISequentialOutStream> &outStream) 65 { 66 inStream = new CBinderInStream(this); 67 outStream = new CBinderOutStream(this); 68 } 69 70 // (_canRead_Event && _bufSize == 0) means that stream is finished. 71 72 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize) 73 { 74 if (processedSize) 75 *processedSize = 0; 76 if (size != 0) 77 { 78 if (_waitWrite) 79 { 80 WRes wres = _canRead_Event.Lock(); 81 if (wres != 0) 82 return HRESULT_FROM_WIN32(wres); 83 _waitWrite = false; 84 } 85 if (size > _bufSize) 86 size = _bufSize; 87 if (size != 0) 88 { 89 memcpy(data, _buf, size); 90 _buf = ((const Byte *)_buf) + size; 91 ProcessedSize += size; 92 if (processedSize) 93 *processedSize = size; 94 _bufSize -= size; 95 96 /* 97 if (_bufSize == 0), then we have read whole buffer 98 we have two ways here: 99 - if we check (_bufSize == 0) here, we unlock Write only after full data Reading - it reduces the number of syncs 100 - if we don't check (_bufSize == 0) here, we unlock Write after partial data Reading 101 */ 102 if (_bufSize == 0) 103 { 104 _waitWrite = true; 105 // _canWrite_Event.Set(); 106 _canWrite_Semaphore.Release(); 107 } 108 } 109 } 110 return S_OK; 111 } 112 113 114 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) 115 { 116 if (processedSize) 117 *processedSize = 0; 118 if (size == 0) 119 return S_OK; 120 121 if (!_readingWasClosed2) 122 { 123 _buf = data; 124 _bufSize = size; 125 _canRead_Event.Set(); 126 127 /* 128 _canWrite_Event.Lock(); 129 if (_readingWasClosed) 130 _readingWasClosed2 = true; 131 */ 132 133 _canWrite_Semaphore.Lock(); 134 135 // _bufSize : is remain size that was not read 136 size -= _bufSize; 137 138 // size : is size of data that was read 139 if (size != 0) 140 { 141 // if some data was read, then we report that size and return 142 if (processedSize) 143 *processedSize = size; 144 return S_OK; 145 } 146 _readingWasClosed2 = true; 147 } 148 149 // WritingWasCut = true; 150 return k_My_HRESULT_WritingWasCut; 151 } 152