1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core::marker::PhantomData;
3use core::ops::Range;
4use core::{any, mem};
5#[cfg(feature = "std")]
6use std;
7
8use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
9use super::encode::{
10 hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize,
11 BrotliEncoderOperation, SanitizeParams,
12};
13use super::BrotliAlloc;
14use crate::concat::{BroCatli, BroCatliResult};
15use crate::enc::combined_alloc::{alloc_default, allocate};
16use crate::enc::encode::BrotliEncoderStateStruct;
17
/// Error returned by [`OwnedRetriever`] methods when the shared value cannot be
/// read or recovered. It is the unit type and carries no further detail.
pub type PoisonedThreadError = ();
19
/// Payload carried by [`BrotliEncoderThreadError::ThreadExecError`].
///
/// With the `std` feature this is a boxed `Any + Send` value.
/// NOTE(review): presumably the payload of a panicked worker thread — confirm
/// against the spawner implementations.
#[cfg(feature = "std")]
pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
/// Payload carried by [`BrotliEncoderThreadError::ThreadExecError`].
///
/// Without the `std` feature there is no payload, so this is the unit type.
#[cfg(not(feature = "std"))]
pub type LowLevelThreadError = ();
24
/// Types that can be built from a [`LowLevelThreadError`].
pub trait AnyBoxConstructor {
    /// Builds `Self` from the given low-level error payload.
    fn new(data: LowLevelThreadError) -> Self;
}
28
/// A handle that can be consumed to obtain the outcome of a unit of work:
/// either a value of type `T` or an error of type `U`.
pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
    /// Consumes the handle and returns the outcome it stands for.
    ///
    /// NOTE(review): presumably blocks until the associated work has finished —
    /// confirm with the implementors.
    fn join(self) -> Result<T, U>;
}
/// Errors reported by the multithreaded compression entry points in this file.
#[derive(Debug)]
pub enum BrotliEncoderThreadError {
    /// An output buffer was too small: either a worker ran out of room while
    /// compressing its part, or concatenation asked for more output space.
    InsufficientOutputSpace,
    /// NOTE(review): not produced by any code visible in this file; by its name
    /// it signals that concatenation stopped before consuming a whole chunk.
    ConcatenationDidNotProcessFullFile,
    /// Streaming a compressed chunk into the concatenator returned the wrapped
    /// unexpected result.
    ConcatenationError(BroCatliResult),
    /// Finishing the concatenated stream returned the wrapped non-success result.
    ConcatenationFinalizationError(BroCatliResult),
    /// The shared input could not be viewed or recovered from its retriever.
    OtherThreadPanic,
    /// A low-level failure payload wrapped via [`AnyBoxConstructor::new`].
    ThreadExecError(LowLevelThreadError),
}
41
42impl AnyBoxConstructor for BrotliEncoderThreadError {
43 fn new(data: LowLevelThreadError) -> Self {
44 BrotliEncoderThreadError::ThreadExecError(data)
45 }
46}
47
/// The compressed output produced for one part of the input.
pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// Allocation holding the compressed bytes.
    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
    /// Number of bytes at the start of `data_backing` that are valid output.
    data_size: usize,
}
/// What a compression work item hands back: its outcome plus the allocator it
/// was given, so the caller can reuse it and free the chunk.
pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The compressed chunk, or the error that prevented producing it.
    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
    /// The allocator returned by the work item.
    alloc: Alloc,
}
/// State of one per-thread slot: it either still holds its allocator and extra
/// input, holds a join handle for spawned work, or is temporarily empty.
pub enum InternalSendAlloc<
    ReturnVal: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The allocator and the extra input are present and available.
    A(Alloc, ExtraInput),
    /// Work has been handed off; the handle yields its result when joined.
    Join(Join),
    /// Placeholder left behind while the previous contents are moved out.
    SpawningOrJoining(PhantomData<ReturnVal>),
}
74impl<
75 ReturnVal: Send + 'static,
76 ExtraInput: Send + 'static,
77 Alloc: BrotliAlloc + Send + 'static,
78 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
79 > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
80where
81 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
82{
83 fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
84 match *self {
85 InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
86 _ => panic!("Bad state for allocator"),
87 }
88 }
89}
90
/// Public wrapper around [`InternalSendAlloc`]: one per-thread slot that starts
/// out holding an allocator plus extra input and can later hold a join handle.
pub struct SendAlloc<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
100
101impl<
102 ReturnValue: Send + 'static,
103 ExtraInput: Send + 'static,
104 Alloc: BrotliAlloc + Send + 'static,
105 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
106 > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
107where
108 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
109{
110 pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
111 SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
112 }
113 pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
114 match self.0 {
115 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
116 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
117 (other, other_extra)
118 }
119 }
120 }
121 fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
122 match self.0 {
123 InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
124 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
125 panic!("Item permanently borrowed/leaked")
126 }
127 }
128 }
129 pub fn unwrap(self) -> (Alloc, ExtraInput) {
130 match self.0 {
131 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
132 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
133 panic!("Item permanently borrowed/leaked")
134 }
135 }
136 }
137 pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
138 match mem::replace(
139 &mut self.0,
140 InternalSendAlloc::SpawningOrJoining(PhantomData),
141 ) {
142 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
143 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
144 panic!("Item permanently borrowed/leaked")
145 }
146 }
147 }
148}
149
/// Contents of an [`Owned`] slot: either the value itself or a marker that the
/// value has been moved elsewhere.
pub enum InternalOwned<T> {
    /// The value is present.
    Item(T),
    /// The value has been taken out of the slot.
    Borrowed,
}

/// A slot that may or may not currently hold its value.
pub struct Owned<T>(pub InternalOwned<T>);

impl<T> Owned<T> {
    /// Creates a slot that holds `data`.
    pub fn new(data: T) -> Self {
        Self(InternalOwned::Item(data))
    }

    /// Returns the held value, or `other` when the slot is empty.
    pub fn unwrap_or(self, other: T) -> T {
        match self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => other,
        }
    }

    /// Returns the held value.
    ///
    /// # Panics
    ///
    /// Panics with "Item permanently borrowed" when the slot is empty.
    pub fn unwrap(self) -> T {
        match self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }

    /// Borrows the held value.
    ///
    /// # Panics
    ///
    /// Panics with "Item permanently borrowed" when the slot is empty.
    pub fn view(&self) -> &T {
        match &self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }
}
183
/// Access to a shared value of type `U`: it can be inspected in place or,
/// finally, taken back by value.
pub trait OwnedRetriever<U: Send + 'static> {
    /// Runs `f` on a shared reference to the value and returns its result, or
    /// an error when the value cannot be accessed.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
    /// Consumes the retriever and returns the value, or an error when it
    /// cannot be recovered.
    fn unwrap(self) -> Result<U, PoisonedThreadError>;
}
188
#[cfg(feature = "std")]
impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
    /// Takes the read lock and applies `f` to the protected value; fails when
    /// acquiring the lock reports an error.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
        self.read()
            .map(|guard| f(&*guard))
            .map_err(|_| PoisonedThreadError::default())
    }

    /// Recovers the protected value; fails when other `Arc` clones still exist
    /// or when the lock reports an error while being consumed.
    fn unwrap(self) -> Result<U, PoisonedThreadError> {
        let rwlock =
            std::sync::Arc::try_unwrap(self).map_err(|_| PoisonedThreadError::default())?;
        rwlock
            .into_inner()
            .map_err(|_| PoisonedThreadError::default())
    }
}
207
/// A mechanism for running work items that each receive their own allocator
/// and extra input plus a shared reference to a common input of type `U`.
///
/// This mirrors [`BatchSpawnableLite`]; the difference visible here is that the
/// work function is a generic `Fn + Send + Copy` value instead of a plain `fn`
/// pointer.
pub trait BatchSpawnable<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle through which the result of one work item is obtained.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input is viewed and finally recovered.
    type FinalJoinHandle: OwnedRetriever<U>;
    /// Produces the handle that gives the work items access to `input`.
    ///
    /// NOTE(review): presumably moves the value out of `input` and leaves it
    /// marked as borrowed until the handle is unwrapped — confirm with the
    /// implementors.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`, calling `f` with the extra
    /// input, `index`, `num_threads`, the shared input and the allocator.
    ///
    /// NOTE(review): presumably takes the allocator and extra input out of
    /// `alloc` and stores the item's join handle there — confirm with the
    /// implementors.
    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: F,
    );
}
236
/// A mechanism for running work items that each receive their own allocator
/// and extra input plus a shared reference to a common input of type `U`.
///
/// The work function is a plain `fn` pointer. `CompressMulti` in this file
/// drives compression through this trait.
pub trait BatchSpawnableLite<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle through which the result of one work item is obtained.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input is viewed and finally recovered.
    type FinalJoinHandle: OwnedRetriever<U>;
    /// Produces the handle that gives the work items access to `input`.
    ///
    /// NOTE(review): presumably moves the value out of `input` and leaves it
    /// marked as borrowed until the handle is unwrapped — confirm with the
    /// implementors.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`, calling `f` with the extra
    /// input, `index`, `num_threads`, the shared input and the allocator.
    ///
    /// `CompressMulti` expects `alloc_per_thread` to hold the item's join
    /// handle afterwards and panics when it does not.
    fn spawn(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
    );
}
257pub fn CompressMultiSlice<
276 Alloc: BrotliAlloc + Send + 'static,
277 Spawner: BatchSpawnableLite<
278 CompressionThreadResult<Alloc>,
279 UnionHasher<Alloc>,
280 Alloc,
281 (
282 <Alloc as Allocator<u8>>::AllocatedMemory,
283 BrotliEncoderParams,
284 ),
285 >,
286>(
287 params: &BrotliEncoderParams,
288 input_slice: &[u8],
289 output: &mut [u8],
290 alloc_per_thread: &mut [SendAlloc<
291 CompressionThreadResult<Alloc>,
292 UnionHasher<Alloc>,
293 Alloc,
294 Spawner::JoinHandle,
295 >],
296 thread_spawner: &mut Spawner,
297) -> Result<usize, BrotliEncoderThreadError>
298where
299 <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
300 <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
301 <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
302{
303 let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
304 let mut input = allocate::<u8, _>(alloc, input_slice.len());
305 input.slice_mut().clone_from_slice(input_slice);
306 input
307 } else {
308 alloc_default::<u8, Alloc>()
309 };
310 let mut owned_input = Owned::new(input);
311 let ret = CompressMulti(
312 params,
313 &mut owned_input,
314 output,
315 alloc_per_thread,
316 thread_spawner,
317 );
318 if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
319 <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
320 }
321 ret
322}
323
/// Returns the byte range of a `file_size`-byte input that belongs to part
/// `thread_index` out of `num_threads` parts.
///
/// Part `i` covers `i * file_size / num_threads .. (i + 1) * file_size / num_threads`,
/// so consecutive parts are adjacent and together cover the whole input.
///
/// The products are evaluated in `u128`, so large inputs cannot overflow
/// `usize` (which would otherwise be realistic on 32-bit targets).
///
/// # Panics
///
/// Panics when `num_threads` is zero (division by zero).
fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
    let boundary = |part: u128| -> usize {
        let scaled = part * file_size as u128 / num_threads as u128;
        // For `part <= num_threads` the quotient never exceeds `file_size`;
        // clamp so the narrowing cast is lossless for any other argument too.
        scaled.min(usize::MAX as u128) as usize
    };
    let part = thread_index as u128;
    boundary(part)..boundary(part + 1)
}
327
328fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
329 hasher: UnionHasher<Alloc>,
330 thread_index: usize,
331 num_threads: usize,
332 input_and_params: &(SliceW, BrotliEncoderParams),
333 mut alloc: Alloc,
334) -> CompressionThreadResult<Alloc>
335where
336 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
337{
338 let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
339 let mut mem = allocate::<u8, _>(
340 &mut alloc,
341 BrotliEncoderMaxCompressedSize(range.end - range.start),
342 );
343 let mut state = BrotliEncoderStateStruct::new(alloc);
344 state.params = input_and_params.1.clone();
345 if thread_index != 0 {
346 state.params.catable = true; state.params.magic_number = false; }
349 state.params.appendable = true; if thread_index != 0 {
351 state.set_custom_dictionary_with_optional_precomputed_hasher(
352 range.start,
353 &input_and_params.0.slice()[..range.start],
354 hasher,
355 true,
356 );
357 }
358 let mut out_offset = 0usize;
359 let compression_result;
360 let mut available_out = mem.len();
361 loop {
362 let mut next_in_offset = 0usize;
363 let mut available_in = range.end - range.start;
364 let result = state.compress_stream(
365 BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
366 &mut available_in,
367 &input_and_params.0.slice()[range.clone()],
368 &mut next_in_offset,
369 &mut available_out,
370 mem.slice_mut(),
371 &mut out_offset,
372 &mut None,
373 &mut |_a, _b, _c, _d| (),
374 );
375 let new_range = range.start + next_in_offset..range.end;
376 range = new_range;
377 if result {
378 compression_result = Ok(out_offset);
379 break;
380 } else if available_out == 0 {
381 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); break;
383 }
384 }
385 BrotliEncoderDestroyInstance(&mut state);
386 match compression_result {
387 Ok(size) => CompressionThreadResult::<Alloc> {
388 compressed: Ok(CompressedFileChunk {
389 data_backing: mem,
390 data_size: size,
391 }),
392 alloc: state.m8,
393 },
394 Err(e) => {
395 <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
396 CompressionThreadResult::<Alloc> {
397 compressed: Err(e),
398 alloc: state.m8,
399 }
400 }
401 }
402}
403
404pub fn CompressMulti<
405 Alloc: BrotliAlloc + Send + 'static,
406 SliceW: SliceWrapper<u8> + Send + 'static + Sync,
407 Spawner: BatchSpawnableLite<
408 CompressionThreadResult<Alloc>,
409 UnionHasher<Alloc>,
410 Alloc,
411 (SliceW, BrotliEncoderParams),
412 >,
413>(
414 params: &BrotliEncoderParams,
415 owned_input: &mut Owned<SliceW>,
416 output: &mut [u8],
417 alloc_per_thread: &mut [SendAlloc<
418 CompressionThreadResult<Alloc>,
419 UnionHasher<Alloc>,
420 Alloc,
421 Spawner::JoinHandle,
422 >],
423 thread_spawner: &mut Spawner,
424) -> Result<usize, BrotliEncoderThreadError>
425where
426 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
427 <Alloc as Allocator<u16>>::AllocatedMemory: Send,
428 <Alloc as Allocator<u32>>::AllocatedMemory: Send,
429{
430 let num_threads = alloc_per_thread.len();
431 let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
432 let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
433 let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
435 if num_threads > 1 {
436 thread_spawner.spawn(
438 &mut spawner_and_input,
439 &mut alloc_per_thread[0],
440 0,
441 num_threads,
442 compress_part,
443 );
444 }
445 let mut compression_last_thread_result;
447 if num_threads > 1 && params.favor_cpu_efficiency {
448 let mut local_params = params.clone();
449 SanitizeParams(&mut local_params);
450 let mut hasher = UnionHasher::Uninit;
451 hasher_setup(
452 alloc_per_thread[num_threads - 1].0.unwrap_input().0,
453 &mut hasher,
454 &mut local_params,
455 None, &[],
457 0,
458 0,
459 false,
460 );
461 for thread_index in 1..num_threads {
462 let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
463 let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
464 let overlap = hasher.StoreLookahead().wrapping_sub(1);
465 if range.end - range.start > overlap {
466 hasher.BulkStoreRange(
467 input_and_params.0.slice(),
468 usize::MAX,
469 if range.start > overlap {
470 range.start - overlap
471 } else {
472 0
473 },
474 range.end - overlap,
475 );
476 }
477 });
478 if let Err(_e) = res {
479 return Err(BrotliEncoderThreadError::OtherThreadPanic);
480 }
481 if thread_index + 1 != num_threads {
482 {
483 let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
484 *out_hasher = hasher.clone_with_alloc(alloc);
485 }
486 thread_spawner.spawn(
487 &mut spawner_and_input,
488 &mut alloc_per_thread[thread_index],
489 thread_index,
490 num_threads,
491 compress_part,
492 );
493 }
494 }
495 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
496 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
497 compress_part(hasher,
498 num_threads - 1,
499 num_threads,
500 input_and_params,
501 alloc,
502 )
503 });
504 } else {
505 if num_threads > 1 {
506 for thread_index in 1..num_threads - 1 {
507 thread_spawner.spawn(
508 &mut spawner_and_input,
509 &mut alloc_per_thread[thread_index],
510 thread_index,
511 num_threads,
512 compress_part,
513 );
514 }
515 }
516 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
517 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
518 compress_part(UnionHasher::Uninit,
519 num_threads - 1,
520 num_threads,
521 input_and_params,
522 alloc,
523 )
524 });
525 }
526 let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
527 let mut out_file_size = 0usize;
528 let mut bro_cat_li = BroCatli::new();
529 for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
530 let mut cur_result = if index + 1 == num_threads {
531 match mem::replace(&mut compression_last_thread_result, Err(())) {
532 Ok(result) => result,
533 Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
534 }
535 } else {
536 match mem::replace(
537 &mut thread.0,
538 InternalSendAlloc::SpawningOrJoining(PhantomData),
539 ) {
540 InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
541 panic!("Thread not properly spawned")
542 }
543 InternalSendAlloc::Join(join) => match join.join() {
544 Ok(result) => result,
545 Err(err) => {
546 return Err(err);
547 }
548 },
549 }
550 };
551 match cur_result.compressed {
552 Ok(compressed_out) => {
553 bro_cat_li.new_brotli_file();
554 let mut in_offset = 0usize;
555 let cat_result = bro_cat_li.stream(
556 &compressed_out.data_backing.slice()[..compressed_out.data_size],
557 &mut in_offset,
558 output,
559 &mut out_file_size,
560 );
561 match cat_result {
562 BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
563 compression_result = Ok(out_file_size);
564 }
565 BroCatliResult::NeedsMoreOutput => {
566 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
567 }
569 err => {
570 compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
571 }
573 }
574 <Alloc as Allocator<u8>>::free_cell(
575 &mut cur_result.alloc,
576 compressed_out.data_backing,
577 );
578 }
579 Err(e) => {
580 compression_result = Err(e);
581 }
582 }
583 thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
584 }
585 compression_result?;
586 match bro_cat_li.finish(output, &mut out_file_size) {
587 BroCatliResult::Success => compression_result = Ok(out_file_size),
588 err => {
589 compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
590 err,
591 ))
592 }
593 }
594 if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
595 *owned_input = Owned::new(retrieved_owned_input.0); } else if compression_result.is_ok() {
597 compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
598 }
599 compression_result
600}