
Add avro support #58

Merged
eschizoid merged 1 commit into main from feature/add-avro-support on Jun 12, 2025

Conversation

@eschizoid
Owner

@eschizoid eschizoid commented Jun 4, 2025

The main purpose of this PR is to add support for the Avro binary format (#8). At the same time, I would like to achieve parity with what we have in JsonMessageProcessor.java.

I am still debating whether this is overkill or unnecessary in terms of the APIs exposed, and I am wondering if we should remove some of the APIs I created as part of this PR. I completely understand that Avro is not meant to be treated as JSON.

Wondering what you think, @rkolesnev. Do you see any value here?

@eschizoid eschizoid force-pushed the feature/add-avro-support branch from aeaee97 to 62c561e on June 4, 2025 04:53
@codecov

codecov bot commented Jun 4, 2025

Codecov Report

Attention: Patch coverage is 67.27273% with 90 lines in your changes missing coverage. Please review.

Project coverage is 77.01%. Comparing base (8301cfb) to head (f70f030).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...java/org/kpipe/processor/AvroMessageProcessor.java 68.75% 31 Missing and 24 partials ⚠️
...rc/main/java/org/kpipe/registry/MessageFormat.java 66.66% 19 Missing and 6 partials ⚠️
...a/org/kpipe/registry/MessageProcessorRegistry.java 58.33% 10 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main      #58      +/-   ##
============================================
+ Coverage     76.22%   77.01%   +0.79%     
- Complexity      265      332      +67     
============================================
  Files            18       20       +2     
  Lines          1106     1392     +286     
  Branches         98      134      +36     
============================================
+ Hits            843     1072     +229     
- Misses          204      234      +30     
- Partials         59       86      +27     


@eschizoid eschizoid force-pushed the feature/add-avro-support branch 3 times, most recently from 2066fe9 to 6c99f1d on June 4, 2025 05:25
@eschizoid eschizoid marked this pull request as draft June 4, 2025 05:26
@eschizoid eschizoid requested a review from rkolesnev June 4, 2025 05:34
@eschizoid eschizoid marked this pull request as ready for review June 4, 2025 05:34
@eschizoid
Owner Author

One of the reasons this PR can add value is that Avro, paired with the schema registry, is a combination that is quite common in large enterprises.

Collaborator
@rkolesnev left a comment

Generally it all looks OK to me, but I am not very deep into Avro schema parsing / manipulation...
A few questions though: I do not fully follow why parsing is decoupled from the Consumer?
Even when it is decoupled from the Consumer, why not use the Kafka Avro deserializers? They can be used outside of a Consumer quite easily as well...
In Kafka (at least the Confluent implementation, and I guess others keeping compatibility), schemas are stored in an external registry and the schema ID is encoded in the message after a magic byte...
The implementation here seems to assume Avro messages without the magic byte, or with the magic bytes stripped, which looks very non-standard / niche.


private static final Logger LOGGER = System.getLogger(AvroMessageProcessor.class.getName());
private static final byte[] EMPTY_AVRO = new byte[0];
private static final ConcurrentHashMap<String, Schema> SCHEMA_REGISTRY = new ConcurrentHashMap<>();
Collaborator

Wouldn't integration with an external schema registry be more typical? i.e. Confluent Schema Registry, Karapace, etc.

Owner Author
@eschizoid commented Jun 8, 2025

I think we should support both 😄

I have seen Avro implemented both with and without an external schema registry.

@rkolesnev
Collaborator

Yes, in general I don't see that much use for custom Avro deserialization... At least personally, I have only ever seen Avro used with a schema registry...
Avro is more rigid and less forgiving than JSON: to add a field that doesn't exist in the original schema, for example, you would need to create a new Avro schema and re-encode. JSON, at least schema-less JSON, is much easier in that regard...

Comment on lines +455 to +456
if (field.hasDefaultValue()) {
newRecord.put(fieldName, field.defaultVal());
Collaborator

Should the default value, if specified, be used for Strings / numbers as well, instead of "" or 0?

Owner Author

Will fix that.

@eschizoid eschizoid force-pushed the feature/add-avro-support branch from bec1dd7 to 3fe093c on June 4, 2025 23:50
@lombardo-chcg left a comment

Nice addition to the library with excellent docs! I might even call them world-class docs 😎


private static final ThreadLocal<Schema.Parser> SCHEMA_PARSER = ThreadLocal.withInitial(Schema.Parser::new);
private static final ThreadLocal<ByteArrayOutputStream> OUTPUT_STREAM_CACHE = ThreadLocal.withInitial(() ->
  new ByteArrayOutputStream(8192)
);


Perhaps make this a configurable value with a default of 8192?
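A minimal sketch of that suggestion, keeping 8192 as the default. The property name `kpipe.avro.bufferSize` is hypothetical; a constructor or builder argument would work just as well:

```java
import java.io.ByteArrayOutputStream;

// Sketch: configurable initial buffer size for the per-thread output stream.
// "kpipe.avro.bufferSize" is an assumed property name, not existing API.
public class OutputStreamCacheSketch {

    // Integer.getInteger reads a system property, falling back to 8192.
    static final int BUFFER_SIZE = Integer.getInteger("kpipe.avro.bufferSize", 8192);

    static final ThreadLocal<ByteArrayOutputStream> OUTPUT_STREAM_CACHE =
        ThreadLocal.withInitial(() -> new ByteArrayOutputStream(BUFFER_SIZE));

    public static void main(String[] args) {
        ByteArrayOutputStream out = OUTPUT_STREAM_CACHE.get();
        out.reset(); // reuse per thread: reset before each message
        System.out.println("buffer size: " + BUFFER_SIZE);
    }
}
```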

* byte[] result = pipeline.apply(avroBytes);
* }</pre>
*/
public class AvroMessageProcessor {


The public interface has great documentation. The class itself seems to have a lot of responsibilities, perhaps it can be decomposed into separate classes for schema operations, transformation logic, and core processing logic? That might make it easier later when and if you want to add support for external schema registry, for example.

Owner Author
@eschizoid commented Jun 8, 2025

Extracted some functionality to MessageFormat.java

@eschizoid
Owner Author

eschizoid commented Jun 7, 2025

Generally it all looks OK to me, but I am not very deep into Avro schema parsing / manipulation... A few questions though: I do not fully follow why parsing is decoupled from the Consumer? Even when it is decoupled from the Consumer, why not use the Kafka Avro deserializers? They can be used outside of a Consumer quite easily as well... In Kafka (at least the Confluent implementation, and I guess others keeping compatibility), schemas are stored in an external registry and the schema ID is encoded in the message after a magic byte... The implementation here seems to assume Avro messages without the magic byte, or with the magic bytes stripped, which looks very non-standard / niche.

@rkolesnev These are excellent questions; let me address all of them and explain why I think the Function<byte[], byte[]> abstraction is powerful and could offer better performance.

Why decouple parsing from the Consumer

Decoupling parsing from the consumer provides several performance benefits:

  1. Parallel Processing: Separating these concerns allows us to scale them independently. Consumers can focus on polling for data while processors and sinks can be optimized separately.

  2. Resource Reuse: Our implementation uses ThreadLocal variables for parsers, encoders, and output streams, which reduces allocation overhead and GC pressure.

  3. Pipeline Optimization: The functional composition pattern allows processors to be chained without unnecessary intermediate object creation.

  4. Batching: We can batch transformations together when they're decoupled, reducing the overhead of repeated serialization/deserialization.
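The composition pattern behind points 3 and 4 can be sketched with plain java.util.function types. The two string-manipulating stages here are stand-ins for real Avro processors; only the chaining mechanism is the point:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Sketch of functional pipeline composition over byte[] -> byte[] stages.
// The stages are toy stand-ins; real ones would decode and re-encode Avro.
public class PipelineSketch {

    static final Function<byte[], byte[]> upperCase = b ->
        new String(b, StandardCharsets.UTF_8).toUpperCase().getBytes(StandardCharsets.UTF_8);

    static final Function<byte[], byte[]> addPrefix = b -> {
        byte[] prefix = "processed:".getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[prefix.length + b.length];
        System.arraycopy(prefix, 0, out, 0, prefix.length);
        System.arraycopy(b, 0, out, prefix.length, b.length);
        return out;
    };

    // andThen chains stages without any intermediate wrapper objects.
    static final Function<byte[], byte[]> pipeline = upperCase.andThen(addPrefix);

    public static void main(String[] args) {
        byte[] result = pipeline.apply("avro payload".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(result, StandardCharsets.UTF_8)); // processed:AVRO PAYLOAD
    }
}
```

Because every stage shares the same byte[] -> byte[] shape, the same pipeline can run inside a consumer poll loop, a batch job, or a test, without touching Consumer code.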

Why not use Kafka Avro deserializers

Our implementation could be faster for several reasons:

  1. No Remote Schema Registry Calls: Kafka Avro deserializers typically make network requests to fetch schemas, which adds latency. Our implementation keeps schemas in memory.

  2. Direct Access: Our processors directly access and modify the bytes/objects without the additional abstraction layers of Kafka's deserializers (e.g., Kafka Streams Serdes).

  3. Optimized for Throughput: Thread-local resources and pooled objects reduce allocation overhead in high-throughput scenarios.

  4. DslJson Performance: For JSON processing, we're using DslJson, which is known for high performance compared to other JSON libraries.

Regarding schema registry and magic bytes

You're correct that the standard Confluent format includes magic bytes and schema IDs at the beginning of messages. Our implementation takes a different approach:

  1. Local Schema Management: We register schemas directly within the application instead of an external service, which eliminates network latency.

  2. Flexible Schema Handling: Our implementation allows registering schemas by name and using them directly, which can be more efficient when schemas are known in advance.
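A minimal sketch of the local schema management described above, using only stdlib types. The method names registerSchema/getSchema mirror the discussion but are illustrative; a real implementation would store parsed org.apache.avro.Schema instances (via Schema.Parser) rather than raw JSON strings:

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an in-process schema registry: parse once, cache by name,
// look up with a single map read and no network round-trip.
// Strings stand in for parsed org.apache.avro.Schema objects.
public class LocalSchemaRegistry {

    private static final ConcurrentHashMap<String, String> SCHEMAS = new ConcurrentHashMap<>();

    public static void registerSchema(String name, String schemaJson) {
        // computeIfAbsent keeps registration idempotent and thread-safe.
        SCHEMAS.computeIfAbsent(name, n -> schemaJson); // stand-in for Schema.Parser#parse
    }

    public static String getSchema(String name) {
        String schema = SCHEMAS.get(name);
        if (schema == null) {
            throw new IllegalArgumentException("No schema registered under: " + name);
        }
        return schema;
    }
}
```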

If compatibility with Confluent's format is needed, we could enhance the processors to handle the magic bytes and schema ID prefix. This would involve:

public static Function<byte[], byte[]> parseAvroWithMagicBytes(final String schemaName) {
    return avroBytes -> {
        // Confluent wire format: magic byte 0x00, then a 4-byte schema ID,
        // then the Avro-encoded body.
        if (avroBytes.length > 5 && avroBytes[0] == 0) {
            final byte[] content = Arrays.copyOfRange(avroBytes, 5, avroBytes.length);
            return processAvro(content, getSchema(schemaName), record -> record);
        } else {
            return processAvro(avroBytes, getSchema(schemaName), record -> record);
        }
    };
}

Our current implementation is optimized for performance by reducing network calls, object creation, and memory pressure - especially for high-throughput message processing systems.
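For reference, splitting the Confluent wire format header needs nothing beyond the stdlib. This sketch (class and method names are illustrative, not library API) reads the 4-byte big-endian schema ID that follows the magic byte:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch: decode the Confluent wire format header.
// Layout: [0x00 magic byte][4-byte big-endian schema ID][Avro body].
public class WireFormatSketch {

    record Header(int schemaId, byte[] body) {}

    static Header split(byte[] message) {
        if (message.length < 5 || message[0] != 0x00) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        int schemaId = ByteBuffer.wrap(message, 1, 4).getInt(); // bytes 1..4, big-endian
        byte[] body = Arrays.copyOfRange(message, 5, message.length);
        return new Header(schemaId, body);
    }

    public static void main(String[] args) {
        byte[] msg = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x10, 0x20}; // schema ID 42, 2-byte body
        Header h = split(msg);
        System.out.println("schemaId=" + h.schemaId() + " bodyLen=" + h.body().length);
    }
}
```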

@eschizoid eschizoid force-pushed the feature/add-avro-support branch 3 times, most recently from a2228a4 to 7fd4500 on June 8, 2025 16:03
Collaborator
@rkolesnev left a comment

LGTM

@eschizoid eschizoid force-pushed the feature/add-avro-support branch from d67a08e to cb0ddb5 on June 12, 2025 07:48
@eschizoid eschizoid force-pushed the feature/add-avro-support branch from 4973ba8 to f70f030 on June 12, 2025 14:20
@eschizoid eschizoid merged commit 456df73 into main Jun 12, 2025
3 of 4 checks passed
@eschizoid eschizoid deleted the feature/add-avro-support branch June 12, 2025 15:12
3 participants