/* * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of this * software and associated documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the rights to use, copy, modify, * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ package com.amazonaws.samples.kinesisanalytics.flink.streaming.etl.utils; import java.io.Serializable; import com.amazonaws.samples.kinesisanalytics.flink.streaming.etl.events.TripEvent; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; public class TripEventBucketAssigner implements BucketAssigner, Serializable { private final String prefix; public TripEventBucketAssigner(String prefix) { this.prefix = prefix; } public String getBucketId(TripEvent event, Context context) { return String.format("%spickup_location=%03d/year=%04d/month=%02d", prefix, event.getPickupLocationId(), event.getPickupDatetime().getYear(), event.getPickupDatetime().getMonthOfYear() ); } public SimpleVersionedSerializer getSerializer() { return SimpleVersionedStringSerializer.INSTANCE; } }