brotli/enc/
threading.rs

1use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
2use super::encode::{
3    BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize, BrotliEncoderOperation,
4    HasherSetup, SanitizeParams,
5};
6use super::BrotliAlloc;
7use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
8use concat::{BroCatli, BroCatliResult};
9use core::any;
10use core::marker::PhantomData;
11use core::mem;
12use core::ops::Range;
13use enc::encode::BrotliEncoderStateStruct;
14
15pub type PoisonedThreadError = ();
16
17#[cfg(feature = "std")]
18pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
19#[cfg(not(feature = "std"))]
20pub type LowLevelThreadError = ();
21
22pub trait AnyBoxConstructor {
23    fn new(data: LowLevelThreadError) -> Self;
24}
25
26pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
27    fn join(self) -> Result<T, U>;
28}
29#[derive(Debug)]
30pub enum BrotliEncoderThreadError {
31    InsufficientOutputSpace,
32    ConcatenationDidNotProcessFullFile,
33    ConcatenationError(BroCatliResult),
34    ConcatenationFinalizationError(BroCatliResult),
35    OtherThreadPanic,
36    ThreadExecError(LowLevelThreadError),
37}
38
39impl AnyBoxConstructor for BrotliEncoderThreadError {
40    fn new(data: LowLevelThreadError) -> Self {
41        BrotliEncoderThreadError::ThreadExecError(data)
42    }
43}
44
45pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
46where
47    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
48{
49    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
50    data_size: usize,
51}
52pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
53where
54    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
55{
56    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
57    alloc: Alloc,
58}
59pub enum InternalSendAlloc<
60    ReturnVal: Send + 'static,
61    ExtraInput: Send + 'static,
62    Alloc: BrotliAlloc + Send + 'static,
63    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
64> where
65    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
66{
67    A(Alloc, ExtraInput),
68    Join(Join),
69    SpawningOrJoining(PhantomData<ReturnVal>),
70}
71impl<
72        ReturnVal: Send + 'static,
73        ExtraInput: Send + 'static,
74        Alloc: BrotliAlloc + Send + 'static,
75        Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
76    > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
77where
78    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
79{
80    fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
81        match *self {
82            InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
83            _ => panic!("Bad state for allocator"),
84        }
85    }
86}
87
88pub struct SendAlloc<
89    ReturnValue: Send + 'static,
90    ExtraInput: Send + 'static,
91    Alloc: BrotliAlloc + Send + 'static,
92    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
93>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
94//FIXME pub
95where
96    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
97
98impl<
99        ReturnValue: Send + 'static,
100        ExtraInput: Send + 'static,
101        Alloc: BrotliAlloc + Send + 'static,
102        Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
103    > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
104where
105    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
106{
107    pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
108        SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
109    }
110    pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
111        match self.0 {
112            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
113            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
114                (other, other_extra)
115            }
116        }
117    }
118    fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
119        match self.0 {
120            InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
121            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
122                panic!("Item permanently borrowed/leaked")
123            }
124        }
125    }
126    pub fn unwrap(self) -> (Alloc, ExtraInput) {
127        match self.0 {
128            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
129            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
130                panic!("Item permanently borrowed/leaked")
131            }
132        }
133    }
134    pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
135        match mem::replace(
136            &mut self.0,
137            InternalSendAlloc::SpawningOrJoining(PhantomData),
138        ) {
139            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
140            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
141                panic!("Item permanently borrowed/leaked")
142            }
143        }
144    }
145}
146
147pub enum InternalOwned<T> {
148    // FIXME pub
149    Item(T),
150    Borrowed,
151}
152
153pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub
154impl<T> Owned<T> {
155    pub fn new(data: T) -> Self {
156        Owned::<T>(InternalOwned::Item(data))
157    }
158    pub fn unwrap_or(self, other: T) -> T {
159        if let InternalOwned::Item(x) = self.0 {
160            x
161        } else {
162            other
163        }
164    }
165    pub fn unwrap(self) -> T {
166        if let InternalOwned::Item(x) = self.0 {
167            x
168        } else {
169            panic!("Item permanently borrowed")
170        }
171    }
172    pub fn view(&self) -> &T {
173        if let InternalOwned::Item(ref x) = self.0 {
174            x
175        } else {
176            panic!("Item permanently borrowed")
177        }
178    }
179}
180
181pub trait OwnedRetriever<U: Send + 'static> {
182    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
183    fn unwrap(self) -> Result<U, PoisonedThreadError>;
184}
185
186#[cfg(feature = "std")]
187impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
188    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
189        match self.read() {
190            Ok(ref u) => Ok(f(u)),
191            Err(_) => Err(PoisonedThreadError::default()),
192        }
193    }
194    fn unwrap(self) -> Result<U, PoisonedThreadError> {
195        match std::sync::Arc::try_unwrap(self) {
196            Ok(rwlock) => match rwlock.into_inner() {
197                Ok(u) => Ok(u),
198                Err(_) => Err(PoisonedThreadError::default()),
199            },
200            Err(_) => Err(PoisonedThreadError::default()),
201        }
202    }
203}
204
205pub trait BatchSpawnable<
206    ReturnValue: Send + 'static,
207    ExtraInput: Send + 'static,
208    Alloc: BrotliAlloc + Send + 'static,
209    U: Send + 'static + Sync,
210> where
211    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
212{
213    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
214    type FinalJoinHandle: OwnedRetriever<U>;
215    // this function takes in an input slice
216    // a SendAlloc per thread and converts them all into JoinHandle
217    // the input is borrowed until the joins complete
218    // owned is set to borrowed
219    // the final join handle is a r/w lock which will return the SliceW to the owner
220    // the FinalJoinHandle is only to be called when each individual JoinHandle has been examined
221    // the function is called with the thread_index, the num_threads, a reference to the slice under a read lock,
222    // and an allocator from the alloc_per_thread
223    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
224    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
225        &mut self,
226        handle: &mut Self::FinalJoinHandle,
227        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
228        index: usize,
229        num_threads: usize,
230        f: F,
231    );
232}
233
234pub trait BatchSpawnableLite<
235    ReturnValue: Send + 'static,
236    ExtraInput: Send + 'static,
237    Alloc: BrotliAlloc + Send + 'static,
238    U: Send + 'static + Sync,
239> where
240    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
241{
242    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
243    type FinalJoinHandle: OwnedRetriever<U>;
244    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
245    fn spawn(
246        &mut self,
247        handle: &mut Self::FinalJoinHandle,
248        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
249        index: usize,
250        num_threads: usize,
251        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
252    );
253}
254/*
255impl<ReturnValue:Send+'static,
256     ExtraInput:Send+'static,
257     Alloc:BrotliAlloc+Send+'static,
258     U:Send+'static+Sync>
259     BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> {
260  type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle;
261  type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle;
262  fn batch_spawn(
263    &mut self,
264    input: &mut Owned<U>,
265    alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>],
266    f: fn(usize, usize, &U, Alloc) -> T,
267  ) -> Self::FinalJoinHandle {
268   <Self as BatchSpawnable<ReturnValue, ExtraInput,  Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f)
269  }
270}*/
271
272pub fn CompressMultiSlice<
273    Alloc: BrotliAlloc + Send + 'static,
274    Spawner: BatchSpawnableLite<
275        CompressionThreadResult<Alloc>,
276        UnionHasher<Alloc>,
277        Alloc,
278        (
279            <Alloc as Allocator<u8>>::AllocatedMemory,
280            BrotliEncoderParams,
281        ),
282    >,
283>(
284    params: &BrotliEncoderParams,
285    input_slice: &[u8],
286    output: &mut [u8],
287    alloc_per_thread: &mut [SendAlloc<
288        CompressionThreadResult<Alloc>,
289        UnionHasher<Alloc>,
290        Alloc,
291        Spawner::JoinHandle,
292    >],
293    thread_spawner: &mut Spawner,
294) -> Result<usize, BrotliEncoderThreadError>
295where
296    <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
297    <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
298    <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
299{
300    let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
301        let mut input = <Alloc as Allocator<u8>>::alloc_cell(alloc, input_slice.len());
302        input.slice_mut().clone_from_slice(input_slice);
303        input
304    } else {
305        <Alloc as Allocator<u8>>::AllocatedMemory::default()
306    };
307    let mut owned_input = Owned::new(input);
308    let ret = CompressMulti(
309        params,
310        &mut owned_input,
311        output,
312        alloc_per_thread,
313        thread_spawner,
314    );
315    if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
316        <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
317    }
318    ret
319}
320
321fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
322    ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
323}
324
325fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
326    hasher: UnionHasher<Alloc>,
327    thread_index: usize,
328    num_threads: usize,
329    input_and_params: &(SliceW, BrotliEncoderParams),
330    mut alloc: Alloc,
331) -> CompressionThreadResult<Alloc>
332where
333    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
334{
335    let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
336    let mut mem = <Alloc as Allocator<u8>>::alloc_cell(
337        &mut alloc,
338        BrotliEncoderMaxCompressedSize(range.end - range.start),
339    );
340    let mut state = BrotliEncoderStateStruct::new(alloc);
341    state.params = input_and_params.1.clone();
342    if thread_index != 0 {
343        state.params.catable = true; // make sure we can concatenate this to the other work results
344        state.params.magic_number = false; // no reason to pepper this around
345    }
346    state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in
347    if thread_index != 0 {
348        state.set_custom_dictionary_with_optional_precomputed_hasher(
349            range.start,
350            &input_and_params.0.slice()[..range.start],
351            hasher,
352        );
353    }
354    let mut out_offset = 0usize;
355    let compression_result;
356    let mut available_out = mem.len();
357    loop {
358        let mut next_in_offset = 0usize;
359        let mut available_in = range.end - range.start;
360        let result = state.compress_stream(
361            BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
362            &mut available_in,
363            &input_and_params.0.slice()[range.clone()],
364            &mut next_in_offset,
365            &mut available_out,
366            mem.slice_mut(),
367            &mut out_offset,
368            &mut None,
369            &mut |_a, _b, _c, _d| (),
370        );
371        let new_range = range.start + next_in_offset..range.end;
372        range = new_range;
373        if result {
374            compression_result = Ok(out_offset);
375            break;
376        } else if available_out == 0 {
377            compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space??
378            break;
379        }
380    }
381    BrotliEncoderDestroyInstance(&mut state);
382    match compression_result {
383        Ok(size) => CompressionThreadResult::<Alloc> {
384            compressed: Ok(CompressedFileChunk {
385                data_backing: mem,
386                data_size: size,
387            }),
388            alloc: state.m8,
389        },
390        Err(e) => {
391            <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
392            CompressionThreadResult::<Alloc> {
393                compressed: Err(e),
394                alloc: state.m8,
395            }
396        }
397    }
398}
399
400pub fn CompressMulti<
401    Alloc: BrotliAlloc + Send + 'static,
402    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
403    Spawner: BatchSpawnableLite<
404        CompressionThreadResult<Alloc>,
405        UnionHasher<Alloc>,
406        Alloc,
407        (SliceW, BrotliEncoderParams),
408    >,
409>(
410    params: &BrotliEncoderParams,
411    owned_input: &mut Owned<SliceW>,
412    output: &mut [u8],
413    alloc_per_thread: &mut [SendAlloc<
414        CompressionThreadResult<Alloc>,
415        UnionHasher<Alloc>,
416        Alloc,
417        Spawner::JoinHandle,
418    >],
419    thread_spawner: &mut Spawner,
420) -> Result<usize, BrotliEncoderThreadError>
421where
422    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
423    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
424    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
425{
426    let num_threads = alloc_per_thread.len();
427    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
428    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
429    // start thread spawner
430    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
431    if num_threads > 1 {
432        // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items
433        thread_spawner.spawn(
434            &mut spawner_and_input,
435            &mut alloc_per_thread[0],
436            0,
437            num_threads,
438            compress_part,
439        );
440    }
441    // populate all hashers at once, cloning them one by one
442    let mut compression_last_thread_result;
443    if num_threads > 1 && params.favor_cpu_efficiency {
444        let mut local_params = params.clone();
445        SanitizeParams(&mut local_params);
446        let mut hasher = UnionHasher::Uninit;
447        HasherSetup(
448            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
449            &mut hasher,
450            &mut local_params,
451            &[],
452            0,
453            0,
454            0,
455        );
456        for thread_index in 1..num_threads {
457            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
458                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
459                let overlap = hasher.StoreLookahead().wrapping_sub(1);
460                if range.end - range.start > overlap {
461                    hasher.BulkStoreRange(
462                        input_and_params.0.slice(),
463                        !(0),
464                        if range.start > overlap {
465                            range.start - overlap
466                        } else {
467                            0
468                        },
469                        range.end - overlap,
470                    );
471                }
472            });
473            if let Err(_e) = res {
474                return Err(BrotliEncoderThreadError::OtherThreadPanic);
475            }
476            if thread_index + 1 != num_threads {
477                {
478                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
479                    *out_hasher = hasher.clone_with_alloc(alloc);
480                }
481                thread_spawner.spawn(
482                    &mut spawner_and_input,
483                    &mut alloc_per_thread[thread_index],
484                    thread_index,
485                    num_threads,
486                    compress_part,
487                );
488            }
489        }
490        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
491        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
492        compress_part(hasher,
493                      num_threads - 1,
494                      num_threads,
495                      input_and_params,
496                      alloc,
497        )
498      });
499    } else {
500        if num_threads > 1 {
501            for thread_index in 1..num_threads - 1 {
502                thread_spawner.spawn(
503                    &mut spawner_and_input,
504                    &mut alloc_per_thread[thread_index],
505                    thread_index,
506                    num_threads,
507                    compress_part,
508                );
509            }
510        }
511        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
512        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
513        compress_part(UnionHasher::Uninit,
514                      num_threads - 1,
515                      num_threads,
516                      input_and_params,
517                      alloc,
518        )
519      });
520    }
521    let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
522    let mut out_file_size = 0usize;
523    let mut bro_cat_li = BroCatli::new();
524    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
525        let mut cur_result = if index + 1 == num_threads {
526            match mem::replace(&mut compression_last_thread_result, Err(())) {
527                Ok(result) => result,
528                Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
529            }
530        } else {
531            match mem::replace(
532                &mut thread.0,
533                InternalSendAlloc::SpawningOrJoining(PhantomData),
534            ) {
535                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
536                    panic!("Thread not properly spawned")
537                }
538                InternalSendAlloc::Join(join) => match join.join() {
539                    Ok(result) => result,
540                    Err(err) => {
541                        return Err(err);
542                    }
543                },
544            }
545        };
546        match cur_result.compressed {
547            Ok(compressed_out) => {
548                bro_cat_li.new_brotli_file();
549                let mut in_offset = 0usize;
550                let cat_result = bro_cat_li.stream(
551                    &compressed_out.data_backing.slice()[..compressed_out.data_size],
552                    &mut in_offset,
553                    output,
554                    &mut out_file_size,
555                );
556                match cat_result {
557                    BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
558                        compression_result = Ok(out_file_size);
559                    }
560                    BroCatliResult::NeedsMoreOutput => {
561                        compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
562                        // not enough space
563                    }
564                    err => {
565                        compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
566                        // misc error
567                    }
568                }
569                <Alloc as Allocator<u8>>::free_cell(
570                    &mut cur_result.alloc,
571                    compressed_out.data_backing,
572                );
573            }
574            Err(e) => {
575                compression_result = Err(e);
576            }
577        }
578        thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
579    }
580    compression_result?;
581    match bro_cat_li.finish(output, &mut out_file_size) {
582        BroCatliResult::Success => compression_result = Ok(out_file_size),
583        err => {
584            compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
585                err,
586            ))
587        }
588    }
589    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
590        *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning
591    } else if compression_result.is_ok() {
592        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
593    }
594    compression_result
595}