Class KafkaConsumerService
- java.lang.Object
  - com.linkedin.venice.service.AbstractVeniceService
    - com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
      - com.linkedin.davinci.kafka.consumer.KafkaConsumerService
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable
- Direct Known Subclasses:
PartitionWiseKafkaConsumerService, TopicWiseKafkaConsumerService
public abstract class KafkaConsumerService extends AbstractKafkaConsumerService
KafkaConsumerService is used to manage a pool of consumption-related resources connected to a specific Kafka cluster. The reasons to have this pool are:
1. To reduce the unnecessary overhead of having one consumer per store-version, each of which includes its own internal IO threads, connections to brokers and internal buffers;
2. To reduce the GC overhead when many store-versions are bootstrapping/ingesting at the same time;
3. To have a predictable and configurable upper bound on the total amount of resources occupied by consumers, no matter how many store-versions are hosted in the same instance.

The responsibilities of this class include:
1. Setting up a fixed-size pool of consumption units, where each unit contains exactly one:
   a) SharedKafkaConsumer
   b) ConsumptionTask
   c) ConsumerSubscriptionCleaner
2. Receiving various calls to interrogate or mutate consumer state, and delegating them to the correct unit, by maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably, the startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver) function allows the caller to start funneling consumed data into a receiver (i.e. into another task).
3. Providing a single abstract function that must be overridden by subclasses in order to implement a consumption load-balancing strategy: pickConsumerForPartition(PubSubTopic, PubSubTopicPartition).
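The split between a fixed-size pool and a pluggable load-balancing hook can be sketched as a template-method class. This is a minimal, hypothetical model, not Venice's code: `PooledConsumer` stands in for SharedKafkaConsumer, plain Strings stand in for PubSubTopic and PubSubTopicPartition, and the least-loaded rule in `LeastLoadedConsumerPool` is only one illustrative strategy, not a description of what PartitionWiseKafkaConsumerService or TopicWiseKafkaConsumerService actually do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for SharedKafkaConsumer: it only tracks how many
// topic-partitions it currently serves.
class PooledConsumer {
  final int id;
  int assignedPartitions = 0;

  PooledConsumer(int id) {
    this.id = id;
  }
}

// Sketch of the fixed-size pool: consumers are created once, up front, and
// subclasses only decide which existing consumer serves a new partition.
abstract class ConsumerPoolSketch {
  protected final List<PooledConsumer> consumers;

  protected ConsumerPoolSketch(int numOfConsumers) {
    if (numOfConsumers <= 0) {
      throw new IllegalArgumentException("pool size must be positive");
    }
    List<PooledConsumer> created = new ArrayList<>();
    for (int i = 0; i < numOfConsumers; i++) {
      created.add(new PooledConsumer(i));
    }
    this.consumers = Collections.unmodifiableList(created);
  }

  // Counterpart of pickConsumerForPartition: the only load-balancing hook.
  protected abstract PooledConsumer pickConsumerForPartition(String versionTopic, String topicPartition);

  // Hypothetical helper: asks the strategy for a consumer and records the extra load.
  PooledConsumer assign(String versionTopic, String topicPartition) {
    PooledConsumer chosen = pickConsumerForPartition(versionTopic, topicPartition);
    chosen.assignedPartitions++;
    return chosen;
  }
}

// One illustrative strategy: hand the partition to the least-loaded consumer
// (ties go to the lowest index).
class LeastLoadedConsumerPool extends ConsumerPoolSketch {
  LeastLoadedConsumerPool(int numOfConsumers) {
    super(numOfConsumers);
  }

  @Override
  protected PooledConsumer pickConsumerForPartition(String versionTopic, String topicPartition) {
    PooledConsumer best = consumers.get(0);
    for (PooledConsumer c : consumers) {
      if (c.assignedPartitions < best.assignedPartitions) {
        best = c;
      }
    }
    return best;
  }
}
```

With a pool of two consumers, three successive assignments land on consumers 0, 1 and 0. The pool never grows, which is what gives the upper bound on consumer resources described above.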
Nested Class Summary
Modifier and Type | Class | Description
static class | KafkaConsumerService.ConsumerAssignmentStrategy | This consumer assignment strategy specifies how consumers from the consumer pool are allocated.
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
Modifier and Type | Field
protected AggKafkaConsumerServiceStats | aggStats
protected IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> | consumerToConsumptionTask
protected java.lang.String | kafkaUrl
protected java.lang.String | kafkaUrlForLogger
protected ConsumerPoolType | poolType
protected java.util.Map<PubSubTopic,java.util.Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> | versionTopicToTopicPartitionToConsumer
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
Modifier | Constructor
protected | KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, java.util.Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, java.lang.String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
Method Summary
Modifier and Type | Method | Description
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition) | This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer.
void | batchUnsubscribe(PubSubTopic versionTopic, java.util.Set<PubSubTopicPartition> topicPartitionsToUnSub)
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
java.util.Map<PubSubTopicPartition,TopicPartitionIngestionInfo> | getIngestionInfoFromConsumer(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long | getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long | getMaxElapsedTimeMSSinceLastPollInConsumerPool()
long | getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
boolean | hasAnySubscriptionFor(PubSubTopic versionTopic)
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
protected void | removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
void | setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
void | startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> consumedDataReceiver)
boolean | startInner()
void | stopInner()
void | unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) | Stops the subscription to the given topic-partition associated with the given version topic.
void | unsubscribeAll(PubSubTopic versionTopic) | Stops all subscriptions associated with the given version topic.
Field Detail
-
kafkaUrl
protected final java.lang.String kafkaUrl
-
kafkaUrlForLogger
protected final java.lang.String kafkaUrlForLogger
-
poolType
protected final ConsumerPoolType poolType
-
aggStats
protected final AggKafkaConsumerServiceStats aggStats
-
consumerToConsumptionTask
protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
-
versionTopicToTopicPartitionToConsumer
protected final java.util.Map<PubSubTopic,java.util.Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
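The versionTopicToTopicPartitionToConsumer field is a two-level index: first by version topic, then by subscribed topic-partition. A minimal sketch of the lookups it supports follows; Strings stand in for PubSubTopic and PubSubTopicPartition and the generic `C` for SharedKafkaConsumer, and `TwoLevelConsumerIndex` is a hypothetical name, so this illustrates the data structure rather than Venice's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Two-level index: version topic -> (subscribed topic-partition -> consumer).
class TwoLevelConsumerIndex<C> {
  private final Map<String, Map<String, C>> versionTopicToTopicPartitionToConsumer = new HashMap<>();

  // Records that `consumer` serves `topicPartition` on behalf of `versionTopic`.
  void put(String versionTopic, String topicPartition, C consumer) {
    versionTopicToTopicPartitionToConsumer
        .computeIfAbsent(versionTopic, k -> new HashMap<>())
        .put(topicPartition, consumer);
  }

  // Analogous to getConsumerAssignedToVersionTopicPartition: null when nothing is assigned.
  C getConsumer(String versionTopic, String topicPartition) {
    Map<String, C> byPartition = versionTopicToTopicPartitionToConsumer.get(versionTopic);
    return byPartition == null ? null : byPartition.get(topicPartition);
  }

  // Analogous to hasAnySubscriptionFor: true if at least one partition is assigned.
  boolean hasAnySubscriptionFor(String versionTopic) {
    Map<String, C> byPartition = versionTopicToTopicPartitionToConsumer.get(versionTopic);
    return byPartition != null && !byPartition.isEmpty();
  }
}
```

Keying by version topic first makes "everything for this version" queries, such as hasAnySubscriptionFor or unsubscribeAll, a single map lookup.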
Constructor Detail
-
KafkaConsumerService
protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, java.util.Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, java.lang.String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
- Parameters:
statsOverride - injection of stats, for test purposes
Method Detail
-
getConsumerAssignedToVersionTopicPartition
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Specified by:
getConsumerAssignedToVersionTopicPartition in class AbstractKafkaConsumerService
-
assignConsumerFor
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer. It must be idempotent and thus return a previously assigned consumer (for the same params) if one exists.
- Specified by:
assignConsumerFor in class AbstractKafkaConsumerService
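The idempotency requirement can be met by consulting the existing assignment before invoking the picking strategy. The sketch below is hypothetical: Strings replace the Venice topic types, a `BiFunction` plays the role of pickConsumerForPartition, and the `pickerCalls` counter exists only so the behavior can be observed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Idempotent assignment: the strategy runs only the first time a
// (version topic, topic-partition) pair is seen; later calls return the
// consumer recorded then.
class IdempotentAssigner<C> {
  private final Map<String, Map<String, C>> assignments = new HashMap<>();
  private final BiFunction<String, String, C> picker; // role of pickConsumerForPartition
  int pickerCalls = 0; // observable counter for illustration only

  IdempotentAssigner(BiFunction<String, String, C> picker) {
    this.picker = picker;
  }

  // synchronized so two concurrent callers cannot both run the picker for the same pair.
  synchronized C assignConsumerFor(String versionTopic, String topicPartition) {
    Map<String, C> byPartition = assignments.computeIfAbsent(versionTopic, k -> new HashMap<>());
    C existing = byPartition.get(topicPartition);
    if (existing != null) {
      return existing; // previously assigned: do not re-balance
    }
    pickerCalls++;
    C chosen = picker.apply(versionTopic, topicPartition);
    byPartition.put(topicPartition, chosen);
    return chosen;
  }
}
```

Calling assignConsumerFor twice with the same arguments returns the same consumer and invokes the strategy once; a different partition triggers a fresh pick.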
-
pickConsumerForPartition
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
-
removeTopicPartitionFromConsumptionTask
protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
-
unsubscribeAll
public void unsubscribeAll(PubSubTopic versionTopic)
Stops all subscriptions associated with the given version topic.
- Specified by:
unsubscribeAll in class AbstractKafkaConsumerService
-
unSubscribe
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
Stops the subscription to the given topic-partition associated with the given version topic.
- Specified by:
unSubscribe in class AbstractKafkaConsumerService
-
batchUnsubscribe
public void batchUnsubscribe(PubSubTopic versionTopic, java.util.Set<PubSubTopicPartition> topicPartitionsToUnSub)
- Specified by:
batchUnsubscribe in class AbstractKafkaConsumerService
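The three unsubscribe variants differ only in scope: one partition, a set of partitions, or every partition of a version topic. A minimal sketch of how they could keep the two-level index consistent follows. It is an assumption-laden model: `SubscriptionIndex` is hypothetical, consumer ids are plain ints, and the real methods' side effect of stopping consumption on the shared consumer is modelled only as a `stopped` set; batchUnsubscribe is written as a simple loop here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SubscriptionIndex {
  // version topic -> (topic-partition -> consumer id)
  final Map<String, Map<String, Integer>> versionTopicToTopicPartitionToConsumer = new HashMap<>();
  // Partitions whose consumption was stopped (stand-in for the consumer-side effect).
  final Set<String> stopped = new HashSet<>();

  void subscribe(String versionTopic, String topicPartition, int consumerId) {
    versionTopicToTopicPartitionToConsumer
        .computeIfAbsent(versionTopic, k -> new HashMap<>())
        .put(topicPartition, consumerId);
  }

  // Single partition; drops the inner map once it becomes empty.
  void unSubscribe(String versionTopic, String topicPartition) {
    Map<String, Integer> byPartition = versionTopicToTopicPartitionToConsumer.get(versionTopic);
    if (byPartition == null) {
      return;
    }
    if (byPartition.remove(topicPartition) != null) {
      stopped.add(topicPartition);
    }
    if (byPartition.isEmpty()) {
      versionTopicToTopicPartitionToConsumer.remove(versionTopic);
    }
  }

  // Set of partitions for the same version topic.
  void batchUnsubscribe(String versionTopic, Set<String> topicPartitionsToUnSub) {
    for (String topicPartition : topicPartitionsToUnSub) {
      unSubscribe(versionTopic, topicPartition);
    }
  }

  // Everything belonging to the version topic, removed in one step.
  void unsubscribeAll(String versionTopic) {
    Map<String, Integer> byPartition = versionTopicToTopicPartitionToConsumer.remove(versionTopic);
    if (byPartition != null) {
      stopped.addAll(byPartition.keySet());
    }
  }
}
```

Removing empty inner maps keeps "has any subscription" checks equivalent to a key-presence test.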
-
startInner
public boolean startInner()
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
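The return-value contract of startInner can be illustrated with a hypothetical mini lifecycle. `LifecycleSketch`, its `State` enum and the two subclasses are illustrative names, not AbstractVeniceService's actual code: returning true lets the caller mark the service STARTED immediately, while returning false leaves the asynchronous work responsible for doing so.

```java
import java.util.concurrent.CompletableFuture;

abstract class LifecycleSketch {
  enum State { STOPPED, STARTING, STARTED }

  protected volatile State serviceState = State.STOPPED;

  // true: fully started; false: async work will set STARTED itself.
  protected abstract boolean startInner();

  final void start() {
    serviceState = State.STARTING;
    if (startInner()) {
      serviceState = State.STARTED; // synchronous start completed
    }
    // Otherwise stay in STARTING; the implementer flips the state later.
  }
}

class SyncService extends LifecycleSketch {
  @Override
  protected boolean startInner() {
    return true;
  }
}

class AsyncService extends LifecycleSketch {
  CompletableFuture<Void> warmUp;

  @Override
  protected boolean startInner() {
    warmUp = CompletableFuture.runAsync(() -> {
      // ... asynchronous initialization would happen here ...
      serviceState = State.STARTED; // implementer's responsibility per the contract
    });
    return false;
  }
}
```

The caller never overwrites the state after a false return, so the asynchronous completion cannot be clobbered.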
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner in class AbstractVeniceService
- Throws:
java.lang.Exception
-
hasAnySubscriptionFor
public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- Specified by:
hasAnySubscriptionFor in class AbstractKafkaConsumerService
-
getMaxElapsedTimeMSSinceLastPollInConsumerPool
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- Specified by:
getMaxElapsedTimeMSSinceLastPollInConsumerPool in class AbstractKafkaConsumerService
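Judging only from its name, getMaxElapsedTimeMSSinceLastPollInConsumerPool reports the longest time any pooled consumer has gone without polling, a natural signal for a stalled consumer. The sketch below models that reading; it is an assumption, not Venice's implementation, and `PollTracker` and its injectable clock are hypothetical.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

class PollTracker {
  private final long[] lastPollTimeMs;   // one slot per pooled consumer
  private final LongSupplier clockMs;    // injectable clock for testability

  PollTracker(int poolSize, LongSupplier clockMs) {
    this.lastPollTimeMs = new long[poolSize];
    this.clockMs = clockMs;
    Arrays.fill(lastPollTimeMs, clockMs.getAsLong()); // treat creation as the first "poll"
  }

  void recordPoll(int consumerIndex) {
    lastPollTimeMs[consumerIndex] = clockMs.getAsLong();
  }

  // Maximum over all consumers of (now - last poll time).
  long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {
    long now = clockMs.getAsLong();
    long max = 0;
    for (long last : lastPollTimeMs) {
      max = Math.max(max, now - last);
    }
    return max;
  }
}
```

Taking the maximum rather than the average means a single stuck consumer is not hidden by healthy ones.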
-
startConsumptionIntoDataReceiver
public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> consumedDataReceiver)
- Specified by:
startConsumptionIntoDataReceiver in class AbstractKafkaConsumerService
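The class description says startConsumptionIntoDataReceiver lets a caller funnel consumed data into a receiver. A minimal sketch of the dispatch this implies: one poll of a shared consumer returns records for many topic-partitions, and each partition's slice goes to the receiver registered for it. `BatchReceiver` is a hypothetical stand-in for ConsumedDataReceiver, `DispatchingTask` for the consumption task, Strings replace the topic-partition type, and the lastReadOffset parameter is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumedDataReceiver: receives one batch for one partition.
interface BatchReceiver<T> {
  void write(List<T> batch);
}

class DispatchingTask<T> {
  private final Map<String, BatchReceiver<T>> receivers = new HashMap<>();

  // Registers where records for this topic-partition should be funneled.
  void startConsumptionIntoDataReceiver(String topicPartition, BatchReceiver<T> receiver) {
    receivers.put(topicPartition, receiver);
  }

  // polled: topic-partition -> records returned by one poll of the shared consumer.
  void dispatch(Map<String, List<T>> polled) {
    for (Map.Entry<String, List<T>> entry : polled.entrySet()) {
      BatchReceiver<T> receiver = receivers.get(entry.getKey());
      if (receiver != null && !entry.getValue().isEmpty()) {
        receiver.write(entry.getValue());
      }
      // Records for partitions without a registered receiver are dropped in this sketch.
    }
  }
}
```

Because a receiver is just a callback, the consuming thread stays decoupled from the task that processes the data.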
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getOffsetLagBasedOnMetrics in class AbstractKafkaConsumerService
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getLatestOffsetBasedOnMetrics in class AbstractKafkaConsumerService
-
getIngestionInfoFromConsumer
public java.util.Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getIngestionInfoFromConsumer in class AbstractKafkaConsumerService
-
setThreadFactory
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)