我们习惯使用驱动程序连接到关系数据库。例如,在Java中,JDBC是一种API,它抽象关系数据库的供应商实现,以呈现使用语句,PreparedStatements,ResultSet等存储和检索数据的一致方法。要与数据库进行交互,您将获得一个与您正在使用的特定数据库一起使用的驱动程序,例如Oracle,SQL Server或MySQL;这种交互的实现细节对开发人员是隐藏的。通常为各种编程语言提供驱动程序以连接到各种各样的数据库。
Cassandra还有许多客户端驱动程序,包括对大多数流行语言的支持。这些客户端的好处在于,您可以轻松地将它们嵌入到您自己的应用程序中(我们将看到如何操作),并且它们经常提供比CQL本机接口更多的功能,包括连接池和JMX集成和监视。在以下部分中,我们将了解可用的各种客户端及其提供的功能。
在Cassandra的早期,社区为不同语言制作了许多客户端驱动程序。这些贡献是Cassandra采用的关键推动因素。一些最着名的早期车手包括Hector和Astyanax。
赫克托尔以希腊神话中的特洛伊战士卡桑德拉的兄弟命名,是卡桑德拉最早的客户之一。 Hector提供了一个简单的Java接口,帮助许多早期开发人员避免了写入Thrift API的挑战,并成为其他几个驱动程序的灵感来源。该项目不再处于活动状态,但您可以通过https://github.com/hector-client/hector访问该项目。
Astyanax是一个Java客户端,最初由Netflix在Thrift API之上构建,作为Hector驱动程序的合理继承者(Astyanax是Hector的儿子)。一旦引入了DataStax Java驱动程序,除了原始的Thrift实现之外,Netflix还使用Astyanax来支持Java驱动程序。这有助于许多用户从Thrift过渡到CQL。然而,随着Java驱动程序的突出,Astyanax上的活动大大减缓,该项目于2016年2月退役。您仍然可以通过https://github.com/Netflix/astyanax访问该项目。
其他客户包括用于Python的Pycassa,用于Perl的Perlcassa,用于Node.js的Helenus,以及用于Microsoft .NET框架和C#的Cassandra-Sharp。大多数这些客户端不再主动维护,因为它们基于现已弃用的Thrift接口。您可以在http://www.planetcassandra.org/client-drivers-tools找到当前和旧版驱动程序的完整列表。
CQL的引入推动了Cassandra客户驱动程序的重大转变。 CQL的简单和熟悉的语法使得客户端程序的开发类似于传统的关系数据库驱动程序。 DataStax对Java和其他几种语言的开源驱动程序进行了战略投资,以继续推动Cassandra的采用。这些驱动因素很快成为新开发项目的事实标准。您可以访问https://github.com/datastax以访问驱动程序以及其他连接器和工具。
有关DataStax驱动程序的更多信息访问驱动程序矩阵页面以访问文档并确定与您的服务器版本兼容的驱动程序版本。
DataStax Java驱动程序是这些驱动程序中最古老,最成熟的驱动程序。出于这个原因,我们将专注于使用Java驱动程序,并以此为契机,了解DataStax驱动程序跨多种语言提供的功能。
首先,我们需要在开发环境中访问驱动程序。我们可以直接从之前列出的URL下载驱动程序并手动管理依赖项,但在现代Java开发中使用Maven等工具来管理依赖项更为常见。如果您正在使用Maven,则需要在项目pom.xml文件中添加以下内容:
<dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> <version>3.0.0</version> </dependency>您可以在http://docs.datastax.com/en/drivers/java/3.0/index.html找到Java驱动程序的Javadoc。 或者,Javadoc也是源代码分发的一部分。
所有DataStax驱动程序都作为GitHub上的开源项目进行管理。 如果您对查看Java驱动程序源感兴趣,可以使用以下命令获取只读中继版本:
$ git clone https://github.com/datastax/java-driver.git一旦我们配置了环境,我们就可以开始编码了。 我们将根据我们在第5章中创建的酒店数据模型创建一个客户端应用程序。本章和本书其余部分中使用的所有源代码都可以在https://github.com/jeffreyscarpenter/cassandra-上找到。 指南。
要开始构建我们的应用程序,我们将使用驱动程序的API连接到我们的集群。 在Java驱动程序中,它由com.datastax.driver.core.Cluster和Session类表示。
Cluster类是驱动程序的主要入口点。 它使用构建器模式支持流畅的API。 例如,以下行创建与本地主机上运行的节点的连接:
Cluster cluster = Cluster.builder(). addContactPoint("127.0.0.1").build();这一个语句表示创建集群所需的最少信息:单个联系点。我们还可以指定多个联系点。接触点类似于Cassandra节点用于连接到同一群集中的其他节点的概念种子节点。
创建自定义群集初始化程序Cluster.Builder类实现了一个名为Cluster.Initializer的接口。这允许我们使用静态方法Cluster.buildFrom(Initializer initializer)插入一个不同的机制来初始化Cluster。例如,如果我们想要从配置文件加载连接信息,这可能很有用。
我们可以在群集上配置其他几个选项,例如度量标准,默认查询选项以及重新连接,重试和推测执行的策略。在我们看一些其他与连接相关的选项后,我们将在后面的章节中研究这些选项中的每一个:协议版本,压缩和身份验证。
该驱动程序支持多个版本的CQL本机协议。 Cassandra 3.0支持第4版,正如我们在第2章中对Cassandra发布历史的概述中所学到的。
默认情况下,驱动程序使用其连接的第一个节点支持的协议版本。虽然在大多数情况下这已足够,但如果您正在使用基于旧版Cassandra的群集,则可能需要覆盖此行为。您可以通过将com.datastax.driver.core.ProtocolVersion枚举中的所需值传递给Cluster.Builder。withProtocolVersion()操作来选择协议版本。
该驱动程序提供了在客户端和Cassandra节点之间压缩消息的选项,利用CQL本机协议支持的压缩选项。启用压缩可以减少驱动程序消耗的网络带宽,但代价是客户端和服务器的CPU使用量会增加。
目前有两种压缩算法,LZ4和SNAPPY,由com.datastax.driver.core.ProtocolOptions.Compression枚举定义。压缩默认为NONE,但可以通过调用Cluster.Builder .withCompression()操作来覆盖。
该驱动程序提供可插入的身份验证机制,可用于支持简单的用户名/密码登录,或与其他身份验证系统集成。默认情况下,不执行身份验证。您可以通过将com.datastax。驱动程序.core.AuthProvider接口(如PlainTextAuthProvider)的实现传递给Cluster.Builder.withAuthProvider()操作来选择身份验证提供程序。
驱动程序还可以加密与服务器的通信以确保隐私。客户端 - 服务器加密选项由其cassandra.yaml文件中的每个节点指定。驱动程序符合每个节点指定的加密设置。
我们将在第13章中更详细地从客户端和服务器角度检查身份验证,授权和加密。
在我们创建Cluster实例之后,在我们通过调用init()方法初始化它之前,它没有连接到任何Cassandra节点:
cluster.init();调用此方法时,驱动程序将连接到其中一个已配置的联系点,以获取有关群集的元数据。 如果没有可用的联系点,此操作将抛出NoHostAvailableException,如果验证失败,则抛出Authentication Exception。 我们将在第13章中更详细地讨论身份验证。
一旦我们初始化了Cluster对象,我们就需要建立一个会话来制定我们的查询。 我们可以通过调用Cluster.connect()操作之一来获取com.datastax.driver.core.Session对象。 您可以选择提供要连接的键空间的名称,就像我们在此示例中连接到酒店键空间一样:
Session session = cluster.connect("hotel");还有一个没有参数的connect()操作,它创建了一个可以与多个键空间一起使用的Session。如果选择此选项,则必须使用适当的键空间名称限定查询中的每个表引用。请注意,并不严格要求显式调用Cluster.init(),因为当我们调用connect()时也会在后台调用它。
每个会话管理与Cassandra集群的连接,这些集群用于使用Cassandra本机协议执行查询和控制操作。会话包含每个主机的TCP连接池。
会话很贵由于会话维护多个节点的连接池,因此它是一个相对重量级的对象。在大多数情况下,您需要创建一个会话并在整个应用程序中重复使用它,而不是不断地构建和拆除Sessions。另一个可接受的选择是,如果您的应用程序正在访问多个键空间,则为每个键空间创建一个Session。
由于CQL本机协议是异步的,因此每个连接允许多个同时请求;协议v2中最多为128个并发请求,而v3和v4允许最多32,768个同时请求。由于同时发出的请求数量较多,因此每个节点的连接数量较少。实际上,默认值是每个节点一个连接。
该驱动程序支持根据每个连接的请求数量向上或向下扩展连接数的功能。这些连接池设置可通过PoolingOptions类进行配置,该类设置用于本地和远程主机的最大和最小(或“核心”)连接数。如果核心值和最大值不同,则驱动程序会根据客户端发出的请求数量向上或向下扩展每个节点的连接池大小。每个连接的最小和最大请求阈值的设置用于确定何时创建新连接,以及何时可以回收未充分利用的连接。还有一个缓冲期可以防止连续建立和拆除连接。
使用ClusterBuilder创建集群时可以设置PoolingOptions。withPoolingOptions(),或者在使用Cluster创建集群后进行操作.getConfiguration()。getPoolingOptions()。 以下是创建群集的示例,该群集将远程节点的最大连接数限制为一个:
PoolingOptions poolingOptions = new PoolingOptions(). setMaxConnectionsPerHost(HostDistance.REMOTE, 1); Cluster cluster = Cluster.builder(). addContactPoint("127.0.0.1"). withPoolingOptions(poolingOptions).build();驱动程序提供连接心跳,用于确保通过介入网络设备不会过早关闭连接。默认为30秒,但可以使用PoolingOptions.setHeartbeat IntervalSeconds()操作覆盖。但是,这仅适用于设置值后建立的连接,因此您很可能希望在创建群集时对其进行配置。
到目前为止,我们只配置了与集群的连接,并且尚未执行任何读取或写入操作。要开始做一些真正的应用程序工作,我们将使用com.datastax.driver.core.Statement类及其各种子类创建和执行语句。 Statement是一个具有多个实现的抽象类,包括SimpleStatement,PreparedStatement,BoundStatement,BatchStatement和BuiltStatement。
创建和执行语句的最简单方法是使用表示语句的字符串调用Session.execute()操作。这是一个声明的示例,它将返回酒店表的全部内容:
session.execute("SELECT * from hotel.hotels");此语句在单个方法调用中创建并执行查询。实际上,这可能会成为在大型数据库中执行的非常昂贵的查询,但它确实是一个非常简单的查询的有用示例。我们需要构建的大多数查询都会更复杂,因为我们将有指定的搜索条件或要插入的特定值。我们当然可以使用Java的各种字符串实用程序来手动构建查询的语法,但这当然容易出错。如果我们不小心清理来自最终用户的字符串,它甚至可能会将我们的应用程序暴露给注入攻击。
值得庆幸的是,我们不需要对自己如此努力。 Java驱动程序提供SimpleStatement类以帮助构造参数化语句。事实证明,我们之前看到的execute()操作实际上是一种创建SimpleStatement的便捷方法。
让我们尝试通过询问Session对象来创建SimpleStatement来构建查询。这是一个语句的示例,它将在我们的hotels表中插入一行,然后我们可以执行:
SimpleStatement hotelInsert = session.newSimpleStatement( "INSERT INTO hotels (hotel_id, name, phone) VALUES (?, ?, ?)", "AZ123", "Super Hotel at WestWorld", "1-888-999-9999"); session.execute(hotelInsert);调用的第一个参数是查询的基本语法,指示我们感兴趣的表和列。问号用于指示我们将在其他参数中提供的值。 我们使用简单的字符串来保存酒店ID,姓名和电话号码的值。
如果我们正确创建了语句,插入将成功执行(并且静默)。 现在让我们创建另一个语句来回读刚刚插入的行:
SimpleStatement hotelSelect = session.newSimpleStatement( "SELECT * FROM hotels WHERE id=?", "AZ123"); ResultSet hotelSelectResult = session.execute(hotelSelect);同样,我们使用参数化来为我们的搜索提供ID。 这次,当我们执行查询时,我们确保接收从execute()方法返回的ResultSet。 我们可以迭代ResultSet返回的行,如下所示:
for (Row row : hotelSelectResult) { System.out.format("hotel_id: %s, name: %s, phone: %s\n", row.getString("hotel_id"), row.getString("name"), row.getString("phone")); }此代码使用ResultSet.iterator()选项在结果集中的行上获取迭代器,并在每行上循环,打印出所需的列值。 请注意,我们使用特殊访问器来获取每列的值,具体取决于所需的类型 - 在本例中为Row.getString()。 正如我们所料,这将打印出如下结果:
hotel_id: AZ123, name: Super Hotel at WestWorld, phone: 1-888-999-9999 使用自定义编解码器正如我们已经指出的那样,我们需要知道在与ResultSet中的Rows交互时我们请求的列的类型。如果我们使用Row.getString()请求id列,我们将收到CodecNotFoundException,表明驱动程序不知道如何将CQL类型uuid映射到java.lang。字符串。
这里发生的是驱动程序维护Java和CQL类型之间的默认映射列表,称为编解码器,它用于在应用程序和Cassandra之间来回转换。驱动程序通过扩展类com.datastax.driver.core.TypeCodec 并使用Cluster管理的CodecRegistry注册它来提供添加其他映射的方法:
cluster.getConfiguration().getCodecRegistry(). register(myCustomCodec)自定义编解码器机制非常灵活,如以下用例所示:
映射到备用日期/时间格式(例如,Java 8之前用户的Joda时间)将字符串数据映射到XML和JSON等格式将列表,集和映射映射到各种Java集合类型您可以在示例com.cassandraguide.clients.SimpleStatementExample中找到用于处理SimpleStatements的代码示例。 异步执行
Session.execute()操作是同步的,这意味着它会一直阻塞,直到获得结果或发生错误,例如网络超时。 该驱动程序还提供异步executeAsync()操作以支持与Cassandra的非阻塞交互。 这些非阻塞请求可以使并行发送多个查询更加简单,从而提高客户端应用程序的性能。
让我们从之前进行操作并修改它以使用异步操作:
ResultSetFuture result = session.executeAsync(statement);结果是ResultSetFuture类型,它是java.util.concurrent.Future接口的实现。 Future是一种Java泛型类型,用于捕获异步操作的结果。可以检查每个Future以查看操作是否已完成,然后根据绑定类型查询操作结果。还有阻塞等待结果的wait()操作。如果调用者不再对操作结果感兴趣,也可以取消Future。 Future类是实现异步编程模式的有用工具,但需要阻塞或轮询才能等待操作完成。
为了解决这个缺点,Java驱动程序利用了Google的Guava框架中的ListenableFuture接口。 ListenableFuture接口扩展了Future,并添加了一个addListener()操作,允许客户端注册Future完成时调用的回调方法。回调方法在由驱动程序管理的线程中调用,因此重要的是该方法快速完成以避免占用驱动程序资源。 ResultSetFuture绑定到ResultSet类型。
额外的异步操作除了Session.executeAsync()操作之外,驱动程序还支持其他几个异步操作,包括Cluster.closeAsync(),Session.prepareAsync()和对象映射器上的几个操作。
虽然SimpleStatements对于创建即席查询非常有用,但大多数应用程序倾向于重复执行同一组查询。 PreparedStatement旨在更有效地处理这些查询。语句的结构一次性发送到节点进行准备,并返回该语句的句柄。要使用预准备语句,只需要发送句柄和参数。
在构建应用程序时,通常会创建PreparedStatements以读取数据,这些数据对应于您在数据模型中派生的每种访问模式,以及其他用于将数据写入表以支持这些访问模式的数据。
让我们使用Session.prepare()操作创建一些PreparedStatements来表示与以前相同的酒店查询:
PreparedStatement hotelInsertPrepared = session.prepare( "INSERT INTO hotels (hotel_id, name, phone) VALUES (?, ?, ?)"); PreparedStatement hotelSelectPrepared = session.prepare( "SELECT * FROM hotels WHERE hotel_id=?");请注意,PreparedStatement使用我们之前用于SimpleStatement的相同参数化语法。然而,一个关键的区别是PreparedStatement不是Statement的子类型。这可以防止尝试将未绑定的PreparedStatement传递给要执行的会话时出错。
然而,在我们开始之前,让我们退后一步,讨论Session.prepare()操作幕后发生的事情。驱动程序将PreparedStatement的内容传递给Cassandra节点,并获取该语句的唯一标识符。创建BoundStatement时会引用此唯一标识符。如果您很好奇,可以通过调用PreparedStatement.getPreparedID()来实际看到此引用。
您可以将PreparedStatement视为用于创建查询的模板。除了指定查询的形式之外,我们还可以在PreparedStatement上设置其他属性,这些属性将用作其用于创建的语句的默认值,包括默认一致性级别,重试策略和跟踪。
除了提高效率之外,PreparedStatements还通过将CQL的查询逻辑与数据分离来提高安全性。这提供了针对注入攻击的保护,其试图将命令嵌入到数据字段中以便获得未经授权的访问。
现在我们的PreparedStatement可供我们用来创建查询。为了使用PreparedStatement,我们通过调用bind()操作将它与实际值绑定。例如,我们可以绑定我们之前创建的SELECT语句,如下所示:
BoundStatement hotelSelectBound = hotelSelectPrepared.bind("AZ123");我们在这里使用的bind()操作允许我们提供匹配PreparedStatement中每个变量的值。 可以提供前n个绑定值,在这种情况下,必须在执行语句之前单独绑定其余值。 还有一个版本的bind(),它不带参数,在这种情况下,所有参数必须单独绑定。 BoundStatement提供了几个set()操作,可用于绑定不同类型的值。 例如,我们可以从上面获取INSERT预处理语句,并使用setString()操作绑定名称和电话值:
BoundStatement hotelInsertBound = hotelInsertPrepared.bind("AZ123"); hotelInsertBound.setString("name", "Super Hotel at WestWorld"); hotelInsertBound.setString("phone", "1-888-999-9999");一旦我们绑定了所有值,我们就使用Session.execute()执行BoundStatement。如果我们未能绑定任何值,如果正在使用协议v4(Cassandra 3.0或更高版本),它们将在服务器端被忽略。旧协议版本的驱动程序行为是,如果存在任何未绑定的值,则抛出IllegalStateException。
您可以在示例com.cassandraguide.clients.PreparedStatementExample中找到用于处理PreparedStatement和BoundStatement的代码示例。
该驱动程序还提供了com.datastax.driver.core.querybuilder.QueryBuilder类,该类为构建查询提供了流畅的API。这适用于查询结构存在差异的情况(例如可选参数),这会使PreparedStatements变得困难。与PreparedStatement类似,它还提供一些防止注入攻击的保护。
我们使用一个带有Cluster对象的简单构造函数构造一个QueryBuilder:
QueryBuilder queryBuilder = new QueryBuilder(cluster);QueryBuilder生成使用BuiltStatement类及其子类表示的查询。 每个类的方法返回BuiltStatement的实例,这些实例表示在构建时添加到查询的内容。 在构建查询时,您可能会发现IDE非常有用,可帮助您识别允许的操作。
让我们在使用QueryBuilder之前重现查询,看看它是如何工作的。 首先,我们将构建一个CQL INSERT查询:
BuiltStatement hotelInsertBuilt = queryBuilder.insertInto("hotels") .value("hotel_id", "AZ123") .value("name", "Super Hotel at WestWorld") .value("phone", "1-888-999-9999");第一个操作调用QueryBuilder.insertInto()操作为hotels表创建一个Insert语句。 如果需要,我们可以使用Insert.using()向我们的语句添加CQL USING子句,但我们选择开始向查询添加值。 在我们添加值时,Insert.value()操作继续返回Insert语句。 生成的Insert可以像使用Session.execute()或executeAsync()的任何其他Statement一样执行。
CQL SELECT命令的构造类似于:
BuiltStatement hotelSelectBuilt = queryBuilder.select() .all() .from("hotels") .where(eq("hotel_id", "AZ123"));对于此查询,我们调用QueryBuilder.select()来创建Select语句。 我们使用Select.all()操作来选择所有列,尽管我们也可以使用column()操作来选择特定的列。 我们通过Select.where()操作添加一个CQL WHERE子句,该操作接受Clause类的一个实例。 我们使用QueryBuilder提供的静态操作创建Clauses。 在这种情况下,我们使用eq()操作来检查与ID的相等性。
要访问这些静态操作,我们需要在Java源文件中添加其他import语句,例如:
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;有关使用QueryBuilder和BuiltStatement的完整代码示例,请参阅com.cassandraguide.clients.QueryBuilderExample类。
我们已经探索了几种使用驱动程序创建和执行查询语句的技术。 我们将看到一种最终技术,它提供了更多的抽象。 Java驱动程序提供了一个对象映射器,使您可以专注于开发域模型(或API上使用的数据类型)并与之交互。 对象映射器使用源代码中的注释,用于将Java类映射到表或用户定义类型(UDT)。
对象映射API作为与cassandra-driver-mapping.jar文件中驱动程序其余部分的单独库提供,因此您需要包含此额外的Maven依赖项才能在项目中使用Mapper:
<dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-mapping</artifactId> <version>3.0.0</version> </dependency>例如,让我们创建并注释与我们的酒店表对应的酒店域模型类:
import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.PartitionKey; import com.datastax.driver.mapping.annotations.Table; @Table(keyspace = "hotel", name = "hotels") public class Hotel { @PartitionKey private String id; @Column (name = "name") private String name; @Column (name = "phone") private String phone; @Column (name = "address") private String address; @Column (name = "pois") private Set<String> pointsOfInterest; // constructors, get/set methods, hashcode, equals }现在我们使用com.datastax.driver.mapping.MappingManager附加到Session并为我们的带注释的域模型类创建一个Mapper:
MappingManager mappingManager = new MappingManager(session); Mapper<Hotel> hotelMapper = MappingManager.mapper(Hotel.class);我们假设酒店类有一个简单的构造函数,只需要一个UUID,名称和电话号码,我们将使用它来创建一个我们可以使用对象映射器保存的简单酒店:
Hotel hotel = new Hotel("AZ123", "Super Hotel at WestWorld”, "1-888-999-9999"); hotelMapper.save(hotel);我们需要执行Mapper.save()操作来执行CQL INSERT或UPDATE,因为这些操作与Cassandra完全相同。 Mapper代表我们构建并执行语句。
要检索对象,我们使用Mapper.get()操作,传入一个与分区键元素匹配的参数列表:
Hotel retrievedHotel = hotelMapper.get(hotelId);删除对象的语法类似:
hotelMapper.delete(hotelId);与save()操作一样,get()和delete()完全处理代表我们使用驱动程序执行语句的细节。还有saveAsync(),getAsync()和deleteAsync()操作,它们使用我们前面讨论过的ListenableFuture接口支持异步执行。
如果您希望能够在执行查询之前配置查询,Mapper上还会返回返回语句的操作:saveQuery(),getQuery()和deleteQuery()。
对象映射器是一个有用的工具,用于抽象与代码交互的一些细节,特别是如果您有现有的域模型。如果您的域模型包含引用其他类的类,则可以使用@UDT注释将引用的类注释为用户定义的类型。对象映射器使用带注释的类型递归处理对象。
Achilles:高级对象映射器DuyHai Doan为Java开发了一种名为Achilles的高级对象映射器。 Achilles支持更高级的功能,例如复杂的键映射,轻量级事务,用户定义的函数等。您可以访问https://github.com/doanduyhai/Achilles查看。
Java驱动程序提供了几个策略接口,可用于调整驱动程序的行为。其中包括负载平衡,重试请求和管理群集中节点的连接的策略。
正如我们在第6章中学到的,可以对集群中的任何节点进行查询,然后将其称为该查询的协调节点。根据查询的内容,协调器可以与其他节点通信以满足查询。如果客户端要在同一节点上引导其所有查询,则会在集群上产生不平衡负载,尤其是在其他客户端执行相同操作的情况下。
为了解决这个问题,驱动程序提供了一个可插拔的机制来平衡多个节点之间的查询负载。通过选择com.datastax.driver.core.policies.LoadBalancing策略接口的实现来实现负载平衡。
每个LoadBalancingPolicy必须提供distance()操作,以根据HostDistance枚举将群集中的每个节点分类为本地,远程或忽略。驱动程序更喜欢与本地节点的交互,并且与远程节点保持与本地节点的更多连接。另一个关键操作是newQueryPlan(),它按照应该查询的顺序返回节点列表。 LoadBalancingPolicy接口还包含用于在添加或删除节点或向上或向下移动节点时通知策略的操作。这些操作有助于策略避免在查询计划中包含关闭或删除的节点。
该驱动程序提供了两种基本的负载平衡实现:RoundRobin Policy(默认值)和DCAwareRoundRobinPolicy。
RoundRobinPolicy以重复模式跨集群中的节点分配请求以分散处理负载。 DCAwareRoundRobinPolicy类似,但将其查询计划集中在本地数据中心的节点上。此策略可以在远程数据中心中添加可配置数量的节点来查询计划,但远程节点将始终优先于本地节点。可以显式标识本地数据中心,也可以允许驱动程序自动发现它。
第二种模式是令牌感知,其使用分区密钥的令牌值以选择作为所需数据的副本的节点,从而最小化必须查询的节点的数量。这是通过使用TokenAwarePolicy包装所选策略来实现的。
LoadBalancingPolicy在构建时在群集上设置。例如,以下语句将初始化群集以具有令牌感知并优先选择本地数据中心中的节点:
Cluster.builder().withLoadBalancingPolicy( new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().build());当Cassandra节点发生故障或无法访问时,驱动程序会自动并透明地尝试其他节点并安排重新连接到后台中的死节点。由于网络条件的临时更改也会使节点显示为脱机,因此驱动程序还提供了一种机制来重试因网络相关错误而失败的查询。这消除了在客户端代码中编写重试逻辑的需要。
驱动程序根据com.datastax.driver.core.RetryPolicy接口提供的实现重试失败的查询。 onReadTimeout(),onWriteTimeout()和onUnavailable()操作定义了查询失败时应采取的行为,分别包含与网络相关的异常ReadTimeoutException,WriteTimeoutException或UnavailableException。
DataStax Java驱动程序异常可以由Java驱动程序生成的各种异常和错误收集在com.datastax.driver.core.exceptions包中。
RetryPolicy操作返回RetryDecision,它指示是否应该重试查询,如果是,则指示在什么一致性级别。如果未重试异常,则可以重新抛出或忽略该异常,在这种情况下,查询操作将返回空ResultSet。
Java驱动程序提供了几个RetryPolicy实现:
DefaultRetryPolicy是一种保守的实现,仅在一组较窄的条件下重试查询。FallthroughRetryPolicy从不建议重试,始终建议重新抛出异常。DowngradingConsistencyRetryPolicy是一种更积极的策略,它会降低所需的一致性级别,以尝试使查询成功。 关于DowngradingConsistencyRetryPolicy的一个词此策略附带警告:如果您愿意在某些情况下接受降级的一致性级别,您是否真的需要更高的一致性级别的一致性级别?
可以在构建时在集群上设置RetryPolicy,如以下语句所示,该语句选择DowngradingConsistencyRetryPolicy并使用LoggingRetryPolicy对其进行包装,以便记录每次重试尝试:
Cluster.builder().withRetryPolicy(new LoggingRetryPolicy( DowngradingConsistencyRetryPolicy.INSTANCE));除非通过Statement.setRetryPolicy()操作覆盖任何单个查询,否则群集上的RetryPolicy将用于在该群集上执行的所有查询。
虽然拥有一个自动响应网络超时的重试机制非常棒,但我们通常无法等待超时甚至长时间的垃圾收集暂停。为了加快速度,驱动程序提供了一种推测执行机制。如果查询的原始协调器节点未能以预定间隔响应,则驱动程序抢先地针对不同的协调器节点开始另外执行查询。当其中一个查询返回时,驱动程序提供该响应并取消任何其他未完成的查询。
通过指定com.datastax.driver.core.policies.SpeculativeExecutionPolicy的实现,在集群上设置推测执行行为。
默认值为NoSpeculativeExecutionPolicy,它不会安排任何推测性执行。还有一个ConstantSpeculativeExecutionPolicy,它以最多毫秒的固定延迟计划最多重试次数。 PercentileSpeculativeExecutionPolicy是一个较新的策略,从3.0驱动程序版本开始仍被视为Beta。它会根据观察到的原始协调节点的延迟,在延迟时触发推测性执行。
使用Cluster.Builder设置策略,例如:
Cluster.builder().withSpeculativeExecutionPolicy( new ConstantSpeculativeExecutionPolicy ( 200, // delay in ms 3 // max number of speculative executions );以后不能更改此策略,也不能在单个语句上覆盖此策略。
在我们到目前为止看到的示例中,每个节点都由在其cassandra.yaml文件中配置为节点的rpc_address的IP地址标识。在某些部署中,客户端可能无法访问该地址。为了处理这种情况,驱动程序提供了一个可插入的功能,可以通过com.datastax.driver.core.policy.AddressTranslator接口转换地址(在3.0之前的驱动程序版本中,“translator”在整个过程中拼错为“translater”) API)。
例如,Java驱动程序附带IdentityTranslator,一个保留IP地址不变的默认转换程序,以及EC2MultiRegionAddressTranslator,它对Amazon EC2环境很有用。在客户端可能需要通过公共IP地址访问另一个数据中心中的节点的情况下,此转换器非常有用。我们将在第14章中更详细地讨论EC2部署。
要访问集群元数据,我们调用Cluster.getMetadata()方法。 com.datastax.driver.core.Metadata类提供有关群集的信息,包括群集名称,包含键空间和表的模式以及群集中的已知主机。我们可以通过以下代码获取集群的名称:
Metadata metadata = cluster.getMetadata(); System.out.printf("Connected to cluster: %s\n", metadata.getClusterName(), cluster.getClusterName()); 分配群集名称有点令人困惑的是,Cluster.Builder类允许我们在构建Cluster实例时为其分配名称。此名称实际上只是客户端跟踪多个Cluster对象的一种方式,可以与实际Cassandra集群中的节点所知的名称不同。第二个集群名称是我们通过Metadata类获得的名称。
如果我们在构造时没有为Cluster指定名称,则会为其分配一个默认名称,例如“cluster1”,“cluster2”等等(如果创建了多个群集)。如果从之前修改示例,将metadata.getClusterName()更改为cluster.getClusterName(),则可以看到此值。 节点发现
Cluster对象维护与其中一个联系点的永久连接,它用于维护有关群集状态和拓扑的信息。使用此连接,驱动程序将发现当前群集中的所有节点。驱动程序使用com.datastax.driver.core.Host类来表示每个节点。以下代码显示了迭代主机以打印其信息的示例:
for (Host host : cluster.getMetadata.getAllHosts()) { System.out.printf("Data Center: %s; Rack: %s; Host: %s\n", host.getDatacenter(), host.getRack(), host.getAddress()); }您可以在com.cassandraguide.clients.Simple类连接示例中找到此代码。
如果我们使用Cassandra Cluster Manager(ccm)运行多节点集群(例如我们在第7章中创建的集群),则此程序的输出将如下所示:
Connected to cluster: my_cluster Data Center: datacenter1; Rack: rack1; Host: /127.0.0.1 Data Center: datacenter1; Rack: rack1; Host: /127.0.0.2 Data Center: datacenter1; Rack: rack1; Host: /127.0.0.3使用该连接,驱动程序还可以发现当前群集中的所有节点。 驱动程序还可以检测何时将新节点添加到群集。 您可以通过实现Host.StateListener接口来注册侦听器。 这需要我们实现几个操作,例如onAdd()和onRemove(),它们在从集群添加或删除节点时调用,以及onUp()和onDown(),它们指示节点何时上升或下降。 让我们看一下在集群中注册监听器的示例类的一部分:
public class ConnectionListenerExample implements Host.StateListener { public String getHostString(Host host) { return new StringBuilder("Data Center: " + host.getDatacenter() + " Rack: " + host.getRack() + " Host: " + host.getAddress().toString() + " Version: " + host.getCassandraVersion() + " State: " + host.getState()); } public void onUp(Host host) { System.out.printf("Node is up: %s\n", getHostString(host)); } public void onDown(Host host) { System.out.printf("Node is down: %s\n", getHostString(host)); } // other required methods omitted... public static void main(String[] args) { List<Host.StateListener> list = ArrayList<Host.StateListener>(); list.add(new ConnectionListenerExample()); Cluster cluster = Cluster.builder(). addContactPoint("127.0.0.1"). withInitialListeners(list). build(); cluster.init(); } }此代码只是在节点上升或下降时打印出状态消息。 您将注意到,我们使用了比前一个示例更多的有关每个节点的信息,包括每个节点使用的Cassandra版本。 您可以在com.cassandraguide.clients.ConnectionListenerExample类中找到完整的代码清单。
我们来运行这个示例程序。 因为在调用init()之前添加了我们的监听器,所以我们立即得到以下输出:
Node added: Data Center: datacenter1 Rack: rack1 Host: /127.0.0.1 Version: 3.0.0 State: UP Node added: Data Center: datacenter1 Rack: rack1 Host: /127.0.0.2 Version: 3.0.0 State: UP Node added: Data Center: datacenter1 Rack: rack1 Host: /127.0.0.3 Version: 3.0.0 State: UP现在让我们使用ccm stop命令关闭我们的一个节点,我们将看到如下内容:
Node is down: Data Center: datacenter1 Rack: rack1 Host: /127.0.0.1 Version: 3.0.0 State: DOWN同样,如果我们重新启动节点,我们会看到节点重新联机的通知:
Node is up: Data Center: datacenter1 Rack: rack1 Host: /127.0.0.1 Version: 3.0.0 State: UPMetadata类还允许客户端了解群集中的架构。 exportSchemaAsString()操作创建一个String,描述集群中定义的所有键空间和表,包括系统键空间。 此输出等同于cqlsh命令DESCRIBE FULL SCHEMA。 其他操作支持浏览各个键空间和表的内容。
我们之前已经在第2章讨论了Cassandra对最终一致性的支持。因为模式信息本身是使用Cassandra存储的,所以它最终也是一致的,因此不同节点可能有不同版本的模式。 从3.0版本开始,Java驱动程序不会直接公开架构版本,但您可以通过运行nodetool describecluster命令来查看示例:
$ ccm node1 nodetool describecluster Cluster Information: Name: test_cluster Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch Partitioner: org.apache.cassandra.dht.Murmur3Partitioner Schema versions: ea46580a-4ab4-3e70-b68f-5e57da189ac5: [127.0.0.1, 127.0.0.2, 127.0.0.3]这个输出向我们展示了一些东西。首先,我们看到模式版本是UUID值。该值是基于节点知道的所有键空间和表定义的散列计算的。所有三个节点共享相同的模式版本的事实意味着它们都具有定义的相同模式。
当然,在创建,更改和删除键空间和表时,使用的模式版本可能会随着时间的推移而发生变化。驱动程序为客户端提供通知机制,通过向群集注册com.datastax.driver.core.SchemaChangeListener来了解这些更改。
您可以通过运行示例com.cassandraguide.clients.SimpleSchemaExample来查找这些调用的示例。
除了我们刚刚在Metadata类中检查的模式访问之外,Java驱动程序还提供了一个用于管理com.datastax.driver.core.schemabuilder包中的模式的工具。 SchemaBuilder提供了一种流畅的API,用于创建SchemaStatements,表示对键空间,表,索引和用户定义类型(UDT)的CREATE,ALTER和DROP操作等操作。
例如,以下代码可用于创建酒店密钥空间:
SchemaStatement hotelSchemaStatement = SchemaBuilder.createTable("hotels"). addPartitionKey("id", DataType.text()). addColumn("name", DataType.text()). addColumn("phone", DataType.text()). addColumn("address", DataType.text()). addColumn("pois", DataType.set(DataType.text())); session.execute(hotelSchemaStatement);我们还导入com.datastax.driver.core.DataType,以便我们可以利用其静态操作来定义每列的数据类型。
使用程序化架构定义时避免冲突许多开发人员已经注意到,这种编程模式管理功能可以用作简化应用程序部署的“延迟初始化”技术:如果我们的应用程序使用的模式不存在,我们可以简单地以编程方式创建它。但是,在运行多个客户端时,建议不要使用此技术,即使使用IF NOT EXISTS语义也是如此。来自多个并发客户端的CREATE TABLE或ALTER TABLE语句可能导致节点之间的状态不一致,需要手动修复。
该驱动程序提供了用于监视和调试客户端使用Cassandra的功能,包括用于记录和度量的工具。还有一个查询跟踪功能,我们将在第12章中学习。
正如我们将在第10章中学到的,Cassandra使用了一个名为Simple Logging Facade for Java的日志API(SLF4J)。 Java驱动程序也使用SLF4J API。为了在Java客户端应用程序上启用日志记录,您需要在类路径上提供兼容的SLF4J实现。
下面是我们可以添加到Maven POM文件中的依赖示例,以选择Logback项目作为实现:
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.3</version> </dependency>您可以在http://logback.qos.ch/上了解有关Logback的更多信息。
默认情况下,Java驱动程序设置为使用DEBUG日志记录级别,这非常详细。我们可以通过利用Logback的配置机制来配置日志记录,该机制支持测试和生产环境的单独配置。 Logback首先检查表示测试配置的文件logback-test.xml的类路径,然后如果没有找到测试配置,则会搜索文件logback.xml。
有关Logback配置的更多详细信息,包括测试和生产环境的示例配置文件,请参阅配置页面。
有时,监视客户端应用程序的行为会有所帮助,以便检测异常情况和调试错误。 Java驱动程序收集有关其活动的指标,并使用Dropwizard指标库使这些指标可用。驱动程序报告连接,任务队列,查询和错误(如连接错误,读取和写入超时,重试和推测执行)的度量标准。
您可以通过Cluster.getMetrics()操作在本地访问Java驱动程序指标。 Metrics库还与Java Management Extensions(JMX)集成,以允许远程监控指标。默认情况下启用JMX报告,但可以在构建群集时提供的配置中覆盖此报告。
最受欢迎的早期Python客户端是Pycassa,它是在Thrift界面上构建的。但是,不再维护Pycassa项目,建议使用DataStax Python驱动程序进行所有新开发。
DataStax Python驱动程序的第一个完整版本于2014年1月推出,提供了会话管理,节点发现,连接池,同步/异步查询,负载平衡,查询跟踪,指标(使用Scales库),日志记录,身份验证和SSL。 2014年5月的2.0版本中添加了支持Cassandra 2.1及更高版本的分页,轻量级事务和Python 3支持等功能.Python Driver与Cassandra 1.2及更高版本兼容,并在Python 2.6,2.7,3.3和3.4。该驱动程序在Linux,Mac OS和Windows上运行。
DataStax网站上提供了驱动程序的官方文档,而GitHub上提供了源驱动程序。 您可以通过运行Python安装程序pip来安装驱动程序:
pip install cassandra-driver 安装Python和PIP要使用示例代码,您需要为您的平台提供兼容的Python版本(如前所述)和pip。 您可以通过下载脚本https://bootstrap.pypa.io/get-pip.py并运行命令python get-pip.py来安装pip。 您可能需要在Unix系统上通过sudo运行此命令。
这是一个连接到集群并在hotels表中插入行的简单示例:
from cassandra.cluster import Cluster cluster = Cluster(['127.0.0.1']) session = cluster.connect('hotel') session.execute(""" insert into hotels (id, name, phone) values (%s, %s, %s) """ ('AZ123', 'Super Hotel at WestWorld', '1-888-999-9999') )Python驱动程序包含一个名为cqlengine的对象映射器,可通过cassandra.cqlengine.models.Model类访问。 Python驱动程序还利用第三方库来提高性能,压缩和指标。使用Cython的一些C扩展用于加速性能。驱动程序也可以在PyPy上运行,PyPy是一个使用JIT编译器的备用Python运行时。降低的CPU消耗导致吞吐量提高,比普通Python高出两倍。压缩需要安装lz4或python-snappy库,具体取决于所需的算法。
最初由Node.js提供的基于Thrift的客户端绑定由Helenus提供,Jorge Bay提供的node-cassandra-cql包随后提供了一个CQL本机客户端。
DataStax Node.js驱动程序于2014年10月正式推出,基于node-cassandra-cql,添加了Apache Cassandra的其他DataStax驱动程序中常见的标准功能。您可以访问https://github.com/datastax/nodejs-driver访问该驱动程序。
Node.js驱动程序通过节点包管理器(NPM)安装:
npm install cassandra-driver 安装Node.js运行时和程序包管理器如果您没有使用Node的经验,可以在https://nodejs.org上获得包含Node.js和NPM的平台安装。 这些通常安装在Unix系统上的/ usr / local / bin / node和/ usr / local / bin / npm中。
语法有点不同,因为您在其他语言驱动程序中访问Client对象而不是Cluster。 其他结构非常相似:
var cassandra = require('cassandra-driver'); var client = new cassandra.Client({ contactPoints: ['127.0.0.1'], keyspace: 'hotel'});构建和执行参数化查询如下所示:
var query = 'SELECT * FROM hotels WHERE id=?'; client.execute(query, ['AZ123'], function(err, result) { assert.ifError(err); console.log('got hotel with name ' + result.rows[0].name); });Fauna是一个早期基于Thrift的Ruby客户端,由Twitter为Cassandra创建。 从2013年初开始,Theo Hultberg领导了cql-rb gem的开发,它成为2014年11月发布的DataStax Ruby Driver的基础。您可以访问https://github.com/datastax/ruby-driver上的Ruby驱动程序。
您可以使用Ruby Gems安装驱动程序:
$ gem install cassandra-driver这是一个创建集群和会话并执行迭代我们的hotels表的内容的简单异步查询的示例:
require 'cassandra' cluster = Cassandra.cluster(hosts: ['127.0.0.1']) session = cluster.connect('hotel') future = session.execute_async('SELECT * FROM hotels') future.on_success do |rows| rows.each do |row| puts "Hotel: #{row['id']} Name: #{row['name']}" end end future.joinRuby驱动程序在标准Ruby上运行,但也可以在JRuby 1.7或更高版本上运行以提高性能。 该驱动程序在Linux,Mac OS上运行,但不支持Windows。
DataStax C#驱动程序于2013年7月首次发布,为使用.NET框架的Windows客户端提供支持。 出于这个原因,它也经常被称为“.NET驱动程序”。
C#驱动程序可在NuGet上获得,NuGet是Microsoft开发平台的软件包管理器。 在PowerShell中,在程序包管理器控制台中运行以下命令:
PM> Install-Package CassandraCSharpDriver要使用该驱动程序,请在Visual Studio中创建一个新项目,并添加一个引用Cassandra名称空间的using指令。 以下示例连接到我们的酒店密钥空间,并将新记录插入到hotels表中:
Cluster Cluster = Cluster.Builder() .AddContactPoint("127.0.0.1") .Build(); ISession Session = Cluster.Connect("hotel"); Session.Execute( "INSERT INTO hotels (id, name, phone) " + "VALUES (" + "'AZ123'," + "'Super Hotel at WestWorld'," + "'1-888-999-9999'," + ";");C#驱动程序与语言集成查询(LINQ)集成,后者是一种Microsoft .NET Framework组件,可为.NET语言添加查询功能;还有一个单独的对象映射器。
示例应用程序:KillrVideoLuke Tillman,Patrick McFadin和其他人创建了一个名为KillrVideo的视频共享应用程序。 KillrVideo是一个使用DataStax C#驱动程序构建的开源.NET应用程序,并部署到Microsoft的Azure云中。它还利用了DataStax Enterprise功能,例如与Apache Spark和Apache SOLR的集成。您可以在GitHub上下载源代码。
DataStax C / C ++驱动程序于2014年2月发布。您可以访问https://github.com/datastax/cpp-driver上的驱动程序,以及http://datastax.github.io/cpp-driver上的文档。
C / C ++驱动程序与其他驱动程序略有不同,因为它的API侧重于异步操作以排除同步操作。例如,创建会话是一个返回未来的异步操作:
#include <cassandra.h> #include <stdio.h> int main() { CassFuture* connect_future = NULL; CassCluster* cluster = cass_cluster_new(); CassSession* session = cass_session_new(); cass_cluster_set_contact_points(cluster, "127.0.0.1"); connect_future = cass_session_connect(session, cluster); if (cass_future_error_code(connect_future) == CASS_OK) { /* proceed with processing... */但是,如示例所示,通过立即阻止将来可以轻松支持同步语义。 构建和执行简单查询如下所示:
CassStatement* select_statement = cass_statement_new("SELECT * " "FROM hotel.hotels", 0); CassFuture* hotel_select_future = cass_session_execute(session, select_statement); if(cass_future_error_code(result_future) == CASS_OK) { const CassResult* result = cass_future_get_result(result_future); CassIterator* rows = cass_iterator_from_result(result); while(cass_iterator_next(rows)) { const CassRow* row = cass_iterator_get_row(rows); const CassValue* value = cass_row_get_column_by_name(row, "name"); const char* name; size_t name_length; cass_value_get_string(value, &name, &name_length); printf("Hotel_name: '%.*s'\n", (int)name_length, name); } }请记住,内存管理在C / C ++程序中非常重要; 为简洁起见,我们省略了对诸如集群,会话,期货和结果等对象进行释放的语句。
C / C ++驱动程序使用libuv库进行异步I / O操作,如果需要,还可以使用OpenSSL库进行加密的客户机 - 节点连接。 编译和链接的说明因平台而异,因此请参阅驱动程序文档以获取详细信息。
DataStax PHP驱动程序支持PHP服务器端脚本语言。 该驱动程序于2015年发布,包含DataStax C / C ++驱动程序,并支持Unix和Windows环境。
驱动程序有多个安装选项,但最简单的方法是使用PECL存储库:
pecl install cassandra以下简短示例从hotels表中选择行,并使用异步API打印出它们的值:
<?php $keyspace = 'hotel'; $session = $cluster->connect($keyspace); $statement = new Cassandra\SimpleStatement( 'SELECT * FROM hotels' ); $future = $session->executeAsync($statement); $result = $future->get(); foreach ($result as $row) { printf("id: %s, name: %s, phone: %s\n", $row['id'], $row['name'], $row['phone']); }您可以访问https://github.com/datastax/php-driver上的PHP驱动程序文档,以及https://datastax.github.io/php-driver上的源代码。
您现在应该了解Cassandra可用的各种客户端接口,它们提供的功能以及如何安装和使用它们。 我们特别关注DataStax Java驱动程序,以获得一些实践经验,即使您选择使用其他DataStax驱动程序,也应该为您提供良好的服务。 我们将在接下来的章节中继续使用DataStax Java驱动程序,因为我们会进行更多的阅读和编写。