Skip to content

Commit

Permalink
Fix system.peers_v2 table behaviour when two nodes swap their IP addresses
Browse files Browse the repository at this point in the history
Throw if the node id has changed and does not match the directory. If, however, the _IP_ address has changed, issue Startup and correct the IP address. Disallow taking over the identity of other nodes, either by hijacking their IPs or by overriding the local node id with theirs.

Patch by Alex Petrov; reviewed by Sam Tunnicliffe for CASSANDRA-19221
  • Loading branch information
ifesdjeen committed Apr 26, 2024
1 parent 397568d commit 38512a4
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 39 deletions.
26 changes: 9 additions & 17 deletions src/java/org/apache/cassandra/db/virtual/PeersTable.java
Expand Up @@ -113,15 +113,6 @@ public DataSet data()
return result;
}

/**
 * Rebuilds the peers system tables from scratch.
 *
 * Both the v2 peers table and the legacy peers table are truncated first. Then every peer
 * listed in the new directory is written back via {@code updateLegacyPeerTable}.
 *
 * @param prev cluster metadata before the change, forwarded to the per-peer update
 * @param next cluster metadata after the change; its directory supplies the peer ids to write
 */
public static void initializeLegacyPeerTables(ClusterMetadata prev, ClusterMetadata next)
{
    String truncateTemplate = "TRUNCATE %s.%s";
    QueryProcessor.executeInternal(String.format(truncateTemplate, SYSTEM_KEYSPACE_NAME, PEERS_V2));
    QueryProcessor.executeInternal(String.format(truncateTemplate, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS));

    // Repopulate one peer at a time from the post-change directory.
    for (NodeId peer : next.directory.peerIds())
    {
        updateLegacyPeerTable(peer, prev, next);
    }
}

private static String peers_v2_query = "INSERT INTO %s.%s ("
+ "peer, peer_port, "
+ "preferred_ip, preferred_port, "
Expand Down Expand Up @@ -156,9 +147,7 @@ public static void updateLegacyPeerTable(NodeId nodeId, ClusterMetadata prev, Cl
if (next.directory.peerState(nodeId) == null || next.directory.peerState(nodeId) == NodeState.LEFT)
{
NodeAddresses addresses = prev.directory.getNodeAddresses(nodeId);
logger.debug("Purging {} from system.peers_v2 table", addresses);
QueryProcessor.executeInternal(String.format(peers_delete_query, SYSTEM_KEYSPACE_NAME, PEERS_V2), addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort());
QueryProcessor.executeInternal(String.format(legacy_peers_delete_query, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS), addresses.broadcastAddress.getAddress());
removeFromSystemPeersTables(addresses.broadcastAddress);
}
else if (NodeState.isPreJoin(next.directory.peerState(nodeId)))
{
Expand All @@ -169,11 +158,7 @@ else if (NodeState.isPreJoin(next.directory.peerState(nodeId)))
NodeAddresses addresses = next.directory.getNodeAddresses(nodeId);
NodeAddresses oldAddresses = prev.directory.getNodeAddresses(nodeId);
if (oldAddresses != null && !oldAddresses.equals(addresses))
{
logger.debug("Purging {} from system.peers_v2 table", oldAddresses);
QueryProcessor.executeInternal(String.format(peers_delete_query, SYSTEM_KEYSPACE_NAME, PEERS_V2), oldAddresses.broadcastAddress.getAddress(), oldAddresses.broadcastAddress.getPort());
QueryProcessor.executeInternal(String.format(legacy_peers_delete_query, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS), oldAddresses.broadcastAddress.getAddress());
}
removeFromSystemPeersTables(oldAddresses.broadcastAddress);

Location location = next.directory.location(nodeId);

Expand All @@ -197,4 +182,11 @@ else if (NodeState.isPreJoin(next.directory.peerState(nodeId)))
tokens);
}
}

/**
 * Deletes the entry for {@code addr} from both the v2 peers table and the legacy peers table.
 *
 * The v2 delete is bound to both the address and the port of {@code addr}. The legacy delete
 * is bound to the address only.
 *
 * @param addr broadcast address (and port) of the peer whose rows should be removed
 */
public static void removeFromSystemPeersTables(InetAddressAndPort addr)
{
    // Name both tables: this method purges the legacy table as well, not just peers_v2.
    logger.debug("Purging {} from {}.{} and {}.{} tables",
                 addr, SYSTEM_KEYSPACE_NAME, PEERS_V2, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
    QueryProcessor.executeInternal(String.format(peers_delete_query, SYSTEM_KEYSPACE_NAME, PEERS_V2),
                                   addr.getAddress(), addr.getPort());
    QueryProcessor.executeInternal(String.format(legacy_peers_delete_query, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
                                   addr.getAddress());
}
}
13 changes: 11 additions & 2 deletions src/java/org/apache/cassandra/tcm/Startup.java
Expand Up @@ -158,8 +158,17 @@ public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapPro
UUID currentHostId = SystemKeyspace.getLocalHostId();
if (nodeId != null && !Objects.equals(nodeId.toUUID(), currentHostId))
{
logger.info("NodeId is wrong, updating from {} to {}", currentHostId, nodeId.toUUID());
SystemKeyspace.setLocalHostId(nodeId.toUUID());
if (currentHostId == null)
{
logger.info("Taking over the host ID: {}, replacing address {}", nodeId.toUUID(), FBUtilities.getBroadcastAddressAndPort());
SystemKeyspace.setLocalHostId(nodeId.toUUID());
return;
}

String error = String.format("NodeId does not match locally set one. Check for the IP address collision: %s vs %s %s.",
currentHostId, nodeId.toUUID(), FBUtilities.getBroadcastAddressAndPort());
logger.error(error);
throw new IllegalStateException(error);
}
}

Expand Down
Expand Up @@ -63,18 +63,20 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
return;

Set<NodeId> removed = Sets.difference(prev.directory.peerIds(), next.directory.peerIds());
Set<InetAddressAndPort> removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()),
new HashSet<>(next.directory.allAddresses()));

Set<NodeId> changed = new HashSet<>();
for (NodeId node : next.directory.peerIds())
{
if (directoryEntryChangedFor(node, prev.directory, next.directory) || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node)))
changed.add(node);
}

for (NodeId remove : removed)
for (InetAddressAndPort remove : removedAddr)
{
GossipHelper.evictFromMembership(prev.directory.endpoint(remove));
PeersTable.updateLegacyPeerTable(remove, prev, next);
GossipHelper.evictFromMembership(remove);
PeersTable.removeFromSystemPeersTables(remove);
}

for (NodeId change : changed)
Expand Down
Expand Up @@ -18,35 +18,120 @@

package org.apache.cassandra.distributed.test.log;

import java.util.UUID;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.tcm.membership.NodeId;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static org.junit.Assert.fail;

public class BounceResetHostIdTest extends TestBaseImpl
{
@Test
public void bounceTest() throws Exception
public void swapIpsTest() throws Exception
{
try (Cluster cluster = init(builder().withNodes(1)
.start()))
try (Cluster cluster = builder().withNodes(3)
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
// disable DistributedTestSnitch as it tries to query before we setup
.set("endpoint_snitch", "org.apache.cassandra.locator.SimpleSnitch"))
.createWithoutStarting())
{
String wrongId = UUID.randomUUID().toString();
cluster.get(1).runOnInstance(() -> {
SystemKeyspace.setLocalHostId(UUID.fromString(wrongId));
assertFalse(NodeId.isValidNodeId(SystemKeyspace.getLocalHostId()));
});
cluster.get(1).shutdown().get();
cluster.get(1).startup();
cluster.get(1).logs().watchFor("NodeId is wrong, updating from "+wrongId+" to "+(new NodeId(1).toUUID()));
cluster.get(1).runOnInstance(() -> assertTrue(NodeId.isValidNodeId(SystemKeyspace.getLocalHostId())));
// This test relies on node IDs being in the same order as IP addresses
for (int i = 1; i <= 3; i++)
cluster.get(i).startup();

cluster.get(2).shutdown().get();
ClusterUtils.updateAddress(cluster.get(2), "127.0.0.4");
cluster.get(2).startup();

cluster.get(3).shutdown().get();
ClusterUtils.updateAddress(cluster.get(3), "127.0.0.2");
cluster.get(3).startup();

cluster.get(2).shutdown().get();
ClusterUtils.updateAddress(cluster.get(2), "127.0.0.3");
cluster.get(2).startup();

ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1));

long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
while (true)
{
try
{
AssertUtils.assertRows(sortHelper(cluster.coordinator(2).execute("select peer, host_id from system.peers_v2", ConsistencyLevel.QUORUM)),
rows(row(InetAddress.getByName("127.0.0.1"), new NodeId(1).toUUID()),
row(InetAddress.getByName("127.0.0.2"), new NodeId(3).toUUID())
));
AssertUtils.assertRows(sortHelper(cluster.coordinator(3).execute("select peer, host_id from system.peers_v2", ConsistencyLevel.QUORUM)),
rows(row(InetAddress.getByName("127.0.0.1"), new NodeId(1).toUUID()),
row(InetAddress.getByName("127.0.0.3"), new NodeId(2).toUUID())

));
return;
}
catch (AssertionError t)
{
// If we are past the deadline, throw; allow to retry otherwise
if (System.nanoTime() > deadline)
throw t;
}
}
}
}

@Test
public void swapIpsDirectlyTest() throws Exception
{
    try (Cluster cluster = builder().withNodes(3)
                                    .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
                                                      // disable DistributedTestSnitch as it tries to query before we setup
                                                      .set("endpoint_snitch", "org.apache.cassandra.locator.SimpleSnitch"))
                                    .createWithoutStarting())
    {
        // This test relies on node IDs being in the same order as IP addresses
        for (int i = 1; i <= 3; i++)
            cluster.get(i).startup();

        // Swap the addresses of nodes 2 and 3 while both are down. Each node then tries to come
        // back on the address previously used by the other node, and startup is expected to
        // refuse with the node-id mismatch error.
        cluster.get(2).shutdown().get();
        cluster.get(3).shutdown().get();
        ClusterUtils.updateAddress(cluster.get(2), "127.0.0.3");
        ClusterUtils.updateAddress(cluster.get(3), "127.0.0.2");

        assertStartupRejected(cluster, 2);
        assertStartupRejected(cluster, 3);
    }
}

/**
 * Starts the given node and asserts that startup fails with the node-id mismatch error.
 *
 * {@code fail} is deliberately called outside the {@code catch}. Otherwise its own
 * AssertionError would be caught and misreported as a startup failure with the wrong message.
 * The cause chain is searched null-safely, because the error may arrive wrapped.
 *
 * @param cluster cluster owning the node
 * @param node    1-based index of the node to start
 */
private static void assertStartupRejected(Cluster cluster, int node)
{
    Throwable failure = null;
    try
    {
        cluster.get(node).startup();
    }
    catch (Throwable t)
    {
        failure = t;
    }

    if (failure == null)
        fail("Should not have been able to start");

    for (Throwable t = failure; t != null; t = t.getCause())
    {
        String message = t.getMessage();
        if (message != null && message.contains("NodeId does not match locally set one"))
            return;
    }
    throw new AssertionError("Node " + node + " failed to start for an unexpected reason", failure);
}
/**
 * Sorts result rows in place by their first column, which must hold an {@link InetAddress}.
 *
 * Addresses are ordered numerically: shorter (IPv4) addresses come before longer (IPv6) ones,
 * then byte by byte as unsigned values. This puts 127.0.0.9 before 127.0.0.10; comparing the
 * dotted string form would not.
 *
 * @param rows query result rows whose element 0 is an InetAddress; sorted in place
 * @return the same {@code rows} array, for call chaining
 */
public static Object[][] sortHelper(Object[][] rows)
{
    Comparator<Object[]> byAddress =
        (left, right) -> compareAddressBytes(((InetAddress) left[0]).getAddress(),
                                             ((InetAddress) right[0]).getAddress());
    Arrays.sort(rows, byAddress);
    return rows;
}

/** Compares raw address bytes: by length first, then lexicographically as unsigned bytes. */
private static int compareAddressBytes(byte[] left, byte[] right)
{
    if (left.length != right.length)
        return Integer.compare(left.length, right.length);
    for (int i = 0; i < left.length; i++)
    {
        int cmp = Integer.compare(Byte.toUnsignedInt(left[i]), Byte.toUnsignedInt(right[i]));
        if (cmp != 0)
            return cmp;
    }
    return 0;
}
}

0 comments on commit 38512a4

Please sign in to comment.