Простая программа на Scala, работающая с Cassandra
5 августа 2015
Некоторое время назад мы с вами успешно разобрались, что такое Cassandra, а также как установить и настроить Cassandra-кластер в облаке от Amazon. В чем мы не разобрались, это в том, как использовать Cassandra в наших программах. Настало время исправить столь вопиющую несправедливость!
Изучая CQL в качестве примера мы создавали базу данных, хранящую список TODO. Раз уж этот пример так хорошо изучен, нет причин не использовать его повторно при написании программы.
Добавляем зависимость в build.sbt:
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.6"
)
Драйвер использует Guava, поэтому нам понадобится implicit преобразование футур Guava в футуры Scala:
import com.datastax.driver.core._
import com.google.common.util.concurrent._
import scala.concurrent._
package object utils {
implicit def futCSToScala(f: ResultSetFuture): Future[ResultSet] = {
val promise = Promise[ResultSet]()
val callback = new FutureCallback[ResultSet] {
def onSuccess(result: ResultSet): Unit = {
promise success result
}
def onFailure(err: Throwable): Unit = {
promise failure err
}
}
Futures.addCallback(f, callback)
promise.future
}
}
Пользоваться драйвером довольно просто:
import com.datastax.driver.core._
import com.datastax.driver.core.querybuilder.{QueryBuilder => QB}
import me.eax.cassandra_example.utils._
import scala.collection.JavaConverters._
import scala.concurrent._
case class TodoDTO(id: Int, descr: String)
case class TodoDAO(session: Session)(implicit ec: ExecutionContext) {
private val table = "todo_list"
private val id = "id"
private val description = "description"
def createTable: Future[Unit] = {
val query = s"create table if not exists $table ($id int " +
s"primary key, $description text )"
session.executeAsync(query).map(_ => {})
}
def dropTable: Future[Unit] = {
val query = s"drop table if exists $table"
session.executeAsync(query).map(_ => {})
}
def insert(dto: TodoDTO): Future[Unit] = {
val query = {
QB.insertInto(table)
.value(id, dto.id)
.value(description, dto.descr)
}
session.executeAsync(query).map(_ => {})
}
def select: Future[Seq[TodoDTO]] = {
val query = {
QB.select(id, description)
.from(table)
}
for {
resultSet <- session.executeAsync(query)
} yield {
resultSet
.asScala
.map(row => TodoDTO(row.getInt(id),
row.getString(description)))
.toSeq
}
}
def delete(idToDelete: Long): Future[Unit] = {
val query = {
QB.delete().all()
.from(table)
.where(QB.eq(id, idToDelete))
}
session.executeAsync(query).map(_ => {})
}
}
Наконец, имея готовый DAO, несложно написать программу, устанавливающую соединение с кластером и производящую кое-какие махинации с данными:
import com.datastax.driver.core._
import me.eax.cassandra_example.dao._
import me.eax.cassandra_example.utils._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object CassandraExample extends App {
val cluster = {
Cluster.builder()
.addContactPoint("10.110.0.10")
// .withCredentials("username", "password")
.build()
}
val session = cluster.connect("test")
val todoDao = TodoDAO(session)
val f = {
for {
_ <- todoDao.createTable
_ = println("Inserting items")
_ <- {
ftraverse((1 to 3).toSeq) { n =>
val item = TodoDTO(n, s"Todo item $n")
todoDao.insert(item)
}
}
items <- todoDao.select
_ = println(s"Items: $items")
_ = println("Deleting item 2")
_ <- todoDao.delete(2)
newItems <- todoDao.select
_ = println(s"New items: $newItems")
_ <- todoDao.dropTable
} yield {}
}
f onFailure { case e =>
println(s"ERROR: $e")
e.printStackTrace()
}
Await.ready(f, Duration.Inf)
cluster.close()
println("Done!")
}
Используемая в этом примере функция ftraverse аналогичная Future.sequence. Отличие заключается в том, что Future.sequence выполняет несколько футур параллельно, а при использовании ftraverse одновременно выполняется только одна футура. В сложных приложениях использование sequence может приводить к лавинообразному созданию футур и забиванию ими трэдпула при получении одного-единственного запроса пользователя. Поэтому по возможности я бы советовал всегда использовать ftraverse вместо sequence. Исходный код функции ftraverse можно найти здесь.
Есть один важный момент, который следует учитывать при работе с драйвером к Cassandra. Метод executeAsync класса Session перегружен и может принимать как строку с запросом на языке CQL, так и наследника абстрактного класса Statement. В приведенном примере мы пользовались последним способом. Но при этом Statement имеет метод getQueryString, который вроде как возвращает строку к запросом. И поэтому возникает соблазн вызвать getQueryString, а затем executeAsync, принимающий строку. Так вот, я не знаю, считается ли это багом или нормальным поведением, но getQueryString иногда возвращает невалидные запросы. Так что, если вдруг вы словите странное исключение вроде:
… проверьте, не генерируете ли вы запросы при помощи getQueryString.
Следует отметить, что в отличие, например, от Java-клиента к Couchbase, клиент к Cassandra является очень умным. Он поддерживает пулы соединений, которые увеличиваются или уменьшаются в зависимости от текущей нагрузки, умеет автоматически восстанавливать порвавшееся соединение с экспоненциальным ростом времени реконнекта, а также многое другое. Поэтому в реальном проекте объявление переменной cluster будет больше похоже на следующее:
Cluster.builder()
.addContactPoint("172.31.0.11")
.addContactPoint("172.31.0.22")
.addContactPoint("172.31.0.33")
.withPort(port)
.withPoolingOptions(
new PoolingOptions()
.setConnectionsPerHost(HostDistance.REMOTE, 5, 8)
.setNewConnectionThreshold(HostDistance.REMOTE, 10)
.setConnectionsPerHost(HostDistance.LOCAL, 5, 8)
.setNewConnectionThreshold(HostDistance.LOCAL, 10)
.setHeartbeatIntervalSeconds(10)
)
.withSocketOptions(
new SocketOptions()
.setConnectTimeoutMillis(1000)
.setReadTimeoutMillis(3000)
)
.withReconnectionPolicy(
new ExponentialReconnectionPolicy(300, 3000)
)
.build()
}
Понятно, что приведенный пример очень прост и многое осталось за кадром. Но со всякими нюансами типа выполнения batch запросов и указания consistency level, думаю, вы теперь без труда разберетесь самостоятельно.
Ссылки по теме:
- Список рассылки, посвященный Java-клиенту к Cassandra;
- Phantom — асинхронный и типизированный DSL для Cassandra;
- При написании тестов используйте Mock к Cassandra;
- Что нужно и не нужно делать при работе с Cassandra-драйвером;
- Полная версия исходников к этой заметке на GitHub;
А каким драйвером к Cassandra пользуетесь вы?
Метки: Scala, СУБД, Функциональное программирование.
Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.