how to convert text file to parquet with java spark

I'm trying to convert a text file into a Parquet file. I can only find examples of converting to Parquet from other file formats, or code written in Scala/Python. Here is what I came up with:


import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

private static final StructField[] fields = new StructField[]{
    new StructField("timeCreate", DataTypes.StringType, false, Metadata.empty()),
    new StructField("cookieCreate", DataTypes.StringType, false, Metadata.empty())
}; // simplified
private static final StructType schema = new StructType(fields);

public static void main(String[] args) throws IOException {
    SparkSession spark = SparkSession
        .builder().master("spark://levanhuong:7077")
        .appName("Convert text file to Parquet")
        .getOrCreate();
    spark.conf().set("spark.executor.memory", "1G");
    WriteParquet(spark, args);
}

public static void WriteParquet(SparkSession spark, String[] args) {
    JavaRDD<String> data = spark.read().textFile(args[0]).toJavaRDD();
    JavaRDD<Row> output = data.map((Function<String, Row>) s -> {
        DataModel model = new DataModel(s);
        return RowFactory.create(model);
    });

    Dataset<Row> df = spark.createDataFrame(output.rdd(), schema);
    df.printSchema();
    df.show(2);
    df.write().parquet(args[1]);
}


args[0] is the path to the input file and args[1] is the path to the output file. Here is the simplified DataModel; the DateTime fields are properly formatted in the setters:



public class DataModel implements Serializable {
    DateTime timeCreate;
    DateTime cookieCreate;

    public DataModel(String data) {
        String[] model = data.split("\t");
        setTimeCreate(model[0]);
        setCookieCreate(model[1]);
    }
}

And here is the error. The error log points to df.show(2), but I think the error was actually caused by map(). I'm not sure why, since I don't see any casting in the code.




java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
of type org.apache.spark.api.java.function.Function in instance
of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1



I think this is enough to recreate the error; please tell me if I need to provide any more information.





You can use spark.read.csv for your data; just set the delimiter to "\t" to read the TSV format.
– Hitobat
Aug 28 at 21:06
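
A minimal sketch of this suggestion, assuming the input file is tab-separated and reusing the schema, args[0] and args[1] from the question:

// Read the tab-separated file straight into a DataFrame using the
// question's StructType, then write it out as Parquet.
Dataset<Row> df = spark.read()
        .option("delimiter", "\t")
        .schema(schema)
        .csv(args[0]);

df.show(2);
df.write().parquet(args[1]);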







@Hitobat thanks, this got me a bit further. I can show(), but when I write it throws this error: "NoClassDefFoundError: org/apache/parquet/hadoop/metadata/CompressionCodecName"
– Hưởng Lê Văn
Aug 29 at 22:18






I don't know why, but submitting in the terminal instead of running in IntelliJ solved this problem.
– Hưởng Lê Văn
Aug 29 at 23:39




1 Answer



A slightly different approach can be used and works fine:


JavaRDD<String> data = spark().read().textFile(args[0]).toJavaRDD();
JavaRDD<DataModel> output = data.map(s -> {
    String[] parts = s.split("\t");
    return new DataModel(parts[0], parts[1]);
});
Dataset<Row> result = spark().createDataFrame(output, DataModel.class);
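
To finish the conversion, the resulting Dataset can then be written out as Parquet the same way as in the question (a minimal sketch; args[1] is assumed to be the output path):

// An action such as show() triggers the map(), then Parquet is written.
result.printSchema();
result.show(2);
result.write().parquet(args[1]);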



Class "DataModel" is better looks as simple TO, without functionality:


public class DataModel implements Serializable {
    private final String timeCreate;
    private final String cookieCreate;

    public DataModel(String timeCreate, String cookieCreate) {
        this.timeCreate = timeCreate;
        this.cookieCreate = cookieCreate;
    }

    public String getTimeCreate() {
        return timeCreate;
    }

    public String getCookieCreate() {
        return cookieCreate;
    }
}






I used your code for a test run and it's still not working; also the () in spark() is wrong syntax. The code inside map() was not reached, so I don't think that matters. Maybe it's something with the library? I updated the imported classes, can you check if there are any differences?
– Hưởng Lê Văn
Aug 29 at 20:04






Yes, the parentheses after "spark" are not required in your case. The code inside "map" will be reached if you call an action, for example "df.show(2);" as the last statement in my code.
– pasha701
Aug 29 at 20:52





Yes, I know map() will be triggered by another action, but when I tried to add a breakpoint or a print() inside map(), neither was reached (or is this normal?). And since the code is identical, the last thing I can think of is the library.
– Hưởng Lê Văn
Aug 29 at 21:36





Print from "map" will be visible in executors logs, not on driver node. For check is mapper works or not, just print "result.count()". If value is not zero, mapper works fine.
– pasha701
Aug 30 at 12:04
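
A minimal sketch of that check, using the result Dataset from the answer:

// count() is an action, so it forces the map() to run on the executors.
long rows = result.count();
System.out.println("Parsed rows: " + rows); // non-zero means the mapper ran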






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

𛂒𛀶,𛀽𛀑𛂀𛃧𛂓𛀙𛃆𛃑𛃷𛂟𛁡𛀢𛀟𛁤𛂽𛁕𛁪𛂟𛂯,𛁞𛂧𛀴𛁄𛁠𛁼𛂿𛀤 𛂘,𛁺𛂾𛃭𛃭𛃵𛀺,𛂣𛃍𛂖𛃶 𛀸𛃀𛂖𛁶𛁏𛁚 𛂢𛂞 𛁰𛂆𛀔,𛁸𛀽𛁓𛃋𛂇𛃧𛀧𛃣𛂐𛃇,𛂂𛃻𛃲𛁬𛃞𛀧𛃃𛀅 𛂭𛁠𛁡𛃇𛀷𛃓𛁥,𛁙𛁘𛁞𛃸𛁸𛃣𛁜,𛂛,𛃿,𛁯𛂘𛂌𛃛𛁱𛃌𛂈𛂇 𛁊𛃲,𛀕𛃴𛀜 𛀶𛂆𛀶𛃟𛂉𛀣,𛂐𛁞𛁾 𛁷𛂑𛁳𛂯𛀬𛃅,𛃶𛁼

Edmonton

Crossroads (UK TV series)