Making a PySpark File for BigQuery Samples
I am at a complete loss as to how to modify some code I got from an assignment to do what I want it to do. The code was given to us to upload to the Google Cloud Platform to perform a word count on the Sample Shakespeare dataset. I managed to do this, but the second part of my assignment is asking to modify the code to count something else from any other dataset in the sample section of BigQuery. My idea was to use the Natality dataset to count the number of males born in a year compared to females. This is what I need help with;
I am not sure how to model the table I need to create in Google Cloud. I created an "is_male" boolean field, like the one in the Natality dataset, and a "source_year" integer field like the one in the Natality dataset. I am not sure how to modify the pyspark code to do what I want it to do. I attached the code here, this performs the shakespeare wordcount. In this example, my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.
import json
import pprint
import subprocess
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf =
# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'bigquery-public-data',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-:05'.format(i) for i in partitions]
(word_counts
.map(lambda (w, c): json.dumps('word': w, 'word_count': c))
.saveAsTextFile(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--schema word:STRING,word_count:INTEGER '
'dataset.table files'.format(
dataset=output_dataset, table=output_table, files=','.join(output_files)
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
I particularly do not know what this piece of code does, and I assume this is the part I need to modify to be able to perform my idea (other than point the links to the natality dataset, and the new table).
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
Any help is extremely appreciated!
pyspark google-cloud-platform google-cloud-datastore
add a comment |
I am at a complete loss as to how to modify some code I got from an assignment to do what I want it to do. The code was given to us to upload to the Google Cloud Platform to perform a word count on the Sample Shakespeare dataset. I managed to do this, but the second part of my assignment is asking to modify the code to count something else from any other dataset in the sample section of BigQuery. My idea was to use the Natality dataset to count the number of males born in a year compared to females. This is what I need help with;
I am not sure how to model the table I need to create in Google Cloud. I created an "is_male" boolean field, like the one in the Natality dataset, and a "source_year" integer field like the one in the Natality dataset. I am not sure how to modify the pyspark code to do what I want it to do. I attached the code here, this performs the shakespeare wordcount. In this example, my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.
import json
import pprint
import subprocess
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf =
# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'bigquery-public-data',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-:05'.format(i) for i in partitions]
(word_counts
.map(lambda (w, c): json.dumps('word': w, 'word_count': c))
.saveAsTextFile(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--schema word:STRING,word_count:INTEGER '
'dataset.table files'.format(
dataset=output_dataset, table=output_table, files=','.join(output_files)
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
I particularly do not know what this piece of code does, and I assume this is the part I need to modify to be able to perform my idea (other than point the links to the natality dataset, and the new table).
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
Any help is extremely appreciated!
pyspark google-cloud-platform google-cloud-datastore
add a comment |
I am at a complete loss as to how to modify some code I got from an assignment to do what I want it to do. The code was given to us to upload to the Google Cloud Platform to perform a word count on the Sample Shakespeare dataset. I managed to do this, but the second part of my assignment is asking to modify the code to count something else from any other dataset in the sample section of BigQuery. My idea was to use the Natality dataset to count the number of males born in a year compared to females. This is what I need help with;
I am not sure how to model the table I need to create in Google Cloud. I created an "is_male" boolean field, like the one in the Natality dataset, and a "source_year" integer field like the one in the Natality dataset. I am not sure how to modify the pyspark code to do what I want it to do. I attached the code here, this performs the shakespeare wordcount. In this example, my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.
import json
import pprint
import subprocess
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf =
# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'bigquery-public-data',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-:05'.format(i) for i in partitions]
(word_counts
.map(lambda (w, c): json.dumps('word': w, 'word_count': c))
.saveAsTextFile(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--schema word:STRING,word_count:INTEGER '
'dataset.table files'.format(
dataset=output_dataset, table=output_table, files=','.join(output_files)
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
I particularly do not know what this piece of code does, and I assume this is the part I need to modify to be able to perform my idea (other than point the links to the natality dataset, and the new table).
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
Any help is extremely appreciated!
pyspark google-cloud-platform google-cloud-datastore
I am at a complete loss as to how to modify some code I got from an assignment to do what I want it to do. The code was given to us to upload to the Google Cloud Platform to perform a word count on the Sample Shakespeare dataset. I managed to do this, but the second part of my assignment is asking to modify the code to count something else from any other dataset in the sample section of BigQuery. My idea was to use the Natality dataset to count the number of males born in a year compared to females. This is what I need help with;
I am not sure how to model the table I need to create in Google Cloud. I created an "is_male" boolean field, like the one in the Natality dataset, and a "source_year" integer field like the one in the Natality dataset. I am not sure how to modify the pyspark code to do what I want it to do. I attached the code here, this performs the shakespeare wordcount. In this example, my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.
import json
import pprint
import subprocess
import pyspark
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf =
# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'bigquery-public-data',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-:05'.format(i) for i in partitions]
(word_counts
.map(lambda (w, c): json.dumps('word': w, 'word_count': c))
.saveAsTextFile(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--schema word:STRING,word_count:INTEGER '
'dataset.table files'.format(
dataset=output_dataset, table=output_table, files=','.join(output_files)
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
I particularly do not know what this piece of code does, and I assume this is the part I need to modify to be able to perform my idea (other than point the links to the natality dataset, and the new table).
# Perform word count.
word_counts = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
Any help is extremely appreciated!
pyspark google-cloud-platform google-cloud-datastore
pyspark google-cloud-platform google-cloud-datastore
asked Nov 10 at 1:08
Adrian Bernat
187
187
add a comment |
add a comment |
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53235163%2fmaking-a-pyspark-file-for-bigquery-samples%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53235163%2fmaking-a-pyspark-file-for-bigquery-samples%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown