1use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
2use super::encode::{
3 BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize, BrotliEncoderOperation,
4 HasherSetup, SanitizeParams,
5};
6use super::BrotliAlloc;
7use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
8use concat::{BroCatli, BroCatliResult};
9use core::any;
10use core::marker::PhantomData;
11use core::mem;
12use core::ops::Range;
13use enc::encode::BrotliEncoderStateStruct;
14
15pub type PoisonedThreadError = ();
16
17#[cfg(feature = "std")]
18pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
19#[cfg(not(feature = "std"))]
20pub type LowLevelThreadError = ();
21
22pub trait AnyBoxConstructor {
23 fn new(data: LowLevelThreadError) -> Self;
24}
25
26pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
27 fn join(self) -> Result<T, U>;
28}
29#[derive(Debug)]
30pub enum BrotliEncoderThreadError {
31 InsufficientOutputSpace,
32 ConcatenationDidNotProcessFullFile,
33 ConcatenationError(BroCatliResult),
34 ConcatenationFinalizationError(BroCatliResult),
35 OtherThreadPanic,
36 ThreadExecError(LowLevelThreadError),
37}
38
39impl AnyBoxConstructor for BrotliEncoderThreadError {
40 fn new(data: LowLevelThreadError) -> Self {
41 BrotliEncoderThreadError::ThreadExecError(data)
42 }
43}
44
45pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
46where
47 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
48{
49 data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
50 data_size: usize,
51}
52pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
53where
54 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
55{
56 compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
57 alloc: Alloc,
58}
59pub enum InternalSendAlloc<
60 ReturnVal: Send + 'static,
61 ExtraInput: Send + 'static,
62 Alloc: BrotliAlloc + Send + 'static,
63 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
64> where
65 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
66{
67 A(Alloc, ExtraInput),
68 Join(Join),
69 SpawningOrJoining(PhantomData<ReturnVal>),
70}
71impl<
72 ReturnVal: Send + 'static,
73 ExtraInput: Send + 'static,
74 Alloc: BrotliAlloc + Send + 'static,
75 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
76 > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
77where
78 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
79{
80 fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
81 match *self {
82 InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
83 _ => panic!("Bad state for allocator"),
84 }
85 }
86}
87
88pub struct SendAlloc<
89 ReturnValue: Send + 'static,
90 ExtraInput: Send + 'static,
91 Alloc: BrotliAlloc + Send + 'static,
92 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
93>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
94where
96 <Alloc as Allocator<u8>>::AllocatedMemory: Send;
97
98impl<
99 ReturnValue: Send + 'static,
100 ExtraInput: Send + 'static,
101 Alloc: BrotliAlloc + Send + 'static,
102 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
103 > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
104where
105 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
106{
107 pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
108 SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
109 }
110 pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
111 match self.0 {
112 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
113 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
114 (other, other_extra)
115 }
116 }
117 }
118 fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
119 match self.0 {
120 InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
121 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
122 panic!("Item permanently borrowed/leaked")
123 }
124 }
125 }
126 pub fn unwrap(self) -> (Alloc, ExtraInput) {
127 match self.0 {
128 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
129 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
130 panic!("Item permanently borrowed/leaked")
131 }
132 }
133 }
134 pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
135 match mem::replace(
136 &mut self.0,
137 InternalSendAlloc::SpawningOrJoining(PhantomData),
138 ) {
139 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
140 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
141 panic!("Item permanently borrowed/leaked")
142 }
143 }
144 }
145}
146
147pub enum InternalOwned<T> {
148 Item(T),
150 Borrowed,
151}
152
153pub struct Owned<T>(pub InternalOwned<T>); impl<T> Owned<T> {
155 pub fn new(data: T) -> Self {
156 Owned::<T>(InternalOwned::Item(data))
157 }
158 pub fn unwrap_or(self, other: T) -> T {
159 if let InternalOwned::Item(x) = self.0 {
160 x
161 } else {
162 other
163 }
164 }
165 pub fn unwrap(self) -> T {
166 if let InternalOwned::Item(x) = self.0 {
167 x
168 } else {
169 panic!("Item permanently borrowed")
170 }
171 }
172 pub fn view(&self) -> &T {
173 if let InternalOwned::Item(ref x) = self.0 {
174 x
175 } else {
176 panic!("Item permanently borrowed")
177 }
178 }
179}
180
181pub trait OwnedRetriever<U: Send + 'static> {
182 fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
183 fn unwrap(self) -> Result<U, PoisonedThreadError>;
184}
185
186#[cfg(feature = "std")]
187impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
188 fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
189 match self.read() {
190 Ok(ref u) => Ok(f(u)),
191 Err(_) => Err(PoisonedThreadError::default()),
192 }
193 }
194 fn unwrap(self) -> Result<U, PoisonedThreadError> {
195 match std::sync::Arc::try_unwrap(self) {
196 Ok(rwlock) => match rwlock.into_inner() {
197 Ok(u) => Ok(u),
198 Err(_) => Err(PoisonedThreadError::default()),
199 },
200 Err(_) => Err(PoisonedThreadError::default()),
201 }
202 }
203}
204
205pub trait BatchSpawnable<
206 ReturnValue: Send + 'static,
207 ExtraInput: Send + 'static,
208 Alloc: BrotliAlloc + Send + 'static,
209 U: Send + 'static + Sync,
210> where
211 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
212{
213 type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
214 type FinalJoinHandle: OwnedRetriever<U>;
215 fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
224 fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
225 &mut self,
226 handle: &mut Self::FinalJoinHandle,
227 alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
228 index: usize,
229 num_threads: usize,
230 f: F,
231 );
232}
233
234pub trait BatchSpawnableLite<
235 ReturnValue: Send + 'static,
236 ExtraInput: Send + 'static,
237 Alloc: BrotliAlloc + Send + 'static,
238 U: Send + 'static + Sync,
239> where
240 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
241{
242 type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
243 type FinalJoinHandle: OwnedRetriever<U>;
244 fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
245 fn spawn(
246 &mut self,
247 handle: &mut Self::FinalJoinHandle,
248 alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
249 index: usize,
250 num_threads: usize,
251 f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
252 );
253}
254pub fn CompressMultiSlice<
273 Alloc: BrotliAlloc + Send + 'static,
274 Spawner: BatchSpawnableLite<
275 CompressionThreadResult<Alloc>,
276 UnionHasher<Alloc>,
277 Alloc,
278 (
279 <Alloc as Allocator<u8>>::AllocatedMemory,
280 BrotliEncoderParams,
281 ),
282 >,
283>(
284 params: &BrotliEncoderParams,
285 input_slice: &[u8],
286 output: &mut [u8],
287 alloc_per_thread: &mut [SendAlloc<
288 CompressionThreadResult<Alloc>,
289 UnionHasher<Alloc>,
290 Alloc,
291 Spawner::JoinHandle,
292 >],
293 thread_spawner: &mut Spawner,
294) -> Result<usize, BrotliEncoderThreadError>
295where
296 <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
297 <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
298 <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
299{
300 let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
301 let mut input = <Alloc as Allocator<u8>>::alloc_cell(alloc, input_slice.len());
302 input.slice_mut().clone_from_slice(input_slice);
303 input
304 } else {
305 <Alloc as Allocator<u8>>::AllocatedMemory::default()
306 };
307 let mut owned_input = Owned::new(input);
308 let ret = CompressMulti(
309 params,
310 &mut owned_input,
311 output,
312 alloc_per_thread,
313 thread_spawner,
314 );
315 if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
316 <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
317 }
318 ret
319}
320
321fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
322 ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
323}
324
325fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
326 hasher: UnionHasher<Alloc>,
327 thread_index: usize,
328 num_threads: usize,
329 input_and_params: &(SliceW, BrotliEncoderParams),
330 mut alloc: Alloc,
331) -> CompressionThreadResult<Alloc>
332where
333 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
334{
335 let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
336 let mut mem = <Alloc as Allocator<u8>>::alloc_cell(
337 &mut alloc,
338 BrotliEncoderMaxCompressedSize(range.end - range.start),
339 );
340 let mut state = BrotliEncoderStateStruct::new(alloc);
341 state.params = input_and_params.1.clone();
342 if thread_index != 0 {
343 state.params.catable = true; state.params.magic_number = false; }
346 state.params.appendable = true; if thread_index != 0 {
348 state.set_custom_dictionary_with_optional_precomputed_hasher(
349 range.start,
350 &input_and_params.0.slice()[..range.start],
351 hasher,
352 );
353 }
354 let mut out_offset = 0usize;
355 let compression_result;
356 let mut available_out = mem.len();
357 loop {
358 let mut next_in_offset = 0usize;
359 let mut available_in = range.end - range.start;
360 let result = state.compress_stream(
361 BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
362 &mut available_in,
363 &input_and_params.0.slice()[range.clone()],
364 &mut next_in_offset,
365 &mut available_out,
366 mem.slice_mut(),
367 &mut out_offset,
368 &mut None,
369 &mut |_a, _b, _c, _d| (),
370 );
371 let new_range = range.start + next_in_offset..range.end;
372 range = new_range;
373 if result {
374 compression_result = Ok(out_offset);
375 break;
376 } else if available_out == 0 {
377 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); break;
379 }
380 }
381 BrotliEncoderDestroyInstance(&mut state);
382 match compression_result {
383 Ok(size) => CompressionThreadResult::<Alloc> {
384 compressed: Ok(CompressedFileChunk {
385 data_backing: mem,
386 data_size: size,
387 }),
388 alloc: state.m8,
389 },
390 Err(e) => {
391 <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
392 CompressionThreadResult::<Alloc> {
393 compressed: Err(e),
394 alloc: state.m8,
395 }
396 }
397 }
398}
399
400pub fn CompressMulti<
401 Alloc: BrotliAlloc + Send + 'static,
402 SliceW: SliceWrapper<u8> + Send + 'static + Sync,
403 Spawner: BatchSpawnableLite<
404 CompressionThreadResult<Alloc>,
405 UnionHasher<Alloc>,
406 Alloc,
407 (SliceW, BrotliEncoderParams),
408 >,
409>(
410 params: &BrotliEncoderParams,
411 owned_input: &mut Owned<SliceW>,
412 output: &mut [u8],
413 alloc_per_thread: &mut [SendAlloc<
414 CompressionThreadResult<Alloc>,
415 UnionHasher<Alloc>,
416 Alloc,
417 Spawner::JoinHandle,
418 >],
419 thread_spawner: &mut Spawner,
420) -> Result<usize, BrotliEncoderThreadError>
421where
422 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
423 <Alloc as Allocator<u16>>::AllocatedMemory: Send,
424 <Alloc as Allocator<u32>>::AllocatedMemory: Send,
425{
426 let num_threads = alloc_per_thread.len();
427 let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
428 let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
429 let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
431 if num_threads > 1 {
432 thread_spawner.spawn(
434 &mut spawner_and_input,
435 &mut alloc_per_thread[0],
436 0,
437 num_threads,
438 compress_part,
439 );
440 }
441 let mut compression_last_thread_result;
443 if num_threads > 1 && params.favor_cpu_efficiency {
444 let mut local_params = params.clone();
445 SanitizeParams(&mut local_params);
446 let mut hasher = UnionHasher::Uninit;
447 HasherSetup(
448 alloc_per_thread[num_threads - 1].0.unwrap_input().0,
449 &mut hasher,
450 &mut local_params,
451 &[],
452 0,
453 0,
454 0,
455 );
456 for thread_index in 1..num_threads {
457 let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
458 let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
459 let overlap = hasher.StoreLookahead().wrapping_sub(1);
460 if range.end - range.start > overlap {
461 hasher.BulkStoreRange(
462 input_and_params.0.slice(),
463 !(0),
464 if range.start > overlap {
465 range.start - overlap
466 } else {
467 0
468 },
469 range.end - overlap,
470 );
471 }
472 });
473 if let Err(_e) = res {
474 return Err(BrotliEncoderThreadError::OtherThreadPanic);
475 }
476 if thread_index + 1 != num_threads {
477 {
478 let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
479 *out_hasher = hasher.clone_with_alloc(alloc);
480 }
481 thread_spawner.spawn(
482 &mut spawner_and_input,
483 &mut alloc_per_thread[thread_index],
484 thread_index,
485 num_threads,
486 compress_part,
487 );
488 }
489 }
490 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
491 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
492 compress_part(hasher,
493 num_threads - 1,
494 num_threads,
495 input_and_params,
496 alloc,
497 )
498 });
499 } else {
500 if num_threads > 1 {
501 for thread_index in 1..num_threads - 1 {
502 thread_spawner.spawn(
503 &mut spawner_and_input,
504 &mut alloc_per_thread[thread_index],
505 thread_index,
506 num_threads,
507 compress_part,
508 );
509 }
510 }
511 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
512 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
513 compress_part(UnionHasher::Uninit,
514 num_threads - 1,
515 num_threads,
516 input_and_params,
517 alloc,
518 )
519 });
520 }
521 let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
522 let mut out_file_size = 0usize;
523 let mut bro_cat_li = BroCatli::new();
524 for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
525 let mut cur_result = if index + 1 == num_threads {
526 match mem::replace(&mut compression_last_thread_result, Err(())) {
527 Ok(result) => result,
528 Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
529 }
530 } else {
531 match mem::replace(
532 &mut thread.0,
533 InternalSendAlloc::SpawningOrJoining(PhantomData),
534 ) {
535 InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
536 panic!("Thread not properly spawned")
537 }
538 InternalSendAlloc::Join(join) => match join.join() {
539 Ok(result) => result,
540 Err(err) => {
541 return Err(err);
542 }
543 },
544 }
545 };
546 match cur_result.compressed {
547 Ok(compressed_out) => {
548 bro_cat_li.new_brotli_file();
549 let mut in_offset = 0usize;
550 let cat_result = bro_cat_li.stream(
551 &compressed_out.data_backing.slice()[..compressed_out.data_size],
552 &mut in_offset,
553 output,
554 &mut out_file_size,
555 );
556 match cat_result {
557 BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
558 compression_result = Ok(out_file_size);
559 }
560 BroCatliResult::NeedsMoreOutput => {
561 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
562 }
564 err => {
565 compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
566 }
568 }
569 <Alloc as Allocator<u8>>::free_cell(
570 &mut cur_result.alloc,
571 compressed_out.data_backing,
572 );
573 }
574 Err(e) => {
575 compression_result = Err(e);
576 }
577 }
578 thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
579 }
580 compression_result?;
581 match bro_cat_li.finish(output, &mut out_file_size) {
582 BroCatliResult::Success => compression_result = Ok(out_file_size),
583 err => {
584 compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
585 err,
586 ))
587 }
588 }
589 if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
590 *owned_input = Owned::new(retrieved_owned_input.0); } else if compression_result.is_ok() {
592 compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
593 }
594 compression_result
595}