Skip to content

Commit

Permalink
ForceSnapshot transformations should not be persisted in the local log table
Browse files Browse the repository at this point in the history

Patch by Sam Tunnicliffe; reviewed by marcuse for CASSANDRA-19190
  • Loading branch information
beobal authored and krummas committed Apr 23, 2024
1 parent 34d999c commit 17ecece
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 8 deletions.
11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/schema/DistributedSchema.java
Expand Up @@ -58,9 +58,16 @@ public static final DistributedSchema empty()
return new DistributedSchema(Keyspaces.none(), Epoch.EMPTY);
}

public static DistributedSchema first()
public static DistributedSchema first(Set<String> knownDatacenters)
{
return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(Collections.singleton(DatabaseDescriptor.getLocalDataCenter()))), Epoch.FIRST);
if (knownDatacenters.isEmpty())
{
if (DatabaseDescriptor.getLocalDataCenter() != null)
knownDatacenters = Collections.singleton(DatabaseDescriptor.getLocalDataCenter());
else
knownDatacenters = Collections.singleton("DC1");
}
return new DistributedSchema(Keyspaces.of(DistributedMetadataLogKeyspace.initialMetadata(knownDatacenters)), Epoch.FIRST);
}

private final Keyspaces keyspaces;
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/tcm/ClusterMetadata.java
Expand Up @@ -107,7 +107,7 @@ public ClusterMetadata(IPartitioner partitioner)
@VisibleForTesting
public ClusterMetadata(IPartitioner partitioner, Directory directory)
{
this(partitioner, directory, DistributedSchema.first());
this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters()));
}

@VisibleForTesting
Expand Down
Expand Up @@ -20,15 +20,24 @@

import java.util.Collections;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
import org.apache.cassandra.schema.DistributedSchema;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.tcm.Commit.Replicator;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PlacementProvider;
import org.apache.cassandra.tcm.ownership.TokenMap;
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.LockedRanges;

public class StubClusterMetadataService extends ClusterMetadataService
{
Expand Down Expand Up @@ -73,12 +82,24 @@ private StubClusterMetadataService(ClusterMetadata initial)
.withInitialState(initial)
.createLog(),
new StubProcessor(),
Commit.Replicator.NO_OP,
Replicator.NO_OP,
false);
this.metadata = initial;
this.log().readyUnchecked();
}

/**
 * Creates a stub service from explicitly supplied components; this is the constructor used by
 * {@code Builder.build()}.
 * <p>
 * The current metadata is taken from the supplied {@link LocalLog} rather than from a separately passed
 * {@link ClusterMetadata}. The log is then marked ready via {@code readyUnchecked()}.
 *
 * @param placement  placement provider, passed straight to the superclass
 * @param snapshots  metadata snapshot store, passed straight to the superclass
 * @param log        local log whose {@code metadata()} becomes this stub's current metadata
 * @param processor  processor, passed straight to the superclass
 * @param replicator replicator, passed straight to the superclass
 * @param isMember   passed straight to the superclass
 */
private StubClusterMetadataService(PlacementProvider placement,
MetadataSnapshots snapshots,
LocalLog log,
Processor processor,
Replicator replicator,
boolean isMember)
{
super(placement, snapshots, log, processor, replicator, isMember);
// Seed the stub's metadata from the log before marking the log ready
this.metadata = log.metadata();
this.log().readyUnchecked();
}

@Override
public <T1> T1 commit(Transformation transform, CommitSuccessHandler<T1> onSuccess, CommitFailureHandler<T1> onFailure)
{
Expand Down Expand Up @@ -125,4 +146,64 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy
throw new UnsupportedOperationException();
}
}


/**
 * Starts building a stub service that uses the partitioner configured in {@link DatabaseDescriptor}.
 *
 * @return a fresh {@link Builder}
 */
public static Builder builder()
{
    // Equivalent to the no-arg Builder constructor, which delegates to DatabaseDescriptor.getPartitioner()
    return builder(DatabaseDescriptor.getPartitioner());
}

/**
 * Starts building a stub service with an explicit partitioner.
 *
 * @param partitioner partitioner used when the builder has to create its own initial ClusterMetadata
 * @return a fresh {@link Builder}
 */
public static Builder builder(IPartitioner partitioner)
{
    return new Builder(partitioner);
}

/**
 * Test builder for {@link StubClusterMetadataService}.
 * <p>
 * Defaults:
 * <ul>
 *   <li>initial metadata: an empty {@link ClusterMetadata} at {@link Epoch#EMPTY}, built with this builder's
 *   partitioner;</li>
 *   <li>snapshots: {@link MetadataSnapshots#NO_OP}.</li>
 * </ul>
 * {@link #build()} does not modify the builder, so applying defaults never leaks back into its fields.
 */
public static class Builder
{
    // Partitioner used only when no initial ClusterMetadata has been supplied
    IPartitioner partitioner;
    // Optional initial metadata; null means "create an empty one in build()"
    ClusterMetadata initial;
    // Snapshot store handed to the service; null is treated as NO_OP in build()
    MetadataSnapshots snapshots = MetadataSnapshots.NO_OP;

    /**
     * Creates a stub service with a {@link UniformRangePlacement}, a {@link LocalLog} seeded with the initial
     * metadata, a {@code StubProcessor}, a no-op replicator and {@code isMember == false}.
     *
     * @return a new stub service; the builder's own fields are left unchanged
     */
    public StubClusterMetadataService build()
    {
        // Resolve defaults into locals instead of writing them back into the builder's fields
        ClusterMetadata initialState = initial != null ? initial : emptyMetadata();
        MetadataSnapshots snapshotStore = snapshots != null ? snapshots : MetadataSnapshots.NO_OP;
        return new StubClusterMetadataService(new UniformRangePlacement(),
                                              snapshotStore,
                                              LocalLog.logSpec().withInitialState(initialState).createLog(),
                                              new StubProcessor(),
                                              Replicator.NO_OP,
                                              false);
    }

    /** Empty ClusterMetadata at {@link Epoch#EMPTY} built with this builder's partitioner. */
    private ClusterMetadata emptyMetadata()
    {
        return new ClusterMetadata(Epoch.EMPTY,
                                   partitioner,
                                   DistributedSchema.empty(),
                                   Directory.EMPTY,
                                   new TokenMap(partitioner),
                                   DataPlacements.EMPTY,
                                   LockedRanges.EMPTY,
                                   InProgressSequences.EMPTY,
                                   ImmutableMap.of());
    }

    private Builder()
    {
        this(DatabaseDescriptor.getPartitioner());
    }

    private Builder(IPartitioner partitioner)
    {
        this.partitioner = partitioner;
    }

    /**
     * @param initial metadata the stub's local log starts from; if never set, an empty one is created
     * @return this builder
     */
    public Builder withInitial(ClusterMetadata initial)
    {
        this.initial = initial;
        return this;
    }

    /**
     * @param snapshots snapshot store for the stub service; null falls back to {@link MetadataSnapshots#NO_OP}
     * @return this builder
     */
    public Builder withSnapshots(MetadataSnapshots snapshots)
    {
        this.snapshots = snapshots;
        return this;
    }
}
}
Expand Up @@ -18,6 +18,8 @@

package org.apache.cassandra.tcm.listeners;

import java.util.EnumSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,14 +28,20 @@
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.log.Entry;

import static org.apache.cassandra.tcm.Transformation.Kind.FORCE_SNAPSHOT;
import static org.apache.cassandra.tcm.Transformation.Kind.TRIGGER_SNAPSHOT;

public class MetadataSnapshotListener implements LogListener
{
private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListener.class);

/**
 * Transformation kinds whose log entries cause notify(Entry, Transformation.Result) to store a metadata
 * snapshot: explicit snapshot triggers as well as force-snapshot entries.
 */
private static final EnumSet<Transformation.Kind> triggers = EnumSet.of(FORCE_SNAPSHOT, TRIGGER_SNAPSHOT);

@Override
public void notify(Entry entry, Transformation.Result result)
{
ClusterMetadata next = result.success().metadata;
if (entry.transform.kind() == Transformation.Kind.TRIGGER_SNAPSHOT)
if (triggers.contains(entry.transform.kind()))
{
try
{
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/tcm/log/LocalLog.java
Expand Up @@ -263,7 +263,7 @@ private LocalLog(LogSpec logSpec)
if (spec.initial == null)
spec.initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
if (spec.prev == null)
spec.prev = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
spec.prev = new ClusterMetadata(spec.initial.partitioner);
assert spec.initial.epoch.is(EMPTY) || spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.isReset :
String.format(String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", spec.initial, spec.isReset));

Expand Down Expand Up @@ -480,7 +480,9 @@ void processPendingInternal()
String.format("Epoch %s for %s can either force snapshot, or immediately follow %s",
next.epoch, pendingEntry.transform, prev.epoch);

if (replayComplete.get())
// If replay during initialisation has completed persist to local storage unless the entry is
// a synthetic ForceSnapshot which is not a replicated event but enables jumping over gaps
if (replayComplete.get() && pendingEntry.transform.kind() != Transformation.Kind.FORCE_SNAPSHOT)
storage.append(pendingEntry.maybeUnwrapExecuted());

notifyPreCommit(prev, next, isSnapshot);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;


import org.apache.cassandra.ServerTestUtils.ResettableClusterMetadataService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
Expand Down Expand Up @@ -62,19 +63,23 @@
import org.apache.cassandra.tcm.MetadataSnapshots;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.TokenMap;
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.sequences.Move;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
import org.apache.cassandra.tcm.sequences.Move;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.transformations.AlterSchema;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
Expand Down Expand Up @@ -137,6 +142,18 @@ public static ClusterMetadataService instanceForTest()
return service;
}

/**
 * Builds a minimal ClusterMetadata for tests at the given epoch.
 * <p>
 * The result has an empty schema ({@link DistributedSchema#empty()}), an empty directory, an empty token map,
 * empty placements, locked ranges and in-progress sequences, and an empty map as the final argument.
 *
 * @param epoch       epoch of the returned metadata
 * @param partitioner partitioner for the metadata itself and for its {@link TokenMap}
 * @return a minimal ClusterMetadata
 */
public static ClusterMetadata minimalForTesting(Epoch epoch, IPartitioner partitioner)
{
    // Use the caller's partitioner for both the metadata and its TokenMap so the two always agree
    return new ClusterMetadata(epoch,
                               partitioner,
                               DistributedSchema.empty(),
                               Directory.EMPTY,
                               new TokenMap(partitioner),
                               DataPlacements.empty(),
                               LockedRanges.EMPTY,
                               InProgressSequences.EMPTY,
                               ImmutableMap.of());
}

public static ClusterMetadata minimalForTesting(IPartitioner partitioner)
{
return new ClusterMetadata(Epoch.EMPTY,
Expand Down
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tcm.listeners;

import java.util.Random;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MetadataSnapshots;
import org.apache.cassandra.tcm.StubClusterMetadataService;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.ownership.OwnershipUtils;
import org.apache.cassandra.tcm.transformations.ForceSnapshot;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;

import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.minimalForTesting;
import static org.apache.cassandra.tcm.sequences.SequencesUtils.epoch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/**
 * Tests for {@link MetadataSnapshotListener}. Both {@link ForceSnapshot} and {@link TriggerSnapshot} log entries
 * are expected to result in a metadata snapshot being stored.
 */
public class MetadataSnapshotListenerTest
{
    private static final Logger logger = LoggerFactory.getLogger(MetadataSnapshotListenerTest.class);
    // Never reassigned: every ClusterMetadata built by these tests uses this partitioner
    private final IPartitioner partitioner = Murmur3Partitioner.instance;
    // Re-seeded before each test; the seed is logged so a failing run can be replayed
    private Random r;

    @BeforeClass
    public static void disableSortedReplicaGroups()
    {
        // Set this so that we don't attempt to sort the random placements as this depends on a populated
        // TokenMap. This is a temporary element of ClusterMetadata, at least in the current form
        CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.setBoolean(false);
    }

    @Before
    public void setup()
    {
        long seed = System.nanoTime();
        r = new Random(seed);
        logger.info("SEED: {}", seed);
    }

    @Test
    public void forceSnapshotTriggersSnapshot()
    {
        // ForceSnapshot contains a complete ClusterMetadata which is what we expect to be
        // stored as the snapshot. The input to its execute method is the previous ClusterMetadata
        // and isn't relevant here.
        MetadataSnapshots snapshots = init();
        ClusterMetadata toSnapshot = metadataForSnapshot();
        Entry entry = new Entry(Entry.Id.NONE,
                                toSnapshot.epoch,
                                new ForceSnapshot(toSnapshot));

        ClusterMetadata previous = minimalForTesting(Epoch.FIRST, partitioner);
        Transformation.Result result = entry.transform.execute(previous);
        MetadataSnapshotListener listener = new MetadataSnapshotListener();

        // The payload of the transformation should be retrievable by its epoch
        assertNull(snapshots.getSnapshot(toSnapshot.epoch));
        listener.notify(entry, result);
        assertEquals(toSnapshot, snapshots.getSnapshot(toSnapshot.epoch));
    }

    @Test
    public void triggerSnapshotTest()
    {
        // TriggerSnapshot has no payload itself, but stores the preceding ClusterMetadata state as a snapshot.
        MetadataSnapshots snapshots = init();
        ClusterMetadata toSnapshot = metadataForSnapshot();

        Epoch nextEpoch = toSnapshot.nextEpoch();
        Entry entry = new Entry(Entry.Id.NONE, nextEpoch, TriggerSnapshot.instance);

        Transformation.Result result = entry.transform.execute(toSnapshot);
        MetadataSnapshotListener listener = new MetadataSnapshotListener();

        assertNull(snapshots.getSnapshot(nextEpoch));
        listener.notify(entry, result);
        ClusterMetadata snapshot = snapshots.getSnapshot(nextEpoch);
        assertEquals(nextEpoch, snapshot.epoch);
        assertEquals(toSnapshot.placements, snapshot.placements);
    }

    /**
     * Replaces the global ClusterMetadataService instance with a stub backed by a fresh in-memory snapshot store.
     *
     * @return the in-memory snapshot store the tests inspect after notifying the listener
     */
    private MetadataSnapshots init()
    {
        MetadataSnapshots snapshots = new AtomicLongBackedProcessor.InMemoryMetadataSnapshots();
        StubClusterMetadataService service = StubClusterMetadataService.builder(partitioner)
                                                                       .withSnapshots(snapshots)
                                                                       .build();
        ClusterMetadataService.unsetInstance();
        ClusterMetadataService.setInstance(service);
        return snapshots;
    }

    /**
     * @return a minimal ClusterMetadata at a random epoch, with random data placements applied via its transformer
     */
    private ClusterMetadata metadataForSnapshot()
    {
        return minimalForTesting(epoch(r), partitioner)
               .transformer()
               .with(OwnershipUtils.randomPlacements(r)).build()
               .metadata;
    }

}

0 comments on commit 17ecece

Please sign in to comment.