← На главную

Простая программа на Scala, работающая с Cassandra

Некоторое время назад мы с вами успешно разобрались, что такое Cassandra, а также как установить и настроить Cassandra-кластер в облаке от Amazon. В чем мы не разобрались, это в том, как использовать Cassandra в наших программах. Настало время исправить столь вопиющую несправедливость!

Изучая CQL в качестве примера мы создавали базу данных, хранящую список TODO. Раз уж этот пример так хорошо изучен, нет причин не использовать его повторно при написании программы.

Добавляем зависимость в build.sbt:

libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.6" )

Драйвер использует Guava, поэтому нам понадобится implicit преобразование футур Guava в футуры Scala:

package me.eax.cassandra_example import com.datastax.driver.core._ import com.google.common.util.concurrent._ import scala.concurrent._ package object utils { implicit def futCSToScala(f: ResultSetFuture): Future[ResultSet] = { val promise = Promise[ResultSet]() val callback = new FutureCallback[ResultSet] { def onSuccess(result: ResultSet): Unit = { promise success result } def onFailure(err: Throwable): Unit = { promise failure err } } Futures.addCallback(f, callback) promise.future } }

Пользоваться драйвером довольно просто:

package me.eax.cassandra_example.dao import com.datastax.driver.core._ import com.datastax.driver.core.querybuilder.{QueryBuilder => QB} import me.eax.cassandra_example.utils._ import scala.collection.JavaConverters._ import scala.concurrent._ case class TodoDTO(id: Int, descr: String) case class TodoDAO(session: Session)(implicit ec: ExecutionContext) { private val table = "todo_list" private val id = "id" private val description = "description" def createTable: Future[Unit] = { val query = s"create table if not exists $table ($id int " + s"primary key, $description text )" session.executeAsync(query).map(_ => {}) } def dropTable: Future[Unit] = { val query = s"drop table if exists $table" session.executeAsync(query).map(_ => {}) } def insert(dto: TodoDTO): Future[Unit] = { val query = { QB.insertInto(table) .value(id, dto.id) .value(description, dto.descr) } session.executeAsync(query).map(_ => {}) } def select: Future[Seq[TodoDTO]] = { val query = { QB.select(id, description) .from(table) } for { resultSet <- session.executeAsync(query) } yield { resultSet .asScala .map(row => TodoDTO(row.getInt(id), row.getString(description))) .toSeq } } def delete(idToDelete: Long): Future[Unit] = { val query = { QB.delete().all() .from(table) .where(QB.eq(id, idToDelete)) } session.executeAsync(query).map(_ => {}) } }

Наконец, имея готовый DAO, несложно написать программу, устанавливающую соединение с кластером и производящую кое-какие махинации с данными:

package me.eax.cassandra_example import com.datastax.driver.core._ import me.eax.cassandra_example.dao._ import me.eax.cassandra_example.utils._ import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global object CassandraExample extends App { val cluster = { Cluster.builder() .addContactPoint("10.110.0.10") // .withCredentials("username", "password") .build() } val session = cluster.connect("test") val todoDao = TodoDAO(session) val f = { for { _ <- todoDao.createTable _ = println("Inserting items") _ <- { ftraverse((1 to 3).toSeq) { n => val item = TodoDTO(n, s"Todo item $n") todoDao.insert(item) } } items <- todoDao.select _ = println(s"Items: $items") _ = println("Deleting item 2") _ <- todoDao.delete(2) newItems <- todoDao.select _ = println(s"New items: $newItems") _ <- todoDao.dropTable } yield {} } f onFailure { case e => println(s"ERROR: $e") e.printStackTrace() } Await.ready(f, Duration.Inf) cluster.close() println("Done!") }

Используемая в этом примере функция ftraverse аналогичная Future.sequence. Отличие заключается в том, что Future.sequence выполняет несколько футур параллельно, а при использовании ftraverse одновременно выполняется только одна футура. В сложных приложениях использование sequence может приводить к лавинообразному созданию футур и забиванию ими трэдпула при получении одного-единственного запроса пользователя. Поэтому по возможности я бы советовал всегда использовать ftraverse вместо sequence. Исходный код функции ftraverse можно найти здесь.

Есть один важный момент, который следует учитывать при работе с драйвером к Cassandra. Метод executeAsync класса Session перегружен и может принимать как строку с запросом на языке CQL, так и наследника абстрактного класса Statement. В приведенном примере мы пользовались последним способом. Но при этом Statement имеет метод getQueryString, который вроде как возвращает строку к запросом. И поэтому возникает соблазн вызвать getQueryString, а затем executeAsync, принимающий строку. Так вот, я не знаю, считается ли это багом или нормальным поведением, но getQueryString иногда возвращает невалидные запросы. Так что, если вдруг вы словите странное исключение вроде:

InvalidQueryException: Invalid amount of bind variables

… проверьте, не генерируете ли вы запросы при помощи getQueryString.

Следует отметить, что в отличие, например, от Java-клиента к Couchbase, клиент к Cassandra является очень умным. Он поддерживает пулы соединений, которые увеличиваются или уменьшаются в зависимости от текущей нагрузки, умеет автоматически восстанавливать порвавшееся соединение с экспоненциальным ростом времени реконнекта, а также многое другое. Поэтому в реальном проекте объявление переменной cluster будет больше похоже на следующее:

val cluster = { Cluster.builder() .addContactPoint("172.31.0.11") .addContactPoint("172.31.0.22") .addContactPoint("172.31.0.33") .withPort(port) .withPoolingOptions( new PoolingOptions() .setConnectionsPerHost(HostDistance.REMOTE, 5, 8) .setNewConnectionThreshold(HostDistance.REMOTE, 10) .setConnectionsPerHost(HostDistance.LOCAL, 5, 8) .setNewConnectionThreshold(HostDistance.LOCAL, 10) .setHeartbeatIntervalSeconds(10) ) .withSocketOptions( new SocketOptions() .setConnectTimeoutMillis(1000) .setReadTimeoutMillis(3000) ) .withReconnectionPolicy( new ExponentialReconnectionPolicy(300, 3000) ) .build() }

Понятно, что приведенный пример очень прост и многое осталось за кадром. Но со всякими нюансами типа выполнения batch запросов и указания consistency level, думаю, вы теперь без труда разберетесь самостоятельно.

Ссылки по теме:

А каким драйвером к Cassandra пользуетесь вы?