Building a Converged Database Java Application with Spring Boot
In this article, we’ll build a converged database News Event Streaming application using Java 21+, Spring Boot, and Oracle Database.
The demo application will stream news articles as events from a REST API, processing those articles into vector embeddings and storing them in a multi-modal database that includes relational, document, event, and vector data types.
Just want the code? Check out the demo app GitHub Repository.
Processed news articles can be queried over REST using vector similarity search, for a plain query or an LLM-enhanced summarization query.
What is a converged database?
A multi-modal database that abides by the principle of One Database, Any Data, Any Workload.
Oracle Database handles relational, JSON, graph, spatial, event-streaming, vectors, and more — all within a single, converged database engine.
Why is this important?
The News Event Streaming Application makes use of the following four converged Oracle Database features, in addition to OCI GenAI APIs for text embedding and chat:
(1) Relational data schemas: news and news_vector tables are created to store news articles and their vector embeddings. A news article may have one or more vector embeddings, depending on the size of the article.
(2) JSON Relational Duality Views: The news and news_vector tables are combined in a read-write duality view that gives document style access to data from both tables.
(3) Transactional Event Queues: We stream news article events using the Kafka Java Client for Oracle Database Transactional Event Queues, with a topic for raw news and a topic for parsed news JSON data (including vector embeddings).
(4) Vector Data and Similarity Search: Using news article vector embeddings and the NewsService REST API, we can query for news articles that are similar in meaning to an input text. We can also summarize news articles similar to an input query by combining vector search with an OCI GenAI chat model!
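To make the similarity scores in feature (4) concrete, here is a minimal, self-contained Java sketch of cosine similarity, which is the same quantity the application's SQL computes as `1 - vector_distance(embedding, ?, cosine)`. The vectors here are tiny illustrative examples, not real 1024-dimensional embeddings.

```java
public class CosineSimilarity {
    // Cosine similarity between two embedding vectors: dot(a, b) / (|a| * |b|).
    // A score near 1 means the vectors (and so the texts) point in nearly the
    // same direction; a score near 0 means they are unrelated.
    static double cosineSimilarity(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        double[] query = {1.0, 0.0, 1.0};
        double[] chunk = {1.0, 0.0, 1.0};
        double[] unrelated = {0.0, 1.0, 0.0};
        System.out.println(cosineSimilarity(query, chunk));     // 1.0: identical direction
        System.out.println(cosineSimilarity(query, unrelated)); // 0.0: orthogonal vectors
    }
}
```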
Vector, JSON, and Relational Data
Need a primer on Vector Databases or JSON Relational Duality Views? Check out my Intro to Vector Databases article, or 7 Reasons to Try JSON Relational Duality Views.
The com/example/news/genai/vectorstore package contains classes for working with vector and JSON data. The NewsStore class implements an addAll method to save news JSON data containing vectors using a JSON Relational Duality View.
The News JSON is saved in a single, atomic transaction, persisting both the article and vector data together.
/**
 * Adds a list of news records (with embeddings) to the vector store, in batches.
 * @param newsList Records to add.
 * @param connection Database connection used for the insert.
 */
public void addAll(ConsumerRecords<String, News> newsList, Connection connection) {
    final String sql = """
            insert into news_dv (data) values(?)
            """;
    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        int i = 0;
        for (ConsumerRecord<String, News> record : newsList) {
            News news = record.value();
            byte[] oson = jsonb.toOSON(news);
            news.setNews_vector(new ArrayList<>());
            stmt.setObject(1, oson, OracleTypes.JSON);
            stmt.addBatch();
            // Once BATCH_SIZE records have been added to the statement, execute the batch.
            if (i % BATCH_SIZE == BATCH_SIZE - 1) {
                stmt.executeBatch();
            }
            i++;
        }
        // If any records remain that did not fill a complete batch, execute them.
        if (newsList.count() % BATCH_SIZE != 0) {
            stmt.executeBatch();
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
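The executeBatch cadence in addAll is easy to get subtly wrong, so here is a small, self-contained sketch of the same flush logic. The countFlushes helper is hypothetical, written only to show when batches execute, including the final partial batch.

```java
public class BatchFlush {
    static final int BATCH_SIZE = 3;

    // Counts how many times executeBatch would run for a given number of
    // items, mirroring addAll: flush every BATCH_SIZE items, then once
    // more if a partial batch remains.
    static int countFlushes(int totalItems) {
        int flushes = 0;
        for (int i = 0; i < totalItems; i++) {
            if (i % BATCH_SIZE == BATCH_SIZE - 1) {
                flushes++;
            }
        }
        // Remainder flush for a final, partially filled batch.
        if (totalItems % BATCH_SIZE != 0) {
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(countFlushes(7)); // 3 flushes: items 0-2, 3-5, then 6
        System.out.println(countFlushes(6)); // 2 flushes, no remainder
    }
}
```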
The NewsStore class also implements similarity search, using a traditional relational query on news vectors. The search method finds news articles that are semantically similar (similar content) to an input query vector.
public List<NewsDTO> search(SearchRequest searchRequest, VECTOR vector, Connection connection) {
    // This query is designed to:
    // 1. Calculate a similarity score for each row based on the cosine distance between
    //    the embedding column and a given vector, using the "vector_distance" function.
    // 2. Order the rows by this similarity score in descending order.
    // 3. Filter out rows with a similarity score below a specified threshold.
    // 4. Return only the top rows that meet the criteria.
    // 5. Group by article ID, so multiple chunks from the same article do not duplicate results.
    final String searchQuery = """
            select n.news_id, n.article, nv.score
            from news n
            join (
                select news_id, max(score) as score
                from (
                    select news_id, (1 - vector_distance(embedding, ?, cosine)) as score
                    from news_vector
                    order by score desc
                )
                where score >= ?
                group by news_id
                order by score desc
                fetch first 5 rows only
            ) nv on n.news_id = nv.news_id
            order by nv.score desc""";
    List<NewsDTO> matches = new ArrayList<>();
    try (PreparedStatement stmt = connection.prepareStatement(searchQuery)) {
        // When binding the VECTOR data type with prepared statements,
        // always use setObject with the OracleType.VECTOR targetSqlType.
        stmt.setObject(1, vector, OracleType.VECTOR.getVendorTypeNumber());
        stmt.setObject(2, searchRequest.getMinScore(), OracleType.NUMBER.getVendorTypeNumber());
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                matches.add(fromResultSet(rs));
            }
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
    return matches;
}
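The `max(score) ... group by news_id` step above deduplicates chunks into articles. To illustrate that logic outside SQL, here is a hedged plain-Java sketch; the ChunkScore record and the sample scores are hypothetical, not the demo's actual types.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ChunkScores {
    // Mirrors the SQL subquery: keep only the best (max) chunk score per article,
    // so an article with many matching chunks appears once in the results.
    record ChunkScore(String newsId, double score) {}

    static Map<String, Double> bestScorePerArticle(List<ChunkScore> chunks, double minScore) {
        return chunks.stream()
                .filter(c -> c.score() >= minScore)                     // where score >= ?
                .collect(Collectors.toMap(                              // group by news_id,
                        ChunkScore::newsId, ChunkScore::score, Math::max)); // max(score)
    }

    public static void main(String[] args) {
        List<ChunkScore> chunks = List.of(
                new ChunkScore("a1", 0.35),
                new ChunkScore("a1", 0.62),
                new ChunkScore("a2", 0.15));
        // a1 keeps its best chunk score; a2 falls below the 0.2 threshold.
        System.out.println(bestScorePerArticle(chunks, 0.2)); // {a1=0.62}
    }
}
```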
To handle these data requirements, the news-schema DDL combines Vector, JSON, and relational models in a single schema. Relational tables are created, containing a vector type and vector index for fast similarity search. Next, a JSON Relational Duality View is layered over the relational schema to provide JSON document access!
-- News articles
create table if not exists news (
    news_id varchar2(36) default sys_guid() primary key,
    article clob
);

-- News vectors, many-to-one with news
create table if not exists news_vector (
    id varchar2(36) default sys_guid() primary key,
    news_id varchar2(36),
    chunk varchar2(2500),
    embedding vector(1024, FLOAT64) annotations(Distance 'COSINE', IndexType 'IVF'),
    constraint fk_news_vector foreign key (news_id)
        references news(news_id) on delete cascade
);

-- Vector index for news vectors
create vector index if not exists news_vector_ivf_idx on news_vector (embedding)
    organization neighbor partitions
    distance COSINE
    with target accuracy 90
    parameters (type IVF, neighbor partitions 10);

-- JSON Relational Duality View for the news schema
create or replace force editionable json relational duality view news_dv
as
news @insert @update @delete {
    _id : news_id
    article
    news_vector @insert @update @delete
        @link (to : [NEWS_ID]) {
        id
        chunk
        embedding
    }
}
/
Event Producers and Consumers
Want a primer on transactional messaging with Oracle Database? Read Event Streaming and the Transactional Outbox Pattern or Intro to Kafka Java Clients for Oracle Database Transactional Event Queues for a refresher.
The com/example/news/events package contains classes for producing and consuming events with the Kafka APIs for Oracle Database Transactional Event Queues.
For example, the NewsConsumer class reads data from a topic and saves the records to the database in a single, atomic transaction using Apache Kafka-compatible APIs.
@Override
public void run() {
    consumer.subscribe(Collections.singletonList(newsTopic));
    while (true) {
        // Poll a batch of records from the news topic.
        ConsumerRecords<String, News> records = consumer.poll(Duration.ofMillis(200));
        if (records.isEmpty()) {
            continue;
        }
        try {
            // Add all news records to the vector store.
            vectorStore.addAll(records, consumer.getDBConnection());
            // You may also use auto-commit, or consumer.commitAsync().
            consumer.commitSync();
            log.info("Committed {} records", records.count());
        } catch (Exception e) {
            log.error("Error processing news events", e);
            handleError();
        }
    }
}

private void handleError() {
    try {
        consumer.getDBConnection().rollback();
    } catch (SQLException e) {
        log.error("Error rolling back transaction", e);
    }
}
The NewsParserConsumerProducer reads raw news data, parses and embeds that data, and then produces the result to a new topic. The message consume and produce are included in the same atomic transaction, eliminating data consistency problems when any of the read, embedding, or produce steps fail.
@Override
public void run() {
    // Subscribe to the news_raw topic, consuming unparsed news events.
    consumer.subscribe(Collections.singletonList(rawTopic));
    while (true) {
        // Poll a batch of records from the news_raw topic.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        if (records != null && records.count() > 0) {
            processRecords(records);
        }
    }
}

private void processRecords(ConsumerRecords<String, String> records) {
    // Both the producer and consumer are instantiated with the same database connection.
    producer.beginTransaction();
    try {
        List<CompletableFuture<News>> futures = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            // Asynchronously (with virtual threads) parse each record.
            futures.add(parser.parseAsync(record.value()));
        }
        for (CompletableFuture<News> future : futures) {
            // Process the news events, publishing them to the parsed news topic.
            News news = future.get();
            ProducerRecord<String, News> pr = new ProducerRecord<>(
                    parsedTopic,
                    news.get_id(),
                    news
            );
            producer.send(pr);
        }
        // Commit both the poll and the publish of events.
        producer.commitTransaction();
        log.info("Successfully processed {} records", records.count());
    } catch (Exception e) {
        log.error("Error processing records", e);
        // Abort both the poll and event publish on an unexpected processing error.
        producer.abortTransaction();
    }
}
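The parser.parseAsync implementation isn't shown here. As a rough, self-contained sketch of the pattern on Java 21, each record can be parsed on a virtual thread and the futures joined in submission order. The parsing body (trim and uppercase) is a placeholder, not the demo's real parsing and embedding logic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncParseSketch {
    // A virtual-thread-per-task executor (Java 21+): each parse runs on its
    // own lightweight thread, so a large poll batch parses concurrently.
    static final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    // Hypothetical stand-in for parser.parseAsync: trim and uppercase the raw text.
    static CompletableFuture<String> parseAsync(String raw) {
        return CompletableFuture.supplyAsync(() -> raw.trim().toUpperCase(), executor);
    }

    public static void main(String[] args) {
        // Submit all records first, then join in order, matching the two loops
        // in processRecords above.
        List<CompletableFuture<String>> futures = List.of("  breaking news ", " markets ")
                .stream()
                .map(AsyncParseSketch::parseAsync)
                .toList();
        for (CompletableFuture<String> f : futures) {
            System.out.println(f.join()); // BREAKING NEWS, then MARKETS
        }
    }
}
```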
Try out the application
Next, we’ll demonstrate the app by running it locally.
Prerequisites:
Database and Schema
The app requires access to an Oracle Database instance. The default configuration assumes a local containerized database running on port 1521. You can replicate this configuration by starting an Oracle Database Free container:
docker run --name oracledb -d -p 1521:1521 -e ORACLE_PASSWORD=testpwd gvenzl/oracle-free:23.6-slim-faststart
After a few moments, the database should start up. Once your database is ready, run the testuser.sql and news-schema.sql scripts to create a user, grants, tables, and duality view required by the application. A companion cleanup script, cleanup.sql, is provided to delete the tables, duality view, and topics used by the app.
If you don’t have Docker or for some reason can’t run Oracle Database locally, Autonomous Database is available in the OCI Always Free Tier.
App configuration
You’ll need to set the following environment variables where your application is running, or provide these values in the application properties file.
Once these values are configured, start the application like so. The REST APIs will be available on port 8080:
mvn spring-boot:run
Streaming News Data
In this section, we’ll stream news article data from the CNN Dailymail dataset. You can find test data files in the src/test/resources directory.
Using the NewsService REST API, we’ll insert one or more of these news articles. POSTed article data flows through the pipeline described above: it is produced to the raw news topic, parsed and embedded, and saved to the database.
POST a single article
curl -X POST "http://localhost:8080/news" \
-H "Content-Type: application/json" \
-d "@src/test/resources/one-record.json"
POST a small batch of articles by ingesting a subset of the news dataset.
curl -X POST "http://localhost:8080/news" \
-H "Content-Type: application/json" \
-d "@src/test/resources/input-small.json"
POST a large batch of articles
WARNING: The large data set contains almost 12,000 records. Before running this, ensure you’re comfortable with issuing approximately 25,000 text embedding requests.
curl -X POST "http://localhost:8080/news" \
-H "Content-Type: application/json" \
-d "@src/test/resources/input-data.json"
Try out Similarity Search
Find news articles similar to an input query
The minScore is the minimum similarity score for included articles, between 0 and 1. Higher values are more restrictive; lower values are more permissive.
curl -X GET "http://localhost:8080/news" \
-H "Content-Type: application/json" \
-d '{"input": "Large Hadron Collider particle accelerator", "minScore": 0.2}'
Sample output:
{"articles":[{"id":"e45bc7b6-6e5f-45e5-94c7-66c363460c74","article":"(CNN)The world's biggest and most powerful physics experiment... ]}
The following request summarizes an article by ID:
# Use your own ID in the path!
curl -X POST "http://localhost:8080/news/summarize/e45bc7b6-6e5f-45e5-94c7-66c363460c74"
Sample output:
The Large Hadron Collider (LHC), the world’s largest particle accelerator, has restarted after a two-year shutdown. The LHC aims to recreate conditions after the Big Bang, helping scientists understand the universe’s evolution. With a cost of €3 billion, it generates an immense number of particles, allowing researchers to explore unanswered questions in physics, such as the origin of mass and the nature of dark matter.
Summarize by input query
The following request summarizes an article that is semantically similar to the input query, if such an article exists:
curl -X POST "http://localhost:8080/news/summarize" \
-H "Content-Type: application/json" \
-d '{"input": "Large Hadron Collider particle accelerator", "minScore": 0.2}'
Output should be similar to above, depending on temperature, provided the article exists.
Truncate data (clean up all embeddings and data)
When you’re done or want to clean up the data, use the following request to reset the database tables:
curl -X POST "http://localhost:8080/news/reset"
Questions? Leave a comment, open a GitHub issue, or connect with me on LinkedIn!