Skip to content


Improve handling of transient replicas during range movements
Browse files Browse the repository at this point in the history
Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Alex Petrov
for CASSANDRA-19344

Co-authored-by: Marcus Eriksson <>
Co-authored-by: Sam Tunnicliffe <>
  • Loading branch information
beobal and krummas committed Apr 19, 2024
1 parent cbf4dcb commit dabcb17
Show file tree
Hide file tree
Showing 27 changed files with 1,036 additions and 420 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -1,4 +1,6 @@
* Improve handling of transient replicas during range movements (CASSANDRA-19344)
* Enable debounced internode log requests to be cancelled at shutdown (CASSANDRA-19514)
* Correctly set last modified epoch when combining multistep operations into a single step (CASSANDRA-19538)
* Add new TriggersPolicy configuration to allow operators to disable triggers (CASSANDRA-19532)
* Use Transformation.Kind.id in local and distributed log tables (CASSANDRA-19516)
Expand Down
44 changes: 39 additions & 5 deletions src/java/org/apache/cassandra/tcm/ownership/
Expand Up @@ -321,17 +321,51 @@ public Builder withReplicaGroup(VersionedEndpoints.ForRange replicas)
(t, v) -> {
EndpointsForRange old = v.get();
return VersionedEndpoints.forRange(Epoch.max(v.lastModified(), replicas.lastModified()),
.newBuilder(replicas.size() + old.size())
.addAll(replicas.get(), ReplicaCollection.Builder.Conflict.ALL)
combine(old, replicas.get()));
if (group == null)
replicaGroups.put(replicas.range(), replicas);
return this;

* Combine two replica groups, assuming one is the current group and the other is the proposed.
* During range movements this is used when calculating the maximal placement, which combines the current and
* future replica groups. This special cases the merging of two replica groups to make sure that when a replica
* moves from transient to full, it starts to act as a FULL write replica as early as possible.
* Where an endpoint is present in both groups, prefer the proposed iff it is a FULL replica. During a
* multi-step operation (join/leave/move), we want any change from transient to full to happen as early
* as possible so that a replica whose ownership is modified in this way becomes FULL for writes before it
* becomes FULL for reads. This works as additions to write replica groups are applied before any other
* placement changes (i.e. in START_[JOIN|LEAVE|MOVE]).
* @param prev Initial set of replicas for a given range
* @param next Proposed set of replicas for the same range.
* @return The union of the two groups
private EndpointsForRange combine(EndpointsForRange prev, EndpointsForRange next)
Map<InetAddressAndPort, Replica> e1 = prev.byEndpoint();
Map<InetAddressAndPort, Replica> e2 = next.byEndpoint();
EndpointsForRange.Builder combined = prev.newBuilder(prev.size() + next.size());
e1.forEach((e, r1) -> {
Replica r2 = e2.get(e);
if (null == r2) // not present in next
else if (r2.isFull()) // prefer replica from next, if it is moving from transient to full
combined.add(r1); // replica is moving from full to transient, or staying the same
// any new replicas not in prev
e2.forEach((e, r2) -> {
if (!combined.contains(e))

public Builder withReplicaGroups(Iterable<VersionedEndpoints.ForRange> replicas)
Expand Down
Expand Up @@ -18,6 +18,22 @@

package org.apache.cassandra.tcm.ownership;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.sequences.LockedRanges;

Expand All @@ -32,6 +48,8 @@
public class PlacementTransitionPlan
private static final Logger logger = LoggerFactory.getLogger(MovementMap.class);

public final PlacementDeltas toSplit;
public final PlacementDeltas toMaximal;
public final PlacementDeltas toFinal;
Expand Down Expand Up @@ -125,4 +143,77 @@ public String toString()
", compiled=" + (addToWrites == null) +

* Makes sure that a newly added read replica for a range already exists as a write replica
* We should never add both read & write replicas for the same range at the same time (or read replica before write)
* Also makes sure that we don't add a full read replica while the same write replica is only transient - we should
* always make the write replica full before adding the read replica.
* We split and merge ranges, so in the previous placements we could have full write replicas (a, b], (b, c], but then
* add a full read replica (a, c].
* @throws Transformation.RejectedTransformationException if a newly added read replica is not already covered by a suitable write replica; the message contains information about the bad replica
public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementTransitionPlan transitionPlan)

public static void assertPreExistingWriteReplica(DataPlacements placements, PlacementDeltas ... deltasInOrder)
for (PlacementDeltas deltas : deltasInOrder)
for (Map.Entry<ReplicationParams, PlacementDeltas.PlacementDelta> entry : deltas)
ReplicationParams params = entry.getKey();
PlacementDeltas.PlacementDelta delta = entry.getValue();
for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> addedRead : delta.reads.additions.entrySet())
RangesAtEndpoint addedReadReplicas = addedRead.getValue();
RangesAtEndpoint existingWriteReplicas = placements.get(params).writes.byEndpoint().get(addedRead.getKey());
// we're adding read replicas - they should always exist as write replicas before doing that
// BUT we split and merge ranges so we need to check containment both ways
for (Replica newReadReplica : addedReadReplicas)
if (existingWriteReplicas.contains(newReadReplica))
boolean contained = false;
Set<Range<Token>> intersectingRanges = new HashSet<>();
for (Replica writeReplica : existingWriteReplicas)
if (writeReplica.isFull() == newReadReplica.isFull() || (writeReplica.isFull() && newReadReplica.isTransient()))
if (writeReplica.range().contains(newReadReplica.range()))
contained = true;
else if (writeReplica.range().intersects(newReadReplica.range()))
if (!contained && Range.normalize(intersectingRanges).stream().noneMatch(writeReplica -> writeReplica.contains(newReadReplica.range())))
String message = "When adding a read replica, that replica needs to exist as a write replica before that: " + newReadReplica + '\n' + placements.get(params) + '\n' + delta;
throw new Transformation.RejectedTransformationException(message);
placements = deltas.apply(Epoch.FIRST, placements);
21 changes: 1 addition & 20 deletions src/java/org/apache/cassandra/tcm/sequences/
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -337,8 +336,7 @@ private static MovementMap movementMap(IFailureDetector fd, DataPlacements place
RangesByEndpoint targets = delta.writes.additions;
PlacementForRange oldOwners = placements.get(params).reads;
EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
transientToFullReplicas(midDeltas.get(params)).flattenValues()).forEach(destination -> {
targets.flattenValues().forEach(destination -> {
SourceHolder sources = new SourceHolder(fd, destination, toSplitRanges.get(params), strictConsistency);
AtomicBoolean needsRelaxedSources = new AtomicBoolean();
// first, try to find strict sources for the ranges we need to stream - these are the ranges that
Expand Down Expand Up @@ -441,23 +439,6 @@ private void addToMovements(Replica destination, EndpointsByReplica.Builder move

private static RangesByEndpoint transientToFullReplicas(PlacementDeltas.PlacementDelta midDelta)
RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder();
midDelta.reads.additions.flattenValues().forEach((newReplica) -> {
if (newReplica.isFull())
RangesAtEndpoint removals = midDelta.reads.removals.get(newReplica.endpoint());
if (removals != null)
Replica removed = removals.byRange().get(newReplica.range());
if (removed != null && removed.isTransient())
builder.put(newReplica.endpoint(), newReplica);
private static int nextToIndex(Transformation.Kind next)
switch (next)
Expand Down
39 changes: 8 additions & 31 deletions src/java/org/apache/cassandra/tcm/sequences/
Expand Up @@ -29,7 +29,6 @@
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.SystemStrategy;
Expand Down Expand Up @@ -57,9 +56,7 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas
ClusterMetadata metadata = ClusterMetadata.current();
MovementMap movements = movementMap(,
movements.forEach((params, eps) ->"Removenode movements: {}: {}", params, eps));
String operationId = leaving.toUUID().toString();
responseTracker = DataMovements.instance.registerMovements(RESTORE_REPLICA_COUNT, operationId, movements);
Expand Down Expand Up @@ -110,7 +107,7 @@ public String status()
* create a map where the key is the destination, and the values are possible sources
* @return a MovementMap, keyed by replication settings, of destination replica to candidate source replicas
private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetadata metadata, PlacementDeltas startDelta, PlacementDeltas midDelta, PlacementDeltas finishDelta)
private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetadata metadata, PlacementDeltas startDelta)
MovementMap.Builder allMovements = MovementMap.builder();
// map of dest->src* movements, keyed by replication settings. During unbootstrap, this will be used to construct
Expand All @@ -122,6 +119,7 @@ private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetada

EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
RangesByEndpoint startWriteAdditions = startDelta.get(params).writes.additions;
RangesByEndpoint startWriteRemovals = startDelta.get(params).writes.removals;
// find current placements from the metadata, we need to stream from replicas that are not changed and are therefore not in the deltas
PlacementForRange currentPlacements = metadata.placements.get(params).reads;
Expand All @@ -131,33 +129,12 @@ private static MovementMap movementMap(InetAddressAndPort leaving, ClusterMetada
if (!replica.endpoint().equals(leaving) && !replica.endpoint().equals(newReplica.endpoint()))
candidateBuilder.add(replica, ReplicaCollection.Builder.Conflict.NONE);
movements.putAll(newReplica,, ReplicaCollection.Builder.Conflict.NONE);
EndpointsForRange sources =;
// log if newReplica is an existing transient replica moving to a full replica
if (startWriteRemovals.get(newReplica.endpoint()).contains(newReplica.range(), false))
logger.debug("Streaming transient -> full conversion to {} from {}", newReplica.endpoint(), sources);
movements.putAll(newReplica, sources, ReplicaCollection.Builder.Conflict.NONE);
// and check if any replicas went from transient -> full:
for (Replica removal : finishDelta.get(params).writes.removals.flattenValues())
if (removal.isTransient())
// if a replica (ignoring transientness) is being added as a read replica in midJoin, but removed as
// a write replica in finishJoin (the "removal" replica) it means it must have changed from transient
// to full (or the other way round)
RangesByEndpoint midReadAdditions = midDelta.get(params).reads.additions;
Replica toStream = midReadAdditions.get(removal.endpoint()).byRange().get(removal.range());
if (toStream != null && toStream.isFull())
logger.debug("Conversion from transient to full replica {} -> {}", removal, toStream);
EndpointsForRange.Builder candidateBuilder = new EndpointsForRange.Builder(removal.range());
currentPlacements.forRange(removal.range()).get().forEach(replica -> {
if (!replica.endpoint().equals(leaving) && !replica.endpoint().equals(removal.endpoint()))
candidateBuilder.add(replica, ReplicaCollection.Builder.Conflict.NONE);
EndpointsForRange sources =;
logger.debug("Streaming transient -> full conversion to {} from {}", removal, sources);
// `removal` is losing this transient range, but gaining the same full range, meaning we need to stream it in.
movements.putAll(removal, sources, ReplicaCollection.Builder.Conflict.NONE);
Expand Down
Expand Up @@ -66,7 +66,6 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas
MovementMap movements = movementMap(ClusterMetadata.current().directory.endpoint(leaving),
movements.forEach((params, eps) ->"Unbootstrap movements: {}: {}", params, eps));
Expand All @@ -81,7 +80,7 @@ public void execute(NodeId leaving, PlacementDeltas startLeave, PlacementDeltas

private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDeltas startDelta, PlacementDeltas midDelta, PlacementDeltas finishDelta)
private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDeltas startDelta, PlacementDeltas finishDelta)
MovementMap.Builder allMovements = MovementMap.builder();
// map of src->dest movements, keyed by replication settings. During unbootstrap, this will be used to construct
Expand All @@ -100,23 +99,13 @@ private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDelt
// removals to produce a src->dest mapping.
EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
RangesByEndpoint startWriteAdditions = startDelta.get(params).writes.additions;
RangesByEndpoint startWriteRemovals = startDelta.get(params).writes.removals;
.forEach(newReplica -> movements.put(oldReplicas.get(newReplica.range()), newReplica));
// next, check if any replicas went from being transient to full, if so we need to stream to them;
Iterable<Replica> removalReplicas = delta.writes.removals.flattenValues();
for (Replica removal : removalReplicas)
if (removal.isTransient())
Replica destination = midDelta.get(params).reads.additions.get(removal.endpoint()).byRange().get(removal.range());
if (destination != null && destination.isFull())
{"Conversion from transient to full replica {} -> {}", removal, destination);
Replica source = oldReplicas.get(removal.range());
movements.put(source, destination);
.forEach(newReplica -> {
if (startWriteRemovals.get(newReplica.endpoint()).contains(newReplica.range(), false))
logger.debug("Streaming transient -> full conversion to {} from {}", newReplica, oldReplicas.get(newReplica.range()));
movements.put(oldReplicas.get(newReplica.range()), newReplica);
Expand Down

0 comments on commit dabcb17

Please sign in to comment.