Codecov Report — Attention: Patch coverage is
@@             Coverage Diff              @@
##               main      #58      +/-   ##
============================================
+ Coverage     76.22%   77.01%   +0.79%
- Complexity      265      332      +67
============================================
  Files            18       20       +2
  Lines          1106     1392     +286
  Branches         98      134      +36
============================================
+ Hits            843     1072     +229
- Misses          204      234      +30
- Partials         59       86      +27
One of the reasons this PR can add value is that Avro, paired with the schema registry, is a combination that is quite common in large enterprises.
rkolesnev
left a comment
Generally it all looks OK to me, but I am not very deep into Avro schema parsing / manipulation...
A few questions, though: I do not fully follow why parsing is decoupled from the Consumer.
Even when it is decoupled from the Consumer, why not use the Kafka Avro deserializers? They can be used outside of a Consumer quite easily as well.
In Kafka (at least in the Confluent implementation, and I guess in others keeping compatibility), schemas are stored in an external registry and the schema ID is encoded in the message after a magic byte.
The implementation here seems to assume Avro messages without the magic byte, or with the magic byte stripped, which looks very non-standard / niche.
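For reference, the Confluent wire format mentioned above prefixes each message with a magic byte (0x0) followed by a 4-byte big-endian schema ID, so the Avro body starts at offset 5. A minimal sketch of splitting that header, using only the JDK (class and method names here are illustrative, not from this PR):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID if the payload carries the Confluent header, else -1. */
    static int schemaId(byte[] payload) {
        if (payload.length < HEADER_SIZE || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // Schema ID is stored big-endian immediately after the magic byte.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    /** Strips the 5-byte header, returning the raw Avro binary body. */
    static byte[] body(byte[] payload) {
        return Arrays.copyOfRange(payload, HEADER_SIZE, payload.length);
    }

    public static void main(String[] args) {
        byte[] msg = {0x0, 0, 0, 0, 42, 1, 2, 3}; // header (ID 42) + 3-byte body
        System.out.println(schemaId(msg));
        System.out.println(body(msg).length);
    }
}
```

In a real integration, the extracted schema ID would be used to fetch the writer schema from the registry before decoding the body.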
private static final Logger LOGGER = System.getLogger(AvroMessageProcessor.class.getName());
private static final byte[] EMPTY_AVRO = new byte[0];
private static final ConcurrentHashMap<String, Schema> SCHEMA_REGISTRY = new ConcurrentHashMap<>();
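A common pattern for an in-process schema cache like the map above is `computeIfAbsent`, which parses each schema at most once per key even under concurrent access. A sketch with a plain `String` standing in for `org.apache.avro.Schema`, and a placeholder parse function (neither is this PR's actual API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SchemaCache {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> parser; // stand-in for Schema.Parser::parse

    SchemaCache(Function<String, String> parser) {
        this.parser = parser;
    }

    String get(String name, String schemaJson) {
        // Parses at most once per name; later callers reuse the cached result.
        return cache.computeIfAbsent(name, ignored -> parser.apply(schemaJson));
    }

    public static void main(String[] args) {
        AtomicInteger parses = new AtomicInteger();
        SchemaCache cache = new SchemaCache(json -> {
            parses.incrementAndGet();
            return "parsed:" + json;
        });
        cache.get("user", "{...}");
        cache.get("user", "{...}"); // second lookup hits the cache
        System.out.println(parses.get());
    }
}
```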
Wouldn't integration with an external schema registry (i.e. Confluent Schema Registry, Karapace, etc.) be more typical?
I think we should support both 😄
I have seen Avro implemented both with and without an external schema registry.
Yes, in general I don't see that much use of custom Avro deserialization... At least personally, I have only ever seen Avro used with a schema registry.
if (field.hasDefaultValue()) {
    newRecord.put(fieldName, field.defaultVal());
Should the default value, if specified, be used for Strings / numbers as well, instead of "" or 0?
lombardo-chcg
left a comment
Nice addition to the library with excellent docs! I might even call them world-class docs 😎
private static final ThreadLocal<Schema.Parser> SCHEMA_PARSER = ThreadLocal.withInitial(Schema.Parser::new);
private static final ThreadLocal<ByteArrayOutputStream> OUTPUT_STREAM_CACHE = ThreadLocal.withInitial(() ->
    new ByteArrayOutputStream(8192)
There was a problem hiding this comment.
Perhaps make this a configurable value with a default of 8192?
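One way to make the buffer size configurable while keeping 8192 as the default is to read it once from a system property. A minimal sketch; the property name is hypothetical, not something this PR defines:

```java
import java.io.ByteArrayOutputStream;

public class OutputBufferConfig {
    // Hypothetical property name; 8192 remains the default when the property is unset.
    private static final int BUFFER_SIZE =
            Integer.getInteger("avro.processor.bufferSize", 8192);

    private static final ThreadLocal<ByteArrayOutputStream> OUTPUT_STREAM_CACHE =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(BUFFER_SIZE));

    public static void main(String[] args) {
        System.out.println(BUFFER_SIZE);
        // A freshly initialized per-thread buffer holds no bytes yet.
        System.out.println(OUTPUT_STREAM_CACHE.get().size());
    }
}
```

Running with `-Davro.processor.bufferSize=16384` would double the initial capacity without a code change.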
 * byte[] result = pipeline.apply(avroBytes);
 * }</pre>
 */
public class AvroMessageProcessor {
The public interface has great documentation. The class itself seems to have a lot of responsibilities; perhaps it can be decomposed into separate classes for schema operations, transformation logic, and core processing logic? That might make it easier later, if and when you want to add support for an external schema registry, for example.
Extracted some functionality to MessageFormat.java
@rkolesnev These are excellent questions; let me address them and explain why I think the current approach makes sense.
Decoupling parsing from the Consumer provides several performance benefits, and our implementation could be faster for several reasons.
You're correct that the standard Confluent format includes a magic byte and schema ID at the beginning of each message. Our implementation takes a different approach.
If compatibility with Confluent's format is needed, we could enhance the processors to handle the magic byte and schema ID prefix. This would involve something like:

public static Function<byte[], byte[]> parseAvroWithMagicBytes(final String schemaName) {
    return avroBytes -> {
        // Confluent wire format: magic byte 0x0 + 4-byte schema ID, then the Avro body.
        if (avroBytes.length > 5 && avroBytes[0] == 0) {
            final byte[] content = Arrays.copyOfRange(avroBytes, 5, avroBytes.length);
            return processAvro(content, getSchema(schemaName), record -> record);
        } else {
            return processAvro(avroBytes, getSchema(schemaName), record -> record);
        }
    };
}

Our current implementation is optimized for performance by reducing network calls, object creation, and memory pressure, especially for high-throughput message processing systems.
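Since the processors are exposed as `Function<byte[], byte[]>` (as in the `pipeline.apply(avroBytes)` example above), stages compose naturally with `andThen`. A minimal sketch with illustrative stand-in stages rather than the PR's real Avro transformations:

```java
import java.util.Arrays;
import java.util.function.Function;

public class PipelineSketch {
    public static void main(String[] args) {
        // Stand-in stages; real stages would parse and transform Avro binary.
        Function<byte[], byte[]> stripFirstByte =
                b -> Arrays.copyOfRange(b, 1, b.length);
        Function<byte[], byte[]> reverse = b -> {
            byte[] out = new byte[b.length];
            for (int i = 0; i < b.length; i++) {
                out[i] = b[b.length - 1 - i];
            }
            return out;
        };

        // Compose stages into one pipeline applied in order.
        Function<byte[], byte[]> pipeline = stripFirstByte.andThen(reverse);
        byte[] result = pipeline.apply(new byte[]{0, 1, 2, 3});
        System.out.println(Arrays.toString(result));
    }
}
```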
The main purpose of this PR is to add support for the Avro binary format (#8). At the same time, I would like to achieve parity with what we have in JsonMessageProcessor.java. I am still debating whether this is overkill or unnecessary in terms of the APIs exposed, and I am wondering if we should remove some of the APIs I created as part of this PR. I completely understand that Avro is not meant to be treated as JSON. Wondering what you think, @rkolesnev. Do you see any value here?