/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicIdPartitionSet;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public abstract class AbstractMembershipManager<R extends AbstractResponse>
implements RequestManager {
    static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();
    protected final String groupId;
    protected final String memberId = Uuid.randomUuid().toString();
    protected int memberEpoch = 0;
    protected MemberState state;
    private LocalAssignment currentAssignment;
    protected final SubscriptionState subscriptions;
    private final Metadata metadata;
    protected final Logger log;
    private final Map<Uuid, String> assignedTopicNamesCache;
    private LocalAssignment currentTargetAssignment;
    private boolean reconciliationInProgress;
    private boolean rejoinedWhileReconciliationInProgress;
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
    private final List<MemberStateListener> stateUpdatesListeners;
    private CompletableFuture<Void> staleMemberAssignmentRelease;
    private final RebalanceMetricsManager metricsManager;
    private final Time time;
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private boolean isPollTimerExpired;
    private final boolean autoCommitEnabled;
    protected CloseOptions.GroupMembershipOperation leaveGroupOperation = CloseOptions.GroupMembershipOperation.DEFAULT;

    AbstractMembershipManager(String groupId, SubscriptionState subscriptions, Metadata metadata, Logger log, Time time, RebalanceMetricsManager metricsManager, boolean autoCommitEnabled) {
        this.groupId = groupId;
        this.state = MemberState.UNSUBSCRIBED;
        this.subscriptions = subscriptions;
        this.metadata = metadata;
        this.assignedTopicNamesCache = new HashMap<Uuid, String>();
        this.currentTargetAssignment = LocalAssignment.NONE;
        this.currentAssignment = LocalAssignment.NONE;
        this.log = log;
        this.stateUpdatesListeners = new ArrayList<MemberStateListener>();
        this.time = time;
        this.metricsManager = metricsManager;
        this.autoCommitEnabled = autoCommitEnabled;
    }

    protected void transitionTo(MemberState nextState) {
        if (!this.state.equals((Object)nextState) && !nextState.getPreviousValidStates().contains((Object)this.state)) {
            throw new IllegalStateException(String.format("Invalid state transition from %s to %s", new Object[]{this.state, nextState}));
        }
        if (AbstractMembershipManager.isCompletingRebalance(this.state, nextState)) {
            this.metricsManager.recordRebalanceEnded(this.time.milliseconds());
        }
        if (AbstractMembershipManager.isStartingRebalance(this.state, nextState)) {
            this.metricsManager.recordRebalanceStarted(this.time.milliseconds());
        }
        this.log.info("Member {} with epoch {} transitioned from {} to {}.", new Object[]{this.memberId, this.memberEpoch, this.state, nextState});
        this.state = nextState;
    }

    private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {
        return currentState == MemberState.RECONCILING && (nextState == MemberState.STABLE || nextState == MemberState.ACKNOWLEDGING);
    }

    private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) {
        return currentState != MemberState.RECONCILING && nextState == MemberState.RECONCILING;
    }

    public String groupId() {
        return this.groupId;
    }

    public String memberId() {
        return this.memberId;
    }

    public int memberEpoch() {
        return this.memberEpoch;
    }

    public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
        return this.leaveGroupOperation;
    }

    public abstract void onHeartbeatSuccess(R var1);

    public void onHeartbeatFailure(boolean retriable) {
        if (!retriable) {
            this.metricsManager.maybeRecordRebalanceFailed();
        }
        if (this.state == MemberState.UNSUBSCRIBED && this.maybeCompleteLeaveInProgress()) {
            this.log.warn("Member {} with epoch {} received a failed response to the heartbeat to leave the group and completed the leave operation. ", (Object)this.memberId, (Object)this.memberEpoch);
        }
    }

    protected boolean maybeCompleteLeaveInProgress() {
        if (this.leaveGroupInProgress.isPresent()) {
            this.leaveGroupInProgress.get().complete(null);
            this.leaveGroupInProgress = Optional.empty();
            return true;
        }
        return false;
    }

    protected boolean isNotInGroup() {
        return this.state == MemberState.UNSUBSCRIBED || this.state == MemberState.FENCED || this.state == MemberState.FATAL || this.state == MemberState.STALE;
    }

    protected void processAssignmentReceived(Map<Uuid, SortedSet<Integer>> assignment) {
        this.replaceTargetAssignmentWithNewAssignment(assignment);
        if (!this.targetAssignmentReconciled()) {
            this.transitionTo(MemberState.RECONCILING);
        } else {
            this.log.debug("Target assignment {} received from the broker is equals to the member current assignment {}. Nothing to reconcile.", (Object)this.currentTargetAssignment, (Object)this.currentAssignment);
            if (this.state == MemberState.RECONCILING || this.state == MemberState.JOINING) {
                this.transitionTo(MemberState.STABLE);
            }
        }
    }

    private void replaceTargetAssignmentWithNewAssignment(Map<Uuid, SortedSet<Integer>> assignment) {
        this.currentTargetAssignment.updateWith(assignment).ifPresent(updatedAssignment -> {
            this.log.debug("Member {} updated its target assignment from {} to {}. Member will reconcile it on the next poll.", new Object[]{this.memberId, this.currentTargetAssignment, updatedAssignment});
            this.currentTargetAssignment = updatedAssignment;
            this.subscriptions.setAssignedTopicIds(this.currentTargetAssignment.partitions.keySet());
        });
    }

    public void transitionToFenced() {
        if (this.state == MemberState.PREPARE_LEAVING) {
            this.log.info("Member {} with epoch {} got fenced but it is already preparing to leave the group, so it will stop sending heartbeat and won't attempt to send the leave request or rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            this.transitionToSendingLeaveGroup(false);
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
            return;
        }
        if (this.state == MemberState.LEAVING) {
            this.log.debug("Member {} with epoch {} got fenced before sending leave group heartbeat. It will not send the leave request and won't attempt to rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
            return;
        }
        if (this.state == MemberState.UNSUBSCRIBED) {
            this.log.debug("Member {} with epoch {} got fenced but it already left the group, so it won't attempt to rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        this.transitionTo(MemberState.FENCED);
        this.resetEpoch();
        this.log.debug("Member {} with epoch {} transitioned to {} state. It will release its assignment and rejoin the group.", new Object[]{this.memberId, this.memberEpoch, MemberState.FENCED});
        CompletableFuture<Void> callbackResult = this.signalPartitionsLost(this.subscriptions.assignedPartitions());
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onPartitionsLost callback invocation failed while releasing assignment after member got fenced. Member will rejoin the group anyways.", error);
            }
            this.clearAssignment();
            if (this.state == MemberState.FENCED) {
                this.transitionToJoining();
            } else {
                this.log.debug("Fenced member onPartitionsLost callback completed but the state has already changed to {}, so the member won't rejoin the group", (Object)this.state);
            }
        });
    }

    public void transitionToFatal() {
        MemberState previousState = this.state;
        this.transitionTo(MemberState.FATAL);
        this.log.error("Member {} with epoch {} transitioned to fatal state", (Object)this.memberId, (Object)this.memberEpoch);
        this.notifyEpochChange(Optional.empty());
        if (previousState == MemberState.UNSUBSCRIBED && this.maybeCompleteLeaveInProgress()) {
            this.log.debug("Member {} with epoch {} got fatal error from the broker but it already left the group, so onPartitionsLost callback won't be triggered.", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
            this.log.info("Member {} with epoch {} was leaving the group with state {} when it got a fatal error from the broker. It will discard the ongoing leave and remain in fatal state.", new Object[]{this.memberId, this.memberEpoch, previousState});
            this.maybeCompleteLeaveInProgress();
            return;
        }
        CompletableFuture<Void> callbackResult = this.signalPartitionsLost(this.subscriptions.assignedPartitions());
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onPartitionsLost callback invocation failed while releasing assignmentafter member failed with fatal error.", error);
            }
            this.clearAssignment();
        });
    }

    public void onSubscriptionUpdated() {
        this.subscriptionUpdated.compareAndSet(false, true);
    }

    public void onConsumerPoll() {
        if (this.subscriptionUpdated.compareAndSet(true, false) && this.state == MemberState.UNSUBSCRIBED) {
            this.transitionToJoining();
        }
    }

    private void clearAssignment() {
        if (this.subscriptions.hasAutoAssignedPartitions()) {
            this.subscriptions.assignFromSubscribed(Collections.emptySet());
            this.notifyAssignmentChange(Collections.emptySet());
        }
        this.currentAssignment = LocalAssignment.NONE;
        this.clearPendingAssignmentsAndLocalNamesCache();
    }

    private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet assignedPartitions, SortedSet<TopicPartition> addedPartitions) {
        this.subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(), addedPartitions);
        this.notifyAssignmentChange(assignedPartitions.topicPartitions());
    }

    public void transitionToJoining() {
        if (this.state == MemberState.FATAL) {
            this.log.warn("No action taken to join the group with the updated subscription because the member is in FATAL state");
            return;
        }
        if (this.reconciliationInProgress) {
            this.rejoinedWhileReconciliationInProgress = true;
        }
        this.resetEpoch();
        this.transitionTo(MemberState.JOINING);
        this.log.debug("Member {} will join the group on the next call to poll.", (Object)this.memberId);
        this.clearPendingAssignmentsAndLocalNamesCache();
    }

    public CompletableFuture<Void> leaveGroupOnClose(CloseOptions.GroupMembershipOperation membershipOperation) {
        this.leaveGroupOperation = membershipOperation;
        return this.leaveGroup(false);
    }

    public CompletableFuture<Void> leaveGroup() {
        return this.leaveGroup(true);
    }

    protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
        if (this.isNotInGroup()) {
            if (this.state == MemberState.FENCED) {
                this.clearAssignment();
                this.transitionTo(MemberState.UNSUBSCRIBED);
            }
            this.subscriptions.unsubscribe();
            this.notifyAssignmentChange(Collections.emptySet());
            return CompletableFuture.completedFuture(null);
        }
        if (this.state == MemberState.PREPARE_LEAVING || this.state == MemberState.LEAVING) {
            this.log.debug("Leave group operation already in progress for member {}", (Object)this.memberId);
            return this.leaveGroupInProgress.get();
        }
        this.transitionTo(MemberState.PREPARE_LEAVING);
        CompletableFuture<Void> leaveResult = new CompletableFuture<Void>();
        this.leaveGroupInProgress = Optional.of(leaveResult);
        if (runCallbacks) {
            CompletableFuture<Void> callbackResult = this.signalMemberLeavingGroup();
            callbackResult.whenComplete((result, error) -> {
                if (error != null) {
                    this.log.error("Member {} callback to release assignment failed. It will proceed to clear its assignment and send a leave group heartbeat", (Object)this.memberId, error);
                } else {
                    this.log.info("Member {} completed callback to release assignment. It will proceed to clear its assignment and send a leave group heartbeat", (Object)this.memberId);
                }
                this.clearAssignmentAndLeaveGroup();
            });
        } else {
            this.log.debug("Member {} attempting to leave has no rebalance callbacks, so it will clear assignments and transition to send heartbeat to leave group.", (Object)this.memberId);
            this.clearAssignmentAndLeaveGroup();
        }
        return leaveResult;
    }

    private void clearAssignmentAndLeaveGroup() {
        this.subscriptions.unsubscribe();
        this.clearAssignment();
        this.transitionToSendingLeaveGroup(false);
    }

    public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
        if (this.state == MemberState.FATAL) {
            this.log.warn("Member {} with epoch {} won't send leave group request because it is in FATAL state", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (this.state == MemberState.UNSUBSCRIBED) {
            this.log.warn("Member {} won't send leave group request because it is already out of the group.", (Object)this.memberId);
            return;
        }
        if (dueToExpiredPollTimer) {
            this.isPollTimerExpired = true;
            this.transitionTo(MemberState.PREPARE_LEAVING);
        }
        this.updateMemberEpoch(this.leaveGroupEpoch());
        this.currentAssignment = LocalAssignment.NONE;
        this.transitionTo(MemberState.LEAVING);
    }

    void notifyEpochChange(Optional<Integer> epoch) {
        this.stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, this.memberId));
    }

    void notifyAssignmentChange(Set<TopicPartition> partitions) {
        this.stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
    }

    public boolean shouldHeartbeatNow() {
        MemberState state = this.state();
        return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
    }

    public void onHeartbeatRequestGenerated() {
        MemberState state = this.state();
        if (state == MemberState.ACKNOWLEDGING) {
            if (this.targetAssignmentReconciled()) {
                this.transitionTo(MemberState.STABLE);
            } else {
                this.log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent to ack a previous reconciliation. \n\t\tCurrent assignment: {} \n\t\tTarget assignment: {}\n", new Object[]{this.memberId, this.memberEpoch, MemberState.RECONCILING, this.currentAssignment, this.currentTargetAssignment});
                this.transitionTo(MemberState.RECONCILING);
            }
        } else if (state == MemberState.LEAVING) {
            if (this.isPollTimerExpired) {
                this.log.debug("Member {} with epoch {} generated the heartbeat to leave due to expired poll timer. It will remain stale (no heartbeat) until it rejoins the group on the next consumer poll.", (Object)this.memberId, (Object)this.memberEpoch);
                this.transitionToStale();
            } else {
                this.log.debug("Member {} with epoch {} generated the heartbeat to leave the group.", (Object)this.memberId, (Object)this.memberEpoch);
                this.transitionTo(MemberState.UNSUBSCRIBED);
            }
        }
    }

    public void onHeartbeatRequestSkipped() {
        if (this.state == MemberState.LEAVING) {
            this.log.warn("Heartbeat to leave group cannot be sent (most probably due to coordinator not known/available). Member {} with epoch {} will transition to {}.", new Object[]{this.memberId, this.memberEpoch, MemberState.UNSUBSCRIBED});
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
        }
    }

    private boolean targetAssignmentReconciled() {
        return this.currentAssignment.equals(this.currentTargetAssignment);
    }

    public boolean shouldSkipHeartbeat() {
        MemberState state = this.state();
        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE || state == MemberState.FENCED;
    }

    public boolean isLeavingGroup() {
        MemberState state = this.state();
        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
    }

    public void maybeRejoinStaleMember() {
        this.isPollTimerExpired = false;
        if (this.state == MemberState.STALE) {
            this.log.debug("Expired poll timer has been reset so stale member {} will rejoin the group when it completes releasing its previous assignment.", (Object)this.memberId);
            this.staleMemberAssignmentRelease.whenComplete((__, error) -> this.transitionToJoining());
        }
    }

    private void transitionToStale() {
        this.transitionTo(MemberState.STALE);
        CompletableFuture<Void> callbackResult = this.signalPartitionsLost(this.subscriptions.assignedPartitions());
        this.staleMemberAssignmentRelease = callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onPartitionsLost callback invocation failed while releasing assignment after member left group due to expired poll timer.", error);
            }
            this.clearAssignment();
            this.log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain in {} state until the poll timer is reset, and it will then rejoin the group", (Object)this.memberId, (Object)MemberState.STALE);
        });
    }

    public void maybeReconcile(boolean canCommit) {
        if (this.state != MemberState.RECONCILING) {
            return;
        }
        if (this.targetAssignmentReconciled()) {
            this.log.trace("Ignoring reconciliation attempt. Target assignment is equal to the current assignment.");
            return;
        }
        if (this.reconciliationInProgress) {
            this.log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment {} will be handled in the next reconciliation loop.", (Object)this.currentTargetAssignment);
            return;
        }
        TopicIdPartitionSet assignedTopicIdPartitions = this.findResolvableAssignmentAndTriggerMetadataUpdate();
        LocalAssignment resolvedAssignment = new LocalAssignment(this.currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
        if (!this.currentAssignment.isNone() && resolvedAssignment.partitions.equals(this.currentAssignment.partitions)) {
            this.log.debug("There are unresolved partitions, and the resolvable fragment of the target assignment {} is equal to the current assignment. Bumping the local epoch of the assignment and acknowledging the partially resolved assignment", resolvedAssignment.partitions);
            this.currentAssignment = resolvedAssignment;
            this.transitionTo(MemberState.ACKNOWLEDGING);
            return;
        }
        if (this.autoCommitEnabled && !canCommit) {
            return;
        }
        this.markReconciliationInProgress();
        SortedSet<TopicPartition> assignedTopicPartitions = assignedTopicIdPartitions.toTopicNamePartitionSet();
        TreeSet<TopicPartition> ownedPartitions = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        ownedPartitions.addAll(this.subscriptions.assignedPartitions());
        TreeSet<TopicPartition> addedPartitions = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        addedPartitions.addAll(assignedTopicPartitions);
        addedPartitions.removeAll(ownedPartitions);
        TreeSet<TopicPartition> revokedPartitions = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        revokedPartitions.addAll(ownedPartitions);
        revokedPartitions.removeAll(assignedTopicPartitions);
        this.log.info("Reconciling assignment with local epoch {}\n\tMember:                                    {}\n\tAssigned partitions:                       {}\n\tCurrent owned partitions:                  {}\n\tAdded partitions (assigned - owned):       {}\n\tRevoked partitions (owned - assigned):     {}\n", new Object[]{resolvedAssignment.localEpoch, this.memberId, assignedTopicPartitions, ownedPartitions, addedPartitions, revokedPartitions});
        this.markPendingRevocationToPauseFetching(revokedPartitions);
        CompletableFuture<Void> commitResult = this.signalReconciliationStarted();
        ((CompletableFuture)commitResult.whenComplete((__, commitReqError) -> {
            if (commitReqError != null) {
                this.log.error("Auto-commit request before reconciling new assignment failed. Will proceed with the reconciliation anyway.", commitReqError);
            } else {
                this.log.debug("Auto-commit before reconciling new assignment completed successfully.");
            }
            if (!this.maybeAbortReconciliation()) {
                this.revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
            }
        })).exceptionally(error -> {
            if (error != null) {
                this.log.error("Reconciliation failed.", error);
            }
            return null;
        });
    }

    long getDeadlineMsForTimeout(long timeoutMs) {
        long expiration = this.time.milliseconds() + timeoutMs;
        if (expiration < 0L) {
            return Long.MAX_VALUE;
        }
        return expiration;
    }

    private void revokeAndAssign(LocalAssignment resolvedAssignment, TopicIdPartitionSet assignedTopicIdPartitions, SortedSet<TopicPartition> revokedPartitions, SortedSet<TopicPartition> addedPartitions) {
        CompletableFuture<Object> revocationResult = !revokedPartitions.isEmpty() ? this.revokePartitions(revokedPartitions) : CompletableFuture.completedFuture(null);
        CompletionStage reconciliationResult = revocationResult.thenCompose(__ -> {
            if (!this.maybeAbortReconciliation()) {
                return this.assignPartitions(assignedTopicIdPartitions, addedPartitions);
            }
            return CompletableFuture.completedFuture(null);
        });
        ((CompletableFuture)reconciliationResult).whenComplete((__, error) -> {
            if (error != null) {
                this.log.error("Reconciliation failed.", error);
                this.markReconciliationCompleted();
            } else if (this.reconciliationInProgress && !this.maybeAbortReconciliation()) {
                this.currentAssignment = resolvedAssignment;
                this.signalReconciliationCompleting();
                this.transitionTo(MemberState.ACKNOWLEDGING);
                this.markReconciliationCompleted();
            }
        });
    }

    boolean maybeAbortReconciliation() {
        boolean shouldAbort;
        boolean bl = shouldAbort = this.state != MemberState.RECONCILING || this.rejoinedWhileReconciliationInProgress;
        if (shouldAbort) {
            String reason = this.rejoinedWhileReconciliationInProgress ? "the member has re-joined the group" : "the member already transitioned out of the reconciling state into " + String.valueOf((Object)this.state);
            this.log.info("Interrupting reconciliation that is not relevant anymore because {}", (Object)reason);
            this.markReconciliationCompleted();
        }
        return shouldAbort;
    }

    void updateAssignment(Map<Uuid, SortedSet<Integer>> partitions) {
        this.currentAssignment = new LocalAssignment(0L, partitions);
    }

    protected CompletableFuture<Void> signalReconciliationStarted() {
        return CompletableFuture.completedFuture(null);
    }

    protected void signalReconciliationCompleting() {
    }

    protected CompletableFuture<Void> signalMemberLeavingGroup() {
        return CompletableFuture.completedFuture(null);
    }

    protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> partitionsLost) {
        return CompletableFuture.completedFuture(null);
    }

    void markReconciliationInProgress() {
        this.reconciliationInProgress = true;
        this.rejoinedWhileReconciliationInProgress = false;
    }

    void markReconciliationCompleted() {
        this.reconciliationInProgress = false;
        this.rejoinedWhileReconciliationInProgress = false;
    }

    private TopicIdPartitionSet findResolvableAssignmentAndTriggerMetadataUpdate() {
        TopicIdPartitionSet assignmentReadyToReconcile = new TopicIdPartitionSet();
        HashMap<Uuid, SortedSet<Integer>> unresolved = new HashMap<Uuid, SortedSet<Integer>>(this.currentTargetAssignment.partitions);
        Iterator<Map.Entry<Uuid, SortedSet<Integer>>> it = unresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Uuid, SortedSet<Integer>> e = it.next();
            Uuid topicId = e.getKey();
            SortedSet<Integer> topicPartitions = e.getValue();
            Optional<String> nameFromMetadata = this.findTopicNameInGlobalOrLocalCache(topicId);
            nameFromMetadata.ifPresent(resolvedTopicName -> {
                assignmentReadyToReconcile.addAll(topicId, (String)resolvedTopicName, (Set<Integer>)topicPartitions);
                it.remove();
            });
        }
        if (!unresolved.isEmpty()) {
            this.log.debug("Topic Ids {} received in target assignment were not found in metadata and are not currently assigned. Requesting a metadata update now to resolve topic names.", unresolved.keySet());
            this.metadata.requestUpdate(true);
        }
        return assignmentReadyToReconcile;
    }

    private Optional<String> findTopicNameInGlobalOrLocalCache(Uuid topicId) {
        String nameFromMetadataCache = this.metadata.topicNames().getOrDefault(topicId, null);
        if (nameFromMetadataCache != null) {
            this.assignedTopicNamesCache.put(topicId, nameFromMetadataCache);
            return Optional.of(nameFromMetadataCache);
        }
        String nameFromSubscriptionCache = this.assignedTopicNamesCache.getOrDefault(topicId, null);
        return Optional.ofNullable(nameFromSubscriptionCache);
    }

    CompletableFuture<Void> revokePartitions(Set<TopicPartition> partitionsToRevoke) {
        HashSet<TopicPartition> revokedPartitions = new HashSet<TopicPartition>(partitionsToRevoke);
        revokedPartitions.retainAll(this.subscriptions.assignedPartitions());
        this.log.info("Revoking previously assigned partitions {}", revokedPartitions);
        this.signalPartitionsBeingRevoked(revokedPartitions);
        this.markPendingRevocationToPauseFetching(revokedPartitions);
        CompletableFuture<Void> revocationResult = new CompletableFuture<Void>();
        if (this.state == MemberState.FATAL) {
            String errorMsg = String.format("Member %s with epoch %s received a fatal error while waiting for a revocation commit to complete. Will abort revocation without triggering user callback.", this.memberId, this.memberEpoch);
            this.log.debug(errorMsg);
            revocationResult.completeExceptionally(new KafkaException(errorMsg));
            return revocationResult;
        }
        CompletableFuture<Void> userCallbackResult = this.signalPartitionsRevoked(revokedPartitions);
        userCallbackResult.whenComplete((callbackResult, callbackError) -> {
            if (callbackError != null) {
                this.log.error("onPartitionsRevoked callback invocation failed for partitions {}", (Object)revokedPartitions, callbackError);
                revocationResult.completeExceptionally((Throwable)callbackError);
            } else {
                revocationResult.complete(null);
            }
        });
        return revocationResult;
    }

    private CompletableFuture<Void> assignPartitions(TopicIdPartitionSet assignedPartitions, SortedSet<TopicPartition> addedPartitions) {
        this.updateSubscriptionAwaitingCallback(assignedPartitions, addedPartitions);
        CompletableFuture<Void> result = this.signalPartitionsAssigned(addedPartitions);
        result.whenComplete((__, exception) -> {
            if (exception == null) {
                this.subscriptions.enablePartitionsAwaitingCallback(assignedPartitions.topicPartitions());
            } else if (!addedPartitions.isEmpty()) {
                this.log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not requiring initializing positions after onPartitionsAssigned callback failed.", (Object)addedPartitions, exception);
            }
        });
        SortedSet<String> assignedTopics = assignedPartitions.topicNames();
        this.assignedTopicNamesCache.values().retainAll(assignedTopics);
        return result;
    }

    public CompletableFuture<Void> signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
        return CompletableFuture.completedFuture(null);
    }

    public void signalPartitionsBeingRevoked(Set<TopicPartition> partitionsToRevoke) {
    }

    public CompletableFuture<Void> signalPartitionsRevoked(Set<TopicPartition> partitionsRevoked) {
        return CompletableFuture.completedFuture(null);
    }

    private void markPendingRevocationToPauseFetching(Set<TopicPartition> partitionsToRevoke) {
        this.log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
        this.subscriptions.markPendingRevocation(partitionsToRevoke);
    }

    private void clearPendingAssignmentsAndLocalNamesCache() {
        this.currentTargetAssignment = LocalAssignment.NONE;
        this.assignedTopicNamesCache.clear();
    }

    protected void resetEpoch() {
        this.updateMemberEpoch(this.joinGroupEpoch());
    }

    abstract int joinGroupEpoch();

    abstract int leaveGroupEpoch();

    protected void updateMemberEpoch(int newEpoch) {
        boolean newEpochReceived = this.memberEpoch != newEpoch;
        this.memberEpoch = newEpoch;
        if (newEpochReceived) {
            if (this.memberEpoch > 0) {
                this.notifyEpochChange(Optional.of(this.memberEpoch));
            } else {
                this.notifyEpochChange(Optional.empty());
            }
        }
    }

    public MemberState state() {
        return this.state;
    }

    public LocalAssignment currentAssignment() {
        return this.currentAssignment;
    }

    Set<Uuid> topicsAwaitingReconciliation() {
        return this.topicPartitionsAwaitingReconciliation().keySet();
    }

    Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
        if (this.currentTargetAssignment == LocalAssignment.NONE) {
            return Collections.emptyMap();
        }
        if (this.currentAssignment == LocalAssignment.NONE) {
            return this.currentTargetAssignment.partitions;
        }
        HashMap topicPartitionMap = new HashMap();
        this.currentTargetAssignment.partitions.forEach((topicId, targetPartitions) -> {
            SortedSet<Integer> reconciledPartitions = this.currentAssignment.partitions.get(topicId);
            if (!targetPartitions.equals(reconciledPartitions)) {
                TreeSet missingPartitions = new TreeSet(targetPartitions);
                if (reconciledPartitions != null) {
                    missingPartitions.removeAll(reconciledPartitions);
                }
                topicPartitionMap.put(topicId, missingPartitions);
            }
        });
        return Collections.unmodifiableMap(topicPartitionMap);
    }

    boolean reconciliationInProgress() {
        return this.reconciliationInProgress;
    }

    public void registerStateListener(MemberStateListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("State updates listener cannot be null");
        }
        this.stateUpdatesListeners.add(listener);
    }

    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        this.maybeReconcile(false);
        return NetworkClientDelegate.PollResult.EMPTY;
    }

    List<MemberStateListener> stateListeners() {
        return Collections.unmodifiableList(this.stateUpdatesListeners);
    }

    boolean subscriptionUpdated() {
        return this.subscriptionUpdated.get();
    }

    public static class LocalAssignment {
        public static final long NONE_EPOCH = -1L;
        public static final LocalAssignment NONE = new LocalAssignment(-1L, Collections.emptyMap());
        public final long localEpoch;
        public final Map<Uuid, SortedSet<Integer>> partitions;

        public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = partitions;
            if (localEpoch == -1L && !partitions.isEmpty()) {
                throw new IllegalArgumentException("Local epoch must be set if there are partitions");
            }
        }

        public LocalAssignment(long localEpoch, TopicIdPartitionSet topicIdPartitions) {
            Objects.requireNonNull(topicIdPartitions);
            this.localEpoch = localEpoch;
            if (localEpoch == -1L && !topicIdPartitions.isEmpty()) {
                throw new IllegalArgumentException("Local epoch must be set if there are partitions");
            }
            this.partitions = topicIdPartitions.toTopicIdPartitionMap();
        }

        public String toString() {
            return "LocalAssignment{localEpoch=" + this.localEpoch + ", partitions=" + String.valueOf(this.partitions) + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            LocalAssignment that = (LocalAssignment)o;
            return this.localEpoch == that.localEpoch && Objects.equals(this.partitions, that.partitions);
        }

        public int hashCode() {
            return Objects.hash(this.localEpoch, this.partitions);
        }

        public boolean isNone() {
            return this.localEpoch == -1L;
        }

        Optional<LocalAssignment> updateWith(Map<Uuid, SortedSet<Integer>> assignment) {
            if (this.localEpoch != -1L && assignment.equals(this.partitions)) {
                return Optional.empty();
            }
            long nextLocalEpoch = this.localEpoch + 1L;
            return Optional.of(new LocalAssignment(nextLocalEpoch, assignment));
        }
    }
}

