Reuse native transport-driven futures in Debounce.
Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-19158.
ifesdjeen committed May 28, 2024
1 parent 67bbbb0 commit 2e05cd4
Showing 14 changed files with 180 additions and 141 deletions.
28 changes: 17 additions & 11 deletions src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -137,29 +137,35 @@ static int getGensaltLogRounds()
    public CassandraRoleManager()
    {
        supportedOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                           ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
-                           : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
+                         ? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD, Option.HASHED_PASSWORD)
+                         : ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
        alterableOptions = DatabaseDescriptor.getAuthenticator() instanceof PasswordAuthenticator
-                           ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
-                           : ImmutableSet.<Option>of();
+                         ? ImmutableSet.of(Option.PASSWORD, Option.HASHED_PASSWORD)
+                         : ImmutableSet.<Option>of();
    }

    @Override
    public void setup(boolean asyncRoleSetup)
    {
        loadRoleStatement();
        loadIdentityStatement();
-        if (asyncRoleSetup)
+        if (!asyncRoleSetup)
        {
-            scheduleSetupTask(() -> {
+            try
+            {
+                // Try to set up synchronously
                setupDefaultRole();
-                return null;
-            });
+                return;
+            }
+            catch (Throwable t)
+            {
+                // We tried to execute the task in a sync way, but failed. Try asynchronous setup.
+            }
        }
-        else
-        {
+
+        scheduleSetupTask(() -> {
            setupDefaultRole();
-        }
+            return null;
+        });
    }

    @Override
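For context, the new `setup(boolean asyncRoleSetup)` flow above boils down to: attempt the default-role setup synchronously first, and only fall back to the scheduled asynchronous task if the synchronous attempt throws. Below is a minimal, self-contained sketch of that pattern; the class, method names and executor here are illustrative placeholders, not Cassandra's actual scheduling machinery.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SyncFirstSetup
{
    static void setup(boolean asyncRoleSetup, Runnable defaultRoleSetup, Executor retryExecutor)
    {
        if (!asyncRoleSetup)
        {
            try
            {
                // Happy path: the work completes on the calling thread.
                defaultRoleSetup.run();
                return;
            }
            catch (Throwable t)
            {
                // The synchronous attempt failed (e.g. the node cannot write yet);
                // fall through and let the scheduled task retry asynchronously.
            }
        }
        // Either the caller asked for async setup, or the sync attempt failed: schedule it.
        CompletableFuture.runAsync(defaultRoleSetup, retryExecutor);
    }
}
```

Trying synchronously first keeps startup deterministic when the node is already able to create the default role, while the asynchronous fallback preserves the previous retry-until-ready behaviour.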
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -474,6 +474,7 @@ public enum CassandraRelevantProperties
    SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"),
    SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
    SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"),
+    SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"),
    SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"),
    SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE("cassandra.skip_paxos_repair_on_topology_change"),
    /** If necessary for operational purposes, permit certain keyspaces to be ignored for paxos topology repairs. */
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/gms/GossipVerbHandler.java
@@ -20,11 +20,13 @@

import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.ClusterMetadataService;

public class GossipVerbHandler<T> implements IVerbHandler<T>
{
    public void doVerb(Message<T> message)
    {
        Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
+        ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), message.epoch());
    }
}
11 changes: 8 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
@@ -446,7 +446,7 @@ public static List<Range<Token>> getAllRanges(List<Token> sortedTokens)
    private boolean isSurveyMode = TEST_WRITE_SURVEY.getBoolean(false);
    /* true if node is rebuilding and receiving data */
    private volatile boolean initialized = false;
-    private final AtomicBoolean authSetupCalled = new AtomicBoolean(false);
+    private final AtomicBoolean authSetupCalled = new AtomicBoolean(CassandraRelevantProperties.SKIP_AUTH_SETUP.getBoolean());
    private volatile boolean authSetupComplete = false;

    /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
@@ -1093,12 +1093,17 @@ private void exitWriteSurveyMode()
        InProgressSequences.finishInProgressSequences(id);
    }

+    void doAuthSetup()
+    {
+        doAuthSetup(true);
+    }
+
    @VisibleForTesting
-    public void doAuthSetup()
+    public void doAuthSetup(boolean async)
    {
        if (!authSetupCalled.getAndSet(true))
        {
-            DatabaseDescriptor.getRoleManager().setup();
+            DatabaseDescriptor.getRoleManager().setup(async);
            DatabaseDescriptor.getAuthenticator().setup();
            DatabaseDescriptor.getAuthorizer().setup();
            DatabaseDescriptor.getNetworkAuthorizer().setup();
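The new `cassandra.skip_auth_setup` property works together with the guard above: seeding `authSetupCalled` with the property value makes the first `getAndSet(true)` return `true`, so the whole setup block becomes a no-op on nodes started with the flag. A rough sketch of the idea, with illustrative names and a plain `Boolean.getBoolean` standing in for `CassandraRelevantProperties`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class AuthSetupGuard
{
    // If started with -Dcassandra.skip_auth_setup=true, the guard begins life "already called".
    private final AtomicBoolean authSetupCalled =
        new AtomicBoolean(Boolean.getBoolean("cassandra.skip_auth_setup"));

    void doAuthSetup(Runnable setupAuthSubsystems)
    {
        // getAndSet(true) returns the previous value, so the body runs at most once,
        // and never runs at all when the skip property pre-seeded the flag to true.
        if (!authSetupCalled.getAndSet(true))
            setupAuthSubsystems.run();
    }
}
```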
98 changes: 33 additions & 65 deletions src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -18,110 +18,78 @@

package org.apache.cassandra.tcm;

-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.Supplier;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.utils.ExecutorUtils;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.Promise;

/**
* When debouncing from a replica we know exactly which epoch we need, so to avoid retries we
* keep track of which epoch we are currently debouncing, and if a request for a newer epoch
* comes in, we create a new future. If a request for a newer epoch comes in, we simply
* swap out the current future reference for a new one which is requesting the newer epoch.
*/
-public class EpochAwareDebounce
+public class EpochAwareDebounce implements Closeable
{
-    private static final Logger logger = LoggerFactory.getLogger(EpochAwareDebounce.class);
    public static final EpochAwareDebounce instance = new EpochAwareDebounce();
-    private final AtomicReference<EpochAwareAsyncPromise> currentFuture = new AtomicReference<>();
-    private final ExecutorPlus executor;
-    private final List<Promise<LogState>> inflightRequests = new CopyOnWriteArrayList<>();

+    private final AtomicReference<EpochAwareFuture> currentFuture = new AtomicReference<>();

    private EpochAwareDebounce()
    {
-        // 2 threads since we might start a new debounce for a newer epoch while the old one is executing
-        this.executor = ExecutorFactory.Global.executorFactory().pooled("debounce", 2);
    }

    /**
     * Deduplicate requests to catch up log state based on the desired epoch. Callers supply a target epoch and
     * a function obtain the ClusterMetadata that corresponds with it. It is expected that this function will make rpc
     * calls to peers, retrieving a LogState which can be applied locally to produce the necessary {@code
-     * ClusterMetadata}. The function takes a {@code Promise<LogState>} as input, with the expectation that this
-     * specific instance will be used to provide blocking behaviour when making the rpc calls that fetch the {@code
-     * LogState}. These promises are memoized in order to cancel them when {@link #shutdownAndWait(long, TimeUnit)} is
-     * called. This causes the fetch function to stop waiting on any in flight {@code LogState} requests and prevents
-     * shutdown from being blocked.
+     * ClusterMetadata}.
     *
-     * @param fetchFunction executes the request for LogState. It's expected that this popluates fetchResult with the
-     *                      successful result.
+     * @param fetchFunction supplies the future that, when dereferenced, will yield metadata for the desired epoch
     * @param epoch the desired epoch
     * @return
     */
-    public Future<ClusterMetadata> getAsync(Function<Promise<LogState>, ClusterMetadata> fetchFunction,
-                                            Epoch epoch)
+    public Future<ClusterMetadata> getAsync(Supplier<Future<ClusterMetadata>> fetchFunction, Epoch epoch)
    {
        while (true)
        {
-            EpochAwareAsyncPromise running = currentFuture.get();
-            if (running != null && !running.isDone() && running.epoch.isEqualOrAfter(epoch))
-                return running;
+            EpochAwareFuture running = currentFuture.get();
+            // Someone else is about to install a new future
+            if (running == SENTINEL)
+                continue;

-            Promise<LogState> fetchResult = new AsyncPromise<>();
+            if (running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch))
+                return running.future;

-            EpochAwareAsyncPromise promise = new EpochAwareAsyncPromise(epoch);
-            if (currentFuture.compareAndSet(running, promise))
+            if (currentFuture.compareAndSet(running, SENTINEL))
            {
-                fetchResult.addCallback((logState, error) -> {
-                    logger.debug("Removing future remotely requesting epoch {} from in flight list", epoch);
-                    inflightRequests.remove(fetchResult);
-                });
-                inflightRequests.add(fetchResult);
-
-                executor.submit(() -> {
-                    try
-                    {
-                        promise.setSuccess(fetchFunction.apply(fetchResult));
-                    }
-                    catch (Throwable t)
-                    {
-                        fetchResult.cancel(true);
-                        inflightRequests.remove(fetchResult);
-                        promise.setFailure(t);
-                    }
-                });
-                return promise;
+                EpochAwareFuture promise = new EpochAwareFuture(epoch, fetchFunction.get());
+                boolean res = currentFuture.compareAndSet(SENTINEL, promise);
+                assert res : "Should not have happened";
+                return promise.future;
            }
        }
    }

-    private static class EpochAwareAsyncPromise extends AsyncPromise<ClusterMetadata>
+    private static final EpochAwareFuture SENTINEL = new EpochAwareFuture(Epoch.EMPTY, null);
+
+    @Override
+    public void close()
+    {
+        EpochAwareFuture future = currentFuture.get();
+        if (future != null && future != SENTINEL)
+            future.future.cancel(true);
+    }
+
+    private static class EpochAwareFuture
    {
        private final Epoch epoch;
-        public EpochAwareAsyncPromise(Epoch epoch)
+        private final Future<ClusterMetadata> future;
+        public EpochAwareFuture(Epoch epoch, Future<ClusterMetadata> future)
        {
            this.epoch = epoch;
+            this.future = future;
        }
    }

-    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-    {
-        logger.info("Cancelling {} in flight log fetch requests", inflightRequests.size());
-        for (Promise<LogState> toCancel : inflightRequests)
-            toCancel.cancel(true);
-        ExecutorUtils.shutdownAndWait(timeout, unit, executor);
-    }
}
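The rewritten `getAsync` replaces the dedicated two-thread executor with a compare-and-swap loop around a single `AtomicReference`: a `SENTINEL` entry marks the brief window between winning the CAS and installing the future obtained from `fetchFunction.get()`, so concurrent callers spin instead of starting a duplicate fetch. Below is a simplified, self-contained sketch of that loop using only JDK types; the epoch and payload types are stand-ins, not Cassandra's `Epoch` or `ClusterMetadata`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DebounceSketch
{
    private static final class Entry
    {
        final long epoch;
        final CompletableFuture<String> future;
        Entry(long epoch, CompletableFuture<String> future) { this.epoch = epoch; this.future = future; }
    }

    // Marks the window between winning the CAS and having a future to install.
    private static final Entry SENTINEL = new Entry(-1, null);
    private final AtomicReference<Entry> current = new AtomicReference<>();

    CompletableFuture<String> getAsync(Supplier<CompletableFuture<String>> fetch, long epoch)
    {
        while (true)
        {
            Entry running = current.get();
            if (running == SENTINEL)
                continue; // another thread is installing a future right now

            if (running != null && !running.future.isDone() && running.epoch >= epoch)
                return running.future; // reuse the in-flight fetch for this (or a newer) epoch

            if (current.compareAndSet(running, SENTINEL))
            {
                Entry installed = new Entry(epoch, fetch.get());
                current.set(installed); // only the CAS winner can replace the sentinel
                return installed.future;
            }
        }
    }
}
```

While a fetch for an epoch is in flight, any caller asking for that epoch or an older one gets the same future back, which is what lets the debounce reuse the transport-driven future instead of blocking a pooled thread, as the commit title describes.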
48 changes: 31 additions & 17 deletions src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -22,7 +22,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +34,7 @@
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;

@@ -74,40 +74,54 @@ public ClusterMetadata fetchLogEntriesAndWait(InetAddressAndPort remote, Epoch awaitAtleast)

    public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort remote, Epoch awaitAtleast)
    {
-        Function<Promise<LogState>, ClusterMetadata> fn = promise -> fetchLogEntriesAndWaitInternal(promise, remote, awaitAtleast);
-        return EpochAwareDebounce.instance.getAsync(fn, awaitAtleast);
+        return EpochAwareDebounce.instance.getAsync(() -> fetchLogEntriesAndWaitInternal(remote, awaitAtleast), awaitAtleast);
    }

-    private ClusterMetadata fetchLogEntriesAndWaitInternal(Promise<LogState> remoteRequest, InetAddressAndPort remote, Epoch awaitAtleast)
+    private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort remote, Epoch awaitAtleast)
    {
        Epoch before = ClusterMetadata.current().epoch;
        if (before.isEqualOrAfter(awaitAtleast))
-            return ClusterMetadata.current();
+        {
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setSuccess(ClusterMetadata.current());
+            return res;
+        }

+        Promise<LogState> fetchRes = new AsyncPromise<>();
        logger.info("Fetching log from {}, at least {}", remote, awaitAtleast);

        try (Timer.Context ctx = TCMMetrics.instance.fetchPeerLogLatency.time())
        {
-            RemoteProcessor.sendWithCallbackAsync(remoteRequest,
+            RemoteProcessor.sendWithCallbackAsync(fetchRes,
                                                  Verb.TCM_FETCH_PEER_LOG_REQ,
                                                  new FetchPeerLog(before),
-                                                  new RemoteProcessor.CandidateIterator(Collections.singletonList(remote)),
+                                                  new RemoteProcessor.CandidateIterator(Collections.singletonList(remote), false),
                                                  Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                       new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
-            LogState logState = remoteRequest.awaitUninterruptibly().get();
-            log.append(logState);
-            ClusterMetadata fetched = log.waitForHighestConsecutive();
-            if (fetched.epoch.isEqualOrAfter(awaitAtleast))
-            {
-                TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
-                return fetched;
-            }
+
+            return fetchRes.map((logState) -> {
+                log.append(logState);
+                ClusterMetadata fetched = log.waitForHighestConsecutive();
+                if (fetched.epoch.isEqualOrAfter(awaitAtleast))
+                {
+                    TCMMetrics.instance.peerLogEntriesFetched(before, logState.latestEpoch());
+                    return fetched;
+                }
+                else
+                {
+                    throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast));
+                }
+            });
        }
        catch (Throwable t)
        {
+            fetchRes.cancel(true);
            JVMStabilityInspector.inspectThrowable(t);

            logger.warn("Unable to fetch log entries from " + remote, t);
+            Promise<ClusterMetadata> res = new AsyncPromise<>();
+            res.setFailure(new IllegalStateException("Unable to fetch log entries from " + remote, t));
+            return res;
        }
-        return ClusterMetadata.current();
    }
}
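The net effect in `PeerLogFetcher` is that the blocking `awaitUninterruptibly().get()` is gone: the local log append and the epoch check are now chained onto the RPC-driven future via `map`, and a shortfall or error surfaces as a failed future rather than a fallback return value. A rough JDK-only sketch of that shape, where `CompletableFuture.thenApply` stands in for Cassandra's `Future.map` and all names are illustrative placeholders for the types used above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class MapChainSketch
{
    // rpcResponse is completed by the messaging layer with the peer's log state;
    // applyLocally stands in for "append to the local log and wait for the highest
    // consecutive epoch", returning the epoch that was actually reached.
    static CompletableFuture<Long> fetchAndApply(CompletableFuture<String> rpcResponse,
                                                 long awaitAtLeast,
                                                 Function<String, Long> applyLocally)
    {
        return rpcResponse.thenApply(logState -> {
            long reached = applyLocally.apply(logState);
            if (reached >= awaitAtLeast)
                return reached;
            // Surfaces to the caller as a failed future, mirroring the map() above.
            throw new IllegalStateException("Queried for epoch " + awaitAtLeast + ", but could not catch up");
        });
    }
}
```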
