Skip to content

Commit

Permalink
Improve handling of transient replicas during range movements
Browse files Browse the repository at this point in the history
Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Alex Petrov
for CASSANDRA-19344

Co-authored-by: Marcus Eriksson <marcuse@apache.org>
Co-authored-by: Sam Tunnicliffe <samt@apache.org>
  • Loading branch information
beobal and krummas committed Apr 19, 2024
1 parent cbf4dcb commit dabcb17
Show file tree
Hide file tree
Showing 27 changed files with 1,036 additions and 420 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -1,4 +1,6 @@
5.1
* Improve handling of transient replicas during range movements (CASSANDRA-19344)
* Enable debounced internode log requests to be cancelled at shutdown (CASSANDRA-19514)
* Correctly set last modified epoch when combining multistep operations into a single step (CASSANDRA-19538)
* Add new TriggersPolicy configuration to allow operators to disable triggers (CASSANDRA-19532)
* Use Transformation.Kind.id in local and distributed log tables (CASSANDRA-19516)
Expand Down
44 changes: 39 additions & 5 deletions src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java
Expand Up @@ -321,17 +321,51 @@ public Builder withReplicaGroup(VersionedEndpoints.ForRange replicas)
(t, v) -> {
EndpointsForRange old = v.get();
return VersionedEndpoints.forRange(Epoch.max(v.lastModified(), replicas.lastModified()),
replicas.get()
.newBuilder(replicas.size() + old.size())
.addAll(old)
.addAll(replicas.get(), ReplicaCollection.Builder.Conflict.ALL)
.build());
combine(old, replicas.get()));
});
if (group == null)
replicaGroups.put(replicas.range(), replicas);
return this;
}

/**
 * Merges the current and the proposed replica group for a single range. This is used when computing the maximal
 * placement during a range movement, which is the union of the current and future replica groups.
 *
 * For an endpoint present in both groups, the proposed replica is taken only if it is FULL; otherwise the current
 * replica is kept. As a result, an endpoint whose ownership changes from transient to full acts as a FULL write
 * replica as early as possible. Additions to write replica groups are applied before any other placement change
 * (i.e. in START_[JOIN|LEAVE|MOVE]), so such a replica becomes FULL for writes before it becomes FULL for reads.
 * Endpoints that appear in only one of the two groups are carried over unchanged.
 *
 * @param prev Initial set of replicas for a given range
 * @param next Proposed set of replicas for the same range.
 * @return The union of the two groups
 */
private EndpointsForRange combine(EndpointsForRange prev, EndpointsForRange next)
{
    Map<InetAddressAndPort, Replica> current = prev.byEndpoint();
    Map<InetAddressAndPort, Replica> proposed = next.byEndpoint();
    EndpointsForRange.Builder union = prev.newBuilder(prev.size() + next.size());

    // Every current endpoint is kept; it takes the proposed replica only when that one is FULL
    // (transient -> full). A full -> transient change or an unchanged replica keeps the current one.
    for (Map.Entry<InetAddressAndPort, Replica> entry : current.entrySet())
    {
        Replica candidate = proposed.get(entry.getKey());
        boolean upgradeToFull = candidate != null && candidate.isFull();
        union.add(upgradeToFull ? candidate : entry.getValue());
    }

    // Endpoints that only appear in the proposed group are added as they are
    for (Map.Entry<InetAddressAndPort, Replica> entry : proposed.entrySet())
    {
        if (!union.contains(entry.getKey()))
            union.add(entry.getValue());
    }
    return union.build();
}

public Builder withReplicaGroups(Iterable<VersionedEndpoints.ForRange> replicas)
{
replicas.forEach(this::withReplicaGroup);
Expand Down
Expand Up @@ -18,6 +18,22 @@

package org.apache.cassandra.tcm.ownership;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.sequences.LockedRanges;

/**
Expand All @@ -32,6 +48,8 @@
*/
public class PlacementTransitionPlan
{
// Named after this class so that warnings from the placement sanity checks below are attributed to it
private static final Logger logger = LoggerFactory.getLogger(PlacementTransitionPlan.class);

public final PlacementDeltas toSplit;
public final PlacementDeltas toMaximal;
public final PlacementDeltas toFinal;
Expand Down Expand Up @@ -125,4 +143,77 @@ public String toString()
", compiled=" + (addToWrites == null) +
'}';
}


/**
 * Verifies that every read replica added by the given transition plan already exists as a compatible write
 * replica at the point where it is added.
 *
 * We should never add both read and write replicas for the same range at the same time, nor a read replica before
 * its write replica. We also must not add a FULL read replica while the corresponding write replica is only
 * TRANSIENT: the write replica has to become FULL before the read replica is added.
 *
 * Ranges are split and merged during a transition. The previous placements may therefore contain FULL write
 * replicas (a, b] and (b, c] while a FULL read replica (a, c] is added, and that is accepted.
 *
 * The plan's deltas are checked in the order they are applied: toSplit, addToWrites, moveReads, removeFromWrites.
 *
 * @param placements the placements before any delta of {@code transitionPlan} is applied
 * @param transitionPlan the plan whose deltas are checked
 * @throws Transformation.RejectedTransformationException if a read replica is added without a suitable
 *         pre-existing write replica
 */
public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementTransitionPlan transitionPlan)
{
    assertPreExistingWriteReplica(placements,
                                  transitionPlan.toSplit,
                                  transitionPlan.addToWrites(),
                                  transitionPlan.moveReads(),
                                  transitionPlan.removeFromWrites());
}

/**
 * Verifies, delta by delta, that every read replica added by {@code deltasInOrder} already exists as a compatible
 * write replica in the placements that are current when that delta is applied.
 *
 * After a delta has been checked it is applied to the placements. Later deltas are therefore checked against the
 * placements produced by the earlier ones.
 *
 * @param placements the placements before the first delta is applied
 * @param deltasInOrder the deltas to check, in the order they are applied
 * @throws Transformation.RejectedTransformationException if an added read replica is not covered by the
 *         endpoint's existing write replicas
 */
public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementDeltas ... deltasInOrder)
{
    for (PlacementDeltas deltas : deltasInOrder)
    {
        for (Map.Entry<ReplicationParams, PlacementDeltas.PlacementDelta> entry : deltas)
        {
            ReplicationParams params = entry.getKey();
            PlacementDeltas.PlacementDelta delta = entry.getValue();
            // byEndpoint() builds a new map on every call, so compute it once per replication params
            // rather than once per endpoint that gains read replicas
            RangesByEndpoint writesByEndpoint = placements.get(params).writes.byEndpoint();
            for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> addedRead : delta.reads.additions.entrySet())
            {
                RangesAtEndpoint existingWriteReplicas = writesByEndpoint.get(addedRead.getKey());
                // we're adding read replicas - they should always exist as write replicas before doing that
                // BUT we split and merge ranges so we need to check containment both ways
                for (Replica newReadReplica : addedRead.getValue())
                {
                    if (!isCoveredByWriteReplicas(newReadReplica, existingWriteReplicas))
                    {
                        String message = "When adding a read replica, that replica needs to exist as a write replica before that: " + newReadReplica + '\n' + placements.get(params) + '\n' + delta;
                        logger.warn(message);
                        throw new Transformation.RejectedTransformationException(message);
                    }
                }
            }
        }
        placements = deltas.apply(Epoch.FIRST, placements);
    }
}

/**
 * Checks whether a newly added read replica is backed by an endpoint's existing write replicas.
 *
 * It is backed if the write replicas contain it exactly, or if a single compatible write range contains its range.
 * It is also backed if the normalized union of the compatible write ranges that intersect it contains its range,
 * which covers ranges that were split.
 * A write replica is compatible if it is FULL or if the read replica is TRANSIENT; a FULL read replica needs a FULL
 * write replica.
 *
 * @param newReadReplica the read replica being added
 * @param existingWriteReplicas the write replicas currently held by the same endpoint
 * @return true if the read replica is covered by compatible write replicas
 */
private static boolean isCoveredByWriteReplicas(Replica newReadReplica, RangesAtEndpoint existingWriteReplicas)
{
    if (existingWriteReplicas.contains(newReadReplica))
        return true;

    Range<Token> readRange = newReadReplica.range();
    Set<Range<Token>> intersectingRanges = new HashSet<>();
    for (Replica writeReplica : existingWriteReplicas)
    {
        // a TRANSIENT write replica can never back a FULL read replica
        if (!writeReplica.isFull() && newReadReplica.isFull())
            continue;
        if (writeReplica.range().contains(readRange))
            return true;
        if (writeReplica.range().intersects(readRange))
            intersectingRanges.add(writeReplica.range());
    }
    return Range.normalize(intersectingRanges).stream().anyMatch(writeRange -> writeRange.contains(readRange));
}
}
21 changes: 1 addition & 20 deletions src/java/org/apache/cassandra/tcm/sequences/Move.java
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -337,8 +336,7 @@ private static MovementMap movementMap(IFailureDetector fd, DataPlacements place
RangesByEndpoint targets = delta.writes.additions;
PlacementForRange oldOwners = placements.get(params).reads;
EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
Iterables.concat(targets.flattenValues(),
transientToFullReplicas(midDeltas.get(params)).flattenValues()).forEach(destination -> {
targets.flattenValues().forEach(destination -> {
SourceHolder sources = new SourceHolder(fd, destination, toSplitRanges.get(params), strictConsistency);
AtomicBoolean needsRelaxedSources = new AtomicBoolean();
// first, try to find strict sources for the ranges we need to stream - these are the ranges that
Expand Down Expand Up @@ -441,23 +439,6 @@ private void addToMovements(Replica destination, EndpointsByReplica.Builder move
}
}

/**
 * Finds endpoints converting from a transient to a full replica in the given delta.
 *
 * A replica is reported when it is added to the read placement as FULL and the same endpoint's read replica for
 * exactly the same range is removed as TRANSIENT.
 *
 * @param midDelta the delta to inspect (its read additions and read removals)
 * @return the FULL replicas being added, grouped by endpoint
 */
private static RangesByEndpoint transientToFullReplicas(PlacementDeltas.PlacementDelta midDelta)
{
    RangesByEndpoint.Builder conversions = new RangesByEndpoint.Builder();
    for (Replica added : midDelta.reads.additions.flattenValues())
    {
        if (!added.isFull())
            continue;

        RangesAtEndpoint removedForEndpoint = midDelta.reads.removals.get(added.endpoint());
        if (removedForEndpoint == null)
            continue;

        Replica removedSameRange = removedForEndpoint.byRange().get(added.range());
        if (removedSameRange != null && removedSameRange.isTransient())
            conversions.put(added.endpoint(), added);
    }
    return conversions.build();
}
private static int nextToIndex(Transformation.Kind next)
{
switch (next)
Expand Down
39 changes: 8 additions & 31 deletions src/java/org/apache/cassandra/tcm/sequences/RemoveNodeStreams.java
Expand Up @@ -29,7 +29,6 @@
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.SystemStrategy;
import org.apache.cassandra.net.Message;
Expand Down Expand Up @@ -57,9 +56,7 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas
ClusterMetadata metadata = ClusterMetadata.current();
MovementMap movements = movementMap(metadata.directory.endpoint(leaving),
metadata,
startLeave,
midLeave,
finishLeave);
startLeave);
movements.forEach((params, eps) -> logger.info("Removenode movements: {}: {}", params, eps));
String operationId = leaving.toUUID().toString();
responseTracker = DataMovements.instance.registerMovements(RESTORE_REPLICA_COUNT, operationId, movements);
Expand Down Expand Up @@ -110,7 +107,7 @@ public String status()
* create a map where the key is the destination, and the values are possible sources
* @return
*/
private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetadata metadata, PlacementDeltas startDelta, PlacementDeltas midDelta, PlacementDeltas finishDelta)
private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetadata metadata, PlacementDeltas startDelta)
{
MovementMap.Builder allMovements = MovementMap.builder();
// map of dest->src* movements, keyed by replication settings. During unbootstrap, this will be used to construct
Expand All @@ -122,6 +119,7 @@ private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetada

EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
RangesByEndpoint startWriteAdditions = startDelta.get(params).writes.additions;
RangesByEndpoint startWriteRemovals = startDelta.get(params).writes.removals;
// find current placements from the metadata, we need to stream from replicas that are not changed and are therefore not in the deltas
PlacementForRange currentPlacements = metadata.placements.get(params).reads;
startWriteAdditions.flattenValues()
Expand All @@ -131,33 +129,12 @@ private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetada
if (!replica.endpoint().equals(leaving) && !replica.endpoint().equals(newReplica.endpoint()))
candidateBuilder.add(replica, ReplicaCollection.Builder.Conflict.NONE);
});
movements.putAll(newReplica, candidateBuilder.build(), ReplicaCollection.Builder.Conflict.NONE);
EndpointsForRange sources = candidateBuilder.build();
// log if newReplica is an existing transient replica moving to a full replica
if (startWriteRemovals.get(newReplica.endpoint()).contains(newReplica.range(), false))
logger.debug("Streaming transient -> full conversion to {} from {}", newReplica.endpoint(), sources);
movements.putAll(newReplica, sources, ReplicaCollection.Builder.Conflict.NONE);
});
// and check if any replicas went from transient -> full:
for (Replica removal : finishDelta.get(params).writes.removals.flattenValues())
{
if (removal.isTransient())
{
// if a replica (ignoring transientness) is being added as a read replica in midJoin, but removed as
// a write replica in finishJoin (the "removal" replica) it means it must have changed from transient
// to full (or the other way round)
RangesByEndpoint midReadAdditions = midDelta.get(params).reads.additions;
Replica toStream = midReadAdditions.get(removal.endpoint()).byRange().get(removal.range());
if (toStream != null && toStream.isFull())
{
logger.debug("Conversion from transient to full replica {} -> {}", removal, toStream);
EndpointsForRange.Builder candidateBuilder = new EndpointsForRange.Builder(removal.range());
currentPlacements.forRange(removal.range()).get().forEach(replica -> {
if (!replica.endpoint().equals(leaving) && !replica.endpoint().equals(removal.endpoint()))
candidateBuilder.add(replica, ReplicaCollection.Builder.Conflict.NONE);
});
EndpointsForRange sources = candidateBuilder.build();
logger.debug("Streaming transient -> full conversion to {} from {}", removal, sources);
// `removal` is losing this transient range, but gaining the same full range, meaning we need to stream it in.
movements.putAll(removal, sources, ReplicaCollection.Builder.Conflict.NONE);
}
}
}
allMovements.put(params, movements.build());
});
return allMovements.build();
Expand Down
Expand Up @@ -66,7 +66,6 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas
{
MovementMap movements = movementMap(ClusterMetadata.current().directory.endpoint(leaving),
startLeave,
midLeave,
finishLeave);
movements.forEach((params, eps) -> logger.info("Unbootstrap movements: {}: {}", params, eps));
started.set(true);
Expand All @@ -81,7 +80,7 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas
}
}

private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDeltas startDelta, PlacementDeltas midDelta, PlacementDeltas finishDelta)
private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDeltas startDelta, PlacementDeltas finishDelta)
{
MovementMap.Builder allMovements = MovementMap.builder();
// map of src->dest movements, keyed by replication settings. During unbootstrap, this will be used to construct
Expand All @@ -100,23 +99,13 @@ private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDelt
// removals to produce a src->dest mapping.
EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
RangesByEndpoint startWriteAdditions = startDelta.get(params).writes.additions;
RangesByEndpoint startWriteRemovals = startDelta.get(params).writes.removals;
startWriteAdditions.flattenValues()
.forEach(newReplica -> movements.put(oldReplicas.get(newReplica.range()), newReplica));
// next, check if any replicas went from being transient to full, if so we need to stream to them;
Iterable<Replica> removalReplicas = delta.writes.removals.flattenValues();
for (Replica removal : removalReplicas)
{
if (removal.isTransient())
{
Replica destination = midDelta.get(params).reads.additions.get(removal.endpoint()).byRange().get(removal.range());
if (destination != null && destination.isFull())
{
logger.info("Conversion from transient to full replica {} -> {}", removal, destination);
Replica source = oldReplicas.get(removal.range());
movements.put(source, destination);
}
}
}
.forEach(newReplica -> {
if (startWriteRemovals.get(newReplica.endpoint()).contains(newReplica.range(), false))
logger.debug("Streaming transient -> full conversion to {} from {}", newReplica, oldReplicas.get(newReplica.range()));
movements.put(oldReplicas.get(newReplica.range()), newReplica);
});
allMovements.put(params, movements.build());
});
return allMovements.build();
Expand Down

0 comments on commit dabcb17

Please sign in to comment.