Building a Converged Database Java Application with Spring Boot

In this article, we’ll build a converged database News Event Streaming application using Java 21+, Spring Boot, and Oracle Database.

The demo application streams news articles as events from a REST API, converts those articles into vector embeddings, and stores them in a multi-modal database that holds relational, document, event, and vector data.

Just want the code? Check out the demo app GitHub Repository.

Processed news articles can be queried over REST using vector similarity search, either as a plain query or as an LLM-enhanced summarization query.

What is a converged database?

A converged database is a multi-modal database that abides by the principle of One Database, Any Data, Any Workload.

Oracle Database handles relational, JSON, graph, spatial, event-streaming, vectors, and more — all within a single, converged database engine.

Why is this important?

  1. Simplified Architecture: No need to manage and integrate multiple databases.
  2. Consistent Security & Governance: Enforce policies uniformly across all data types.
  3. High Performance & Scalability: Optimized query execution across diverse workloads.
  4. Developer Productivity: Build applications faster without worrying about selecting and maintaining different databases.

The News Event Streaming Application makes use of the following four converged Oracle Database features, in addition to OCI GenAI APIs for text embedding and chat:

(1) Relational data schemas: news and news_vector tables are created to store news articles and their vector embeddings. A news article may have one or more vector embeddings, depending on the size of the article.

(2) JSON Relational Duality Views: The news and news_vector tables are combined in a read-write duality view that gives document style access to data from both tables.

(3) Transactional Event Queues: We stream news article events using the Kafka Java Client for Oracle Database Transactional Event Queues, with one topic for raw news and another topic for parsed news JSON data (including vector embeddings).

(4) Vector Data and Similarity Search: Using news article vector embeddings and the NewsService REST API, we can query for news articles that are similar in meaning to an input text. We can also summarize news articles similar to an input query by combining vector search with an OCI GenAI chat model!

Vector, JSON, and Relational Data

Need a primer on Vector Databases or JSON Relational Duality Views? Check out my Intro to Vector Databases article, or 7 Reasons to Try JSON Relational Duality Views.

The com/example/news/genai/vectorstore package contains classes for working with vector and JSON data. The NewsStore class implements an addAll method to save news JSON data containing vectors using a JSON Relational Duality View.

The News JSON is saved in a single, atomic transaction, persisting both the article and vector data together.

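/**
 * Adds a batch of news records, including their embeddings, to the vector store.
 * Records are inserted through the news_dv duality view in batches of BATCH_SIZE.
 * @param newsList News records to add.
 * @param connection Database connection used for the insert.
 */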
public void addAll(ConsumerRecords<String, News> newsList, Connection connection) {
    final String sql = """
            insert into news_dv (data) values(?)
            """;
    try (PreparedStatement stmt = connection.prepareStatement(sql)) {
        int i = 0;
        for (ConsumerRecord<String, News> record : newsList) {
            News news = record.value();
            byte[] oson = jsonb.toOSON(news);

            news.setNews_vector(new ArrayList<>());
            stmt.setObject(1, oson, OracleTypes.JSON);
            stmt.addBatch();

            // If BATCH_SIZE records have been added to the statement, execute the batch.
            if (i % BATCH_SIZE == BATCH_SIZE - 1) {
                stmt.executeBatch();
            }
            i++;
        }
        // If there are any remaining batches, execute them.
        if (newsList.count() % BATCH_SIZE != 0) {
            stmt.executeBatch();
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}        
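
The flush condition in addAll is easier to follow when it is separated from JDBC. The following is a minimal sketch, not code from the demo repository: the BatchSketch class, the flushSizes method, and the batch size of 3 are hypothetical names and values used only for illustration. It records how many rows each executeBatch call would send for a given record count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the flush logic used by addAll, without JDBC.
class BatchSketch {
    // Returns the number of rows sent by each simulated executeBatch() call.
    static List<Integer> flushSizes(int recordCount, int batchSize) {
        List<Integer> flushes = new ArrayList<>();
        int pending = 0;
        for (int i = 0; i < recordCount; i++) {
            pending++; // stands in for stmt.addBatch()
            // Same condition as addAll: flush on every batchSize-th record.
            if (i % batchSize == batchSize - 1) {
                flushes.add(pending); // stands in for stmt.executeBatch()
                pending = 0;
            }
        }
        // Flush the partial batch left over after the loop, if any.
        if (recordCount % batchSize != 0) {
            flushes.add(pending);
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(flushSizes(7, 3)); // prints [3, 3, 1]
    }
}
```

With 7 records and a batch size of 3, two full batches are executed inside the loop and the final check sends the single remaining row. When the record count is an exact multiple of the batch size, the final check is skipped because nothing is pending.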

The NewsStore class also implements similarity search, using a traditional relational query on news vectors. The search method finds news articles that are semantically similar (similar content) to an input query vector.

public List<NewsDTO> search(SearchRequest searchRequest, VECTOR vector, Connection connection) {
    // This query:
    // 1. Scores each chunk as (1 - cosine distance) between its embedding column
    //    and the given vector, using the "vector_distance" function.
    // 2. Filters out chunks with a similarity score below the specified threshold.
    // 3. Groups by article ID and keeps the best chunk score per article, so multiple
    //    chunks from the same article do not duplicate results.
    // 4. Orders articles by score in descending order and returns only the top 5.
    final String searchQuery = """
        select n.news_id, n.article, nv.score
        from news n
        join (
            select news_id, max(score) as score
            from (
                select news_id, (1 - vector_distance(embedding, ?, cosine)) as score
                from news_vector
                order by score desc
            )
            where score >= ?
            group by news_id
            order by score desc
            fetch first 5 rows only
        ) nv on n.news_id = nv.news_id
        order by nv.score desc""";

    List<NewsDTO> matches = new ArrayList<>();
    try (PreparedStatement stmt = connection.prepareStatement(searchQuery)) {
        // When using the VECTOR data type with prepared statements, always use setObject with the OracleType.VECTOR targetSqlType.
        stmt.setObject(1, vector, OracleType.VECTOR.getVendorTypeNumber());
        stmt.setObject(2, searchRequest.getMinScore(), OracleType.NUMBER.getVendorTypeNumber());
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                matches.add(fromResultSet(rs));
            }
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
    return matches;
}        
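
To make the scoring in the SQL query concrete, here is a rough in-memory equivalent in plain Java. It is a sketch under stated assumptions, not how the demo app or Oracle Database computes results: SimilaritySketch, Chunk, and Match are hypothetical names, embeddings are assumed to be non-zero vectors of equal length, and no vector index is involved. It mirrors the same steps: score each chunk as cosine similarity (1 minus cosine distance), drop scores below minScore, keep the best score per article, and return the top results.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the search query's scoring steps.
class SimilaritySketch {
    record Chunk(String newsId, double[] embedding) {}
    record Match(String newsId, double score) {}

    // Cosine similarity, which equals 1 - cosine distance.
    // Assumes both vectors are non-zero and have the same length.
    static double cosineSimilarity(double[] a, double[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    static List<Match> search(List<Chunk> chunks, double[] query, double minScore, int limit) {
        // Best score per article, considering only chunks at or above minScore.
        Map<String, Double> best = new HashMap<>();
        for (Chunk chunk : chunks) {
            double score = cosineSimilarity(chunk.embedding(), query);
            if (score >= minScore) {
                best.merge(chunk.newsId(), score, Math::max);
            }
        }
        // Highest-scoring articles first, limited to the requested count.
        return best.entrySet().stream()
                .map(e -> new Match(e.getKey(), e.getValue()))
                .sorted(Comparator.comparingDouble(Match::score).reversed())
                .limit(limit)
                .toList();
    }
}
```

An article with several chunks appears once in the result, carrying the score of its best-matching chunk, which is the role of max(score) and group by news_id in the SQL.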

To handle these data requirements, the news-schema DDL combines vector, JSON, and relational models in a single schema. The relational tables include a vector column and a vector index for fast similarity search. A JSON Relational Duality View is then layered over the relational schema to provide JSON document access!

-- News articles
create table if not exists news (
    news_id    varchar2(36) default sys_guid() primary key,
    article    clob
);

-- News Vectors, many-to-one with news
create table if not exists news_vector (
    id        varchar2(36) default sys_guid() primary key,
    news_id   varchar2(36) ,
    chunk     varchar2(2500),
    embedding vector(1024, FLOAT64) annotations(Distance 'COSINE', IndexType 'IVF'),
    constraint fk_news_vector foreign key (news_id)
    references news(news_id) on delete cascade
);

-- Vector index for News Vectors
create vector index if not exists news_vector_ivf_idx on news_vector (embedding) organization neighbor partitions
distance COSINE
with target accuracy 90
parameters (type IVF, neighbor partitions 10);

-- JSON Relational Duality View for the News Schema
create or replace force editionable json relational duality view news_dv
 as
news @insert @update @delete {
    _id : news_id
    article
    news_vector @insert @update @delete
             @link (to : [NEWS_ID]) {
        id
        chunk
        embedding
    }
}
/        

Event Producers and Consumers

Want a primer on transactional messaging with Oracle Database? Read Event Streaming and the Transactional Outbox Pattern or Intro to Kafka Java Clients for Oracle Database Transactional Event Queues for a refresher.

The com/example/news/events package contains classes for producing and consuming events with the Kafka API for Oracle Database Transactional Event Queues.

For example, the NewsConsumer class reads records from a topic using Apache Kafka compatible APIs, and saves those records to the database in a single, atomic transaction.

@Override
public void run() {
    consumer.subscribe(Collections.singletonList(newsTopic));

    while (true) {
        // Poll a batch of records from the news topic.
        ConsumerRecords<String, News> records = consumer.poll(Duration.ofMillis(200));
        if (records.isEmpty()) {
            continue;
        }
        try {
            // Add all news records to the vector store
            vectorStore.addAll(records, consumer.getDBConnection());
            // You may also use auto-commit, or consumer.commitAsync()
            consumer.commitSync();
            log.info("Committed {} records", records.count());
        } catch (Exception e) {
            log.error("Error processing news events", e);
            handleError();
        }
    }
}

private void handleError() {
    try {
        consumer.getDBConnection().rollback();
    } catch (SQLException e) {
        log.error("Error rolling back transaction", e);
    }

}        

The NewsParserConsumerProducer reads raw news data, parses and embeds that data, and then produces the result to a new topic. The consume and produce operations are part of the same atomic transaction, which avoids inconsistent data if the read, embedding, or produce step fails.

@Override
public void run() {
    // Subscribe to the news_raw topic, consuming unparsed news events.
    consumer.subscribe(Collections.singletonList(rawTopic));
    while (true) {
        // Poll a batch of records from the news_raw topic.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        if (records != null && records.count() > 0) {
            processRecords(records);
        }
    }
}

private void processRecords(ConsumerRecords<String, String> records) {
    // Both Producer and Consumer are instantiated with the same connection
    producer.beginTransaction();
    try {
        List<CompletableFuture<News>> futures = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            // Asynchronous (with virtual threads) parse each record
            futures.add(parser.parseAsync(record.value()));
        }

        for (CompletableFuture<News> future : futures) {
            // Process the news events, publishing them to the news topic.
            News news = future.get();
            ProducerRecord<String, News> pr = new ProducerRecord<>(
                    parsedTopic,
                    news.get_id(),
                    news
            );
            producer.send(pr);
        }

        // Commit both the poll and publish of events.
        producer.commitTransaction();
        log.info("Successfully processed {} records", records.count());
    } catch (Exception e) {
        log.error("Error processing records", e);
        // Abort both the poll and event publish on an unexpected processing error.
        producer.abortTransaction();
    }
}        
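
The fan-out and join pattern used in processRecords can be tried on its own. The article does not show how parser.parseAsync is implemented, so the following is only a sketch of one possible shape: AsyncParseSketch and parseAll are hypothetical names, and the parser is passed in as a plain function. All tasks are started first and then joined in input order, so results keep the order of the polled records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of the "start all, then join in order" pattern.
class AsyncParseSketch {
    static <T> List<T> parseAll(List<String> raw, Function<String, T> parser, ExecutorService executor) {
        // Start every parse task before waiting on any of them.
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (String value : raw) {
            futures.add(CompletableFuture.supplyAsync(() -> parser.apply(value), executor));
        }
        // Join in input order; a failed task surfaces here as a CompletionException.
        List<T> parsed = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            parsed.add(future.join());
        }
        return parsed;
    }

    public static void main(String[] args) {
        // A fixed pool keeps the sketch portable. On Java 21+,
        // Executors.newVirtualThreadPerTaskExecutor() could be used instead.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(parseAll(List.of("a", "b", "c"), String::toUpperCase, executor)); // prints [A, B, C]
        } finally {
            executor.shutdown();
        }
    }
}
```

Because a failed parse throws when it is joined, a caller can abort the surrounding transaction in one place, as processRecords does in its catch block.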

Try out the application

Next, we’ll demonstrate the app by running it locally.

Prerequisites:

  • Maven
  • Java 21+
  • An OCI (Oracle Cloud Infrastructure) account with OCI GenAI access, used for the chat and embedding models.
  • Docker (Optional)

Database and Schema

The app requires access to an Oracle Database instance. The default configuration assumes a local containerized database running on port 1521. You can replicate this configuration by starting an Oracle Database Free container:

docker run --name oracledb -d -p 1521:1521 -e ORACLE_PASSWORD=testpwd gvenzl/oracle-free:23.6-slim-faststart        

After a few moments, the database should start up. Once your database is ready, run the testuser.sql and news-schema.sql scripts to create a user, grants, tables, and duality view required by the application. A companion cleanup script, cleanup.sql, is provided to delete the tables, duality view, and topics used by the app.

If you don’t have Docker, or can’t run Oracle Database locally, Autonomous Database is available in the OCI Always Free Tier.

App configuration

You’ll need to set the following environment variables where your application is running, or provide these values in the application properties file:

  • OJDBC_PATH: Path to the directory containing your ojdbc.properties file. I use the ojdbc.properties file in src/test/resources.
  • OCI_COMPARTMENT_ID: OCI Compartment OCID where your GenAI models are found. You can find this on the OCI console.
  • OCI_CHAT_MODEL_ID: OCI Chat Model OCID. You can find this on the OCI GenAI console. Currently, the app supports Cohere models.
  • OCI_EMBEDDING_MODEL_ID: OCI Embedding Model OCID. You can find this on the OCI GenAI console. Currently, the app supports Cohere models.

Once these values are configured, start the application like so. The REST APIs will be available on port 8080:

mvn spring-boot:run        

Streaming News Data

In this section, we’ll stream news article data from the CNN Dailymail dataset. You can find test data files in the src/test/resources directory.

Using the NewsService REST API, we’ll insert one or more of these news articles. POSTing news article data goes through the following processing steps:

  1. Send the raw article data to the news_raw topic.
  2. Read from the news_raw topic, parsing, chunking, and embedding news article content. News article chunk embedding is powered by OCI GenAI.
  3. Write JSON news articles, including their embeddings, to the news_parsed topic.
  4. Read from the news_parsed topic, and write the JSON from (3) to the news_dv JSON Relational Duality View.
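
The chunking in step 2 is what produces one or more embeddings per article. The demo's actual chunker is not shown in this article, so the following is only a sketch of one simple approach: ChunkSketch and chunk are hypothetical names, and the idea of sizing chunks to fit the 2,500-wide chunk column is an assumption. It splits text into pieces of at most maxChars characters, preferring to break at a space.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical chunker: splits text into pieces of at most maxChars characters.
class ChunkSketch {
    static List<String> chunk(String text, int maxChars) {
        List<String> chunks = new ArrayList<>();
        String remaining = text.strip();
        while (remaining.length() > maxChars) {
            // Find the last space that keeps this chunk within maxChars.
            int split = remaining.lastIndexOf(' ', maxChars);
            if (split <= 0) {
                split = maxChars; // no space available: fall back to a hard split
            }
            chunks.add(remaining.substring(0, split).strip());
            remaining = remaining.substring(split).strip();
        }
        if (!remaining.isEmpty()) {
            chunks.add(remaining);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(chunk("the quick brown fox", 10)); // prints [the quick, brown fox]
    }
}
```

Each chunk would then be sent to the embedding model separately, which is why a long article ends up with several rows in news_vector.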

POST a single article

curl -X POST "http://localhost:8080/news" \
  -H "Content-Type: application/json" \
  -d "@src/test/resources/one-record.json"        

POST a small batch of articles by ingesting a subset of the news dataset.

curl -X POST "http://localhost:8080/news" \
  -H "Content-Type: application/json" \
  -d "@src/test/resources/input-small.json"        

POST a large batch of articles

WARNING: The large data set contains almost 12,000 records. Before running this, ensure you’re comfortable with issuing approximately 25,000 text embedding requests.

curl -X POST "http://localhost:8080/news" \
  -H "Content-Type: application/json" \
  -d "@src/test/resources/input-data.json"        

Try out Similarity Search

Find news articles similar to an input query

The minScore is the minimum similarity score for included articles, between 0 and 1. Higher values are more restrictive, while lower values allow less similar articles to match.

curl -X GET "http://localhost:8080/news" \
  -H "Content-Type: application/json" \
  -d '{"input": "Large Hadron Collider particle accelerator", "minScore": 0.2}'        

Sample output:

{"articles":[{"id":"e45bc7b6-6e5f-45e5-94c7-66c363460c74","article":"(CNN)The world's biggest and most powerful physics experiment... ]}

The following request summarizes an article by ID:

# Use your own ID in the path!
curl -X POST "http://localhost:8080/news/summarize/e45bc7b6-6e5f-45e5-94c7-66c363460c74"        

Sample output:

The Large Hadron Collider (LHC), the world’s largest particle accelerator, has restarted after a two-year shutdown. The LHC aims to recreate conditions after the Big Bang, helping scientists understand the universe’s evolution. With a cost of €3 billion, it generates an immense number of particles, allowing researchers to explore unanswered questions in physics, such as the origin of mass and the nature of dark matter.

Summarize by input query

The following request summarizes an article that is semantically similar to the input query, if such an article exists:

curl -X POST "http://localhost:8080/news/summarize" \
  -H "Content-Type: application/json" \
  -d '{"input": "Large Hadron Collider particle accelerator", "minScore": 0.2}'        

The output should be similar to the summary above, depending on the model temperature, provided a matching article exists.
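
If you would rather call the summarize endpoint from Java than from curl, the JDK's built-in HttpClient is enough. This is a minimal sketch, not part of the demo app: SummarizeClientSketch and its methods are hypothetical names, the JSON body is built by hand with only basic escaping, and the response is assumed to be the plain summary text shown in the sample output.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the demo's POST /news/summarize endpoint.
class SummarizeClientSketch {
    // Builds the JSON request body. Only backslashes and double quotes are
    // escaped, which is enough for simple query strings.
    static String jsonBody(String input, double minScore) {
        String escaped = input.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"input\": \"" + escaped + "\", \"minScore\": " + minScore + "}";
    }

    // Builds the same request as the curl command above.
    static HttpRequest summarizeRequest(String baseUrl, String input, double minScore) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/news/summarize"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody(input, minScore)))
                .build();
    }

    // Sends the request and returns the response body as a string.
    static String summarize(String baseUrl, String input, double minScore) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                summarizeRequest(baseUrl, input, minScore),
                HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the app running locally, a call such as summarize("http://localhost:8080", "Large Hadron Collider particle accelerator", 0.2) sends the same request as the curl example.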

Truncate data (clean up all embeddings and data)

When you’re done or want to clean up the data, use the following request to reset the database tables:

curl -X POST "http://localhost:8080/news/reset"        

Questions? Leave a comment, open a GitHub issue, or connect with me on LinkedIn!
