I was trying to send messages to Apache Kafka from a Java program. Kafka limits the size, in bytes, of a message it will accept.
The Java program that sends the messages reads its data from an Excel file, and I ran into problems with large files where the total number of bytes exceeded the Kafka byte limit. So I decided to split the ArrayList into equal-sized chunks using the following code:
List<List<Obj>> partionList = new LinkedList<>();
for (int i = 0; i < origList.size(); i += chunkSize) {
    partionList.add(origList.subList(i, i + Math.min(chunkSize, origList.size() - i)));
}
for (List<Obj> dataRows : partionList) {
    KeyedMessage<byte[], byte[]> message = new KeyedMessage<>(
            KAFKA_TOPIC,
            (key).getBytes(),
            SerializationUtils.serialize(convertToJsonLogData(dataRows)));
    kafkaProducer.send(message);
}

I got the following error:
org.apache.commons.lang3.SerializationException: java.io.NotSerializableException: java.util.ArrayList$SubList
    at org.apache.commons.lang3.SerializationUtils.serialize(SerializationUtils.java:139)
    at org.apache.commons.lang3.SerializationUtils.serialize(SerializationUtils.java:161)
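I corrected this by changing the body of the first for loop so that each sublist is copied into a new ArrayList. subList returns a view of the original list whose class (ArrayList$SubList) does not implement Serializable, whereas ArrayList does. Here Obj stands for the row type.

List<Obj> list = new ArrayList<Obj>(origList.subList(i, i + Math.min(chunkSize, origList.size() - i)));
partionList.add(list);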
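The fix can be tried in isolation with a small sketch like the one below. It is only an illustration under a few assumptions: the class name ChunkDemo, the chunk and serialize helpers, and the String rows are all hypothetical, and plain Java serialization stands in for SerializationUtils.serialize so that the example needs no extra libraries. No Kafka calls are made.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ChunkDemo {

    // Splits source into chunks of at most chunkSize elements.
    // Each chunk is copied into a new ArrayList, so it is Serializable
    // and no longer a view backed by the original list.
    static <T> List<List<T>> chunk(List<T> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += chunkSize) {
            int end = Math.min(i + chunkSize, source.size());
            chunks.add(new ArrayList<>(source.subList(i, end)));
        }
        return chunks;
    }

    // Plain Java serialization (a stand-in for SerializationUtils.serialize).
    // Failures are rethrown as an unchecked exception wrapping the cause.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            rows.add("row-" + i);
        }

        // Copied chunks serialize without problems.
        for (List<String> part : chunk(rows, 3)) {
            System.out.println(part + " -> " + serialize(part).length + " bytes");
        }

        // The raw subList view reproduces the original failure.
        try {
            serialize(rows.subList(0, 3));
        } catch (UncheckedIOException e) {
            System.out.println("subList view failed: " + e.getCause());
        }
    }
}
```

The copy costs one extra allocation per chunk, but it also means later changes to the original list cannot invalidate chunks that are still waiting to be sent.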