Lenses JDBC

Lenses offers a JDBC 4.0-compliant driver for communicating with Apache Kafka via the Lenses platform, suitable for any application that uses the JDBC interface.

Users of this library can:

  • Select from topics
  • Insert into topics
  • Use prepared statements
  • Use batched inserts
  • Fetch metadata around topics
  • Fetch metadata around messages

Note

In the JDBC specification, the terminology for a record is a row because the interfaces were originally designed with relational databases in mind. In this guide, when you see the term row, you can think of a Kafka record.

Installation

Lenses JDBC is available on Maven Central as a single artifact. Users of Maven can use the following coordinates:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-jdbc</artifactId>
    <version>2.0.1</version>
</dependency>

Users of Gradle can use:

compile 'com.landoop:lenses-jdbc:2.0.1'

The Lenses platform must be installed and running, as the JDBC driver will execute queries against the APIs exposed by Lenses.

Quick Start

As with most JDBC drivers, we must register it with the DriverManager before we can use it. The DriverManager is a singleton class that manages all known drivers and exposes them through the JDBC API. To register the Lenses driver, we just need to create an instance of the driver class, like so:

new com.landoop.jdbc4.LsqlDriver()

You only need to do this in one place, typically somewhere that executes early in your application's lifecycle.

Once the driver is registered, we can ask the DriverManager to open a Connection to the Lenses server. We need to provide the JDBC URI of the Lenses API, as well as a username and password. Like so:

Connection conn = DriverManager.getConnection(
     "jdbc:lsql:kafka:http://localhost:3030",
     "username",
     "password");

Note the format of the JDBC URI: it includes the scheme information before the hostname. The general format is jdbc:lsql:kafka:<scheme>://<lenses_hostname>:<lenses_port>
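To make the general format concrete, here is a small sketch that assembles the URL from its parts; the variables are our own illustration, not part of the driver API:

```java
// Assemble a Lenses JDBC URL from its parts. The scheme (http or https)
// sits between the jdbc:lsql:kafka: prefix and the hostname.
String scheme = "http";     // use "https" when Lenses is served over TLS
String host = "localhost";  // Lenses hostname
int port = 3030;            // Lenses port
String url = "jdbc:lsql:kafka:" + scheme + "://" + host + ":" + port;
System.out.println(url);
```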

With this connection, we can create a Statement instance. A statement is an object by which we execute queries.

Statement stmt = conn.createStatement();

With a statement instance, we can use the executeQuery(String) method to send a query to the Lenses platform and return the matching records as JDBC rows.

ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");

When a statement executes a query it will return a ResultSet instance, which acts as an iterator for the results of the query. The resultset has an internal cursor which points to the current row. A resultset can be traversed by invoking the next() method. When this method is invoked, if there is another row to read, it will move the cursor and return true. If there are no more rows to return, it will return false. For each row, we can use getter methods on the resultset to retrieve the field values.

Connection conn = DriverManager.getConnection(
     "jdbc:lsql:kafka:http://localhost:3030",
     "username",
     "password");

Statement stmt = conn.createStatement();

ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");

while (rs.next()) {
    System.out.println(rs.getString("name"));
    System.out.println(rs.getInt("age"));
    System.out.println(rs.getString("location"));
}

Connections, statements, and resultsets will maintain an open socket while they are in use, so once you have finished processing it is best practice to close them, rather than wait for them to be garbage collected. In Java 7 onwards, you can use the try-with-resources expression.

try (Statement stmt = conn.createStatement()) {
    ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");
}

Or in earlier versions of Java, you can close the resources explicitly in a finally block.
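For example, a pre-Java-7 sketch that closes the resources when processing is finished (assuming conn is an open connection as in the examples above):

```java
Statement stmt = null;
ResultSet rs = null;
try {
    stmt = conn.createStatement();
    rs = stmt.executeQuery("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");
    while (rs.next()) {
        System.out.println(rs.getString("name"));
    }
} finally {
    // Close in reverse order of creation, ignoring failures on close
    if (rs != null) try { rs.close(); } catch (SQLException ignored) { }
    if (stmt != null) try { stmt.close(); } catch (SQLException ignored) { }
}
```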

Select

As we have seen in the earlier section, you can use a Statement to execute a query. There are two ways to execute a query on a Statement.

The first is to use the executeQuery(String) method, which returns a ResultSet directly. The second is to use execute(String), which assigns the resultset to the statement so it can be retrieved later using stmt.getResultSet().
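For example, a sketch of the second style, using execute(String) and then retrieving the resultset from the statement (assuming conn is an open connection as before):

```java
Statement stmt = conn.createStatement();
// execute() returns true when the result is a ResultSet
boolean hasResultSet = stmt.execute("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");
if (hasResultSet) {
    ResultSet rs = stmt.getResultSet();
    while (rs.next()) {
        System.out.println(rs.getString("name"));
    }
}
```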

Once you have a reference to a resultset, you can query it. Recall the next() method from the quick start. Internally, the resultset maintains a cursor which points to the current row (Kafka record). Each time next() is invoked, the cursor moves forward one position. If the cursor then points to a valid row, next() returns true. If the cursor has moved past the end of the results, it returns false.

If you are not familiar with JDBC, it is worth knowing that the resultset acts as both the iterator and the current element. This differs from the common iterator pattern you may know from the JDK or other libraries, where hasNext() checks whether another element exists and next() retrieves it and advances the cursor.

It is possible to move the cursor to any offset in the resultset. This is because the resultset is stored in memory once it is retrieved. For example, we can move the cursor to the last row, the first row, a specific row, or a relative offset from the current position.

Here is an example of accessing the last record first, and then the first record second.

Connection conn = DriverManager.getConnection(
        "jdbc:lsql:kafka:http://localhost:3030",
        "username",
        "password");

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");

rs.last();
System.out.println(rs.getString("name"));

rs.first();
System.out.println(rs.getString("name"));
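Continuing with the same resultset, the cursor can also be moved to an absolute row number, or by a relative offset from the current position:

```java
rs.absolute(3);   // move the cursor to the third row
System.out.println(rs.getString("name"));

rs.relative(-1);  // move back one row, to the second
System.out.println(rs.getString("name"));
```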

It is also possible to re-use a statement to execute another query, but any methods called on a given statement always refer to the most recently executed query. Therefore you should not use the same statement instance concurrently.

Note

The resultset offers get methods for all supported SQL types, but Lenses JDBC is limited to the types supported by Avro and JSON. Therefore, attempting to use methods such as getNClob and getSQLXML will result in a SQLFeatureNotSupportedException being thrown.

Key and Value Types

When querying against Kafka topics, we must specify the key and value type for the query. This allows the driver to know the correct payload types.

We do this by specifying _ktype and _vtype in the query. These must be included and set to the key and value types in use on that topic.

Note

Always wrap the values in single quotes, e.g. 'AVRO'.

Nested Fields

Lenses SQL supports nested fields using dot-delimited syntax. So if you wish to select the field postcode from a structure called address, alongside the name at the top level, then the query would be as the following:

ResultSet rs = stmt.executeQuery("SELECT name, address.postcode FROM mytopic WHERE _ktype='STRING' AND _vtype='AVRO'");
while (rs.next()) {
    System.out.println(rs.getString("name"));
    System.out.println(rs.getString("address.postcode"));
}

Notice that the dot-separated form is also used when accessing the field via the getters.

Metadata

It is very common to execute queries that have been generated programmatically, so the number of columns and the order and types of those columns may not be known statically in advance. To deal with this common situation, the JDBC specification allows us to query metadata on a resultset.

Using this metadata, it is possible to retrieve the number of columns, the column types (both the JDBC type and the JVM type), the names of the columns, precision and scale information, whether a numerical type is signed, and whether the column allows null values.

Here is a program that will print out all the metadata for a given result set.

Connection conn = DriverManager.getConnection("jdbc:lsql:kafka:https://localhost:3030", "user", "pass");
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic");
ResultSetMetaData meta = rs.getMetaData();

for (int k = 1; k <= meta.getColumnCount(); k++) {
    System.out.println("ColumnName=" + meta.getColumnName(k));
    System.out.println("ColumnType=" + meta.getColumnTypeName(k));
    System.out.println("Nullability=" + meta.isNullable(k));
    System.out.println("Signed=" + meta.isSigned(k));
    System.out.println("Precision=" + meta.getPrecision(k));
    System.out.println("Scale=" + meta.getScale(k));
}

Insert

Lenses JDBC also has the ability to perform SQL inserts, which result in new records being written to a Kafka topic. The topic written to is the subject of the INSERT INTO clause, as in a regular SQL query.

To perform an insert, we obtain a statement, as we have done previously. Then we use the executeUpdate() method, passing in a well-formed SQL insert query. For example:

Statement stmt = conn.createStatement();
int result = stmt.executeUpdate("INSERT INTO mytopic (name, city, lat, long) VALUES ('James T Kirk', 'Iowa City', 43.3, -54.2)");

The returned value is the JDBC update count for the insert, indicating success.

Note

Lenses JDBC only supports a single insert per execution. Do not use ‘;’ to separate multiple inserts as this will not work. The correct approach to insert multiple records is to use a PreparedStatement.

Prepared Statements

If you need to insert multiple records at once, then it is more efficient to use prepared statements, sometimes in conjunction with batching.

A prepared statement uses a special type of statement called a PreparedStatement which is a subclass of the regular Statement object. Such an instance is obtained from the Connection object using the prepareStatement(String) method rather than the previously seen createStatement().

PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");

Unlike the regular create statement method, a prepared statement requires the SQL query to be specified when the statement is created. This is because a prepared statement is parsed in advance. As a result, this type of statement can be re-used across multiple executions more efficiently as the server does not have to parse the query each time it is used. In addition, the driver is able to send a more compact payload to the server, which results in faster operations for large datasets.

There are two main differences in the API of a prepared statement. The first is the ability to set parameters on the query, in a way that the driver will take care of any encoding that needs to happen (such as correctly escaping a String or converting between values). The second is that a statement can have multiple rows batched before execution, avoiding multiple round trips to the server.

Setting Parameters

As the advantage of a prepared statement is multiple executions, we need some way of repeatedly populating rows with values. To do this, we must first create the statement with placeholders. A placeholder is simply a ? character embedded in the query. For example:

PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");

Then we use the parameter set methods, which exist for most types, such as setString(int, String) or setDouble(int, double). The first argument is the index position in the list of placeholders. Be aware these are 1-indexed, not zero-indexed. Once the parameters are set, we can perform the insert using the execute() method. A full example:

Connection conn = DriverManager.getConnection("jdbc:lsql:kafka:https://localhost:3030", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");
stmt.setString(1, "Tyrian Lannister");
stmt.setString(2, "Kings Landing");
stmt.setDouble(3, 67.5);
stmt.setDouble(4, -41.2);
stmt.execute();

After the insert has been executed, we can set new parameters and execute again. We do not need to obtain another instance of the statement. In fact, after execution, the next insert inherits the previous values. This is useful if you want to prepare a statement with a set of values, and then in a loop repeatedly change just one or a few values and execute. This next example shows multiple rows being inserted, with only a subset of the values changing on each execute:

List<String> characters = Arrays.asList("Tyrian Lannister", "Cersei Lannister", "Tywin Lannister");

Connection conn = DriverManager.getConnection("jdbc:lsql:kafka:https://localhost:3030", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");
stmt.setString(2, "Kings Landing");
stmt.setDouble(3, 67.5);
stmt.setDouble(4, -41.2);

for (String character : characters) {
    stmt.setString(1, character);
    stmt.execute();
}

stmt.close();

Batch Inserts

The second feature of the prepared statements is the ability to do batching. Instead of invoking execute() after each row is ready, we can add them to a buffer, which JDBC calls a batch. Then, once we have populated the batch to our target size, we can insert all rows in the batch in a single operation. Here is the previous example but re-written to use batching:

List<String> characters = Arrays.asList("Tyrian Lannister", "Cersei Lannister", "Tywin Lannister");

Connection conn = DriverManager.getConnection("jdbc:lsql:kafka:https://localhost:3030", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");
stmt.setString(2, "Kings Landing");
stmt.setDouble(3, 67.5);
stmt.setDouble(4, -41.2);

for (String character : characters) {
    stmt.setString(1, character);
    stmt.addBatch();
}

stmt.executeBatch();
stmt.close();

You can see that the addBatch() method is used to add the current row into the buffer, and then stmt.executeBatch() is invoked once the batch is ready to be executed. In addition, if you wish to clear an existing batch for whatever reason, you can call clearBatch(). There is no need to invoke this after executeBatch as the batch will be automatically cleared after execution.

It is highly recommended to use batching over single inserts whenever you have to insert more than a trivial amount of data. The batch size to use will vary depending on the number of fields and the size of the data, so you should experiment. A good starting point would be 100 rows per batch.
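Building on the batching example above, here is a sketch of flushing the batch periodically rather than accumulating everything in one buffer. The batch size of 100 is only a starting point, and characters and stmt are assumed to be set up as in the previous example:

```java
int batchSize = 100;  // a starting point; tune for your data
int pending = 0;
for (String character : characters) {
    stmt.setString(1, character);
    stmt.addBatch();
    if (++pending == batchSize) {
        stmt.executeBatch();  // flush a full batch
        pending = 0;
    }
}
if (pending > 0) {
    stmt.executeBatch();  // flush the final partial batch
}
stmt.close();
```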

Note

In the current release, prepared statements are limited to inserts only.

Metadata

Sometimes the need may arise to use prepared statements in a generic way, as you would for select queries. In this scenario, because the prepared statement is pre-parsed, it is possible to extract metadata on the columns before the query has actually been executed. To facilitate this, the getMetaData() method is exposed on the prepared statement instance.

By using this metadata we can programmatically set parameters, without needing to know in advance what those parameters are. Here is a very simple example that uses the metadata to retrieve parameter values from a map before executing the query.

Map<String, Object> values = new HashMap<>();
values.put("name", "Walter White");
values.put("city", "Albuquerque");
values.put("lat", 51.0);
values.put("long", 12.3);

Connection conn = DriverManager.getConnection("jdbc:lsql:kafka:https://localhost:3030", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("INSERT INTO mytopic (name, city, lat, long) VALUES (?, ?, ?, ?)");
ResultSetMetaData meta = stmt.getMetaData();
for (int k = 1; k <= meta.getColumnCount(); k++) {
    String columnName = meta.getColumnName(k);
    Object value = values.get(columnName);
    stmt.setObject(k, value);
}
stmt.execute();

Connection Parameters

To connect to a Lenses instance running behind a self-signed SSL certificate that has not been stored in the JRE keystore, you will need to set an additional connection parameter: weakssl to true. This instructs the driver to accept certificates it cannot validate.

Properties props = new Properties();
props.setProperty("user", "myuser");
props.setProperty("password", "mypass");
props.setProperty("weakssl", "true");
Connection conn = DriverManager.getConnection(
        "jdbc:lsql:kafka:http://localhost:3030",
        props);

Notice we wrap the user, password and weakssl parameters into a Properties object, which is then passed into the connection method.

Note

It is advised to use this only in development environments. The preferred solution for self-signed certificates in production is to install them into the JRE keystore.

Database Metadata

Since JDBC is a specification that covers many databases, there is a need for extensive discovery functionality. This way, tools such as a desktop database client, or a query designer, can inspect the target database to find out details on tables, views, columns, supported types and so forth. JDBC supports this kind of discovery through metadata that is retrieved directly on the Connection instance.

Firstly, retrieve the metadata object itself.

DatabaseMetaData meta = conn.getMetaData();

Next, we can query for column information, list of tables, supported types, driver version and so forth. The metadata available is quite extensive, and in this guide, we will only cover the discovery of tables and columns, but the rest of the methods work in the same way.

Table Level Information

We can retrieve the list of tables known to the driver.

DatabaseMetaData meta = conn.getMetaData();
ResultSet rs = meta.getTables(null, null, "sometable", null);
while (rs.next()) {
    System.out.println("Table=" + rs.getString(3));
    System.out.println("Type=" + rs.getString(4));
}

The method getTables() accepts four parameters. The first two are the catalog and schema pattern, which are typically used by relational databases to namespace tables. Note that schema here is not related to the concept of a record having a schema type. The third parameter is a table name pattern to search by, and the fourth accepts an array of table types, which allows limiting the search to user-level topics or system topics.

Column Level Information

Let’s retrieve information about columns for a single table (Kafka topic).

DatabaseMetaData meta = conn.getMetaData();
ResultSet rs = meta.getColumns(null, null, "sometable", "name*");
while (rs.next()) {
    System.out.println("Table=" + rs.getString(3));
    System.out.println("Column=" + rs.getString(4));
    System.out.println("Datatype=" + rs.getString(5));
}

The method getColumns() accepts four parameters. The first two are the catalog and schema pattern, as in the table metadata. The third parameter is a table name pattern to search by, and the fourth is a pattern to limit the column search. These parameters can be null, which is equivalent to match-all, or can include * as a wildcard.

The return type of this method is a regular ResultSet which is queryable in the same way any other resultset is. In the example above we print out some key parts of the column info, but there are many other attributes. See the JDBC Javadoc for full details.

Unsupported Features

The JDBC interface is designed to cover dozens, if not hundreds, of possible databases, and as such, it is a wide-reaching specification. Due to the widely different possible implementations, there are features that some databases cannot implement. Lenses JDBC is no different. These are some of the features of the driver that cannot be used.

  • Transactions: Kafka has no notion of transactional support, so methods like commit() and rollback() have no effect. Attempting to use these will either result in a no-op or an exception being thrown.
  • Updates: Kafka records are immutable once stored, so methods which update are not supported. Inserts, as you have read earlier, are supported, but methods that mutate existing resultsets, or update a table will throw exceptions.
  • Stored procedures: Kafka has no support for stored procedures, so attempts to use the CallableStatement classes will result in an exception.
  • Unsupported Types: The specification covers a multitude of types, some of which do not exist in the Json or Avro libraries. So attempts to retrieve values from a row with an unsupported type such as a clob will result in an SQLException.
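Since SQLFeatureNotSupportedException is a subclass of SQLException, a broad catch of SQLException covers the unsupported-feature cases as well. A minimal standalone sketch of that hierarchy (the thrown exception here is simulated, not produced by the driver):

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

try {
    // Simulate the error an unsupported getter such as getNClob would raise
    throw new SQLFeatureNotSupportedException("getNClob is not supported");
} catch (SQLException e) {
    // SQLFeatureNotSupportedException extends SQLException, so it is caught here
    System.out.println("Caught: " + e.getMessage());
}
```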

Troubleshooting

For troubleshooting or additional information, join our Slack channel.