How to write numpy arrays directly to s3 in a deep learning application backed by spark









up vote
1
down vote

favorite












We are generating ~10k NumPy arrays using Keras, and we then need to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside the Spark map function and throws the following error:



pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


Is there any way or library that we can use inside a Spark map function, in a deep learning application, to stream the NumPy arrays directly to S3?



I have my rdd of numpy array as:



features_rdd


options I tried:-



def writePartition(xs):
cci = cc.get_interface('BUCKET_NAME', ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
#output_path, format_name
for k,v in xs:
file_name_with_domain = get_file_with_parents(k, 1)
file_name = ...
file_name_without_ext = get_file_name_without_ext(file_name)
bucket_name = OUTPUT.split('/', 1)[0]
rest_of_path = OUTPUT.split('/', 1)[1]
final_path = rest_of_path + '/' + file_name_without_ext + '.' + '.npy'

LOGGER.info("Saving to S3....")
response = cci.upload_npy_array(final_path, v)


features_rdd.foreachpartition(writePartition)


option 2:-



def writePartition1(xs):
s3 = boto3.client('s3',region_name='us-east-1')
for k,v in xs:
...
...
np.save(local_dir_full_path, v)
s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
os.remove(local_dir_full_path)




features_rdd.foreachpartition(writePartition1)


Error:-



File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


imports:-



from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc


So, basically, the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features does not work. I have added the imports above.



updates:-



def extract_features(model,obj):
try:
print('executing vgg16 feature extractor...')
img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
img_data = image.img_to_array(img)
img_data = np.expand_dims(img_data, axis=0)
img_data = preprocess_input(img_data)
vgg16_feature = model.predict(img_data)[0]
print('++++++++++++++++++++++++++++',vgg16_feature.shape)
return vgg16_feature
except Exception as e:
print('Error......'.format(e.args))
return

def extract_features_(xs):
model_data = initVGG16()
for k, v in xs:
yield k, extract_features(model_data, v)

spark = SparkSession
.builder
.appName('test-app')
.getOrCreate()

sc = spark.sparkContext
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

features_rdd = s3_files_rdd.mapPartitions(extract_features_)









share|improve this question























  • Can you show some spark code you have tried so far?
    – karma4917
    Nov 8 at 15:43










  • updated the post. option 1 with cottoncandy and option 2 with boto3
    – dks551
    Nov 8 at 16:06










  • Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
    – karma4917
    Nov 8 at 16:10










  • @karma4917 you can check the full stack trace. updated
    – dks551
    Nov 8 at 16:14










  • Ah - I think I am getting close. What are your imports?
    – karma4917
    Nov 8 at 16:32














up vote
1
down vote

favorite












We are generating ~10k NumPy arrays using Keras, and we then need to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside the Spark map function and throws the following error:



pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


Is there any way or library that we can use inside a Spark map function, in a deep learning application, to stream the NumPy arrays directly to S3?



I have my rdd of numpy array as:



features_rdd


options I tried:-



def writePartition(xs):
cci = cc.get_interface('BUCKET_NAME', ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
#output_path, format_name
for k,v in xs:
file_name_with_domain = get_file_with_parents(k, 1)
file_name = ...
file_name_without_ext = get_file_name_without_ext(file_name)
bucket_name = OUTPUT.split('/', 1)[0]
rest_of_path = OUTPUT.split('/', 1)[1]
final_path = rest_of_path + '/' + file_name_without_ext + '.' + '.npy'

LOGGER.info("Saving to S3....")
response = cci.upload_npy_array(final_path, v)


features_rdd.foreachpartition(writePartition)


option 2:-



def writePartition1(xs):
s3 = boto3.client('s3',region_name='us-east-1')
for k,v in xs:
...
...
np.save(local_dir_full_path, v)
s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
os.remove(local_dir_full_path)




features_rdd.foreachpartition(writePartition1)


Error:-



File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


imports:-



from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc


So, basically, the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features does not work. I have added the imports above.



updates:-



def extract_features(model,obj):
try:
print('executing vgg16 feature extractor...')
img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
img_data = image.img_to_array(img)
img_data = np.expand_dims(img_data, axis=0)
img_data = preprocess_input(img_data)
vgg16_feature = model.predict(img_data)[0]
print('++++++++++++++++++++++++++++',vgg16_feature.shape)
return vgg16_feature
except Exception as e:
print('Error......'.format(e.args))
return

def extract_features_(xs):
model_data = initVGG16()
for k, v in xs:
yield k, extract_features(model_data, v)

spark = SparkSession
.builder
.appName('test-app')
.getOrCreate()

sc = spark.sparkContext
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

features_rdd = s3_files_rdd.mapPartitions(extract_features_)









share|improve this question























  • Can you show some spark code you have tried so far?
    – karma4917
    Nov 8 at 15:43










  • updated the post. option 1 with cottoncandy and option 2 with boto3
    – dks551
    Nov 8 at 16:06










  • Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
    – karma4917
    Nov 8 at 16:10










  • @karma4917 you can check the full stack trace. updated
    – dks551
    Nov 8 at 16:14










  • Ah - I think I am getting close. What are your imports?
    – karma4917
    Nov 8 at 16:32












up vote
1
down vote

favorite









up vote
1
down vote

favorite











We are generating ~10k NumPy arrays using Keras, and we then need to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside the Spark map function and throws the following error:



pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


Is there any way or library that we can use inside a Spark map function, in a deep learning application, to stream the NumPy arrays directly to S3?



I have my rdd of numpy array as:



features_rdd


options I tried:-



def writePartition(xs):
cci = cc.get_interface('BUCKET_NAME', ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
#output_path, format_name
for k,v in xs:
file_name_with_domain = get_file_with_parents(k, 1)
file_name = ...
file_name_without_ext = get_file_name_without_ext(file_name)
bucket_name = OUTPUT.split('/', 1)[0]
rest_of_path = OUTPUT.split('/', 1)[1]
final_path = rest_of_path + '/' + file_name_without_ext + '.' + '.npy'

LOGGER.info("Saving to S3....")
response = cci.upload_npy_array(final_path, v)


features_rdd.foreachpartition(writePartition)


option 2:-



def writePartition1(xs):
s3 = boto3.client('s3',region_name='us-east-1')
for k,v in xs:
...
...
np.save(local_dir_full_path, v)
s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
os.remove(local_dir_full_path)




features_rdd.foreachpartition(writePartition1)


Error:-



File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


imports:-



from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc


So, basically, the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features does not work. I have added the imports above.



updates:-



def extract_features(model,obj):
try:
print('executing vgg16 feature extractor...')
img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
img_data = image.img_to_array(img)
img_data = np.expand_dims(img_data, axis=0)
img_data = preprocess_input(img_data)
vgg16_feature = model.predict(img_data)[0]
print('++++++++++++++++++++++++++++',vgg16_feature.shape)
return vgg16_feature
except Exception as e:
print('Error......'.format(e.args))
return

def extract_features_(xs):
model_data = initVGG16()
for k, v in xs:
yield k, extract_features(model_data, v)

spark = SparkSession
.builder
.appName('test-app')
.getOrCreate()

sc = spark.sparkContext
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

features_rdd = s3_files_rdd.mapPartitions(extract_features_)









share|improve this question















We are generating ~10k NumPy arrays using Keras, and we then need to save those arrays as .npy files to S3. The problem is that, to save to S3 inside a Spark map function, we have to create an intermediate file. What we want instead is to stream the arrays directly to S3 without creating intermediate files. I tried the "Cottoncandy" library, but it does not work inside the Spark map function and throws the following error:



pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


Is there any way or library that we can use inside a Spark map function, in a deep learning application, to stream the NumPy arrays directly to S3?



I have my rdd of numpy array as:



features_rdd


options I tried:-



def writePartition(xs):
cci = cc.get_interface('BUCKET_NAME', ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
#output_path, format_name
for k,v in xs:
file_name_with_domain = get_file_with_parents(k, 1)
file_name = ...
file_name_without_ext = get_file_name_without_ext(file_name)
bucket_name = OUTPUT.split('/', 1)[0]
rest_of_path = OUTPUT.split('/', 1)[1]
final_path = rest_of_path + '/' + file_name_without_ext + '.' + '.npy'

LOGGER.info("Saving to S3....")
response = cci.upload_npy_array(final_path, v)


features_rdd.foreachpartition(writePartition)


option 2:-



def writePartition1(xs):
s3 = boto3.client('s3',region_name='us-east-1')
for k,v in xs:
...
...
np.save(local_dir_full_path, v)
s3.upload_file(local_dir_full_path, 'BUCKET', s3_full_path)
os.remove(local_dir_full_path)




features_rdd.foreachpartition(writePartition1)


Error:-



File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "six_file_boto3_write1.py", line 248, in <module>
run()
File "six_file_boto3_write1.py", line 239, in run
features_rdd.foreachPartition(writePartitionWithBoto)
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0003/container_1541683970451_0003_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects


imports:-



from pyspark.sql import SparkSession
from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
import numpy as np
import logging
import os
import boto3
import cottoncandy as cc


So, basically, the application works perfectly fine up to features_rdd; I can even verify the count. But the part where I try to save these features does not work. I have added the imports above.



updates:-



def extract_features(model,obj):
try:
print('executing vgg16 feature extractor...')
img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
img_data = image.img_to_array(img)
img_data = np.expand_dims(img_data, axis=0)
img_data = preprocess_input(img_data)
vgg16_feature = model.predict(img_data)[0]
print('++++++++++++++++++++++++++++',vgg16_feature.shape)
return vgg16_feature
except Exception as e:
print('Error......'.format(e.args))
return

def extract_features_(xs):
model_data = initVGG16()
for k, v in xs:
yield k, extract_features(model_data, v)

spark = SparkSession
.builder
.appName('test-app')
.getOrCreate()

sc = spark.sparkContext
s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)
s3_files_rdd.persist()

features_rdd = s3_files_rdd.mapPartitions(extract_features_)






numpy amazon-s3 pyspark deep-learning






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 8 at 17:10

























asked Nov 8 at 5:11









dks551

17410




17410











  • Can you show some spark code you have tried so far?
    – karma4917
    Nov 8 at 15:43










  • updated the post. option 1 with cottoncandy and option 2 with boto3
    – dks551
    Nov 8 at 16:06










  • Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
    – karma4917
    Nov 8 at 16:10










  • @karma4917 you can check the full stack trace. updated
    – dks551
    Nov 8 at 16:14










  • Ah - I think I am getting close. What are your imports?
    – karma4917
    Nov 8 at 16:32
















  • Can you show some spark code you have tried so far?
    – karma4917
    Nov 8 at 15:43










  • updated the post. option 1 with cottoncandy and option 2 with boto3
    – dks551
    Nov 8 at 16:06










  • Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
    – karma4917
    Nov 8 at 16:10










  • @karma4917 you can check the full stack trace. updated
    – dks551
    Nov 8 at 16:14










  • Ah - I think I am getting close. What are your imports?
    – karma4917
    Nov 8 at 16:32















Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43




Can you show some spark code you have tried so far?
– karma4917
Nov 8 at 15:43












updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06




updated the post. option 1 with cottoncandy and option 2 with boto3
– dks551
Nov 8 at 16:06












Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
– karma4917
Nov 8 at 16:10




Is this the full stack trace? pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.
– karma4917
Nov 8 at 16:10












@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14




@karma4917 you can check the full stack trace. updated
– dks551
Nov 8 at 16:14












Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32




Ah - I think I am getting close. What are your imports?
– karma4917
Nov 8 at 16:32

















active

oldest

votes











Your Answer






StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);













 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53201885%2fhow-to-write-numpy-arrays-directly-to-s3-in-a-deep-learning-application-backed-b%23new-answer', 'question_page');

);

Post as a guest















Required, but never shown






























active

oldest

votes













active

oldest

votes









active

oldest

votes






active

oldest

votes















 

draft saved


draft discarded















































 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53201885%2fhow-to-write-numpy-arrays-directly-to-s3-in-a-deep-learning-application-backed-b%23new-answer', 'question_page');

);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

𛂒𛀶,𛀽𛀑𛂀𛃧𛂓𛀙𛃆𛃑𛃷𛂟𛁡𛀢𛀟𛁤𛂽𛁕𛁪𛂟𛂯,𛁞𛂧𛀴𛁄𛁠𛁼𛂿𛀤 𛂘,𛁺𛂾𛃭𛃭𛃵𛀺,𛂣𛃍𛂖𛃶 𛀸𛃀𛂖𛁶𛁏𛁚 𛂢𛂞 𛁰𛂆𛀔,𛁸𛀽𛁓𛃋𛂇𛃧𛀧𛃣𛂐𛃇,𛂂𛃻𛃲𛁬𛃞𛀧𛃃𛀅 𛂭𛁠𛁡𛃇𛀷𛃓𛁥,𛁙𛁘𛁞𛃸𛁸𛃣𛁜,𛂛,𛃿,𛁯𛂘𛂌𛃛𛁱𛃌𛂈𛂇 𛁊𛃲,𛀕𛃴𛀜 𛀶𛂆𛀶𛃟𛂉𛀣,𛂐𛁞𛁾 𛁷𛂑𛁳𛂯𛀬𛃅,𛃶𛁼

ャフサォクコ ケウ,コ,ワ メ,ロスョノ゙,クネ,フムカヤヲニ,エコ゚ツ ウイオン゙ケワサネォキモュキォウイノンコチ゚メヌナイゥフュ,カヒウネェ ネ,ホノケ,ムュキ ッボーミュハ,チ ツス ィ メウイマヤ,゙ウチ ヅ ロ,ォジヌェ ャヌット ェ,マャ,チナエヒネソキツテ トホヲヲミーァ

Node.js puppeteer - Use values from array in a loop to cycle through pages