brotli/enc/
singlethreading.rs

1use super::backward_references::UnionHasher;
2use alloc::{Allocator, SliceWrapper};
3use core::marker::PhantomData;
4use core::mem;
5use enc::threading::{
6    BatchSpawnable, BatchSpawnableLite, BrotliEncoderThreadError, CompressMulti,
7    CompressionThreadResult, InternalOwned, InternalSendAlloc, Joinable, Owned, OwnedRetriever,
8    PoisonedThreadError, SendAlloc,
9};
10use enc::BrotliAlloc;
11use enc::BrotliEncoderParams;
12
13pub struct SingleThreadedJoinable<T: Send + 'static, U: Send + 'static> {
14    result: Result<T, U>,
15}
16impl<T: Send + 'static, U: Send + 'static> Joinable<T, U> for SingleThreadedJoinable<T, U> {
17    fn join(self) -> Result<T, U> {
18        self.result
19    }
20}
21#[cfg(feature = "std")]
22pub struct SingleThreadedOwnedRetriever<U: Send + 'static>(std::sync::RwLock<U>);
23#[cfg(feature = "std")]
24impl<U: Send + 'static> OwnedRetriever<U> for SingleThreadedOwnedRetriever<U> {
25    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
26        Ok(f(&*self.0.read().unwrap()))
27    }
28    fn unwrap(self) -> Result<U, PoisonedThreadError> {
29        Ok(self.0.into_inner().unwrap())
30    }
31}
32#[cfg(feature = "std")]
33impl<U: Send + 'static> SingleThreadedOwnedRetriever<U> {
34    fn new(u: U) -> Self {
35        SingleThreadedOwnedRetriever(std::sync::RwLock::new(u))
36    }
37}
38
39#[cfg(not(feature = "std"))]
40pub struct SingleThreadedOwnedRetriever<U: Send + 'static>(U);
41#[cfg(not(feature = "std"))]
42impl<U: Send + 'static> SingleThreadedOwnedRetriever<U> {
43    fn new(u: U) -> Self {
44        SingleThreadedOwnedRetriever(u)
45    }
46}
47#[cfg(not(feature = "std"))]
48impl<U: Send + 'static> OwnedRetriever<U> for SingleThreadedOwnedRetriever<U> {
49    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
50        Ok(f(&self.0))
51    }
52    fn unwrap(self) -> Result<U, PoisonedThreadError> {
53        Ok(self.0)
54    }
55}
56
57#[derive(Default)]
58pub struct SingleThreadedSpawner {}
59
60impl<
61        ReturnValue: Send + 'static,
62        ExtraInput: Send + 'static,
63        Alloc: BrotliAlloc + Send + 'static,
64        U: Send + 'static + Sync,
65    > BatchSpawnable<ReturnValue, ExtraInput, Alloc, U> for SingleThreadedSpawner
66where
67    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
68{
69    type JoinHandle = SingleThreadedJoinable<ReturnValue, BrotliEncoderThreadError>;
70    type FinalJoinHandle = SingleThreadedOwnedRetriever<U>;
71    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle {
72        SingleThreadedOwnedRetriever::<U>::new(
73            mem::replace(input, Owned(InternalOwned::Borrowed)).unwrap(),
74        )
75    }
76    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
77        &mut self,
78        handle: &mut Self::FinalJoinHandle,
79        work: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
80        index: usize,
81        num_threads: usize,
82        f: F,
83    ) {
84        let (alloc, extra_input) = work.replace_with_default();
85        let ret = handle.view(|sub_view| f(extra_input, index, num_threads, sub_view, alloc));
86        *work = SendAlloc(InternalSendAlloc::Join(SingleThreadedJoinable {
87            result: Ok(ret.unwrap()),
88        }));
89    }
90}
91
92impl<
93        ReturnValue: Send + 'static,
94        ExtraInput: Send + 'static,
95        Alloc: BrotliAlloc + Send + 'static,
96        U: Send + 'static + Sync,
97    > BatchSpawnableLite<ReturnValue, ExtraInput, Alloc, U> for SingleThreadedSpawner
98where
99    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
100{
101    type JoinHandle =
102        <SingleThreadedSpawner as BatchSpawnable<ReturnValue, ExtraInput, Alloc, U>>::JoinHandle;
103    type FinalJoinHandle = <SingleThreadedSpawner as BatchSpawnable<
104        ReturnValue,
105        ExtraInput,
106        Alloc,
107        U,
108    >>::FinalJoinHandle;
109
110    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle {
111        <Self as BatchSpawnable<ReturnValue, ExtraInput, Alloc, U>>::make_spawner(self, input)
112    }
113    fn spawn(
114        &mut self,
115        handle: &mut Self::FinalJoinHandle,
116        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
117        index: usize,
118        num_threads: usize,
119        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
120    ) {
121        <Self as BatchSpawnable<ReturnValue, ExtraInput, Alloc, U>>::spawn(
122            self,
123            handle,
124            alloc_per_thread,
125            index,
126            num_threads,
127            f,
128        )
129    }
130}
131
132pub fn compress_multi<
133    Alloc: BrotliAlloc + Send + 'static,
134    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
135>(
136    params: &BrotliEncoderParams,
137    owned_input: &mut Owned<SliceW>,
138    output: &mut [u8],
139    alloc_per_thread: &mut [SendAlloc<
140        CompressionThreadResult<Alloc>,
141        UnionHasher<Alloc>,
142        Alloc,
143        <SingleThreadedSpawner as BatchSpawnable<
144            CompressionThreadResult<Alloc>,
145            UnionHasher<Alloc>,
146            Alloc,
147            SliceW,
148        >>::JoinHandle,
149    >],
150) -> Result<usize, BrotliEncoderThreadError>
151where
152    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
153    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
154    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
155{
156    CompressMulti(
157        params,
158        owned_input,
159        output,
160        alloc_per_thread,
161        &mut SingleThreadedSpawner::default(),
162    )
163}
164
165pub struct WorkerPool<A, B, C, D> {
166    a: PhantomData<A>,
167    b: PhantomData<B>,
168    c: PhantomData<C>,
169    d: PhantomData<D>,
170}
171pub fn new_work_pool<A, B, C, D>(_num_threads: usize) -> WorkerPool<A, B, C, D> {
172    WorkerPool::<A, B, C, D> {
173        a: PhantomData,
174        b: PhantomData,
175        c: PhantomData,
176        d: PhantomData,
177    }
178}
179
180pub fn compress_worker_pool<
181    Alloc: BrotliAlloc + Send + 'static,
182    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
183>(
184    params: &BrotliEncoderParams,
185    owned_input: &mut Owned<SliceW>,
186    output: &mut [u8],
187    alloc_per_thread: &mut [SendAlloc<
188        CompressionThreadResult<Alloc>,
189        UnionHasher<Alloc>,
190        Alloc,
191        <SingleThreadedSpawner as BatchSpawnable<
192            CompressionThreadResult<Alloc>,
193            UnionHasher<Alloc>,
194            Alloc,
195            SliceW,
196        >>::JoinHandle,
197    >],
198    _worker_pool: &mut WorkerPool<
199        CompressionThreadResult<Alloc>,
200        UnionHasher<Alloc>,
201        Alloc,
202        (SliceW, BrotliEncoderParams),
203    >,
204) -> Result<usize, BrotliEncoderThreadError>
205where
206    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
207    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
208    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
209{
210    compress_multi(params, owned_input, output, alloc_per_thread)
211}