CompletableFuture: proper way to run a list of futures, wait for result and handle exception
I have legacy code that makes a dozen database calls to populate a report. It takes a noticeable amount of time, which I am trying to reduce using CompletableFuture.
I have some doubts about whether I am doing things correctly and not overusing this technology.
My code now looks like this:
First, I start the asynchronous population of the document sections, with many database calls inside each method:
CompletableFuture section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
CompletableFuture section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
...
CompletableFuture section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
Then I arrange the futures in a specific order in a list and join all of them, so that my code only continues once every future has finished:
List<CompletableFuture> futures = Arrays.asList(
section1Future,
section2Future, ...
section10Future);
List<Object> futureResults = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
Then I populate the PDF document itself with its pieces:
Optional.ofNullable((PdfPTable) futureResults.get(0)).ifPresent(el -> populatePdfElement(document, el));
Optional.ofNullable((PdfPTable) futureResults.get(1)).ifPresent(el -> populatePdfElement(document, el));
...
Optional.ofNullable((PdfPTable) futureResults.get(9)).ifPresent(el -> populatePdfElement(document, el));
return document;
My concerns are:
1) Is it okay to create and start many CompletableFutures this way: put them in a list in the required sequence, join them to make sure they have all finished, and then get each result by casting it to a specific type?
2) Is it okay to run without specifying an executor service and rely on the common ForkJoinPool? This code runs in a web container, so in order to use JTA I probably need a container-provided thread pool executor obtained via JNDI?
3) If this code is surrounded by try-catch, I should be able to catch CompletionException in the main thread, right? Or, in order to do that, should I declare each future like the following:
CompletableFuture.supplyAsync(() -> populateSection1(arguments))
.exceptionally(ex -> {
throw new RuntimeException(ex.getCause());
});
4) Is it possible to overuse CompletableFutures so that they become a performance bottleneck themselves, for example when many futures wait for one executor to start running them? How can I avoid that? By using a container-provided executor service?
If yes, could someone please point me to some best practices on how to configure an executor service correctly, taking the number of processors and the amount of memory into account?
5) Memory impact. I read in a parallel thread that there can be a problem with OOME because many objects are created and garbage collected. Is there a best practice for calculating the amount of memory an application requires?
java asynchronous design-patterns java-8 completable-future
What you really want is to parallelize your program in the hope that it will run faster. So first you need to sketch out a schema of your parallelized program, and only then choose the most appropriate tools to implement it. Besides CompletableFuture, there are threads, parallel and reactive streams, and actors, to name a few. Are you sure CompletableFuture is the best tool for your task?
– Alexei Kaigorodov
Nov 10 '18 at 15:51
Thanks for the reply, Alexei! Yes, I am trying to parallelize in order to speed up an already written Spring monolith. I'm using CompletableFuture because I haven't had a chance to work with other asynchronous tools except the Play! framework, which AFAIK is backed by CF. But abstracting from the application, is it okay to instantiate, run, join and get the result of a CompletableFuture in this way? Am I wrong in saying that CompletableFuture is meant to make a developer's life easier when used instead of threads? P.S. I'm not saying that I'm not interested in, or scared of, learning the best practices of pure multithreading.
– AnMi
Nov 10 '18 at 16:59
edited Nov 10 '18 at 14:56 by Mikhail Kholodkov
asked Nov 10 '18 at 13:13 by AnMi
1 Answer
The approach is not wrong in general, but there are things to improve.
Most notably, you should not use raw types like CompletableFuture. When populateSection… returns a PdfPTable, you should use CompletableFuture<PdfPTable> consistently throughout the code, i.e.
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
Even if these methods do not declare the return type that you assume is always returned at runtime, you should insert the type cast at this early stage:
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection10(arguments));
Then, you can use
Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
By not using raw types, you already get the desired result type and you can do the 3rd step’s operations, i.e. filtering and performing the final action, right in this stream operation.
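As a self-contained illustration of the typed variant above, here is a minimal sketch. It uses String in place of PdfPTable, and populateSection and buildDocument are hypothetical stand-ins for the methods in the question, so treat it as an outline under those assumptions rather than as the actual report code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class TypedFuturesSketch {
    // Hypothetical stand-in for populateSectionN(arguments);
    // section 2 returns null to model a section that has no content.
    static String populateSection(int n) {
        return n == 2 ? null : "section-" + n;
    }

    static List<String> buildDocument() {
        // Typed futures: no raw types, so no casts are needed later.
        CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> populateSection(1));
        CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> populateSection(2));
        CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> populateSection(3));

        // The list plays the role of the PDF document here.
        List<String> document = new ArrayList<>();
        Stream.of(s1, s2, s3)                  // sequential stream, fixed order
              .map(CompletableFuture::join)    // wait for each result in turn
              .filter(Objects::nonNull)        // skip absent sections
              .forEachOrdered(document::add);  // append on the calling thread
        return document;
    }

    public static void main(String[] args) {
        System.out.println(buildDocument());
    }
}
```

All three suppliers are submitted before the first join, so they can run concurrently; the stream only fixes the order in which the results are consumed.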
If you still need the list, you may use
List<PdfPTable> results = Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
That said, the parallelism depends on the thread pool used for the operation (the one specified to supplyAsync). When you don't specify an executor, you get the default Fork/Join pool that is also used by parallel streams, so in this specific case you get the same result much more simply with
Stream.<Supplier<PdfPTable>>of(
() -> populateSection1(arguments),
() -> populateSection2(arguments),
...
() -> populateSection10(arguments))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
or
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>of(
() -> populateSection1(arguments),
() -> populateSection2(arguments),
...
() -> populateSection10(arguments))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
While both variants ensure that populatePdfElement will be called in the right order and one at a time, only the latter will perform all calls from the initiating thread.
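The ordering guarantee of the collecting variant can be tried out with a small sketch. String again replaces PdfPTable, and slowSection and collectInOrder are hypothetical names introduced only for this illustration; the sleep times are arbitrary and merely make the later element finish first.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelSuppliersSketch {
    // Hypothetical stand-in for a slow populateSectionN call.
    static String slowSection(String value, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    static List<String> collectInOrder() {
        return Stream.<Supplier<String>>of(
                () -> slowSection("first", 60),  // finishes last
                () -> null,                      // absent section
                () -> slowSection("third", 10))  // finishes first
            .parallel()
            .map(Supplier::get)
            .filter(Objects::nonNull)
            // toList() on an ordered stream keeps encounter order,
            // regardless of which supplier completes first.
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The subsequent forEach runs entirely on the calling thread.
        collectInOrder().forEach(System.out::println);
    }
}
```

Because the list is fully built before forEach starts, the final loop is an ordinary sequential iteration in the initiating thread, which matters when the target (such as a PDF document) is not thread-safe or is bound to that thread.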
Regarding exception handling, you'll get any exception thrown by a supplier wrapped in a CompletionException when you call CompletableFuture::join. Chaining something like .exceptionally(ex -> { throw new RuntimeException(ex.getCause()); }) makes no sense, as the new RuntimeException will also be wrapped in a CompletionException when you call CompletableFuture::join.
In the Stream variant, you'll get the exception without a wrapper. Since Supplier does not allow checked exceptions, only subtypes of RuntimeException or Error are possible.
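The wrapping behaviour described above can be observed with a short sketch. The class and method names (JoinExceptionSketch, failingSection, joinPlain, joinAfterRethrow) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinExceptionSketch {
    // Hypothetical stand-in for a populateSectionN call that fails.
    static String failingSection() {
        throw new IllegalStateException("db down");
    }

    // Plain future: join() reports the failure as a CompletionException.
    static CompletionException joinPlain() {
        CompletableFuture<String> future =
            CompletableFuture.supplyAsync(JoinExceptionSketch::failingSection);
        try {
            future.join();
            return null; // not reached, the supplier always throws
        } catch (CompletionException e) {
            return e;
        }
    }

    // Rethrowing inside exceptionally(): join() still throws a
    // CompletionException, so the extra stage gains nothing.
    static CompletionException joinAfterRethrow() {
        CompletableFuture<String> base =
            CompletableFuture.supplyAsync(JoinExceptionSketch::failingSection);
        CompletableFuture<String> rethrowing =
            base.exceptionally(ex -> { throw new RuntimeException(ex.getCause()); });
        try {
            rethrowing.join();
            return null; // not reached
        } catch (CompletionException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(joinPlain().getCause());
        System.out.println(joinAfterRethrow().getCause());
    }
}
```

A single try-catch around the joining code in the calling thread is therefore enough; the original exception is available through getCause().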
The other questions are too broad for the Q&A.
Thanks for your comment, @Holger. I like your approach of using forEachOrdered. One thing I should probably have mentioned is that there is one section which I'm still refactoring (for now it requires a repopulated document), and this section has to be inserted between the others in a predefined order. Thus I cannot apply this approach in my code for now. Regarding the shared ForkJoin pool: AFAIK threads are not supposed to be created within an application; instead, a container-provided executor service has to be used, which also has access to the transaction manager?
– AnMi
Nov 16 '18 at 18:17
If you are required to use a specific executor service, you have to pass it to the supplyAsync method. Then, you can still use the solution of the first part of the answer.
– Holger
Nov 19 '18 at 7:36
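To illustrate the suggestion in the comment above, here is a minimal sketch of passing an executor to supplyAsync. The fixed thread pool and the "report-worker" thread name are assumptions made for this example; in a Java EE container the executor would instead be obtained from the container (for example a ManagedExecutorService), and how that lookup is done depends on the environment.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ExecutorSketch {
    // Returns the names of the threads that ran the two tasks.
    static List<String> workerThreadNames() {
        // Assumption: a locally created pool stands in for the
        // container-provided executor service.
        ExecutorService pool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "report-worker"));
        try {
            // The second argument selects the executor instead of the
            // common Fork/Join pool.
            CompletableFuture<String> s1 =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            CompletableFuture<String> s2 =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);

            return Stream.of(s1, s2)
                         .map(CompletableFuture::join)
                         .collect(Collectors.toList());
        } finally {
            pool.shutdown(); // a container-managed executor would not be shut down here
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadNames());
    }
}
```

The joining code is unchanged from the first part of the answer; only the place where the tasks run differs.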
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53239294%2fcompletablefuture-proper-way-to-run-a-list-of-futures-wait-for-result-and-hand%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
The approach is not wrong in general, but there are things to improve.
Most notably, you should not use raw types, like CompletableFuture
.
When populateSection…
returns a PdfPTable
, you should use use CompletableFuture<PdfPTable>
consistently throughout the code.
I.e.
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
even if these methods do not declare the return type you are assuming to be always returned at runtime, you should insert the type cast at this early stage:
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection10(arguments));
Then, you can use
Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
By not using raw types, you already get the desired result type and you can do the 3rd step’s operations, i.e. filtering and performing the final action, right in this stream operation.
If you still need the list, you may use
List<PdfPTable> results = Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
That said, the parallelism depends on the thread pool used for the operation (specified to supplyAsync
). When you don’t specify an executor, you get the default Fork/Join pool used by parallel streams, so in this specific case, you get the same result much simpler as
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
() -> populateSection1(arguments),
() -> populateSection2(arguments));
...
() -> populateSection10(arguments)))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
or
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
() -> populateSection1(arguments),
() -> populateSection2(arguments));
...
() -> populateSection10(arguments)))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
While both variants ensure that populatePdfElement
will be called in the right order and one at a time, only the latter will perform all calls from the initiating thread.
Regarding exception handling, you’ll get any exception thrown by a supplier wrapped in a CompletionException
when you call CompletableFuture::join
. Chaining something like .exceptionally (ex -> throw new RuntimeException(ex.getCause()); );
makes no sense, the new RuntimeException
will also be wrapped in a CompletionException
when you call CompletableFuture::join
.
In the Stream variant, you’ll get the exception without a wrapper. Since Supplier
does not allow checked exceptions, only subtypes of RuntimeException
or Error
are possible.
The other questions are too broad for the Q&A.
Thanks for your comment @Holger. I like your approach of using forEachOrdered. The one thing I should probably have mentioned is that there is one section which I'm still refactoring (as for now it requires repopulated document) and this section should be inserted in between others in predefined order( Thus I cannot apply this approach in my code for now. Regarding shared ForkJoin pool - AFAIK threads are not supposed to be created within an application but has to be used a container provided executer service which also have an access to transaction manager?
– AnMi
Nov 16 '18 at 18:17
If you are required to use a specific executor service, you have to pass it to thesupplyAsync
method. Then, you can still use the solution of the first part of the answer.
– Holger
Nov 19 '18 at 7:36
add a comment |
The approach is not wrong in general, but there are things to improve.
Most notably, you should not use raw types, like CompletableFuture
.
When populateSection…
returns a PdfPTable
, you should use use CompletableFuture<PdfPTable>
consistently throughout the code.
I.e.
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
even if these methods do not declare the return type you are assuming to be always returned at runtime, you should insert the type cast at this early stage:
CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection2(arguments));
...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection10(arguments));
Then, you can use
Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
By not using raw types, you already get the desired result type and you can do the 3rd step’s operations, i.e. filtering and performing the final action, right in this stream operation.
If you still need the list, you may use
List<PdfPTable> results = Stream.of(section1Future, section2Future, ..., section10Future)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
That said, the parallelism depends on the thread pool used for the operation (specified to supplyAsync
). When you don’t specify an executor, you get the default Fork/Join pool used by parallel streams, so in this specific case, you get the same result much simpler as
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
() -> populateSection1(arguments),
() -> populateSection2(arguments));
...
() -> populateSection10(arguments)))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.forEachOrdered(el -> populatePdfElement(document, el));
or
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
() -> populateSection1(arguments),
() -> populateSection2(arguments));
...
() -> populateSection10(arguments)))
.parallel()
.map(Supplier::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
results.forEach(el -> populatePdfElement(document, el));
While both variants ensure that populatePdfElement
will be called in the right order and one at a time, only the latter will perform all calls from the initiating thread.
Regarding exception handling, you’ll get any exception thrown by a supplier wrapped in a CompletionException
when you call CompletableFuture::join
. Chaining something like .exceptionally (ex -> throw new RuntimeException(ex.getCause()); );
makes no sense, the new RuntimeException
will also be wrapped in a CompletionException
when you call CompletableFuture::join
.
In the Stream variant, you’ll get the exception without a wrapper. Since Supplier
does not allow checked exceptions, only subtypes of RuntimeException
or Error
are possible.
The other questions are too broad for the Q&A.
Thanks for your comment @Holger. I like your approach of using forEachOrdered. The one thing I should probably have mentioned is that there is one section which I'm still refactoring (as for now it requires repopulated document) and this section should be inserted in between others in predefined order( Thus I cannot apply this approach in my code for now. Regarding shared ForkJoin pool - AFAIK threads are not supposed to be created within an application but has to be used a container provided executer service which also have an access to transaction manager?
– AnMi
Nov 16 '18 at 18:17
If you are required to use a specific executor service, you have to pass it to thesupplyAsync
method. Then, you can still use the solution of the first part of the answer.
– Holger
Nov 19 '18 at 7:36
add a comment |
answered Nov 12 '18 at 13:41
Holger
What you really want is to parallelize your program in the hope that it will run faster. So first you need to sketch a schema of your parallelized program, and only then choose the most appropriate tools to implement it. Besides CompletableFuture, there are threads, parallel and reactive streams, and actors, to name a few. Are you sure CompletableFuture is the best tool for your task?
– Alexei Kaigorodov
Nov 10 '18 at 15:51
Thanks for the reply, Alexei! Yes, I'm trying to parallelize in order to speed up an already written Spring monolith. I'm using CompletableFuture because I haven't had a chance to work with other asynchronous tools, except the Play! framework, which AFAIK is backed by CF. But setting the application aside, is it okay to instantiate, run, join and get the results of CompletableFutures in this way? Am I wrong in saying that CompletableFuture is meant to make a developer's life easier compared to using threads directly? P.S. I'm not saying that I'm not interested in, or scared of, learning the best practices of pure multithreading.
– AnMi
Nov 10 '18 at 16:59