Rivulet.Core

Safe, async-first parallel operators with bounded concurrency, retries, and backpressure for I/O-heavy workloads.

Transform collections in parallel while maintaining control over concurrency, errors, and resource usage.

Installation

dotnet add package Rivulet.Core

Quick Start

Parallel Transformation

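using System.Net.Http;
using Rivulet.Core;

// Reuse a single HttpClient for all requests
using var httpClient = new HttpClient();

var urls = new[] { "https://api.example.com/1", "https://api.example.com/2", /* ... */ };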

var results = await urls.SelectParallelAsync(
    async (url, ct) =>
    {
        using var response = await httpClient.GetAsync(url, ct);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync(ct);
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        MaxRetries = 3,
        IsTransient = ex => ex is HttpRequestException or TaskCanceledException,
        ErrorMode = ErrorMode.CollectAndContinue
    });

Streaming Results

Process each result as soon as it completes instead of waiting for the whole collection:

await foreach (var result in source.SelectParallelStreamAsync(
    async (item, ct) => await ProcessAsync(item, ct),
    new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 }))
{
    // Handle result immediately as it completes
    Console.WriteLine(result);
}

Parallel Side Effects

Execute actions in parallel without collecting results:

await items.ForEachParallelAsync(
    async (item, ct) => await SaveToDbAsync(item, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 10,
        ErrorMode = ErrorMode.FailFast
    });

Batch Processing

Process items in batches for efficient bulk operations:

// Materialize results - process 100 items at a time
var results = await items.BatchParallelAsync(
    batchSize: 100,
    async (batch, ct) =>
    {
        // Process entire batch together (e.g., bulk database insert)
        await BulkInsertAsync(batch, ct);
        return batch.Count();
    },
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 5
    });

// Streaming results - process batches as they're ready
await foreach (var batchResult in items.BatchParallelStreamAsync(
    batchSize: 50,
    async (batch, ct) =>
    {
        await ProcessBatchAsync(batch, ct);
        return batch.Count();
    },
    new ParallelOptionsRivulet { MaxDegreeOfParallelism = 10 },
    batchTimeout: TimeSpan.FromSeconds(5))) // Flush partial batch after timeout
{
    Console.WriteLine($"Processed batch: {batchResult} items");
}

Ordered Output

Maintain input order for sequence-sensitive operations:

// ETL pipeline where order matters for downstream processing
var orderedResults = await records.SelectParallelAsync(
    async (record, ct) => await TransformAsync(record, ct),
    new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 32,
        OrderedOutput = true  // Results match input order despite parallel processing
    });

// Streaming with ordered output
await foreach (var result in source.SelectParallelStreamAsync(
    async (x, ct) => await ProcessAsync(x, ct),
    new ParallelOptionsRivulet { OrderedOutput = true }))
{
    // Results arrive in input sequence order
}

Key Features

  • Bounded Concurrency - Control max parallel operations with backpressure
  • Adaptive Concurrency - Auto-scale workers based on latency and success rate (AIMD algorithm)
  • Retry Policies - Automatic retries with exponential backoff for transient errors
  • Circuit Breaker - Prevent cascading failures with automatic service protection
  • Rate Limiting - Token bucket algorithm for controlling operation rates
  • Error Handling Modes - FailFast, CollectAndContinue, or BestEffort
  • Streaming Support - Process results incrementally via IAsyncEnumerable<T>
  • Ordered Output - Maintain input sequence order when needed
  • Runtime Metrics - Built-in monitoring via EventCounters and custom callbacks
  • Progress Reporting - Periodic snapshots with throughput, ETA, and percent-complete
  • Cancellation - Full CancellationToken support throughout
  • Lifecycle Hooks - OnStart, OnComplete, OnRetry, OnError, OnThrottle, OnDrain callbacks
  • Fallback Values - Supply default results for failed items instead of throwing
  • Per-Item Timeouts - Enforce timeouts for individual operations
  • Input Sources - Works with both IEnumerable<T> and IAsyncEnumerable<T>
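
The AIMD (additive increase, multiplicative decrease) rule named under Adaptive Concurrency can be illustrated with a small, library-independent sketch. The function name NextConcurrency, the +1 step, and the halving factor are assumptions chosen for illustration and are not Rivulet's actual implementation; the latency and success-rate thresholds only mirror the TargetLatency and MinSuccessRate option names.

```csharp
using System;

// Illustrative AIMD step: add one worker while the workload is healthy,
// halve the worker count when latency or success rate degrades.
static int NextConcurrency(
    int current, TimeSpan observedLatency, double successRate,
    TimeSpan targetLatency, double minSuccessRate, int min, int max)
{
    bool healthy = observedLatency <= targetLatency && successRate >= minSuccessRate;
    int next = healthy ? current + 1 : current / 2;
    return Math.Clamp(next, min, max);
}

var target = TimeSpan.FromMilliseconds(100);
int workers = 8;

// Healthy sample (80 ms, 99% success): additive increase
workers = NextConcurrency(workers, TimeSpan.FromMilliseconds(80), 0.99, target, 0.95, 1, 32);
Console.WriteLine(workers); // 9

// Slow sample (250 ms): multiplicative decrease
workers = NextConcurrency(workers, TimeSpan.FromMilliseconds(250), 0.99, target, 0.95, 1, 32);
Console.WriteLine(workers); // 4
```

The asymmetry is the point of the scheme: capacity is probed slowly, but the worker count backs off quickly when the downstream service shows signs of stress.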

API

  • SelectParallelAsync - Process items and collect results
  • SelectParallelStreamAsync - Stream results as they complete
  • ForEachParallelAsync - Run side-effecting actions in parallel without collecting results
  • BatchParallelAsync - Process items in configurable batches
  • BatchParallelStreamAsync - Process items in configurable batches and stream each batch result as it completes

Configuration Options

new ParallelOptionsRivulet
{
    // Concurrency control
    MaxDegreeOfParallelism = 32,        // Max concurrent operations (default: CPU cores)
    ChannelCapacity = 1024,              // Backpressure buffer size (streaming only)
    OrderedOutput = false,               // Return results in input order (default: false)

    // Adaptive concurrency (auto-scale workers based on performance)
    AdaptiveConcurrency = new AdaptiveConcurrencyOptions
    {
        MinConcurrency = 1,
        MaxConcurrency = 32,
        TargetLatency = TimeSpan.FromMilliseconds(100),
        MinSuccessRate = 0.95
    },

    // Error handling
    ErrorMode = ErrorMode.CollectAndContinue,  // How to handle failures
    OnErrorAsync = async (index, ex) => { /* ... */ return true; },

    // Retry policy
    MaxRetries = 3,                      // Number of retry attempts
    IsTransient = ex => ex is HttpRequestException,  // Which errors to retry
    BaseDelay = TimeSpan.FromMilliseconds(100),     // Exponential backoff base
    BackoffStrategy = BackoffStrategy.ExponentialJitter,

    // Circuit breaker (fail-fast when service is unhealthy)
    // Throws CircuitBreakerOpenException when circuit is open
    CircuitBreaker = new CircuitBreakerOptions
    {
        FailureThreshold = 5,
        SuccessThreshold = 2,
        OpenTimeout = TimeSpan.FromSeconds(30)
    },

    // Rate limiting (token bucket algorithm)
    RateLimit = new RateLimitOptions
    {
        TokensPerSecond = 100,
        BurstCapacity = 200
    },

    // Timeouts
    PerItemTimeout = TimeSpan.FromSeconds(30),  // Timeout per item

    // Lifecycle hooks
    OnStartItemAsync = async (index) => { /* ... */ },
    OnCompleteItemAsync = async (index) => { /* ... */ },
    OnRetryAsync = async (index, attempt, ex) => { /* called before backoff delay */ },
    OnThrottleAsync = async (count) => { /* ... */ },
    OnDrainAsync = async (count) => { /* ... */ },

    // Fallback value when all retries are exhausted (keeps result count == input count)
    OnFallback = (index, ex) => ex is TimeoutException ? (object?)"timeout" : null,

    // Progress reporting (ETL jobs, bulk imports)
    Progress = new ProgressOptions
    {
        ReportInterval = TimeSpan.FromSeconds(5),
        OnProgress = async snapshot =>
        {
            Console.WriteLine($"{snapshot.PercentComplete:F1}% — {snapshot.ItemsPerSecond:F0} items/s");
        }
    },

    // Runtime metrics (export to Prometheus, App Insights, DataDog, etc.)
    Metrics = new MetricsOptions
    {
        SampleInterval = TimeSpan.FromSeconds(10),
        OnMetricsSample = async snapshot => { /* export snapshot */ }
    }
}
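
To make the retry settings more concrete, here is a sketch of how an exponential backoff delay with jitter can be derived from a base delay. The "full jitter" formula used here (a uniformly random delay below baseDelay * 2^(attempt - 1)) is an assumption; BackoffStrategy.ExponentialJitter in Rivulet may compute its delays differently. BackoffDelay and its parameters are hypothetical names.

```csharp
using System;

// Exponential backoff with full jitter (illustrative; not Rivulet's exact formula).
// attempt is 1-based: the ceiling is baseDelay for attempt 1, 2x for attempt 2, 4x for attempt 3.
static TimeSpan BackoffDelay(TimeSpan baseDelay, int attempt, Random rng)
{
    double ceilingMs = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
    // Pick a uniformly random delay below the ceiling so that retries spread out.
    return TimeSpan.FromMilliseconds(rng.NextDouble() * ceilingMs);
}

var rng = new Random();
var baseDelay = TimeSpan.FromMilliseconds(100);

for (int attempt = 1; attempt <= 3; attempt++)
{
    double ceilingMs = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
    TimeSpan delay = BackoffDelay(baseDelay, attempt, rng);
    Console.WriteLine($"attempt {attempt}: ceiling {ceilingMs:F0} ms, chosen {delay.TotalMilliseconds:F0} ms");
}
```

Randomizing the delay keeps many failed items from retrying at the same instant, which matters when dozens of workers hit the same recovering service.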

Error Modes

  • FailFast - Stop immediately on first error and throw
  • CollectAndContinue - Continue processing, collect all errors, throw AggregateException at end
  • BestEffort - Continue processing, return successful results only, suppress errors
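
Because CollectAndContinue is described as throwing an AggregateException at the end, callers usually need to unpack its InnerExceptions. The sketch below uses only the .NET base library: SimulatedRunAsync is a hypothetical stand-in for a parallel run that collected two failures, and Summarize is a helper invented for this example. It shows the handling pattern, not the library's own behavior.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a parallel run that collected two failures.
static async Task<int[]> SimulatedRunAsync()
{
    await Task.Yield();
    throw new AggregateException(
        new TimeoutException("item 2 timed out"),
        new InvalidOperationException("item 5 was invalid"));
}

// Count the collected failures by exception type.
static string Summarize(AggregateException ex) =>
    string.Join(", ", ex.InnerExceptions
        .GroupBy(e => e.GetType().Name)
        .OrderBy(g => g.Key)
        .Select(g => $"{g.Key}: {g.Count()}"));

try
{
    await SimulatedRunAsync();
}
catch (AggregateException ex)
{
    // In this stand-in, each inner exception represents one failed item.
    Console.WriteLine(Summarize(ex)); // InvalidOperationException: 1, TimeoutException: 1
}
```

With FailFast, a single catch block for the first exception would be the analogous pattern; with BestEffort, errors are suppressed, so there is nothing to catch.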

Framework Support

  • .NET 8.0
  • .NET 9.0

Documentation & Source

License

MIT License - see LICENSE for details


Made with ❤️ by Jeffeek | NuGet | GitHub