Cloud Spanner — Maximizing data load throughput

Robert Kubis
Google Cloud - Community
5 min read · Apr 23, 2018


This post covers strategies for maximizing write throughput to Cloud Spanner.

Too busy to read? Check out the video above for the summary.

The scenario

As a reminder of the scenario we are working with in this series of blog posts, imagine an online shop built on top of a traditional database that we want to migrate to Cloud Spanner.
We start with a backup of that imaginary database, in which all data is exported to CSV files and uploaded to Google Cloud Storage (GCS). A loader program then reads all the files from GCS, processes them, and writes the data to Cloud Spanner.
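For illustration, here is a minimal sketch of the read side of such a loader using the GCS Java client library; the bucket name and export prefix are made up for this example and are not from the actual sample code.

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;

public class CsvExportReader {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // List every file under the (made-up) export prefix and stream its lines.
    for (Blob blob : storage.list("my-export-bucket",
        Storage.BlobListOption.prefix("orders/")).iterateAll()) {
      String csv = new String(blob.getContent(), StandardCharsets.UTF_8);
      for (String line : csv.split("\n")) {
        // parse the CSV line here and hand the values to the Spanner writer
      }
    }
  }
}
```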

The previous blog post discussed how to choose the right primary keys for the scenario to enable scalable reads and writes. The approach we took didn’t utilize the full data load potential of Cloud Spanner because we were writing only one row per transaction. Now we want to increase load throughput by batching multiple rows in one transaction.

Write batching and how Cloud Spanner transactions work

It seems obvious that we should batch as many writes together as possible to see a considerable throughput boost, while staying below the transaction limits.

The records we’re loading are fairly small, so we increase the batch size to 1,000 rows per transaction, expecting much higher throughput due to the reduced client-server communication and transaction overhead.
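With the Java client library that change is small; here is a rough sketch, with a made-up orders table and columns standing in for the shop’s schema, of committing 1,000 rows per transaction:

```java
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Mutation;
import java.util.ArrayList;
import java.util.List;

public class BatchedWriter {
  // "orders" and its columns are placeholder names, not the actual schema.
  static void writeBatched(DatabaseClient dbClient, Iterable<String[]> csvRows) {
    List<Mutation> batch = new ArrayList<>();
    for (String[] row : csvRows) {
      batch.add(Mutation.newInsertBuilder("orders")
          .set("order_id").to(row[0])
          .set("customer_id").to(row[1])
          .set("amount").to(Double.parseDouble(row[2]))
          .build());
      if (batch.size() == 1000) {
        dbClient.write(batch);        // one transaction per 1,000 rows
        batch = new ArrayList<>();
      }
    }
    if (!batch.isEmpty()) {
      dbClient.write(batch);          // commit the remainder
    }
  }
}
```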

Surprisingly, we see only a ~2.5x improvement: >50k rows/sec vs. 20k rows/sec.

Without batching:

The API requests/s graph shows one API call per row committed to the database.
The bytes received per minute graph shows the API traffic on a scale of 0 to 512 MB/min.

With batching:

The API requests/s graph shows one API call per 1,000 rows committed to the database.
The bytes received per minute graph shows the API traffic on a scale of 0 to 1024 MB/min.

Don’t get me wrong, more than doubling the throughput is great but we want to see if we can improve this even further.

Ordered batches to the rescue, wait what, didn’t we just …?

As it turns out, we are still missing a step to get the real benefits of batching. The rows that we batch together have well-distributed PRIMARY KEYS, the ones we introduced in the previous blog post to fix the hotspotting issue during data load. As a result, writing 1,000 of those rows in a single transaction ends up involving many, many splits. As you now know, a table is split based on size and load into ranges of rows ordered alphabetically by PRIMARY KEY. The responsibility for these splits is then distributed among the available compute nodes, where each split is controlled by exactly one node.

Now when we write to multiple splits in a single transaction, Cloud Spanner needs to coordinate among all the involved nodes. To do that, it has to obtain locks and serialize the access of all outstanding transactions writing to any of those splits, using the two-phase commit protocol. This means the benefit of having multiple rows in one transaction is reduced by the coordination overhead incurred by spanning multiple table splits.

Batch transactions with unordered rows access a large number of table splits and can lead to contention

Armed with this knowledge of the inner workings of Cloud Spanner, we need a way to reduce the number of splits involved in one transaction to gain the maximum benefit from batching.

Going back to our loader program, we modify it to load a large number of rows into a cache. We then sort these rows alphabetically by PRIMARY KEY, the same way the table splits in our database are sorted, and partition them by our chosen batch size. From there we create one transaction for each partition.
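A minimal sketch of that change, with a made-up OrderRow type standing in for however the loader represents a parsed row: sort the cached rows by key, then commit each contiguous partition as its own transaction.

```java
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Mutation;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortedBatchWriter {
  // Stand-in for the loader's parsed CSV row; both methods are hypothetical.
  interface OrderRow {
    String primaryKey();
    Mutation toMutation();
  }

  static void writeSorted(DatabaseClient dbClient, List<OrderRow> rows, int batchSize) {
    // Sort the cached rows the same way Spanner orders its splits: by PRIMARY KEY.
    rows.sort(Comparator.comparing(OrderRow::primaryKey));
    for (int start = 0; start < rows.size(); start += batchSize) {
      List<Mutation> mutations = rows
          .subList(start, Math.min(start + batchSize, rows.size()))
          .stream()
          .map(OrderRow::toMutation)
          .collect(Collectors.toList());
      // Each transaction now covers one narrow, contiguous key range,
      // so it touches few splits and needs little cross-node coordination.
      dbClient.write(mutations);
    }
  }
}
```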

Batch transactions with ordered rows (by primary key columns) that are distinct in their key ranges access only a small number of table splits and thus minimize contention

Now some might ask: didn’t we just introduce well-distributed PRIMARY KEYS to avoid hotspotting in the previous blog post, and now we are sorting? Right, the trick here is that the ordered partitions are distinct in the PRIMARY KEY ranges they write to, so each transaction spans fewer table splits and causes less contention during writes. As you can see in the graph, this improves our load performance significantly, to about 120k rows/sec.

The API requests/s graph shows one API call per 1,000 rows committed to the database.
The bytes received per minute graph shows the API traffic on a scale of 0 to 2.0 GB/min.

To find the optimal load performance we have to keep a couple of limitations in mind. Besides the optimal transaction size of between 1 MB and 10 MB, there is a strict limit of 20k mutations per transaction. Mutation? What is that?
A mutation has a special meaning here: any modification of a single cell in a table or index counts as one mutation, except when you delete a range of rows. So, for example, if you have a table with five columns and an index defined on three of those columns, each insert of an entire row accounts for eight mutations.
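A quick back-of-the-envelope calculation for that example shows how the limit caps the batch size:

```java
// Sizing for the example above: a row write touches 5 table columns
// plus 3 index columns, i.e. 8 mutations per inserted row.
int columnsPerRow = 5;
int indexedColumns = 3;
int mutationsPerRow = columnsPerRow + indexedColumns;   // 8
int maxRowsPerTransaction = 20_000 / mutationsPerRow;   // 2,500 rows fit under the 20k limit
```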

Now if you don’t want to write all this logic into your data loader yourself, you can use the Cloud Dataflow connector for Cloud Spanner. The connector handles the sorting, partitioning, and scaling out of writes to Cloud Spanner automagically for you. Check out the sample source code to learn more.
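As a rough illustration (bucket, table, and column names are placeholders, not taken from the sample code), a Beam pipeline using the connector can be as small as this:

```java
import com.google.cloud.spanner.Mutation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;

public class SpannerCsvLoader {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("ReadCsv", TextIO.read().from("gs://my-export-bucket/orders-*.csv"))
     .apply("ToMutation", MapElements.via(new SimpleFunction<String, Mutation>() {
       @Override
       public Mutation apply(String line) {
         // Convert one CSV line into a Spanner mutation (placeholder columns).
         String[] f = line.split(",");
         return Mutation.newInsertOrUpdateBuilder("orders")
             .set("order_id").to(f[0])
             .set("customer_id").to(f[1])
             .build();
       }
     }))
     .apply("WriteToSpanner", SpannerIO.write()
         .withInstanceId("my-instance")
         .withDatabaseId("orders-db"));
    p.run();
  }
}
```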

— A little side note here: As you start loading an empty database, it will take some time for the first splits to occur and the load-based rebalancing to kick in. This means it will take some time before you see the full utilization of your multi-node instance. —

Next steps

Check out the code samples here. To learn more about Google Cloud Spanner, stay tuned for the next article in this series. In the meantime, check out the docs and the Getting Started guides.
