Skip to content

Commit

Permalink
feat(amqp): make amqp v2 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaudgirard committed Feb 28, 2024
1 parent ffe058e commit 83e9a62
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
php-version: ${{ matrix.php-version }}
coverage: xdebug2
extensions: amqp
extensions: amqp-2.1.2
- name: Install symfony version from matrix
env:
SYMFONY_VERSION: ${{ matrix.symfony-version }}
Expand Down
24 changes: 21 additions & 3 deletions src/AmqpBundle/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ public function ackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
$this->eventDispatcher->dispatch($ackEvent,AckEvent::NAME);
}

return $this->call($this->queue, 'ack', [$deliveryTag, $flags]);
$isAcked = $this->call($this->queue, 'ack', [$deliveryTag, $flags]);

if ($isAcked === null) {
return true;
}

return $isAcked;
}

/**
Expand All @@ -83,7 +89,13 @@ public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): boo
$this->eventDispatcher->dispatch($nackEvent, NackEvent::NAME);
}

return $this->call($this->queue, 'nack', [$deliveryTag, $flags]);
$isNacked = $this->call($this->queue, 'nack', [$deliveryTag, $flags]);

if ($isNacked === null) {
return true;
}

return $isNacked;
}

/**
Expand All @@ -100,7 +112,13 @@ public function purge(): bool
$this->eventDispatcher->dispatch($purgeEvent, PurgeEvent::NAME);
}

return $this->call($this->queue, 'purge');
$isPurged = $this->call($this->queue, 'purge');

if ($isPurged === null) {
return true;
}

return $isPurged;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Amqp/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public function publishMessage(string $message, int $flags = AMQP_NOPARAM, array
}

if (!$routingKeys) {
return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]);
return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]) === null;
}

// Publish the message for each routing keys
$success = true;
foreach ($routingKeys as $routingKey) {
$success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]);
$success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]) === null;
}

return (bool) $success;
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Tests/Units/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ public function testPurge()
->and($queue = $this->getQueue($msgList))
->and($consumer = new Base($queue, []))
// Purge the queue
->boolean($consumer->purge())
->isTrue()
->integer($consumer->purge())
->isEqualTo(1)
->mock($queue)
->call('purge')
->once()
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Tests/Units/Factory/Mock/MockAMQPChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ public function qos($prefetchSize, $prefetchCount, $global = NULL)
{
}

public function setPrefetchSize($count){
public function setPrefetchSize(int $size): void{

}

public function setPrefetchCount($count){
public function setPrefetchCount(int $count): void {

}
}

0 comments on commit 83e9a62

Please sign in to comment.