What is the Kafka consumer poll() method?


Your thoughts?

MakeMeCTO

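The poll() method is how a KafkaConsumer fetches records from Kafka. This is the private poll(Timer, boolean) method from KafkaConsumer in Kafka 2.7. The public poll(Duration) delegates to it with includeMetadataInTimeout set to true, while the deprecated poll(long) passes false: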

```java
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }
        do {
            client.maybeTriggerWakeup();
            if (includeMetadataInTimeout) {
                // try to update assignment metadata BUT do not need to block on the timer for join group
                updateAssignmentMetadataIfNeeded(timer, false);
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                    log.warn("Still waiting for metadata");
                }
            }
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());
        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}
```

In a nutshell, the method keeps fetching until either records are available or the timer expires. If records are already buffered or arrive before the timeout, it returns them right away as a ConsumerRecords object, after first sending off the next round of fetch requests so they are in flight while your code processes the current batch. If no records arrive before the timer expires, it returns an empty ConsumerRecords.
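The loop shape described above can be sketched as a toy model in plain Java. This is not Kafka code: PollLoopModel, enqueue() and fetchOnce() are hypothetical names, "fetches" come from an in-memory queue instead of brokers, and the model leaves out metadata updates, wakeups, interceptors and fetch pipelining. It only illustrates the control flow: return early when a fetch yields records, otherwise keep trying until the deadline passes.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

/**
 * Toy, single-threaded model of the do...while loop in KafkaConsumer.poll(Timer, boolean).
 * All names are hypothetical; batches come from an in-memory queue, not from brokers.
 */
class PollLoopModel {
    // Hypothetical stand-in for the fetcher: each fetchOnce() call may yield a batch or nothing.
    private final Deque<List<String>> incomingBatches = new ArrayDeque<>();

    void enqueue(List<String> batch) {
        incomingBatches.add(batch);
    }

    private List<String> fetchOnce() {
        List<String> batch = incomingBatches.poll();
        return batch == null ? Collections.<String>emptyList() : batch;
    }

    /** Returns as soon as a fetch yields records, or an empty list once the timeout expires. */
    public List<String> poll(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            List<String> records = fetchOnce();
            if (!records.isEmpty()) {
                return records; // records available: return early
            }
            LockSupport.parkNanos(1_000_000L); // stand-in for blocking on network I/O (~1 ms)
        } while (System.nanoTime() < deadline); // plays the role of timer.notExpired()
        return Collections.emptyList(); // timer expired with nothing fetched
    }

    public static void main(String[] args) {
        PollLoopModel consumer = new PollLoopModel();
        consumer.enqueue(List.of("a", "b"));
        System.out.println(consumer.poll(Duration.ofMillis(100))); // [a, b]
        System.out.println(consumer.poll(Duration.ofMillis(50)));  // [] after waiting ~50 ms
    }
}
```

Because the body runs before the condition is checked, a zero timeout still makes one fetch attempt before returning, which matches the do...while structure of the real method.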

Yuzo_Koyama

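The Kafka consumer poll() method fetches records from the topics and partitions the consumer is subscribed or assigned to. Within a single partition, records are returned in offset order; there is no ordering guarantee across partitions.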

This poll() method is how Kafka clients read data from Kafka.

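When poll() is called, the consumer fetches records starting from its current position in each partition, which is the offset just after the last record it consumed. If it has no position yet (for example, right after a partition is assigned), it starts from the committed offset or, if there is none, from the offset chosen by the auto.offset.reset setting.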
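The per-partition position idea can be sketched as a toy model. PositionModel, append(), seek() and poll(int) are hypothetical names; the real client keeps this state internally and its max.poll.records setting caps the total per poll, whereas this sketch uses a simpler per-partition cap. It shows two things: records come back in offset order within a partition, and each poll resumes right after the last record it returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of per-partition consume positions. All names here are hypothetical. */
class PositionModel {
    // Each "partition" is an append-only log; a record's list index is its offset.
    private final Map<String, List<String>> logs = new HashMap<>();
    // Position = offset of the NEXT record to fetch (last consumed offset + 1).
    private final Map<String, Long> positions = new HashMap<>();

    void append(String partition, String value) {
        logs.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
    }

    /** Sets where consumption starts, e.g. a committed offset. */
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.get(partition);
    }

    /** Returns up to maxPerPartition records from each partition, in offset order, and advances positions. */
    Map<String, List<String>> poll(int maxPerPartition) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            List<String> log = logs.getOrDefault(e.getKey(), List.of());
            int from = (int) Math.min(e.getValue(), log.size());
            int to = Math.min(from + maxPerPartition, log.size());
            if (from < to) {
                result.put(e.getKey(), new ArrayList<>(log.subList(from, to)));
                e.setValue((long) to); // the next poll resumes after the last returned record
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PositionModel c = new PositionModel();
        for (String v : List.of("p0-a", "p0-b", "p0-c")) {
            c.append("t-0", v);
        }
        c.append("t-1", "p1-a");
        c.seek("t-0", 0);
        c.seek("t-1", 0);
        System.out.println(c.poll(2).get("t-0")); // [p0-a, p0-b]
        System.out.println(c.position("t-0"));    // 2
        System.out.println(c.poll(2).get("t-0")); // [p0-c]
    }
}
```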

stackchief

The poll() method is the function a Kafka consumer calls to retrieve records from a given topic.

When calling poll(), the consumer passes a timeout argument. This is the maximum amount of time to block waiting for records before returning.

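At the end of the day, this method is really a fancy do...while loop. After acquireAndEnsureOpen() checks that the consumer is still open and is being accessed by only one thread (throwing a ConcurrentModificationException otherwise), the method enters the loop and keeps fetching until records are returned or the timeout expires.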
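The single-thread check can be sketched as a small reentrant guard. It is modeled on the pattern KafkaConsumer uses in acquire()/release() (the owning thread's ID kept in an AtomicLong plus a reentrancy count), but SingleThreadGuard and its method bodies are an illustration, not the library's code. Note that it is a fail-fast check rather than a lock: a second thread is rejected immediately instead of waiting.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative fail-fast guard: only one thread may be "inside" at a time. Names are hypothetical. */
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Succeeds if this thread already owns the guard, or if nobody owns it yet.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refcount.incrementAndGet(); // allows nested acquire() calls by the owning thread
    }

    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD); // the last release frees the guard
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the main thread now owns the guard
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                System.out.println("acquired");
            } catch (ConcurrentModificationException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        });
        other.start();
        other.join(); // prints "rejected: not safe for multi-threaded access"
        guard.release();
    }
}
```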

Brenda Becker

The poll() method first checks that only a single thread is accessing the consumer, then enters a do...while loop whose condition is timer.notExpired(): it keeps looping until the provided timeout has elapsed, unless records become available first, in which case it returns them immediately.

EatFreshRupesh

You use the poll() method to read messages from the topics the consumer is subscribed to. It takes a single argument of type Duration specifying the maximum time to block waiting for messages; if none arrive within that time, it returns an empty ConsumerRecords.

If records are immediately available, the poll() method will return immediately.