1*f6dc9357SAndroid Build Coastguard Worker /* MtCoder.c -- Multi-thread Coder
2*f6dc9357SAndroid Build Coastguard Worker 2023-09-07 : Igor Pavlov : Public domain */
3*f6dc9357SAndroid Build Coastguard Worker
4*f6dc9357SAndroid Build Coastguard Worker #include "Precomp.h"
5*f6dc9357SAndroid Build Coastguard Worker
6*f6dc9357SAndroid Build Coastguard Worker #include "MtCoder.h"
7*f6dc9357SAndroid Build Coastguard Worker
8*f6dc9357SAndroid Build Coastguard Worker #ifndef Z7_ST
9*f6dc9357SAndroid Build Coastguard Worker
MtProgressThunk_Progress(ICompressProgressPtr pp,UInt64 inSize,UInt64 outSize)10*f6dc9357SAndroid Build Coastguard Worker static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
11*f6dc9357SAndroid Build Coastguard Worker {
12*f6dc9357SAndroid Build Coastguard Worker Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
13*f6dc9357SAndroid Build Coastguard Worker UInt64 inSize2 = 0;
14*f6dc9357SAndroid Build Coastguard Worker UInt64 outSize2 = 0;
15*f6dc9357SAndroid Build Coastguard Worker if (inSize != (UInt64)(Int64)-1)
16*f6dc9357SAndroid Build Coastguard Worker {
17*f6dc9357SAndroid Build Coastguard Worker inSize2 = inSize - p->inSize;
18*f6dc9357SAndroid Build Coastguard Worker p->inSize = inSize;
19*f6dc9357SAndroid Build Coastguard Worker }
20*f6dc9357SAndroid Build Coastguard Worker if (outSize != (UInt64)(Int64)-1)
21*f6dc9357SAndroid Build Coastguard Worker {
22*f6dc9357SAndroid Build Coastguard Worker outSize2 = outSize - p->outSize;
23*f6dc9357SAndroid Build Coastguard Worker p->outSize = outSize;
24*f6dc9357SAndroid Build Coastguard Worker }
25*f6dc9357SAndroid Build Coastguard Worker return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
26*f6dc9357SAndroid Build Coastguard Worker }
27*f6dc9357SAndroid Build Coastguard Worker
28*f6dc9357SAndroid Build Coastguard Worker
MtProgressThunk_CreateVTable(CMtProgressThunk * p)29*f6dc9357SAndroid Build Coastguard Worker void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
30*f6dc9357SAndroid Build Coastguard Worker {
31*f6dc9357SAndroid Build Coastguard Worker p->vt.Progress = MtProgressThunk_Progress;
32*f6dc9357SAndroid Build Coastguard Worker }
33*f6dc9357SAndroid Build Coastguard Worker
34*f6dc9357SAndroid Build Coastguard Worker
35*f6dc9357SAndroid Build Coastguard Worker
36*f6dc9357SAndroid Build Coastguard Worker #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37*f6dc9357SAndroid Build Coastguard Worker
38*f6dc9357SAndroid Build Coastguard Worker
39*f6dc9357SAndroid Build Coastguard Worker static THREAD_FUNC_DECL ThreadFunc(void *pp);
40*f6dc9357SAndroid Build Coastguard Worker
41*f6dc9357SAndroid Build Coastguard Worker
MtCoderThread_CreateAndStart(CMtCoderThread * t)42*f6dc9357SAndroid Build Coastguard Worker static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
43*f6dc9357SAndroid Build Coastguard Worker {
44*f6dc9357SAndroid Build Coastguard Worker WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
45*f6dc9357SAndroid Build Coastguard Worker if (wres == 0)
46*f6dc9357SAndroid Build Coastguard Worker {
47*f6dc9357SAndroid Build Coastguard Worker t->stop = False;
48*f6dc9357SAndroid Build Coastguard Worker if (!Thread_WasCreated(&t->thread))
49*f6dc9357SAndroid Build Coastguard Worker wres = Thread_Create(&t->thread, ThreadFunc, t);
50*f6dc9357SAndroid Build Coastguard Worker if (wres == 0)
51*f6dc9357SAndroid Build Coastguard Worker wres = Event_Set(&t->startEvent);
52*f6dc9357SAndroid Build Coastguard Worker }
53*f6dc9357SAndroid Build Coastguard Worker if (wres == 0)
54*f6dc9357SAndroid Build Coastguard Worker return SZ_OK;
55*f6dc9357SAndroid Build Coastguard Worker return MY_SRes_HRESULT_FROM_WRes(wres);
56*f6dc9357SAndroid Build Coastguard Worker }
57*f6dc9357SAndroid Build Coastguard Worker
58*f6dc9357SAndroid Build Coastguard Worker
MtCoderThread_Destruct(CMtCoderThread * t)59*f6dc9357SAndroid Build Coastguard Worker static void MtCoderThread_Destruct(CMtCoderThread *t)
60*f6dc9357SAndroid Build Coastguard Worker {
61*f6dc9357SAndroid Build Coastguard Worker if (Thread_WasCreated(&t->thread))
62*f6dc9357SAndroid Build Coastguard Worker {
63*f6dc9357SAndroid Build Coastguard Worker t->stop = 1;
64*f6dc9357SAndroid Build Coastguard Worker Event_Set(&t->startEvent);
65*f6dc9357SAndroid Build Coastguard Worker Thread_Wait_Close(&t->thread);
66*f6dc9357SAndroid Build Coastguard Worker }
67*f6dc9357SAndroid Build Coastguard Worker
68*f6dc9357SAndroid Build Coastguard Worker Event_Close(&t->startEvent);
69*f6dc9357SAndroid Build Coastguard Worker
70*f6dc9357SAndroid Build Coastguard Worker if (t->inBuf)
71*f6dc9357SAndroid Build Coastguard Worker {
72*f6dc9357SAndroid Build Coastguard Worker ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
73*f6dc9357SAndroid Build Coastguard Worker t->inBuf = NULL;
74*f6dc9357SAndroid Build Coastguard Worker }
75*f6dc9357SAndroid Build Coastguard Worker }
76*f6dc9357SAndroid Build Coastguard Worker
77*f6dc9357SAndroid Build Coastguard Worker
78*f6dc9357SAndroid Build Coastguard Worker
79*f6dc9357SAndroid Build Coastguard Worker
80*f6dc9357SAndroid Build Coastguard Worker /*
81*f6dc9357SAndroid Build Coastguard Worker ThreadFunc2() returns:
82*f6dc9357SAndroid Build Coastguard Worker SZ_OK - in all normal cases (even for stream error or memory allocation error)
83*f6dc9357SAndroid Build Coastguard Worker SZ_ERROR_THREAD - in case of failure in system synch function
84*f6dc9357SAndroid Build Coastguard Worker */
85*f6dc9357SAndroid Build Coastguard Worker
ThreadFunc2(CMtCoderThread * t)86*f6dc9357SAndroid Build Coastguard Worker static SRes ThreadFunc2(CMtCoderThread *t)
87*f6dc9357SAndroid Build Coastguard Worker {
88*f6dc9357SAndroid Build Coastguard Worker CMtCoder *mtc = t->mtCoder;
89*f6dc9357SAndroid Build Coastguard Worker
90*f6dc9357SAndroid Build Coastguard Worker for (;;)
91*f6dc9357SAndroid Build Coastguard Worker {
92*f6dc9357SAndroid Build Coastguard Worker unsigned bi;
93*f6dc9357SAndroid Build Coastguard Worker SRes res;
94*f6dc9357SAndroid Build Coastguard Worker SRes res2;
95*f6dc9357SAndroid Build Coastguard Worker BoolInt finished;
96*f6dc9357SAndroid Build Coastguard Worker unsigned bufIndex;
97*f6dc9357SAndroid Build Coastguard Worker size_t size;
98*f6dc9357SAndroid Build Coastguard Worker const Byte *inData;
99*f6dc9357SAndroid Build Coastguard Worker UInt64 readProcessed = 0;
100*f6dc9357SAndroid Build Coastguard Worker
101*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Event_Wait(&mtc->readEvent))
102*f6dc9357SAndroid Build Coastguard Worker
103*f6dc9357SAndroid Build Coastguard Worker /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
104*f6dc9357SAndroid Build Coastguard Worker
105*f6dc9357SAndroid Build Coastguard Worker if (mtc->stopReading)
106*f6dc9357SAndroid Build Coastguard Worker {
107*f6dc9357SAndroid Build Coastguard Worker return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
108*f6dc9357SAndroid Build Coastguard Worker }
109*f6dc9357SAndroid Build Coastguard Worker
110*f6dc9357SAndroid Build Coastguard Worker res = MtProgress_GetError(&mtc->mtProgress);
111*f6dc9357SAndroid Build Coastguard Worker
112*f6dc9357SAndroid Build Coastguard Worker size = 0;
113*f6dc9357SAndroid Build Coastguard Worker inData = NULL;
114*f6dc9357SAndroid Build Coastguard Worker finished = True;
115*f6dc9357SAndroid Build Coastguard Worker
116*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
117*f6dc9357SAndroid Build Coastguard Worker {
118*f6dc9357SAndroid Build Coastguard Worker size = mtc->blockSize;
119*f6dc9357SAndroid Build Coastguard Worker if (mtc->inStream)
120*f6dc9357SAndroid Build Coastguard Worker {
121*f6dc9357SAndroid Build Coastguard Worker if (!t->inBuf)
122*f6dc9357SAndroid Build Coastguard Worker {
123*f6dc9357SAndroid Build Coastguard Worker t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
124*f6dc9357SAndroid Build Coastguard Worker if (!t->inBuf)
125*f6dc9357SAndroid Build Coastguard Worker res = SZ_ERROR_MEM;
126*f6dc9357SAndroid Build Coastguard Worker }
127*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
128*f6dc9357SAndroid Build Coastguard Worker {
129*f6dc9357SAndroid Build Coastguard Worker res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
130*f6dc9357SAndroid Build Coastguard Worker readProcessed = mtc->readProcessed + size;
131*f6dc9357SAndroid Build Coastguard Worker mtc->readProcessed = readProcessed;
132*f6dc9357SAndroid Build Coastguard Worker }
133*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
134*f6dc9357SAndroid Build Coastguard Worker {
135*f6dc9357SAndroid Build Coastguard Worker mtc->readRes = res;
136*f6dc9357SAndroid Build Coastguard Worker /* after reading error - we can stop encoding of previous blocks */
137*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&mtc->mtProgress, res);
138*f6dc9357SAndroid Build Coastguard Worker }
139*f6dc9357SAndroid Build Coastguard Worker else
140*f6dc9357SAndroid Build Coastguard Worker finished = (size != mtc->blockSize);
141*f6dc9357SAndroid Build Coastguard Worker }
142*f6dc9357SAndroid Build Coastguard Worker else
143*f6dc9357SAndroid Build Coastguard Worker {
144*f6dc9357SAndroid Build Coastguard Worker size_t rem;
145*f6dc9357SAndroid Build Coastguard Worker readProcessed = mtc->readProcessed;
146*f6dc9357SAndroid Build Coastguard Worker rem = mtc->inDataSize - (size_t)readProcessed;
147*f6dc9357SAndroid Build Coastguard Worker if (size > rem)
148*f6dc9357SAndroid Build Coastguard Worker size = rem;
149*f6dc9357SAndroid Build Coastguard Worker inData = mtc->inData + (size_t)readProcessed;
150*f6dc9357SAndroid Build Coastguard Worker readProcessed += size;
151*f6dc9357SAndroid Build Coastguard Worker mtc->readProcessed = readProcessed;
152*f6dc9357SAndroid Build Coastguard Worker finished = (mtc->inDataSize == (size_t)readProcessed);
153*f6dc9357SAndroid Build Coastguard Worker }
154*f6dc9357SAndroid Build Coastguard Worker }
155*f6dc9357SAndroid Build Coastguard Worker
156*f6dc9357SAndroid Build Coastguard Worker /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
157*f6dc9357SAndroid Build Coastguard Worker
158*f6dc9357SAndroid Build Coastguard Worker res2 = SZ_OK;
159*f6dc9357SAndroid Build Coastguard Worker
160*f6dc9357SAndroid Build Coastguard Worker if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
161*f6dc9357SAndroid Build Coastguard Worker {
162*f6dc9357SAndroid Build Coastguard Worker res2 = SZ_ERROR_THREAD;
163*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
164*f6dc9357SAndroid Build Coastguard Worker {
165*f6dc9357SAndroid Build Coastguard Worker res = res2;
166*f6dc9357SAndroid Build Coastguard Worker // MtProgress_SetError(&mtc->mtProgress, res);
167*f6dc9357SAndroid Build Coastguard Worker }
168*f6dc9357SAndroid Build Coastguard Worker }
169*f6dc9357SAndroid Build Coastguard Worker
170*f6dc9357SAndroid Build Coastguard Worker bi = mtc->blockIndex;
171*f6dc9357SAndroid Build Coastguard Worker
172*f6dc9357SAndroid Build Coastguard Worker if (++mtc->blockIndex >= mtc->numBlocksMax)
173*f6dc9357SAndroid Build Coastguard Worker mtc->blockIndex = 0;
174*f6dc9357SAndroid Build Coastguard Worker
175*f6dc9357SAndroid Build Coastguard Worker bufIndex = (unsigned)(int)-1;
176*f6dc9357SAndroid Build Coastguard Worker
177*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
178*f6dc9357SAndroid Build Coastguard Worker res = MtProgress_GetError(&mtc->mtProgress);
179*f6dc9357SAndroid Build Coastguard Worker
180*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
181*f6dc9357SAndroid Build Coastguard Worker finished = True;
182*f6dc9357SAndroid Build Coastguard Worker
183*f6dc9357SAndroid Build Coastguard Worker if (!finished)
184*f6dc9357SAndroid Build Coastguard Worker {
185*f6dc9357SAndroid Build Coastguard Worker if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
186*f6dc9357SAndroid Build Coastguard Worker && mtc->expectedDataSize != readProcessed)
187*f6dc9357SAndroid Build Coastguard Worker {
188*f6dc9357SAndroid Build Coastguard Worker res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
189*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
190*f6dc9357SAndroid Build Coastguard Worker mtc->numStartedThreads++;
191*f6dc9357SAndroid Build Coastguard Worker else
192*f6dc9357SAndroid Build Coastguard Worker {
193*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&mtc->mtProgress, res);
194*f6dc9357SAndroid Build Coastguard Worker finished = True;
195*f6dc9357SAndroid Build Coastguard Worker }
196*f6dc9357SAndroid Build Coastguard Worker }
197*f6dc9357SAndroid Build Coastguard Worker }
198*f6dc9357SAndroid Build Coastguard Worker
199*f6dc9357SAndroid Build Coastguard Worker if (finished)
200*f6dc9357SAndroid Build Coastguard Worker mtc->stopReading = True;
201*f6dc9357SAndroid Build Coastguard Worker
202*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Event_Set(&mtc->readEvent))
203*f6dc9357SAndroid Build Coastguard Worker
204*f6dc9357SAndroid Build Coastguard Worker if (res2 != SZ_OK)
205*f6dc9357SAndroid Build Coastguard Worker return res2;
206*f6dc9357SAndroid Build Coastguard Worker
207*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
208*f6dc9357SAndroid Build Coastguard Worker {
209*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Enter(&mtc->cs);
210*f6dc9357SAndroid Build Coastguard Worker bufIndex = mtc->freeBlockHead;
211*f6dc9357SAndroid Build Coastguard Worker mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
212*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Leave(&mtc->cs);
213*f6dc9357SAndroid Build Coastguard Worker
214*f6dc9357SAndroid Build Coastguard Worker res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
215*f6dc9357SAndroid Build Coastguard Worker mtc->inStream ? t->inBuf : inData, size, finished);
216*f6dc9357SAndroid Build Coastguard Worker
217*f6dc9357SAndroid Build Coastguard Worker // MtProgress_Reinit(&mtc->mtProgress, t->index);
218*f6dc9357SAndroid Build Coastguard Worker
219*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
220*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&mtc->mtProgress, res);
221*f6dc9357SAndroid Build Coastguard Worker }
222*f6dc9357SAndroid Build Coastguard Worker
223*f6dc9357SAndroid Build Coastguard Worker {
224*f6dc9357SAndroid Build Coastguard Worker CMtCoderBlock *block = &mtc->blocks[bi];
225*f6dc9357SAndroid Build Coastguard Worker block->res = res;
226*f6dc9357SAndroid Build Coastguard Worker block->bufIndex = bufIndex;
227*f6dc9357SAndroid Build Coastguard Worker block->finished = finished;
228*f6dc9357SAndroid Build Coastguard Worker }
229*f6dc9357SAndroid Build Coastguard Worker
230*f6dc9357SAndroid Build Coastguard Worker #ifdef MTCODER_USE_WRITE_THREAD
231*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
232*f6dc9357SAndroid Build Coastguard Worker #else
233*f6dc9357SAndroid Build Coastguard Worker {
234*f6dc9357SAndroid Build Coastguard Worker unsigned wi;
235*f6dc9357SAndroid Build Coastguard Worker {
236*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Enter(&mtc->cs);
237*f6dc9357SAndroid Build Coastguard Worker wi = mtc->writeIndex;
238*f6dc9357SAndroid Build Coastguard Worker if (wi == bi)
239*f6dc9357SAndroid Build Coastguard Worker mtc->writeIndex = (unsigned)(int)-1;
240*f6dc9357SAndroid Build Coastguard Worker else
241*f6dc9357SAndroid Build Coastguard Worker mtc->ReadyBlocks[bi] = True;
242*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Leave(&mtc->cs);
243*f6dc9357SAndroid Build Coastguard Worker }
244*f6dc9357SAndroid Build Coastguard Worker
245*f6dc9357SAndroid Build Coastguard Worker if (wi != bi)
246*f6dc9357SAndroid Build Coastguard Worker {
247*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK || finished)
248*f6dc9357SAndroid Build Coastguard Worker return 0;
249*f6dc9357SAndroid Build Coastguard Worker continue;
250*f6dc9357SAndroid Build Coastguard Worker }
251*f6dc9357SAndroid Build Coastguard Worker
252*f6dc9357SAndroid Build Coastguard Worker if (mtc->writeRes != SZ_OK)
253*f6dc9357SAndroid Build Coastguard Worker res = mtc->writeRes;
254*f6dc9357SAndroid Build Coastguard Worker
255*f6dc9357SAndroid Build Coastguard Worker for (;;)
256*f6dc9357SAndroid Build Coastguard Worker {
257*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
258*f6dc9357SAndroid Build Coastguard Worker {
259*f6dc9357SAndroid Build Coastguard Worker res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
260*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
261*f6dc9357SAndroid Build Coastguard Worker {
262*f6dc9357SAndroid Build Coastguard Worker mtc->writeRes = res;
263*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&mtc->mtProgress, res);
264*f6dc9357SAndroid Build Coastguard Worker }
265*f6dc9357SAndroid Build Coastguard Worker }
266*f6dc9357SAndroid Build Coastguard Worker
267*f6dc9357SAndroid Build Coastguard Worker if (++wi >= mtc->numBlocksMax)
268*f6dc9357SAndroid Build Coastguard Worker wi = 0;
269*f6dc9357SAndroid Build Coastguard Worker {
270*f6dc9357SAndroid Build Coastguard Worker BoolInt isReady;
271*f6dc9357SAndroid Build Coastguard Worker
272*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Enter(&mtc->cs);
273*f6dc9357SAndroid Build Coastguard Worker
274*f6dc9357SAndroid Build Coastguard Worker if (bufIndex != (unsigned)(int)-1)
275*f6dc9357SAndroid Build Coastguard Worker {
276*f6dc9357SAndroid Build Coastguard Worker mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
277*f6dc9357SAndroid Build Coastguard Worker mtc->freeBlockHead = bufIndex;
278*f6dc9357SAndroid Build Coastguard Worker }
279*f6dc9357SAndroid Build Coastguard Worker
280*f6dc9357SAndroid Build Coastguard Worker isReady = mtc->ReadyBlocks[wi];
281*f6dc9357SAndroid Build Coastguard Worker
282*f6dc9357SAndroid Build Coastguard Worker if (isReady)
283*f6dc9357SAndroid Build Coastguard Worker mtc->ReadyBlocks[wi] = False;
284*f6dc9357SAndroid Build Coastguard Worker else
285*f6dc9357SAndroid Build Coastguard Worker mtc->writeIndex = wi;
286*f6dc9357SAndroid Build Coastguard Worker
287*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Leave(&mtc->cs);
288*f6dc9357SAndroid Build Coastguard Worker
289*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
290*f6dc9357SAndroid Build Coastguard Worker
291*f6dc9357SAndroid Build Coastguard Worker if (!isReady)
292*f6dc9357SAndroid Build Coastguard Worker break;
293*f6dc9357SAndroid Build Coastguard Worker }
294*f6dc9357SAndroid Build Coastguard Worker
295*f6dc9357SAndroid Build Coastguard Worker {
296*f6dc9357SAndroid Build Coastguard Worker CMtCoderBlock *block = &mtc->blocks[wi];
297*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK && block->res != SZ_OK)
298*f6dc9357SAndroid Build Coastguard Worker res = block->res;
299*f6dc9357SAndroid Build Coastguard Worker bufIndex = block->bufIndex;
300*f6dc9357SAndroid Build Coastguard Worker finished = block->finished;
301*f6dc9357SAndroid Build Coastguard Worker }
302*f6dc9357SAndroid Build Coastguard Worker }
303*f6dc9357SAndroid Build Coastguard Worker }
304*f6dc9357SAndroid Build Coastguard Worker #endif
305*f6dc9357SAndroid Build Coastguard Worker
306*f6dc9357SAndroid Build Coastguard Worker if (finished || res != SZ_OK)
307*f6dc9357SAndroid Build Coastguard Worker return 0;
308*f6dc9357SAndroid Build Coastguard Worker }
309*f6dc9357SAndroid Build Coastguard Worker }
310*f6dc9357SAndroid Build Coastguard Worker
311*f6dc9357SAndroid Build Coastguard Worker
ThreadFunc(void * pp)312*f6dc9357SAndroid Build Coastguard Worker static THREAD_FUNC_DECL ThreadFunc(void *pp)
313*f6dc9357SAndroid Build Coastguard Worker {
314*f6dc9357SAndroid Build Coastguard Worker CMtCoderThread *t = (CMtCoderThread *)pp;
315*f6dc9357SAndroid Build Coastguard Worker for (;;)
316*f6dc9357SAndroid Build Coastguard Worker {
317*f6dc9357SAndroid Build Coastguard Worker if (Event_Wait(&t->startEvent) != 0)
318*f6dc9357SAndroid Build Coastguard Worker return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
319*f6dc9357SAndroid Build Coastguard Worker if (t->stop)
320*f6dc9357SAndroid Build Coastguard Worker return 0;
321*f6dc9357SAndroid Build Coastguard Worker {
322*f6dc9357SAndroid Build Coastguard Worker SRes res = ThreadFunc2(t);
323*f6dc9357SAndroid Build Coastguard Worker CMtCoder *mtc = t->mtCoder;
324*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
325*f6dc9357SAndroid Build Coastguard Worker {
326*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&mtc->mtProgress, res);
327*f6dc9357SAndroid Build Coastguard Worker }
328*f6dc9357SAndroid Build Coastguard Worker
329*f6dc9357SAndroid Build Coastguard Worker #ifndef MTCODER_USE_WRITE_THREAD
330*f6dc9357SAndroid Build Coastguard Worker {
331*f6dc9357SAndroid Build Coastguard Worker unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
332*f6dc9357SAndroid Build Coastguard Worker if (numFinished == mtc->numStartedThreads)
333*f6dc9357SAndroid Build Coastguard Worker if (Event_Set(&mtc->finishedEvent) != 0)
334*f6dc9357SAndroid Build Coastguard Worker return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
335*f6dc9357SAndroid Build Coastguard Worker }
336*f6dc9357SAndroid Build Coastguard Worker #endif
337*f6dc9357SAndroid Build Coastguard Worker }
338*f6dc9357SAndroid Build Coastguard Worker }
339*f6dc9357SAndroid Build Coastguard Worker }
340*f6dc9357SAndroid Build Coastguard Worker
341*f6dc9357SAndroid Build Coastguard Worker
342*f6dc9357SAndroid Build Coastguard Worker
MtCoder_Construct(CMtCoder * p)343*f6dc9357SAndroid Build Coastguard Worker void MtCoder_Construct(CMtCoder *p)
344*f6dc9357SAndroid Build Coastguard Worker {
345*f6dc9357SAndroid Build Coastguard Worker unsigned i;
346*f6dc9357SAndroid Build Coastguard Worker
347*f6dc9357SAndroid Build Coastguard Worker p->blockSize = 0;
348*f6dc9357SAndroid Build Coastguard Worker p->numThreadsMax = 0;
349*f6dc9357SAndroid Build Coastguard Worker p->expectedDataSize = (UInt64)(Int64)-1;
350*f6dc9357SAndroid Build Coastguard Worker
351*f6dc9357SAndroid Build Coastguard Worker p->inStream = NULL;
352*f6dc9357SAndroid Build Coastguard Worker p->inData = NULL;
353*f6dc9357SAndroid Build Coastguard Worker p->inDataSize = 0;
354*f6dc9357SAndroid Build Coastguard Worker
355*f6dc9357SAndroid Build Coastguard Worker p->progress = NULL;
356*f6dc9357SAndroid Build Coastguard Worker p->allocBig = NULL;
357*f6dc9357SAndroid Build Coastguard Worker
358*f6dc9357SAndroid Build Coastguard Worker p->mtCallback = NULL;
359*f6dc9357SAndroid Build Coastguard Worker p->mtCallbackObject = NULL;
360*f6dc9357SAndroid Build Coastguard Worker
361*f6dc9357SAndroid Build Coastguard Worker p->allocatedBufsSize = 0;
362*f6dc9357SAndroid Build Coastguard Worker
363*f6dc9357SAndroid Build Coastguard Worker Event_Construct(&p->readEvent);
364*f6dc9357SAndroid Build Coastguard Worker Semaphore_Construct(&p->blocksSemaphore);
365*f6dc9357SAndroid Build Coastguard Worker
366*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_THREADS_MAX; i++)
367*f6dc9357SAndroid Build Coastguard Worker {
368*f6dc9357SAndroid Build Coastguard Worker CMtCoderThread *t = &p->threads[i];
369*f6dc9357SAndroid Build Coastguard Worker t->mtCoder = p;
370*f6dc9357SAndroid Build Coastguard Worker t->index = i;
371*f6dc9357SAndroid Build Coastguard Worker t->inBuf = NULL;
372*f6dc9357SAndroid Build Coastguard Worker t->stop = False;
373*f6dc9357SAndroid Build Coastguard Worker Event_Construct(&t->startEvent);
374*f6dc9357SAndroid Build Coastguard Worker Thread_CONSTRUCT(&t->thread)
375*f6dc9357SAndroid Build Coastguard Worker }
376*f6dc9357SAndroid Build Coastguard Worker
377*f6dc9357SAndroid Build Coastguard Worker #ifdef MTCODER_USE_WRITE_THREAD
378*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
379*f6dc9357SAndroid Build Coastguard Worker Event_Construct(&p->writeEvents[i]);
380*f6dc9357SAndroid Build Coastguard Worker #else
381*f6dc9357SAndroid Build Coastguard Worker Event_Construct(&p->finishedEvent);
382*f6dc9357SAndroid Build Coastguard Worker #endif
383*f6dc9357SAndroid Build Coastguard Worker
384*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Init(&p->cs);
385*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Init(&p->mtProgress.cs);
386*f6dc9357SAndroid Build Coastguard Worker }
387*f6dc9357SAndroid Build Coastguard Worker
388*f6dc9357SAndroid Build Coastguard Worker
389*f6dc9357SAndroid Build Coastguard Worker
390*f6dc9357SAndroid Build Coastguard Worker
MtCoder_Free(CMtCoder * p)391*f6dc9357SAndroid Build Coastguard Worker static void MtCoder_Free(CMtCoder *p)
392*f6dc9357SAndroid Build Coastguard Worker {
393*f6dc9357SAndroid Build Coastguard Worker unsigned i;
394*f6dc9357SAndroid Build Coastguard Worker
395*f6dc9357SAndroid Build Coastguard Worker /*
396*f6dc9357SAndroid Build Coastguard Worker p->stopReading = True;
397*f6dc9357SAndroid Build Coastguard Worker if (Event_IsCreated(&p->readEvent))
398*f6dc9357SAndroid Build Coastguard Worker Event_Set(&p->readEvent);
399*f6dc9357SAndroid Build Coastguard Worker */
400*f6dc9357SAndroid Build Coastguard Worker
401*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_THREADS_MAX; i++)
402*f6dc9357SAndroid Build Coastguard Worker MtCoderThread_Destruct(&p->threads[i]);
403*f6dc9357SAndroid Build Coastguard Worker
404*f6dc9357SAndroid Build Coastguard Worker Event_Close(&p->readEvent);
405*f6dc9357SAndroid Build Coastguard Worker Semaphore_Close(&p->blocksSemaphore);
406*f6dc9357SAndroid Build Coastguard Worker
407*f6dc9357SAndroid Build Coastguard Worker #ifdef MTCODER_USE_WRITE_THREAD
408*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
409*f6dc9357SAndroid Build Coastguard Worker Event_Close(&p->writeEvents[i]);
410*f6dc9357SAndroid Build Coastguard Worker #else
411*f6dc9357SAndroid Build Coastguard Worker Event_Close(&p->finishedEvent);
412*f6dc9357SAndroid Build Coastguard Worker #endif
413*f6dc9357SAndroid Build Coastguard Worker }
414*f6dc9357SAndroid Build Coastguard Worker
415*f6dc9357SAndroid Build Coastguard Worker
MtCoder_Destruct(CMtCoder * p)416*f6dc9357SAndroid Build Coastguard Worker void MtCoder_Destruct(CMtCoder *p)
417*f6dc9357SAndroid Build Coastguard Worker {
418*f6dc9357SAndroid Build Coastguard Worker MtCoder_Free(p);
419*f6dc9357SAndroid Build Coastguard Worker
420*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Delete(&p->cs);
421*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Delete(&p->mtProgress.cs);
422*f6dc9357SAndroid Build Coastguard Worker }
423*f6dc9357SAndroid Build Coastguard Worker
424*f6dc9357SAndroid Build Coastguard Worker
MtCoder_Code(CMtCoder * p)425*f6dc9357SAndroid Build Coastguard Worker SRes MtCoder_Code(CMtCoder *p)
426*f6dc9357SAndroid Build Coastguard Worker {
427*f6dc9357SAndroid Build Coastguard Worker unsigned numThreads = p->numThreadsMax;
428*f6dc9357SAndroid Build Coastguard Worker unsigned numBlocksMax;
429*f6dc9357SAndroid Build Coastguard Worker unsigned i;
430*f6dc9357SAndroid Build Coastguard Worker SRes res = SZ_OK;
431*f6dc9357SAndroid Build Coastguard Worker
432*f6dc9357SAndroid Build Coastguard Worker if (numThreads > MTCODER_THREADS_MAX)
433*f6dc9357SAndroid Build Coastguard Worker numThreads = MTCODER_THREADS_MAX;
434*f6dc9357SAndroid Build Coastguard Worker numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
435*f6dc9357SAndroid Build Coastguard Worker
436*f6dc9357SAndroid Build Coastguard Worker if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
437*f6dc9357SAndroid Build Coastguard Worker if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
438*f6dc9357SAndroid Build Coastguard Worker if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
439*f6dc9357SAndroid Build Coastguard Worker
440*f6dc9357SAndroid Build Coastguard Worker if (numBlocksMax > MTCODER_BLOCKS_MAX)
441*f6dc9357SAndroid Build Coastguard Worker numBlocksMax = MTCODER_BLOCKS_MAX;
442*f6dc9357SAndroid Build Coastguard Worker
443*f6dc9357SAndroid Build Coastguard Worker if (p->blockSize != p->allocatedBufsSize)
444*f6dc9357SAndroid Build Coastguard Worker {
445*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_THREADS_MAX; i++)
446*f6dc9357SAndroid Build Coastguard Worker {
447*f6dc9357SAndroid Build Coastguard Worker CMtCoderThread *t = &p->threads[i];
448*f6dc9357SAndroid Build Coastguard Worker if (t->inBuf)
449*f6dc9357SAndroid Build Coastguard Worker {
450*f6dc9357SAndroid Build Coastguard Worker ISzAlloc_Free(p->allocBig, t->inBuf);
451*f6dc9357SAndroid Build Coastguard Worker t->inBuf = NULL;
452*f6dc9357SAndroid Build Coastguard Worker }
453*f6dc9357SAndroid Build Coastguard Worker }
454*f6dc9357SAndroid Build Coastguard Worker p->allocatedBufsSize = p->blockSize;
455*f6dc9357SAndroid Build Coastguard Worker }
456*f6dc9357SAndroid Build Coastguard Worker
457*f6dc9357SAndroid Build Coastguard Worker p->readRes = SZ_OK;
458*f6dc9357SAndroid Build Coastguard Worker
459*f6dc9357SAndroid Build Coastguard Worker MtProgress_Init(&p->mtProgress, p->progress);
460*f6dc9357SAndroid Build Coastguard Worker
461*f6dc9357SAndroid Build Coastguard Worker #ifdef MTCODER_USE_WRITE_THREAD
462*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < numBlocksMax; i++)
463*f6dc9357SAndroid Build Coastguard Worker {
464*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
465*f6dc9357SAndroid Build Coastguard Worker }
466*f6dc9357SAndroid Build Coastguard Worker #else
467*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
468*f6dc9357SAndroid Build Coastguard Worker #endif
469*f6dc9357SAndroid Build Coastguard Worker
470*f6dc9357SAndroid Build Coastguard Worker {
471*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
472*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, (UInt32)numBlocksMax, (UInt32)numBlocksMax))
473*f6dc9357SAndroid Build Coastguard Worker }
474*f6dc9357SAndroid Build Coastguard Worker
475*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
476*f6dc9357SAndroid Build Coastguard Worker p->freeBlockList[i] = i + 1;
477*f6dc9357SAndroid Build Coastguard Worker p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
478*f6dc9357SAndroid Build Coastguard Worker p->freeBlockHead = 0;
479*f6dc9357SAndroid Build Coastguard Worker
480*f6dc9357SAndroid Build Coastguard Worker p->readProcessed = 0;
481*f6dc9357SAndroid Build Coastguard Worker p->blockIndex = 0;
482*f6dc9357SAndroid Build Coastguard Worker p->numBlocksMax = numBlocksMax;
483*f6dc9357SAndroid Build Coastguard Worker p->stopReading = False;
484*f6dc9357SAndroid Build Coastguard Worker
485*f6dc9357SAndroid Build Coastguard Worker #ifndef MTCODER_USE_WRITE_THREAD
486*f6dc9357SAndroid Build Coastguard Worker p->writeIndex = 0;
487*f6dc9357SAndroid Build Coastguard Worker p->writeRes = SZ_OK;
488*f6dc9357SAndroid Build Coastguard Worker for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
489*f6dc9357SAndroid Build Coastguard Worker p->ReadyBlocks[i] = False;
490*f6dc9357SAndroid Build Coastguard Worker p->numFinishedThreads = 0;
491*f6dc9357SAndroid Build Coastguard Worker #endif
492*f6dc9357SAndroid Build Coastguard Worker
493*f6dc9357SAndroid Build Coastguard Worker p->numStartedThreadsLimit = numThreads;
494*f6dc9357SAndroid Build Coastguard Worker p->numStartedThreads = 0;
495*f6dc9357SAndroid Build Coastguard Worker
496*f6dc9357SAndroid Build Coastguard Worker // for (i = 0; i < numThreads; i++)
497*f6dc9357SAndroid Build Coastguard Worker {
498*f6dc9357SAndroid Build Coastguard Worker CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
499*f6dc9357SAndroid Build Coastguard Worker RINOK(MtCoderThread_CreateAndStart(nextThread))
500*f6dc9357SAndroid Build Coastguard Worker }
501*f6dc9357SAndroid Build Coastguard Worker
502*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Event_Set(&p->readEvent))
503*f6dc9357SAndroid Build Coastguard Worker
504*f6dc9357SAndroid Build Coastguard Worker #ifdef MTCODER_USE_WRITE_THREAD
505*f6dc9357SAndroid Build Coastguard Worker {
506*f6dc9357SAndroid Build Coastguard Worker unsigned bi = 0;
507*f6dc9357SAndroid Build Coastguard Worker
508*f6dc9357SAndroid Build Coastguard Worker for (;; bi++)
509*f6dc9357SAndroid Build Coastguard Worker {
510*f6dc9357SAndroid Build Coastguard Worker if (bi >= numBlocksMax)
511*f6dc9357SAndroid Build Coastguard Worker bi = 0;
512*f6dc9357SAndroid Build Coastguard Worker
513*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
514*f6dc9357SAndroid Build Coastguard Worker
515*f6dc9357SAndroid Build Coastguard Worker {
516*f6dc9357SAndroid Build Coastguard Worker const CMtCoderBlock *block = &p->blocks[bi];
517*f6dc9357SAndroid Build Coastguard Worker unsigned bufIndex = block->bufIndex;
518*f6dc9357SAndroid Build Coastguard Worker BoolInt finished = block->finished;
519*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK && block->res != SZ_OK)
520*f6dc9357SAndroid Build Coastguard Worker res = block->res;
521*f6dc9357SAndroid Build Coastguard Worker
522*f6dc9357SAndroid Build Coastguard Worker if (bufIndex != (unsigned)(int)-1)
523*f6dc9357SAndroid Build Coastguard Worker {
524*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
525*f6dc9357SAndroid Build Coastguard Worker {
526*f6dc9357SAndroid Build Coastguard Worker res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
527*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
528*f6dc9357SAndroid Build Coastguard Worker MtProgress_SetError(&p->mtProgress, res);
529*f6dc9357SAndroid Build Coastguard Worker }
530*f6dc9357SAndroid Build Coastguard Worker
531*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Enter(&p->cs);
532*f6dc9357SAndroid Build Coastguard Worker {
533*f6dc9357SAndroid Build Coastguard Worker p->freeBlockList[bufIndex] = p->freeBlockHead;
534*f6dc9357SAndroid Build Coastguard Worker p->freeBlockHead = bufIndex;
535*f6dc9357SAndroid Build Coastguard Worker }
536*f6dc9357SAndroid Build Coastguard Worker CriticalSection_Leave(&p->cs);
537*f6dc9357SAndroid Build Coastguard Worker }
538*f6dc9357SAndroid Build Coastguard Worker
539*f6dc9357SAndroid Build Coastguard Worker RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
540*f6dc9357SAndroid Build Coastguard Worker
541*f6dc9357SAndroid Build Coastguard Worker if (finished)
542*f6dc9357SAndroid Build Coastguard Worker break;
543*f6dc9357SAndroid Build Coastguard Worker }
544*f6dc9357SAndroid Build Coastguard Worker }
545*f6dc9357SAndroid Build Coastguard Worker }
546*f6dc9357SAndroid Build Coastguard Worker #else
547*f6dc9357SAndroid Build Coastguard Worker {
548*f6dc9357SAndroid Build Coastguard Worker WRes wres = Event_Wait(&p->finishedEvent);
549*f6dc9357SAndroid Build Coastguard Worker res = MY_SRes_HRESULT_FROM_WRes(wres);
550*f6dc9357SAndroid Build Coastguard Worker }
551*f6dc9357SAndroid Build Coastguard Worker #endif
552*f6dc9357SAndroid Build Coastguard Worker
553*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
554*f6dc9357SAndroid Build Coastguard Worker res = p->readRes;
555*f6dc9357SAndroid Build Coastguard Worker
556*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
557*f6dc9357SAndroid Build Coastguard Worker res = p->mtProgress.res;
558*f6dc9357SAndroid Build Coastguard Worker
559*f6dc9357SAndroid Build Coastguard Worker #ifndef MTCODER_USE_WRITE_THREAD
560*f6dc9357SAndroid Build Coastguard Worker if (res == SZ_OK)
561*f6dc9357SAndroid Build Coastguard Worker res = p->writeRes;
562*f6dc9357SAndroid Build Coastguard Worker #endif
563*f6dc9357SAndroid Build Coastguard Worker
564*f6dc9357SAndroid Build Coastguard Worker if (res != SZ_OK)
565*f6dc9357SAndroid Build Coastguard Worker MtCoder_Free(p);
566*f6dc9357SAndroid Build Coastguard Worker return res;
567*f6dc9357SAndroid Build Coastguard Worker }
568*f6dc9357SAndroid Build Coastguard Worker
569*f6dc9357SAndroid Build Coastguard Worker #endif
570*f6dc9357SAndroid Build Coastguard Worker
571*f6dc9357SAndroid Build Coastguard Worker #undef RINOK_THREAD
572