Простая программа на Scala, работающая с Cassandra

5 августа 2015

Некоторое время назад мы с вами успешно разобрались, что такое Cassandra, а также как установить и настроить Cassandra-кластер в облаке от Amazon. В чем мы не разобрались, это в том, как использовать Cassandra в наших программах. Настало время исправить столь вопиющую несправедливость!

Изучая CQL в качестве примера мы создавали базу данных, хранящую список TODO. Раз уж этот пример так хорошо изучен, нет причин не использовать его повторно при написании программы.

Добавляем зависимость в build.sbt:

libraryDependencies ++= Seq(
    "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.6"
  )

Драйвер использует Guava, поэтому нам понадобится implicit преобразование футур Guava в футуры Scala:

package me.eax.cassandra_example

import com.datastax.driver.core._
import com.google.common.util.concurrent._
import scala.concurrent._

package object utils {

  implicit def futCSToScala(f: ResultSetFuture): Future[ResultSet] = {
    val promise = Promise[ResultSet]()

    val callback = new FutureCallback[ResultSet] {
      def onSuccess(result: ResultSet): Unit = {
        promise success result
      }

      def onFailure(err: Throwable): Unit = {
        promise failure err
      }
    }

    Futures.addCallback(f, callback)
    promise.future
  }

}

Пользоваться драйвером довольно просто:

package me.eax.cassandra_example.dao

import com.datastax.driver.core._
import com.datastax.driver.core.querybuilder.{QueryBuilder => QB}
import me.eax.cassandra_example.utils._

import scala.collection.JavaConverters._
import scala.concurrent._

case class TodoDTO(id: Int, descr: String)

case class TodoDAO(session: Session)(implicit ec: ExecutionContext) {

  private val table = "todo_list"
  private val id = "id"
  private val description = "description"

  def createTable: Future[Unit] = {
    val query = s"create table if not exists $table ($id int " +
                s"primary key, $description text )"
    session.executeAsync(query).map(_ => {})
  }

  def dropTable: Future[Unit] = {
    val query = s"drop table if exists $table"
    session.executeAsync(query).map(_ => {})
  }

  def insert(dto: TodoDTO): Future[Unit] = {
    val query = {
      QB.insertInto(table)
        .value(id, dto.id)
        .value(description, dto.descr)
    }
    session.executeAsync(query).map(_ => {})
  }

  def select: Future[Seq[TodoDTO]] = {
    val query = {
      QB.select(id, description)
        .from(table)
    }

    for {
      resultSet <- session.executeAsync(query)
    } yield {
      resultSet
        .asScala
        .map(row => TodoDTO(row.getInt(id),
                            row.getString(description)))
        .toSeq
    }
  }

  def delete(idToDelete: Long): Future[Unit] = {
    val query = {
      QB.delete().all()
        .from(table)
        .where(QB.eq(id, idToDelete))
    }
    session.executeAsync(query).map(_ => {})
  }
}

Наконец, имея готовый DAO, несложно написать программу, устанавливающую соединение с кластером и производящую кое-какие махинации с данными:

package me.eax.cassandra_example

import com.datastax.driver.core._
import me.eax.cassandra_example.dao._
import me.eax.cassandra_example.utils._

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CassandraExample extends App {
  val cluster = {
    Cluster.builder()
      .addContactPoint("10.110.0.10")
      // .withCredentials("username", "password")
      .build()
  }

  val session = cluster.connect("test")
  val todoDao = TodoDAO(session)

  val f = {
    for {
      _ <- todoDao.createTable
      _ = println("Inserting items")
      _ <- {
        ftraverse((1 to 3).toSeq) { n =>
          val item = TodoDTO(n, s"Todo item $n")
          todoDao.insert(item)
        }
      }
      items <- todoDao.select
      _ = println(s"Items: $items")
      _ = println("Deleting item 2")
      _ <- todoDao.delete(2)
      newItems <- todoDao.select
      _ = println(s"New items: $newItems")
      _ <- todoDao.dropTable
    } yield {}
  }

  f onFailure { case e =>
    println(s"ERROR: $e")
    e.printStackTrace()
  }

  Await.ready(f, Duration.Inf)
  cluster.close()
  println("Done!")

}

Используемая в этом примере функция ftraverse аналогичная Future.sequence. Отличие заключается в том, что Future.sequence выполняет несколько футур параллельно, а при использовании ftraverse одновременно выполняется только одна футура. В сложных приложениях использование sequence может приводить к лавинообразному созданию футур и забиванию ими трэдпула при получении одного-единственного запроса пользователя. Поэтому по возможности я бы советовал всегда использовать ftraverse вместо sequence. Исходный код функции ftraverse можно найти здесь.

Есть один важный момент, который следует учитывать при работе с драйвером к Cassandra. Метод executeAsync класса Session перегружен и может принимать как строку с запросом на языке CQL, так и наследника абстрактного класса Statement. В приведенном примере мы пользовались последним способом. Но при этом Statement имеет метод getQueryString, который вроде как возвращает строку к запросом. И поэтому возникает соблазн вызвать getQueryString, а затем executeAsync, принимающий строку. Так вот, я не знаю, считается ли это багом или нормальным поведением, но getQueryString иногда возвращает невалидные запросы. Так что, если вдруг вы словите странное исключение вроде:

InvalidQueryException: Invalid amount of bind variables

… проверьте, не генерируете ли вы запросы при помощи getQueryString.

Следует отметить, что в отличие, например, от Java-клиента к Couchbase, клиент к Cassandra является очень умным. Он поддерживает пулы соединений, которые увеличиваются или уменьшаются в зависимости от текущей нагрузки, умеет автоматически восстанавливать порвавшееся соединение с экспоненциальным ростом времени реконнекта, а также многое другое. Поэтому в реальном проекте объявление переменной cluster будет больше похоже на следующее:

val cluster = {
  Cluster.builder()
    .addContactPoint("172.31.0.11")
    .addContactPoint("172.31.0.22")
    .addContactPoint("172.31.0.33")
    .withPort(port)
    .withPoolingOptions(
      new PoolingOptions()
        .setConnectionsPerHost(HostDistance.REMOTE, 5, 8)
        .setNewConnectionThreshold(HostDistance.REMOTE, 10)
        .setConnectionsPerHost(HostDistance.LOCAL, 5, 8)
        .setNewConnectionThreshold(HostDistance.LOCAL, 10)
        .setHeartbeatIntervalSeconds(10)
    )
    .withSocketOptions(
      new SocketOptions()
        .setConnectTimeoutMillis(1000)
        .setReadTimeoutMillis(3000)
    )
    .withReconnectionPolicy(
      new ExponentialReconnectionPolicy(300, 3000)
    )
    .build()
}

Понятно, что приведенный пример очень прост и многое осталось за кадром. Но со всякими нюансами типа выполнения batch запросов и указания consistency level, думаю, вы теперь без труда разберетесь самостоятельно.

Ссылки по теме:

А каким драйвером к Cassandra пользуетесь вы?

Метки: , , .