与前一章一样,我们使用DataStax Java驱动程序包含了代码示例,以帮助说明这些概念在实践中如何工作。
让我们首先注意向Cassandra写入数据的一些基本属性。首先,在Cassandra中写入数据非常快,因为它的设计不需要执行磁盘读取或搜索。 memtables和SSTables使Cassandra不必在写入时执行这些操作,从而减慢了许多数据库的速度。 Cassandra中的所有写入都是仅附加的。
由于数据库提交日志和提示切换设计,数据库始终是可写的,并且在列族中,写入始终是原子的。
插入,更新和Upsert由于Cassandra使用追加模型,因此插入和更新操作之间没有根本区别。如果插入与现有行具有相同主键的行,则替换该行。如果更新行并且主键不存在,Cassandra会创建它。
出于这个原因,人们常说Cassandra支持upsert,这意味着插入和更新被视为相同,只有一个小的例外,我们将在轻量级事务中看到它。
Cassandra的可调整一致性级别意味着您可以在查询中指定写入所需的一致性。更高的一致性级别意味着更多的副本节点需要响应,表明写入已完成。更高的一致性级别还伴随着可用性的降低,因为更多节点必须可操作才能使写入成功。表9-1中显示了对写入使用不同一致性级别的含义。
Consistency levelImplicationANYEnsure that the value is written to a minimum of one replica node before returning to the client, allowing hints to count as a write.ONE, TWO, THREEEnsure that the value is written to the commit log and memtable of at least one, two, or three nodes before returning to the client.LOCAL_ONESimilar to ONE, with the additional requirement that the responding node is in the local data center.QUORUMEnsure that the write was received by at least a majority of replicas ((replication factor / 2) + 1).LOCAL_QUORUMSimilar to QUORUM, where the responding nodes are in the local data center.EACH_QUORUMEnsure that a QUORUM of nodes respond in each data center.ALLEnsure that the number of nodes specified by replication factor received the write before returning to the client. If even one replica is unresponsive to the write operation, fail the operation.写入的最显着的一致性级别是任何级别。此级别意味着写入保证至少到达一个节点,但它允许提示计为成功写入。也就是说,如果执行写入操作并且操作针对该值的节点关闭,则服务器将对其自身进行注释,称为提示,它将存储,直到该节点重新启动。一旦节点启动,服务器将检测到这一点,查看它是否有任何以稍后以提示形式保存的写入,然后将值写入已恢复节点。在许多情况下,使提示的节点实际上不是存储它的节点;相反,它将其发送到已关闭的节点的一个非重复邻居。
在写入时使用ONE的一致性级别意味着写入操作将写入提交日志和memtable。这意味着在ONE处写入是持久的,因此该级别是用于实现快速性能和持久性的最低级别。如果此节点在写入操作之后立即关闭,则该值将被写入提交日志,当服务器重新启动时可以重播该日志,以确保它仍具有该值。
默认一致性级别Cassandra客户端通常支持为所有查询设置默认一致性级别,以及单个查询的特定级别。例如,在cqlsh中,您可以使用CONSISTENCY命令检查并设置默认一致性级别:
cqlsh> CONSISTENCY; Current consistency level is ONE. cqlsh> CONSISTENCY LOCAL_ONE; Consistency level set to LOCAL_ONE.在DataStax Java驱动程序中,可以通过提供com.datastax.driver.core.QueryOptions对象在Cluster.Builder上设置默认一致性级别:
QueryOptions queryOptions = new QueryOptions(); queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1"). withQueryOptions(queryOptions).build();可以在单个语句上覆盖默认一致性级别:
Statement statement = ... statement.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);写入路径描述了如何处理客户端发起的数据修改查询,最终导致数据存储在磁盘上。我们将根据节点之间的交互以及在单个节点上存储数据的内部过程来检查写入路径。图9-1显示了多数据中心集群中节点之间的写路径交互概述。
当客户端向Cassandra节点发起写入查询时,写入路径开始,该节点充当该请求的协调者。协调器节点根据密钥空间的复制因子使用分区器来识别集群中的哪些节点是副本。协调器节点本身可以是副本,尤其是在客户端使用令牌感知驱动程序的情况下。如果协调器知道没有足够的副本满足请求的一致性级别,则会立即返回错误。
接下来,协调器节点向所有正在写入的数据的副本发送同时写入请求。这确保了所有节点只要它们启动就会得到写入。关闭的节点将不具有一致的数据,但它们将通过反熵机制之一进行修复:暗示切换,读取修复或反熵修复。
如果群集跨越多个数据中心,则本地协调器节点在每个其他数据中心中选择远程协调器,以协调对该数据中心中的副本的写入。 每个远程副本直接响应原始协调器节点。
协调员等待副本响应。 一旦足够数量的副本响应以满足一致性级别,协调器就会确认写入客户端。 如果副本在超时内没有响应,则假定它已关闭,并为写入存储提示。 除非使用一致性级别ANY,否则提示不会计为成功的副本写入。
图9-2描述了每个副本节点内处理写入请求的交互。
首先,副本节点接收写请求并立即将数据写入提交日志。接下来,副本节点将数据写入memtable。如果使用行缓存并且该行在缓存中,则该行无效。我们将在读取路径下更详细地讨论缓存。
如果写入导致提交日志或memtable通过其最大阈值,则计划运行刷新。我们将在第12章学习如何调整这些阈值。
此时,写入被认为已成功,并且节点可以回复协调器节点或客户端。
返回后,如果安排了一个节点,节点将执行刷新。每个memtable的内容都作为SSTable存储在磁盘上,并清除提交日志。刷新完成后,将安排其他任务以检查是否需要压缩,然后在必要时执行压缩。
写路径的更多细节当然,这是写路径的简单概述,没有考虑诸如计数器修改和物化视图之类的变体。写入具有物化视图的表更复杂,因为必须锁定分区。 Cassandra在内部利用已记录的批次以维护物化视图。
有关写入路径的更深入处理,请参阅Michael Edge关于Apache Cassandra Wiki的优秀描述,网址为https://wiki.apache.org/cassandra/WritePathForUsers。
让我们看一下Cassandra写入磁盘的文件的更多细节,包括提交日志和SSTables。
Cassandra将提交日志作为二进制文件写入文件系统。提交日志文件位于$ CASSANDRA_HOME / data / commitlog目录下。
提交日志文件根据模式CommitLog- - .log命名。例如:CommitLog-6-1451831400468.log。版本是表示提交日志格式的整数。例如,3.0版本的版本是6.您可以在org.apache .cassandra .db.commitlog.CommitLogDescriptor类中找到发行版中正在使用的版本。
在刷新期间将SSTable写入文件系统时,实际上有几个文件是根据SSTable写入的。让我们看看$ CASSANDRA_HOME / data / data 目录,看看文件在磁盘上的组织方式。
强制SSTables到磁盘如果您在真正的Cassandra节点上跟随本书中的练习,那么您可能希望此时执行nodetool flush命令,因为您可能还没有为Cassandra输入足够的数据以自动将数据刷新到磁盘。我们将在第11章中了解有关此命令的更多信息。
查看数据目录,您将看到每个键空间的目录。反过来,这些目录包含每个表的目录,包括表名和UUID。 UUID的目的是区分多个模式版本,因为表的模式可以随时间改变。
每个目录都包含SSTable文件,其中包含存储的数据。这是一个示例目录路径:hotel / hotels-3677bbb0155811e5899aa9fac1d00bce。
每个SSTable由多个共享通用命名方案的文件表示。这些文件根据模式<version> - <generation> - <implementation> - <component> .db命名。模式的意义如下:
该版本是一个双字符序列,表示SSTable格式的主要/次要版本。 例如,3.0版本的版本是ma。 您可以在org.apache.cassandra.io.sstable.Descriptor类中了解有关各种版本的更多信息。生成是索引号,每次为表创建新的SSTable时,该索引号都会递增。该实现是对org.apache.cassandra.io.sstable.format.SSTableWriter接口的实现的引用。 从3.0版本开始,该值为“big”,它引用了org.apache.cassandra.io.sstable.format.big.BigFormat类中的“Bigtable格式”。每个SSTable都分为多个文件或组件。 这些是3.0版本中的组件:
* -Data.db 这些是存储实际数据的文件,是Cassandra备份机制保留的唯一文件,我们将在第11章中了解这些文件。 * -CompressionInfo.db 提供有关Data.db文件压缩的元数据。 * -Digest.adler32 包含* -Data.db文件的校验和。 (在3.0之前发布使用CRC 32校验和和.crc32扩展。) * -Filter.db 包含此SSTable的bloom过滤器。 * -Index.db 在相应的* -Data.db文件中提供行和列偏移。 Summary.db 索引的样本,用于更快的读取。 Statistics.db 存储nodetool tablehistograms命令使用的有关SSTable的统计信息。 TOC.txt 列出此SSTable的文件组件。旧版本支持不同的版本和文件名。 2.2之前的版本将密钥空间和表名称添加到每个文件,而2.2及更高版本将这些文件留下,因为它们可以从目录名称推断出来。
我们将在第11章中研究一些使用SSTable文件的工具。
正如我们之前在第1章中讨论的那样,Cassandra和许多其他NoSQL数据库不支持具有关系数据库支持的完整ACID语义的事务。但是,Cassandra确实提供了两种提供某些事务行为的机制:轻量级事务和批量。
Cassandra的轻量级事务(LWT)机制使用第6章中描述的Paxos算法.LWTs在2.0版本中引入。 LWT支持以下语义:
每个事务的范围仅限于一个分区。每个事务都包含读取和写入,也称为“比较和设置”操作。仅在比较成功时才执行该设置。如果事务因现有值与您预期的值不匹配而失败,Cassandra将包含当前值,以便您可以决定是否重试或中止而无需提出额外请求。 不支持USING TIMESTAMP选项。假设我们想要使用我们在第5章中介绍的数据模型为新酒店创建记录。我们希望确保我们不会覆盖具有相同ID的酒店,因此我们将IF NOT EXISTS语法添加到我们的插入命令:
cqlsh> INSERT INTO hotel.hotels (id, name, phone) VALUES ( 'AZ123', 'Super Hotel at WestWorld', '1-888-999-9999') IF NOT EXISTS; [applied] ----------- True此命令检查是否存在包含分区键的记录,该表包含hotel_id。 因此,让我们看看第二次执行此命令时会发生什么:
cqlsh> INSERT INTO hotel.hotels (id, name, phone) VALUES ( 'AZ123', 'Super Hotel at WestWorld', '1-888-999-9999') IF NOT EXISTS; [applied] | id | address | name | phone | pois -----------+-------+---------+--------------------------+----------------+------ False | AZ123 | null | Super Hotel at WestWorld | 1-888-999-9999 | null在这种情况下,事务失败,因为已经有一个ID为“AZ123”的酒店,并且cqlsh有助于回显包含失败指示的行和我们尝试输入的值。
它以类似的方式工作以进行更新。 例如,我们可能会使用以下语句来确保我们正在更改此酒店的名称:
cqlsh> UPDATE hotel.hotels SET name='Super Hotel Suites at WestWorld' ... WHERE id='AZ123' IF name='Super Hotel at WestWorld'; [applied] ----------- True cqlsh> UPDATE hotel.hotels SET name='Super Hotel Suites at WestWorld' ... WHERE id='AZ123' IF name='Super Hotel at WestWorld'; [applied] | name -----------+--------------------------------- False | Super Hotel Suites at WestWorld与我们在多个INSERT语句中看到的类似,再次输入相同的UPDATE语句会失败,因为已经设置了该值。 由于Cassandra的upsert模型,INSERT上可用的IF NOT EXISTS语法和UPDATE上的IF x = y语法表示这两个操作之间唯一的语义差异。
在架构创建上使用事务CQL还支持在创建键空间和表时使用IF NOT EXISTS选项。 如果您编写多个架构更新的脚本,这将特别有用。
让我们在使用DataStax Java驱动程序之前实现酒店创建示例。 执行条件语句时,ResultSet将包含一个Row,其列名为applied,类型为boolean。 这表明条件语句是否成功。 我们还可以在语句中使用wasApplied()操作:
SimpleStatement hotelInsert = session.newSimpleStatement( "INSERT INTO hotels (id, name, phone) VALUES (?, ?, ?) IF NOT EXISTS", "AZ123", "Super Hotel at WestWorld", "1-888-999-9999"); ResultSet hotelInsertResult = session.execute(hotelInsert); boolean wasApplied = hotelInsertResult.wasApplied()); if (wasApplied) { Row row = hotelInsertResult.one(); row.getBool("applied"); }除常规一致性级别外,条件写入语句还可以具有串行一致性级别。 串行一致性级别确定当参与节点正在协商建议的写入时,必须在写入的Paxos阶段中回复的节点数。 表9-2中显示了两个可用选项。
Consistency levelImplicationSERIALThis is the default serial consistency level, indicating that a quorum of nodes must respond.LOCAL_SERIALSimilar to SERIAL, but indicates that the transaction will only involve nodes in the local data center.串行一致性级别也可以应用于读取。如果Cassandra检测到查询正在读取属于未提交事务的数据,则它会根据指定的串行一致性级别将事务作为读取的一部分提交。
您可以使用SERIAL CONSISTENCY语句为cqlsh中的所有语句设置默认的串行一致性级别,或使用Query Options .setSerialConsistencyLevel()操作在DataStax Java驱动程序中设置默认的串行一致性级别。
虽然轻量级事务仅限于单个分区,但Cassandra提供了一种批处理机制,允许您将对多个分区的修改分组到一个语句中。
批处理操作的语义如下:
批处理中可能只包含修改语句(INSERT,UPDATE或DELETE)。批处理是原子的 - 也就是说,如果批处理被批处理,批处理中的所有语句最终都会成功。这就是为什么Cassandra的批次有时被称为原子批次或记录批次。属于给定分区键的批处理中的所有更新都是独立执行的,但跨分区没有隔离保证。这意味着可以在批处理完成之前读取对不同分区的修改。批处理不是事务机制,但您可以批量包含轻量级事务语句。批处理中的多个轻量级事务必须应用于同一分区。只有在称为计数批次的特殊批次形式中才允许进行计数器修改。计数器批次只能包含计数器修改。 Deprecation of Unlogged Batches在3.0之前的版本中,Cassandra支持未记录的批次或批处理,其中跳过涉及批处理日志的步骤。未记录批次的缺点是无法保证批次成功完成,这可能使数据库处于不一致状态。
使用批处理可以在客户端和协调器节点之间保存来回流量,因为客户端能够在单个查询中对多个语句进行分组。但是,批处理在协调器上放置了额外的工作来协调各种语句的执行。
Cassandra的批次非常适合用例,例如对单个分区进行多次更新,或者保持多个表同步。一个很好的例子是对非规范化表进行修改,这些表为不同的访问模式存储相同的数据。
Batches Aren’t for Bulk Loading第一次用户经常混淆批处理以获得更快的批量更新性能。绝对不是这种情况 - 批次实际上会降低性能并且可能导致垃圾收集压力。
让我们看一下我们可能用于在非规范化表设计中插入新酒店的示例批处理。我们使用CQL BEGIN BATCH和APPLY BATCH关键字来包围批处理中的语句:
cqlsh> BEGIN BATCH INSERT INTO hotel.hotels (id, name, phone) VALUES ('AZ123', 'Super Hotel at WestWorld', '1-888-999-9999'); INSERT INTO hotel.hotels_by_poi (poi_name, id, name, phone) VALUES ('West World', 'AZ123', 'Super Hotel at WestWorld', '1-888-999-9999'); APPLY BATCH;DataStax Java驱动程序通过com.datastax.driver.core.BatchStatement类支持批处理。 以下是Java客户端中相同批处理内容的示例:
SimpleStatement hotelInsert = session.newSimpleStatement( "INSERT INTO hotels (id, name, phone) VALUES (?, ?, ?)", "AZ123", "Super Hotel at WestWorld", "1-888-999-9999"); SimpleStatement hotelsByPoiInsert = session.newSimpleStatement( "INSERT INTO hotels_by_poi (poi_name, id, name, phone) VALUES (?, ?, ?, ?)", "WestWorld", "AZ123", "Super Hotel at WestWorld", "1-888-999-9999"); BatchStatement hotelBatch = new BatchStatement(); hotelBatch.add(hotelsByPoiInsert); hotelBatch.add(hotelInsert); ResultSet hotelInsertResult = session.execute(hotelBatch);您还可以通过传递其他语句,使用QueryBuilder.batch()操作创建批处理。您可以找到用于处理BatchStatement和com.cassandraguide.readwrite.BatchStatementExample的代码示例。
在DataStax驱动程序中创建计数器批次DataStax驱动程序不为计数器批处理提供单独的机制。相反,您必须记住创建仅包含计数器修改或仅包含非计数器修改的批次。
以下是批处理的工作原理:协调器将称为批处理日志的批处理副本发送到另外两个节点,并存储在system.batchlog表中。然后,协调器执行批处理中的所有语句,并在语句完成后从其他节点中删除批处理日志。
如果协调器未能完成批处理,则其他节点在其批处理日志中具有副本,因此能够重播批处理。每个节点每分钟检查一次批处理日志,以查看是否有任何应该已完成的批处理。为了给协调器提供足够的时间来完成任何正在进行的批处理,Cassandra使用批处理语句中时间戳的宽限期等于write_request_timeout_in_ms属性值的两倍。任何早于此宽限期的批次都将被重播,然后从剩余节点中删除。第二个批日志节点提供额外的冗余层,确保批处理机制的高可靠性。
Cassandra强制限制批处理语句的大小,以防止它们变得任意大,并影响集群的性能和稳定性。 cassandra.yaml文件包含两个控制其工作方式的属性:batch_size_warn_threshold_in_kb属性定义节点将在WARN日志级别记录的级别,该级别已收到大批量,而任何超过设置值batch_size_fail_threshold_in_kb的批次将被拒绝并导致向客户端发送错误通知。批量大小是根据CQL查询语句的长度来度量的。警告阈值默认为5KB,而失败阈值默认为50KB。
Cassandra的读取功能有一些基本属性值得注意。首先,它很容易读取数据,因为客户端可以连接到群集中的任何节点以执行读取,而无需知道特定节点是否充当该数据的副本。如果客户端连接到没有其尝试读取的数据的节点,则它所连接的节点将充当协调器节点,以从具有它的节点读取数据,由令牌范围标识。
在Cassandra中,读取通常比写入慢。为了完成读取操作,Cassandra通常必须执行搜索,但您可以通过添加节点,使用具有更多内存的计算实例以及使用Cassandra的缓存来在内存中保留更多数据。 Cassandra还必须在读取时同步等待响应(基于一致性级别和复制因子),然后根据需要执行读取修复。
读操作的一致性级别与写一致性级别类似,但它们的含义略有不同。更高的一致性级别意味着更多节点需要响应查询,从而更加确保每个副本上存在的值相同。如果两个节点以不同的时间戳响应,则最新值将获胜,这将返回给客户端。在后台,Cassandra将执行所谓的读取修复:它注意到一个或多个副本响应具有过期值的查询的事实,并使用最新值更新这些副本以使它们全部一致。
表9-3中显示了可能的一致性级别以及为读取查询指定每个级别的含义。
Consistency levelImplicationONE, TWO, THREEImmediately return the record held by the first node(s) that respond to the query. A background thread is created to check that record against the same record on other replicas. If any are out of date, a read repair is then performed to sync them all to the most recent value.LOCAL_ONESimilar to ONE, with the additional requirement that the responding node is in the local data center.QUORUMQuery all nodes. Once a majority of replicas ((replication factor / 2) + 1) respond, return to the client the value with the most recent timestamp. Then, if necessary, perform a read repair in the background on all remaining replicas.LOCAL_QUORUMSimilar to QUORUM, where the responding nodes are in the local data center.EACH_QUORUMEnsure that a QUORUM of nodes respond in each data center.ALLQuery all nodes. Wait for all nodes to respond, and return to the client the record with the most recent timestamp. Then, if necessary, perform a read repair in the background. If any nodes fail to respond, fail the read operation.从表中可以看出,读操作不支持ANY一致性级别。请注意,一致性级别ONE的含义是响应读取操作的第一个节点是客户端将获得的值 - 即使它已过期。在返回记录之后执行读取修复操作,因此任何后续读取都将具有一致的值,而不管响应节点如何。
值得注意的另一个项目是在一致性级别ALL的情况下。如果指定ALL,则表示您需要响应所有副本,因此如果具有该记录的任何节点关闭或者在超时之前无法响应,则读取操作将失败。如果节点在配置文件中的rpc_timeout_in_ms指定的值之前没有响应查询,则认为该节点没有响应。默认值为10秒。
对齐读写一致性级别您选择在应用程序中使用的读写一致性级别是Cassandra为我们在一致性,可用性和性能之间进行权衡的灵活性示例。
正如我们在第6章中学到的,Cassandra可以通过使用总和超过复制因子的读写一致性级别来保证读取的强一致性。实现此目的的一种简单方法是要求QUORUM进行读写。例如,在复制因子为3的键空间上,QUORUM表示来自三个节点中的两个的响应。因为2 + 2> 3,所以保证了强大的一致性。
如果您愿意牺牲强一致性以支持增加的吞吐量和对已故节点的更大容忍度,则可以使用较低的一致性级别。例如,对于写入使用QUORUM而对于读取使用ONE不能保证强一致性,因为2 + 1仅等于3。
通过实际考虑这一点,如果你只保证写入三个副本中的两个,那么其中一个副本肯定有可能没有收到写入但尚未修复,并且在一致性级别ONE的读取可能会那节点。
现在让我们来看看客户端请求数据时会发生什么。这称为读取路径。我们将从查询单个分区键的角度描述读取路径,从图9-3中所示的节点之间的交互开始。
当客户端向协调器节点发起读取查询时,读取路径开始。与写入路径一样,协调器使用分区程序来确定副本,并检查是否有足够的副本来满足请求的一致性级别。与写入路径的另一个相似之处在于,对于涉及多个数据中心的任何读取查询,每个数据中心选择一个远程协调器。
如果协调器本身不是副本,则协调器然后向最快的副本发送读取请求,这由动态告警确定。协调器节点还向其他副本发送摘要请求。摘要请求类似于标准读取请求,除了副本返回所请求数据的摘要或散列。
协调器计算从最快副本返回的数据的摘要哈希值,并将其与从其他副本返回的摘要进行比较。如果摘要一致,并且已满足所需的一致性级别,则可以返回来自最快副本的数据。如果摘要不一致,则协调器必须执行读取修复,如以下部分所述。
图9-4显示了每个副本节点内处理读取请求的交互。
当副本节点收到读取请求时,它首先检查行缓存。如果行缓存包含数据,则可以立即返回。行缓存有助于加快频繁访问的行的读取性能。我们将在第12章讨论行缓存的优缺点。
如果数据不在行缓存中,则副本节点将在memtables和SSTable中搜索数据。对于给定的表,只有一个memtable,因此搜索的一部分很简单。但是,单个Cassandra表可能有许多物理SSTable,每个表都可能包含一部分请求的数据。
Cassandra实现了一些优化SSTable搜索的功能:密钥缓存,Bloom过滤器,SSTable索引和摘要索引。
在磁盘上搜索SSTables的第一步是使用Bloom过滤器来确定给定SSTable中是否存在请求的分区,这样就不必搜索该SSTable。
调整Bloom过滤器Cassandra在内存中维护了Bloom过滤器的副本,尽管您可能还记得我们之前对上述文件的讨论,Bloom过滤器与SSTable数据文件一起存储在文件中,因此如果重新启动节点则不必重新计算它们。
Bloom过滤器不保证SSTable包含分区,只保证它可能包含分区。您可以在每个表上设置bloom_ filter_ fp_chance属性,以控制Bloom过滤器报告的误报百分比。这种提高的准确性是以额外的内存使用为代价的。
如果SSTable通过Bloom过滤器,Cassandra会检查密钥缓存,看它是否包含SSTable中分区键的偏移量。密钥缓存实现为映射结构,其中密钥是SSTable文件描述符和分区密钥的组合,值是偏移位置到SSTable文件中。密钥缓存有助于消除SSTable文件中针对频繁访问的数据的搜索,因为可以直接读取数据。
如果未从密钥缓存中获取偏移量,则Cassandra使用存储在磁盘上的两级索引来定位偏移量。第一级索引是分区摘要,用于获取在第二级索引(分区索引)内搜索分区键的偏移量。分区索引是存储分区密钥的SSTable偏移量的位置。
如果找到分区键的偏移量,Cassandra将以指定的偏移量访问SSTable并开始读取数据。
从所有SSTable获取数据后,Cassandra通过选择具有每个请求列的最新时间戳的值来合并SSTable数据和可记忆数据。遇到的任何墓碑都会被忽略。
最后,可以将合并的数据添加到行缓存(如果已启用)并返回到客户端或协调器节点。摘要请求的处理方式与常规读取请求的处理方式大致相同,另外一步是在结果数据上计算摘要而不是数据本身。
读取路径上的更多细节有关读取路径的更多详细信息,请参阅Apache Cassandra Wiki。
以下是读取修复的工作原理:协调器从所有副本节点发出完整的读取请求。协调器节点通过为每个请求的列选择一个值来合并数据。它比较从副本返回的值并返回具有最新时间戳的值。如果Cassandra发现使用相同时间戳存储的不同值,它将按字典顺序比较值并选择具有更大值的值。这种情况应该非常罕见。合并数据是返回给客户端的值。
异步地,协调器识别返回过时数据的任何副本,并向每个副本发出读取修复请求,以根据合并数据更新其数据。
可以在返回客户端之前或之后执行读取修复。如果您使用两个更强的一致性级别之一(QUORUM或ALL),则在将数据返回到客户端之前进行读取修复。如果客户端指定弱一致性级别(例如ONE),则在返回到客户端之后可选地在后台执行读取修复。导致给定表的后台修复的读取百分比由表的read_repair_chance和dc_local_read_repair_chance选项确定。
到目前为止,在我们的旅行中,我们将阅读查询限制在非常简单的示例中。让我们看一下Cassandra在SELECT命令中提供的更多选项,例如WHERE和ORDER BY子句。
首先,让我们研究一下如何使用Cassandra提供的WHERE子句来读取分区内的数据范围,有时也称为slice。
但是,为了进行范围查询,有助于使用一些数据。虽然我们还没有很多数据,但我们可以通过使用Cassandra的批量加载工具快速获得一些数据。
批量加载选项在使用Cassandra时,您经常会发现将测试或引用数据加载到集群中很有用。 幸运的是,有几种简单的方法可以将格式化数据批量加载到Cassandra和从Cassandra加载。
cqlsh支持通过COPY命令加载和卸载逗号分隔变量(CSV)文件。
例如,以下命令可用于将hotels表的内容保存到文件中:
cqlsh:hotel> COPY hotels TO 'hotels.csv' WITH HEADER=TRUE;TO值指定要写入的文件,HEADER选项为TRUE导致列名称在我们的输出文件中打印。 我们可以使用以下命令编辑此文件并读回内容:
cqlsh:hotel> COPY hotels FROM 'hotels.csv' WITH HEADER=true;COPY命令支持其他选项来配置引号,转义和时间的表示方式。
Brian Hess创建了一个名为Cassandra Loader的命令行工具,可以加载和卸载CSV文件以及其他分隔文件,并且足够灵活,可以使用逗号作为小数分隔符。
我们可以使用cqlsh将一些样本酒店库存数据加载到我们的集群中。 您可以在本书的GitHub存储库中访问一个简单的.csv文件。 available_rooms.csv文件包含两个小旅馆的一个月的库存,每个小旅馆有五个房间。 让我们将数据加载到集群中:
cqlsh:hotel> COPY available_rooms_by_hotel_date FROM 'available_rooms.csv' WITH HEADER=true; 310 rows imported in 0.789 seconds.如果您快速查询以阅读这些数据,您会发现我们有两家酒店的数据:“AZ123”和“NY229”。
现在让我们考虑如何支持我们标记为“Q4”的查询。 在第5章中查找给定日期范围内的可用空间。请记住,我们使用主键设计了available_rooms_by_hotel_date表以支持此查询:
PRIMARY KEY (hotel_id, date, room_number)这意味着hotel_id是分区键,而date和room_number是聚类列。
这是一个CQL语句,允许我们搜索特定酒店和日期范围的酒店房间:
cqlsh:hotel> SELECT * FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' and date>'2016-01-05' and date<'2016-01-12'; hotel_id | date | room_number | is_available ----------+------------+-------------+-------------- AZ123 | 2016-01-06 | 101 | True AZ123 | 2016-01-06 | 102 | True AZ123 | 2016-01-06 | 103 | True AZ123 | 2016-01-06 | 104 | True AZ123 | 2016-01-06 | 105 | True ... (60 rows)请注意,此查询涉及分区键hotel_id和一系列值,这些值表示我们在群集关键日期上搜索的开始和结束。
如果我们想尝试在AZ123酒店找到101号房间的记录,我们可能会尝试如下查询:
cqlsh:hotel> SELECT * FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' and room_number=101; InvalidRequest: code=2200 [Invalid query] message="PRIMARY KEY column "room_number" cannot be restricted as preceding column "date" is not restricted"如您所见,此查询会导致错误,因为我们尝试限制第二个群集键的值,而不限制第一个群集键的值。
WHERE子句的语法包含以下规则:
必须标识分区键的所有元素如果所有先前的群集密钥都受到限制,则只能限制给定的群集密钥这些限制基于Cassandra如何在磁盘上存储数据,该数据基于CREATE TABLE命令中指定的群集列和排序顺序。 聚类列上的条件仅限于允许Cassandra选择连续行排序的条件。
此规则的例外是ALLOW FILTERING关键字,它允许我们省略分区键元素。 例如,我们可以使用此查询在特定日期搜索多个酒店的房间状态以查找房间:
cqlsh:hotel> SELECT * FROM available_rooms_by_hotel_date WHERE date='2016-01-25' ALLOW FILTERING; hotel_id | date | room_number | is_available ----------+------------+-------------+-------------- AZ123 | 2016-01-25 | 101 | True AZ123 | 2016-01-25 | 102 | True AZ123 | 2016-01-25 | 103 | True AZ123 | 2016-01-25 | 104 | True AZ123 | 2016-01-25 | 105 | True NY229 | 2016-01-25 | 101 | True NY229 | 2016-01-25 | 102 | True NY229 | 2016-01-25 | 103 | True NY229 | 2016-01-25 | 104 | True NY229 | 2016-01-25 | 105 | True (10 rows)但是,不推荐使用允许过滤,因为它有可能导致非常昂贵的查询。如果您发现自己需要这样的查询,则需要重新访问数据模型,以确保设计了支持查询的表。
IN子句可用于测试与列的多个可能值的相等性。例如,我们可以使用以下命令通过命令查找每周两个日期的库存:
cqlsh:hotel> SELECT * FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' AND date IN ('2016-01-05', '2016-01-12');请注意,使用IN子句可能会导致查询性能降低,因为指定的列值可能对应于行中的非连续区域。
最后,SELECT命令允许我们覆盖在创建表时在列上指定的排序顺序。 例如,我们可以使用ORDER BY语法按日期降序获取我们之前任何查询的房间:
cqlsh:hotel> SELECT * FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' and date>'2016-01-05' and date<'2016-01-12' ORDER BY date DESC;Cassandra 2.2引入了两个功能,允许客户端将一些处理工作转移到协调器节点:用户定义的函数(UDF)和用户定义的聚合(UDA)。 使用这些功能可以通过减少必须返回到客户端的数据量并减少客户端上的处理负载来提高某些情况下的性能,但代价是服务器上的其他处理。
UDF是作为查询处理的一部分在Cassandra节点上应用于存储数据的函数。 在群集中使用UDF之前,必须在每个节点上的cassandra.yaml文件中启用它们:
enable_user_defined_functions: true以下是其工作原理的快速摘要:我们使用CQL CREATE FUNCTION命令创建UDF,该命令会将函数传播到集群中的每个节点。 当您执行引用UDF的查询时,它将应用于查询结果的每一行。
让我们创建一个示例UDF来计算available_rooms_by_hotel_date表中可用房间的数量:
cqlsh:hotel> CREATE FUNCTION count_if_true(input boolean) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE java AS 'if (input) return 1; else return 0;';我们将一次解剖这个命令。我们创建了一个名为count_if_true的UDF,它对布尔参数进行操作并返回一个整数。我们还包括一个空检查,以确保该函数在未定义值的情况下有效工作。请注意,如果UDF失败,则会中止查询的执行,因此这可能是一项重要的检查。
UDF安全3.0版本添加了一个安全功能,可以在单独的沙箱中运行UDF代码,以限制恶意函数未经授权访问节点的Java运行时的能力。
接下来,请注意我们已将此声明为具有LANGUAGE子句的Java实现。 Cassandra本身支持Java和JavaScript中定义的函数和聚合。它们也可以使用JSR 223中指定的Java Scripting API支持的任何语言实现,包括Python,Ruby和Scala。在这些语言中定义的函数需要向Cassandra的Java CLASSPATH添加其他脚本引擎JAR文件。
最后,我们在AS子句中包含函数的实际Java语法。现在这个函数本身就有些微不足道了,因为我们所做的就是将真值计算为1.我们将使用这个UDF做一些更强大的功能。
首先,让我们在available_rooms_by_hotel_date表上尝试我们的UDF,看看它是如何工作的:
cqlsh:hotel> SELECT room_number, count_if_true(is_available) FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' and date='2016-01-05'; room_number | hotel.count_if_true(is_available) -------------+----------------------------------- 101 | 1 102 | 1 103 | 1 104 | 1 105 | 1 (5 rows)如您所见,具有函数结果的列使用酒店键空间名称限定。这是因为每个UDF都与特定键空间相关联。如果我们要在DataStax Java驱动程序中执行类似的查询,我们会在每一行中找到名为hotel_count_if_true_is_available的列。
正如我们刚刚学到的,用户定义的函数在单行上运行。为了跨多行执行操作,我们创建了一个用户定义的聚合。 UDA利用两个UDF:状态函数和可选的最终函数。对每一行执行状态函数,而最终函数(如果存在)对状态函数的结果进行操作。
让我们看一个简单的例子来帮助研究它是如何工作的。首先,我们需要一个状态函数。 count_if_true函数接近我们需要的函数,但是我们需要进行一些小的更改以允许可用的计数在多行中求和。让我们创建一个新函数,允许传入总计,递增并返回:
cqlsh:hotel> CREATE FUNCTION state_count_if_true(total int, input boolean) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE java AS 'if (input) return total+1; else return total;';请注意,total参数作为第一个参数传递,其类型与函数的返回类型(int)匹配。 要将UDF用作状态函数,第一个参数类型和返回类型必须匹配。 第二个参数是我们在原始count_if_true UDF中的布尔值。
现在我们可以创建一个使用此状态函数的聚合:
cqlsh:hotel> CREATE AGGREGATE total_available (boolean) SFUNC state_count_if_true STYPE int INITCOND 0;让我们一块一块地分解这个陈述:首先,我们声明了一个名为total_available的UDA,它对boolean类型的列进行操作。
SFUNC子句标识此查询使用的状态函数 - 在本例中为state_count_if_true。
接下来,我们确定用于通过STYPE子句从状态函数累积结果的类型。 Cassandra维护这种类型的值,它传递给状态函数,因为它在每个连续的行上调用。 STYPE必须与state函数的第一个参数和返回类型相同。 INITCOND子句允许我们设置结果的初始值;在这里,我们将初始计数设置为零。
在这种情况下,我们选择省略最终函数,但我们可以包含一个函数,该函数接受STYPE的参数并返回任何其他类型,例如接受整数参数的函数并返回一个布尔值,指示是否库存处于低水平,应生成警报。
现在让我们使用我们的聚合来获取我们之前的一个查询返回的可用房间数。请注意,我们的查询必须只包含UDA,而不包含其他列或函数:
cqlsh:hotel> SELECT total_available(is_available) FROM available_rooms_by_hotel_date WHERE hotel_id='AZ123' and date='2016-01-05'; hotel.total_available(is_available) ------------------------------------- 5 (1 rows)如您所见,此查询会生成指定酒店和日期的五个可用房间的结果。
其他UDF / UDA命令选项在创建UDF和UDA时,您可以使用熟悉的IF NOT EXISTS语法,以避免尝试创建具有重复签名的函数和聚合的错误消息。 或者,您可以在实际打算覆盖当前函数或聚合时使用CREATE OR REPLACE语法。
使用DESCRIBE FUNCTIONS命令或DESCRIBE AGGREGATES命令了解已定义了哪些UDF和UDA。 当存在具有相同名称但具有不同签名的函数时,这尤其有用。
最后,您可以使用DROP FUNCTION和DROP AGGREGATE命令删除UDF和UDA。
除了用户定义的函数和聚合之外,Cassandra还提供了一些我们可以使用的内置函数或本机函数和聚合:
COUNT SELECT COUNT(*) FROM hotel.hotels;此命令还可用于计算指定列的非空值的数量。 例如,以下内容可用于计算提供电子邮件地址的访客数量:
SELECT COUNT(emails) FROM reservation.guests; MIN和MAXMIN和MAX函数可用于计算查询为给定列返回的最小值和最大值。 例如,此查询可用于确定在给定酒店和抵达日期预订的最短和最长逗留时间(以夜晚为单位):
SELECT MIN(nights), MAX(nights) FROM reservations_by_hotel_date WHERE hotel_id='AZ123' AND start_date='2016-09-09'; sumsum函数可用于汇总查询为给定列返回的所有值。 我们可以将多个预订的住宿天数相加如下:
SELECT SUM(nights) FROM reservations_by_hotel_date WHERE hotel_id='AZ123' AND start_date='2016-09-09'; 平均avg函数可用于计算查询为给定列返回的所有值的平均值。 要获得夜晚的平均逗留时间,我们可能会执行以下查询:
SELECT AVG(nights) FROM reservations_by_hotel_date WHERE hotel_id='AZ123' AND start_date='2016-09-09';这些内置聚合在技术上是系统键空间的一部分。 因此,包含上一次查询结果的列名称为system_avg_nights。
在Cassandra的早期版本中,客户必须确保一次小心地限制所请求的数据量。 对于大型结果集,即使到了内存不足的情况,也有可能压倒节点和客户端。
值得庆幸的是,Cassandra提供了一种分页机制,允许逐步检索结果集。 通过使用CQL关键字LIMIT显示了一个简单的示例。 例如,以下命令将返回不超过100个酒店:
cqlsh> SELECT * FROM hotel.hotels LIMIT 100;当然,LIMIT关键字(双关语)的限制是无法获得包含超出请求数量的额外行的其他页面。
Cassandra 2.0版本引入了一种称为自动分页的功能。 自动分页允许客户端请求查询返回的数据子集。 服务器将结果分解为在客户端请求它们时返回的页面。
您可以通过PAGING命令在cqlsh中查看分页状态。 以下输出显示了检查分页状态,更改提取大小(页面大小)和禁用分页的顺序:
cqlsh> PAGING; Query paging is currently enabled. Use PAGING OFF to disable Page size: 100 cqlsh> PAGING 1000; Page size: 1000 cqlsh> PAGING OFF; Disabled Query paging. cqlsh> PAGING ON; Now Query paging is enabled现在让我们看看如何在DataStax Java驱动程序中进行分页。 您可以为Cluster实例全局设置默认提取大小:
Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1"). withQueryOptions(new QueryOptions().setFetchSize(2000)).build();也可以在单个语句上设置提取大小,覆盖默认值:
Statement statement = new SimpleStatement("..."); statement.setFetchSize(2000);如果在语句上设置提取大小,则优先; 否则,将使用群集范围的值(默认为5,000)。 请注意,设置提取大小并不意味着Cassandra将始终返回所请求的确切行数; 它可能会稍微或多或少地返回结果。
驱动程序代表我们处理自动分页,允许我们迭代ResultSet而不需要知道分页机制。 例如,请考虑以下代码示例来迭代酒店的查询:
SimpleStatement hotelSelect = session.newSimpleStatement( "SELECT * FROM hotels"); ResultSet resultSet = session.execute(hotelSelect); for (Row row : resultSet) { // process the row }幕后发生的事情如下:当我们的应用程序调用session.execute()操作时,驱动程序向Cassandra执行查询,请求结果的第一页。 我们的应用程序迭代结果,如for循环中所示,当驱动程序检测到当前页面上没有剩余项目时,它会请求下一页。
请求下一页的小暂停可能会影响我们应用程序的性能和用户体验,因此ResultSet提供了额外的操作,允许对分页进行更细粒度的控制。 这是一个示例,说明我们如何扩展应用程序以对行进行预取:
for (Row row : resultSet) { if (resultSet.getAvailableWithoutFetching() < 100 && !resultSet.isFullyFetched()) resultSet.fetchMoreResults(); // process the row }此附加语句使用getAvailableWithoutFetching()检查当前页面上是否剩余少于100行。 如果有另一个要检索的页面,我们通过检查isFullyFetched()来确定,我们启动异步调用以通过fetchMoreResults()获取额外的行。
驱动程序还公开了更直接访问分页状态的能力,因此可以保存并在以后重用。 如果您的应用程序是无状态Web服务,而不支持跨多个调用的会话,那么这可能很有用。
我们可以通过ResultSet的ExecutionInfo访问分页状态
PagingState nextPage = resultSet.getExecutionInfo().getPagingState();然后我们可以在应用程序中保存此状态,或将其返回给客户端。 可以使用toString()将PagingState转换为字符串,或使用toBytes()将字节数组转换为字节数组。
请注意,在字符串或字节数组形式中,状态不应该尝试使用不同的语句来操作或重用。 这样做会导致分页状态异常。
要从给定的PagingState恢复查询,我们在Statement上设置它:
SimpleStatement hotelSelect = session.newSimpleStatement( "SELECT * FROM hotels"); hotelSelect.setPagingState(pagingState);我们之前在第8章中讨论过DataStax Java驱动程序提供的SpeculativeExecutionPolicy,如果初始节点在可配置的时间内没有响应,它会先使用不同的节点重试读取查询。
我们可以在每个节点上配置相同的行为,以便当节点充当协调器时,它可以启动对备用节点的推测请求。可以通过speculative_retry属性在每个表上配置此行为,该属性允许以下值:
ALWAYS 重试读取所有副本。 PERCENTILE 如果在第X百分位响应时间内未收到响应,则启动重试。ms 如果在Y毫秒内未收到响应,则重试。NONE 不要重试读取。默认值为99.0PERCENTILE。这通过加速“离群值”缓慢执行请求而不会使群集充满大量重复读取请求来实现良好的平衡。
此功能也称为快速读取保护,并在2.0.2版中引入。请注意,它对一致性级别ALL的查询没有影响,因为没有其他节点可以重试。
删除数据在Cassandra中与在关系数据库中不同。在RDBMS中,您只需发出一个delete语句来标识要删除的行。在Cassandra中,删除实际上不会立即删除数据。这有一个简单的原因:Cassandra的耐用,最终一致的分布式设计。如果Cassandra有一个传统的删除设计,那么在删除时关闭的任何节点都不会收到删除。一旦这些节点中的一个重新联机,就会错误地认为已经收到删除的所有节点实际上都错过了写入(因为它错过了删除而仍然存在的数据),并且它将开始修复所有节点其他节点。所以Cassandra需要一种更复杂的机制来支持删除。这种机制称为墓碑。
逻辑删除是在删除中发出的一种特殊标记,它覆盖已删除的值,充当占位符。如果任何副本没有收到删除操作,则墓碑可以在以后再次可用时传播到这些副本。此设计的净效果是您的数据存储在删除后不会立即缩小。每个节点都会跟踪其所有墓碑的年龄。一旦它们达到gc_grace_seconds(默认为10天)中配置的年龄,则运行压缩,对逻辑删除进行垃圾收集,并恢复相应的磁盘空间。
由于SSTable是不可变的,因此不会从SSTable中删除数据。在压缩时,会考虑逻辑删除,对合并数据进行排序,在排序数据上创建新索引,并将新合并,排序和索引的数据写入单个新文件。假设在压缩运行之前10天是足够的时间让故障节点重新联机。如果您觉得这样做很舒服,可以缩短宽限期以更快地回收磁盘空间。
在DataStax Java驱动程序中简单删除整行如下所示:
SimpleStatement hotelDelete = session.newSimpleStatement( "DELETE * FROM hotels WHERE id=?", "AZ123"); ResultSet hotelDeleteResult = session.execute(hotelDelete);您可以通过在查询中按名称标识非主键列来删除它们。
您还可以使用PreparedStatements,QueryBuilder和MappingManager删除数据。
以下是使用QueryBuilder删除整行的示例:
BuiltStatement hotelDeleteBuilt = queryBuilder.delete().all(). from("hotels").where(eq("id", "AZ123")); session.execute(hotelDeleteBuilt);**
删除的一致性级别由于删除是一种写入形式,因此可用于删除的一致性级别与为写入列出的一致性级别相同。
在本章中,我们了解了如何使用cqlsh和客户端驱动程序读取,写入和删除数据。 我们还在幕后了解了Cassandra如何实现这些操作,这有助于我们在使用Cassandra设计,实现,部署和维护应用程序时做出更明智的决策。