Lenses JDBC Driver

Lenses offers a JDBC 4.0 compliant driver, suitable for any application that uses the JDBC interface, to communicate with Apache Kafka via the Lenses platform.

Users of this library can:

  • Select from topics
  • Insert into topics
  • Fetch topics metadata
  • Fetch metadata around messages

Note

In the JDBC specification, a record is called a row, because the interfaces were historically designed with relational databases in mind. In this guide, the term row refers to the value part of a Kafka record. More on this later.

Installation

Lenses JDBC is available on Maven Central as a single artifact. Maven users can add the dependency below:

<dependency>
    <groupId>io.lenses</groupId>
    <artifactId>lenses-jdbc</artifactId>
    <version>3.0.2</version>
</dependency>

Users of Gradle can utilize this:

compile 'io.lenses:lenses-jdbc:3.0.2'

The Lenses platform must be installed and running, as the JDBC driver executes queries against the APIs exposed by Lenses.

Semantics

The Java Database Connectivity (JDBC) API is the industry standard for database-independent connectivity between the Java programming language and a wide range of SQL databases and other tabular data sources, such as spreadsheets or flat files. In a table, each row represents a unique record, and each column represents a field in the record. For example, a table that contains employee data for a company has a row for each employee and columns for employee information such as number, name, job title, email, office code, extension, and the person they report to.

[Figure: employee_data.png, an example employee table]

To query such a table one would write this SQL:

JDBC interfaces have been built with tabular data in mind.

Kafka record data is anything but tabular. A record has a key (optional), a value (optional), some metadata (partition, offset, timestamp), and headers. Furthermore, the key and value are not necessarily flat structures. For example, it is quite common to have a payload for an employee like the one below:

The Lenses SQL engine can handle multiple data formats and data structures, and it can address key and value fields as well as metadata. When it comes to field selection, it can handle the full spectrum of nested fields, arrays, and nested fields in arrays. Every result for a wildcard query like the following:

returns for each record the information split into three sections:
  • key
  • value
  • metadata

Any field selection made outside of the wildcard projection is attached to the value category. Given the query below

the result for each record has the information returned in two sections
  • value
  • metadata (always available)

To be able to query Kafka via JDBC, some rules have to be put in place to accommodate the multiple parts of a Kafka record. For example, a query like:

returns multi-part data for each record, which is a consequence of the Kafka message structure. This does not fit the JDBC model: it is like trying to represent three tables (key, value, metadata) as one.

The convention for the JDBC driver is that it only picks up the information from the value category. With a wildcard selection, the row contains only the data present in the Kafka record value. However, through projections, the key and metadata fields can be brought into the JDBC row.

Note

To return the key and metadata fields into the result of a JDBC row, the query needs to project the key and metadata explicitly. For example SELECT *, _key.*, _meta.* FROM topic brings all the Kafka record information into the JDBC row.
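
As a minimal sketch of the note above, the helper below assembles the projection so that the key and metadata sections are requested only when needed. The class and method names (RecordProjection, selectAll) are hypothetical and not part of the driver; the _key.* and _meta.* syntax is taken from the note. The topic name is concatenated as-is, so it is assumed to come from trusted code.

```java
/** Hypothetical helper (not part of Lenses JDBC) that builds a wildcard query. */
final class RecordProjection {
    private RecordProjection() {}

    /**
     * Builds a SELECT that always returns the record value and optionally
     * adds the key and metadata sections to the JDBC row.
     * The topic name is not validated or escaped here.
     */
    static String selectAll(String topic, boolean includeKey, boolean includeMeta) {
        StringBuilder sql = new StringBuilder("SELECT *");
        if (includeKey) {
            sql.append(", _key.*");   // project the record key fields
        }
        if (includeMeta) {
            sql.append(", _meta.*");  // project the record metadata fields
        }
        return sql.append(" FROM ").append(topic).toString();
    }
}
```

Passing the result of RecordProjection.selectAll("mytopic", true, true) to executeQuery sends the same query shape as the one shown in the note.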

Quick Start

As with most JDBC drivers, we must register it with the DriverManager before we can use it. The DriverManager is a singleton class that manages all known drivers and exposes them through the JDBC API. To register the Lenses driver, we need to create an instance of the driver class, like so:

new io.lenses.jdbc4.LensesDriver()

You only need to do this once. It is typically placed somewhere that is executed early in your application's lifecycle.

Once the driver is registered, we can ask the DriverManager to open a Connection to the Lenses server. We need to provide the JDBC URI of the Lenses API, as well as a username and password, like so:
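
To confirm that registration has happened before the first connection is opened, one option is to ask the DriverManager whether any registered driver accepts a Lenses URL. The sketch below relies only on the standard DriverManager.getDriver(String) call; the class name DriverCheck is hypothetical and not part of the driver.

```java
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical startup check; not part of Lenses JDBC. */
final class DriverCheck {
    private DriverCheck() {}

    /** Returns true if some registered driver accepts the given JDBC URL. */
    static boolean isDriverRegistered(String jdbcUrl) {
        try {
            // getDriver throws SQLException when no registered driver accepts the URL
            return DriverManager.getDriver(jdbcUrl) != null;
        } catch (SQLException e) {
            return false;
        }
    }
}
```

Calling DriverCheck.isDriverRegistered("jdbc:lenses:kafka:http://localhost:3030") at startup and failing fast on false makes a missing registration visible early, rather than at the first query.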

Connection conn = DriverManager.getConnection(
     "jdbc:lenses:kafka:http://localhost:3030",
     "username",
     "password");

Note the format of the JDBC URI: it includes the scheme (http or https) before the hostname. The general format is jdbc:lenses:kafka:<scheme>://<lenses_hostname>:<lenses_port>
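
The URI used in these examples can be assembled from its parts, as in the sketch below. The class name LensesJdbcUrl is hypothetical, and restricting the scheme to http or https is an assumption based on the URLs shown in this guide.

```java
/** Hypothetical helper (not part of Lenses JDBC) that assembles the JDBC URI. */
final class LensesJdbcUrl {
    private static final String PREFIX = "jdbc:lenses:kafka:";

    private LensesJdbcUrl() {}

    /**
     * Builds a URI such as jdbc:lenses:kafka:http://localhost:3030.
     * Only http and https are accepted (an assumption, see above).
     */
    static String of(String scheme, String host, int port) {
        if (!scheme.equals("http") && !scheme.equals("https")) {
            throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        return PREFIX + scheme + "://" + host + ":" + port;
    }
}
```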

With this connection, we can create a Statement instance. A statement is an object by which we execute queries.

Statement stmt = conn.createStatement();

With a statement instance, we can use the executeQuery(String) method to send a query to the Lenses platform and return the matching records as JDBC rows.

ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic");

When a statement executes a query, it returns a ResultSet instance, which acts as an iterator for the results. The resultset has an internal cursor which points to the current row. A resultset can be traversed by invoking the next() method. When this method is invoked, if there is another row to read, it moves the cursor and returns true. If there are no more rows to return, it returns false. For each row, we can use getter methods on the resultset to retrieve the field values.

Connection conn = DriverManager.getConnection(
     "jdbc:lenses:kafka:http://localhost:3030",
     "username",
     "password");

Statement stmt = conn.createStatement();

ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic");

while (rs.next()) {
    System.out.println(rs.getString("name"));
    System.out.println(rs.getInt("age"));
    System.out.println(rs.getString("location"));
}

Connections, statements, and resultsets maintain an open socket while they are in use, so once you have finished processing it is best practice to close them, rather than wait for them to be garbage collected. In Java 7 onwards, you can use the try-with-resources statement.

// The statement and resultset are closed automatically when the block exits.
try (Statement stmt = conn.createStatement();
     ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic")) {
    // process the rows here
}

Alternatively, in earlier versions of Java, you can close them in the finally clause of a try block.

Select

As we have seen in the earlier section, you can use a Statement to execute a query. There are two ways to execute a query on a Statement.

The first is to use the executeQuery(String) method, which returns a ResultSet directly. The second is to use execute(String), which assigns the resultset to the statement so that it can be retrieved later using stmt.getResultSet().

Once we have a reference to a resultset, we can query it. Recall the next() method from the quick start. Internally, the resultset maintains a cursor which points to the current row (Kafka record). Each time next() is invoked, this cursor moves forward by one position. If the cursor then points to a valid row, next() returns true. If the cursor has moved past the end of the results, it returns false.

If you have not used JDBC before, it is worth knowing that the resultset acts as both the iterator and the current element.

It is possible to move the cursor to any offset in the resultset. This is possible because the resultset is stored in memory once it has been loaded from the store. For example, we can move the cursor to the last row, the first row, a specific row, or a relative offset from the current position.

Here is an example that reads the last record first, and then the first record.

Connection conn = DriverManager.getConnection(
        "jdbc:lenses:kafka:http://localhost:3030",
        "username",
        "password");

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic");

rs.last();
System.out.println(rs.getString("name"));

rs.first();
System.out.println(rs.getString("name"));

It is also possible to re-use a statement to execute another query, but any methods called on a given statement always refer to the last executed query. Therefore, you should not use the same statement instance in a concurrent context.

Note

The resultset offers get methods for all supported SQL types, but Lenses JDBC is limited to the types supported by AVRO and JSON. Therefore, attempting to use methods such as getNClob and getSQLXML results in a SQLFeatureNotSupportedException being thrown.
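
One way to handle this defensively is to treat an unsupported getter as "no value" rather than letting the exception escape. This is a minimal sketch, assuming the driver signals unsupported getters with SQLFeatureNotSupportedException as described above. The names SafeGetter, SqlCall and tryGet are hypothetical, and rethrowing other SQL errors as IllegalStateException is a design choice of the sketch, not driver behavior.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Optional;

/** Hypothetical helper; not part of Lenses JDBC. */
final class SafeGetter {
    private SafeGetter() {}

    /** A getter call such as () -> rs.getString("name"). */
    @FunctionalInterface
    interface SqlCall<T> {
        T call() throws SQLException;
    }

    /**
     * Runs the getter and returns its value, or Optional.empty() when the
     * value is null or the driver does not support that getter.
     * Any other SQLException is rethrown unchecked so it is not silently lost.
     */
    static <T> Optional<T> tryGet(SqlCall<T> getter) {
        try {
            return Optional.ofNullable(getter.call());
        } catch (SQLFeatureNotSupportedException e) {
            return Optional.empty();
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Inside a row loop this could be used as SafeGetter.tryGet(() -> rs.getSQLXML("payload")), where the column name payload is only an illustration.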

Nested Fields

Lenses SQL supports nested fields using dot-delimited syntax. So if you wish to select the field postcode from a structure called address, alongside the name at the top level, the query would be as follows:

ResultSet rs = stmt.executeQuery("SELECT name, address.postcode FROM mytopic");
while (rs.next()) {
    System.out.println(rs.getString("name"));
    System.out.println(rs.getString("address.postcode"));
}

Notice the dot-separated field name used when accessing nested fields.

Metadata

It is common to execute SQL queries which have been generated programmatically. In this case, the number of columns, their order, and their types cannot be known upfront. For this situation, the JDBC specification allows us to retrieve metadata for a resultset.

By using the metadata, it is possible to retrieve the number of columns, the column types - both JDBC internal type and JVM type - the names of the columns, precision and scale information, whether a numerical type is signed, and finally if the column allows for null values.

Here is a program that prints out all the metadata for a given result set.

Connection conn = DriverManager.getConnection("jdbc:lenses:kafka:https://localhost:3030", "user", "pass");
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM mytopic");
ResultSetMetaData meta = rs.getMetaData();

for (int k = 1; k <= meta.getColumnCount(); k++) {
    System.out.println("ColumnName=" + meta.getColumnName(k));
    System.out.println("ColumnType=" + meta.getColumnTypeName(k));
    System.out.println("Nullability=" + meta.isNullable(k));
    System.out.println("Signed=" + meta.isSigned(k));
    System.out.println("Precision=" + meta.getPrecision(k));
    System.out.println("Scale=" + meta.getScale(k));
}
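
Note that isNullable returns an int code rather than a boolean. The sketch below maps the three constants defined on java.sql.ResultSetMetaData to readable labels; the class name Nullability and the label strings are hypothetical.

```java
import java.sql.ResultSetMetaData;

/** Hypothetical helper that turns the isNullable() code into a label. */
final class Nullability {
    private Nullability() {}

    static String describe(int code) {
        switch (code) {
            case ResultSetMetaData.columnNoNulls:
                return "not nullable";
            case ResultSetMetaData.columnNullable:
                return "nullable";
            case ResultSetMetaData.columnNullableUnknown:
                return "unknown";
            default:
                // Not a value defined by the JDBC specification
                return "unrecognised (" + code + ")";
        }
    }
}
```

In the loop above, the nullability line could then print Nullability.describe(meta.isNullable(k)) instead of the raw number.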

Insert

Lenses JDBC can also perform SQL inserts, which result in new records being written to a Kafka topic. The topic written to is the subject of the INSERT INTO clause, as in a regular SQL query.

To perform an insert, we obtain a statement, as we have done previously. Then we use the executeUpdate() method, passing in a well-formed SQL insert query. For example:

Statement stmt = conn.createStatement();
int result = stmt.executeUpdate("INSERT INTO mytopic (name, city, lat, long) VALUES ('James T Kirk', 'Iowa City', 43.3, -54.2)");

The result returned indicates success.

Note

Lenses JDBC allows multiple VALUES sets in a single statement, to insert several records at once. See the Lenses table engine tutorial to learn about multi-value inserts. PreparedStatement is not supported at the moment.
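
Because PreparedStatement is not available, insert statements have to be assembled as strings. The sketch below builds a multi-row insert under two assumptions that should be checked against the Lenses table engine tutorial: that the VALUES sets are written as comma-separated tuples, and that a single quote inside a text value is escaped by doubling it, as in standard SQL. The class InsertBuilder and its methods are hypothetical and not part of the driver.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper; not part of Lenses JDBC. */
final class InsertBuilder {
    private InsertBuilder() {}

    /** Quotes a text value, doubling embedded single quotes (assumed escaping rule). */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * Builds INSERT INTO topic (c1, c2) VALUES (...), (...).
     * Each row holds already-formatted SQL literals, one per column.
     */
    static String multiInsert(String topic, List<String> columns, List<List<String>> rows) {
        String values = rows.stream()
                .map(row -> {
                    if (row.size() != columns.size()) {
                        throw new IllegalArgumentException("Row size does not match column count");
                    }
                    return "(" + String.join(", ", row) + ")";
                })
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + topic + " (" + String.join(", ", columns) + ") VALUES " + values;
    }
}
```

The resulting string is passed to executeUpdate in the same way as the single-row example above.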

Connection Parameters

Connecting to a Lenses instance that runs behind a self-signed SSL certificate, where the certificate has not been stored in the JRE keystore, requires an additional connection parameter: weakssl set to true. This parameter instructs the driver to ignore any certificates which it cannot validate.

Properties props = new Properties();
props.setProperty("user", "myuser");
props.setProperty("password", "mypass");
props.setProperty("weakssl", "true");
Connection conn = DriverManager.getConnection(
        "jdbc:lenses:kafka:https://localhost:3030",
        props);

Notice we wrap up the user, password and weakssl parameters into a Properties object which is then passed into the connection method.

Note

It is advised to use this only in development environments. The preferred solution for self-signed certificates in production is to install the certificates into the JRE keystore.
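
One way to follow this advice is to set weakssl only when an explicit development flag is on. A minimal sketch, in which the class LensesConnectionProps and the developmentMode flag are hypothetical; the property names user, password and weakssl are the ones used in the example above.

```java
import java.util.Properties;

/** Hypothetical helper; not part of Lenses JDBC. */
final class LensesConnectionProps {
    private LensesConnectionProps() {}

    /** Builds connection properties, enabling weakssl only in development. */
    static Properties of(String user, String password, boolean developmentMode) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        if (developmentMode) {
            // Skips certificate validation; never enable in production
            props.setProperty("weakssl", "true");
        }
        return props;
    }
}
```

The returned object is passed to DriverManager.getConnection(url, props) as before.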

Database Metadata

Since JDBC is a specification that covers many databases, there is a need for extensive discovery functionality. This way, tools such as a desktop database client, or a query designer, can inspect the target database to find out details on tables, views, columns, supported types. JDBC supports this kind of discovery through metadata that is retrieved directly on the Connection instance.

Firstly, retrieve the metadata object itself.

DatabaseMetaData meta = conn.getMetaData();

Next, we can query for column information, the list of tables, supported types, the driver version, and so forth. The metadata available is quite extensive, and this guide only covers the discovery of tables and columns, but the rest of the methods work in the same way.

Table Level Information

We can retrieve the list of tables known to the driver.

DatabaseMetaData meta = conn.getMetaData();
ResultSet rs = meta.getTables(null, null, "sometable", null);
while (rs.next()) {
    System.out.println("Table=" + rs.getString(3));
    System.out.println("Type=" + rs.getString(4));
}

The method getTables() accepts four parameters. The first two refer to the catalog and schema-name, which are typically used by relational databases to namespace tables. Note that schema-name here is not related to the concept of a record having a schema type. The third parameter allows us to specify a table to search by, and the fourth parameter accepts an array of table types, which allows us to limit the search to user-level topics or system topics.

Column Level Information

Let’s retrieve information about columns for a single table (Kafka topic).

DatabaseMetaData meta = conn.getMetaData();
ResultSet rs = meta.getColumns(null, null, "sometable", "name*");
while (rs.next()) {
    System.out.println("Table=" + rs.getString(3));
    System.out.println("Column=" + rs.getString(4));
    System.out.println("Datatype=" + rs.getString(5));
}

The method getColumns() accepts four parameters. The first two refer to the catalog and schema-name, as in the table metadata. The third parameter allows us to specify a table to search by, and the fourth parameter allows a regex to limit the column search. These parameters can be null, which is equivalent to match all, or include * as a wildcard.

The return type of this method is a regular ResultSet which is queryable in the same way any other resultset is. The example above prints out some key parts of the column info, but there are many other attributes. See the JDBC Javadoc for full details.
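
In the JDBC specification, the fifth column of the getColumns() result (DATA_TYPE) is an int code from java.sql.Types. Assuming the driver follows that convention, the sketch below turns the code into a readable name using the standard java.sql.JDBCType enum (Java 8 onwards). The class name ColumnTypes is hypothetical.

```java
import java.sql.JDBCType;

/** Hypothetical helper that names a java.sql.Types code. */
final class ColumnTypes {
    private ColumnTypes() {}

    static String nameOf(int dataType) {
        try {
            return JDBCType.valueOf(dataType).getName();
        } catch (IllegalArgumentException e) {
            // The code is not one of the java.sql.Types constants
            return "UNKNOWN(" + dataType + ")";
        }
    }
}
```

In the example above, this would be called as ColumnTypes.nameOf(rs.getInt(5)).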

Unsupported Features

The JDBC interface is designed to cover dozens, if not hundreds, of possible databases, and as such, it is a wide-reaching specification. Due to the widely different possible implementations, there are features that some databases cannot implement. Lenses JDBC is no different. These are some of the features of the driver that cannot be used.

  • Transactions: Kafka transactions support is not wired in with the JDBC driver. Attempting to use the operations will result in a no-op or an exception being thrown.
  • Updates: Kafka records are immutable once stored, so methods which update are not supported. Inserts, as you have read earlier, are supported, but methods that mutate existing resultsets, or update a table will throw exceptions.
  • Stored procedures: Kafka has no support for stored procedures, so attempts to use the CallableStatement classes result in an exception.
  • Unsupported Types: The specification covers a multitude of types, some of which do not exist in the JSON or AVRO libraries. Trying to retrieve values from a row with an unsupported type such as a clob results in an SQLException.
  • Prepared statements and batch inserts: The support for such functionality is on the roadmap.

Troubleshooting

For troubleshooting or additional information, join our Slack channel.