Data Access Objects (DAOs) are often employed to encapsulate and abstract access to persistent data stores. Utilizing DAOs for this purpose typically increases the modularity of the code interacting with the persistence layer of an application. In principle, given the Application Programming Interface (API) definition of a specific DAO, we can freely switch the particular back-end systems we use for persistence, so long as the API can also be implemented by the alternative system(s). Well-designed DAOs also help separate application logic from domain logic: DAO procedures can be expressed in terms of high-level, domain-friendly APIs while hiding application logic within the DAO implementation. These benefits to modularity and separation of concerns cumulatively provide a strong incentive to use DAOs in production applications.
DAO design and implementation is particularly well-studied in the realm of relational databases. In a 2003 IBM DeveloperWorks article, for example, Sean Sullivan describes detailed approaches for programming DAOs against relational databases. He covers demarcating transactions, logging run-time behavioral details of DAO operations, and handling DAO exceptions. As Sullivan notes, Java programmers can write rich DAOs leveraging the Java Transaction API (JTA), which simplifies transaction demarcation and even allows demarcation across different database instances. In short, developers seeking to program DAOs that interact with relational databases can avoid common pitfalls by relying on an abundance of decades-old industry advice.
Unfortunately, DAO best practices for NoSQL databases are sparsely documented by comparison. In our own attempts to successfully leverage Apache Cassandra in an enterprise product, we have found this lack of documentation to be rather bothersome. To at least partially remedy this situation, we discuss at length some best practices we have discovered when using DAOs with Apache Cassandra.
In this document, we describe how to atomically persist data to an Apache Cassandra back-end system without sacrificing the modularity and elegance benefits afforded by the DAO abstraction. First, we discuss why atomicity is convenient, if not required, for some interactions with the database. Next, we illustrate our “atomicity demarcation” design for guaranteeing that queries are performed atomically across multiple DAOs. Then, we provide and discuss a Java-based implementation of our design. Finally, we close with a discussion about isolation in Cassandra and describe how to isolate queries when needed.
Prerequisites
Motivation
In this chapter, we show that atomicity is often a desired behavior but is difficult to implement elegantly using data access objects. By the end of this section, readers should understand the precise definition of atomicity, relevant Cassandra constructs used to implement atomic queries, as well as the problem that concerns this document: ensuring atomicity across DAO operations without requiring callers to understand Cassandra-specific implementation details.
To motivate our DAO operations, suppose we are tasked with storing information about students applying to colleges in a Cassandra database. We have the following high-level API requirements:
- GetReceivedApplications: Retrieve all students whose applications have been received by college c.
- GetSentApplications: Retrieve all colleges to which student s has sent applications.
- AddApplication: Add new college application data to the database.
Notice that this is a standard many-to-many relationship. In this case, students can apply to many colleges, and any individual college can have many student applicants.
Data Modeling
In Cassandra, we can model the data as follows:
CREATE TABLE sent_applications (
    student_id UUID,
    college_id UUID,
    PRIMARY KEY ( student_id, college_id )
);
CREATE TABLE received_applications (
    college_id UUID,
    student_id UUID,
    PRIMARY KEY ( college_id, student_id )
);
With the Cassandra tables defined above, we can now tackle our requirements in terms of the Cassandra Query Language (CQL):
- GetReceivedApplications:
  SELECT student_id FROM received_applications WHERE college_id = <c>;
- GetSentApplications:
  SELECT college_id FROM sent_applications WHERE student_id = <s>;
- AddApplication (naive solution):
  INSERT INTO sent_applications ( student_id, college_id ) VALUES ( <s>, <c> );
  INSERT INTO received_applications ( college_id, student_id ) VALUES ( <c>, <s> );
There is a subtle yet significant problem with the AddApplication query: a database server or client failure may cause only one of the two INSERT statements to be executed. If this happens, we will introduce an inconsistency between sent_applications and received_applications. Even if we re-order the two INSERT queries, there will still be a possibility for inconsistency.
We use the term “inconsistency” above in the parlance of data integrity, not Cassandra’s consistency level settings. In particular, we say that an inter-table inconsistency (or simply, inconsistency) has occurred when two or more tables representing the same data relationship disagree about the state of the relationship (or metadata associated with the relationship). In our example above, it is possible for sent_applications to indicate that a particular college was sent an application, but received_applications may disagree if an ill-timed failure occurs. This difference in the (student, college) relationship between the two tables qualifies as an inter-table inconsistency.
To solve this issue, we would like for the above INSERT queries to either succeed or fail as a group. If a failure occurs before both INSERTs can be executed, then the database should reflect that neither INSERT was applied. Otherwise, both INSERTs should be applied. We refer to this desired success-or-failure behavior as atomicity. Before we discuss how we might achieve this behavior, it is worth considering the consequences of allowing inconsistencies to occur.
Recovering from Inconsistency
Assume that we have an inconsistency between sent_applications and received_applications. How might we go about detecting this? Without loss of generality, suppose sent_applications indicates that some student has sent an application to a college, but received_applications contains no corresponding record. We will have a difficult time finding all records in sent_applications not properly reflected in received_applications since CQL does not support sub-queries or table joins.
Supposing that the two tables defined above are the only tables in our Cassandra data model, we would need to compare all tuples contained in these two tables (in effect, computing a set difference). Since Cassandra will not do this for us, a client application must perform this task, which entails Ω(n) computational complexity, where n is the number of records in the larger table. Having a Cassandra client perform such an expensive task is clearly undesirable.
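To make this cost concrete, the sketch below shows what such a client-side checker might look like, assuming the DataStax Java Driver and the two tables defined earlier (the class and method names are ours, purely for illustration). It loads both tables into memory and computes the symmetric difference of their (student, college) pairs:

import java.util.HashSet;
import java.util.Set;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class InconsistencyChecker {

    /** Collect the (student, college) pairs recorded in one table. */
    private static Set<String> readPairs(Session session, String cql) {
        Set<String> pairs = new HashSet<String>();
        for (Row row : session.execute(cql)) {
            pairs.add(row.getUUID("student_id") + ":" + row.getUUID("college_id"));
        }
        return pairs;
    }

    /** Return the pairs present in exactly one of the two tables. */
    public static Set<String> findInconsistencies(Session session) {
        Set<String> sent = readPairs(session,
                "SELECT student_id, college_id FROM sent_applications;");
        Set<String> received = readPairs(session,
                "SELECT student_id, college_id FROM received_applications;");
        // symmetric difference: (sent union received) minus (sent intersect received)
        Set<String> difference = new HashSet<String>(sent);
        difference.addAll(received);
        Set<String> intersection = new HashSet<String>(sent);
        intersection.retainAll(received);
        difference.removeAll(intersection);
        return difference;
    }
}

Both full-table scans touch every record in both tables, which is the source of the Ω(n) cost noted above.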
Even assuming we could perform this task efficiently enough for our purposes, we must then repair the inconsistency. In general, a recovery procedure can become very complicated, especially if there are many opportunities for failure. ING, for example, claims to use a complex finite state machine approach leveraging the Floyd-Warshall algorithm to implement a recovery procedure in Cassandra. Considering the difficulty of implementing robust recovery, we think that relying on inconsistency detection and repair logic is a bad idea in general. It is better to prevent the inconsistency from occurring in the first place.
Atomic Batches in Cassandra
We might next wonder if Cassandra itself has any features that support atomic execution of operations. In the early days of Cassandra, only individual row updates were atomic, necessitating client-side recovery mechanisms (such as a client-maintained commit log) for higher-level atomic behavior. However, Cassandra 1.2 introduced the atomic batch statement, which provides precisely the behavior we seek. To batch together our INSERT queries atomically, we can execute the following atomic batch query:
BEGIN BATCH
    INSERT INTO sent_applications ( student_id, college_id ) VALUES ( <s>, <c> );
    INSERT INTO received_applications ( college_id, student_id ) VALUES ( <c>, <s> );
APPLY BATCH;
Instrumented in this way, we guarantee that either both INSERT statements are applied or that neither statement is applied, preventing a potential cause of inter-table inconsistency. Our remaining challenge, then, lies in leveraging the atomic batch statement in our DAO queries.
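For reference, the same atomic batch can also be constructed programmatically. The following sketch uses the DataStax Java Driver (assuming an existing Session named session and UUID variables s and c); a BatchStatement is LOGGED, i.e., atomic, by default:

BatchStatement batch = new BatchStatement();
batch.add(new SimpleStatement(String.format(
        "INSERT INTO sent_applications ( student_id, college_id ) VALUES ( %s, %s );", s, c)));
batch.add(new SimpleStatement(String.format(
        "INSERT INTO received_applications ( college_id, student_id ) VALUES ( %s, %s );", c, s)));
session.execute(batch); // both INSERTs are applied, or neither is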
Data Access Objects
For our college applications example above, we would create two DAOs: a SentApplicationsDAO and a ReceivedApplicationsDAO. Naively, each DAO will have a method that generates one of the two INSERT statements above. The caller of these DAOs, then, needs to batch together the individual queries from each DAO and execute them atomically. By requiring the caller to know how to batch the queries together in this Cassandra-specific way, we violate the encapsulation guarantees afforded by the DAO pattern.
Managing and resolving this tension between DAO encapsulation and the desire for atomicity is the subject of this document. We would like to retain our DAO abstraction to separate application logic from business logic. However, the need to atomically batch queries generated by different DAOs strains this encapsulation. In the upcoming sections of this document, we describe how to prevent tight coupling between the DAO caller and the specific underlying database representation.
Atomicity Demarcation
As discussed previously, it is not trivial to batch queries across DAOs without requiring the caller of the DAO methods to know the Cassandra-specific mechanism of batching queries together. Clearly, requiring the caller to act in such an implementation-specific way would reduce code reusability in the event that we want to switch databases for the back-end. In this chapter, we describe a design that mitigates implementation-specific dependencies of this sort between the DAOs and their callers.
The Approach
To decouple the DAO caller from the Cassandra-specific DAO implementation, we recommend an approach that we call atomicity demarcation (analogous to transaction demarcation in relational databases). In essence, the DAO caller specifies that it wants a series of DAO operations to be executed atomically as a batch. In this section, we describe the precise mechanism by which this is implemented.
The UML diagram below illustrates a design that solves the DAO encapsulation problem outlined earlier. Since the figure is rather large and complicated, we will devote this entire section to describing it. Please refer to the diagram throughout our discussion as a reference.
We will begin with the CassandraApplicationService, which can be found near the center of the diagram. By no coincidence, its three methods exactly correspond to the high-level API requirements given for our application. It is the responsibility of this service class to implement business logic in terms of lower-level application logic. The business logic is represented by the API signature of the service class; the application logic will essentially be carried out through various calls to data access objects in the implementation of the service’s methods. Notice that an instance of CassandraApplicationService has two attributes: one CassandraSentApplicationsDAO called “sentAppsDAO” and one CassandraReceivedApplicationsDAO called “receivedAppsDAO”. These low-level DAOs are used to implement high-level business logic.
Observe that CassandraSentApplicationsDAO.addSentApplication and CassandraReceivedApplicationsDAO.addReceivedApplication both include a mediator instance in their parameter lists. The necessary communication about atomic batching between the two DAOs is instrumented and encapsulated by this CassandraAtomicityMediator instance. The mediator internally stores an atomic batch query to submit to Cassandra called batch. A CassandraAtomicityMediator also implements two methods. The first, addStatement, is called by the DAOs themselves to add statements to the internal batch query. The second and final method, commit, atomically commits the mediator’s internal batch query to Cassandra.
Understanding the relationship between the service, DAOs, and mediator is key to understanding our design approach. To clarify this relationship, we provide Python-esque pseudocode in the next section.
Pseudocode
The pseudocode below illustrates how the CassandraApplicationService interacts with its two DAOs and a CassandraAtomicityMediator instance:
class CassandraApplicationService:
    def __init__(self):
        self.receivedAppsDAO = CassandraReceivedApplicationsDAO()
        self.sentAppsDAO = CassandraSentApplicationsDAO()

    def addApplication(self, application):
        mediator = CassandraAtomicityMediator()
        self.receivedAppsDAO.addReceivedApplication(mediator, application)
        self.sentAppsDAO.addSentApplication(mediator, application)
        mediator.commit()
Observe that in addApplication, the service first instantiates a mediator, then passes it to several DAO methods, and finally calls mediator.commit. The DAO methods, in turn, populate the internal batch statement of the mediator using the addStatement API:
class CassandraReceivedApplicationsDAO:
    def addReceivedApplication(self, cassandraMediator, application):
        query = ("INSERT INTO received_applications ( college_id, student_id ) "
                 "VALUES ( %s, %s );"
                 % (application.collegeId, application.studentId))
        cassandraMediator.addStatement(query)

class CassandraSentApplicationsDAO:
    def addSentApplication(self, cassandraMediator, application):
        query = ("INSERT INTO sent_applications ( student_id, college_id ) "
                 "VALUES ( %s, %s );"
                 % (application.studentId, application.collegeId))
        cassandraMediator.addStatement(query)
Ultimately, the service is able to atomically persist information to Cassandra without having to know how the atomic commitment is taking place. Moreover, the service is able to demarcate atomicity of operations fluidly; it does so simply by passing a mediator to several DAO methods. If we were to add a new DAO for college admissions information, it would be straightforward to modify the service to atomically insert new application records using the new DAO.
To review, DAOs only call the mediator’s addStatement function and never call commit. The service layer, on the other hand, never calls addStatement and only calls commit. Through this contract, and by using the mediator as a locus for communication about batching, we can provide atomic queries without requiring the service to know about low-level database logic, nor do we have to tightly couple any DAOs together.
Final Notes
First, notice that the IAtomicityMediator interface declares a commit method but does not declare an addStatement method or anything similar. The reason for this is that different persistence layers use a variety of operation syntaxes. As an example, IBM Cloudant uses REST calls to query a database, whereas relational databases use Structured Query Language (SQL) strings. As such, we leave it to the implementing class of IAtomicityMediator to define methods for adding individual operations to the atomic query.
Next, note that the API signatures of DAO read queries do not include a mediator parameter. This is the case because Cassandra’s batch statements can only contain data manipulation language (DML) statements. This constraint is reasonable to encode in the DAO interfaces since it is unlikely that there would ever be a need to batch together read queries, regardless of the back-end database in use.
Savvy readers might recall that Gamma et al. define a mediator pattern in their influential book on software design patterns. Our use of the mediator pattern is drawn directly from this text but features a significant difference: our mediator is not aware of its colleagues, nor is a colleague class instance associated with precisely one mediator. Instead, we pass the mediator directly to its DAO colleagues’ methods and instantiate a fresh mediator within each service method. This per-method instantiation makes sense because our DAO instances are much longer-lived than the mediator itself. Moreover, by passing the mediator as a reference parameter, the communication boundaries are made much clearer than with the vanilla mediator pattern.
Finally, recall that we have not yet discussed the relationship between the DAO interfaces and their respective implementations in the UML diagram above. Rather than attempt to explain this here, we will illustrate the type-safety of the design in the next chapter using several Java code examples.
Type-Safe Example
In this section, we show that the DAO interfaces defined in the previous chapter can be bound to their implementations in strongly typed programming languages. Namely, we provide a Java-based proof of concept for atomicity demarcation. Note that the code in these examples leverages the DataStax Java Driver for Cassandra.
IReceivedApplicationsDAO
First, we will define the IReceivedApplicationsDAO interface:
/**
 * Encapsulates access to data about received college applications.
 *
 * @param <T>
 *            The type of mediator utilized by the DAO.
 */
public interface IReceivedApplicationsDAO<T extends IAtomicityMediator> {

    /**
     * Coordinate with a mediator to persist data about a received college
     * application.
     *
     * @param mediator
     *            modified by reference to include a query that adds a received
     *            application
     * @param application
     *            the application that was received
     */
    public void addReceivedApplication(T mediator, Application application);

    /**
     * Retrieves all applications received by the specified college.
     *
     * @param college
     *            the college whose applications to retrieve
     * @return the applications received by the specified college
     */
    public Collection<Application> getReceivedApplications(College college);
}
The interface is parameterized with a generic type T which extends IAtomicityMediator. By declaring the interface in this way, implementing classes can utilize different types of mediators, in turn allowing different databases to be used on the back-end.
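To illustrate the flexibility this affords, the sketch below binds the same interface to a hypothetical relational back-end. Here, JdbcAtomicityMediator is an assumed class (not part of our example project) that implements IAtomicityMediator by accumulating SQL statements into a single JDBC transaction:

/**
 * A hypothetical relational binding of {@link IReceivedApplicationsDAO},
 * shown only to illustrate the generic parameterization.
 */
public class JdbcReceivedApplicationsDAO implements
        IReceivedApplicationsDAO<JdbcAtomicityMediator> {

    @Override
    public void addReceivedApplication(JdbcAtomicityMediator mediator, Application application) {
        // contribute a SQL statement to the mediator's transaction rather
        // than a CQL statement to a Cassandra batch
        mediator.addStatement(String.format(
                "INSERT INTO received_applications ( college_id, student_id ) VALUES ( '%s', '%s' );",
                application.college.collegeId, application.applicant.studentId));
    }

    @Override
    public Collection<Application> getReceivedApplications(College college) {
        throw new UnsupportedOperationException("omitted for brevity");
    }
}

Because the service layer depends only on the interfaces, swapping in such a binding requires no changes to callers beyond the mediator type parameter.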
CassandraReceivedApplicationsDAO
We can see that CassandraReceivedApplicationsDAO implements IReceivedApplicationsDAO, parameterizing it with CassandraAtomicityMediator:
/**
 * Cassandra-specific implementation of the {@link IReceivedApplicationsDAO}.
 */
public class CassandraReceivedApplicationsDAO extends CassandraDAO implements
        IReceivedApplicationsDAO<CassandraAtomicityMediator> {

    /**
     * Construct a new instance.
     *
     * @param session
     *            our connection to the database.
     */
    public CassandraReceivedApplicationsDAO(Session session) {
        super(session);
    }

    @Override
    public void addReceivedApplication(CassandraAtomicityMediator mediator, Application application) {
        String query = String.format(
                "INSERT INTO test_keyspace.received_applications ( college_id, student_id ) VALUES ( %s, %s );",
                application.college.collegeId, application.applicant.studentId);
        mediator.addStatement(query);
    }

    @Override
    public Collection<Application> getReceivedApplications(College college) {
        ResultSet rs = executeReadQuery(String.format(
                "SELECT student_id FROM test_keyspace.received_applications WHERE college_id = %s;", college.collegeId));
        Collection<Application> receivedApps = new ArrayList<Application>();
        for (Row row : rs) {
            Application app = new Application();
            app.college = college;
            Student applicant = new Student();
            applicant.studentId = row.getUUID(0);
            app.applicant = applicant;
            receivedApps.add(app);
        }
        return receivedApps;
    }
}
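The DAO above delegates all batching to its mediator. The full CassandraAtomicityMediator is part of the complete Java implementation linked below, but a minimal sketch (assuming the DataStax Java Driver, whose BatchStatement is LOGGED, i.e., atomic, by default) conveys the idea:

/**
 * Declares the atomic commit operation common to all mediators.
 */
public interface IAtomicityMediator {

    /** Atomically apply all operations accumulated by this mediator. */
    public void commit();
}

/**
 * Cassandra-specific mediator that accumulates statements into an atomic batch.
 */
public class CassandraAtomicityMediator implements IAtomicityMediator {

    private final Session session;
    private final BatchStatement batch = new BatchStatement();

    public CassandraAtomicityMediator(Session session) {
        this.session = session;
    }

    /** Called by DAOs to append one statement to the internal batch. */
    public void addStatement(String query) {
        batch.add(new SimpleStatement(query));
    }

    @Override
    public void commit() {
        session.execute(batch); // executed as BEGIN BATCH ... APPLY BATCH
    }
}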
CassandraApplicationService
Next, let’s examine the CassandraApplicationService to see how it coordinates DAO and mediator instances:
/**
 * A Cassandra-specific implementation of the {@link IApplicationService}.
 */
public class CassandraApplicationService implements IApplicationService {

    // //////////////////////////////////////////
    // Cassandra DAOs relevant to this service //
    // //////////////////////////////////////////
    private ISentApplicationsDAO<CassandraAtomicityMediator> sentAppsDAO;
    private IReceivedApplicationsDAO<CassandraAtomicityMediator> receivedAppsDAO;

    /**
     * Construct a new {@link CassandraApplicationService}, initializing all
     * fields.
     */
    public CassandraApplicationService() {
        receivedAppsDAO = new CassandraReceivedApplicationsDAO(SessionManager.getSession());
        sentAppsDAO = new CassandraSentApplicationsDAO(SessionManager.getSession());
    }

    @Override
    public void addApplication(Application application) {
        CassandraAtomicityMediator mediator = new CassandraAtomicityMediator(SessionManager.getSession());
        sentAppsDAO.addSentApplication(mediator, application);
        receivedAppsDAO.addReceivedApplication(mediator, application);
        mediator.commit();
    }

    @Override
    public Collection<Application> getReceivedApplications(College college) {
        return receivedAppsDAO.getReceivedApplications(college);
    }

    @Override
    public Collection<Application> getSentApplications(Student student) {
        return sentAppsDAO.getSentApplications(student);
    }
}
By declaring the Cassandra DAOs using the ISentApplicationsDAO and IReceivedApplicationsDAO interfaces, the service eliminates the risk of calling any Cassandra-specific DAO methods. Apart from this, our Java code very closely resembles the pseudocode defined earlier.
Driver
Finally, we show a driver program that instantiates a service to perform database operations:
/**
 * A driver program that demonstrates use of service layer APIs.
 */
public class Main {

    /**
     * Create a series of college application records, then read them back out.
     * Prints to stdout.
     *
     * @param args
     *            unused
     */
    public static void main(String[] args) {
        IApplicationService appService = new CassandraApplicationService();
        Set<Student> students = new HashSet<Student>();
        Set<College> colleges = new HashSet<College>();
        for (int i = 0; i < 10; i++) {
            // create applicant
            Student applicant = new Student();
            applicant.studentId = UUIDs.random();
            students.add(applicant);
            // create college
            College college = new College();
            college.collegeId = UUIDs.random();
            colleges.add(college);
            // create application
            Application app = new Application();
            app.applicant = applicant;
            app.college = college;
            // add the application to the database
            System.out.format("Adding %s%n", app);
            appService.addApplication(app);
        }
        // read out all created applications by student
        for (Student student : students) {
            System.out.println(appService.getSentApplications(student));
        }
        // read out all created applications by college
        for (College college : colleges) {
            System.out.println(appService.getReceivedApplications(college));
        }
    }
}
Eclipse Project
A full Java implementation (from which the above examples are excerpted) can be found at the following link. This archive can be imported directly into Eclipse and requires a working Maven installation to build out of the box.
Atomicity Does Not Imply Isolation
As we discussed earlier, atomicity in Cassandra refers to the success-or-failure behavior of a group of data manipulation language (DML) queries. It is critical to note that atomic batches are not isolated in Cassandra, meaning that a database read query may return the results of a partially-completed atomic batch. This reality can be problematic if not considered and dealt with carefully.
Case in Point
In this section, we will walk through a simple Python program (leveraging the DataStax Python Driver for Cassandra) that illustrates Cassandra’s lack of isolation. We will start by excerpting the Globals section of our program, which contains information about our schema and query definitions:
# imports used throughout the program
import multiprocessing as mp
import random
import string

from cassandra.query import BatchStatement, SimpleStatement

class Globals:
    KEYSPACE_NAME = "test_keyspace"
    TABLE_NAME = "test_table"
    INSERT_ONE = "INSERT INTO %s ( value ) VALUES ( '%%s' );" % TABLE_NAME
    DELETE_ONE = "DELETE FROM %s WHERE value = '%%s';" % TABLE_NAME
    BATCH = "BEGIN BATCH %s APPLY BATCH;"
    SELECT_ALL = "SELECT * from %s;" % TABLE_NAME
In this case, we will be executing INSERT, DELETE, and SELECT queries on a table called “test_table” in the “test_keyspace” keyspace.
Our program spawns threads of execution that perform one of two types of operations against the database: reads and writes. The read operation is defined as follows:
def performRead(barrier, num_iterations, num_expected):
    # get the session
    session = getSession()
    # retrieve the rows, check actual vs. expected number
    for _ in range(num_iterations):
        barrier.wait()  # maximize contention
        rows = session.execute(SimpleStatement(Globals.SELECT_ALL,
                                               consistency_level=Globals.READ_CONSISTENCY))
        values = [row.value for row in rows]
        if len(values) != num_expected:
            print("Expected [%d], got [%d]" % (num_expected, len(values)))
Here, a thread of execution obtains a Cassandra session (connection to the database), then repeatedly queries for all rows in test_table. In between these repeated attempts, the thread waits for barrier synchronization. As we will see shortly, this is key for increasing contention on the database.
Next, we show the write operation:
def performWrite(barrier, num_iterations, num_remaining):
    # construct a randomized permutation of the ascii letters
    indexes = list(range(len(string.ascii_letters)))
    random.shuffle(indexes)
    batch = BatchStatement(consistency_level=Globals.WRITE_CONSISTENCY)
    # build num_remaining insert statements
    for index in indexes[:num_remaining]:
        batch.add(Globals.INSERT_ONE % string.ascii_letters[index])
    # delete all other ascii letters from the database
    for index in indexes[num_remaining:]:
        batch.add(Globals.DELETE_ONE % string.ascii_letters[index])
    # get the session
    session = getSession()
    # perform the writes
    for _ in range(num_iterations):
        barrier.wait()  # maximize contention
        session.execute(batch)
In the write operation, a thread of execution constructs an atomic batch statement over the ASCII letters, which serve as primary key values in test_table. The thread randomly selects num_remaining letters and adds one INSERT query for each; it then adds a DELETE query for every remaining letter. Finally, the write thread repeatedly executes this batch statement against the Cassandra back-end, waiting for barrier synchronization between attempts.
Finally, we can show the main function, which drives the program:
def main():
    """
    Spawn a series of processes, approximately half of which attempt to read
    data from the database, and the other half of which attempt to modify data.
    """
    num_processes = 8     # number of sub-processes to spawn/fork
    num_rows = 10         # number of rows to keep in the database
    num_iterations = 100  # number of read and write "rounds" of execution
    # ensure that num_rows exist before we begin executing our threads
    performWrite(mp.Barrier(1), 1, num_rows)
    barrier = mp.Barrier(num_processes)  # synchronization barrier for processes
    processes = []
    # create the read processes; each waits for sync before executing queries
    for _ in range(num_processes // 2):
        process = mp.Process(target=performRead,
                             args=(barrier, num_iterations, num_rows))
        processes.append(process)
        process.start()
    # create the write processes; each waits for sync before executing queries
    for _ in range(num_processes // 2, num_processes):
        process = mp.Process(target=performWrite,
                             args=(barrier, num_iterations, num_rows))
        processes.append(process)
        process.start()
    # wait for the sub-processes to complete before terminating parent process
    for process in processes:
        process.join()
As the docstring notes, we create several subprocesses, half of which execute the read operation and half of which execute the write operation. We will see program output whenever a read query observes more or fewer than num_rows rows in the database. Indeed, this happens frequently on a desktop with 8 CPU cores.
Tip
The fully-implemented Python program, from which the above examples are excerpted, can be downloaded at this link.
Mitigation Strategies
Observing partially completed batches is not always acceptable in applications that leverage Cassandra. For cases in which isolation is required, we recommend utilizing a mutex. The following approaches may suffice, depending on the circumstances in which isolation is needed:
- If all possible read/write conflicts are generated and observed on a single machine, shared memory can be used to mitigate the potential for observing partially-completed batches. To do this, we can simply share a thread-safe mutex between each process and/or thread that might be involved in a conflict.
- If multiple processes running on different machines can take part in a read/write conflict, global synchronization may be required. This can be achieved using a service like ZooKeeper (and the highly regarded Curator client library).
- For the adventurous: inter-process locking can also be done in Cassandra itself. See this ING video for details on the strategy.
Regardless of which mutex/lock is chosen, an appropriate place to perform locking is in the service class. For example, with a trivial modification to our Java CassandraApplicationService class from earlier, we can provide isolation:
/**
 * A Cassandra-specific implementation of the {@link IApplicationService}.
 */
public class CassandraApplicationService implements IApplicationService {

    ...

    /**
     * Construct a new {@link CassandraApplicationService}, initializing all
     * fields.
     */
    public CassandraApplicationService() {
        receivedAppsDAO = new CassandraReceivedApplicationsDAO(SessionManager.getSession());
        sentAppsDAO = new CassandraSentApplicationsDAO(SessionManager.getSession());
        lock = new ReentrantLock(); // could also be a Cassandra- or ZK-based lock for global isolation
    }

    @Override
    public void addApplication(Application application) {
        CassandraAtomicityMediator mediator = new CassandraAtomicityMediator(SessionManager.getSession());
        lock.lock();
        try {
            sentAppsDAO.addSentApplication(mediator, application);
            receivedAppsDAO.addReceivedApplication(mediator, application);
            mediator.commit();
        } finally {
            lock.unlock(); // always release, even if the commit fails
        }
    }
}
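If conflicts can span multiple machines, the local ReentrantLock can be swapped for a distributed lock with the same structure. Below is a sketch using Apache Curator’s InterProcessMutex recipe (assuming an existing CuratorFramework instance named curatorClient; the ZooKeeper lock path is arbitrary, and exception handling is elided):

InterProcessMutex distributedLock = new InterProcessMutex(curatorClient, "/locks/applications");
distributedLock.acquire();
try {
    sentAppsDAO.addSentApplication(mediator, application);
    receivedAppsDAO.addReceivedApplication(mediator, application);
    mediator.commit();
} finally {
    distributedLock.release();
}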