Kafka state store returns null when using Avro
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
|
show 3 more comments
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
|
show 3 more comments
up vote
0
down vote
favorite
up vote
0
down vote
favorite
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
We have a kTable where we map the values and materialize to KeyValueStore using the following code:
@Bean
public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
ValueMapper<CancelEvent, CancelEvent> mapper,
SpecificAvroSerde<CancelEvent> serde)
return kStreamBuilder
.table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
.mapValues(mapper, Materialized.as("cancel-event-store"));
And mapper code:
@Override
public CancelEvent apply(CancelEvent value)
if (value.getData().getCancelled())
return null;
return value;
We use Avro for serialization and deserialization. When we try to query the state store
with a key, it always returns null
.
Code for querying the state store:
ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
.store("cancel-event-store", QueryableStoreTypes.keyValueStore());
CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
event
is always null
. While debugging, I iterated through the keyValuestore
, and realized something is appened to the key (magic byte may be!). Please refer the image below
So, the question is how to query with key?
Update 1
private Map<String, Object> kStreamsConfigsProperties()
final Map<String, Object> config = new HashMap<>();
config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());
config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));
config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return config;
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
java apache-kafka apache-kafka-streams spring-kafka confluent-schema-registry
edited Nov 9 at 6:27
asked Nov 8 at 14:15
Thiru
1,152717
1,152717
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
|
show 3 more comments
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
WhatSerde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?
– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how doesKTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong
– Thiru
Nov 9 at 6:32
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
What
Serde
are specified in you config? Because Materialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not use KTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
What
Serde
are specified in you config? Because Materialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not use KTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how does
KTable#filter()
will solve the issue? I'm making the value as null
, because it has to be considered tombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32
And how does
KTable#filter()
will solve the issue? I'm making the value as null
, because it has to be considered tombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32
|
show 3 more comments
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53209572%2fkafka-state-store-returns-null-when-using-avro%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Did you check the data in KTable if value contains null or data?
– Nishu Tayal
Nov 8 at 14:18
Yes, it has values and its not null
– Thiru
Nov 8 at 14:29
What
Serde
are specified in you config? BecauseMaterialized
does not specify serdes explicitly, the config ones will be used. Side remark: why not useKTable#filter()
?– Matthias J. Sax
Nov 8 at 22:35
Thanks @MatthiasJ.Sac, I have updated the question with config
– Thiru
Nov 9 at 6:28
And how does
KTable#filter()
will solve the issue? I'm making the value asnull
, because it has to be consideredtombstone
message on certain criterial. Pleas correct me if I'm doing something wrong– Thiru
Nov 9 at 6:32