Dataflow Python SDK Avro Source/Sink




I am looking to read and write Avro files in GCS with the Python SDK. Is this currently possible? If so, how would I do this? I see TODO comments in the source regarding this, so I am not too optimistic.




2 Answers



You are correct: the Python SDK does not yet support this, but it will soon.






Any clue as to how soon? I keep checking the Beam GitHub repository to see if it has been added. We are currently using the Java SDK because of this.

– dkroy
Sep 19 '16 at 19:57






We hope to add an Avro sink within the next two months. The Python SDK already has an Avro source: github.com/apache/incubator-beam/blob/python-sdk/sdks/python/…

– chamikara
Sep 20 '16 at 17:21







Is it available now?

– kaxil
Oct 24 '17 at 14:57



As of version 2.6.0 of the Apache Beam/Dataflow Python SDK, it is indeed possible to read and write Avro files in GCS.



Even better, the Python SDK for Beam now supports fastavro reads and writes, which can be up to 10x faster than regular Avro IO.





Sample code:


import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
import avro.schema


RUNNER = 'DataflowRunner'
GCP_PROJECT_ID = 'YOUR_PROJECT_ID'
BUCKET_NAME = 'YOUR_BUCKET_HERE'
STAGING_LOCATION = 'gs://{}/staging'.format(BUCKET_NAME)
TEMP_LOCATION = 'gs://{}/temp'.format(BUCKET_NAME)
GCS_INPUT = "gs://{}/input-*.avro".format(BUCKET_NAME)
GCS_OUTPUT = "gs://{}/".format(BUCKET_NAME)
JOB_NAME = 'conversion-test'

SCHEMA_PATH = "YOUR_AVRO_SCHEMA.avsc"
AVRO_SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

OPTIONS = {
    'runner': RUNNER,
    'job_name': JOB_NAME,
    'staging_location': STAGING_LOCATION,
    'temp_location': TEMP_LOCATION,
    'project': GCP_PROJECT_ID,
    'max_num_workers': 2,
    'save_main_session': True,
}

PIPELINE = beam.Pipeline(options=beam.pipeline.PipelineOptions(flags=[], **OPTIONS))


def main():
    # note: have to force `use_fastavro` to enable `fastavro`:
    results = PIPELINE | ReadFromAvro(file_pattern=GCS_INPUT, use_fastavro=True)
    results | WriteToAvro(file_path_prefix=GCS_OUTPUT, schema=AVRO_SCHEMA, use_fastavro=True)
    # the steps above only build the pipeline; run() submits the job
    PIPELINE.run().wait_until_finish()


if __name__ == '__main__':
    import os
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'PATH_TO_YOUR_SERVICE_ACCOUNT_KEY'
    main()
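
The sample reads its schema from SCHEMA_PATH but does not show what that file contains. An .avsc file is plain JSON, so a minimal sketch of producing one with the standard library could look like the following. The record name `User`, the namespace, and both fields are hypothetical and only there for illustration; `write_schema_file` and `read_schema_text` are helper names made up for this sketch, not part of Beam or avro.

```python
import json
import os
import tempfile

# Hypothetical schema for illustration only; replace the record name and
# fields with those of your own data.
EXAMPLE_SCHEMA = {
    "type": "record",
    "name": "User",
    "namespace": "example.avro",
    "fields": [
        {"name": "name", "type": "string"},
        # A nullable field: a union with "null" listed first and a null default.
        {"name": "age", "type": ["null", "int"], "default": None},
    ],
}


def write_schema_file(schema, path):
    """Write an Avro schema dict to `path` as JSON (the .avsc format)."""
    with open(path, "w") as handle:
        json.dump(schema, handle, indent=2)


def read_schema_text(path):
    """Return the raw JSON text of a schema file."""
    with open(path) as handle:
        return handle.read()


# Write the schema to a temporary directory and read it back.
schema_path = os.path.join(tempfile.mkdtemp(), "YOUR_AVRO_SCHEMA.avsc")
write_schema_file(EXAMPLE_SCHEMA, schema_path)
schema_text = read_schema_text(schema_path)
loaded = json.loads(schema_text)
```

The string held in `schema_text` is the kind of input the sample passes to `avro.schema.parse`.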


