We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents 2c56da7 + e46d316 commit f7afb37Copy full SHA for f7afb37
pkg/rdkafka/RdKafkaConsumer.php
@@ -99,11 +99,15 @@ public function getQueue()
99
public function receive($timeout = 0)
100
{
101
if (false == $this->subscribed) {
102
- $this->consumer->assign([new TopicPartition(
103
- $this->getQueue()->getQueueName(),
104
- $this->getQueue()->getPartition(),
105
- $this->offset
106
- )]);
+ if (null === $this->offset) {
+ $this->consumer->subscribe([$this->getQueue()->getQueueName()]);
+ } else {
+ $this->consumer->assign([new TopicPartition(
+ $this->getQueue()->getQueueName(),
107
+ $this->getQueue()->getPartition(),
108
+ $this->offset
109
+ )]);
110
+ }
111
112
$this->subscribed = true;
113
}
0 commit comments