Hi all,
This might be my issue, but I seem to be unable to read from topics that contain Snappy-compressed messages.
The Kafka console tools are able to read these topics (so I suspect, though I have not tested it, that Java clients can as well).
My Python test code is really simple:
```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient('compute-10-2-60-63.us-east-1.urx.internal', 6667)
consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
it = iter(consumer)
next(it)
```
This gives me:
```
greg@localhost ~/projects/kafka-python $ ipython -i -- ./test.py
Python 2.7.6 (default, Dec 1 2013, 21:04:11)
Type "copyright", "credits" or "license" for more information.
IPython 1.1.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
---------------------------------------------------------------------------
UncompressError Traceback (most recent call last)
/usr/lib64/python2.7/site-packages/IPython/utils/py3compat.pyc in execfile(fname, *where)
202 else:
203 filename = fname
--> 204 __builtin__.execfile(filename, *where)
/home/greg/projects/kafka-python/test.py in <module>()
5 consumer = SimpleConsumer(kafka, 'etl-pipeline', 'raw-clicks')
6 it = iter(consumer)
----> 7 next(it)
/home/greg/projects/kafka-python/kafka/consumer.pyc in __iter__(self)
392
393 while True:
--> 394 message = self.get_message(True, timeout)
395 if message:
396 yield message
/home/greg/projects/kafka-python/kafka/consumer.pyc in get_message(self, block, timeout, get_partition_info)
351
352 def get_message(self, block=True, timeout=0.1, get_partition_info=None):
--> 353 return self._get_message(block, timeout, get_partition_info)
354
355 def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
/home/greg/projects/kafka-python/kafka/consumer.pyc in _get_message(self, block, timeout, get_partition_info, update_offset)
364 # We're out of messages, go grab some more.
365 with FetchContext(self, block, timeout):
--> 366 self._fetch()
367 try:
368 partition, message = self.queue.get_nowait()
/home/greg/projects/kafka-python/kafka/consumer.pyc in _fetch(self)
422 partition = resp.partition
423 try:
--> 424 for message in resp.messages:
425 # Put the message in our queue
426 self.queue.put((partition, message))
/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message_set_iter(cls, data)
116 ((offset, ), cur) = relative_unpack('>q', data, cur)
117 (msg, cur) = read_int_string(data, cur)
--> 118 for (offset, message) in KafkaProtocol._decode_message(msg, offset):
119 read_message = True
120 yield OffsetAndMessage(offset, message)
/home/greg/projects/kafka-python/kafka/protocol.pyc in _decode_message(cls, data, offset)
163
164 elif codec == KafkaProtocol.CODEC_SNAPPY:
--> 165 snp = snappy_decode(value)
166 for (offset, msg) in KafkaProtocol._decode_message_set_iter(snp):
167 yield (offset, msg)
/home/greg/projects/kafka-python/kafka/codec.pyc in snappy_decode(payload)
46 if not _has_snappy:
47 raise NotImplementedError("Snappy codec is not available")
---> 48 return snappy.decompress(payload)
UncompressError: Error while decompressing: invalid input
In [1]: %debug
> /home/greg/projects/kafka-python/kafka/codec.py(48)snappy_decode()
46 if not _has_snappy:
47 raise NotImplementedError("Snappy codec is not available")
---> 48 return snappy.decompress(payload)
ipdb> payload
'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb\xb9\r\x00\x00\x15\x01....
open('/tmp/out', 'w').write(payload)
```
It is interesting to note that the payload contains the literal word SNAPPY, as if it were being used as a magic marker. python-snappy, on the other hand, seems to compress strings without this marker:
```
In [1]: import snappy
In [2]: snappy.compress('aaaaaaaaaaaaaaaa')
Out[2]: '\x10<aaaaaaaaaaaaaaaa'
```
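A possible explanation for why the two outputs look so different: a raw Snappy block (what `snappy.compress` produces, and what `snappy.decompress` and the C `snappy_uncompress` expect) starts with the uncompressed length encoded as a little-endian base-128 varint. The helper below is a small sketch of that preamble decoding (the name `read_snappy_preamble` is my own, not part of python-snappy). It suggests the `\x10` above is simply the length 16, whereas the Kafka payload's leading `\x82S` would be read as a length of 10626, after which the decoder presumably tries to parse `NAPPY...` as compressed data and gives up.

```python
def read_snappy_preamble(data):
    """Decode the varint uncompressed-length preamble of a raw Snappy block.

    Returns (uncompressed_length, number_of_preamble_bytes).
    """
    result = 0
    shift = 0
    # A 32-bit varint takes at most 5 bytes; each byte carries 7 bits,
    # and the high bit says whether another byte follows.
    for i, byte in enumerate(bytearray(data[:5])):
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, i + 1
        shift += 7
    raise ValueError("not a valid varint32 preamble")


# Output of snappy.compress on the 16-character string in the session above.
print(read_snappy_preamble(b'\x10<' + b'a' * 16))  # (16, 1)
# Start of the payload dumped from the debugger.
print(read_snappy_preamble(b'\x82SNAPPY\x00'))      # (10626, 2)
```

By contrast, the bytes `\xb9\r` that appear a little further into the dumped payload decode to 1721, which looks like a plausible block size.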
I also note that the Google snappy library itself cannot uncompress the data, as this C test program shows:
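```c
#include <stdio.h>
#include <stdlib.h>
#include <snappy-c.h>

/* Try to decompress the payload dumped to /tmp/out using the
 * reference snappy C API directly. */
int main(int argc, char** argv) {
    size_t out_len = 1024 * 1024;
    char* buffer = calloc(out_len, sizeof(char));
    if (buffer == NULL) {
        return -1;
    }

    FILE *fp = fopen("/tmp/out", "rb");
    if (fp == NULL) {
        perror("fopen /tmp/out");
        free(buffer);
        return -1;
    }

    /* Determine the file size, then read the whole file into memory. */
    fseek(fp, 0, SEEK_END);
    long length = ftell(fp);
    fseek(fp, 0, SEEK_SET);

    char* input = malloc(length);
    if (input == NULL || fread(input, 1, length, fp) != (size_t)length) {
        fclose(fp);
        free(input);
        free(buffer);
        return -1;
    }
    fclose(fp);

    /* out_len is the buffer capacity on input, the decompressed size on output. */
    if (snappy_uncompress(input, length, buffer, &out_len) != SNAPPY_OK) {
        printf("BAD BAD BAD\n");
    } else {
        /* The output is raw bytes, not a NUL-terminated string. */
        fwrite(buffer, 1, out_len, stdout);
    }

    free(input);
    free(buffer);
    return 0;
}
```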
This leads me to suspect that the protocol handling for Snappy might not be quite right (or, just as likely, that I have done something moronic).
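One hypothesis worth checking: the leading `\x82SNAPPY\x00` looks like the stream header written by the xerial snappy-java library (`SnappyOutputStream`), which I am assuming is what the Scala/Java producer uses. If so, the payload would be a 16-byte header (the 8-byte magic, then a big-endian int32 version and int32 minimum compatible version) followed by blocks, each prefixed with a big-endian int32 length. Read that way, the dumped bytes give version 1, compat 1 and a first block of 1243 bytes (`\x00\x00\x04\xdb`). The sketch below unwraps that assumed framing before handing each block to a decompressor. All function names are hypothetical and not part of kafka-python; in practice `decompress` would be `snappy.decompress`.

```python
import struct

# Assumed xerial snappy-java stream header: 8-byte magic, then
# big-endian int32 version and int32 minimum compatible version.
XERIAL_MAGIC = b'\x82SNAPPY\x00'
XERIAL_HEADER = struct.Struct('>8sii')  # 16 bytes


def parse_xerial_header(payload):
    """Return (version, compat) if payload starts with the magic, else None."""
    if len(payload) < XERIAL_HEADER.size:
        return None
    magic, version, compat = XERIAL_HEADER.unpack_from(payload)
    if magic != XERIAL_MAGIC:
        return None
    return version, compat


def xerial_blocks(payload):
    """Yield the raw Snappy blocks that follow the header.

    Each block is assumed to be prefixed with a big-endian int32 length.
    """
    cursor = XERIAL_HEADER.size
    while cursor < len(payload):
        (block_len,) = struct.unpack_from('>i', payload, cursor)
        cursor += 4
        block = payload[cursor:cursor + block_len]
        if len(block) != block_len:
            raise ValueError('truncated block at offset %d' % cursor)
        yield block
        cursor += block_len


def unwrap_and_decompress(payload, decompress):
    """Hypothetical decoder: strip the framing if present, then
    decompress each block and concatenate the results."""
    if parse_xerial_header(payload) is None:
        return decompress(payload)  # plain raw Snappy, as before
    return b''.join(decompress(block) for block in xerial_blocks(payload))


# First 20 bytes of the payload dumped from the debugger above.
prefix = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x04\xdb'
print(parse_xerial_header(prefix))              # (1, 1)
print(struct.unpack_from('>i', prefix, 16)[0])  # 1243
```

Passing the decompressor in as a parameter keeps the framing logic testable without python-snappy installed; if the hypothesis is right, falling back to a plain `decompress(payload)` keeps payloads produced by python-snappy itself working.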
I am using:
- kafka-python as of master
- python-snappy (whatever version is currently on PyPI)
- snappy 1.1.1 (not the recommended version, but what I already had on my machine)
- kafka 0.8.0 (scala 2.10)
I will keep poking at it to see if I can figure out why it's not working.
Any thoughts?