Kafka state store returns null when using Avro

Multi tool use
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
|
show 3 more comments
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
|
show 3 more comments
up vote
0
down vote
favorite
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
edited Nov 9 at 6:27
asked Nov 8 at 14:15


Thiru
1,152717
1,152717
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
|
show 3 more comments
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
What
Serde
are specified in you config? Because Materialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not use KTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
What
Serde
are specified in you config? Because Materialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not use KTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how does
KTable#filter()
will solve the issue? I'm making the value as null
, because it has to be considered tombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32
And how does
KTable#filter()
will solve the issue? I'm making the value as null
, because it has to be considered tombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32
|
show 3 more comments
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53209572%2fkafka-state-store-returns-null-when-using-avro%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
2U3GEjm oMouJFK,LZ6LVSDSwuU2a 7Wo1,F
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
What
Serde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how does
KTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32