This sample allows you to migrate data between tables in Apache Cassandra using Spark with Azure Databricks, while preserving the original writetime. This can be useful when doing historic data loads during a live migration.
- Provision an Azure Databricks cluster. Ensure it also has network access to your source and target Cassandra clusters.
- Ensure you've already migrated the keyspace/table schema from your source Cassandra database to your target Cassandra database.
- Select an Azure Databricks runtime version that supports Spark 3.0 or higher.
- Download the dependency jar here.\*
- Upload and install the jar on your Databricks cluster: select **Install**, and then restart the cluster when installation is complete.

\* You can also build the dependency jar using SBT by running `./build.sh` in the `/build_files` directory of this repo.
**Note:** Make sure that you restart the Databricks cluster after the dependency jar has been installed.
In order to maximize throughput for large migrations, you may need to change Spark parameters at the cluster level. You can apply these settings in the advanced options of the cluster config, for example:

```
spark.cassandra.output.batch.size.rows 1
spark.cassandra.output.concurrent.writes 500
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
```

You may also want to increase the number of workers in your Spark cluster.
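As a rough alternative sketch, the same `spark.cassandra.*` keys can in principle be supplied per session via the `SparkSession` builder, since the Spark Cassandra Connector reads them from the Spark configuration. Note this is an assumption about your setup: on Databricks, `getOrCreate` may return an already-running session whose config is fixed, so the cluster-level advanced options above are the more reliable route.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: applying throughput settings at session-build time instead of
// cluster level. Values mirror the cluster config example above.
val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.cassandra.output.batch.size.rows", "1")
  .config("spark.cassandra.output.concurrent.writes", "500")
  .config("spark.cassandra.concurrent.reads", "512")
  .config("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  .config("spark.cassandra.connection.keep_alive_ms", "600000000")
  .getOrCreate()
```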
Create a new Scala notebook in Databricks with two separate cells. In this example, we are migrating from a source cluster that does not implement SSL to a target cluster that does. You can adjust `sslOptions` for your source/target tables accordingly.
```scala
import org.apache.spark.sql._

val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.task.maxFailures", "1024")
  .config("spark.stage.maxConsecutiveAttempts", "60")
  .getOrCreate

import com.cassandra.migrator.readers.Cassandra
import com.cassandra.migrator.config._
import com.datastax.spark.connector.cql.CassandraConnector

val cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username = "<username here>",
    password = "<password here>")),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled = false,
    enabled = false,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  // Number of splits to use - this should be at minimum the number of cores
  // available in the Spark cluster, and optimally more; higher split counts
  // lead to more fine-grained resumes. Aim for 8 * (Spark cores).
  splitCount = Some(1),
  // Number of connections to use to Cassandra when copying
  connections = Some(1),
  // Number of rows to fetch in each read
  fetchSize = 1000,
  // Preserve TTLs and WRITETIMEs of cells in the source database. Note that
  // this option is *incompatible* with tables containing collections
  // (lists, maps, sets).
  preserveTimestamps = true,
  // Optional condition to filter source table data that will be migrated,
  // e.g. where: race_start_date = '2015-05-27' AND race_end_date = '2015-05-27'
  where = None
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)

sourceDF.dataFrame.printSchema()
```
```scala
import com.cassandra.migrator.writers

implicit val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.task.maxFailures", "1024")
  .config("spark.stage.maxConsecutiveAttempts", "60")
  .getOrCreate

val target = new TargetSettings.Cassandra(
  host = "<target Cassandra host name/IP>",
  port = 9042,
  localDC = None,
  credentials = Some(com.cassandra.migrator.config.Credentials(
    username = "<username here>",
    password = "<password here>")),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled = false,
    enabled = true,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = Some(Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")),
    protocol = Some("TLS")
  )),
  keyspace = "<target keyspace name>",
  table = "<target table name>",
  connections = Some(1),
  stripTrailingZerosForDecimals = false
)

writers.Cassandra.writeDataframe(
  target,
  List(),
  sourceDF.dataFrame,
  sourceDF.timestampColumns
)
```
To validate the migration using row comparison, create a third cell with the following, adjusting the parameters to your preferred tolerance:

```scala
import com.cassandra.migrator.Validator
import com.cassandra.migrator.config._

val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.task.maxFailures", "1024")
  .config("spark.stage.maxConsecutiveAttempts", "60")
  .getOrCreate

val validatorConfig = new Validation(
  compareTimestamps = true,
  ttlToleranceMillis = 1,
  writetimeToleranceMillis = 1,
  failuresToFetch = 10,
  floatingPointTolerance = 1.0
)

val migratorConfig = new MigratorConfig(
  cassandraSource,
  target,
  List(),
  savepoints = null,
  skipTokenRanges = Set(),
  validatorConfig
)

Validator.runValidation(migratorConfig)(spark)
```
If rows do not match, the row comparison in `Validation` returns an error for each row that is missing or mismatched in the target table. This may indicate that a transient failure occurred during the overall migration process. If this happens, you can use the Validator to extract the missing records and re-run the migration, inserting only those records, as long as you can specify the primary key for filtering.
Add the below sample cell after your existing cells in the same notebook. It constructs the values required in the `where` parameter of `SourceSettings` from the row comparison results, and then writes only those filtered records to the target table. Be sure to change the value of `primaryKey` to the name of your primary key field, and replace the credentials and source keyspace/table:
```scala
implicit val spark = SparkSession
  .builder()
  .appName("cassandra-migrator")
  .config("spark.task.maxFailures", "1024")
  .config("spark.stage.maxConsecutiveAttempts", "60")
  .getOrCreate

// Construct a Cassandra IN clause to filter only the missing rows.
// ***CHANGE primaryKey to the primary key field of your table.
val primaryKey = "<primary key of source/target table>"
val failures = Validator.runValidation(migratorConfig)(spark)
val whereValues = failures
  .map(failure => s"'${failure.row.getString(primaryKey)}'")
  .mkString(s"$primaryKey IN (", ",", ")")

// Redefine cassandraSource, this time with a where clause built from the
// validation failures so that only the missing records are migrated.
var cassandraSource = new SourceSettings.Cassandra(
  host = "<source Cassandra host name/IP here>",
  port = 9042,
  localDC = None,
  credentials = Some(Credentials(
    username = "<username here>",
    password = "<password here>")),
  sslOptions = Some(SSLOptions(
    clientAuthEnabled = false,
    enabled = true,
    trustStorePassword = None,
    trustStorePath = None,
    trustStoreType = None,
    keyStorePassword = None,
    keyStorePath = None,
    keyStoreType = None,
    enabledAlgorithms = None,
    protocol = Some("TLS")
  )),
  keyspace = "<source keyspace name>",
  table = "<source table name>",
  splitCount = Some(1),
  connections = Some(1),
  fetchSize = 1000,
  preserveTimestamps = true,
  // The where values extracted from the validation above filter the read
  // down to only the missing records.
  where = Some(whereValues)
)

val sourceDF = Cassandra.readDataframe(
  spark,
  cassandraSource,
  cassandraSource.preserveTimestamps,
  tokenRangesToSkip = Set()
)

// Reuse the existing target config to re-migrate only the failed records.
writers.Cassandra.writeDataframe(
  target,
  List(),
  sourceDF.dataFrame,
  sourceDF.timestampColumns
)
```
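To illustrate the `where` clause that the cell above constructs, here is a self-contained sketch of the same `mkString` pattern using hypothetical failed key values (`race_id`, `id-001`, and `id-002` are placeholders; the real values come from the rows returned by `Validator.runValidation`):

```scala
// Hypothetical failed primary-key values; in the notebook these come from
// the validation failures.
val primaryKey = "race_id"
val failedKeys = List("id-001", "id-002")

// Quote each key, then wrap the list in "<pk> IN (...)" so the result can be
// passed as the where parameter of SourceSettings.
val whereValues = failedKeys
  .map(key => s"'$key'")
  .mkString(s"$primaryKey IN (", ",", ")")

println(whereValues) // race_id IN ('id-001','id-002')
```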
The following parameters are available in `SSLOptions`:

| Parameter | Description | Default value |
|---|---|---|
| `enabled` | Enable secure connection to Cassandra cluster | false |
| `trustStorePath` | Path for the trust store being used | None |
| `trustStorePassword` | Trust store password | None |
| `trustStoreType` | Trust store type | JKS |
| `protocol` | SSL protocol | TLS |
| `enabledAlgorithms` | SSL cipher suites | Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA") |
| `clientAuthEnabled` | Enable 2-way secure connection to Cassandra cluster | false |
| `keyStorePath` | Path for the key store being used | None |
| `keyStorePassword` | Key store password | None |
| `keyStoreType` | Key store type | JKS |
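The examples above leave the store-related fields as `None`. As a sketch of how the table's parameters combine for a cluster that requires client certificates (2-way TLS), the fragment below fills them in; the DBFS paths and passwords are placeholder assumptions, not values from this sample:

```scala
// Hypothetical sketch: SSLOptions for a cluster requiring 2-way TLS.
// Store paths and passwords below are placeholders - substitute your own.
val sslWithStores = SSLOptions(
  clientAuthEnabled = true,  // enable 2-way secure connection
  enabled = true,
  trustStorePassword = Some("<trust store password>"),
  trustStorePath = Some("/dbfs/FileStore/cassandra-truststore.jks"),
  trustStoreType = Some("JKS"),
  keyStorePassword = Some("<key store password>"),
  keyStorePath = Some("/dbfs/FileStore/cassandra-keystore.jks"),
  keyStoreType = Some("JKS"),
  enabledAlgorithms = Some(Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")),
  protocol = Some("TLS")
)
```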