How do I write a futures::Stream to disk without storing it entirely in memory first?

























There's an example of downloading a file with Rusoto S3 here:
How to save a file downloaded from S3 with Rusoto to my hard drive?



The problem is that it appears to download the whole file into memory before writing it to disk, because it uses the write_all method, which takes a slice of bytes rather than a stream. How can I use the StreamingBody, which implements futures::Stream, to stream the file to disk?


































  • for x in stream { file.write_all(&x) } something like that...

    – Stargateur
    Nov 11 '18 at 4:04











  • That would require StreamingBody to be an iterator, which it is not.

    – Nicholas Bishop
    Nov 11 '18 at 4:09















stream rust future














edited Nov 11 '18 at 4:26 by Shepmaster

asked Nov 11 '18 at 2:46 by Nicholas Bishop
























1 Answer














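Since StreamingBody implements Stream<Item = Vec<u8>, Error = Error>, we can construct an MCVE that represents that:

    extern crate futures; // 0.1.25

    use futures::{prelude::*, stream};

    type Error = Box<dyn std::error::Error>;

    // Stand-in for the real StreamingBody: yields the dummy data in four chunks.
    fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
        const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
        let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
        stream::iter_ok(iter_of_owned_bytes)
    }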



We can then get a "streaming body" somehow and use Stream::for_each to process each element in the Stream. Here, we just call write_all with some provided output location:



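    use std::{fs::File, io::Write};

    fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
        // Each chunk is written as soon as it arrives, so only one chunk is held at a time.
        streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
    }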



We can then write a little testing main:



    fn main() {
        let mut file = Vec::new();

        {
            // Inner scope so the mutable borrow of `file` ends before the assertion.
            let fut = save_to_disk(&mut file);
            fut.wait().expect("Could not drive future");
        }

        assert_eq!(file, b"0123456789ABCDEF");
    }



Important notes about the quality of this naïve implementation:



  1. The call to write_all may potentially block, which you should not do in an asynchronous program. It would be better to hand off that blocking work to a threadpool.


  2. The usage of Future::wait forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.
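One way to picture the hand-off mentioned in the first note is a dedicated writer thread fed through a bounded channel. The following is only a sketch using the standard library: spawn_writer and its capacity parameter are hypothetical names, and a plain thread stands in for a real threadpool.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: moves `out` onto its own thread and writes every
/// chunk received over the channel. `capacity` bounds the number of queued
/// chunks, so memory use stays proportional to a few chunks, not the file.
fn spawn_writer<W>(
    mut out: W,
    capacity: usize,
) -> (SyncSender<Vec<u8>>, JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let handle = thread::spawn(move || -> io::Result<W> {
        // The loop ends once every sender has been dropped.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        // Hand the writer back so the caller can inspect or reuse it.
        Ok(out)
    });
    (tx, handle)
}
```

A caller would send each chunk with tx.send(chunk), drop the sender when the stream ends, and join the handle to learn whether any write failed. Note that send on a full bounded channel also blocks; try_send is the non-blocking variant, and a real asynchronous program would more likely rely on whatever blocking facility its runtime provides.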


See also:



  • What is the best approach to encapsulate blocking I/O in future-rs?

  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?






























  • One question about this. Where you call streaming_body().for_each(...), is that more or less equivalent to doing for chunk in streaming_body().wait() { ... }, other than one using a closure and the other using an iterator?

    – Nicholas Bishop
    Nov 12 '18 at 2:45






  • @NicholasBishop There is a relevant difference between the two. The for loop you suggest blocks the current thread, so the thread can't do any other work until the complete stream is resolved. The stream combinator for_each(), on the other hand, yields control to the event loop whenever it would block. (Of course the test code in this answer does not use an event loop, and also blocks until the future is resolved. However, the whole point of asynchronous code is not to unnecessarily block the current thread, so you wouldn't do this in real code.)

    – Sven Marnach
    Nov 12 '18 at 10:12











  • For my use case I do actually want to block the thread -- just because the API provided is async doesn't mean that the calling code is.

    – Nicholas Bishop
    Nov 12 '18 at 15:11











  • @NicholasBishop Then both versions are fine. I'd probably use Stream::wait() in that case, since working with stream combinators can be cumbersome.

    – Sven Marnach
    Nov 12 '18 at 15:27






  • Stream::wait is fine for now, but it's being removed in the futures rework (just like Future::wait). There will be a direct replacement for Future::wait, but I don't know of one for Stream::wait.

    – Shepmaster
    Nov 12 '18 at 16:29
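For the blocking variant discussed in these comments, a rough sketch may help. In futures 0.1, Stream::wait() turns a stream into a blocking iterator of Result values, so the loop can be written against any such iterator. The sketch below uses only the standard library, and write_chunks is a hypothetical name rather than anything from futures or Rusoto.

```rust
use std::error::Error;
use std::io::Write;

/// Hypothetical helper: writes each chunk to `out` as soon as the iterator
/// produces it and returns the total number of bytes written.
/// Only one chunk is alive at a time, so the whole body is never buffered.
fn write_chunks<I, E, W>(chunks: I, mut out: W) -> Result<u64, Box<dyn Error>>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
    E: Into<Box<dyn Error>>,
    W: Write,
{
    let mut written = 0u64;
    for chunk in chunks {
        // Stop at the first failed chunk and report its error.
        let chunk = chunk.map_err(|e| -> Box<dyn Error> { e.into() })?;
        out.write_all(&chunk)?;
        written += chunk.len() as u64;
    }
    out.flush()?;
    Ok(written)
}
```

With the answer's code, the call would look roughly like write_chunks(streaming_body().wait(), file), where file is any Write implementation such as a std::fs::File. This blocks the calling thread for the whole transfer, which matches the synchronous use case described above.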










edited Nov 12 '18 at 14:12

answered Nov 11 '18 at 20:11 by Shepmaster




























