Class LeaderFollowerStoreIngestionTask
- java.lang.Object
-
- com.linkedin.davinci.kafka.consumer.StoreIngestionTask
-
- com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
-
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable, java.lang.Runnable
- Direct Known Subclasses:
ActiveActiveStoreIngestionTask
public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask
This class contains the state transition work between leader and follower. Both leader and follower keep track of information such as which topic the leader is consuming from and the corresponding offset, as well as the latest successfully consumed or produced offset in the version topic (VT).
State Transition:
1. OFFLINE -> STANDBY: Generate a SUBSCRIBE message in the consumer action queue. The logic here is the same as in the Online/Offline model; all it needs to do is restore the checkpointed state from OffsetRecord.
2. STANDBY -> LEADER: The partition is marked as being in transition from STANDBY to LEADER and the action completes immediately. After processing the rest of the consumer actions in the queue, check whether any partition is in transition; if so:
(i) consume the latest messages from the version topic;
(ii) drain all the messages in the drainer queue in order to update the latest consumed message replication metadata;
(iii) check whether there has been at least 5 minutes (configurable) of inactivity for this partition (meaning no new messages); if so, turn on the LEADER flag for this partition.
3. LEADER -> STANDBY:
a. If the leader is consuming from VT, just set the "isLeader" field to false and resume consumption.
b. If the leader is consuming from anything other than VT, it needs to unsubscribe from the leader topic for this partition first, then drain all the messages in the drainer queue for this leader topic/partition so that it can get the producer callback for the last message it produced to VT. It blocks on the result of that callback to update the corresponding offset in the version topic, so that the new follower can subscribe back to VT using the recently updated VT offset.
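The transitions listed above can be sketched as a small per-partition state machine. This is only an illustration under stated assumptions, not the Venice implementation: the class `PartitionRoleSketch`, its `Role` enum, and all method names are hypothetical, the intermediate `IN_TRANSITION_TO_LEADER` value is an invented stand-in for "marked as in transition", and timestamps are passed in explicitly so that the inactivity check is deterministic.

```java
import java.time.Duration;

/** Hypothetical sketch of the per-partition role transitions; not Venice code. */
final class PartitionRoleSketch {
  enum Role { OFFLINE, STANDBY, IN_TRANSITION_TO_LEADER, LEADER }

  private final long inactivityThresholdMs;
  private Role role = Role.OFFLINE;
  private long lastMessageTimestampMs;

  PartitionRoleSketch(Duration inactivityThreshold) {
    this.inactivityThresholdMs = inactivityThreshold.toMillis();
  }

  /** OFFLINE -> STANDBY: subscribe; restoring checkpointed state is elided here. */
  void subscribe(long nowMs) {
    role = Role.STANDBY;
    lastMessageTimestampMs = nowMs;
  }

  /** STANDBY -> LEADER request: only mark the partition and return immediately. */
  void promoteToLeader() {
    if (role == Role.STANDBY) {
      role = Role.IN_TRANSITION_TO_LEADER;
    }
  }

  /** LEADER -> STANDBY: the real task also drains queues and waits for the last producer callback. */
  void demoteToStandby() {
    if (role == Role.LEADER || role == Role.IN_TRANSITION_TO_LEADER) {
      role = Role.STANDBY;
    }
  }

  /** Record activity whenever a new message is consumed for this partition. */
  void onMessageConsumed(long nowMs) {
    lastMessageTimestampMs = nowMs;
  }

  /** Polled repeatedly from the main loop; never sleeps or blocks. */
  void checkLongRunningTaskState(long nowMs) {
    if (role == Role.IN_TRANSITION_TO_LEADER
        && nowMs - lastMessageTimestampMs >= inactivityThresholdMs) {
      role = Role.LEADER;
    }
  }

  Role role() {
    return role;
  }
}
```

In this sketch a promotion request never turns on the leader role by itself; only a later poll that observes a full inactivity window does, which mirrors step 2 (iii) above.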
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
StoreIngestionTask.DelegateConsumerRecordResult
-
-
Field Summary
Fields
Modifier and Type  Field  Description
protected boolean hasChangeCaptureView
protected it.unimi.dsi.fastutil.ints.Int2ObjectMap<java.lang.String> kafkaClusterIdToUrlMap
static java.util.function.Predicate<? super PartitionConsumptionState> LEADER_OFFSET_LAG_FILTER
protected AvroStoreDeserializerCache storeDeserializerCache
protected static java.util.function.LongPredicate VALID_LAG
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriter
  N.B.: With L/F+native replication and many Leader partitions getting assigned to a single SN, this VeniceWriter may be called from multiple threads simultaneously, during start of batch push.
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTime
protected java.util.Map<java.lang.String,VeniceViewWriter> viewWriters
-
Fields inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
aggKafkaConsumerService, availableSchemaIds, batchReportIncPushStatusEnabled, bootstrapTimeoutInMs, chunkAssembler, compressionStrategy, compressor, compressorFactory, consumerActionSequenceNumber, consumerActionsQueue, databaseSyncBytesIntervalForDeferredWriteMode, databaseSyncBytesIntervalForTransactionalMode, dataRecoverySourceVersionNumber, defaultReadyToServeChecker, deserializedSchemaIds, diskUsage, divErrorMetricCallback, emitMetrics, emptyPollSleepMs, errorPartitionId, hostLevelIngestionStats, hybridStoreConfig, idleCounter, ingestionNotificationDispatcher, ingestionTaskName, isChunked, isCurrentVersion, isDataRecovery, isDaVinciClient, isGlobalRtDivEnabled, isIsolatedIngestion, isRmdChunked, isRunning, isWriteComputationEnabled, kafkaClusterUrlResolver, kafkaProps, kafkaVersionTopic, KILL_WAIT_TIME_MS, localKafkaClusterId, localKafkaServer, localKafkaServerSingletonSet, manifestSerializer, metaStoreWriter, parallelProcessingThreadPool, partitionConsumptionStateMap, partitionCount, partitionToPendingConsumerActionCountMap, pubSubTopicRepository, readCycleDelayMs, readOnlyForBatchOnlyStoreEnabled, realTimeTopic, recordLevelMetricEnabled, REDUNDANT_LOGGING_FILTER, SCHEMA_POLLING_DELAY_MS, schemaRepository, serverConfig, storageEngine, storageEngineRepository, storageMetadataService, STORE_VERSION_POLLING_DELAY_MS, storeBufferService, storeName, storeRepository, storeVersionPartitionCount, topicManagerRepository, versionedDIVStats, versionedIngestionStats, versionNumber, versionRole, versionTopic, WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED, workloadType, writeComputeFailureCode
-
-
Constructor Summary
Constructors
Constructor  Description
LeaderFollowerStoreIngestionTask(StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction)
-
Method Summary
Modifier and Type  Method  Description
protected void addPartitionConsumptionState(java.lang.Integer partition, PartitionConsumptionState pcs)
protected double calculateAssembledRecordSizeRatio(long recordSize)
protected java.util.Map<java.lang.String,java.lang.Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic, java.util.List<java.lang.CharSequence> unreachableBrokerList)
protected static void checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask)
protected boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
  Checks whether the lag is acceptable for hybrid stores
protected void checkLongRunningTaskState()
  The following function will be executed after processing all the quick actions in the consumer action queues, so that long-running actions don't block other partitions' consumer actions.
protected static boolean checkWhetherToCloseUnusedVeniceWriter(Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTimeLazy, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, java.lang.Runnable reInitializeVeniceWriterLazyRunnable, java.lang.String versionTopicName)
protected void closeVeniceViewWriters()
void closeVeniceWriters(boolean doFlush)
static VeniceWriter<byte[],byte[],byte[]> constructVeniceWriter(VeniceWriterFactory veniceWriterFactory, java.lang.String topic, Version version, boolean producerCompressionEnabled, int producerCnt)
void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
  Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
  The goal of this function is to possibly produce the incoming kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed.
void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void getAndUpdateLeaderCompletedState(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState)
  HeartBeat SOS messages carry the leader completion state in the header.
long getBatchFollowerOffsetLag()
long getBatchLeaderOffsetLag()
long getBatchReplicationLag()
protected java.util.Set<java.lang.String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
long getFollowerOffsetLag()
  Measure the offset lag between follower and leader
long getHybridFollowerOffsetLag()
long getHybridLeaderOffsetLag()
protected IngestionBatchProcessor getIngestionBatchProcessor()
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl)
  For regular L/F stores without A/A enabled, there is always only one real-time source.
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredUpstreamKafkaUrl)
  For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY.
long getLeaderOffsetLag()
protected int getMaxNearlineRecordSizeBytes()
protected int getMaxRecordSizeBytes()
protected java.util.Set<java.lang.String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
long getRegionHybridOffsetLag(int regionId)
protected long getTopicPartitionOffsetByKafkaURL(java.lang.CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp)
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> getVeniceWriter(PartitionConsumptionState partitionConsumptionState)
int getWriteComputeErrorCode()
protected boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
protected static boolean isLeader(PartitionConsumptionState partitionConsumptionState)
boolean isReadyToServeAnnouncedWithRTLag()
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
protected java.nio.ByteBuffer maybeCompressData(int partition, java.nio.ByteBuffer data, PartitionConsumptionState partitionConsumptionState)
  Compresses data in a bytebuffer when consuming from rt as a leader node and compression is enabled for the store version for which we're consuming data.
protected java.util.Set<java.lang.String> maybeSendIngestionHeartbeat()
  For hybrid stores only, the leader periodically writes a special SOS message to the RT topic with the following properties: 1.
protected long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
  For Leader/Follower state model, we already keep track of the consumption progress in leader, so directly calculate the lag with the real-time topic and the leader consumption offset.
protected long measureRTOffsetLagForMultiRegions(java.util.Set<java.lang.String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
protected long measureRTOffsetLagForSingleRegion(java.lang.String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog)
  This method fetches/calculates latest leader persisted offset and last offset in RT topic.
protected void processConsumerAction(ConsumerAction message, Store store)
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
  Process TopicSwitch control message at given partition offset for a specific PartitionConsumptionState.
protected void processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
  This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask
protected void produceToLocalKafka(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, java.util.function.BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> produceFunction, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
protected void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
  Check if the ingestion progress has reached the end of the version topic.
protected void resubscribe(PartitionConsumptionState partitionConsumptionState)
  Resubscribe operation by passing new version role and partition role to AggKafkaConsumerService.
protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState)
protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState)
protected boolean shouldCheckLeaderCompleteStateInFollower()
  For non AA hybrid stores with AGGREGATE DRP, SIT reads from parent RT while the HB is written to the child RTs.
protected boolean shouldCompressData(PartitionConsumptionState partitionConsumptionState)
protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState)
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
  Additional safeguards in Leader/Follower ingestion: 1.
protected boolean shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
  For Leader/Follower model, the follower should have the same kind of check as the Online/Offline model; for leader, it's possible that it consumes from real-time topic or GF topic.
protected boolean shouldProduceToVersionTopic(PartitionConsumptionState partitionConsumptionState)
  For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from version topic, it should produce the new message to version topic; besides, if LEADER is consuming remotely, it should also produce to local fabric.
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
protected void startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState)
protected void syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState pcs, java.util.Map<java.lang.String,java.lang.Long> upstreamStartOffsetByKafkaURL)
protected void syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState)
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl, long offset)
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
  Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
  Sync the metadata about offset in OffsetRecord.
protected void updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, java.util.function.Supplier<java.lang.String> sourceKafkaUrlSupplier, boolean dryRun)
  A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState, after processing the given PubSubMessage.
protected java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState)
  Besides draining messages in the drainer queue, wait for the last producer future.
protected void waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, java.lang.String errorMsg)
-
Methods inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
checkIngestionProgress, cloneProducerStates, close, consumerBatchUnsubscribe, consumerHasAnySubscription, consumerHasSubscription, consumerResetOffset, consumerSubscribe, consumerUnSubscribe, createKafkaConsumerProperties, disableMetricsEmission, dumpPartitionConsumptionStates, dumpStoreVersionState, enableMetricsEmission, getCompressionStrategy, getCompressor, getFailedIngestionPartitionCount, getHostLevelIngestionStats, getIngestionTaskName, getKafkaVersionTopic, getOffsetToOnlineLagThresholdPerPartition, getPartitionConsumptionState, getPartitionOffsetLagBasedOnMetrics, getSchemaRepo, getServerConfig, getStorageEngine, getStoragePartitionConfig, getStoragePartitionConfig, getStoreName, getTopicManager, getTopicPartitionEndOffSet, getVersionedDIVStats, getVersionIngestionStats, getVersionNumber, getVersionTopic, hasAllPartitionReportedCompleted, hasAnyPartitionConsumptionState, hasAnySubscription, hasPendingPartitionIngestionAction, isActiveActiveReplicationEnabled, isChunked, isCurrentVersion, isFutureVersion, isHybridMode, isIngestionTaskActive, isMetricsEmissionEnabled, isPartitionConsumingOrHasPendingIngestionAction, isProducingVersionTopicHealthy, isReadyToServe, isRunning, isSegmentControlMsg, isStuckByMemoryConstraint, isTransientRecordBufferUsed, isUserSystemStore, kill, logStorageOperationWhileUnsubscribed, measureLagWithCallToPubSub, measureLagWithCallToPubSub, minZeroLag, nextSeqNum, processCommonConsumerAction, processConsumerRecord, processEndOfIncrementalPush, processEndOfPush, processStartOfIncrementalPush, produceToStoreBufferService, produceToStoreBufferServiceOrKafka, produceToStoreBufferServiceOrKafkaInBatch, putInStorageEngine, recordAssembledRecordSize, recordChecksumVerificationFailure, removeFromStorageEngine, reportError, resetPartitionConsumptionOffset, resolveSourceKafkaServersWithinTopicSwitch, run, setLastConsumerException, setLastStoreIngestionException, setPartitionConsumptionState, shouldUpdateUpstreamOffset, shutdown, 
subscribePartition, subscribePartition, throwIfNotRunning, throwOrLogStorageFailureDependingIfStillSubscribed, unSubscribePartition, unSubscribePartition, updateIngestionRoleIfStoreChanged, updateOffsetMetadataAndSync, updateOffsetMetadataAndSyncOffset, validateMessage, waitVersionStateAvailable
-
-
-
-
Field Detail
-
veniceWriter
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriter
N.B.: With L/F+native replication and many Leader partitions getting assigned to a single SN, this VeniceWriter may be called from multiple threads simultaneously during the start of a batch push. Therefore, we wrap it in Lazy to initialize it in a thread-safe way and to ensure that only one instance is created for the entire ingestion task. Important: Please don't use these writers directly; retrieve the writer from PartitionConsumptionState.getVeniceWriterLazyRef() when producing to the local topic.
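The thread-safe, create-once behavior described above can be illustrated with a generic lazy holder. This is a minimal sketch that assumes nothing about Venice's own Lazy class: `LazySketch` is a hypothetical name, it uses plain double-checked locking on a volatile field, and it assumes the initializer never returns null.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a lazy wrapper; Venice's Lazy class may work differently. */
final class LazySketch<T> {
  private final Supplier<T> initializer;
  // volatile so that a fully constructed value is visible to other threads
  private volatile T value;

  LazySketch(Supplier<T> initializer) {
    this.initializer = initializer;
  }

  /** Returns the single shared instance, creating it on first use. */
  T get() {
    T result = value;
    if (result == null) {
      synchronized (this) {
        result = value;
        if (result == null) {
          // Assumption: the initializer returns a non-null object.
          result = initializer.get();
          value = result;
        }
      }
    }
    return result;
  }

  /** True once the value has been created. */
  boolean isPresent() {
    return value != null;
  }
}
```

With this shape, many leader partitions calling `get()` at the same time still observe one writer instance, and the initializer runs at most once.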
-
veniceWriterForRealTime
protected final Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTime
-
kafkaClusterIdToUrlMap
protected final it.unimi.dsi.fastutil.ints.Int2ObjectMap<java.lang.String> kafkaClusterIdToUrlMap
-
viewWriters
protected final java.util.Map<java.lang.String,VeniceViewWriter> viewWriters
-
hasChangeCaptureView
protected final boolean hasChangeCaptureView
-
storeDeserializerCache
protected final AvroStoreDeserializerCache storeDeserializerCache
-
VALID_LAG
protected static final java.util.function.LongPredicate VALID_LAG
-
LEADER_OFFSET_LAG_FILTER
public static final java.util.function.Predicate<? super PartitionConsumptionState> LEADER_OFFSET_LAG_FILTER
-
-
Constructor Detail
-
LeaderFollowerStoreIngestionTask
public LeaderFollowerStoreIngestionTask(StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction)
-
-
Method Detail
-
constructVeniceWriter
public static VeniceWriter<byte[],byte[],byte[]> constructVeniceWriter(VeniceWriterFactory veniceWriterFactory, java.lang.String topic, Version version, boolean producerCompressionEnabled, int producerCnt)
-
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush)
- Overrides:
closeVeniceWriters in class StoreIngestionTask
-
closeVeniceViewWriters
protected void closeVeniceViewWriters()
- Overrides:
closeVeniceViewWriters in class StoreIngestionTask
-
getIngestionBatchProcessor
protected IngestionBatchProcessor getIngestionBatchProcessor()
- Specified by:
getIngestionBatchProcessor in class StoreIngestionTask
-
promoteToLeader
public void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
promoteToLeader in class StoreIngestionTask
-
demoteToStandby
public void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
demoteToStandby in class StoreIngestionTask
-
processConsumerAction
protected void processConsumerAction(ConsumerAction message, Store store) throws java.lang.InterruptedException
- Specified by:
processConsumerAction in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
-
checkLongRunningTaskState
protected void checkLongRunningTaskState() throws java.lang.InterruptedException
The following function will be executed after processing all the quick actions in the consumer action queues, so that long-running actions don't block other partitions' consumer actions. Besides, there are no thread-sleeping operations in this function, in order to be efficient; instead, this function is invoked again and again in the main loop of the StoreIngestionTask to check whether some long-running actions can finish now. The only drawback is that for a regular batch push, the leader flag is not turned on until at least a few minutes after the leader consumes the last message (END_OF_PUSH), which is an acceptable trade-off for us in order to share and test the same code path between regular push job, hybrid store and reprocessing job.
- Specified by:
checkLongRunningTaskState in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
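The "quick actions first, then poll long-running work without sleeping" pattern described for checkLongRunningTaskState can be sketched as follows. Everything here is hypothetical and only illustrates the loop shape: `IngestionLoopSketch`, its queue of `Runnable` quick actions, and the `BooleanSupplier` checks (which return true once finished) are invented for the example and are not part of StoreIngestionTask.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of one main-loop iteration; not the StoreIngestionTask loop. */
final class IngestionLoopSketch {
  private final Queue<Runnable> quickActions = new ArrayDeque<>();
  private final List<BooleanSupplier> longRunningChecks = new ArrayList<>();

  void enqueueQuickAction(Runnable action) {
    quickActions.add(action);
  }

  /** The check must return true once the long-running action has finished. */
  void registerLongRunningCheck(BooleanSupplier check) {
    longRunningChecks.add(check);
  }

  /** One iteration: drain quick actions, then poll each long-running check once. */
  void runOnce() {
    Runnable action;
    while ((action = quickActions.poll()) != null) {
      action.run();
    }
    // No sleeping here: unfinished checks simply stay registered for the next iteration.
    longRunningChecks.removeIf(BooleanSupplier::getAsBoolean);
  }

  int pendingLongRunningChecks() {
    return longRunningChecks.size();
  }
}
```

Because an unfinished check is only polled and never waited on, a slow promotion for one partition does not delay the quick actions queued for the others.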
-
checkWhetherToCloseUnusedVeniceWriter
protected static boolean checkWhetherToCloseUnusedVeniceWriter(Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTimeLazy, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, java.lang.Runnable reInitializeVeniceWriterLazyRunnable, java.lang.String versionTopicName)
-
startConsumingAsLeaderInTransitionFromStandby
protected void startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState)
-
calculateLeaderUpstreamOffsetWithTopicSwitch
protected java.util.Map<java.lang.String,java.lang.Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic, java.util.List<java.lang.CharSequence> unreachableBrokerList)
-
startConsumingAsLeader
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
- Overrides:
startConsumingAsLeader in class StoreIngestionTask
-
leaderExecuteTopicSwitch
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
-
syncConsumedUpstreamRTOffsetMapIfNeeded
protected void syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState pcs, java.util.Map<java.lang.String,java.lang.Long> upstreamStartOffsetByKafkaURL)
-
waitForLastLeaderPersistFuture
protected void waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, java.lang.String errorMsg)
-
getTopicPartitionOffsetByKafkaURL
protected long getTopicPartitionOffsetByKafkaURL(java.lang.CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp)
-
getConsumptionSourceKafkaAddress
protected java.util.Set<java.lang.String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- Specified by:
getConsumptionSourceKafkaAddress in class StoreIngestionTask
-
getRealTimeDataSourceKafkaAddress
protected java.util.Set<java.lang.String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- Overrides:
getRealTimeDataSourceKafkaAddress in class StoreIngestionTask
-
shouldNewLeaderSwitchToRemoteConsumption
protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState)
-
shouldProduceToVersionTopic
protected boolean shouldProduceToVersionTopic(PartitionConsumptionState partitionConsumptionState)
For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from the version topic, it should produce the new message to the version topic; besides, if the LEADER is consuming remotely, it should also produce to the local fabric. If buffer replay is disabled, all replicas will stick to the version topic and no one is going to produce any messages.
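The rule above reduces to a small boolean decision. The following is a sketch under assumptions: the real method takes a PartitionConsumptionState, whereas the hypothetical `ProduceDecisionSketch` takes three plain flags so the example stays self-contained.

```java
/** Hypothetical sketch of the produce-to-VT decision; the flags replace PartitionConsumptionState. */
final class ProduceDecisionSketch {
  private ProduceDecisionSketch() {
  }

  static boolean shouldProduceToVersionTopic(
      boolean isLeader,
      boolean consumingFromVersionTopic,
      boolean consumingRemotely) {
    if (!isLeader) {
      // Followers only consume from the local version topic and never produce.
      return false;
    }
    // A leader produces when its source is not the version topic,
    // or when it reads from a remote fabric and must copy into the local one.
    return !consumingFromVersionTopic || consumingRemotely;
  }
}
```

When buffer replay is disabled, every replica stays on the local version topic, so in this sketch both leader conditions are false and nothing is produced.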
-
isLeader
protected static boolean isLeader(PartitionConsumptionState partitionConsumptionState)
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
Process TopicSwitch control message at given partition offset for a specific PartitionConsumptionState. Return whether we need to execute additional ready-to-serve check after this message is processed.
- Overrides:
processTopicSwitch in class StoreIngestionTask
-
syncTopicSwitchToIngestionMetadataService
protected void syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState)
-
updateOffsetsFromConsumerRecord
protected void updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, java.util.function.Supplier<java.lang.String> sourceKafkaUrlSupplier, boolean dryRun)
A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState, after processing the given PubSubMessage. When using this helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState:
"updateVersionTopicOffsetFunction" should try to update the VT offset in PartitionConsumptionState
"updateUpstreamTopicOffsetFunction" should try to update the latest processed upstream offset map in PartitionConsumptionState
In LeaderFollowerStoreIngestionTask, "sourceKafkaUrlSupplier" should always return OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY; in ActiveActiveStoreIngestionTask, "sourceKafkaUrlSupplier" should return the actual source Kafka url of the "consumerRecord"
Dry-run mode only checks whether the offset rewind is benign, instead of persisting the processed offset.
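The dry-run contract described above can be sketched with an in-memory offset holder. This is a hedged illustration, not the Venice code path: `OffsetUpdateSketch`, its `update` method, and the placeholder key `SINGLE_UPSTREAM_KEY` (standing in for OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY, whose actual value is not given here) are all hypothetical, and the sketch only reports a rewind without deciding whether it is benign.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of offset bookkeeping with a dry-run mode; not Venice code. */
final class OffsetUpdateSketch {
  /** Placeholder for the single non-A/A upstream key; the real constant's value is not assumed. */
  static final String SINGLE_UPSTREAM_KEY = "single-upstream";

  private long versionTopicOffset = -1L;
  private final Map<String, Long> upstreamOffsets = new HashMap<>();

  /**
   * Returns true if the new upstream offset is behind the last known one (a rewind).
   * In dry-run mode nothing is written; otherwise both offsets are updated in memory.
   */
  boolean update(
      long newVersionTopicOffset,
      long newUpstreamOffset,
      Supplier<String> sourceKafkaUrlSupplier,
      boolean dryRun) {
    String sourceUrl = sourceKafkaUrlSupplier.get();
    long previousUpstreamOffset = upstreamOffsets.getOrDefault(sourceUrl, -1L);
    boolean rewindDetected = newUpstreamOffset < previousUpstreamOffset;
    if (!dryRun) {
      versionTopicOffset = newVersionTopicOffset;
      upstreamOffsets.put(sourceUrl, newUpstreamOffset);
    }
    return rewindDetected;
  }

  long versionTopicOffset() {
    return versionTopicOffset;
  }

  long upstreamOffset(String sourceUrl) {
    return upstreamOffsets.getOrDefault(sourceUrl, -1L);
  }
}
```

Passing the source URL through a supplier keeps the sketch usable for both cases in the description: a constant key when there is a single upstream, or a per-record URL when several sources are tracked.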
-
updateOffsetMetadataInOffsetRecord
protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Description copied from class: StoreIngestionTask
Sync the metadata about offset in OffsetRecord. PartitionConsumptionState will pass through some information to OffsetRecord for persistence; offset rewind/split brain has been guarded in StoreIngestionTask.updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean).
- Specified by:
updateOffsetMetadataInOffsetRecord in class StoreIngestionTask
-
updateLatestInMemoryProcessedOffset
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
Description copied from class: StoreIngestionTask
Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
- Specified by:
updateLatestInMemoryProcessedOffset in class StoreIngestionTask
-
checkAndHandleUpstreamOffsetRewind
protected static void checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask)
-
produceToLocalKafka
protected void produceToLocalKafka(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, java.util.function.BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> produceFunction, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
-
isRealTimeBufferReplayStarted
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- Specified by:
isRealTimeBufferReplayStarted in class StoreIngestionTask
-
measureHybridOffsetLag
protected long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
For the Leader/Follower state model, the leader already tracks its consumption progress, so the lag is calculated directly from the real-time topic and the leader's consumption offset.
- Specified by:
measureHybridOffsetLag in class StoreIngestionTask
-
measureRTOffsetLagForMultiRegions
protected long measureRTOffsetLagForMultiRegions(java.util.Set<java.lang.String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
-
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag()
- Overrides:
isReadyToServeAnnouncedWithRTLag in class StoreIngestionTask
-
reportIfCatchUpVersionTopicOffset
protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
Description copied from class: StoreIngestionTask
Check if the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask.
- Specified by:
reportIfCatchUpVersionTopicOffset in class StoreIngestionTask
-
shouldProcessRecord
protected boolean shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
For the Leader/Follower model, the follower should perform the same kind of check as the Online/Offline model; the leader may consume from the real-time topic or the GF topic.
- Overrides:
shouldProcessRecord in class StoreIngestionTask
-
shouldPersistRecord
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
Additional safeguards in Leader/Follower ingestion: 1. Check whether the incoming messages are from the expected source topics.
- Overrides:
shouldPersistRecord in class StoreIngestionTask
-
recordWriterStats
protected void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
- Specified by:
recordWriterStats in class StoreIngestionTask
-
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
- Overrides:
recordProcessedRecordStats in class StoreIngestionTask
-
getMaxRecordSizeBytes
protected int getMaxRecordSizeBytes()
-
getMaxNearlineRecordSizeBytes
protected int getMaxNearlineRecordSizeBytes()
-
calculateAssembledRecordSizeRatio
protected final double calculateAssembledRecordSizeRatio(long recordSize)
- Specified by:
calculateAssembledRecordSizeRatio in class StoreIngestionTask
-
recordAssembledRecordSizeRatio
protected final void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
- Specified by:
recordAssembledRecordSizeRatio in class StoreIngestionTask
-
isHybridFollower
protected boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
- Specified by:
isHybridFollower in class StoreIngestionTask
-
shouldCheckLeaderCompleteStateInFollower
protected boolean shouldCheckLeaderCompleteStateInFollower()
For non-AA hybrid stores with AGGREGATE DRP, SIT reads from the parent RT while the HB is written to the child RTs. Once all hybrid stores are either AA for cross-colo replication or non-AA otherwise, DRP and this extra check can be removed.
- Specified by:
shouldCheckLeaderCompleteStateInFollower in class StoreIngestionTask
-
checkAndLogIfLagIsAcceptableForHybridStore
protected boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
Checks whether the lag is acceptable for hybrid stores.
If the instance is a hybrid standby or DaVinciClient, also check that:
1. the checkLeaderCompleteStateInFollower feature is enabled based on configs,
2. leaderCompleteStatus has the leader state=completed, and
3. the last update time was within the configured time interval, so that a stale leader state is not used: check ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS
- Specified by:
checkAndLogIfLagIsAcceptableForHybridStore in class StoreIngestionTask
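The three extra conditions for followers can be combined as in the sketch below. It is an illustration only: the class, method, and parameter names are hypothetical, and the comparison "lag <= threshold" as the definition of an acceptable lag is an assumption rather than something this page states.

```java
// Hypothetical sketch of the decision logic; not the Venice implementation.
final class HybridLagCheckSketch {
  static boolean isLagAcceptable(
      long lag,
      long threshold,
      boolean isHybridFollower,
      boolean leaderCompleteCheckEnabled,
      boolean leaderCompleted,
      long lastLeaderCompleteStateUpdateMs,
      long nowMs,
      long validIntervalMs) {
    // Assumption: a lag at or below the threshold counts as acceptable.
    boolean lagOk = lag <= threshold;
    if (!lagOk || !isHybridFollower || !leaderCompleteCheckEnabled) {
      // Leaders, and followers with the feature disabled, rely on the lag alone.
      return lagOk;
    }
    // Followers additionally need a completed leader state that is not stale.
    boolean fresh = nowMs - lastLeaderCompleteStateUpdateMs <= validIntervalMs;
    return leaderCompleted && fresh;
  }
}
```

With this shape, a follower whose lag is small still reports "not acceptable" until it has recently seen a completed leader state.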
-
getAndUpdateLeaderCompletedState
protected void getAndUpdateLeaderCompletedState(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState)
HeartBeat SOS messages carry the leader completion state in the header. This function extracts the leader completion state from that header and updates the given partitionConsumptionState accordingly.
-
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
- Overrides:
recordHeartbeatReceived in class StoreIngestionTask
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
- Specified by:
validateAndFilterOutDuplicateMessagesFromLeaderTopic in class StoreIngestionTask
-
delegateConsumerRecord
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
The goal of this function is to possibly produce the incoming Kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed. The decision is based on the output of shouldProduceToVersionTopic(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState) and the message type. It also performs any necessary additional computation, such as for write-compute/update messages. It returns a StoreIngestionTask.DelegateConsumerRecordResult indicating what to do with the record.
This function should be called as one of the first steps in the processing pipeline for all messages consumed from any Kafka topic. The caller should only process the given record further if the return value is StoreIngestionTask.DelegateConsumerRecordResult.QUEUED_TO_DRAINER.
This function assumes shouldProcessRecord(PubSubMessage) has already been called, which happens in StoreIngestionTask.produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int), and that it was decided that this record needs to be processed. It does not perform any validation check on the PartitionConsumptionState object, to keep the goal of the function simple and not overloaded. DIV validation is also done here if the message is received from an RT topic. For more info, please see StoreIngestionTask#internalProcessConsumerRecord(PubSubMessage, PartitionConsumptionState, LeaderProducedRecordContext, int, String, long).
This function may modify the original record in KME, and it is unsafe to use the payload from KME directly after this function.
- Specified by:
delegateConsumerRecord in class StoreIngestionTask
- Returns:
- a StoreIngestionTask.DelegateConsumerRecordResult indicating what to do with the record
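The caller-side contract (continue processing only on QUEUED_TO_DRAINER) can be sketched as below. Only the name QUEUED_TO_DRAINER is taken from this page; the second enum constant, the class, and the mapping from "should produce" to a result are hypothetical placeholders chosen for illustration.

```java
// Hypothetical sketch of how a caller might react to the delegation result.
final class DelegateResultSketch {
  enum Result {
    QUEUED_TO_DRAINER, // named in the description above
    HANDLED_ELSEWHERE  // hypothetical stand-in for any other outcome
  }

  // Assumption: a record that must be produced to the version topic is not
  // handed to the drainer directly by this call.
  static Result delegate(boolean shouldProduceToVersionTopic) {
    return shouldProduceToVersionTopic ? Result.HANDLED_ELSEWHERE : Result.QUEUED_TO_DRAINER;
  }

  // The caller keeps processing the record only when it was queued to the drainer.
  static boolean callerShouldKeepProcessing(Result result) {
    return result == Result.QUEUED_TO_DRAINER;
  }
}
```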
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
Besides draining messages in the drainer queue, wait for the last producer future.
- Overrides:
waitForAllMessageToBeProcessedFromTopicPartition in class StoreIngestionTask
- Parameters:
topicPartition - for which to wait
- Throws:
java.lang.InterruptedException
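The "drain, then wait for the last producer future" ordering can be sketched with standard java.util.concurrent types. The class name, the Runnable standing in for the drainer queue, and the timeout parameter are assumptions made for this illustration; the actual method takes a topic partition and a PartitionConsumptionState.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; the real method works on Venice-specific types.
final class DrainThenAwaitSketch {
  static void waitForAll(Runnable drainQueue, Future<?> lastProducerFuture, long timeoutMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Step 1: drain everything already queued for this topic partition.
    drainQueue.run();
    // Step 2: block until the last produce call has been acknowledged, if there was one.
    if (lastProducerFuture != null) {
      lastProducerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
  }
}
```

Waiting on the future after the drain matters because the callback of the last produced message is what carries the most recent version topic offset.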
-
getBatchReplicationLag
public long getBatchReplicationLag()
- Specified by:
getBatchReplicationLag in class StoreIngestionTask
-
getLeaderOffsetLag
public long getLeaderOffsetLag()
- Specified by:
getLeaderOffsetLag in class StoreIngestionTask
-
getBatchLeaderOffsetLag
public long getBatchLeaderOffsetLag()
- Specified by:
getBatchLeaderOffsetLag in class StoreIngestionTask
-
getHybridLeaderOffsetLag
public long getHybridLeaderOffsetLag()
- Specified by:
getHybridLeaderOffsetLag in class StoreIngestionTask
-
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredUpstreamKafkaUrl)
For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY. Return the value of the entry.
-
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl)
For regular L/F stores without A/A enabled, there is always only one real-time source.
-
updateLatestInMemoryLeaderConsumedRTOffset
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl, long offset)
-
getFollowerOffsetLag
public long getFollowerOffsetLag()
Description copied from class: StoreIngestionTask
Measure the offset lag between follower and leader.
- Specified by:
getFollowerOffsetLag in class StoreIngestionTask
-
getBatchFollowerOffsetLag
public long getBatchFollowerOffsetLag()
- Specified by:
getBatchFollowerOffsetLag in class StoreIngestionTask
-
getHybridFollowerOffsetLag
public long getHybridFollowerOffsetLag()
- Specified by:
getHybridFollowerOffsetLag in class StoreIngestionTask
-
getRegionHybridOffsetLag
public long getRegionHybridOffsetLag(int regionId)
- Specified by:
getRegionHybridOffsetLag in class StoreIngestionTask
-
consumerUnSubscribeAllTopics
public void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState.
- Specified by:
consumerUnSubscribeAllTopics in class StoreIngestionTask
-
getWriteComputeErrorCode
public int getWriteComputeErrorCode()
- Specified by:
getWriteComputeErrorCode in class StoreIngestionTask
-
updateLeaderTopicOnFollower
public void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
- Specified by:
updateLeaderTopicOnFollower in class StoreIngestionTask
-
maybeCompressData
protected java.nio.ByteBuffer maybeCompressData(int partition, java.nio.ByteBuffer data, PartitionConsumptionState partitionConsumptionState)
Compresses data in a bytebuffer when consuming from RT as a leader node and compression is enabled for the store version for which we're consuming data.
- Parameters:
partition - which partition we're acting on so as to determine the PartitionConsumptionState
data - the data that we might compress
- Returns:
- a bytebuffer that's either the original bytebuffer or a new one depending on if we compressed it.
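The "return the original buffer or a new compressed one" contract can be sketched as follows. GZIP from java.util.zip is used here purely as a stand-in; this page does not say which compression strategy the store version uses, and the class name and boolean parameters are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch; GZIP is an assumption, not necessarily the store's compressor.
final class MaybeCompressSketch {
  static ByteBuffer maybeCompress(
      ByteBuffer data, boolean isLeaderConsumingFromRt, boolean compressionEnabled) {
    if (!isLeaderConsumingFromRt || !compressionEnabled) {
      return data; // the very same instance is handed back
    }
    byte[] raw = new byte[data.remaining()];
    data.duplicate().get(raw); // duplicate() leaves the caller's position untouched
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The stream is closed at this point, so the GZIP trailer has been written.
    return ByteBuffer.wrap(out.toByteArray());
  }
}
```

Callers can tell whether compression happened by comparing the returned reference with the input, which matches the return description above.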
-
shouldCompressData
protected boolean shouldCompressData(PartitionConsumptionState partitionConsumptionState)
-
processMessageAndMaybeProduceToKafka
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
-
measureRTOffsetLagForSingleRegion
protected long measureRTOffsetLagForSingleRegion(java.lang.String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog)
This method fetches/calculates the latest leader persisted offset and the last offset in the RT topic. The method relies on getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState, String) to fetch the latest leader persisted offset for different data replication policies.
- Returns:
- the lag (lastOffsetInRealTimeTopic - latestPersistedLeaderOffset)
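The lag arithmetic stated in the return description can be sketched as below, together with the single-entry upstream offset map used outside Active/Active. The class name, the placeholder key value, and the use of -1 for "nothing persisted yet" are assumptions of this sketch; the real key is OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY, whose value is not shown on this page.

```java
import java.util.Map;

// Hypothetical sketch of lag = lastOffsetInRealTimeTopic - latestPersistedLeaderOffset.
final class RtLagSketch {
  // Placeholder for the single map key; the real constant's value is not shown here.
  static final String SINGLE_SOURCE_KEY = "single-source";

  static long latestPersistedLeaderOffset(Map<String, Long> upstreamOffsetMap) {
    Long offset = upstreamOffsetMap.get(SINGLE_SOURCE_KEY);
    // Assumption: -1 stands for "no upstream offset persisted yet".
    return offset == null ? -1L : offset;
  }

  static long measureLag(long lastOffsetInRealTimeTopic, Map<String, Long> upstreamOffsetMap) {
    return lastOffsetInRealTimeTopic - latestPersistedLeaderOffset(upstreamOffsetMap);
  }
}
```

For example, with a last RT offset of 100 and a persisted leader offset of 90, the sketch reports a lag of 10.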
-
processVersionSwapMessage
protected void processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
Description copied from class: StoreIngestionTask
This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask.
- Overrides:
processVersionSwapMessage in class StoreIngestionTask
-
createProducerCallback
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
-
getVeniceWriter
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> getVeniceWriter(PartitionConsumptionState partitionConsumptionState)
-
addPartitionConsumptionState
protected void addPartitionConsumptionState(java.lang.Integer partition, PartitionConsumptionState pcs)
-
maybeSendIngestionHeartbeat
protected java.util.Set<java.lang.String> maybeSendIngestionHeartbeat()
For hybrid stores only, the leader periodically writes a special SOS message to the RT topic with the following properties:
1. Special key: This key contains constant bytes, allowing for compaction.
2. Fixed/known producer GUID: This GUID is dedicated to heartbeats and prevents DIV from breaking.
3. Special segment: This segment never contains data, eliminating the need for an EOS message.
Upon consuming the SOS message, the leader writes it to its local VT. Once the drainer processes the record, the leader updates its latest processed upstream RT topic offset. At this point, the offset reflects the correct position, regardless of trailing CMs or skippable data records due to DCR.
This heartbeat message does not send a leader completion header. This results in having the leader completion states only in VTs and not in the RT, avoiding the need to differentiate between heartbeats from leaders of different versions (backup/current/future) and colos.
- Specified by:
maybeSendIngestionHeartbeat in class StoreIngestionTask
- Returns:
- the set of partitions that failed to send heartbeat (used for tests)
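The return contract (a set of partitions whose heartbeat could not be sent) can be sketched as below. The class name, the string partition identifiers, and the Predicate standing in for the actual write to the RT topic are hypothetical; the sketch does not model the special key, producer GUID, or segment described above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch; the send step is abstracted behind a Predicate.
final class HeartbeatSketch {
  /**
   * Attempts one heartbeat per leader partition and collects the ones that failed.
   * sendHeartbeat returns true on success; a thrown RuntimeException counts as a failure.
   */
  static Set<String> sendHeartbeats(Set<String> leaderPartitions, Predicate<String> sendHeartbeat) {
    Set<String> failed = new HashSet<>();
    for (String partition : leaderPartitions) {
      boolean sent;
      try {
        sent = sendHeartbeat.test(partition);
      } catch (RuntimeException e) {
        sent = false;
      }
      if (!sent) {
        failed.add(partition);
      }
    }
    return failed;
  }
}
```

An empty result means every heartbeat attempt succeeded, which is convenient to assert on in tests.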
-
resubscribe
protected void resubscribe(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
Resubscribe by passing the new version role and partition role to AggKafkaConsumerService. Leader and follower replicas are handled differently.
- Specified by:
resubscribe in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
-
resubscribeAsFollower
protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
resubscribeAsLeader
protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
-