Kafka Stream custom State Store
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
add a comment |
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
add a comment |
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
I have been readying the doc about state store but it is still not clear to me if it can fit my purpose. I would like to use some Distributed Graph Database as as a state store that other external application can consume from. Is that possible, what effort does that involve and can anyone point me to the class/code that will need to be extended for that functionality to happen.
apache-kafka-streams
apache-kafka-streams
asked Nov 11 '18 at 12:06
MaatDeamonMaatDeamon
1,90232160
1,90232160
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53248566%2fkafka-stream-custom-state-store%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
add a comment |
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
You can implement custom state store using Processor API as described here :
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
- Your custom state store must implement StateStore.
- You must have an interface to represent the operations available on the store.
- You must provide an implementation of StoreBuilder for creating instances of your store.
- It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.
Implementation will look something like this :
public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V>
// implementation of the actual store
// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V>
void write(K Key, V value);
// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V>
V read(K key);
public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore<K,V>>
// implementation of the supplier for MyCustomStore
In order to make it queryable;
- Provide an implementation of QueryableStoreType.
- Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.
Example :
public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>>
// Only accept StateStores that are of type MyCustomStore
public boolean accepts(final StateStore stateStore)
return stateStore instanceOf MyCustomStore;
public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName)
return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
answered Nov 11 '18 at 16:16
Nishu TayalNishu Tayal
12.1k73481
12.1k73481
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53248566%2fkafka-stream-custom-state-store%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown