how to convert text file to parquet with java spark
I'm trying to convert a text file into a Parquet file. I can only find examples of converting to Parquet from other file formats, or code written in Scala/Python.
Here is what I came up with:
import java.io.IOException;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

private static final StructField[] fields = new StructField[]{
        new StructField("timeCreate", DataTypes.StringType, false, Metadata.empty()),
        new StructField("cookieCreate", DataTypes.StringType, false, Metadata.empty())
}; // simplified

private static final StructType schema = new StructType(fields);

public static void main(String[] args) throws IOException {
    SparkSession spark = SparkSession
            .builder().master("spark://levanhuong:7077")
            .appName("Convert text file to Parquet")
            .getOrCreate();
    spark.conf().set("spark.executor.memory", "1G");
    WriteParquet(spark, args);
}

public static void WriteParquet(SparkSession spark, String[] args) {
    JavaRDD<String> data = spark.read().textFile(args[0]).toJavaRDD();
    JavaRDD<Row> output = data.map((Function<String, Row>) s -> {
        DataModel model = new DataModel(s);
        return RowFactory.create(model);
    });
    Dataset<Row> df = spark.createDataFrame(output.rdd(), schema);
    df.printSchema();
    df.show(2);
    df.write().parquet(args[1]);
}
args[0] is the path to the input file and args[1] is the path to the output file. Here is the simplified DataModel; the DateTime fields are properly formatted in the set() functions.
public class DataModel implements Serializable {
    DateTime timeCreate;
    DateTime cookieCreate;

    public DataModel(String data) {
        String[] model = data.split("\t");
        setTimeCreate(model[0]);
        setCookieCreate(model[1]);
    }
    // setters omitted (simplified)
}
And here is the error. The error log points to df.show(2), but I think the error was caused by map(). I'm not sure why, since I don't see any casting in the code:
java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
of type org.apache.spark.api.java.function.Function in instance
of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
I think this is enough to recreate the error; please tell me if I need to provide any more information.
You can use spark.read.csv for your data, just set the delimiter to "\t" to read the tab-separated format.
– Hitobat
Aug 28 at 21:06
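A minimal sketch of that suggestion, reusing the StructType schema defined in the question; "sep" is the standard DataFrameReader CSV option for the delimiter:

// read the tab-separated file with an explicit schema, then write Parquet
Dataset<Row> df = spark.read()
        .option("sep", "\t")   // tab-delimited input
        .schema(schema)        // StructType defined above
        .csv(args[0]);
df.write().parquet(args[1]);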
@Hitobat thanks, this got me a bit further. I can show(), but when I write it throws this error: "NoClassDefFoundError: org/apache/parquet/hadoop/metadata/CompressionCodecName"
– Hưởng Lê Văn
Aug 29 at 22:18
I don't know why, but submitting in the terminal instead of running in IntelliJ solved this problem.
– Hưởng Lê Văn
Aug 29 at 23:39
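The lambda ClassCastException above often indicates that the executors cannot load the application's own classes; spark-submit ships the application jar automatically. One way to approximate that from an IDE run is the spark.jars setting. This is only a sketch with a made-up jar path, not a confirmed fix for this setup:

// Ship the built application jar to the standalone cluster explicitly,
// similar to what spark-submit does. The jar path below is hypothetical.
SparkSession spark = SparkSession
        .builder()
        .master("spark://levanhuong:7077")
        .appName("Convert text file to Parquet")
        .config("spark.jars", "target/convert-to-parquet.jar") // hypothetical path
        .getOrCreate();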
1 Answer
A slightly different approach can be used, and it works fine:
JavaRDD<String> data = spark().read().textFile(args[0]).toJavaRDD();
JavaRDD<DataModel> output = data.map(s -> {
    String[] parts = s.split("\t");
    return new DataModel(parts[0], parts[1]);
});
Dataset<Row> result = spark().createDataFrame(output, DataModel.class);
Class "DataModel" is better looks as simple TO, without functionality:
public class DataModel implements Serializable {
    private final String timeCreate;
    private final String cookieCreate;

    public DataModel(String timeCreate, String cookieCreate) {
        this.timeCreate = timeCreate;
        this.cookieCreate = cookieCreate;
    }

    public String getTimeCreate() {
        return timeCreate;
    }

    public String getCookieCreate() {
        return cookieCreate;
    }
}
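For completeness, a sketch of the remaining steps with this approach, assuming args[1] is the output path as in the question:

result.printSchema();             // schema inferred from the DataModel bean
result.show(2);                   // action that triggers the map() above
result.write().parquet(args[1]);  // write the Parquet output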
I used your code for a test run and it is still not working; also, the () in spark() is wrong syntax. The code inside map() was not reached, so I don't think that matters. Maybe it's something with the library? I updated the imported classes; can you check if there is any difference?
– Hưởng Lê Văn
Aug 29 at 20:04
Yes, the parentheses after "spark" are not required in your case. The code inside "map" will be reached if you call an action, for example "df.show(2);" as the last statement in my code.
– pasha701
Aug 29 at 20:52
Yes, I know map() will be triggered by another action, but when I tried to add a breakpoint or a print() inside map(), neither of them was reached (or is this normal?). And since the code is identical, the last thing I can think of is the library.
– Hưởng Lê Văn
Aug 29 at 21:36
Print from "map" will be visible in executors logs, not on driver node. For check is mapper works or not, just print "result.count()". If value is not zero, mapper works fine.
– pasha701
Aug 30 at 12:04
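A sketch of that check, placed after createDataFrame in the answer's code:

long rows = result.count();                // action: forces map() to run on the executors
System.out.println("row count: " + rows);  // non-zero means the mapper produced rows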
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
You can use
spark.read.csv
for your data, just set delimiter "t" to read tsv format.– Hitobat
Aug 28 at 21:06