Skip to content

Commit

Permalink
feat(amqp): make amqp v2 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaudgirard committed Feb 28, 2024
1 parent ffe058e commit 626b630
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
24 changes: 21 additions & 3 deletions src/AmqpBundle/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ public function ackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
$this->eventDispatcher->dispatch($ackEvent,AckEvent::NAME);
}

return $this->call($this->queue, 'ack', [$deliveryTag, $flags]);
$isAcked = $this->call($this->queue, 'ack', [$deliveryTag, $flags]);

if ($isAcked === null) {
return true;
}

return $isAcked;
}

/**
Expand All @@ -83,7 +89,13 @@ public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): boo
$this->eventDispatcher->dispatch($nackEvent, NackEvent::NAME);
}

return $this->call($this->queue, 'nack', [$deliveryTag, $flags]);
$isNacked = $this->call($this->queue, 'nack', [$deliveryTag, $flags]);

if ($isNacked === null) {
return true;
}

return $isNacked;
}

/**
Expand All @@ -100,7 +112,13 @@ public function purge(): bool
$this->eventDispatcher->dispatch($purgeEvent, PurgeEvent::NAME);
}

return $this->call($this->queue, 'purge');
$isPurged = $this->call($this->queue, 'purge');

if ($isPurged === null) {
return true;
}

return $isPurged;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Amqp/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public function publishMessage(string $message, int $flags = AMQP_NOPARAM, array
}

if (!$routingKeys) {
return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]);
return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]) === null;
}

// Publish the message for each routing keys
$success = true;
foreach ($routingKeys as $routingKey) {
$success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]);
$success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]) === null;
}

return (bool) $success;
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Tests/Units/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ public function testPurge()
->and($queue = $this->getQueue($msgList))
->and($consumer = new Base($queue, []))
// Purge the queue
->boolean($consumer->purge())
->isTrue()
->integer($consumer->purge())
->isEqualTo(1)
->mock($queue)
->call('purge')
->once()
Expand Down

0 comments on commit 626b630

Please sign in to comment.