Browse Source

sketch instantiation of iterator adapter #(67)

pull/80/head
Sebastian Thiel 1 month ago
parent
commit
a3083ad3aa
No known key found for this signature in database GPG Key ID: 9CB5EE7895E8268B
  1. 8
      Makefile
  2. 10
      git-features/Cargo.toml
  3. 6
      git-features/src/parallel/mod.rs
  4. 54
      git-features/src/parallel/serial.rs
  5. 18
      git-features/tests/parallel_shared.rs

8
Makefile

@ -57,7 +57,11 @@ lint: ## Run lints with clippy
##@ Testing
tests: clippy check unit-tests journey-tests-small journey-tests ## run all tests, including journey tests
tests: clippy check doc unit-tests journey-tests-small journey-tests ## run all tests, including journey tests, try building docs
doc: ## Run cargo doc on all crates
cargo doc
cargo doc --all-features
clippy: ## Run cargo clippy on all crates
cargo clippy --all
@ -97,7 +101,7 @@ check: ## Build all code in suitable configurations
unit-tests: ## run all unit tests
cargo test --all --no-fail-fast
cd git-features && cargo test && cargo test --features fast-sha1
cd git-features && cargo test && cargo test --all-features
cd git-transport && cargo test && cargo test --features http-client-curl
cd gitoxide-core && cargo test --lib

10
git-features/Cargo.toml

@ -22,11 +22,11 @@ io-pipe = ["bytes"]
name = "parallel"
path = "tests/parallel.rs"
required-features = ["parallel"]
[[test]]
name = "parallel_shared"
path = "tests/parallel_shared.rs"
required-features = ["parallel"]
#
#[[test]]
#name = "parallel_shared"
#path = "tests/parallel_shared.rs"
#required-features = ["parallel"]
[[test]]
name = "parallel_shared_in_serial"

6
git-features/src/parallel/mod.rs

@ -99,8 +99,14 @@ pub trait Reducer {
/// The type fed to the reducer in the [`feed()`][Reducer::feed()] method.
type Input;
/// The type produced in Ok(…) by [`feed()`][Reducer::feed()].
/// Most reducers by nature use `()` here as the value is in the aggregation.
/// However, some may use it to collect statistics only and return their Input
/// in some form as a result here for [`SteppedReduce`] to be useful.
type FeedProduce;
/// The type produced once by the [`finalize()`][Reducer::finalize()] method.
///
/// For traditional reducers, this is the value produced by the entire operation.
/// For those made for step-wise iteration this may be aggregated statistics.
type Output;
/// The error type to use for all methods of this trait.
type Error;

54
git-features/src/parallel/serial.rs

@ -35,4 +35,56 @@ where
reducer.finalize()
}
// pub struct SteppedReduce
/// An iterator adaptor to allow running computations using [`in_parallel()`] in a step-wise manner.
#[cfg(not(feature = "parallel"))]
pub struct SteppedReduce<Input, ConsumeFn, ThreadStateFn, Reducer> {
input: Input,
consume: ConsumeFn,
new_thread_state: ThreadStateFn,
reducer: Reducer,
}
#[cfg(not(feature = "parallel"))]
impl<Input, ConsumeFn, ThreadStateFn, Reducer, I, O, S> SteppedReduce<Input, ConsumeFn, ThreadStateFn, Reducer>
where
Input: Iterator<Item = I> + Send,
ThreadStateFn: Fn(usize) -> S + Send + Sync,
ConsumeFn: Fn(I, &mut S) -> O + Send + Sync,
Reducer: crate::parallel::Reducer<Input = O>,
I: Send,
O: Send,
{
/// Instantiate a new iterator.
pub fn new(
input: Input,
_thread_limit: Option<usize>,
new_thread_state: ThreadStateFn,
consume: ConsumeFn,
reducer: Reducer,
) -> Self {
SteppedReduce {
input,
consume,
new_thread_state,
reducer,
}
}
}
#[cfg(not(feature = "parallel"))]
impl<Input, ConsumeFn, ThreadStateFn, Reducer, I, O, S> Iterator
for SteppedReduce<Input, ConsumeFn, ThreadStateFn, Reducer>
where
Input: Iterator<Item = I> + Send,
ThreadStateFn: Fn(usize) -> S + Send + Sync,
ConsumeFn: Fn(I, &mut S) -> O + Send + Sync,
Reducer: crate::parallel::Reducer<Input = O>,
I: Send,
O: Send,
{
type Item = Result<Reducer::FeedProduce, Reducer::Error>;
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
todo!()
}
}

18
git-features/tests/parallel_shared.rs

@ -1,3 +1,4 @@
//! Tests that are working similarly in parallel and serial mode
use git_features::parallel;
#[derive(Default)]
@ -33,3 +34,20 @@ fn parallel_add() {
.expect("successful computation");
assert_eq!(res, 100);
}
#[test]
fn stepped_reduce() {
let mut iter = parallel::SteppedReduce::new(
std::iter::from_fn(|| Some(1)).take(100),
None,
|_n| (),
|input, _state| input,
Adder::default(),
);
let mut aggregate = 0;
for value in iter.by_ref() {
aggregate += value.expect("success");
}
assert_eq!(aggregate, 100);
}
Loading…
Cancel
Save