Class AggKafkaConsumerService
- java.lang.Object
  - com.linkedin.venice.service.AbstractVeniceService
    - com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable
public class AggKafkaConsumerService extends AbstractVeniceService
AggKafkaConsumerService supports a Kafka consumer pool for multiple Kafka clusters from different data centers; for each Kafka bootstrap server URL, AggKafkaConsumerService creates one KafkaConsumerService.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
-
Field Summary
-
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
-
Constructor Summary
Constructors
Constructor and Description
AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable, java.util.function.Function<java.lang.String,java.lang.Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
Method Summary
Modifier and Type, Method, and Description
AbstractKafkaConsumerService createKafkaConsumerService(java.util.Properties consumerProperties)
    Create a new KafkaConsumerService given consumerProperties, which must contain a value for "bootstrap.servers".
long getLatestOffsetBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long getOffsetLagBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
protected static java.lang.Runnable getStuckConsumerDetectionAndRepairRunnable(java.util.Map<java.lang.String,AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, java.util.Map<java.lang.String,StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable)
boolean hasConsumerAssignedFor(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
boolean startInner()
    IMPORTANT: All newly created KafkaConsumerService instances are already started in createKafkaConsumerService(Properties). If this is no longer the case in the future, make sure to update the startInner logic here.
void stopInner()
ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> subscribeConsumerFor(java.lang.String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset)
void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
-
-
Constructor Detail
-
AggKafkaConsumerService
public AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable, java.util.function.Function<java.lang.String,java.lang.Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
-
Method Detail
-
startInner
public boolean startInner()
IMPORTANT: All newly created KafkaConsumerService instances are already started in createKafkaConsumerService(Properties). If this is no longer the case in the future, make sure to update the startInner logic here.
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
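The return-value contract described above can be illustrated with a minimal sketch. It does not reproduce the actual AbstractVeniceService implementation: MiniService, State, SyncService, AsyncService and the warmUp future are hypothetical names introduced only for illustration, and the way the base class reacts to the boolean is an assumption based on the Returns text.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the startInner() contract; not Venice code.
class StartContractSketch {
    enum State { STOPPED, STARTING, STARTED }

    abstract static class MiniService {
        protected volatile State state = State.STOPPED;

        // Assumed base-class behavior: mark STARTED only when startInner() reports
        // that startup finished synchronously.
        final void start() {
            state = State.STARTING;
            if (startInner()) {
                state = State.STARTED;
            }
        }

        // true: fully started; false: still starting asynchronously.
        protected abstract boolean startInner();

        State getState() {
            return state;
        }
    }

    // All work is done before startInner() returns, so it returns true.
    static final class SyncService extends MiniService {
        @Override
        protected boolean startInner() {
            return true;
        }
    }

    // Startup continues in the background, so the implementer sets STARTED itself.
    static final class AsyncService extends MiniService {
        final CompletableFuture<Void> warmUp = new CompletableFuture<>();

        @Override
        protected boolean startInner() {
            warmUp.thenRun(() -> state = State.STARTED);
            return false;
        }
    }
}
```

In this sketch, a service that returns false stays in STARTING until its own asynchronous work completes, which mirrors the responsibility the Returns text places on the implementer.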
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner in class AbstractVeniceService
- Throws:
java.lang.Exception
-
getStuckConsumerDetectionAndRepairRunnable
protected static java.lang.Runnable getStuckConsumerDetectionAndRepairRunnable(java.util.Map<java.lang.String,AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, java.util.Map<java.lang.String,StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable)
-
createKafkaConsumerService
public AbstractKafkaConsumerService createKafkaConsumerService(java.util.Properties consumerProperties)
Create a new KafkaConsumerService given consumerProperties, which must contain a value for "bootstrap.servers". If a KafkaConsumerService for the given "bootstrap.servers" (Kafka URL) has already been created, this method returns the existing KafkaConsumerService.
- Parameters:
consumerProperties - consumer properties that are used to create the KafkaConsumerService
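The create-or-return-existing behavior described for this method can be sketched as follows. This is only an illustration of the documented semantics, not the Venice implementation: ConsumerServiceRegistrySketch, StubConsumerService and the choice of ConcurrentHashMap.computeIfAbsent are assumptions, and the sketch throws IllegalArgumentException on a missing "bootstrap.servers" value, which the original documentation does not specify.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of one consumer service per Kafka URL; not Venice code.
class ConsumerServiceRegistrySketch {
    // Stand-in for a per-cluster consumer service.
    static final class StubConsumerService {
        final String kafkaUrl;

        StubConsumerService(String kafkaUrl) {
            this.kafkaUrl = kafkaUrl;
        }
    }

    private final Map<String, StubConsumerService> servicesByUrl = new ConcurrentHashMap<>();

    // Returns the service already registered for the URL, or creates one if absent.
    StubConsumerService createConsumerService(Properties consumerProperties) {
        String kafkaUrl = consumerProperties.getProperty("bootstrap.servers");
        if (kafkaUrl == null || kafkaUrl.isEmpty()) {
            // Assumed failure mode for the sketch only.
            throw new IllegalArgumentException("consumerProperties must contain \"bootstrap.servers\"");
        }
        return servicesByUrl.computeIfAbsent(kafkaUrl, StubConsumerService::new);
    }

    int size() {
        return servicesByUrl.size();
    }
}
```

Calling createConsumerService twice with the same "bootstrap.servers" value yields the same instance in this sketch, while a different URL yields a separate one.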
-
hasConsumerAssignedFor
public boolean hasConsumerAssignedFor(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
subscribeConsumerFor
public ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> subscribeConsumerFor(java.lang.String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset)
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
-