Skip to content

Commit

Permalink
chore: Create FDv2 compatible polling data source (#187)
Browse files Browse the repository at this point in the history
Expands the `datasourcev2` package with a polling datasource
implementation that is compatible with our current SDK, but pulls data
updates from a FDv2-compatible source.

It does not support any of the other FDv2-specific features like picking
up from a known state. That will be introduced in later work as we
re-shape the datasource integration on a larger scale.
  • Loading branch information
keelerm84 committed Sep 16, 2024
1 parent ab9c9b8 commit 521d3fd
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 62 deletions.
31 changes: 31 additions & 0 deletions internal/datasourcev2/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,20 @@ package datasourcev2

import (
"fmt"
"net/http"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
)

type httpStatusError struct {
Message string
Code int
}

func (e httpStatusError) Error() string {
return e.Message
}

// Tests whether an HTTP error status represents a condition that might resolve on its own if we retry,
// or at least should not make us permanently stop sending requests.
func isHTTPErrorRecoverable(statusCode int) bool {
Expand Down Expand Up @@ -52,3 +62,24 @@ func checkIfErrorIsRecoverableAndLog(
loggers.Warnf("Error %s (%s): %s", errorContext, recoverableMessage, errorDesc)
return true
}

func checkForHTTPError(statusCode int, url string) error {
if statusCode == http.StatusUnauthorized {
return httpStatusError{
Message: fmt.Sprintf("Invalid SDK key when accessing URL: %s. Verify that your SDK key is correct.", url),
Code: statusCode}
}

if statusCode == http.StatusNotFound {
return httpStatusError{
Message: fmt.Sprintf("Resource not found when accessing URL: %s. Verify that this resource exists.", url),
Code: statusCode}
}

if statusCode/100 != 2 {
return httpStatusError{
Message: fmt.Sprintf("Unexpected response code: %d when accessing URL: %s", statusCode, url),
Code: statusCode}
}
return nil
}
201 changes: 201 additions & 0 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package datasourcev2

import (
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
)

const (
pollingErrorContext = "on polling request"
pollingWillRetryMessage = "will retry at next scheduled poll interval"
)

// Requester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type Requester interface {
Request() (data []ldstoretypes.Collection, cached bool, err error)
BaseURI() string
FilterKey() string
}

// PollingProcessor is the internal implementation of the polling data source.
//
// This type is exported from internal so that the PollingDataSourceBuilder tests can verify its
// configuration. All other code outside of this package should interact with it only via the
// DataSource interface.
type PollingProcessor struct {
dataSourceUpdates subsystems.DataSourceUpdateSink
requester Requester
pollInterval time.Duration
loggers ldlog.Loggers
setInitializedOnce sync.Once
isInitialized internal.AtomicBoolean
quit chan struct{}
closeOnce sync.Once
}

// NewPollingProcessor creates the internal implementation of the polling data source.
func NewPollingProcessor(
context subsystems.ClientContext,
dataSourceUpdates subsystems.DataSourceUpdateSink,
cfg datasource.PollingConfig,
) *PollingProcessor {
httpRequester := newPollingRequester(context, context.GetHTTP().CreateHTTPClient(), cfg.BaseURI, cfg.FilterKey)
return newPollingProcessor(context, dataSourceUpdates, httpRequester, cfg.PollInterval)
}

func newPollingProcessor(
context subsystems.ClientContext,
dataSourceUpdates subsystems.DataSourceUpdateSink,
requester Requester,
pollInterval time.Duration,
) *PollingProcessor {
pp := &PollingProcessor{
dataSourceUpdates: dataSourceUpdates,
requester: requester,
pollInterval: pollInterval,
loggers: context.GetLogging().Loggers,
quit: make(chan struct{}),
}
return pp
}

//nolint:revive // no doc comment for standard method
func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

ticker := newTickerWithInitialTick(pp.pollInterval)

go func() {
defer ticker.Stop()

var readyOnce sync.Once
notifyReady := func() {
readyOnce.Do(func() {
close(closeWhenReady)
})
}
// Ensure we stop waiting for initialization if we exit, even if initialization fails
defer notifyReady()

for {
select {
case <-pp.quit:
return
case <-ticker.C:
if err := pp.poll(); err != nil {
if hse, ok := err.(httpStatusError); ok {
errorInfo := interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindErrorResponse,
StatusCode: hse.Code,
Time: time.Now(),
}
recoverable := checkIfErrorIsRecoverableAndLog(
pp.loggers,
httpErrorDescription(hse.Code),
pollingErrorContext,
hse.Code,
pollingWillRetryMessage,
)
if recoverable {
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
} else {
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
notifyReady()
return
}
} else {
errorInfo := interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindNetworkError,
Message: err.Error(),
Time: time.Now(),
}
if _, ok := err.(malformedJSONError); ok {
errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
}
checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
}
continue
}
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
pp.setInitializedOnce.Do(func() {
pp.isInitialized.Set(true)
pp.loggers.Info("First polling request successful")
notifyReady()
})
}
}
}()
}

func (pp *PollingProcessor) poll() error {
allData, cached, err := pp.requester.Request()

if err != nil {
return err
}

// We initialize the store only if the request wasn't cached
if !cached {
pp.dataSourceUpdates.Init(allData)
}
return nil
}

//nolint:revive // no doc comment for standard method
func (pp *PollingProcessor) Close() error {
pp.closeOnce.Do(func() {
close(pp.quit)
})
return nil
}

//nolint:revive // no doc comment for standard method
func (pp *PollingProcessor) IsInitialized() bool {
return pp.isInitialized.Get()
}

// GetBaseURI returns the configured polling base URI, for testing.
func (pp *PollingProcessor) GetBaseURI() string {
return pp.requester.BaseURI()
}

// GetPollInterval returns the configured polling interval, for testing.
func (pp *PollingProcessor) GetPollInterval() time.Duration {
return pp.pollInterval
}

// GetFilterKey returns the configured filter key, for testing.
func (pp *PollingProcessor) GetFilterKey() string {
return pp.requester.FilterKey()
}

type tickerWithInitialTick struct {
*time.Ticker
C <-chan time.Time
}

func newTickerWithInitialTick(interval time.Duration) *tickerWithInitialTick {
c := make(chan time.Time)
ticker := time.NewTicker(interval)
t := &tickerWithInitialTick{
C: c,
Ticker: ticker,
}
go func() {
c <- time.Now() // Ensure we do an initial poll immediately
for tt := range ticker.C {
c <- tt
}
}()
return t
}
151 changes: 151 additions & 0 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package datasourcev2

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"

es "github.com/launchdarkly/eventsource"
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

"github.com/gregjones/httpcache"
"golang.org/x/exp/maps"
)

// pollingRequester is the internal implementation of getting flag/segment data from the LD polling endpoints.
type pollingRequester struct {
httpClient *http.Client
baseURI string
filterKey string
headers http.Header
loggers ldlog.Loggers
}

type malformedJSONError struct {
innerError error
}

func (e malformedJSONError) Error() string {
return e.innerError.Error()
}

func newPollingRequester(
context subsystems.ClientContext,
httpClient *http.Client,
baseURI string,
filterKey string,
) *pollingRequester {
if httpClient == nil {
httpClient = context.GetHTTP().CreateHTTPClient()
}

modifiedClient := *httpClient
modifiedClient.Transport = &httpcache.Transport{
Cache: httpcache.NewMemoryCache(),
MarkCachedResponses: true,
Transport: httpClient.Transport,
}

return &pollingRequester{
httpClient: &modifiedClient,
baseURI: baseURI,
filterKey: filterKey,
headers: context.GetHTTP().DefaultHeaders,
loggers: context.GetLogging().Loggers,
}
}
func (r *pollingRequester) BaseURI() string {
return r.baseURI
}

func (r *pollingRequester) FilterKey() string {
return r.filterKey
}
func (r *pollingRequester) Request() ([]ldstoretypes.Collection, bool, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}

body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
if err != nil {
return nil, false, err
}
if cached {
return nil, true, nil
}

var payload pollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
return nil, false, malformedJSONError{err}
}

esEvents := make([]es.Event, 0, len(payload.Events))
for _, event := range payload.Events {
esEvents = append(esEvents, event)
}

data, err := convertChangesetEventsToPutData(esEvents)
if err != nil {
return nil, false, malformedJSONError{err}
} else if len(data) != 1 {
return nil, false, malformedJSONError{errors.New("missing expected put event")}
}

putData, ok := data[0].(datasource.PutData)
if !ok {
return nil, false, malformedJSONError{errors.New("payload is not a PutData")}
}

return putData.Data, cached, nil
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
req, reqErr := http.NewRequest("GET", endpoints.AddPath(r.baseURI, resource), nil)
if reqErr != nil {
reqErr = fmt.Errorf(
"unable to create a poll request; this is not a network problem, most likely a bad base URI: %w",
reqErr,
)
return nil, false, reqErr
}
if r.filterKey != "" {
req.URL.RawQuery = url.Values{
"filter": {r.filterKey},
}.Encode()
}
url := req.URL.String()
if r.headers != nil {
req.Header = maps.Clone(r.headers)
}

res, resErr := r.httpClient.Do(req)

if resErr != nil {
return nil, false, resErr
}

defer func() {
_, _ = io.ReadAll(res.Body)
_ = res.Body.Close()
}()

if err := checkForHTTPError(res.StatusCode, url); err != nil {
return nil, false, err
}

cached := res.Header.Get(httpcache.XFromCache) != ""

body, ioErr := io.ReadAll(res.Body)

if ioErr != nil {
return nil, false, ioErr // COVERAGE: there is no way to simulate this condition in unit tests
}
return body, cached, nil
}
Loading

0 comments on commit 521d3fd

Please sign in to comment.