Concurrent Map with Limit

Concurrent map operation with configurable parallelism limit to prevent resource exhaustion.

← Back
Language: swift | Tags: concurrency async collection parallel
extension Sequence where Element: Sendable {
    /// Concurrent map with limited parallelism
    func concurrentMap<T: Sendable>(
        maxConcurrency: Int = 4,
        _ transform: @Sendable @escaping (Element) async throws -> T
    ) async throws -> [T] {
        try await withThrowingTaskGroup(of: (Int, T).self) { group in
            var results: [T?] = Array(repeating: nil, count: Array(self).count)
            var index = 0
            var iterator = makeIterator()

            // Start initial batch
            for _ in 0..<maxConcurrency {
                guard let element = iterator.next() else { break }
                let currentIndex = index
                group.addTask {
                    let result = try await transform(element)
                    return (currentIndex, result)
                }
                index += 1
            }

            // Process remaining with backpressure
            while let (completedIndex, result) = try await group.next() {
                results[completedIndex] = result
                if let element = iterator.next() {
                    let currentIndex = index
                    group.addTask {
                        let result = try await transform(element)
                        return (currentIndex, result)
                    }
                    index += 1
                }
            }

            return results.compactMap { $0 }
        }
    }
}

// Usage:
// let urls = [url1, url2, url3, url4, url5]
// let results = try await urls.concurrentMap(maxConcurrency: 3) { url in
//     try await fetchData(from: url)
// }