Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ RUN apt-get update && apt-get install -y procps && rm -rf /var/lib/apt/lists/*

WORKDIR /app

ARG MESSAGE_FORMAT

RUN mkdir -p /app/config

COPY app/build/libs/kpipe-*.jar /app/app.jar
COPY app/${MESSAGE_FORMAT}/build/libs/kpipe-*.jar /app/app.jar

ENV KAFKA_BOOTSTRAP_SERVERS=kafka:9092 \
KAFKA_CONSUMER_GROUP=kpipe-group \
Expand Down
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,14 @@ Follow these steps to test the KPipe Kafka Consumer:
### Build and Run

```bash
# Format code and build the application
./gradlew clean spotlessApply build
# Format code and build the library module
./gradlew clean :lib:spotlessApply :lib:build

# Format code and build the applications module
./gradlew :app:clean :app:spotlessApply :app:build

# Build the consumer app container and start all services
docker compose build --no-cache
docker compose build --no-cache --build-arg MESSAGE_FORMAT=<json|avro|protobuf>
docker compose down -v
docker compose up -d

Expand All @@ -314,6 +317,31 @@ Follow these steps to test the KPipe Kafka Consumer:
kcat -P -b kafka:9092 -t json-topic; done
```

### Working with the Schema Registry and Avro

If you want to use Avro with a schema registry, follow these steps:

```bash
# Register an Avro schema
curl -X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schema\": $(cat lib/src/test/resources/avro/customer.avsc | jq tostring)}" \
http://localhost:8081/subjects/com.kpipe.customer/versions

# Read registered schema
curl -s http://localhost:8081/subjects/com.kpipe.customer/versions/latest | jq

# Produce an Avro message using kafka-avro-console-producer
docker run -it --rm --network=host confluentinc/cp-schema-registry:latest kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic avro-topic \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=1

# Enter the message:
{"id": 1, "name": "Mariano Gonzalez", "email": "mariano@example.com", "active": true, "registrationDate": 1635724800000, "address": {"street": "123 Main St", "city": "Chicago", "zipCode": "00000", "country": "USA"}, "tags": ["premium", "verified"], "preferences": {"notifications": "email"}}
```

The Kafka consumer will:

- Connect to `localhost:9092`
Expand Down
5 changes: 5 additions & 0 deletions app/avro/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Avro variant of the KPipe consumer application.
// Packaging (shadow jar) and shared dependencies are configured by the parent
// app/build.gradle.kts `subprojects` block; this file only names the module
// and picks the entry point.
description = "KPipe - Kafka Consumer Application Using Avro"

application {
// Entry point baked into the jar manifest's Main-Class attribute.
mainClass.set("org.kpipe.App")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.kpipe.metrics.ConsumerMetricsReporter;
import org.kpipe.metrics.MetricsReporter;
import org.kpipe.metrics.ProcessorMetricsReporter;
import org.kpipe.registry.MessageFormat;
import org.kpipe.registry.MessageProcessorRegistry;
import org.kpipe.registry.MessageSinkRegistry;
import org.kpipe.sink.MessageSink;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static void main(final String[] args) {
* @param config The application configuration
*/
public App(final AppConfig config) {
this.processorRegistry = new MessageProcessorRegistry(config.appName());
this.processorRegistry = new MessageProcessorRegistry(config.appName(), MessageFormat.AVRO);
this.sinkRegistry = new MessageSinkRegistry();

this.functionalConsumer = createConsumer(config, processorRegistry, sinkRegistry);
Expand Down Expand Up @@ -122,7 +123,7 @@ public static FunctionalConsumer<byte[], byte[]> createConsumer(
.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic(config.topic())
.withProcessor(createProcessorPipeline(processorRegistry))
.withProcessor(createJsonProcessorPipeline(processorRegistry))
.withPollTimeout(config.pollTimeout())
.withMessageSink(createSinksPipeline(sinkRegistry))
.withCommandQueue(commandQueue)
Expand Down Expand Up @@ -163,7 +164,7 @@ private static MessageSink<byte[], byte[]> createSinksPipeline(final MessageSink
* @param registry the message processor registry
* @return a function that processes messages through the pipeline
*/
private static Function<byte[], byte[]> createProcessorPipeline(final MessageProcessorRegistry registry) {
private static Function<byte[], byte[]> createJsonProcessorPipeline(final MessageProcessorRegistry registry) {
final var pipeline = registry.pipeline("parseJson", "addSource", "markProcessed", "addTimestamp");
return MessageProcessorRegistry.withErrorHandling(pipeline, null);
}
Expand Down
64 changes: 36 additions & 28 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
java
application
id("com.github.johnrengelman.shadow") version "8.1.1"
id("com.github.johnrengelman.shadow") version "8.1.1" apply false
}

description = "KPipe - Kafka Consumer Application"
subprojects {
apply(plugin = "application")
apply(plugin = "com.github.johnrengelman.shadow")

application {
mainClass.set("org.kpipe.App")
}
dependencies {
"implementation"(project(":lib"))
"implementation"("org.apache.kafka:kafka-clients:3.9.0")
"implementation"("org.slf4j:slf4j-simple:2.0.9")
}

dependencies {
implementation(project(":lib"))
implementation("org.apache.kafka:kafka-clients:3.9.0")
implementation("org.slf4j:slf4j-simple:2.0.9")
}
tasks.withType<JavaCompile> {
options.release.set(24)
}

tasks.jar {
enabled = false
}
tasks.named("jar") {
enabled = false
}

tasks.shadowJar {
archiveBaseName.set("kpipe")
archiveClassifier.set("")
archiveVersion.set(version.toString())
manifest {
attributes(
"Main-Class" to application.mainClass.get(),
"Implementation-Title" to project.name,
"Implementation-Version" to project.version
)
tasks.named<ShadowJar>("shadowJar") {
archiveBaseName.set("kpipe-${project.name}")
archiveClassifier.set("")
archiveVersion.set(version.toString())
manifest {
attributes(
mapOf(
"Main-Class" to application.mainClass.get(),
"Implementation-Title" to project.name,
"Implementation-Version" to project.version
)
)
}
mergeServiceFiles()
}
mergeServiceFiles()
}

tasks.build {
dependsOn(tasks.shadowJar)
}
tasks.named("build") {
dependsOn("shadowJar")
}
}
5 changes: 5 additions & 0 deletions app/json/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// JSON variant of the KPipe consumer application.
// Packaging (shadow jar) and shared dependencies are configured by the parent
// app/build.gradle.kts `subprojects` block; this file only names the module
// and picks the entry point.
description = "KPipe - Kafka Consumer Application Using JSON"

application {
// Entry point baked into the jar manifest's Main-Class attribute.
mainClass.set("org.kpipe.App")
}
206 changes: 206 additions & 0 deletions app/json/src/main/java/org/kpipe/App.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package org.kpipe;

import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.kpipe.config.AppConfig;
import org.kpipe.config.KafkaConsumerConfig;
import org.kpipe.consumer.ConsumerRunner;
import org.kpipe.consumer.FunctionalConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.metrics.ConsumerMetricsReporter;
import org.kpipe.metrics.MetricsReporter;
import org.kpipe.metrics.ProcessorMetricsReporter;
import org.kpipe.registry.MessageFormat;
import org.kpipe.registry.MessageProcessorRegistry;
import org.kpipe.registry.MessageSinkRegistry;
import org.kpipe.sink.MessageSink;

/**
 * Application that consumes messages from a Kafka topic and processes them using a configurable
 * pipeline of message processors (JSON variant), forwarding results to a pipeline of sinks.
 *
 * <p>Lifecycle: construct with an {@link AppConfig}, {@code start()} via {@code main}, and close
 * through try-with-resources; a shutdown hook is also installed by the runner.
 */
public class App implements AutoCloseable {

  private static final Logger LOGGER = System.getLogger(App.class.getName());

  // Wall-clock start time; the delta from "now" is reported as consumer uptime by the
  // ConsumerMetricsReporter supplier wired up in the constructor.
  private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
  private final FunctionalConsumer<byte[], byte[]> functionalConsumer;
  private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;
  private final MessageProcessorRegistry processorRegistry;
  private final MessageSinkRegistry sinkRegistry;

  /**
   * Main entry point for the Kafka consumer application.
   *
   * @param args Command line arguments (unused; configuration is read from the environment)
   */
  public static void main(final String[] args) {
    final var config = AppConfig.fromEnv();

    try (final App app = new App(config)) {
      app.start();
      final var normalShutdown = app.awaitShutdown();
      if (!normalShutdown) {
        LOGGER.log(Level.WARNING, "Application didn't shut down cleanly");
      }
    } catch (final Exception e) {
      LOGGER.log(Level.ERROR, "Fatal error in Kafka consumer application", e);
      System.exit(1);
    }
  }

  /**
   * Creates a new App with the specified configuration.
   *
   * @param config The application configuration
   */
  public App(final AppConfig config) {
    // This module is the JSON flavor of the app, hence MessageFormat.JSON.
    this.processorRegistry = new MessageProcessorRegistry(config.appName(), MessageFormat.JSON);
    this.sinkRegistry = new MessageSinkRegistry();

    this.functionalConsumer = createConsumer(config, processorRegistry, sinkRegistry);

    final var consumerMetricsReporter = new ConsumerMetricsReporter(
      functionalConsumer::getMetrics,
      () -> System.currentTimeMillis() - startTime.get(),
      null
    );

    final var processorMetricsReporter = new ProcessorMetricsReporter(processorRegistry, null);

    this.runner = createConsumerRunner(config, consumerMetricsReporter, processorMetricsReporter);
  }

  /**
   * Creates the consumer runner with appropriate lifecycle hooks: start logging, a liveness
   * health check, graceful shutdown, periodic metrics reporting, and a JVM shutdown hook.
   */
  private ConsumerRunner<FunctionalConsumer<byte[], byte[]>> createConsumerRunner(
    final AppConfig config,
    final MetricsReporter consumerMetricsReporter,
    final MetricsReporter processorMetricsReporter
  ) {
    return ConsumerRunner
      .builder(functionalConsumer)
      .withStartAction(c -> {
        c.start();
        LOGGER.log(Level.INFO, "Kafka consumer application started successfully");
      })
      .withHealthCheck(FunctionalConsumer::isRunning)
      .withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown)
      .withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter))
      .withMetricsInterval(config.metricsInterval().toMillis())
      .withShutdownTimeout(config.shutdownTimeout().toMillis())
      .withShutdownHook(true)
      .build();
  }

  /**
   * Creates a configured consumer for processing byte array messages.
   *
   * @param config The application configuration
   * @param processorRegistry Registry of message processor functions used to build the pipeline
   * @param sinkRegistry Registry of message sinks used to build the sink pipeline
   * @return A configured functional consumer
   */
  public static FunctionalConsumer<byte[], byte[]> createConsumer(
    final AppConfig config,
    final MessageProcessorRegistry processorRegistry,
    final MessageSinkRegistry sinkRegistry
  ) {
    final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup());
    // Shared command queue linking the consumer and its OffsetManager (pause/resume/commit).
    final var commandQueue = new ConcurrentLinkedQueue<ConsumerCommand>();

    return FunctionalConsumer
      .<byte[], byte[]>builder()
      .withProperties(kafkaProps)
      .withTopic(config.topic())
      .withProcessor(createJsonProcessorPipeline(processorRegistry))
      .withPollTimeout(config.pollTimeout())
      .withMessageSink(createSinksPipeline(sinkRegistry))
      .withCommandQueue(commandQueue)
      .withOffsetManagerProvider(createOffsetManagerProvider(Duration.ofSeconds(30), commandQueue))
      .withMetrics(true)
      .build();
  }

  /**
   * Creates an OffsetManager provider function that can be used with FunctionalConsumer builder.
   *
   * @param commitInterval The interval at which to automatically commit offsets
   * @param commandQueue The command queue shared with the consumer
   * @return A function that creates an OffsetManager when given a Consumer
   */
  private static Function<Consumer<byte[], byte[]>, OffsetManager<byte[], byte[]>> createOffsetManagerProvider(
    final Duration commitInterval,
    final Queue<ConsumerCommand> commandQueue
  ) {
    return consumer ->
      OffsetManager.builder(consumer).withCommandQueue(commandQueue).withCommitInterval(commitInterval).build();
  }

  /**
   * Creates a message sink pipeline using the provided registry.
   *
   * @param registry the message sink registry
   * @return a message sink that processes messages through the pipeline
   */
  private static MessageSink<byte[], byte[]> createSinksPipeline(final MessageSinkRegistry registry) {
    final var pipeline = registry.<byte[], byte[]>pipeline("logging");
    return MessageSinkRegistry.withErrorHandling(pipeline);
  }

  /**
   * Creates a JSON processor pipeline using the provided registry.
   *
   * @param registry the message processor registry
   * @return a function that processes messages through the pipeline
   */
  private static Function<byte[], byte[]> createJsonProcessorPipeline(final MessageProcessorRegistry registry) {
    final var pipeline = registry.pipeline("parseJson", "addSource", "markProcessed", "addTimestamp");
    return MessageProcessorRegistry.withErrorHandling(pipeline, null);
  }

  /**
   * Gets the processor registry used by this application.
   *
   * @return the message processor registry
   */
  public MessageProcessorRegistry getProcessorRegistry() {
    return processorRegistry;
  }

  /**
   * Gets the sink registry used by this application.
   *
   * @return the message sink registry
   */
  public MessageSinkRegistry getSinkRegistry() {
    return sinkRegistry;
  }

  /** Starts the consumer runner (and therefore the underlying Kafka consumer). */
  private void start() {
    runner.start();
  }

  /**
   * Blocks until the runner shuts down.
   *
   * @return {@code true} when shutdown completed cleanly within the configured timeout
   */
  private boolean awaitShutdown() {
    return runner.awaitShutdown();
  }

  // NOTE(review): the previous version kept an AtomicReference<Map<String, Long>> currentMetrics
  // that was never written, so its private getMetrics() accessor always returned null. Both were
  // dead code and have been removed; live metrics flow through the MetricsReporter instances.

  @Override
  public void close() {
    runner.close();
  }
}
5 changes: 5 additions & 0 deletions app/protobuf/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Protocol Buffers variant of the KPipe consumer application.
// Packaging (shadow jar) and shared dependencies are configured by the parent
// app/build.gradle.kts `subprojects` block; this file only names the module
// and picks the entry point.
description = "KPipe - Kafka Consumer Application Using Protocol Buffers"

application {
// Entry point baked into the jar manifest's Main-Class attribute.
mainClass.set("org.kpipe.App")
}
Loading
Loading