Making a PySpark File for BigQuery Samples
























I am at a complete loss as to how to modify some code I was given for an assignment so that it does what I want. The code was given to us to run on Google Cloud Platform to perform a word count on the sample Shakespeare dataset. I managed to do this, but the second part of the assignment asks us to modify the code to count something else from any other dataset in the samples section of BigQuery. My idea is to use the Natality dataset to count the number of males born in each year compared to females. This is what I need help with:



I am not sure how to model the output table I need to create in Google Cloud. I created an "is_male" BOOLEAN field and a "source_year" INTEGER field, both like the ones in the Natality dataset. I am also not sure how to modify the PySpark code to do what I want. I have attached the code below; it performs the Shakespeare word count. In this example, my 'A4' table contains a 'word' STRING field and a 'word_count' INTEGER field.



import json
import pprint
import subprocess
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
#input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
input_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'bigquery-public-data',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Output Parameters
#output_dataset = 'wordcount_dataset'
#output_table = 'wordcount_table'
output_dataset = 'CS410'
output_table = 'A4'

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
# Each element is a (key, JSON string) pair; indexing is used instead of
# tuple-unpacking lambdas, which are a syntax error on Python 3.
word_counts = (
    table_data
    .map(lambda kv: json.loads(kv[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://dataproc-0c5cbd37-40a2-4bb1-9451-03581fed1fb9-us-west1/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
 .map(lambda wc: json.dumps({'word': wc[0], 'word_count': wc[1]}))
 .saveAsTextFile(output_directory))

# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--schema word:STRING,word_count:INTEGER '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=','.join(output_files)
    ).split())

# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
    output_path, True)


In particular, I do not know what the following piece of code does, and I assume this is the part I need to modify to implement my idea (other than pointing the input parameters to the natality dataset and the output to the new table).



# Perform word count.
word_counts = (
    table_data
    .map(lambda kv: json.loads(kv[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))
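
For readers unfamiliar with the pattern, here is a minimal plain-Python sketch of what a map followed by reduceByKey appears to compute, and how the same shape might carry over to the natality idea. It does not use Spark: `reduce_by_key` is a hypothetical helper standing in for `RDD.reduceByKey`, and the sample records are made up for illustration. The actual JSON the BigQuery connector emits for `is_male` and `source_year` may be formatted differently, so the parsing here is an assumption.

```python
import json


def reduce_by_key(pairs, combine):
    """Hypothetical stand-in for RDD.reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = combine(merged[key], value) if key in merged else value
    return merged


# Made-up records shaped like the (key, JSON string) pairs in table_data.
shakespeare = [
    (0, '{"word": "The", "word_count": "3"}'),
    (1, '{"word": "the", "word_count": "2"}'),
    (2, '{"word": "king", "word_count": "1"}'),
]

# Step 1: drop the key and parse the JSON string into a dict.
rows = [json.loads(record) for _, record in shakespeare]
# Step 2: emit (lower-cased word, count) pairs.
pairs = [(row['word'].lower(), int(row['word_count'])) for row in rows]
# Step 3: add up the counts of all pairs that share the same word.
word_counts = reduce_by_key(pairs, lambda x, y: x + y)
print(word_counts)

# Natality variant (assumed record layout): one record per birth, so each
# record contributes 1 to its (source_year, is_male) group.
natality = [
    (0, '{"source_year": "1970", "is_male": true}'),
    (1, '{"source_year": "1970", "is_male": false}'),
    (2, '{"source_year": "1970", "is_male": true}'),
    (3, '{"source_year": "1971", "is_male": false}'),
]

birth_rows = [json.loads(record) for _, record in natality]
birth_pairs = [
    # Accept either a JSON boolean or the string "true" for is_male.
    ((int(row['source_year']), str(row['is_male']).lower() == 'true'), 1)
    for row in birth_rows
]
birth_counts = reduce_by_key(birth_pairs, lambda x, y: x + y)
print(birth_counts)
```

In this sketch the only things that change between the two cases are the key (a word versus a year and sex pair) and the value (an existing count versus a constant 1); the summing step stays the same.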


Any help is extremely appreciated!











      pyspark google-cloud-platform google-cloud-datastore
















      asked Nov 10 at 1:08









      Adrian Bernat



























