package com.amazonaws.services.kinesisanalytics.utils;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class AvroSerializationFn<T extends SpecificRecordBase> implements SerializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(AvroSerializationFn.class);

    @Override
    public byte[] serialize(T record) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;

        try {
            oos = new ObjectOutputStream(baos);

            record.writeExternal(oos);
            oos.flush();
            baos.flush();

            return baos.toByteArray();
        } catch (Exception ex) {
            LOG.error("Unable to serialize employee info");
            LOG.error(ex.getMessage());

            return null;
        } finally {
            try {
                if(oos != null) oos.close();
                baos.close();
            } catch (Exception ex) {
                LOG.error("Unable to serialize employee info (while calling close on ByteArrayOutputStream");
                LOG.error(ex.getMessage());

                return null;
            }
        }
    }
}