Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ BC Breaks changelog

- Drop support for php 5.6 & 7.0
- Worker now catch Throwable, not only Exceptions
- WritableMessage::packAttributes now only accept timestamp or null (not false anymore)

**3.x -> 4.x**

Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"puzzle/configuration" : ">=3.0",
"puzzle/pieces" : "~2.2",
"symfony/console": "~3.2",
"swarrot/swarrot": "~2.3",
"swarrot/swarrot": "~3.0",
"knplabs/gaufrette": "~0.2",
"symfony/event-dispatcher": "~3.2",
"puzzle/uuid": "^2.0.1"
Expand All @@ -39,7 +39,7 @@
"empi89/php-amqp-stubs": "dev-master",
"puzzle/assert": "~1.1",
"pimple/pimple": "~3.0",
"odolbeau/rabbit-mq-admin-toolkit": "~3.0",
"odolbeau/rabbit-mq-admin-toolkit": "~4.0",
"behat/behat": "~3.3",
"alchemy/rabbitmq-management-client": "dev-retrieve-messages@dev",
"symfony/debug": "~3.2",
Expand Down
2 changes: 1 addition & 1 deletion features/bootstrap/ConsumeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function() {
$consumer->consume($processor, $this->client, $workerContext);
}

public function process(ReadableMessage $message)
public function process(ReadableMessage $message): void
{
$this->consumedMessages[] = $message;
}
Expand Down
21 changes: 3 additions & 18 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,13 @@

interface Client extends LoggerAwareInterface
{
/**
* @return boolean
*/
public function publish($exchangeName, WritableMessage $message);
public function publish(string $exchangeName, WritableMessage $message): bool;

/**
* @return \AMQPQueue
*/
public function getQueue($queueName);
public function getQueue(string $queueName): \AMQPQueue;

/**
* @return \AMQPExchange
*/
public function getExchange($exchangeName);
public function getExchange(?string $exchangeName = null, string $type = AMQP_EX_TYPE_TOPIC): \AMQPExchange;

/**
* @return self
*/
public function setMessageProcessors(array $processors);

/**
* @return self
*/
public function appendMessageProcessor(Processor $processor);
}
8 changes: 5 additions & 3 deletions src/Clients/ChunkedMessageClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ChunkedMessageClient
$client,
$memory;

public function __construct(Client $client, MemoryManagementStrategy $memory = null)
public function __construct(Client $client, ?MemoryManagementStrategy $memory = null)
{
$this->changeRoutingKeyPrefix(self::DEFAULT_ROUTING_KEY_PREFIX);

Expand All @@ -30,7 +30,7 @@ public function __construct(Client $client, MemoryManagementStrategy $memory = n
$this->client = $client;
}

public function changeRoutingKeyPrefix($prefix)
public function changeRoutingKeyPrefix(string $prefix): void
{
if(is_string($prefix))
{
Expand All @@ -40,7 +40,7 @@ public function changeRoutingKeyPrefix($prefix)
}
}

public function publish($exchangeName, WritableMessage $chunkedMessage)
public function publish(string $exchangeName, WritableMessage $chunkedMessage): bool
{
$streamedContent = $chunkedMessage->getBodyInTransportFormat();

Expand Down Expand Up @@ -77,5 +77,7 @@ public function publish($exchangeName, WritableMessage $chunkedMessage)

$this->memory->manage($size);
}

return true;
}
}
14 changes: 7 additions & 7 deletions src/Clients/Decorators/PrefixedExchangesClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ class PrefixedExchangesClient implements Client
$client,
$exchangesPrefix;

public function __construct(Client $client, $exchangesPrefix)
public function __construct(Client $client, ?string $exchangesPrefix)
{
$this->client = $client;
$this->exchangesPrefix = $exchangesPrefix;
}

public function publish($exchangeName, WritableMessage $message)
public function publish(string $exchangeName, WritableMessage $message): bool
{
return $this->client->publish(
$this->computeExchangeName($exchangeName),
$message
);
}

private function computeExchangeName($exchangeName)
private function computeExchangeName(string $exchangeName): string
{
$exchangeParts = [];

Expand All @@ -44,18 +44,18 @@ private function computeExchangeName($exchangeName)
$exchangeParts[] = $exchangeName;

return trim(implode(self::DELIMITER, $exchangeParts));

}

public function getQueue($queueName)
public function getQueue(string $queueName): \AMQPQueue
{
return $this->client->getQueue($queueName);
}

public function getExchange($exchangeName)
public function getExchange(?string $exchangeName = null, string $type = AMQP_EX_TYPE_TOPIC): \AMQPExchange
{
return $this->client->getExchange(
$this->computeExchangeName($exchangeName)
$this->computeExchangeName($exchangeName),
$type
);
}

Expand Down
12 changes: 6 additions & 6 deletions src/Clients/Decorators/PrefixedQueuesClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,30 @@ class PrefixedQueuesClient implements Client
$client,
$queueNamePrefix;

public function __construct(Client $client, $queueNamePrefix)
public function __construct(Client $client, ?string $queueNamePrefix)
{
$this->client = $client;
$this->queueNamePrefix = $queueNamePrefix;
}

public function publish($exchangeName, WritableMessage $message)
public function publish(string $exchangeName, WritableMessage $message): bool
{
return $this->client->publish($exchangeName, $message);
}

public function getQueue($queueName)
public function getQueue(string $queueName): \AMQPQueue
{
$prefixedQueueName = $this->computePrefixedQueueName($queueName);

return $this->client->getQueue($prefixedQueueName);
}

public function getExchange($exchangeName)
public function getExchange(?string $exchangeName = null, string $type = AMQP_EX_TYPE_TOPIC): \AMQPExchange
{
return $this->client->getExchange($exchangeName);
return $this->client->getExchange($exchangeName, $type);
}

private function computePrefixedQueueName($queueName)
public function computePrefixedQueueName(string $queueName): string
{
$queueNameParts = [];

Expand Down
20 changes: 10 additions & 10 deletions src/Clients/InMemory.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,52 @@ class InMemory implements Client

public function __construct()
{
$this->sentMessages = array();
$this->sentMessages = [];
$this->logger = new NullLogger();
}

public function publish($exchangeName, WritableMessage $message)
public function publish(string $exchangeName, WritableMessage $message): bool
{
$this->updateMessageAttributes($message);
$this->saveMessage($exchangeName, $message);

return true;
}

private function updateMessageAttributes(WritableMessage $message)
private function updateMessageAttributes(WritableMessage $message): void
{
$message->setAttribute('app_id', 'memory');
$message->addHeader('routing_key', $message->getRoutingKey());

$this->onPublish($message);
}

public function getQueue($queueName)
public function getQueue(string $queueName): \AMQPQueue
{
throw new \RuntimeException('This AMQP Client must be used only for sending purpose');
}

public function getExchange($exchangeName)
public function getExchange(?string $exchangeName = null, string $type = AMQP_EX_TYPE_TOPIC): \AMQPExchange
{
throw new \RuntimeException('This AMQP Client must be used only for sending purpose');
}

private function saveMessage($exchangeName, WritableMessage $message)
{
$this->sentMessages[] = array(
$this->sentMessages[] = [
'exchange' => $exchangeName,
'message' => $message
);
];
}

public function getSentMessages()
public function getSentMessages(): array
{
return $this->sentMessages;
}

public function dropSentMessages()
public function dropSentMessages(): self
{
$this->sentMessages = array();
$this->sentMessages = [];

return $this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

class NullMemoryManagementStrategy implements MemoryManagementStrategy
{
public function init()
public function init(): void
{
}

public function manage($sentSize)
public function manage(int $sentSize): void
{
}
}
4 changes: 2 additions & 2 deletions src/Clients/MemoryManagementStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

interface MemoryManagementStrategy
{
public function init();
public function manage($sentSize);
public function init(): void;
public function manage(int $sentSize): void;
}
18 changes: 9 additions & 9 deletions src/Clients/Pecl.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ public function __construct(Configuration $configuration)
$this->logger = new NullLogger();
}

public function setMemoryManagementStrategy(MemoryManagementStrategy $strategy)
public function setMemoryManagementStrategy(MemoryManagementStrategy $strategy): self
{
$this->memoryManagementStrategy = $strategy;

return $this;
}

private function ensureIsConnected()
private function ensureIsConnected(): void
{
if(! $this->channel instanceof \AMQPChannel)
{
Expand All @@ -69,7 +69,7 @@ private function ensureIsConnected()
}
}

public function publish($exchangeName, WritableMessage $message)
public function publish(string $exchangeName, WritableMessage $message): bool
{
if($message->isChunked())
{
Expand All @@ -92,7 +92,7 @@ public function publish($exchangeName, WritableMessage $message)
return $this->sendMessage($ex, $message);
}

private function logMessage($exchangeName, WritableMessage $message)
private function logMessage(string $exchangeName, WritableMessage $message): void
{
$log = json_encode(array(
'exchange' => $exchangeName,
Expand All @@ -102,7 +102,7 @@ private function logMessage($exchangeName, WritableMessage $message)
$this->logger->error($log, ['This message was involved by an error (it was sent ... or not. Please check other logs)']);
}

private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
private function sendMessage(\AMQPExchange $ex, WritableMessage $message): bool
{
try
{
Expand All @@ -125,7 +125,7 @@ private function sendMessage(\AMQPExchange $ex, WritableMessage $message)
return true;
}

private function computeMessageFlag(WritableMessage $message)
private function computeMessageFlag(WritableMessage $message): int
{
$flag = AMQP_NOPARAM;
$disallowSilentDropping = $this->configuration->read('amqp/global/disallowSilentDropping', false);
Expand All @@ -138,7 +138,7 @@ private function computeMessageFlag(WritableMessage $message)
return $flag;
}

public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
public function getExchange(?string $exchangeName = null, string $type = AMQP_EX_TYPE_TOPIC): \AMQPExchange
{
$this->ensureIsConnected();

Expand All @@ -155,15 +155,15 @@ public function getExchange($exchangeName = null, $type = AMQP_EX_TYPE_TOPIC)
return $ex;
}

private function updateMessageAttributes(WritableMessage $message)
private function updateMessageAttributes(WritableMessage $message): void
{
$message->setAttribute('app_id', $this->applicationId);
$message->addHeader('routing_key', $message->getRoutingKey());

$this->onPublish($message);
}

public function getQueue($queueName)
public function getQueue(string $queueName): \AMQPQueue
{
$this->ensureIsConnected();

Expand Down
2 changes: 1 addition & 1 deletion src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

interface Consumer extends LoggerAwareInterface
{
const
public const
DEFAULT_MAX_EXECUTION_TIME = 3600; //in seconds

public function consume(ProcessorInterface $processor, Client $client, WorkerContext $workerContext);
Expand Down
19 changes: 5 additions & 14 deletions src/MessageMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,18 @@

interface MessageMetadata
{
const
public const
TRANSIENT = 1,
PERSISTENT = 2;

/**
* @return string
*/
public function getRoutingKey();
public function getRoutingKey(): string;

/**
* @return string
*/
public function getContentType();
public function getContentType(): string;

/**
* @return array
*/
public function getHeaders();
public function getHeaders(): array;

/**
* @return mixed
*/
public function getAttribute($attributeName);
public function getAttribute(string $attributeName);
}
Loading