commit
/**
* @throws SimpleKafkaClient\Exception
*/
public function commit($messageOrOffsets): void {}
Commit offsets synchronously, block until offsets are
committed or the commit fails and an exception is thrown.
- If
null
is passed, latest offsets for the current assignment will be committed - Ìf a
SimpleKafkaClient\Message
is passed, commit offset for a single topic+partition based on the message - If an array of
SimpleKafkaClient\TopicPartition
is passed, commit offsets for the provided list of partitions
$conf = SimpleKafkaClient\Configuration();
$conf->set('metadata.broker.list', 'kafka:9092');
$consumer = new SimpleKafkaClient\Consumer($conf);
$message = $consumer->consume(20000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
echo 'An error occured:' . rd_kafka_err2str($message->err) . PHP_EOL;
return;
}
$consumer->commit($message);