// Unless explicitly stated otherwise all files in this repository are licensed // under the Apache License 2.0. // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2021 Datadog, Inc. package ddsketch import ( "errors" "io" "math" enc "github.com/DataDog/sketches-go/ddsketch/encoding" "github.com/DataDog/sketches-go/ddsketch/mapping" "github.com/DataDog/sketches-go/ddsketch/pb/sketchpb" "github.com/DataDog/sketches-go/ddsketch/stat" "github.com/DataDog/sketches-go/ddsketch/store" ) var ( ErrUntrackableNaN = errors.New("input value is NaN and cannot be tracked by the sketch") ErrUntrackableTooLow = errors.New("input value is too low and cannot be tracked by the sketch") ErrUntrackableTooHigh = errors.New("input value is too high and cannot be tracked by the sketch") ErrNegativeCount = errors.New("count cannot be negative") errEmptySketch = errors.New("no such element exists") errUnknownFlag = errors.New("unknown encoding flag") ) // Unexported to prevent usage and avoid the cost of dynamic dispatch type quantileSketch interface { RelativeAccuracy() float64 IsEmpty() bool GetCount() float64 GetZeroCount() float64 GetSum() float64 GetPositiveValueStore() store.Store GetNegativeValueStore() store.Store GetMinValue() (float64, error) GetMaxValue() (float64, error) GetValueAtQuantile(quantile float64) (float64, error) GetValuesAtQuantiles(quantiles []float64) ([]float64, error) ForEach(f func(value, count float64) (stop bool)) Add(value float64) error AddWithCount(value, count float64) error // MergeWith // ChangeMapping Reweight(factor float64) error Clear() // Copy Encode(b *[]byte, omitIndexMapping bool) DecodeAndMergeWith(b []byte) error } var _ quantileSketch = (*DDSketch)(nil) var _ quantileSketch = (*DDSketchWithExactSummaryStatistics)(nil) type DDSketch struct { mapping.IndexMapping positiveValueStore store.Store negativeValueStore store.Store zeroCount float64 } func NewDDSketchFromStoreProvider(indexMapping mapping.IndexMapping, storeProvider store.Provider) *DDSketch { return NewDDSketch(indexMapping, storeProvider(), storeProvider()) } func NewDDSketch(indexMapping mapping.IndexMapping, positiveValueStore store.Store, negativeValueStore store.Store) *DDSketch { return &DDSketch{ IndexMapping: indexMapping, positiveValueStore: positiveValueStore, negativeValueStore: negativeValueStore, } } func NewDefaultDDSketch(relativeAccuracy float64) (*DDSketch, error) { m, err := mapping.NewDefaultMapping(relativeAccuracy) if err != nil { return nil, err } return NewDDSketchFromStoreProvider(m, store.DefaultProvider), nil } // Constructs an instance of DDSketch that offers constant-time insertion and whose size grows indefinitely // to accommodate for the range of input values. func LogUnboundedDenseDDSketch(relativeAccuracy float64) (*DDSketch, error) { indexMapping, err := mapping.NewLogarithmicMapping(relativeAccuracy) if err != nil { return nil, err } return NewDDSketch(indexMapping, store.NewDenseStore(), store.NewDenseStore()), nil } // Constructs an instance of DDSketch that offers constant-time insertion and whose size grows until the // maximum number of bins is reached, at which point bins with lowest indices are collapsed, which causes the // relative accuracy guarantee to be lost on lowest quantiles if values are all positive, or the mid-range // quantiles for values closest to zero if values include negative numbers. func LogCollapsingLowestDenseDDSketch(relativeAccuracy float64, maxNumBins int) (*DDSketch, error) { indexMapping, err := mapping.NewLogarithmicMapping(relativeAccuracy) if err != nil { return nil, err } return NewDDSketch(indexMapping, store.NewCollapsingLowestDenseStore(maxNumBins), store.NewCollapsingLowestDenseStore(maxNumBins)), nil } // Constructs an instance of DDSketch that offers constant-time insertion and whose size grows until the // maximum number of bins is reached, at which point bins with highest indices are collapsed, which causes the // relative accuracy guarantee to be lost on highest quantiles if values are all positive, or the lowest and // highest quantiles if values include negative numbers. func LogCollapsingHighestDenseDDSketch(relativeAccuracy float64, maxNumBins int) (*DDSketch, error) { indexMapping, err := mapping.NewLogarithmicMapping(relativeAccuracy) if err != nil { return nil, err } return NewDDSketch(indexMapping, store.NewCollapsingHighestDenseStore(maxNumBins), store.NewCollapsingHighestDenseStore(maxNumBins)), nil } // Adds a value to the sketch. func (s *DDSketch) Add(value float64) error { return s.AddWithCount(value, float64(1)) } // Adds a value to the sketch with a float64 count. func (s *DDSketch) AddWithCount(value, count float64) error { if count < 0 { return ErrNegativeCount } if value > s.MinIndexableValue() { if value > s.MaxIndexableValue() { return ErrUntrackableTooHigh } s.positiveValueStore.AddWithCount(s.Index(value), count) } else if value < -s.MinIndexableValue() { if value < -s.MaxIndexableValue() { return ErrUntrackableTooLow } s.negativeValueStore.AddWithCount(s.Index(-value), count) } else if math.IsNaN(value) { return ErrUntrackableNaN } else { s.zeroCount += count } return nil } // Return a (deep) copy of this sketch. func (s *DDSketch) Copy() *DDSketch { return &DDSketch{ IndexMapping: s.IndexMapping, positiveValueStore: s.positiveValueStore.Copy(), negativeValueStore: s.negativeValueStore.Copy(), zeroCount: s.zeroCount, } } // Clear empties the sketch while allowing reusing already allocated memory. func (s *DDSketch) Clear() { s.positiveValueStore.Clear() s.negativeValueStore.Clear() s.zeroCount = 0 } // Return the value at the specified quantile. Return a non-nil error if the quantile is invalid // or if the sketch is empty. func (s *DDSketch) GetValueAtQuantile(quantile float64) (float64, error) { if quantile < 0 || quantile > 1 { return math.NaN(), errors.New("The quantile must be between 0 and 1.") } count := s.GetCount() if count == 0 { return math.NaN(), errEmptySketch } rank := quantile * (count - 1) negativeValueCount := s.negativeValueStore.TotalCount() if rank < negativeValueCount { return -s.Value(s.negativeValueStore.KeyAtRank(negativeValueCount - 1 - rank)), nil } else if rank < s.zeroCount+negativeValueCount { return 0, nil } else { return s.Value(s.positiveValueStore.KeyAtRank(rank - s.zeroCount - negativeValueCount)), nil } } // Return the values at the respective specified quantiles. Return a non-nil error if any of the quantiles // is invalid or if the sketch is empty. func (s *DDSketch) GetValuesAtQuantiles(quantiles []float64) ([]float64, error) { values := make([]float64, len(quantiles)) for i, q := range quantiles { val, err := s.GetValueAtQuantile(q) if err != nil { return nil, err } values[i] = val } return values, nil } // Return the total number of values that have been added to this sketch. func (s *DDSketch) GetCount() float64 { return s.zeroCount + s.positiveValueStore.TotalCount() + s.negativeValueStore.TotalCount() } // GetZeroCount returns the number of zero values that have been added to this sketch. // Note: values that are very small (lower than MinIndexableValue if positive, or higher than -MinIndexableValue if negative) // are also mapped to the zero bucket. func (s *DDSketch) GetZeroCount() float64 { return s.zeroCount } // Return true iff no value has been added to this sketch. func (s *DDSketch) IsEmpty() bool { return s.zeroCount == 0 && s.positiveValueStore.IsEmpty() && s.negativeValueStore.IsEmpty() } // Return the maximum value that has been added to this sketch. Return a non-nil error if the sketch // is empty. func (s *DDSketch) GetMaxValue() (float64, error) { if !s.positiveValueStore.IsEmpty() { maxIndex, _ := s.positiveValueStore.MaxIndex() return s.Value(maxIndex), nil } else if s.zeroCount > 0 { return 0, nil } else { minIndex, err := s.negativeValueStore.MinIndex() if err != nil { return math.NaN(), err } return -s.Value(minIndex), nil } } // Return the minimum value that has been added to this sketch. Returns a non-nil error if the sketch // is empty. func (s *DDSketch) GetMinValue() (float64, error) { if !s.negativeValueStore.IsEmpty() { maxIndex, _ := s.negativeValueStore.MaxIndex() return -s.Value(maxIndex), nil } else if s.zeroCount > 0 { return 0, nil } else { minIndex, err := s.positiveValueStore.MinIndex() if err != nil { return math.NaN(), err } return s.Value(minIndex), nil } } // GetSum returns an approximation of the sum of the values that have been added to the sketch. If the // values that have been added to the sketch all have the same sign, the approximation error has // the relative accuracy guarantees of the mapping used for this sketch. func (s *DDSketch) GetSum() (sum float64) { s.ForEach(func(value float64, count float64) (stop bool) { sum += value * count return false }) return sum } // GetPositiveValueStore returns the store.Store object that contains the positive // values of the sketch. func (s *DDSketch) GetPositiveValueStore() (store.Store) { return s.positiveValueStore } // GetNegativeValueStore returns the store.Store object that contains the negative // values of the sketch. func (s *DDSketch) GetNegativeValueStore() (store.Store) { return s.negativeValueStore } // ForEach applies f on the bins of the sketches until f returns true. // There is no guarantee on the bin iteration order. func (s *DDSketch) ForEach(f func(value, count float64) (stop bool)) { if s.zeroCount != 0 && f(0, s.zeroCount) { return } stopped := false s.positiveValueStore.ForEach(func(index int, count float64) bool { stopped = f(s.IndexMapping.Value(index), count) return stopped }) if stopped { return } s.negativeValueStore.ForEach(func(index int, count float64) bool { return f(-s.IndexMapping.Value(index), count) }) } // Merges the other sketch into this one. After this operation, this sketch encodes the values that // were added to both this and the other sketches. func (s *DDSketch) MergeWith(other *DDSketch) error { if !s.IndexMapping.Equals(other.IndexMapping) { return errors.New("Cannot merge sketches with different index mappings.") } s.positiveValueStore.MergeWith(other.positiveValueStore) s.negativeValueStore.MergeWith(other.negativeValueStore) s.zeroCount += other.zeroCount return nil } // Generates a protobuf representation of this DDSketch. func (s *DDSketch) ToProto() *sketchpb.DDSketch { return &sketchpb.DDSketch{ Mapping: s.IndexMapping.ToProto(), PositiveValues: s.positiveValueStore.ToProto(), NegativeValues: s.negativeValueStore.ToProto(), ZeroCount: s.zeroCount, } } // FromProto builds a new instance of DDSketch based on the provided protobuf representation, using a Dense store. func FromProto(pb *sketchpb.DDSketch) (*DDSketch, error) { return FromProtoWithStoreProvider(pb, store.DenseStoreConstructor) } func FromProtoWithStoreProvider(pb *sketchpb.DDSketch, storeProvider store.Provider) (*DDSketch, error) { positiveValueStore := storeProvider() store.MergeWithProto(positiveValueStore, pb.PositiveValues) negativeValueStore := storeProvider() store.MergeWithProto(negativeValueStore, pb.NegativeValues) m, err := mapping.FromProto(pb.Mapping) if err != nil { return nil, err } return &DDSketch{ IndexMapping: m, positiveValueStore: positiveValueStore, negativeValueStore: negativeValueStore, zeroCount: pb.ZeroCount, }, nil } // Encode serializes the sketch and appends the serialized content to the provided []byte. // If the capacity of the provided []byte is large enough, Encode does not allocate memory space. // When the index mapping is known at the time of deserialization, omitIndexMapping can be set to true to avoid encoding it and to make the serialized content smaller. // The encoding format is described in the encoding/flag module. func (s *DDSketch) Encode(b *[]byte, omitIndexMapping bool) { if s.zeroCount != 0 { enc.EncodeFlag(b, enc.FlagZeroCountVarFloat) enc.EncodeVarfloat64(b, s.zeroCount) } if !omitIndexMapping { s.IndexMapping.Encode(b) } s.positiveValueStore.Encode(b, enc.FlagTypePositiveStore) s.negativeValueStore.Encode(b, enc.FlagTypeNegativeStore) } // DecodeDDSketch deserializes a sketch. // Stores are built using storeProvider. The store type needs not match the // store that the serialized sketch initially used. However, using the same // store type may make decoding faster. In the absence of high performance // requirements, store.BufferedPaginatedStoreConstructor is a sound enough // choice of store provider. // To avoid memory allocations, it is possible to use a store provider that // reuses stores, by calling Clear() on previously used stores before providing // the store. // If the serialized data does not contain the index mapping, you need to // specify the index mapping that was used in the sketch that was encoded. // Otherwise, you can use nil and the index mapping will be decoded from the // serialized data. // It is possible to decode with this function an encoded // DDSketchWithExactSummaryStatistics, but the exact summary statistics will be // lost. func DecodeDDSketch(b []byte, storeProvider store.Provider, indexMapping mapping.IndexMapping) (*DDSketch, error) { s := &DDSketch{ IndexMapping: indexMapping, positiveValueStore: storeProvider(), negativeValueStore: storeProvider(), zeroCount: float64(0), } err := s.DecodeAndMergeWith(b) return s, err } // DecodeAndMergeWith deserializes a sketch and merges its content in the // receiver sketch. // If the serialized content contains an index mapping that differs from the one // of the receiver, DecodeAndMergeWith returns an error. func (s *DDSketch) DecodeAndMergeWith(bb []byte) error { return s.decodeAndMergeWith(bb, func(b *[]byte, flag enc.Flag) error { switch flag { case enc.FlagCount, enc.FlagSum, enc.FlagMin, enc.FlagMax: // Exact summary stats are ignored. if len(*b) < 8 { return io.EOF } *b = (*b)[8:] return nil default: return errUnknownFlag } }) } func (s *DDSketch) decodeAndMergeWith(bb []byte, fallbackDecode func(b *[]byte, flag enc.Flag) error) error { b := &bb for len(*b) > 0 { flag, err := enc.DecodeFlag(b) if err != nil { return err } switch flag.Type() { case enc.FlagTypePositiveStore: s.positiveValueStore.DecodeAndMergeWith(b, flag.SubFlag()) case enc.FlagTypeNegativeStore: s.negativeValueStore.DecodeAndMergeWith(b, flag.SubFlag()) case enc.FlagTypeIndexMapping: decodedIndexMapping, err := mapping.Decode(b, flag) if err != nil { return err } if s.IndexMapping != nil && !s.IndexMapping.Equals(decodedIndexMapping) { return errors.New("index mapping mismatch") } s.IndexMapping = decodedIndexMapping default: switch flag { case enc.FlagZeroCountVarFloat: decodedZeroCount, err := enc.DecodeVarfloat64(b) if err != nil { return err } s.zeroCount += decodedZeroCount default: err := fallbackDecode(b, flag) if err != nil { return err } } } } if s.IndexMapping == nil { return errors.New("missing index mapping") } return nil } // ChangeMapping changes the store to a new mapping. // it doesn't change s but returns a newly created sketch. // positiveStore and negativeStore must be different stores, and be empty when the function is called. // It is not the conversion that minimizes the loss in relative // accuracy, but it avoids artefacts like empty bins that make the histograms look bad. // scaleFactor allows to scale out / in all values. (changing units for eg) func (s *DDSketch) ChangeMapping(newMapping mapping.IndexMapping, positiveStore store.Store, negativeStore store.Store, scaleFactor float64) *DDSketch { if scaleFactor == 1 && s.IndexMapping.Equals(newMapping) { return s.Copy() } changeStoreMapping(s.IndexMapping, newMapping, s.positiveValueStore, positiveStore, scaleFactor) changeStoreMapping(s.IndexMapping, newMapping, s.negativeValueStore, negativeStore, scaleFactor) newSketch := NewDDSketch(newMapping, positiveStore, negativeStore) newSketch.zeroCount = s.zeroCount return newSketch } func changeStoreMapping(oldMapping, newMapping mapping.IndexMapping, oldStore, newStore store.Store, scaleFactor float64) { oldStore.ForEach(func(index int, count float64) (stop bool) { inLowerBound := oldMapping.LowerBound(index) * scaleFactor inHigherBound := oldMapping.LowerBound(index+1) * scaleFactor inSize := inHigherBound - inLowerBound for outIndex := newMapping.Index(inLowerBound); newMapping.LowerBound(outIndex) < inHigherBound; outIndex++ { outLowerBound := newMapping.LowerBound(outIndex) outHigherBound := newMapping.LowerBound(outIndex + 1) lowerIntersectionBound := math.Max(outLowerBound, inLowerBound) higherIntersectionBound := math.Min(outHigherBound, inHigherBound) intersectionSize := higherIntersectionBound - lowerIntersectionBound proportion := intersectionSize / inSize newStore.AddWithCount(outIndex, proportion*count) } return false }) } // Reweight multiplies all values from the sketch by w, but keeps the same global distribution. // w has to be strictly greater than 0. func (s *DDSketch) Reweight(w float64) error { if w <= 0 { return errors.New("can't reweight by a negative factor") } if w == 1 { return nil } s.zeroCount *= w if err := s.positiveValueStore.Reweight(w); err != nil { return err } if err := s.negativeValueStore.Reweight(w); err != nil { return err } return nil } // DDSketchWithExactSummaryStatistics returns exact count, sum, min and max, as // opposed to DDSketch, which may return approximate values for those // statistics. Because of the need to track them exactly, adding and merging // operations are slightly more exepensive than those of DDSketch. type DDSketchWithExactSummaryStatistics struct { *DDSketch summaryStatistics *stat.SummaryStatistics } func NewDefaultDDSketchWithExactSummaryStatistics(relativeAccuracy float64) (*DDSketchWithExactSummaryStatistics, error) { sketch, err := NewDefaultDDSketch(relativeAccuracy) if err != nil { return nil, err } return &DDSketchWithExactSummaryStatistics{ DDSketch: sketch, summaryStatistics: stat.NewSummaryStatistics(), }, nil } func NewDDSketchWithExactSummaryStatistics(mapping mapping.IndexMapping, storeProvider store.Provider) *DDSketchWithExactSummaryStatistics { return &DDSketchWithExactSummaryStatistics{ DDSketch: NewDDSketchFromStoreProvider(mapping, storeProvider), summaryStatistics: stat.NewSummaryStatistics(), } } // NewDDSketchWithExactSummaryStatisticsFromData constructs DDSketchWithExactSummaryStatistics from the provided sketch and exact summary statistics. func NewDDSketchWithExactSummaryStatisticsFromData(sketch *DDSketch, summaryStatistics *stat.SummaryStatistics) (*DDSketchWithExactSummaryStatistics, error) { if sketch.IsEmpty() != (summaryStatistics.Count() == 0) { return nil, errors.New("sketch and summary statistics do not match") } return &DDSketchWithExactSummaryStatistics{ DDSketch: sketch, summaryStatistics: summaryStatistics, }, nil } func (s *DDSketchWithExactSummaryStatistics) IsEmpty() bool { return s.summaryStatistics.Count() == 0 } func (s *DDSketchWithExactSummaryStatistics) GetCount() float64 { return s.summaryStatistics.Count() } // GetZeroCount returns the number of zero values that have been added to this sketch. // Note: values that are very small (lower than MinIndexableValue if positive, or higher than -MinIndexableValue if negative) // are also mapped to the zero bucket. func (s *DDSketchWithExactSummaryStatistics) GetZeroCount() float64 { return s.DDSketch.zeroCount } func (s *DDSketchWithExactSummaryStatistics) GetSum() float64 { return s.summaryStatistics.Sum() } // GetPositiveValueStore returns the store.Store object that contains the positive // values of the sketch. func (s *DDSketchWithExactSummaryStatistics) GetPositiveValueStore() (store.Store) { return s.DDSketch.positiveValueStore } // GetNegativeValueStore returns the store.Store object that contains the negative // values of the sketch. func (s *DDSketchWithExactSummaryStatistics) GetNegativeValueStore() (store.Store) { return s.DDSketch.negativeValueStore } func (s *DDSketchWithExactSummaryStatistics) GetMinValue() (float64, error) { if s.DDSketch.IsEmpty() { return math.NaN(), errEmptySketch } return s.summaryStatistics.Min(), nil } func (s *DDSketchWithExactSummaryStatistics) GetMaxValue() (float64, error) { if s.DDSketch.IsEmpty() { return math.NaN(), errEmptySketch } return s.summaryStatistics.Max(), nil } func (s *DDSketchWithExactSummaryStatistics) GetValueAtQuantile(quantile float64) (float64, error) { value, err := s.DDSketch.GetValueAtQuantile(quantile) min := s.summaryStatistics.Min() if value < min { return min, err } max := s.summaryStatistics.Max() if value > max { return max, err } return value, err } func (s *DDSketchWithExactSummaryStatistics) GetValuesAtQuantiles(quantiles []float64) ([]float64, error) { values, err := s.DDSketch.GetValuesAtQuantiles(quantiles) min := s.summaryStatistics.Min() max := s.summaryStatistics.Max() for i := range values { if values[i] < min { values[i] = min } else if values[i] > max { values[i] = max } } return values, err } func (s *DDSketchWithExactSummaryStatistics) ForEach(f func(value, count float64) (stop bool)) { s.DDSketch.ForEach(f) } func (s *DDSketchWithExactSummaryStatistics) Clear() { s.DDSketch.Clear() s.summaryStatistics.Clear() } func (s *DDSketchWithExactSummaryStatistics) Add(value float64) error { err := s.DDSketch.Add(value) if err != nil { return err } s.summaryStatistics.Add(value, 1) return nil } func (s *DDSketchWithExactSummaryStatistics) AddWithCount(value, count float64) error { if count == 0 { return nil } err := s.DDSketch.AddWithCount(value, count) if err != nil { return err } s.summaryStatistics.Add(value, count) return nil } func (s *DDSketchWithExactSummaryStatistics) MergeWith(o *DDSketchWithExactSummaryStatistics) error { err := s.DDSketch.MergeWith(o.DDSketch) if err != nil { return err } s.summaryStatistics.MergeWith(o.summaryStatistics) return nil } func (s *DDSketchWithExactSummaryStatistics) Copy() *DDSketchWithExactSummaryStatistics { return &DDSketchWithExactSummaryStatistics{ DDSketch: s.DDSketch.Copy(), summaryStatistics: s.summaryStatistics.Copy(), } } func (s *DDSketchWithExactSummaryStatistics) Reweight(factor float64) error { err := s.DDSketch.Reweight(factor) if err != nil { return err } s.summaryStatistics.Reweight(factor) return nil } func (s *DDSketchWithExactSummaryStatistics) ChangeMapping(newMapping mapping.IndexMapping, storeProvider store.Provider, scaleFactor float64) *DDSketchWithExactSummaryStatistics { summaryStatisticsCopy := s.summaryStatistics.Copy() summaryStatisticsCopy.Rescale(scaleFactor) return &DDSketchWithExactSummaryStatistics{ DDSketch: s.DDSketch.ChangeMapping(newMapping, storeProvider(), storeProvider(), scaleFactor), summaryStatistics: summaryStatisticsCopy, } } func (s *DDSketchWithExactSummaryStatistics) Encode(b *[]byte, omitIndexMapping bool) { if s.summaryStatistics.Count() != 0 { enc.EncodeFlag(b, enc.FlagCount) enc.EncodeVarfloat64(b, s.summaryStatistics.Count()) } if s.summaryStatistics.Sum() != 0 { enc.EncodeFlag(b, enc.FlagSum) enc.EncodeFloat64LE(b, s.summaryStatistics.Sum()) } if s.summaryStatistics.Min() != math.Inf(1) { enc.EncodeFlag(b, enc.FlagMin) enc.EncodeFloat64LE(b, s.summaryStatistics.Min()) } if s.summaryStatistics.Max() != math.Inf(-1) { enc.EncodeFlag(b, enc.FlagMax) enc.EncodeFloat64LE(b, s.summaryStatistics.Max()) } s.DDSketch.Encode(b, omitIndexMapping) } // DecodeDDSketchWithExactSummaryStatistics deserializes a sketch. // Stores are built using storeProvider. The store type needs not match the // store that the serialized sketch initially used. However, using the same // store type may make decoding faster. In the absence of high performance // requirements, store.DefaultProvider is a sound enough choice of store // provider. // To avoid memory allocations, it is possible to use a store provider that // reuses stores, by calling Clear() on previously used stores before providing // the store. // If the serialized data does not contain the index mapping, you need to // specify the index mapping that was used in the sketch that was encoded. // Otherwise, you can use nil and the index mapping will be decoded from the // serialized data. // It is not possible to decode with this function an encoded DDSketch (unless // it is empty), because it does not track exact summary statistics func DecodeDDSketchWithExactSummaryStatistics(b []byte, storeProvider store.Provider, indexMapping mapping.IndexMapping) (*DDSketchWithExactSummaryStatistics, error) { s := &DDSketchWithExactSummaryStatistics{ DDSketch: &DDSketch{ IndexMapping: indexMapping, positiveValueStore: storeProvider(), negativeValueStore: storeProvider(), zeroCount: float64(0), }, summaryStatistics: stat.NewSummaryStatistics(), } err := s.DecodeAndMergeWith(b) return s, err } func (s *DDSketchWithExactSummaryStatistics) DecodeAndMergeWith(bb []byte) error { err := s.DDSketch.decodeAndMergeWith(bb, func(b *[]byte, flag enc.Flag) error { switch flag { case enc.FlagCount: count, err := enc.DecodeVarfloat64(b) if err != nil { return err } s.summaryStatistics.AddToCount(count) return nil case enc.FlagSum: sum, err := enc.DecodeFloat64LE(b) if err != nil { return err } s.summaryStatistics.AddToSum(sum) return nil case enc.FlagMin, enc.FlagMax: stat, err := enc.DecodeFloat64LE(b) if err != nil { return err } s.summaryStatistics.Add(stat, 0) return nil default: return errUnknownFlag } }) if err != nil { return err } // It is assumed that if the count is encoded, other exact summary // statistics are encoded as well, which is the case if Encode is used. if s.summaryStatistics.Count() == 0 && !s.DDSketch.IsEmpty() { return errors.New("missing exact summary statistics") } return nil }