Class KafkaStoreIngestionService
java.lang.Object
  com.linkedin.venice.service.AbstractVeniceService
    com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService
All Implemented Interfaces:
StoreIngestionService, IngestionMetadataRetriever, java.io.Closeable, java.lang.AutoCloseable
public class KafkaStoreIngestionService extends AbstractVeniceService implements StoreIngestionService
Assumes a one-to-one mapping between a Venice store and a Kafka topic. Manages the Kafka topics and partitions that need to be consumed for the stores on this node. Launches a StoreIngestionTask for each store version to consume and process messages. Uses the "new" Kafka consumer.
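The one-task-per-store-version bookkeeping described above can be pictured with the minimal sketch below. It is not the Venice implementation: `SketchTask` and `SketchIngestionRegistry` are hypothetical stand-ins for StoreIngestionTask and this service, and the `<store>_v<version>` topic naming is an assumption made for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StoreIngestionTask: tracks the partitions it consumes.
class SketchTask {
  final String versionTopic;
  final Set<Integer> partitions = ConcurrentHashMap.newKeySet();

  SketchTask(String versionTopic) {
    this.versionTopic = versionTopic;
  }
}

// Hypothetical registry: exactly one task per store-version topic.
class SketchIngestionRegistry {
  private final Map<String, SketchTask> tasks = new ConcurrentHashMap<>();

  // Assumption: version topics follow a "<store>_v<version>" naming scheme.
  static String versionTopic(String storeName, int version) {
    return storeName + "_v" + version;
  }

  // Reuses the task for this store version if one exists, otherwise creates it,
  // then subscribes the partition on that task.
  SketchTask startConsumption(String storeName, int version, int partitionId) {
    SketchTask task = tasks.computeIfAbsent(versionTopic(storeName, version), SketchTask::new);
    task.partitions.add(partitionId);
    return task;
  }

  int taskCount() {
    return tasks.size();
  }
}
```

Using `computeIfAbsent` on a ConcurrentHashMap keeps task creation atomic, so two partitions of the same store version that start concurrently still end up on a single task.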
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService:
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService:
logger, serviceState
-
Constructor Summary
KafkaStoreIngestionService(StorageEngineRepository storageEngineRepository, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, java.util.Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, java.util.Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, java.util.Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, java.util.Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService)
-
Method Summary
void addIngestionNotifier(VeniceNotifier notifier): Adds a notifier to receive notifications about the various states of the consumption tasks, such as start, completed, progress, and error.
boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore): Check whether there is a running consumption task for the given store.
boolean containsRunningConsumption(java.lang.String topic): Check whether there is a running consumption task for the given store version topic.
void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
AggVersionedIngestionStats getAggVersionedIngestionStats(): Get AggVersionedIngestionStats.
AdminResponse getConsumptionSnapshots(java.lang.String topicName, ComplementSet<java.lang.Integer> partitions)
java.util.Set<java.lang.String> getIngestingTopicsWithVersionStatusNotOnline(): Get topic names that are currently maintained by the ingestion service with corresponding version status not in an online state.
KafkaValueSerializer getKafkaValueSerializer()
ReadOnlyStoreRepository getMetadataRepo()
java.nio.ByteBuffer getPartitionOffsetRecords(java.lang.String topicName, int partition): This method should only be called when the forked ingestion process is handing over an ingestion task to the main process.
StoreIngestionTask getStoreIngestionTask(java.lang.String topicName)
java.nio.ByteBuffer getStoreVersionCompressionDictionary(java.lang.String topicName)
TopicPartitionIngestionContextResponse getTopicPartitionIngestionContext(java.lang.String versionTopic, java.lang.String topicName, int partitionId)
VeniceConfigLoader getVeniceConfigLoader()
boolean hasCurrentVersionBootstrapping()
static boolean hasCurrentVersionBootstrapping(java.util.Map<java.lang.String,StoreIngestionTask> ingestionTaskMap)
boolean isLiveUpdateSuppressionEnabled()
boolean isPartitionConsuming(java.lang.String topic, int partitionId): Check whether the specified partition is still being consumed.
boolean killConsumptionTask(java.lang.String topicName): Kill all running consumption of the given store.
void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
void recordIngestionFailure(java.lang.String storeName)
void replaceAndAddTestNotifier(VeniceNotifier notifier)
void shutdownStoreIngestionTask(java.lang.String topicName): This method closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId): Starts consuming messages from the Kafka partition corresponding to the Venice partition.
boolean startInner(): Starts the Kafka consumption tasks for already subscribed partitions.
java.util.concurrent.CompletableFuture<java.lang.Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId): Stops consuming messages from the Kafka partition corresponding to the Venice partition.
void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset): Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption is stopped.
void stopInner(): Stops all the Kafka consumption tasks.
void syncTopicPartitionOffset(java.lang.String topicName, int partition): Updates offset metadata and syncs it to storage for the specified topic partition.
boolean topicPartitionHasAnyPendingActions(java.lang.String topic, int partition)
void updatePartitionOffsetRecords(java.lang.String topicName, int partition, java.nio.ByteBuffer offsetRecordByteBuffer): This method updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName): This function goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if that task does not have metric emission enabled, it enables it and updates the store ingestion stats.
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName, int maximumVersion): Finds the task that matches both storeName and maximumVersion, enables metric emission for this task, and updates ingestion stats with it; disables metric emission for all tasks whose version does not match maximumVersion.
void waitIngestionTaskToCompleteAllPartitionPendingActions(java.lang.String topicName, int partition, long retryIntervalInMs, int numRetries)
-
Constructor Detail
KafkaStoreIngestionService
public KafkaStoreIngestionService(StorageEngineRepository storageEngineRepository, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, java.util.Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, java.util.Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, java.util.Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, java.util.Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService)
-
Method Detail
startInner
public boolean startInner()
Starts the Kafka consumption tasks for already subscribed partitions.
Specified by:
startInner in class AbstractVeniceService
Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
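The boolean contract of startInner can be modeled in a few lines. The sketch assumes a simplified, hypothetical base class (`SketchService`) rather than the real AbstractVeniceService, and shows an implementation that returns false and marks itself STARTED only when its asynchronous work completes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the start contract; not Venice's AbstractVeniceService.
abstract class SketchService {
  enum State { STOPPED, STARTING, STARTED }

  volatile State serviceState = State.STOPPED;

  final void start() {
    serviceState = State.STARTING;
    if (startInner()) {
      // Synchronous start finished: the base class marks the service as started.
      serviceState = State.STARTED;
    }
    // Otherwise the implementer must set STARTED when its async work completes.
  }

  abstract boolean startInner();
}

// Hypothetical implementation whose startup finishes asynchronously.
class AsyncStartingService extends SketchService {
  final CompletableFuture<Void> asyncWork = new CompletableFuture<>();

  @Override
  boolean startInner() {
    // Mark STARTED only once the asynchronous work completes.
    asyncWork.thenRun(() -> serviceState = State.STARTED);
    return false; // still starting asynchronously
  }
}
```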
-
hasCurrentVersionBootstrapping
public boolean hasCurrentVersionBootstrapping()
-
hasCurrentVersionBootstrapping
public static boolean hasCurrentVersionBootstrapping(java.util.Map<java.lang.String,StoreIngestionTask> ingestionTaskMap)
-
stopInner
public void stopInner()
Stops all the Kafka consumption tasks. Closes all the Kafka clients.
Specified by:
stopInner in class AbstractVeniceService
-
startConsumption
public void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Starts consuming messages from the Kafka partition corresponding to the Venice partition. Subscribes to the partition if required.
Specified by:
startConsumption in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
-
shutdownStoreIngestionTask
public void shutdownStoreIngestionTask(java.lang.String topicName)
This method closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
Parameters:
topicName - Topic name of the ingestion task to be shut down.
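A bounded "close, then wait" step like the one described above might look as follows. `BoundedShutdownSketch` and its parameters are hypothetical; how the real method handles a task that does not stop within 10 seconds is not described here, so the sketch simply reports false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: request a close, then wait a bounded time for full shutdown.
class BoundedShutdownSketch {
  // Mirrors the documented 10-second wait of shutdownStoreIngestionTask.
  static final long DEFAULT_TIMEOUT_MS = 10_000;

  // Returns true if the task reported termination within timeoutMs, false otherwise.
  static boolean closeAndAwait(Runnable requestClose, CompletableFuture<Void> terminated, long timeoutMs) {
    requestClose.run();
    try {
      terminated.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (ExecutionException e) {
      return true; // the task terminated, but exceptionally
    } catch (TimeoutException e) {
      return false; // still shutting down after the timeout
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```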
-
promoteToLeader
public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
Specified by:
promoteToLeader in interface StoreIngestionService
-
demoteToStandby
public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
Specified by:
demoteToStandby in interface StoreIngestionService
-
waitIngestionTaskToCompleteAllPartitionPendingActions
public void waitIngestionTaskToCompleteAllPartitionPendingActions(java.lang.String topicName, int partition, long retryIntervalInMs, int numRetries)
-
topicPartitionHasAnyPendingActions
public boolean topicPartitionHasAnyPendingActions(java.lang.String topic, int partition)
-
isLiveUpdateSuppressionEnabled
public boolean isLiveUpdateSuppressionEnabled()
-
getVeniceConfigLoader
public VeniceConfigLoader getVeniceConfigLoader()
Specified by:
getVeniceConfigLoader in interface StoreIngestionService
-
updateStatsEmission
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName, int maximumVersion)
Finds the task that matches both storeName and maximumVersion, enables metric emission for this task, and updates ingestion stats with it; disables metric emission for all tasks whose version does not match maximumVersion.
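Assuming each task exposes its store name, version number, and an emission flag (the hypothetical `StatsTask` below), the enable/disable rule described above reduces to a single pass over the map. Restricting the rule to tasks of the named store is an assumption of this sketch, and refreshing the stats themselves is omitted.

```java
import java.util.NavigableMap;

// Hypothetical task model; the real StoreIngestionTask exposes its own accessors.
class StatsTask {
  final String storeName;
  final int version;
  boolean emitMetrics;

  StatsTask(String storeName, int version) {
    this.storeName = storeName;
    this.version = version;
  }
}

class StatsEmissionSketch {
  // Enables emission only for the storeName task whose version equals maximumVersion
  // and disables it for that store's other versions. Other stores' tasks are untouched.
  static void updateStatsEmission(NavigableMap<String, StatsTask> taskMap, String storeName, int maximumVersion) {
    for (StatsTask task : taskMap.values()) {
      if (task.storeName.equals(storeName)) {
        task.emitMetrics = task.version == maximumVersion;
      }
    }
  }
}
```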
-
updateStatsEmission
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName)
This function goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if that task does not have metric emission enabled, it enables it and updates the store ingestion stats.
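The lookup described above can be sketched as follows, using a hypothetical `VersionedTask` model. Note the explicit comparison of version numbers: if topics are named `<store>_v<version>` (an assumption here), lexicographic key order in a NavigableMap ranks `users_v9` after `users_v10`, so the map's ordering alone cannot identify the latest version.

```java
import java.util.NavigableMap;

// Hypothetical task model standing in for StoreIngestionTask.
class VersionedTask {
  final String storeName;
  final int version;
  boolean emitMetrics;

  VersionedTask(String storeName, int version) {
    this.storeName = storeName;
    this.version = version;
  }
}

class LatestVersionStatsSketch {
  // Returns the storeName task with the largest version number, or null if there is none.
  static VersionedTask findLatest(NavigableMap<String, VersionedTask> taskMap, String storeName) {
    VersionedTask latest = null;
    for (VersionedTask task : taskMap.values()) {
      if (task.storeName.equals(storeName) && (latest == null || task.version > latest.version)) {
        latest = task;
      }
    }
    return latest;
  }

  // Enables metric emission on the latest version's task if it is not already enabled.
  static void updateStatsEmission(NavigableMap<String, VersionedTask> taskMap, String storeName) {
    VersionedTask latest = findLatest(taskMap, storeName);
    if (latest != null && !latest.emitMetrics) {
      latest.emitMetrics = true;
      // A real implementation would also refresh the store's ingestion stats here.
    }
  }
}
```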
-
stopConsumption
public java.util.concurrent.CompletableFuture<java.lang.Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Stops consuming messages from the Kafka partition corresponding to the Venice partition.
Specified by:
stopConsumption in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
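Because stopConsumption returns a CompletableFuture<Void>, a caller can either block until the partition is released or chain follow-up work onto the future. The sketch below assumes a hypothetical `AsyncPartitionConsumer` that performs the unsubscribe on its own thread; it only illustrates the calling pattern, not Venice's internals.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical consumer whose unsubscribe happens asynchronously on its own thread.
class AsyncPartitionConsumer implements AutoCloseable {
  final Set<Integer> subscribed = ConcurrentHashMap.newKeySet();
  private final ExecutorService consumerThread = Executors.newSingleThreadExecutor();

  // Returns a future that completes once the partition is no longer consumed.
  CompletableFuture<Void> stopConsumption(int partitionId) {
    return CompletableFuture.runAsync(() -> subscribed.remove(partitionId), consumerThread);
  }

  @Override
  public void close() {
    consumerThread.shutdown();
  }
}
```

For example, `consumer.stopConsumption(0).thenRun(cleanup)` runs a caller-supplied `cleanup` only after partition 0 is no longer consumed, while `join()` blocks the caller until then.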
-
stopConsumptionAndWait
public void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset)
Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption is stopped.
Specified by:
stopConsumptionAndWait in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
sleepSeconds - seconds to wait between consecutive checks.
numRetries - maximum number of checks.
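The documented bound of sleepSeconds * numRetries suggests a polling loop along these lines. In this hypothetical sketch, `isConsuming` stands in for a check such as isPartitionConsuming, and `sleepMillis` replaces sleepSeconds so the example runs quickly; what the real method does once the bound is exceeded is not documented here, so the sketch returns false.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper mirroring the documented wait bound of sleepSeconds * numRetries.
class StopAndWaitSketch {
  // Polls isConsuming up to numRetries times, sleeping sleepMillis between checks.
  // Returns true as soon as consumption is observed to have stopped.
  static boolean waitUntilStopped(BooleanSupplier isConsuming, long sleepMillis, int numRetries) {
    for (int attempt = 0; attempt < numRetries; attempt++) {
      if (!isConsuming.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and give up
        return false;
      }
    }
    // One final check after the last sleep.
    return !isConsuming.getAsBoolean();
  }
}
```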
-
killConsumptionTask
public boolean killConsumptionTask(java.lang.String topicName)
Description copied from interface: StoreIngestionService
Kill all running consumption of the given store.
Specified by:
killConsumptionTask in interface StoreIngestionService
Parameters:
topicName - Venice topic (store and version number) of the consumer task that needs to be killed. No action, including logging, is taken when killConsumptionTask is invoked on a topic that is not in the map.
Returns:
true if a kill is needed and called, otherwise false.
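The return-value contract above can be sketched as a map lookup that does nothing, not even logging, for unknown topics. `KillableTask` and `KillSketch` are hypothetical, and treating an already-stopped task as "no kill needed" is an assumption about what "a kill is needed" means.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical task with a kill switch.
class KillableTask {
  volatile boolean running = true;

  void kill() {
    running = false;
  }
}

class KillSketch {
  final Map<String, KillableTask> tasksByTopic = new ConcurrentHashMap<>();

  // Returns true only if a running task existed for the topic and was killed.
  // Unknown topics are ignored silently: no action and no logging.
  boolean killConsumptionTask(String topicName) {
    KillableTask task = tasksByTopic.get(topicName);
    if (task == null || !task.running) {
      return false;
    }
    task.kill();
    return true;
  }
}
```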
-
addIngestionNotifier
public void addIngestionNotifier(VeniceNotifier notifier)
Description copied from interface: StoreIngestionService
Adds a notifier to receive notifications about the various states of the consumption tasks, such as start, completed, progress, and error. Multiple notifiers can be added for the same consumption tasks, and all of them will be notified in order.
Specified by:
addIngestionNotifier in interface StoreIngestionService
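Ordered delivery to multiple notifiers can be achieved with a copy-on-write list, as in this sketch. `SketchNotifier` and `NotifierRegistry` are hypothetical; VeniceNotifier has its own, richer set of callbacks that are not reproduced here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical single-callback notifier interface.
interface SketchNotifier {
  void completed(String topic, int partitionId);
}

class NotifierRegistry {
  // Copy-on-write list: iteration is safe while other threads add notifiers,
  // and notifiers are invoked in the order they were added.
  private final List<SketchNotifier> notifiers = new CopyOnWriteArrayList<>();

  void addIngestionNotifier(SketchNotifier notifier) {
    notifiers.add(notifier);
  }

  void reportCompleted(String topic, int partitionId) {
    for (SketchNotifier notifier : notifiers) {
      notifier.completed(topic, partitionId);
    }
  }
}
```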
-
replaceAndAddTestNotifier
public void replaceAndAddTestNotifier(VeniceNotifier notifier)
Specified by:
replaceAndAddTestNotifier in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore)
Description copied from interface: StoreIngestionService
Check whether there is a running consumption task for the given store.
Specified by:
containsRunningConsumption in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(java.lang.String topic)
Description copied from interface: StoreIngestionService
Check whether there is a running consumption task for the given store version topic.
Specified by:
containsRunningConsumption in interface StoreIngestionService
-
isPartitionConsuming
public boolean isPartitionConsuming(java.lang.String topic, int partitionId)
Description copied from interface: StoreIngestionService
Check whether the specified partition is still being consumed.
Specified by:
isPartitionConsuming in interface StoreIngestionService
-
getIngestingTopicsWithVersionStatusNotOnline
public java.util.Set<java.lang.String> getIngestingTopicsWithVersionStatusNotOnline()
Description copied from interface: StoreIngestionService
Get topic names that are currently maintained by the ingestion service with corresponding version status not in an online state. Topics with an invalid store or version number are also included in the returned set.
Specified by:
getIngestingTopicsWithVersionStatusNotOnline in interface StoreIngestionService
Returns:
a Set of topic names.
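A sketch of the filtering rule, including the treatment of invalid store or version numbers, is shown below. It assumes `<store>_v<version>` topic names and replaces the metadata repository with a plain nested map; the `VersionStatus` enum here is a simplified, hypothetical subset, not Venice's own type.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class NotOnlineTopicsSketch {
  // Simplified, hypothetical status values.
  enum VersionStatus { STARTED, ONLINE, ERROR }

  // versionStatuses maps store name -> (version number -> status), standing in for store metadata.
  static Set<String> ingestingTopicsNotOnline(
      Set<String> ingestingTopics, Map<String, Map<Integer, VersionStatus>> versionStatuses) {
    Set<String> result = new HashSet<>();
    for (String topic : ingestingTopics) {
      int idx = topic.lastIndexOf("_v");
      VersionStatus status = null;
      if (idx > 0) {
        try {
          int version = Integer.parseInt(topic.substring(idx + 2));
          Map<Integer, VersionStatus> versions = versionStatuses.get(topic.substring(0, idx));
          status = versions == null ? null : versions.get(version);
        } catch (NumberFormatException e) {
          status = null; // unparsable version number
        }
      }
      // Unknown store, unknown version, or malformed name all count as "not online".
      if (status != VersionStatus.ONLINE) {
        result.add(topic);
      }
    }
    return result;
  }
}
```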
-
recordIngestionFailure
public void recordIngestionFailure(java.lang.String storeName)
Specified by:
recordIngestionFailure in interface StoreIngestionService
-
getAggVersionedIngestionStats
public AggVersionedIngestionStats getAggVersionedIngestionStats()
Description copied from interface: StoreIngestionService
Get AggVersionedIngestionStats.
Specified by:
getAggVersionedIngestionStats in interface StoreIngestionService
Returns:
an instance of AggVersionedIngestionStats
-
getStoreVersionCompressionDictionary
public java.nio.ByteBuffer getStoreVersionCompressionDictionary(java.lang.String topicName)
Specified by:
getStoreVersionCompressionDictionary in interface IngestionMetadataRetriever
-
getStoreIngestionTask
public StoreIngestionTask getStoreIngestionTask(java.lang.String topicName)
Specified by:
getStoreIngestionTask in interface StoreIngestionService
-
getConsumptionSnapshots
public AdminResponse getConsumptionSnapshots(java.lang.String topicName, ComplementSet<java.lang.Integer> partitions)
Specified by:
getConsumptionSnapshots in interface IngestionMetadataRetriever
-
getTopicPartitionIngestionContext
public TopicPartitionIngestionContextResponse getTopicPartitionIngestionContext(java.lang.String versionTopic, java.lang.String topicName, int partitionId)
Specified by:
getTopicPartitionIngestionContext in interface IngestionMetadataRetriever
-
updatePartitionOffsetRecords
public void updatePartitionOffsetRecords(java.lang.String topicName, int partition, java.nio.ByteBuffer offsetRecordByteBuffer)
This method updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
-
getPartitionOffsetRecords
public java.nio.ByteBuffer getPartitionOffsetRecords(java.lang.String topicName, int partition)
This method should only be called when the forked ingestion process is handing over an ingestion task to the main process. It collects the user partition's latest OffsetRecords from the partition consumption states (PCS). In theory, the PCS should be available in this situation because the topic has not yet been unsubscribed. If it is not available, an exception is thrown, since this is unexpected.
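The fail-fast behavior described above, returning the stored offset record or throwing when the partition consumption state is missing, can be sketched with a hypothetical map of serialized states keyed by topic and partition. `OffsetHandoverSketch` is hypothetical, and the choice of IllegalStateException is an assumption; the documentation only says an exception is thrown.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OffsetHandoverSketch {
  // Hypothetical per-partition state: here just the serialized offset record bytes.
  final Map<String, byte[]> partitionStates = new ConcurrentHashMap<>();

  static String key(String topicName, int partition) {
    return topicName + "-" + partition;
  }

  // Returns a copy of the serialized offset record, or fails loudly if the
  // partition consumption state is missing (unexpected during handover).
  ByteBuffer getPartitionOffsetRecords(String topicName, int partition) {
    byte[] state = partitionStates.get(key(topicName, partition));
    if (state == null) {
      throw new IllegalStateException("No partition consumption state for " + key(topicName, partition));
    }
    return ByteBuffer.wrap(state.clone());
  }
}
```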
-
syncTopicPartitionOffset
public void syncTopicPartitionOffset(java.lang.String topicName, int partition)
Updates offset metadata and syncs it to storage for the specified topic partition. This method is invoked only when the isolated ingestion process reports topic partition completion, to make sure ingestion progress is persisted.
-
getMetadataRepo
public final ReadOnlyStoreRepository getMetadataRepo()
-
getKafkaValueSerializer
public KafkaValueSerializer getKafkaValueSerializer()