How to write numpy arrays directly to s3 in a deep learning application backed by spark
up vote
1
down vote
favorite
We are generating ~10k numpy arrays using Keras, and finally we have to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside a Spark map function and throws this error:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there a way or a library that we can use inside a Spark map function, in a deep learning application, to stream the numpy arrays directly to S3?
I have my rdd of numpy array as:
features_rdd
options I tried:-
def writePartition(xs):
    """Upload every (key, array) pair of one RDD partition to S3 as a .npy object.

    Intended to be passed to ``RDD.foreachPartition``.

    Args:
        xs: iterator of ``(k, v)`` pairs. ``k`` is passed to
            ``get_file_with_parents``; ``v`` is the array handed to
            ``cci.upload_npy_array``.
    """
    # Look the logger up inside the function instead of referencing the
    # module-level LOGGER. The posted traceback (dict -> list -> object ->
    # dict -> object -> dict -> thread.lock) is consistent with cloudpickle
    # serializing a captured Logger and failing on its handler's lock.
    # NOTE(review): confirm LOGGER was the captured object; messages now go
    # to the ``__name__`` logger.
    logger = logging.getLogger(__name__)

    # The client is created here, per partition, so it is never part of the
    # pickled closure.
    # NOTE(review): the bucket is the literal 'BUCKET_NAME'; the bucket part
    # of OUTPUT is not used for the upload.
    cci = cc.get_interface(
        'BUCKET_NAME',
        ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
        SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        endpoint_url='https://s3.amazonaws.com',
    )

    # Everything after the first '/' of OUTPUT is the key prefix. It does not
    # depend on the record, so it is split once.
    rest_of_path = OUTPUT.split('/', 1)[1]

    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        # Elided in the original post; presumably derived from
        # file_name_with_domain.
        file_name = ...
        file_name_without_ext = get_file_name_without_ext(file_name)
        # Single '.npy' suffix (the original concatenated '.' + '.npy').
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        logger.info("Saving to S3....")
        cci.upload_npy_array(final_path, v)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition)
option 2:-
def writePartition1(xs):
    """Stream every (key, array) pair of one RDD partition to S3 as .npy.

    The array is serialized into an in-memory buffer and uploaded from
    there, so no intermediate file is created (and none can be left behind
    when an upload fails).

    Args:
        xs: iterator of ``(k, v)`` pairs; ``v`` is the array passed to
            ``np.save``.
    """
    # Created inside the function so the client is never pickled with it.
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        # The two lines elided in the original post define the paths; only
        # ``s3_full_path`` (the object key) is still needed below.
        ...
        ...
        buf = BytesIO()
        np.save(buf, v)
        buf.seek(0)  # rewind so the upload starts at the first byte
        s3.upload_fileobj(buf, 'BUCKET', s3_full_path)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition1)
Error:-
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
imports:-
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So basically the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features is not working. I have added the imports above.
updates:-
def extract_features(model, obj):
    """Compute the feature array for one encoded image.

    Args:
        model: object whose ``predict`` is called on the preprocessed batch.
        obj: raw image bytes; wrapped in ``BytesIO`` for ``image.load_img``.

    Returns:
        The first row of ``model.predict(...)``, or ``None`` if any step
        raises. The error is printed, not re-raised.
    """
    try:
        print('executing vgg16 feature extractor...')
        # target_size is a (height, width) pair; the channel count is not
        # part of it.
        img = image.load_img(BytesIO(obj), target_size=(224, 224))
        img_data = image.img_to_array(img)
        # Add a leading axis so the single image forms a batch of one.
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++', vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        # Best-effort by design: report the failure and let the caller see
        # None. The '{}' placeholder was missing, so details were dropped.
        print('Error......{}'.format(e.args))
        return None
def extract_features_(xs):
    """Yield ``(key, features)`` for each ``(key, image_bytes)`` pair in ``xs``.

    ``initVGG16()`` is called once, when iteration starts, and its result is
    reused for every record.
    """
    vgg16 = initVGG16()
    for key, payload in xs:
        features = extract_features(vgg16, payload)
        yield key, features
# Build (or reuse) the Spark session. The parentheses let the builder chain
# span several lines; without them the leading-dot lines are a SyntaxError.
spark = (
    SparkSession
    .builder
    .appName('test-app')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the files under RESOLVED_IMAGE_PATH as key/value records and keep the
# RDD persisted for reuse.
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

# Feature extraction runs per partition.
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
numpy amazon-s3 pyspark deep-learning
|
show 8 more comments
up vote
1
down vote
favorite
We are generating ~10k numpy arrays using Keras, and finally we have to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside a Spark map function and throws this error:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there a way or a library that we can use inside a Spark map function, in a deep learning application, to stream the numpy arrays directly to S3?
I have my rdd of numpy array as:
features_rdd
options I tried:-
def writePartition(xs):
    """Upload every (key, array) pair of one RDD partition to S3 as a .npy object.

    Intended to be passed to ``RDD.foreachPartition``.

    Args:
        xs: iterator of ``(k, v)`` pairs. ``k`` is passed to
            ``get_file_with_parents``; ``v`` is the array handed to
            ``cci.upload_npy_array``.
    """
    # Look the logger up inside the function instead of referencing the
    # module-level LOGGER. The posted traceback (dict -> list -> object ->
    # dict -> object -> dict -> thread.lock) is consistent with cloudpickle
    # serializing a captured Logger and failing on its handler's lock.
    # NOTE(review): confirm LOGGER was the captured object; messages now go
    # to the ``__name__`` logger.
    logger = logging.getLogger(__name__)

    # The client is created here, per partition, so it is never part of the
    # pickled closure.
    # NOTE(review): the bucket is the literal 'BUCKET_NAME'; the bucket part
    # of OUTPUT is not used for the upload.
    cci = cc.get_interface(
        'BUCKET_NAME',
        ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
        SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        endpoint_url='https://s3.amazonaws.com',
    )

    # Everything after the first '/' of OUTPUT is the key prefix. It does not
    # depend on the record, so it is split once.
    rest_of_path = OUTPUT.split('/', 1)[1]

    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        # Elided in the original post; presumably derived from
        # file_name_with_domain.
        file_name = ...
        file_name_without_ext = get_file_name_without_ext(file_name)
        # Single '.npy' suffix (the original concatenated '.' + '.npy').
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        logger.info("Saving to S3....")
        cci.upload_npy_array(final_path, v)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition)
option 2:-
def writePartition1(xs):
    """Stream every (key, array) pair of one RDD partition to S3 as .npy.

    The array is serialized into an in-memory buffer and uploaded from
    there, so no intermediate file is created (and none can be left behind
    when an upload fails).

    Args:
        xs: iterator of ``(k, v)`` pairs; ``v`` is the array passed to
            ``np.save``.
    """
    # Created inside the function so the client is never pickled with it.
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        # The two lines elided in the original post define the paths; only
        # ``s3_full_path`` (the object key) is still needed below.
        ...
        ...
        buf = BytesIO()
        np.save(buf, v)
        buf.seek(0)  # rewind so the upload starts at the first byte
        s3.upload_fileobj(buf, 'BUCKET', s3_full_path)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition1)
Error:-
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
imports:-
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So basically the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features is not working. I have added the imports above.
updates:-
def extract_features(model, obj):
    """Compute the feature array for one encoded image.

    Args:
        model: object whose ``predict`` is called on the preprocessed batch.
        obj: raw image bytes; wrapped in ``BytesIO`` for ``image.load_img``.

    Returns:
        The first row of ``model.predict(...)``, or ``None`` if any step
        raises. The error is printed, not re-raised.
    """
    try:
        print('executing vgg16 feature extractor...')
        # target_size is a (height, width) pair; the channel count is not
        # part of it.
        img = image.load_img(BytesIO(obj), target_size=(224, 224))
        img_data = image.img_to_array(img)
        # Add a leading axis so the single image forms a batch of one.
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++', vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        # Best-effort by design: report the failure and let the caller see
        # None. The '{}' placeholder was missing, so details were dropped.
        print('Error......{}'.format(e.args))
        return None
def extract_features_(xs):
    """Yield ``(key, features)`` for each ``(key, image_bytes)`` pair in ``xs``.

    ``initVGG16()`` is called once, when iteration starts, and its result is
    reused for every record.
    """
    vgg16 = initVGG16()
    for key, payload in xs:
        features = extract_features(vgg16, payload)
        yield key, features
# Build (or reuse) the Spark session. The parentheses let the builder chain
# span several lines; without them the leading-dot lines are a SyntaxError.
spark = (
    SparkSession
    .builder
    .appName('test-app')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the files under RESOLVED_IMAGE_PATH as key/value records and keep the
# RDD persisted for reuse.
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

# Feature extraction runs per partition.
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
numpy amazon-s3 pyspark deep-learning
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
Is this the full stack trace?pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
– karma4917
Nov 8 at 16:10
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32
|
show 8 more comments
up vote
1
down vote
favorite
up vote
1
down vote
favorite
We are generating ~10k numpy arrays using Keras, and finally we have to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside a Spark map function and throws this error:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there a way or a library that we can use inside a Spark map function, in a deep learning application, to stream the numpy arrays directly to S3?
I have my rdd of numpy array as:
features_rdd
options I tried:-
def writePartition(xs):
    """Upload every (key, array) pair of one RDD partition to S3 as a .npy object.

    Intended to be passed to ``RDD.foreachPartition``.

    Args:
        xs: iterator of ``(k, v)`` pairs. ``k`` is passed to
            ``get_file_with_parents``; ``v`` is the array handed to
            ``cci.upload_npy_array``.
    """
    # Look the logger up inside the function instead of referencing the
    # module-level LOGGER. The posted traceback (dict -> list -> object ->
    # dict -> object -> dict -> thread.lock) is consistent with cloudpickle
    # serializing a captured Logger and failing on its handler's lock.
    # NOTE(review): confirm LOGGER was the captured object; messages now go
    # to the ``__name__`` logger.
    logger = logging.getLogger(__name__)

    # The client is created here, per partition, so it is never part of the
    # pickled closure.
    # NOTE(review): the bucket is the literal 'BUCKET_NAME'; the bucket part
    # of OUTPUT is not used for the upload.
    cci = cc.get_interface(
        'BUCKET_NAME',
        ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
        SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        endpoint_url='https://s3.amazonaws.com',
    )

    # Everything after the first '/' of OUTPUT is the key prefix. It does not
    # depend on the record, so it is split once.
    rest_of_path = OUTPUT.split('/', 1)[1]

    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        # Elided in the original post; presumably derived from
        # file_name_with_domain.
        file_name = ...
        file_name_without_ext = get_file_name_without_ext(file_name)
        # Single '.npy' suffix (the original concatenated '.' + '.npy').
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        logger.info("Saving to S3....")
        cci.upload_npy_array(final_path, v)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition)
option 2:-
def writePartition1(xs):
    """Stream every (key, array) pair of one RDD partition to S3 as .npy.

    The array is serialized into an in-memory buffer and uploaded from
    there, so no intermediate file is created (and none can be left behind
    when an upload fails).

    Args:
        xs: iterator of ``(k, v)`` pairs; ``v`` is the array passed to
            ``np.save``.
    """
    # Created inside the function so the client is never pickled with it.
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        # The two lines elided in the original post define the paths; only
        # ``s3_full_path`` (the object key) is still needed below.
        ...
        ...
        buf = BytesIO()
        np.save(buf, v)
        buf.seek(0)  # rewind so the upload starts at the first byte
        s3.upload_fileobj(buf, 'BUCKET', s3_full_path)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition1)
Error:-
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
imports:-
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So basically the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features is not working. I have added the imports above.
updates:-
def extract_features(model, obj):
    """Compute the feature array for one encoded image.

    Args:
        model: object whose ``predict`` is called on the preprocessed batch.
        obj: raw image bytes; wrapped in ``BytesIO`` for ``image.load_img``.

    Returns:
        The first row of ``model.predict(...)``, or ``None`` if any step
        raises. The error is printed, not re-raised.
    """
    try:
        print('executing vgg16 feature extractor...')
        # target_size is a (height, width) pair; the channel count is not
        # part of it.
        img = image.load_img(BytesIO(obj), target_size=(224, 224))
        img_data = image.img_to_array(img)
        # Add a leading axis so the single image forms a batch of one.
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++', vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        # Best-effort by design: report the failure and let the caller see
        # None. The '{}' placeholder was missing, so details were dropped.
        print('Error......{}'.format(e.args))
        return None
def extract_features_(xs):
    """Yield ``(key, features)`` for each ``(key, image_bytes)`` pair in ``xs``.

    ``initVGG16()`` is called once, when iteration starts, and its result is
    reused for every record.
    """
    vgg16 = initVGG16()
    for key, payload in xs:
        features = extract_features(vgg16, payload)
        yield key, features
# Build (or reuse) the Spark session. The parentheses let the builder chain
# span several lines; without them the leading-dot lines are a SyntaxError.
spark = (
    SparkSession
    .builder
    .appName('test-app')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the files under RESOLVED_IMAGE_PATH as key/value records and keep the
# RDD persisted for reuse.
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

# Feature extraction runs per partition.
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
numpy amazon-s3 pyspark deep-learning
We are generating ~10k numpy arrays using Keras, and finally we have to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside a Spark map function and throws this error:
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
Is there a way or a library that we can use inside a Spark map function, in a deep learning application, to stream the numpy arrays directly to S3?
I have my rdd of numpy array as:
features_rdd
options I tried:-
def writePartition(xs):
    """Upload every (key, array) pair of one RDD partition to S3 as a .npy object.

    Intended to be passed to ``RDD.foreachPartition``.

    Args:
        xs: iterator of ``(k, v)`` pairs. ``k`` is passed to
            ``get_file_with_parents``; ``v`` is the array handed to
            ``cci.upload_npy_array``.
    """
    # Look the logger up inside the function instead of referencing the
    # module-level LOGGER. The posted traceback (dict -> list -> object ->
    # dict -> object -> dict -> thread.lock) is consistent with cloudpickle
    # serializing a captured Logger and failing on its handler's lock.
    # NOTE(review): confirm LOGGER was the captured object; messages now go
    # to the ``__name__`` logger.
    logger = logging.getLogger(__name__)

    # The client is created here, per partition, so it is never part of the
    # pickled closure.
    # NOTE(review): the bucket is the literal 'BUCKET_NAME'; the bucket part
    # of OUTPUT is not used for the upload.
    cci = cc.get_interface(
        'BUCKET_NAME',
        ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
        SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"),
        endpoint_url='https://s3.amazonaws.com',
    )

    # Everything after the first '/' of OUTPUT is the key prefix. It does not
    # depend on the record, so it is split once.
    rest_of_path = OUTPUT.split('/', 1)[1]

    for k, v in xs:
        file_name_with_domain = get_file_with_parents(k, 1)
        # Elided in the original post; presumably derived from
        # file_name_with_domain.
        file_name = ...
        file_name_without_ext = get_file_name_without_ext(file_name)
        # Single '.npy' suffix (the original concatenated '.' + '.npy').
        final_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        logger.info("Saving to S3....")
        cci.upload_npy_array(final_path, v)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition)
option 2:-
def writePartition1(xs):
    """Stream every (key, array) pair of one RDD partition to S3 as .npy.

    The array is serialized into an in-memory buffer and uploaded from
    there, so no intermediate file is created (and none can be left behind
    when an upload fails).

    Args:
        xs: iterator of ``(k, v)`` pairs; ``v`` is the array passed to
            ``np.save``.
    """
    # Created inside the function so the client is never pickled with it.
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        # The two lines elided in the original post define the paths; only
        # ``s3_full_path`` (the object key) is still needed below.
        ...
        ...
        buf = BytesIO()
        np.save(buf, v)
        buf.seek(0)  # rewind so the upload starts at the first byte
        s3.upload_fileobj(buf, 'BUCKET', s3_full_path)


# The RDD method is spelled foreachPartition (capital P).
features_rdd.foreachPartition(writePartition1)
Error:-
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
imports:-
from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc
So basically the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features is not working. I have added the imports above.
updates:-
def extract_features(model, obj):
    """Compute the feature array for one encoded image.

    Args:
        model: object whose ``predict`` is called on the preprocessed batch.
        obj: raw image bytes; wrapped in ``BytesIO`` for ``image.load_img``.

    Returns:
        The first row of ``model.predict(...)``, or ``None`` if any step
        raises. The error is printed, not re-raised.
    """
    try:
        print('executing vgg16 feature extractor...')
        # target_size is a (height, width) pair; the channel count is not
        # part of it.
        img = image.load_img(BytesIO(obj), target_size=(224, 224))
        img_data = image.img_to_array(img)
        # Add a leading axis so the single image forms a batch of one.
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        print('++++++++++++++++++++++++++++', vgg16_feature.shape)
        return vgg16_feature
    except Exception as e:
        # Best-effort by design: report the failure and let the caller see
        # None. The '{}' placeholder was missing, so details were dropped.
        print('Error......{}'.format(e.args))
        return None
def extract_features_(xs):
    """Yield ``(key, features)`` for each ``(key, image_bytes)`` pair in ``xs``.

    ``initVGG16()`` is called once, when iteration starts, and its result is
    reused for every record.
    """
    vgg16 = initVGG16()
    for key, payload in xs:
        features = extract_features(vgg16, payload)
        yield key, features
# Build (or reuse) the Spark session. The parentheses let the builder chain
# span several lines; without them the leading-dot lines are a SyntaxError.
spark = (
    SparkSession
    .builder
    .appName('test-app')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the files under RESOLVED_IMAGE_PATH as key/value records and keep the
# RDD persisted for reuse.
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

# Feature extraction runs per partition.
features_rdd = s3_files_rdd.mapPartitions(extract_features_)
numpy amazon-s3 pyspark deep-learning
numpy amazon-s3 pyspark deep-learning
edited Nov 8 at 17:10
asked Nov 8 at 5:11
dks551
17410
17410
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
Is this the full stack trace?pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
– karma4917
Nov 8 at 16:10
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32
|
show 8 more comments
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
Is this the full stack trace?pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
– karma4917
Nov 8 at 16:10
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
Is this the full stack trace?
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.– karma4917
Nov 8 at 16:10
Is this the full stack trace?
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.– karma4917
Nov 8 at 16:10
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32
|
show 8 more comments
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53201885%2fhow-to-write-numpy-arrays-directly-to-s3-in-a-deep-learning-application-backed-b%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43
updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06
Is this the full stack trace?
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.– karma4917
Nov 8 at 16:10
@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14
Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32